Compartilhar via


Conectando-se ao Azure Databricks e ao Azure Synapse com o PolyBase (herdado)

Importante

Esta documentação foi desativada e pode não estar atualizada. Não há mais suporte para os produtos, serviços ou tecnologias mencionados neste conteúdo. Confira Consultar dados no Azure Synapse Analytics.

O Databricks recomenda usar a funcionalidade padrão COPY com o Azure Data Lake Storage Gen2 para conexões com o Azure Synapse. Este artigo inclui a documentação herdada sobre o PolyBase e o armazenamento de blobs.

O Azure Synapse Analytics (antigo SQL Data Warehouse) é um data warehouse corporativo baseado em nuvem que aproveita o MPP (processamento paralelo maciço) para executar rapidamente consultas complexas em petabytes de dados. Use o Azure como um componente fundamental de uma solução de Big Data. Importe o Big Data para o Azure com consultas T-SQL simples do PolyBase ou a instrução COPY e use o poder do MPP para executar a análise de alto desempenho. À medida que você integrar e analisar, o data warehouse se tornará a única versão da verdade com que sua empresa poderá contar para obter informações.

Você pode acessar o Azure Sinapse no Azure Databricks usando o conector do Azure Synapse, uma implementação de fonte de dados para o Apache Spark que usa o Armazenamento de Blobs do Azure e o PolyBase ou a instrução COPY no Azure Synapse para transferir grandes volumes de dados com eficiência entre um cluster do Azure Databricks e uma instância do Azure Synapse.

O cluster do Azure Databricks e a instância do Azure Synapse acessam um contêiner comum do Armazenamento de Blobs para trocar dados entre esses dois sistemas. No Azure Databricks, os trabalhos do Apache Spark são disparados pelo conector do Azure Synapse Analytics para ler dados do contêiner do armazenamento de blobs e gravar dados nele. No lado do Azure Synapse, as operações de carregamento e descarregamento de dados realizadas pelo PolyBase são disparadas pelo conector do Azure Synapse por meio do JDBC. No Databricks Runtime 7.0 e superior, COPY é usado por padrão para carregar dados no Azure Synapse pelo conector do Azure Synapse por meio do JDBC.

Observação

A instrução COPY só está disponível em instâncias do Azure Synapse Gen2, que fornecem melhor desempenho. Se o banco de dados ainda usa instâncias do Gen1, recomendamos que você migre o banco de dados para o Gen2.

O conector do Azure Synapse é mais adequado para ETL do que para consultas interativas, pois cada execução de consulta pode extrair grandes volumes de dados para o armazenamento de blobs. Se você pretende executar várias consultas na mesma tabela do Azure Synapse, recomendamos que você salve os dados extraídos em um formato como Parquet.

Requisitos

Uma chave mestra de banco de dados do Azure Synapse.

Autenticação

O conector do Azure Synapse usa três tipos de conexões de rede:

  • Driver do Spark para o Azure Synapse
  • Driver e executores do Spark para a conta de armazenamento do Azure
  • Azure Synapse para a conta de armazenamento do Azure
                                 ┌─────────┐
      ┌─────────────────────────>│ STORAGE │<────────────────────────┐
      │   Storage acc key /      │ ACCOUNT │  Storage acc key /      │
      │   Managed Service ID /   └─────────┘  OAuth 2.0 /            │
      │                               │                              │
      │                               │                              │
      │                               │ Storage acc key /            │
      │                               │ OAuth 2.0 /                  │
      │                               │                              │
      v                               v                       ┌──────v────┐
┌──────────┐                      ┌──────────┐                │┌──────────┴┐
│ Synapse  │                      │  Spark   │                ││ Spark     │
│ Analytics│<────────────────────>│  Driver  │<───────────────>│ Executors │
└──────────┘  JDBC with           └──────────┘    Configured   └───────────┘
              username & password /                in Spark

As seções a seguir descrevem as opções de configuração de autenticação de cada conexão.

Driver do Spark para o Azure Synapse

O driver do Spark pode se conectar ao Azure Synapse usando o JDBC com um nome de usuário e uma senha ou o OAuth 2.0 com uma entidade de serviço para autenticação.

Nome de usuário e senha

Recomendamos que você use as cadeias de conexão fornecidas pelo portal do Azure para os dois tipos de autenticação, que habilitam a criptografia do protocolo SSL para todos os dados enviados entre o driver do Spark e a instância do Azure Synapse por meio da conexão JDBC. Para verificar se a criptografia SSL está habilitada, procure encrypt=true na cadeia de conexão.

Para permitir que o driver do Spark acesse o Azure Synapse, defina Permitir que os serviços e recursos do Azure acessem este workspace como ATIVADO no painel Rede, em Segurança do workspace do Azure Synapse, por meio do portal do Azure. Essa configuração permite a comunicação de todos os endereços IP do Azure e de todas as sub-redes do Azure, o que permite que os drivers do Spark acessem a instância do Azure Synapse.

OAuth 2.0 com uma entidade de serviço

Você pode se autenticar no Azure Synapse Analytics usando uma entidade de serviço com acesso à conta de armazenamento subjacente. Para obter mais informações sobre como usar as credenciais da entidade de serviço para acessar uma conta de armazenamento do Azure, confira Conectar-se ao Azure Data Lake Storage Gen2 e ao Armazenamento de Blobs. É preciso definir a opção enableServicePrincipalAuth como true nos Parâmetros de configuração da conexão a fim de habilitar o conector para se autenticar em uma entidade de serviço.

Opcionalmente, você pode usar uma entidade de serviço diferente para a conexão do Azure Synapse Analytics. Um exemplo que configura as credenciais da entidade de serviço para a conta de armazenamento e as credenciais da entidade de serviço opcionais para o Azure Synapse:

