Conector de Azure SQL Database y SQL Server para Apache Spark

El conector de Apache Spark para Azure SQL Database y SQL Server permite que estas bases de datos actúen como orígenes de datos de entrada y receptores de datos de salida en los trabajos de Apache Spark. Permite usar datos transaccionales en tiempo real en análisis de macrodatos y conservar los resultados para informes o consultas ad hoc.

En comparación con el conector JDBC integrado, este conector proporciona la capacidad para insertar datos de forma masiva en bases de datos SQL. Puede mejorar el rendimiento de la inserción de fila en fila, ya que puede insertar datos entre 10 y 20 veces más rápido. El conector de Spark para SQL Server y Azure SQL Database también admite la autenticaciónde Microsoft Entra, lo que permite conectarse de forma segura a las bases de datos de Azure SQL Database desde Azure Synapse Analytics.

En este artículo se explica cómo usar DataFrame API para conectarse a bases de datos SQL mediante el conector de MS SQL. En este artículo se proporcionan ejemplos detallados del uso de PySpark API. Para ver todos los ejemplos y argumentos admitidos para conectarse a bases de datos SQL mediante el conector de MS SQL, consulte Ejemplos de Azure Data SQL.

Detalles de conexión

En este ejemplo, usaremos las utilidades de Spark para Microsoft para facilitar la adquisición de secretos de una instancia de Key Vault preconfigurada. Para más información sobre las utilidades de Spark para Microsoft, visite Introducción a las utilidades de Spark para Microsoft.

# The servername is in the format "jdbc:sqlserver://<AzureSQLServerName>.database.windows.net:1433"
servername = "<< server name >>"
dbname = "<< database name >>"
url = servername + ";" + "databaseName=" + dbname + ";"
dbtable = "<< table name >> "
user = "<< username >>" 
principal_client_id = "<< service principal client id >>" 
principal_secret = "<< service principal secret ho>>"
password = mssparkutils.credentials.getSecret('azure key vault name','secret name')

Nota:

Actualmente, no hay ningún servicio vinculado ni compatibilidad de paso a través de Microsoft Entra con el conector de Azure SQL.

Uso del conector de Azure SQL y SQL Server

Lectura de datos

#Read from SQL table using MS SQL Connector
print("read data from SQL server table  ")
jdbcDF = spark.read \
        .format("com.microsoft.sqlserver.jdbc.spark") \
        .option("url", url) \
        .option("dbtable", dbtable) \
        .option("user", user) \
        .option("password", password).load()

jdbcDF.show(5)

Escritura de datos

try:
  df.write \
    .format("com.microsoft.sqlserver.jdbc.spark") \
    .mode("overwrite") \
    .option("url", url) \
    .option("dbtable", dbtable) \
    .option("user", user) \
    .option("password", password) \
    .save()
except ValueError as error :
    print("MSSQL Connector write failed", error)

print("MSSQL Connector write(overwrite) succeeded  ")

Anexión de datos

try:
  df.write \
    .format("com.microsoft.sqlserver.jdbc.spark") \
    .mode("append") \
    .option("url", url) \
    .option("dbtable", table_name) \
    .option("user", username) \
    .option("password", password) \
    .save()
except ValueError as error :
    print("Connector write failed", error)

Autenticación de Microsoft Entra

Ejemplo de Python con una entidad de servicio

import msal

# Located in App Registrations from Azure Portal
tenant_id = "<< tenant id >> "

# Located in App Registrations from Azure Portal
resource_app_id_url = "https://database.windows.net/"

# Define scope of the Service for the app registration before requesting from AAD
scope ="https://database.windows.net/.default"

# Authority
authority = "https://login.microsoftonline.net/" + tenant_id

# Get service principal 
service_principal_id = mssparkutils.credentials.getSecret('azure key vault name','principal_client_id')
service_principal_secret = mssparkutils.credentials.getSecret('azure key vault name','principal_secret')


context = msal.ConfidentialClientApplication(
    service_principal_id, service_principal_secret, authority
    )

token = app.acquire_token_silent([scope])
access_token = token["access_token"]

jdbc_df = spark.read \
        .format("com.microsoft.sqlserver.jdbc.spark") \
        .option("url", url) \
        .option("dbtable", dbtable) \
        .option("accessToken", access_token) \
        .option("encrypt", "true") \
        .option("hostNameInCertificate", "*.database.windows.net") \
        .load()

Ejemplo de Python con una contraseña de Active Directory

jdbc_df = spark.read \
        .format("com.microsoft.sqlserver.jdbc.spark") \
        .option("url", url) \
        .option("dbtable", table_name) \
        .option("authentication", "ActiveDirectoryPassword") \
        .option("user", user_name) \
        .option("password", password) \
        .option("encrypt", "true") \
        .option("hostNameInCertificate", "*.database.windows.net") \
        .load()

Importante

  • Se debe instalar una dependencia necesaria para poder autenticarse mediante Active Directory.
  • El formato de user cuando se usa ActiveDirectoryPassword debe ser el formato UPN, por ejemplo username@domainname.com.
    • Para Scala, se debe instalar el artefacto com.microsoft.aad.adal4j.
    • Para Python, se debe instalar la biblioteca adal. Está disponible mediante PIP.
  • Consulte los cuadernos de ejemplo para ver ejemplos; y para obtener los controladores y versiones más recientes, visite Conector de Apache Spark: SQL Server y Azure SQL.

Soporte técnico

El conector de Apache Spark para Azure SQL y SQL Server es un proyecto de código abierto. Este conector no incluye soporte técnico de Microsoft. Si surgen problemas con el conector o preguntas relacionadas, cree una incidencia en este repositorio del proyecto. La comunidad de conectores está activa y supervisa las consultas.

Pasos siguientes