Banco de Dados SQL do Azure e conector do SQL Server para Apache Spark

O conector do Apache Spark para Banco de Dados SQL do Azure e SQL Server permite que esses bancos de dados atuem como fontes de dados de entrada e coletores de dados de saída para trabalhos do Apache Spark. Ele permite que você utilize dados transacionais em tempo real na análise de Big Data e persiste resultados para relatórios ou consultas ad hoc.

Em comparação com o conector interno do JDBC, este conector fornece a capacidade de inserir dados em massa em bancos de dados SQL. Ele pode superar a inserção de linha por linha com desempenho de 10 a 20 vezes mais rápido. O conector do Spark para SQL Server e Banco de Dados SQL do Azure também dá suporte à autenticação do Azure AD (Azure Active Directory), permitindo que você se conecte com segurança aos bancos de dados SQL do Azure do Synapse Analytics.

Este artigo aborda como usar a API do DataFrame para se conectar a bancos de dados SQL usando o conector de SQL do MS. Este artigo fornece exemplos detalhados usando a API do PySpark. Para todos os argumentos e exemplos com suporte para se conectar a bancos de dados SQL usando o conector de SQL do MS, confira Exemplos do SQL de Dados do Azure.

Detalhes da conexão

Neste exemplo, vamos usar os utilitários do Microsoft Spark para facilitar a aquisição de segredos de um Key Vault pré-configurado. Para saber mais sobre os utilitários do Microsoft Spark, visite a introdução aos utilitários do Microsoft Spark.

servername = "<< server name >>"
dbname = "<< database name >>"
url = servername + ";" + "databaseName=" + dbname + ";"
dbtable = "<< table name >> "
user = "<< username >>" 
password = mssparkutils.credentials.getSecret('azure key vault name','secret name')

Observação

Atualmente, não há nenhum serviço vinculado ou suporte de passagem do AAD com o conector do SQL do Azure.

Usar o conector do SQL do Azure e do SQL Server

Ler dados

#Read from SQL table using MS SQL Connector
print("read data from SQL server table  ")
jdbcDF = spark.read \
        .format("com.microsoft.sqlserver.jdbc.spark") \
        .option("url", url) \
        .option("dbtable", dbtable) \
        .option("user", user) \
        .option("password", password).load()

jdbcDF.show(5)

Gravar dados

try:
  df.write \
    .format("com.microsoft.sqlserver.jdbc.spark") \
    .mode("overwrite") \
    .option("url", url) \
    .option("dbtable", dbtable) \
    .option("user", user) \
    .option("password", password) \
    .save()
except ValueError as error :
    print("MSSQL Connector write failed", error)

print("MSSQL Connector write(overwrite) succeeded  ")

Acrescentar dados

try:
  df.write \
    .format("com.microsoft.sqlserver.jdbc.spark") \
    .mode("append") \
    .option("url", url) \
    .option("dbtable", table_name) \
    .option("user", username) \
    .option("password", password) \
    .save()
except ValueError as error :
    print("Connector write failed", error)

Autenticação do Azure Active Directory

Exemplo do Python com entidade de serviço

import adal

# Located in App Registrations from Azure Portal
tenant_id = "<< tenant id >> "

# Located in App Registrations from Azure Portal
resource_app_id_url = "https://database.windows.net/"

# Authority
authority = "https://login.windows.net/" + tenant_id

context = adal.AuthenticationContext(authority)
token = context.acquire_token_with_client_credentials(resource_app_id_url, service_principal_id, service_principal_secret)
access_token = token["accessToken"]

jdbc_df = spark.read \
        .format("com.microsoft.sqlserver.jdbc.spark") \
        .option("url", url) \
        .option("dbtable", dbtable) \
        .option("accessToken", access_token) \
        .option("encrypt", "true") \
        .option("hostNameInCertificate", "*.database.windows.net") \
        .load()

Exemplo do Python com senha do Active Directory

jdbc_df = spark.read \
        .format("com.microsoft.sqlserver.jdbc.spark") \
        .option("url", url) \
        .option("dbtable", table_name) \
        .option("authentication", "ActiveDirectoryPassword") \
        .option("user", user_name) \
        .option("password", password) \
        .option("encrypt", "true") \
        .option("hostNameInCertificate", "*.database.windows.net") \
        .load()

Próximas etapas