ini
; Defining the Service Principal credentials for the Azure storage account
fs.azure.account.auth.type OAuth
fs.azure.account.oauth.provider.type org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider
fs.azure.account.oauth2.client.id <application-id>
fs.azure.account.oauth2.client.secret <service-credential>
fs.azure.account.oauth2.client.endpoint https://login.microsoftonline.com/<directory-id>/oauth2/token

; Defining a separate set of service principal credentials for Azure Synapse Analytics (If not defined, the connector will use the Azure storage account credentials)
spark.databricks.sqldw.jdbc.service.principal.client.id <application-id>
spark.databricks.sqldw.jdbc.service.principal.client.secret <service-credential>
Scala
// Defining the Service Principal credentials for the Azure storage account
spark.conf.set("fs.azure.account.auth.type", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type",  "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id", "<application-id>")
spark.conf.set("fs.azure.account.oauth2.client.secret", "<service-credential>")
spark.conf.set("fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/<directory-id>/oauth2/token")

// Defining a separate set of service principal credentials for Azure Synapse Analytics (If not defined, the connector will use the Azure storage account credentials)
spark.conf.set("spark.databricks.sqldw.jdbc.service.principal.client.id", "<application-id>")
spark.conf.set("spark.databricks.sqldw.jdbc.service.principal.client.secret", "<service-credential>")
Python
# Defining the service principal credentials for the Azure storage account
spark.conf.set("fs.azure.account.auth.type", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type",  "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id", "<application-id>")
spark.conf.set("fs.azure.account.oauth2.client.secret", "<service-credential>")
spark.conf.set("fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/<directory-id>/oauth2/token")

# Defining a separate set of service principal credentials for Azure Synapse Analytics (If not defined, the connector will use the Azure storage account credentials)
spark.conf.set("spark.databricks.sqldw.jdbc.service.principal.client.id", "<application-id>")
spark.conf.set("spark.databricks.sqldw.jdbc.service.principal.client.secret", "<service-credential>")
R
# Load SparkR
library(SparkR)
conf <- sparkR.callJMethod(sparkR.session(), "conf")

# Defining the service principal credentials for the Azure storage account
sparkR.callJMethod(conf, "set", "fs.azure.account.auth.type", "OAuth")
sparkR.callJMethod(conf, "set", "fs.azure.account.oauth.provider.type",  "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
sparkR.callJMethod(conf, "set", "fs.azure.account.oauth2.client.id", "<application-id>")
sparkR.callJMethod(conf, "set", "fs.azure.account.oauth2.client.secret", "<service-credential>")
sparkR.callJMethod(conf, "set", "fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/<directory-id>/oauth2/token")

# Defining a separate set of service principal credentials for Azure Synapse Analytics (If not defined, the connector will use the Azure storage account credentials)
sparkR.callJMethod(conf, "set", "spark.databricks.sqldw.jdbc.service.principal.client.id", "<application-id>")
sparkR.callJMethod(conf, "set", "spark.databricks.sqldw.jdbc.service.principal.client.secret", "<service-credential>")

Driver e executores do Spark para a conta de armazenamento do Azure

O contêiner de armazenamento do Azure funciona como um intermediário para armazenar dados em massa durante a leitura ou a gravação no Azure Synapse. O Spark se conecta ao ADLS Gen2 ou ao Armazenamento de Blobs usando o driver abfss.

As seguintes opções de autenticação estão disponíveis:

Os exemplos abaixo ilustram essas duas maneiras usando a abordagem de chave de acesso da conta de armazenamento. O mesmo se aplica à configuração do OAuth 2.0.

Configuração de sessão do notebook (preferencial)

Usando essa abordagem, a chave de acesso da conta é definida na configuração da sessão associada ao notebook que executa o comando. Essa configuração não afeta outros notebooks anexados ao mesmo cluster. spark é o objeto SparkSession fornecido no notebook.

spark.conf.set(
  "fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
  "<your-storage-account-access-key>")

Configuração global do Hadoop

Essa abordagem atualiza a configuração global do Hadoop associada ao objeto SparkContext compartilhado por todos os notebooks.

Scala
sc.hadoopConfiguration.set(
  "fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
  "<your-storage-account-access-key>")
Python

hadoopConfiguration não é exposto em todas as versões do PySpark. Embora o seguinte comando dependa de alguns elementos internos do Spark, ele deve funcionar com todas as versões do PySpark e provavelmente será desfeito ou alterado no futuro:

sc._jsc.hadoopConfiguration().set(
  "fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
  "<your-storage-account-access-key>")

Azure Synapse para a conta de armazenamento do Azure

O Azure Synapse também se conecta a uma conta de armazenamento durante o carregamento e o descarregamento de dados temporários.

Caso você tenha configurado uma chave de conta e um segredo para a conta de armazenamento, defina forwardSparkAzureStorageCredentials como true. Nesse caso, o conector do Azure Synapse descobre automaticamente a chave de acesso da conta definida na configuração de sessão do notebook ou a configuração global do Hadoop e encaminha a chave de acesso da conta de armazenamento para a instância do Azure Synapse conectada criando uma credencial temporária no escopo do banco de dados do Azure.

Como alternativa, se você usar ADLS Gen2 com autenticação OAuth 2.0 ou se a instância do Azure Synapse estiver configurada para ter uma identidade de serviço gerenciada (normalmente com uma configuração de VNet e pontos de extremidade de serviço), defina useAzureMSI como true. Nesse caso, o conector especificará IDENTITY = 'Managed Service Identity' para a credencial no escopo do banco de dados e nenhum SECRET.

Suporte para streaming

O conector do Azure Synapse oferece suporte à gravação do Streaming Estruturado eficiente e escalonável para o Azure Synapse, que fornece uma experiência de usuário consistente com gravações em lote e usa o PolyBase ou COPY para transferências de dados grandes entre um cluster do Azure Databricks e a instância do Azure Synapse. Semelhante às gravações em lote, o streaming foi projetado amplamente para o ETL, fornecendo assim uma latência mais alta que, em alguns casos, pode não ser adequada para o processamento de dados em tempo real.

Semântica de tolerância a falhas

Por padrão, o Azure Synapse streaming oferece uma garantia de ponta a ponta exatamente uma vez para gravar dados em uma tabela do Azure Synapse, acompanhando de maneira confiável o progresso da consulta usando uma combinação de localização de ponto de verificação no DBFS, tabela de ponto de verificação no Azure Synapse e mecanismo de bloqueio para garantir que o streaming possa lidar com qualquer tipo de falha, tentativa e reinicialização de consulta. Opcionalmente, você pode selecionar uma semântica menos restritiva de pelo menos uma vez para o streaming do Azure Synapse, definindo a opção spark.databricks.sqldw.streaming.exactlyOnce.enabled como false. Nesse caso, a duplicação de dados pode ocorrer no caso de falhas de conexão intermitente com o Azure Synapse ou o encerramento inesperado da consulta.

Uso (lote)

Use esse conector por meio da API de fonte de dados em notebooks do Scala, do Python, do SQL e do R.

Scala


// Otherwise, set up the Blob storage account access key in the notebook session conf.
spark.conf.set(
  "fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
  "<your-storage-account-access-key>")

// Get some data from an Azure Synapse table.
val df: DataFrame = spark.read
  .format("com.databricks.spark.sqldw")
  .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
  .option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
  .option("forwardSparkAzureStorageCredentials", "true")
  .option("dbTable", "<your-table-name>")
  .load()

// Load data from an Azure Synapse query.
val df: DataFrame = spark.read
  .format("com.databricks.spark.sqldw")
  .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
  .option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
  .option("forwardSparkAzureStorageCredentials", "true")
  .option("query", "select x, count(*) as cnt from table group by x")
  .load()

// Apply some transformations to the data, then use the
// Data Source API to write the data back to another table in Azure Synapse.
df.write
  .format("com.databricks.spark.sqldw")
  .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
  .option("forwardSparkAzureStorageCredentials", "true")
  .option("dbTable", "<your-table-name>")
  .option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
  .save()

Python


# Otherwise, set up the Blob storage account access key in the notebook session conf.
spark.conf.set(
  "fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
  "<your-storage-account-access-key>")

# Get some data from an Azure Synapse table.
df = spark.read \
  .format("com.databricks.spark.sqldw") \
  .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
  .option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>") \
  .option("forwardSparkAzureStorageCredentials", "true") \
  .option("dbTable", "<your-table-name>") \
  .load()

# Load data from an Azure Synapse query.
df = spark.read \
  .format("com.databricks.spark.sqldw") \
  .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
  .option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>") \
  .option("forwardSparkAzureStorageCredentials", "true") \
  .option("query", "select x, count(*) as cnt from table group by x") \
  .load()

# Apply some transformations to the data, then use the
# Data Source API to write the data back to another table in Azure Synapse.
df.write \
  .format("com.databricks.spark.sqldw") \
  .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
  .option("forwardSparkAzureStorageCredentials", "true") \
  .option("dbTable", "<your-table-name>") \
  .option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>") \
  .save()

SQL


-- Otherwise, set up the Blob storage account access key in the notebook session conf.
SET fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net=<your-storage-account-access-key>;

-- Read data using SQL.
CREATE TABLE example_table_in_spark_read
USING com.databricks.spark.sqldw
OPTIONS (
  url 'jdbc:sqlserver://<the-rest-of-the-connection-string>',
  forwardSparkAzureStorageCredentials 'true',
  dbTable '<your-table-name>',
  tempDir 'abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>'
);

-- Write data using SQL.
-- Create a new table, throwing an error if a table with the same name already exists:
CREATE TABLE example_table_in_spark_write
USING com.databricks.spark.sqldw
OPTIONS (
  url 'jdbc:sqlserver://<the-rest-of-the-connection-string>',
  forwardSparkAzureStorageCredentials 'true',
  dbTable '<your-table-name>',
  tempDir 'abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>'
)
AS SELECT * FROM table_to_save_in_spark;

R

# Load SparkR
library(SparkR)

# Otherwise, set up the Blob storage account access key in the notebook session conf.
conf <- sparkR.callJMethod(sparkR.session(), "conf")
sparkR.callJMethod(conf, "set", "fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net", "<your-storage-account-access-key>")

# Get some data from an Azure Synapse table.
df <- read.df(
   source = "com.databricks.spark.sqldw",
   url = "jdbc:sqlserver://<the-rest-of-the-connection-string>",
   forward_spark_azure_storage_credentials = "true",
   dbTable = "<your-table-name>",
   tempDir = "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")

# Load data from an Azure Synapse query.
df <- read.df(
   source = "com.databricks.spark.sqldw",
   url = "jdbc:sqlserver://<the-rest-of-the-connection-string>",
   forward_spark_azure_storage_credentials = "true",
   query = "select x, count(*) as cnt from table group by x",
   tempDir = "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")

# Apply some transformations to the data, then use the
# Data Source API to write the data back to another table in Azure Synapse.
write.df(
  df,
  source = "com.databricks.spark.sqldw",
  url = "jdbc:sqlserver://<the-rest-of-the-connection-string>",
  forward_spark_azure_storage_credentials = "true",
  dbTable = "<your-table-name>",
  tempDir = "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")

Uso (streaming)

Grave dados usando o streaming estruturado em notebooks do Scala e do Python.

Scala

// Set up the Blob storage account access key in the notebook session conf.
spark.conf.set(
  "fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
  "<your-storage-account-access-key>")

// Prepare streaming source; this could be Kafka or a simple rate stream.
val df: DataFrame = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "100000")
  .option("numPartitions", "16")
  .load()

// Apply some transformations to the data then use
// Structured Streaming API to continuously write the data to a table in Azure Synapse.
df.writeStream
  .format("com.databricks.spark.sqldw")
  .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
  .option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
  .option("forwardSparkAzureStorageCredentials", "true")
  .option("dbTable", "<your-table-name>")
  .option("checkpointLocation", "/tmp_checkpoint_location")
  .start()

Python

# Set up the Blob storage account access key in the notebook session conf.
spark.conf.set(
  "fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
  "<your-storage-account-access-key>")

# Prepare streaming source; this could be Kafka or a simple rate stream.
df = spark.readStream \
  .format("rate") \
  .option("rowsPerSecond", "100000") \
  .option("numPartitions", "16") \
  .load()

# Apply some transformations to the data then use
# Structured Streaming API to continuously write the data to a table in Azure Synapse.
df.writeStream \
  .format("com.databricks.spark.sqldw") \
  .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
  .option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>") \
  .option("forwardSparkAzureStorageCredentials", "true") \
  .option("dbTable", "<your-table-name>") \
  .option("checkpointLocation", "/tmp_checkpoint_location") \
  .start()

Configuração

Esta seção descreve como configurar a semântica de gravação para o conector, as permissões necessárias e parâmetros de configuração diversos.

Nesta seção:

Modos de salvamento compatíveis com gravações em lote

O conector do Azure Synapse dá suporte aos modos de salvamento ErrorIfExists, Ignore, Append e Overwrite, sendo ErrorIfExists o modo padrão. Para obter mais informações sobre os modos de salvamento compatíveis com o Apache Spark, confira a documentação do Spark SQL sobre modos de salvamento.

Modos de saída compatíveis com gravações de streaming

O conector do Azure Synapse dá suporte aos modos de saída Append e Complete para acréscimos e agregações de gravação. Para obter mais detalhes sobre os modos de saída e a matriz de compatibilidade, confira o guia do Streaming Estruturado.

Semântica de gravação

Observação

COPY está disponível no Databricks Runtime 7.0 e superior.

Além do PolyBase, o conector do Azure Synapse dá suporte à instrução COPY. A instrução COPY oferece uma forma mais conveniente de carregar dados no Azure Synapse sem a necessidade de criar uma tabela externa, exige menos permissões para carregar dados e aprimora o desempenho da ingestão de dados no Azure Synapse.

Por padrão, o conector descobre automaticamente a melhor semântica de gravação (COPY, quando uma instância do Azure Synapse Gen2 é o destino, caso contrário, o PolyBase). Você também pode especificar a semântica de gravação com a seguinte configuração:

Scala

// Configure the write semantics for Azure Synapse connector in the notebook session conf.
spark.conf.set("spark.databricks.sqldw.writeSemantics", "<write-semantics>")

Python

# Configure the write semantics for Azure Synapse connector in the notebook session conf.
spark.conf.set("spark.databricks.sqldw.writeSemantics", "<write-semantics>")

SQL

-- Configure the write semantics for Azure Synapse connector in the notebook session conf.
SET spark.databricks.sqldw.writeSemantics=<write-semantics>;

R

# Load SparkR
library(SparkR)

# Configure the write semantics for Azure Synapse connector in the notebook session conf.
conf <- sparkR.callJMethod(sparkR.session(), "conf")
sparkR.callJMethod(conf, "set", "spark.databricks.sqldw.writeSemantics", "<write-semantics>")

em que <write-semantics> é polybase, para usar o PolyBase, ou copy, para usar a instrução COPY.

Permissões do Azure Synapse necessárias para o PolyBase

Quando você usa o PolyBase, o conector do Azure Synapse exige que o usuário da conexão JDBC tenha permissão para executar os seguintes comandos na instância conectada do Azure Synapse:

Como um pré-requisito para o primeiro comando, o conector espera que uma chave mestra de banco de dados já exista para a instância especificada do Azure Synapse. Caso contrário, você pode criar uma chave usando o comando CREATE MASTER KEY.

Além disso, para ler a tabela do Azure Synapse definida por meio de dbTable ou das tabelas referenciadas em query, o usuário do JDBC precisa ter permissão para acessar as tabelas do Azure Synapse necessárias. Para gravar dados novamente em uma tabela do Azure Synapse definida por meio de dbTable, o usuário do JDBC precisa ter permissão para gravar nessa tabela do Azure Synapse.

A seguinte tabela resume as permissões necessárias para todas as operações com o PolyBase:

Operação Permissões Permissões necessárias ao usar uma fonte de dados externa
Gravação em lote CONTROL Confira Gravação em lote
Gravação de streaming CONTROL Confira Gravação de streaming
Ler CONTROL Confira Leitura

Permissões do Azure Synapse necessárias para o PolyBase com a opção de fonte de dados externa

Você pode usar o PolyBase com uma fonte de dados externa previamente provisionada. Confira o parâmetro externalDataSource em Parâmetros para obter mais informações.

Para usar o PolyBase com uma fonte de dados externa previamente provisionada, o conector do Azure Synapse exige que o usuário da conexão JDBC tenha permissão para executar os seguintes comandos na instância conectada do Azure Synapse:

Para criar uma fonte de dados externa, primeiro, você deve criar uma credencial no escopo do banco de dados. Os seguintes links descrevem como criar uma credencial no escopo para entidades de serviço e uma fonte de dados externa para uma localização do ABFS:

Observação

A localização da fonte de dados externa precisa apontar para um contêiner. O conector não funcionará se a localização for um diretório em um contêiner.

A seguinte tabela resume as permissões para operações de gravação do PolyBase com a opção de fonte de dados externa:

Operação Permissões (inserir em uma tabela existente) Permissões (inserir em uma nova tabela)
Gravação em lote ADMINISTRAR OPERAÇÕES EM LOTE DO BANCO DE DADOS

INSERT

CREATE TABLE

ALTER ANY SCHEMA

ALTER ANY EXTERNAL DATA SOURCE

ALTER ANY EXTERNAL FILE FORMAT
ADMINISTRAR OPERAÇÕES EM LOTE DO BANCO DE DADOS

INSERT

CREATE TABLE

ALTER ANY SCHEMA

ALTER ANY EXTERNAL DATA SOURCE

ALTER ANY EXTERNAL FILE FORMAT
Gravação de streaming ADMINISTRAR OPERAÇÕES EM LOTE DO BANCO DE DADOS

INSERT

CREATE TABLE

ALTER ANY SCHEMA

ALTER ANY EXTERNAL DATA SOURCE

ALTER ANY EXTERNAL FILE FORMAT
ADMINISTRAR OPERAÇÕES EM LOTE DO BANCO DE DADOS

INSERT

CREATE TABLE

ALTER ANY SCHEMA

ALTER ANY EXTERNAL DATA SOURCE

ALTER ANY EXTERNAL FILE FORMAT

A seguinte tabela resume as permissões para operações de leitura do PolyBase com a opção de fonte de dados externa:

Operação Permissões
Ler CREATE TABLE

ALTER ANY SCHEMA

ALTER ANY EXTERNAL DATA SOURCE

ALTER ANY EXTERNAL FILE FORMAT

Use esse conector para leitura por meio da API de fonte de dados em notebooks do Scala, do Python, do SQL e do R.

Scala
// Get some data from an Azure Synapse table.
val df: DataFrame = spark.read
  .format("com.databricks.spark.sqldw")
  .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
  .option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
  .option("externalDataSource", "<your-pre-provisioned-data-source>")
  .option("dbTable", "<your-table-name>")
  .load()
Python
# Get some data from an Azure Synapse table.
df = spark.read \
  .format("com.databricks.spark.sqldw") \
  .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
  .option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>") \
  .option("externalDataSource", "<your-pre-provisioned-data-source>") \
  .option("dbTable", "<your-table-name>") \
  .load()
SQL
-- Read data using SQL.
CREATE TABLE example_table_in_spark_read
USING com.databricks.spark.sqldw
OPTIONS (
  url 'jdbc:sqlserver://<the-rest-of-the-connection-string>',
  forwardSparkAzureStorageCredentials 'true',
  dbTable '<your-table-name>',
  tempDir 'abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>',
  externalDataSource '<your-pre-provisioned-data-source>'
);
R
# Get some data from an Azure Synapse table.
df <- read.df(
   source = "com.databricks.spark.sqldw",
   url = "jdbc:sqlserver://<the-rest-of-the-connection-string>",
   forward_spark_azure_storage_credentials = "true",
   dbTable = "<your-table-name>",
   tempDir = "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>"
   externalDataSource = "<your-pre-provisioned-data-source>")

Permissões do Azure Synapse necessárias para a instrução COPY

Observação

Disponível no Databricks Runtime 7.0 e superior.

Quando você usa a instrução COPY, o conector do Azure Synapse exige que o usuário da conexão JDBC tenha permissão para executar os seguintes comandos na instância conectada do Azure Synapse:

Se a tabela de destino não existir no Azure Synapse, a permissão para executar o seguinte comando será necessária, além do comando acima:

A seguinte tabela resume as permissões para gravações em lote e de streaming com COPY:

Operação Permissões (inserir em uma tabela existente) Permissões (inserir em uma nova tabela)
Gravação em lote ADMINISTRAR OPERAÇÕES EM LOTE DO BANCO DE DADOS

INSERT
ADMINISTRAR OPERAÇÕES EM LOTE DO BANCO DE DADOS

INSERT

CREATE TABLE

ALTER ON SCHEMA :: dbo
Gravação de streaming ADMINISTRAR OPERAÇÕES EM LOTE DO BANCO DE DADOS

INSERT
ADMINISTRAR OPERAÇÕES EM LOTE DO BANCO DE DADOS

INSERT

CREATE TABLE

ALTER ON SCHEMA :: dbo

Parâmetros

O mapa de parâmetros ou o OPTIONS fornecido no Spark SQL dá suporte às seguintes configurações:

Parâmetro Obrigatório Padrão Observações
dbTable Sim, a menos que query seja especificado Nenhum padrão A tabela necessária para criação ou leitura no Azure Synapse. Esse parâmetro é necessário quando os dados são salvos novamente no Azure Synapse.

Use também {SCHEMA NAME}.{TABLE NAME} para acessar uma tabela em determinado esquema. Se o nome do esquema não for fornecido, o esquema padrão associado ao usuário do JDBC será usado.

A variante dbtable anteriormente compatível foi preterida e será ignorada em versões futuras. Em vez disso, use o nome em minúsculas concatenadas.
query Sim, a menos que dbTable seja especificado Nenhum padrão A consulta a ser lida no Azure Synapse.

Para as tabelas referenciadas na consulta, use também {SCHEMA NAME}.{TABLE NAME} para acessar uma tabela em determinado esquema. Se o nome do esquema não for fornecido, o esquema padrão associado ao usuário do JDBC será usado.
user No Nenhum padrão O nome de usuário do Azure Synapse. Precisa ser usado em conjunto com a opção password. Só poderá ser usado se o usuário e a senha não forem transmitidos na URL. A transmissão dos dois resultará em um erro.
password No Nenhum padrão A senha do Azure Synapse. Precisa ser usado em conjunto com a opção user. Só poderá ser usado se o usuário e a senha não forem transmitidos na URL. A transmissão dos dois resultará em um erro.
url Sim Nenhum padrão Uma URL do JDBC com sqlserver definido como o subprotocolo. Recomendamos usar a cadeia de conexão fornecida pelo portal do Azure. Configuração
encrypt=true é altamente recomendável, pois habilita a criptografia SSL da conexão JDBC. Se user e password forem definidos separadamente, você não precisará incluí-los na URL.
jdbcDriver No Determinado pelo subprotocolo da URL do JDBC O nome de classe do driver JDBC a ser usado. Essa classe precisa estar no caminho de classe. Na maioria dos casos, não deve ser necessário especificar essa opção, pois o nome de classe apropriado do driver deve ser determinado automaticamente pelo subprotocolo da URL do JDBC.

A variante jdbc_driver anteriormente compatível foi preterida e será ignorada em versões futuras. Em vez disso, use o nome em minúsculas concatenadas.
tempDir Sim Nenhum padrão Um URI abfss. Recomendamos que você use um contêiner dedicado do armazenamento de blobs para o Azure Synapse.

A variante tempdir anteriormente compatível foi preterida e será ignorada em versões futuras. Em vez disso, use o nome em minúsculas concatenadas.
tempFormat No PARQUET O formato no qual os arquivos temporários serão salvos no armazenamento de blobs durante a gravação no Azure Synapse. Usa PARQUET como padrão. Nenhum outro valor é permitido no momento.
tempCompression No SNAPPY O algoritmo de compactação a ser usado para codificar/decodificar arquivos temporários pelo Spark e pelo Azure Synapse. Os valores atualmente compatíveis são: UNCOMPRESSED, SNAPPY e GZIP.
forwardSparkAzureStorageCredentials No false Se o valor for true, a biblioteca descobrirá automaticamente as credenciais que o Spark está usando para se conectar ao contêiner do armazenamento de blobs e encaminhará essas credenciais para o Azure Synapse por meio do JDBC. Essas credenciais são enviadas como parte da consulta do JDBC. Portanto, é altamente recomendável que você habilite a criptografia SSL da conexão JDBC ao usar essa opção.

A versão atual do conector do Azure Synapse exige (exatamente) que um forwardSparkAzureStorageCredentials, um enableServicePrincipalAuth ou um useAzureMSI seja definido explicitamente como true.

A variante forward_spark_azure_storage_credentials anteriormente compatível foi preterida e será ignorada em versões futuras. Em vez disso, use o nome em minúsculas concatenadas.
useAzureMSI No false Se ele for true, a biblioteca especificará IDENTITY = 'Managed Service Identity' e nenhum SECRET para as credenciais no escopo do banco de dados criadas por ela.

A versão atual do conector do Azure Synapse exige (exatamente) que um forwardSparkAzureStorageCredentials, um enableServicePrincipalAuth ou um useAzureMSI seja definido explicitamente como true.
enableServicePrincipalAuth No false Se ele for true, a biblioteca usará as credenciais da entidade de serviço fornecidas para se conectar à conta de armazenamento do Azure e ao Azure Synapse Analytics por meio do JDBC.

A versão atual do conector do Azure Synapse exige (exatamente) que um forwardSparkAzureStorageCredentials, um enableServicePrincipalAuth ou um useAzureMSI seja definido explicitamente como true.
tableOptions No CLUSTERED COLUMNSTORE INDEX, DISTRIBUTION = ROUND_ROBIN Uma cadeia de caracteres usada para especificar opções de tabela ao criar a tabela do Azure Synapse definida por meio de dbTable. Essa cadeia de caracteres é transmitida literalmente à cláusula WITH da instrução SQL CREATE TABLE que é emitida no Azure Synapse.

A variante table_options anteriormente compatível foi preterida e será ignorada em versões futuras. Em vez disso, use o nome em minúsculas concatenadas.
preActions No Sem padrão (cadeia de caracteres vazia) Uma lista separada por ; de comandos SQL a serem executados no Azure Synapse antes da gravação de dados na instância do Azure Synapse. Esses comandos SQL precisam ser comandos válidos aceitos pelo Azure Synapse.

Se um desses comandos falhar, ele será tratado como um erro e a operação de gravação não será executada.
postActions No Sem padrão (cadeia de caracteres vazia) Uma lista separada por ; de comandos SQL a serem executados no Azure Synapse depois que o conector gravar os dados com êxito na instância do Azure Synapse. Esses comandos SQL precisam ser comandos válidos aceitos pelo Azure Synapse.

Se um desses comandos falhar, ele será tratado como um erro e você receberá uma exceção depois que os dados forem gravados com êxito na instância do Azure Synapse.
maxStrLength No 256 StringType no Spark é mapeado para o tipo NVARCHAR(maxStrLength) no Azure Synapse. Você pode usar maxStrLength para definir o comprimento da cadeia de caracteres para todas as colunas do tipo NVARCHAR(maxStrLength) que estão na tabela com o nome
dbTable no Azure Synapse.

A variante maxstrlength anteriormente compatível foi preterida e será ignorada em versões futuras. Em vez disso, use o nome em minúsculas concatenadas.
checkpointLocation Sim Nenhum padrão Localização no DBFS que será usada pelo Streaming Estruturado para gravar metadados e informações de ponto de verificação. Confira Recuperando-se de falhas com a definição de ponto de verificação no guia de programação do Streaming Estruturado.
numStreamingTempDirsToKeep No 0 Indica quantos diretórios temporários (mais recentes) devem ser mantidos para limpeza periódica de microlotes em streaming. Quando esse valor é definido como 0, a exclusão de diretório é disparada imediatamente depois que o microlote é comprometido, caso contrário, o número de microlotes mais recentes é mantido e o restante dos diretórios é removido. Use -1 para desabilitar a limpeza periódica.
applicationName No Databricks-User-Query A marca da conexão para cada consulta. Se um valor não for especificado ou o valor for uma cadeia de caracteres vazia, o valor padrão da marca será adicionado à URL do JDBC. O valor padrão impede que a ferramenta Monitoramento de BD do Azure gere alertas de injeção de SQL falsos em consultas.
maxbinlength No Nenhum padrão Controle o comprimento das colunas BinaryType. Esse parâmetro é convertido como VARBINARY(maxbinlength).
identityInsert No false A configuração como true habilita o modo IDENTITY_INSERT, que insere um valor fornecido pelo DataFrame na coluna de identidade da tabela do Azure Synapse.

Confira Como inserir explicitamente os valores em uma coluna IDENTITY.
externalDataSource No Nenhum padrão Uma fonte de dados externa previamente provisionada para ler dados do Azure Synapse. Uma fonte de dados externa só pode ser usada com o PolyBase e remove o requisito de permissão CONTROL, pois o conector não precisa criar uma credencial no escopo e uma fonte de dados externa para carregar dados.

Para ver exemplos de uso e a lista de permissões necessárias ao usar uma fonte de dados externa, confira Permissões do Azure Synapse necessárias para o PolyBase com a opção de fonte de dados externa.
maxErrors No 0 O número máximo de linhas que podem ser rejeitadas durante leituras e gravações antes do cancelamento da operação de carregamento (PolyBase ou COPY). As linhas rejeitadas serão ignoradas. Por exemplo, quando dois registros entre dez estão com erros, apenas oito deles são processados.

Confira a documentação de REJECT_VALUE em CREATE EXTERNAL TABLE e a documentação de MAXERRORS em COPY.

Observação

  • tableOptions, preActions, postActions e maxStrLength só são relevantes na gravação de dados do Azure Databricks para uma nova tabela do Azure Synapse.
  • externalDataSource só é relevante na leitura de dados do Azure Synapse e na gravação de dados do Azure Databricks para uma nova tabela do Azure Synapse com a semântica do PolyBase. Você não deve especificar outros tipos de autenticação de armazenamento ao usar externalDataSource como forwardSparkAzureStorageCredentials ou useAzureMSI.
  • checkpointLocation e numStreamingTempDirsToKeep só são relevantes para gravações de streaming do Azure Databricks para uma nova tabela do Azure Synapse.
  • Embora todos os nomes de opções de fonte de dados não diferenciem maiúsculas de minúsculas, recomendamos especificá-los em “minúsculas concatenadas” para maior clareza.

Pushdown de consulta no Azure Synapse

O conector do Azure Synapse implementa um conjunto de regras de otimização para efetuar pushdown dos seguintes operadores para o Azure Synapse:

  • Filter
  • Project
  • Limit

Os operadores Project e Filter dão suporte às seguintes expressões:

  • A maioria dos operadores lógicos boolianos
  • Comparações
  • Operadores aritméticos básicos
  • Conversões numéricas e de cadeia de caracteres

Para o operador Limit, só há suporte para o pushdown quando não há nenhuma ordenação especificada. Por exemplo:

SELECT TOP(10) * FROM table, mas não SELECT TOP(10) * FROM table ORDER BY col.

Observação

O conector do Azure Synapse não efetua pushdown de expressões que operam em cadeias de caracteres, datas ou carimbos de data/hora.

O pushdown de consulta criado com o conector do Azure Synapse está habilitado por padrão. Você pode desabilitá-lo definindo spark.databricks.sqldw.pushdown como false.

Gerenciamento de dados temporários

O conector do Azure Synapse não exclui os arquivos temporários que ele cria no contêiner do armazenamento de blobs. Portanto, recomendamos que você exclua periodicamente os arquivos temporários na localização tempDir fornecida pelo usuário.

Para facilitar a limpeza dos dados, o conector do Azure Synapse não armazena arquivos de dados diretamente em tempDir, mas cria um subdiretório do formato <tempDir>/<yyyy-MM-dd>/<HH-mm-ss-SSS>/<randomUUID>/. Você pode configurar trabalhos periódicos (usando o recurso trabalhos do Azure Databricks ou de outra forma) para excluir recursivamente os subdiretórios com mais de um limite especificado (por exemplo, dois dias), com a suposição de que não pode haver trabalhos do Spark em execução por mais tempo do que esse limite.

Uma alternativa mais simples é remover periodicamente todo o contêiner e criar outro com o mesmo nome. Isso exige que você use um contêiner dedicado para os dados temporários produzidos pelo conector do Azure Synapse e que você possa encontrar uma janela de tempo na qual possa garantir que nenhuma consulta que envolva o conector esteja em execução.

Gerenciamento de objetos temporários

O conector do Azure Synapse automatiza a transferência de dados entre um cluster do Azure Databricks e uma instância do Azure Synapse. Para ler dados de uma tabela do Azure Synapse ou consultar ou gravar dados em uma tabela do Azure Synapse, o conector do Azure Synapse cria objetos temporários, incluindo DATABASE SCOPED CREDENTIAL, EXTERNAL DATA SOURCE, EXTERNAL FILE FORMAT e EXTERNAL TABLE nos bastidores. Esses objetos só existem durante a duração do trabalho do Spark correspondente e devem ser removidos automaticamente depois disso.

Quando um cluster está executando uma consulta por meio do conector do Azure Synapse, se o processo do driver do Spark falha ou é forçado a ser reiniciado ou se o cluster é encerrado ou é forçado a ser reiniciado, os objetos temporários podem não ser removidos. Para facilitar a identificação e a exclusão manual desses objetos, o conector do Azure Synapse prefixa os nomes de todos os objetos temporários intermediários criados na instância do Azure Synapse com uma marca do formato tmp_databricks_<yyyy_MM_dd_HH_mm_ss_SSS>_<randomUUID>_<internalObject>.

Recomendamos que você procure periodicamente objetos vazados usando consultas como as seguintes:

  • SELECT * FROM sys.database_scoped_credentials WHERE name LIKE 'tmp_databricks_%'
  • SELECT * FROM sys.external_data_sources WHERE name LIKE 'tmp_databricks_%'
  • SELECT * FROM sys.external_file_formats WHERE name LIKE 'tmp_databricks_%'
  • SELECT * FROM sys.external_tables WHERE name LIKE 'tmp_databricks_%'

Gerenciamento de tabelas de ponto de verificação de streaming

O conector do Azure Synapse não exclui a tabela de ponto de verificação de streaming criada quando uma nova consulta de streaming é iniciada. Esse comportamento é coerente com o checkpointLocation no DBFS. Portanto, recomendamos que você exclua periodicamente as tabelas de ponto de verificação, removendo as localizações de ponto de verificação no DBFS das consultas que não serão executadas no futuro ou que já tenham a localização de ponto de verificação removida.

Por padrão, todas as tabelas de ponto de verificação têm o nome <prefix>_<query-id>, em que <prefix> é um prefixo configurável com o valor padrão databricks_streaming_checkpoint e query_id é uma ID de consulta de streaming com os caracteres _ removidos. Para encontrar todas as tabelas de ponto de verificação para consultas de streaming obsoletas ou excluídas, execute a consulta:

SELECT * FROM sys.tables WHERE name LIKE 'databricks_streaming_checkpoint%'

Você pode configurar o prefixo com a opção de configuração do Spark SQL spark.databricks.sqldw.streaming.exactlyOnce.checkpointTableNamePrefix.

Perguntas frequentes (FAQ)

Recebi um erro ao usar o conector do Azure Synapse. Como saber se esse erro é referente ao Azure Synapse ou ao Azure Databricks?

Para ajudar você a depurar erros, qualquer exceção gerada pelo código específico ao conector do Azure Synapse é envolvida em uma exceção que estende a característica SqlDWException. As exceções também fazem a seguinte distinção:

  • SqlDWConnectorException representa um erro gerado pelo conector do Azure Synapse
  • SqlDWSideException representa um erro gerado pela instância conectada do Azure Synapse

O que devo fazer se minha consulta falha com o erro “Nenhuma chave de acesso encontrada na configuração de sessão ou na configuração global do Hadoop”?

Esse erro significa que conector do Azure Synapse não pôde encontrar a chave de acesso da conta de armazenamento na configuração de sessão do notebook ou na configuração global do Hadoop para a conta de armazenamento especificada em tempDir. Confira Uso (lote) para ver exemplos de como configurar o acesso da conta de armazenamento corretamente. Se uma tabela do Spark for criada por meio do conector do Azure Synapse, você ainda precisará fornecer as credenciais de acesso da conta de armazenamento para leitura ou gravação na tabela do Spark.

Posso usar uma SAS (Assinatura de Acesso Compartilhado) para acessar o contêiner do armazenamento de blobs especificado por tempDir?

O Azure Synapse não dá suporte ao uso da SAS para acessar o armazenamento de blobs. Portanto, o conector do Azure Synapse não dá suporte à SAS para acessar o contêiner do armazenamento de blobs especificado por tempDir.

Criei uma tabela do Spark usando o conector do Azure Synapse com a opção dbTable, gravei alguns dados nessa tabela do Spark e, depois, removi a tabela do Spark. A tabela criada no lado do Azure Synapse será removida?

Não. O Azure Synapse é considerado uma fonte de dados externa. A tabela do Azure Synapse com o nome definido por meio de dbTable não é removida quando a tabela do Spark é removida.

Ao gravar um DataFrame no Azure Synapse, por que preciso indicar .option("dbTable", tableName).save() em vez de apenas .saveAsTable(tableName)?

Porque desejamos tornar a seguinte distinção clara: .option("dbTable", tableName) refere-se à tabela do banco de dados (ou seja, o Azure Synapse), enquanto .saveAsTable(tableName) refere-se à tabela do Spark. Na verdade, você pode até combinar os dois: df.write. ... .option("dbTable", tableNameDW).saveAsTable(tableNameSpark), que cria uma tabela do Azure Synapse chamada tableNameDW, e uma tabela externa no Spark chamada tableNameSpark que tem o suporte da tabela do Azure Synapse.

Aviso

Tenha cuidado com a seguinte diferença entre .save() e .saveAsTable():

  • Para df.write. ... .option("dbTable", tableNameDW).mode(writeMode).save(), writeMode age na tabela do Azure Synapse, conforme esperado.
  • Para df.write. ... .option("dbTable", tableNameDW).mode(writeMode).saveAsTable(tableNameSpark), writeMode age na tabela do Spark, enquanto tableNameDWé silenciosamente substituído, caso ele já exista no Azure Synapse.

Esse comportamento não é diferente da gravação em qualquer outra fonte de dados. É apenas uma limitação da API do DataFrameWriter do Spark.