Share via


Conectando o Azure Databricks e o Azure Synapse ao PolyBase (legado)

Importante

Esta documentação foi desativada e pode não ser atualizada. Os produtos, serviços ou tecnologias mencionados neste conteúdo não são mais suportados. Consulte Dados de consulta no Azure Synapse Analytics.

O Databricks recomenda usar a funcionalidade padrão COPY com o Azure Data Lake Storage Gen2 para conexões com o Azure Synapse. Este artigo inclui documentação herdada sobre o PolyBase e o armazenamento de blobs.

O Azure Synapse Analytics (anteriormente SQL Data Warehouse) é um armazém de dados empresarial baseado na nuvem que tira partido do processamento paralelo maciço (MPP) para executar rapidamente consultas complexas em petabytes de dados. Use o Azure como um componente-chave de uma solução de big data. Importe big data para o Azure com consultas T-SQL PolyBase simples ou instrução COPY e, em seguida, use o poder do MPP para executar análises de alto desempenho. À medida que integra e analisa, o armazém de dados tornar-se-á a única versão real com que a sua empresa pode contar para obter informações.

Você pode acessar o Azure Synapse do Azure Databricks usando o conector Azure Synapse, uma implementação de fonte de dados para o Apache Spark que usa o armazenamento de Blob do Azure, e o PolyBase ou a COPY instrução no Azure Synapse para transferir grandes volumes de dados de forma eficiente entre um cluster do Azure Databricks e uma instância do Azure Synapse.

Tanto o cluster do Azure Databricks quanto a instância do Azure Synapse acessam um contêiner de armazenamento de Blob comum para trocar dados entre esses dois sistemas. No Azure Databricks, os trabalhos do Apache Spark são acionados pelo conector Synapse do Azure para ler e gravar dados no contêiner de armazenamento de Blob. No lado do Azure Synapse, as operações de carregamento e descarregamento de dados executadas pelo PolyBase são acionadas pelo conector do Azure Synapse por meio do JDBC. No Databricks Runtime 7.0 e superior, COPY é usado por padrão para carregar dados no Azure Synapse pelo conector do Azure Synapse por meio do JDBC.

Nota

COPY está disponível apenas em instâncias do Azure Synapse Gen2, que fornecem melhor desempenho. Se seu banco de dados ainda usa instâncias Gen1, recomendamos que você migre o banco de dados para Gen2.

O conector do Azure Synapse é mais adequado para ETL do que para consultas interativas, porque cada execução de consulta pode extrair grandes quantidades de dados para o armazenamento de Blob. Se você planeja executar várias consultas na mesma tabela do Azure Synapse, recomendamos que salve os dados extraídos em um formato como Parquet.

Requisitos

Uma chave mestra de banco de dados do Azure Synapse.

Autenticação

O conector do Azure Synapse utiliza três tipos de ligações de rede:

  • Controlador do Apache Spark para Azure Synapse
  • Executores e controlador do Apache Spark para a conta de armazenamento do Azure
  • Azure Synapse para a conta de armazenamento do Azure
                                 ┌─────────┐
      ┌─────────────────────────>│ STORAGE │<────────────────────────┐
      │   Storage acc key /      │ ACCOUNT │  Storage acc key /      │
      │   Managed Service ID /   └─────────┘  OAuth 2.0 /            │
      │                               │                              │
      │                               │                              │
      │                               │ Storage acc key /            │
      │                               │ OAuth 2.0 /                  │
      │                               │                              │
      v                               v                       ┌──────v────┐
┌──────────┐                      ┌──────────┐                │┌──────────┴┐
│ Synapse  │                      │  Spark   │                ││ Spark     │
│ Analytics│<────────────────────>│  Driver  │<───────────────>│ Executors │
└──────────┘  JDBC with           └──────────┘    Configured   └───────────┘
              username & password /                in Spark

As seções a seguir descrevem as opções de configuração de autenticação de cada conexão.

Controlador do Apache Spark para Azure Synapse

O driver do Spark pode se conectar ao Azure Synapse usando JDBC com um nome de usuário e senha ou OAuth 2.0 com uma entidade de serviço para autenticação.

Nome de utilizador e palavra-passe

Recomendamos que você use as cadeias de conexão fornecidas pelo portal do Azure para ambos os tipos de autenticação, que habilitam a criptografia SSL (Secure Sockets Layer) para todos os dados enviados entre o driver do Spark e a instância do Azure Synapse por meio da conexão JDBC. Para verificar se a criptografia SSL está habilitada, você pode procurar na cadeia de encrypt=true conexão.

Para permitir que o driver do Spark alcance o Azure Synapse, recomendamos que você defina Permitir que os serviços e recursos do Azure acessem esse espaço de trabalho como ATIVADO no painel Rede em Segurança do espaço de trabalho do Azure Synapse por meio do portal do Azure. Essa configuração permite comunicações de todos os endereços IP do Azure e todas as sub-redes do Azure, o que permite que os drivers do Spark alcancem a instância do Azure Synapse.

OAuth 2.0 com um principal de serviço

Você pode autenticar no Azure Synapse Analytics usando uma entidade de serviço com acesso à conta de armazenamento subjacente. Para obter mais informações sobre a utilização de credenciais de principal de serviço para aceder a uma conta de armazenamento do Azure, consulte Ligar ao Azure Data Lake Storage Gen2 e Armazenamento de Blobs. Você deve definir a enableServicePrincipalAuth opção como true em Parâmetros de configuração de conexão para permitir que o conector se autentique com uma entidade de serviço.

Opcionalmente, você pode usar uma entidade de serviço diferente para a conexão do Azure Synapse Analytics. Um exemplo que configura credenciais de entidade de serviço para a conta de armazenamento e credenciais de entidade de serviço opcionais para Synapse:

ini
; Defining the Service Principal credentials for the Azure storage account
fs.azure.account.auth.type OAuth
fs.azure.account.oauth.provider.type org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider
fs.azure.account.oauth2.client.id <application-id>
fs.azure.account.oauth2.client.secret <service-credential>
fs.azure.account.oauth2.client.endpoint https://login.microsoftonline.com/<directory-id>/oauth2/token

; Defining a separate set of service principal credentials for Azure Synapse Analytics (If not defined, the connector will use the Azure storage account credentials)
spark.databricks.sqldw.jdbc.service.principal.client.id <application-id>
spark.databricks.sqldw.jdbc.service.principal.client.secret <service-credential>
Scala
// Defining the Service Principal credentials for the Azure storage account
spark.conf.set("fs.azure.account.auth.type", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type",  "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id", "<application-id>")
spark.conf.set("fs.azure.account.oauth2.client.secret", "<service-credential>")
spark.conf.set("fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/<directory-id>/oauth2/token")

// Defining a separate set of service principal credentials for Azure Synapse Analytics (If not defined, the connector will use the Azure storage account credentials)
spark.conf.set("spark.databricks.sqldw.jdbc.service.principal.client.id", "<application-id>")
spark.conf.set("spark.databricks.sqldw.jdbc.service.principal.client.secret", "<service-credential>")
Python
# Defining the service principal credentials for the Azure storage account
spark.conf.set("fs.azure.account.auth.type", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type",  "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id", "<application-id>")
spark.conf.set("fs.azure.account.oauth2.client.secret", "<service-credential>")
spark.conf.set("fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/<directory-id>/oauth2/token")

# Defining a separate set of service principal credentials for Azure Synapse Analytics (If not defined, the connector will use the Azure storage account credentials)
spark.conf.set("spark.databricks.sqldw.jdbc.service.principal.client.id", "<application-id>")
spark.conf.set("spark.databricks.sqldw.jdbc.service.principal.client.secret", "<service-credential>")
R
# Load SparkR
library(SparkR)
conf <- sparkR.callJMethod(sparkR.session(), "conf")

# Defining the service principal credentials for the Azure storage account
sparkR.callJMethod(conf, "set", "fs.azure.account.auth.type", "OAuth")
sparkR.callJMethod(conf, "set", "fs.azure.account.oauth.provider.type",  "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
sparkR.callJMethod(conf, "set", "fs.azure.account.oauth2.client.id", "<application-id>")
sparkR.callJMethod(conf, "set", "fs.azure.account.oauth2.client.secret", "<service-credential>")
sparkR.callJMethod(conf, "set", "fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/<directory-id>/oauth2/token")

# Defining a separate set of service principal credentials for Azure Synapse Analytics (If not defined, the connector will use the Azure storage account credentials)
sparkR.callJMethod(conf, "set", "spark.databricks.sqldw.jdbc.service.principal.client.id", "<application-id>")
sparkR.callJMethod(conf, "set", "spark.databricks.sqldw.jdbc.service.principal.client.secret", "<service-credential>")

Executores e controlador do Apache Spark para a conta de armazenamento do Azure

O contêiner de armazenamento do Azure atua como um intermediário para armazenar dados em massa ao ler ou gravar no Azure Synapse. O Spark se conecta ao ADLS Gen2 ou ao Blob Storage usando o abfss driver.

As seguintes opções de autenticação estão disponíveis:

Os exemplos abaixo ilustram essas duas maneiras usando a abordagem de chave de acesso à conta de armazenamento. O mesmo se aplica à configuração do OAuth 2.0.

Configuração da sessão do bloco de notas (preferencial)

Usando essa abordagem, a chave de acesso à conta é definida na configuração da sessão associada ao bloco de anotações que executa o comando. Essa configuração não afeta outros blocos de anotações conectados ao mesmo cluster. spark é o SparkSession objeto fornecido no caderno.

spark.conf.set(
  "fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
  "<your-storage-account-access-key>")

Configuração global do Hadoop

Essa abordagem atualiza a configuração global do Hadoop associada ao SparkContext objeto compartilhado por todos os blocos de anotações.

Scala
sc.hadoopConfiguration.set(
  "fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
  "<your-storage-account-access-key>")
Python

hadoopConfiguration não é exposto em todas as versões do PySpark. Embora o comando a seguir dependa de alguns internos do Spark, ele deve funcionar com todas as versões do PySpark e é improvável que quebre ou mude no futuro:

sc._jsc.hadoopConfiguration().set(
  "fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
  "<your-storage-account-access-key>")

Azure Synapse para a conta de armazenamento do Azure

O Azure Synapse também se liga a uma conta de armazenamento durante o carregamento e o descarregamento de dados temporários.

Caso tenha configurado uma chave de conta e um segredo para a conta de armazenamento, você pode definir forwardSparkAzureStorageCredentials como true, caso em que o conector do Azure Synapse descobre automaticamente a chave de acesso da conta definida na configuração da sessão do bloco de anotações ou na configuração global do Hadoop e encaminha a chave de acesso da conta de armazenamento para a instância Synapse do Azure conectada criando uma credencial temporária com escopo do banco de dados do Azure.

Como alternativa, se você usar o ADLS Gen2 com autenticação OAuth 2.0 ou se sua instância do Azure Synapse estiver configurada para ter uma Identidade de Serviço Gerenciado (normalmente em conjunto com uma configuração de VNet + Pontos de Extremidade de Serviço), deverá definir useAzureMSI como true. Nesse caso, o conector especificará IDENTITY = 'Managed Service Identity' para a credencial de escopo baseada em dados e não SECRET.

Suporte de streaming

O conector Azure Synapse oferece suporte de gravação de Streaming Estruturado eficiente e escalável para o Azure Synapse que fornece experiência de usuário consistente com gravações em lote e usa PolyBase ou COPY para grandes transferências de dados entre um cluster do Azure Databricks e a instância do Azure Synapse. Semelhante às gravações em lote, o streaming é projetado em grande parte para ETL, fornecendo latência mais alta que pode não ser adequada para processamento de dados em tempo real em alguns casos.

Semântica de tolerância a falhas

Por padrão, o Azure Synapse Streaming oferece garantia de ponta a ponta exatamente uma vez para gravar dados em uma tabela do Azure Synapse, acompanhando de forma confiável o progresso da consulta usando uma combinação de local de ponto de verificação no DBFS, tabela de ponto de verificação no Azure Synapse e mecanismo de bloqueio para garantir que o streaming possa lidar com quaisquer tipos de falhas, tentativas e reinicializações de consulta. Opcionalmente, você pode selecionar semânticas menos restritivas pelo menos uma vez para o Azure Synapse Streaming definindo spark.databricks.sqldw.streaming.exactlyOnce.enabled a opção como false, caso em que a duplicação de dados pode ocorrer no caso de falhas de conexão intermitentes com o Azure Synapse ou encerramento inesperado da consulta.

Utilização (lote)

Você pode usar esse conector por meio da API da fonte de dados em notebooks Scala, Python, SQL e R.

Scala


// Otherwise, set up the Blob storage account access key in the notebook session conf.
spark.conf.set(
  "fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
  "<your-storage-account-access-key>")

// Get some data from an Azure Synapse table.
val df: DataFrame = spark.read
  .format("com.databricks.spark.sqldw")
  .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
  .option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
  .option("forwardSparkAzureStorageCredentials", "true")
  .option("dbTable", "<your-table-name>")
  .load()

// Load data from an Azure Synapse query.
val df: DataFrame = spark.read
  .format("com.databricks.spark.sqldw")
  .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
  .option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
  .option("forwardSparkAzureStorageCredentials", "true")
  .option("query", "select x, count(*) as cnt from table group by x")
  .load()

// Apply some transformations to the data, then use the
// Data Source API to write the data back to another table in Azure Synapse.
df.write
  .format("com.databricks.spark.sqldw")
  .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
  .option("forwardSparkAzureStorageCredentials", "true")
  .option("dbTable", "<your-table-name>")
  .option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
  .save()

Python


# Otherwise, set up the Blob storage account access key in the notebook session conf.
spark.conf.set(
  "fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
  "<your-storage-account-access-key>")

# Get some data from an Azure Synapse table.
df = spark.read \
  .format("com.databricks.spark.sqldw") \
  .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
  .option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>") \
  .option("forwardSparkAzureStorageCredentials", "true") \
  .option("dbTable", "<your-table-name>") \
  .load()

# Load data from an Azure Synapse query.
df = spark.read \
  .format("com.databricks.spark.sqldw") \
  .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
  .option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>") \
  .option("forwardSparkAzureStorageCredentials", "true") \
  .option("query", "select x, count(*) as cnt from table group by x") \
  .load()

# Apply some transformations to the data, then use the
# Data Source API to write the data back to another table in Azure Synapse.
df.write \
  .format("com.databricks.spark.sqldw") \
  .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
  .option("forwardSparkAzureStorageCredentials", "true") \
  .option("dbTable", "<your-table-name>") \
  .option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>") \
  .save()

SQL


-- Otherwise, set up the Blob storage account access key in the notebook session conf.
SET fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net=<your-storage-account-access-key>;

-- Read data using SQL.
CREATE TABLE example_table_in_spark_read
USING com.databricks.spark.sqldw
OPTIONS (
  url 'jdbc:sqlserver://<the-rest-of-the-connection-string>',
  forwardSparkAzureStorageCredentials 'true',
  dbTable '<your-table-name>',
  tempDir 'abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>'
);

-- Write data using SQL.
-- Create a new table, throwing an error if a table with the same name already exists:
CREATE TABLE example_table_in_spark_write
USING com.databricks.spark.sqldw
OPTIONS (
  url 'jdbc:sqlserver://<the-rest-of-the-connection-string>',
  forwardSparkAzureStorageCredentials 'true',
  dbTable '<your-table-name>',
  tempDir 'abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>'
)
AS SELECT * FROM table_to_save_in_spark;

R

# Load SparkR
library(SparkR)

# Otherwise, set up the Blob storage account access key in the notebook session conf.
conf <- sparkR.callJMethod(sparkR.session(), "conf")
sparkR.callJMethod(conf, "set", "fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net", "<your-storage-account-access-key>")

# Get some data from an Azure Synapse table.
df <- read.df(
   source = "com.databricks.spark.sqldw",
   url = "jdbc:sqlserver://<the-rest-of-the-connection-string>",
   forward_spark_azure_storage_credentials = "true",
   dbTable = "<your-table-name>",
   tempDir = "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")

# Load data from an Azure Synapse query.
df <- read.df(
   source = "com.databricks.spark.sqldw",
   url = "jdbc:sqlserver://<the-rest-of-the-connection-string>",
   forward_spark_azure_storage_credentials = "true",
   query = "select x, count(*) as cnt from table group by x",
   tempDir = "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")

# Apply some transformations to the data, then use the
# Data Source API to write the data back to another table in Azure Synapse.
write.df(
  df,
  source = "com.databricks.spark.sqldw",
  url = "jdbc:sqlserver://<the-rest-of-the-connection-string>",
  forward_spark_azure_storage_credentials = "true",
  dbTable = "<your-table-name>",
  tempDir = "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")

Utilização (Streaming)

Você pode gravar dados usando o Structured Streaming em blocos de anotações Scala e Python.

Scala

// Set up the Blob storage account access key in the notebook session conf.
spark.conf.set(
  "fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
  "<your-storage-account-access-key>")

// Prepare streaming source; this could be Kafka or a simple rate stream.
val df: DataFrame = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "100000")
  .option("numPartitions", "16")
  .load()

// Apply some transformations to the data then use
// Structured Streaming API to continuously write the data to a table in Azure Synapse.
df.writeStream
  .format("com.databricks.spark.sqldw")
  .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
  .option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
  .option("forwardSparkAzureStorageCredentials", "true")
  .option("dbTable", "<your-table-name>")
  .option("checkpointLocation", "/tmp_checkpoint_location")
  .start()

Python

# Set up the Blob storage account access key in the notebook session conf.
spark.conf.set(
  "fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
  "<your-storage-account-access-key>")

# Prepare streaming source; this could be Kafka or a simple rate stream.
df = spark.readStream \
  .format("rate") \
  .option("rowsPerSecond", "100000") \
  .option("numPartitions", "16") \
  .load()

# Apply some transformations to the data then use
# Structured Streaming API to continuously write the data to a table in Azure Synapse.
df.writeStream \
  .format("com.databricks.spark.sqldw") \
  .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
  .option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>") \
  .option("forwardSparkAzureStorageCredentials", "true") \
  .option("dbTable", "<your-table-name>") \
  .option("checkpointLocation", "/tmp_checkpoint_location") \
  .start()

Configuração

Esta seção descreve como configurar a semântica de gravação para o conector, as permissões necessárias e os parâmetros de configuração diversos.

Nesta secção:

Modos de salvamento suportados para gravações em lote

O conector Sinapse do Azure suporta ErrorIfExistsos modos , Ignore, Appende Overwrite salvar com o modo padrão sendo ErrorIfExists. Para obter mais informações sobre os modos de salvamento suportados no Apache Spark, consulte a documentação do Spark SQL em Modos de salvamento.

Modos de saída suportados para gravações de streaming

O conector do Azure Synapse dá suporte Append aos modos de saída para Complete acréscimos e agregações de registros. Para obter mais detalhes sobre modos de saída e matriz de compatibilidade, consulte o guia Streaming estruturado.

Semântica de escrita

Nota

COPY está disponível no Databricks Runtime 7.0 e superior.

Além do PolyBase, o conector Synapse do Azure dá suporte à COPY instrução. A COPY instrução oferece uma maneira mais conveniente de carregar dados no Azure Synapse sem a necessidade de criar uma tabela externa, requer menos permissões para carregar dados e melhora o desempenho da ingestão de dados no Azure Synapse.

Por padrão, o conector descobre automaticamente a melhor semântica de gravação (COPY ao direcionar uma instância do Azure Synapse Gen2, caso contrário, o PolyBase). Você também pode especificar a semântica de gravação com a seguinte configuração:

Scala

// Configure the write semantics for Azure Synapse connector in the notebook session conf.
spark.conf.set("spark.databricks.sqldw.writeSemantics", "<write-semantics>")

Python

# Configure the write semantics for Azure Synapse connector in the notebook session conf.
spark.conf.set("spark.databricks.sqldw.writeSemantics", "<write-semantics>")

SQL

-- Configure the write semantics for Azure Synapse connector in the notebook session conf.
SET spark.databricks.sqldw.writeSemantics=<write-semantics>;

R

# Load SparkR
library(SparkR)

# Configure the write semantics for Azure Synapse connector in the notebook session conf.
conf <- sparkR.callJMethod(sparkR.session(), "conf")
sparkR.callJMethod(conf, "set", "spark.databricks.sqldw.writeSemantics", "<write-semantics>")

onde <write-semantics> é polybase usar PolyBase ou copy usar a COPY instrução.

Permissões necessárias do Azure Synapse para o PolyBase

Quando você usa o PolyBase, o conector do Azure Synapse requer que o usuário da conexão JDBC tenha permissão para executar os seguintes comandos na instância conectada do Azure Synapse:

Como pré-requisito para o primeiro comando, o conector espera que já exista uma chave mestra de banco de dados para a instância Synapse do Azure especificada. Caso contrário, você pode criar uma chave usando o comando CREATE MASTER KEY .

Além disso, para ler a tabela do Azure Synapse definida ou dbTable as tabelas mencionadas no query, o usuário JDBC deve ter permissão para acessar as tabelas necessárias do Azure Synapse. Para gravar dados de volta em uma tabela do Azure Synapse definida através dbTabledo , o usuário JDBC deve ter permissão para gravar nessa tabela do Azure Synapse.

A tabela a seguir resume as permissões necessárias para todas as operações com o PolyBase:

Operação Permissões Permissões ao usar fonte de dados externa
Gravação em lote CONTROLO Consulte Gravação em lote
Gravação em streaming CONTROLO Consulte Gravação de streaming
Lida CONTROLO Ver Ler

Permissões necessárias do Azure Synapse para o PolyBase com a opção de fonte de dados externa

Você pode usar o PolyBase com uma fonte de dados externa pré-provisionada. Consulte o externalDataSource parâmetro em Parâmetros para obter mais informações.

Para usar o PolyBase com uma fonte de dados externa pré-provisionada, o conector do Azure Synapse exige que o usuário da conexão JDBC tenha permissão para executar os seguintes comandos na instância conectada do Azure Synapse:

Para criar uma fonte de dados externa, você deve primeiro criar uma credencial com escopo de banco de dados. Os links a seguir descrevem como criar uma credencial com escopo para entidades de serviço e uma fonte de dados externa para um local ABFS:

Nota

O local da fonte de dados externa deve apontar para um contêiner. O conector não funcionará se o local for um diretório em um contêiner.

A tabela a seguir resume as permissões para operações de gravação do PolyBase com a opção de fonte de dados externa:

Operação Permissões (inserir numa tabela existente) Permissões (inserir numa tabela nova)
Gravação em lote ADMINISTRAR OPERAÇÕES EM MASSA NA BASE DE DADOS

INSERT

CREATE TABLE

ALTERAR QUALQUER ESQUEMA

ALTERAR QUALQUER FONTE DE DADOS EXTERNA

ALTERAR QUALQUER FORMATO DE ARQUIVO EXTERNO
ADMINISTRAR OPERAÇÕES EM MASSA NA BASE DE DADOS

INSERT

CREATE TABLE

ALTERAR QUALQUER ESQUEMA

ALTERAR QUALQUER FONTE DE DADOS EXTERNA

ALTERAR QUALQUER FORMATO DE ARQUIVO EXTERNO
Gravação em streaming ADMINISTRAR OPERAÇÕES EM MASSA NA BASE DE DADOS

INSERT

CREATE TABLE

ALTERAR QUALQUER ESQUEMA

ALTERAR QUALQUER FONTE DE DADOS EXTERNA

ALTERAR QUALQUER FORMATO DE ARQUIVO EXTERNO
ADMINISTRAR OPERAÇÕES EM MASSA NA BASE DE DADOS

INSERT

CREATE TABLE

ALTERAR QUALQUER ESQUEMA

ALTERAR QUALQUER FONTE DE DADOS EXTERNA

ALTERAR QUALQUER FORMATO DE ARQUIVO EXTERNO

A tabela a seguir resume as permissões para operações de leitura do PolyBase com a opção de fonte de dados externa:

Operação Permissões
Lida CREATE TABLE

ALTERAR QUALQUER ESQUEMA

ALTERAR QUALQUER FONTE DE DADOS EXTERNA

ALTERAR QUALQUER FORMATO DE ARQUIVO EXTERNO

Você pode usar esse conector para ler através da API da fonte de dados em notebooks Scala, Python, SQL e R.

Scala
// Get some data from an Azure Synapse table.
val df: DataFrame = spark.read
  .format("com.databricks.spark.sqldw")
  .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
  .option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
  .option("externalDataSource", "<your-pre-provisioned-data-source>")
  .option("dbTable", "<your-table-name>")
  .load()
Python
# Get some data from an Azure Synapse table.
df = spark.read \
  .format("com.databricks.spark.sqldw") \
  .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
  .option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>") \
  .option("externalDataSource", "<your-pre-provisioned-data-source>") \
  .option("dbTable", "<your-table-name>") \
  .load()
SQL
-- Read data using SQL.
CREATE TABLE example_table_in_spark_read
USING com.databricks.spark.sqldw
OPTIONS (
  url 'jdbc:sqlserver://<the-rest-of-the-connection-string>',
  forwardSparkAzureStorageCredentials 'true',
  dbTable '<your-table-name>',
  tempDir 'abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>',
  externalDataSource '<your-pre-provisioned-data-source>'
);
R
# Get some data from an Azure Synapse table.
df <- read.df(
   source = "com.databricks.spark.sqldw",
   url = "jdbc:sqlserver://<the-rest-of-the-connection-string>",
   forward_spark_azure_storage_credentials = "true",
   dbTable = "<your-table-name>",
   tempDir = "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>"
   externalDataSource = "<your-pre-provisioned-data-source>")

Permissões necessárias do Azure Synapse para a COPY instrução

Nota

Disponível no Databricks Runtime 7.0 e superior.

Quando você usa a COPY instrução, o conector do Azure Synapse requer que o usuário da conexão JDBC tenha permissão para executar os seguintes comandos na instância conectada do Azure Synapse:

Se a tabela de destino não existir no Azure Synapse, a permissão para executar o seguinte comando será necessária, além do comando acima:

A tabela a seguir resume as permissões para gravações em lote e streaming com COPY:

Operação Permissões (inserir numa tabela existente) Permissões (inserir numa tabela nova)
Gravação em lote ADMINISTRAR OPERAÇÕES EM MASSA NA BASE DE DADOS

INSERT
ADMINISTRAR OPERAÇÕES EM MASSA NA BASE DE DADOS

INSERT

CREATE TABLE

ALTERAR NO ESQUEMA :: dbo
Gravação em streaming ADMINISTRAR OPERAÇÕES EM MASSA NA BASE DE DADOS

INSERT
ADMINISTRAR OPERAÇÕES EM MASSA NA BASE DE DADOS

INSERT

CREATE TABLE

ALTERAR NO ESQUEMA :: dbo

Parâmetros

O mapa de parâmetros ou OPTIONS fornecido no Spark SQL suporta as seguintes configurações:

Parâmetro Necessário Predefinição Notas
dbTable Sim, a menos que query seja especificado Sem padrão A tabela a partir da qual criar ou ler no Azure Synapse. Esse parâmetro é necessário ao salvar dados de volta no Azure Synapse.

Você também pode usar {SCHEMA NAME}.{TABLE NAME} para acessar uma tabela em um determinado esquema. Se o nome do esquema não for fornecido, o esquema padrão associado ao usuário JDBC será usado.

A variante suportada dbtable anteriormente foi preterida e será ignorada em versões futuras. Em vez disso, use o nome "camel case".
query Sim, a menos que dbTable seja especificado Sem padrão A consulta a partir da qual ler no Azure Synapse.

Para tabelas referidas na consulta, você também pode usar {SCHEMA NAME}.{TABLE NAME} para acessar uma tabela em um determinado esquema. Se o nome do esquema não for fornecido, o esquema padrão associado ao usuário JDBC será usado.
user Não Sem padrão O nome de usuário do Azure Synapse. Deve ser usado em conjunto com password a opção. Só pode ser usado se o usuário e a senha não forem passados na URL. A aprovação de ambos resultará em um erro.
password Não Sem padrão A senha do Azure Synapse. Deve ser usado em conjunto com user a opção. Só pode ser usado se o usuário e a senha não forem passados na URL. A aprovação de ambos resultará em um erro.
url Sim Sem padrão UM URL JDBC com sqlserver definido como o subprotocolo. É recomendável usar a cadeia de conexão fornecida pelo portal do Azure. Definição
encrypt=true é altamente recomendado, pois permite a criptografia SSL da conexão JDBC. Se user e password forem definidos separadamente, não é necessário incluí-los no URL.
jdbcDriver Não Determinado pelo subprotocolo da URL JDBC O nome da classe do driver JDBC a ser usado. Essa classe deve estar no classpath. Na maioria dos casos, não deve ser necessário especificar essa opção, pois o nome de classe do driver apropriado deve ser determinado automaticamente pelo subprotocolo da URL JDBC.

A variante suportada jdbc_driver anteriormente foi preterida e será ignorada em versões futuras. Em vez disso, use o nome "camel case".
tempDir Sim Sem padrão Um abfss URI. Recomendamos que você use um contêiner de armazenamento de Blob dedicado para o Azure Synapse.

A variante suportada tempdir anteriormente foi preterida e será ignorada em versões futuras. Em vez disso, use o nome "camel case".
tempFormat Não PARQUET O formato no qual salvar arquivos temporários no repositório de blob ao gravar no Azure Synapse. O padrão é PARQUET: nenhum outro valor é permitido no momento.
tempCompression Não SNAPPY O algoritmo de compactação a ser usado para codificar/decodificar temporariamente pelo Spark e pelo Azure Synapse. Os valores atualmente suportados são: UNCOMPRESSED, SNAPPY e GZIP.
forwardSparkAzureStorageCredentials Não false Se trueo , a biblioteca descobrir automaticamente as credenciais que o Spark está usando para se conectar ao contêiner de armazenamento de Blob e encaminhará essas credenciais para o Azure Synapse sobre JDBC. Essas credenciais são enviadas como parte da consulta JDBC. Portanto, é altamente recomendável que você habilite a criptografia SSL da conexão JDBC ao usar essa opção.

A versão atual do conector Synapse do Azure requer (exatamente) um dos forwardSparkAzureStorageCredentials, enableServicePrincipalAuthou useAzureMSI para ser explicitamente definido como true.

A variante suportada forward_spark_azure_storage_credentials anteriormente foi preterida e será ignorada em versões futuras. Em vez disso, use o nome "camel case".
useAzureMSI Não false Se true, a biblioteca especificará IDENTITY = 'Managed Service Identity' e não SECRET para as credenciais de escopo do banco de dados que cria.

A versão atual do conector Synapse do Azure requer (exatamente) um dos forwardSparkAzureStorageCredentials, enableServicePrincipalAuthou useAzureMSI para ser explicitamente definido como true.
enableServicePrincipalAuth Não false Se true, a biblioteca usará as credenciais da entidade de serviço fornecida para se conectar à conta de armazenamento do Azure e ao Azure Synapse Analytics por JDBC.

A versão atual do conector Synapse do Azure requer (exatamente) um dos forwardSparkAzureStorageCredentials, enableServicePrincipalAuthou useAzureMSI para ser explicitamente definido como true.
tableOptions Não CLUSTERED COLUMNSTORE INDEX, DISTRIBUTION = ROUND_ROBIN Uma cadeia de caracteres usada para especificar opções de tabela ao criar a tabela do Azure Synapse definida através do dbTable. Essa cadeia de caracteres é passada literalmente para a WITH cláusula da instrução SQL emitida contra o CREATE TABLE Azure Synapse.

A variante suportada table_options anteriormente foi preterida e será ignorada em versões futuras. Em vez disso, use o nome "camel case".
preActions Não Sem padrão (string vazia) Uma ; lista separada de comandos SQL a serem executados no Azure Synapse antes de gravar dados na instância do Azure Synapse. Esses comandos SQL são necessários para serem comandos válidos aceitos pelo Azure Synapse.

Se algum desses comandos falhar, ele será tratado como um erro e a operação de gravação não será executada.
postActions Não Sem padrão (string vazia) Uma ; lista separada de comandos SQL a serem executados no Azure Synapse depois que o conector grava dados com êxito na instância do Azure Synapse. Esses comandos SQL são necessários para serem comandos válidos aceitos pelo Azure Synapse.

Se qualquer um desses comandos falhar, ele será tratado como um erro e você obterá uma exceção depois que os dados forem gravados com êxito na instância do Azure Synapse.
maxStrLength Não 256 StringType no Spark é mapeado para o NVARCHAR(maxStrLength) tipo no Azure Synapse. Você pode usar maxStrLength para definir o comprimento da cadeia de caracteres para todas as NVARCHAR(maxStrLength) colunas de tipo que estão na tabela com nome
dbTable no Azure Synapse.

A variante suportada maxstrlength anteriormente foi preterida e será ignorada em versões futuras. Em vez disso, use o nome "camel case".
checkpointLocation Sim Sem padrão Local no DBFS que será usado pelo Structured Streaming para gravar metadados e informações de ponto de verificação. Consulte Recuperando-se de falhas com ponto de verificação no guia de programação de streaming estruturado.
numStreamingTempDirsToKeep Não 0 Indica quantos diretórios temporários (mais recentes) manter para limpeza periódica de microlotes em streaming. Quando definido como 0, a exclusão de diretório é acionada imediatamente após o microlote ser confirmado, caso contrário, desde que o número de microlotes mais recentes seja mantido e o restante dos diretórios seja removido. Use -1 para desativar a limpeza periódica.
applicationName Não Databricks-User-Query A tag da conexão para cada consulta. Se não for especificado ou se o valor for uma cadeia de caracteres vazia, o valor padrão da tag será adicionado à URL JDBC. O valor padrão impede que a ferramenta de Monitoramento do Banco de Dados do Azure gere alertas de injeção de SQL espúrios contra consultas.
maxbinlength Não Sem padrão Controle o comprimento das BinaryType colunas. Este parâmetro é traduzido como VARBINARY(maxbinlength).
identityInsert Não false Configuração para habilita IDENTITY_INSERT o true modo, que insere um valor fornecido pelo DataFrame na coluna de identidade da tabela Sinapse do Azure.

Consulte Inserção explícita de valores em uma coluna IDENTIDADE.
externalDataSource Não Sem padrão Uma fonte de dados externa pré-provisionada para ler dados do Azure Synapse. Uma fonte de dados externa só pode ser usada com o PolyBase e remove o requisito de permissão CONTROL, uma vez que o conector não precisa criar uma credencial com escopo e uma fonte de dados externa para carregar dados.

Por exemplo, o uso e a lista de permissões necessárias ao usar uma fonte de dados externa, consulte Permissões necessárias do Azure Synapse para o PolyBase com a opção de fonte de dados externa.
maxErrors Não 0 O número máximo de linhas que podem ser rejeitadas durante leituras e gravações antes que a operação de carregamento (PolyBase ou COPY) seja cancelada. As linhas rejeitadas serão ignoradas. Por exemplo, se dois em cada dez registos tiverem erros, apenas oito registos serão processados.

Consulte REJECT_VALUE documentação em CREATE EXTERNAL TABLE e MAXERRORS em COPY.

Nota

  • tableOptions, preActions, postActions, e maxStrLength são relevantes somente ao gravar dados do Azure Databricks em uma nova tabela no Azure Synapse.
  • externalDataSource é relevante somente ao ler dados do Azure Synapse e gravar dados do Azure Databricks em uma nova tabela no Azure Synapse com semântica PolyBase. Você não deve especificar outros tipos de autenticação de armazenamento ao usar externalDataSource como forwardSparkAzureStorageCredentials ou useAzureMSI.
  • checkpointLocation e numStreamingTempDirsToKeep são relevantes apenas para gravações de streaming do Azure Databricks para uma nova tabela no Azure Synapse.
  • Embora todos os nomes de opções de fonte de dados não diferenciem maiúsculas de minúsculas, recomendamos que você os especifique em "camel case" para maior clareza.

Pushdown de consulta no Azure Synapse

O conector do Azure Synapse implementa um conjunto de regras de otimização para enviar os seguintes operadores para o Azure Synapse:

  • Filter
  • Project
  • Limit

Os Project operadores e Filter suportam as seguintes expressões:

  • A maioria dos operadores lógicos booleanos
  • Comparações
  • Operações aritméticas básicas
  • Moldes numéricos e de cordas

Para o operador, o pushdown é suportado Limit apenas quando não há nenhuma ordem especificada. Por exemplo:

SELECT TOP(10) * FROM table, mas não SELECT TOP(10) * FROM table ORDER BY col.

Nota

O conector Synapse do Azure não envia expressões que operam em cadeias de caracteres, datas ou carimbos de data/hora.

O pushdown de consulta criado com o conector Synapse do Azure é habilitado por padrão. Você pode desativá-lo definindo spark.databricks.sqldw.pushdown como false.

Gestão temporária de dados

O conector Synapse do Azure não exclui os arquivos temporários que ele cria no contêiner de armazenamento de Blob. Portanto, recomendamos que você exclua periodicamente arquivos temporários no local fornecido pelo tempDir usuário.

Para facilitar a limpeza de dados, o conector Synapse do Azure não armazena arquivos de dados diretamente no tempDir, mas cria um subdiretório do formulário: <tempDir>/<yyyy-MM-dd>/<HH-mm-ss-SSS>/<randomUUID>/. Você pode configurar trabalhos periódicos (usando o recurso de trabalhos do Azure Databricks ou de outra forma) para excluir recursivamente quaisquer subdiretórios mais antigos do que um determinado limite (por exemplo, 2 dias), com a suposição de que não pode haver trabalhos do Spark executando mais do que esse limite.

Uma alternativa mais simples é deixar cair periodicamente todo o recipiente e criar um novo com o mesmo nome. Isso requer que você use um contêiner dedicado para os dados temporários produzidos pelo conector Synapse do Azure e que possa encontrar uma janela de tempo na qual você possa garantir que nenhuma consulta envolvendo o conector esteja em execução.

Gerenciamento temporário de objetos

O conector do Azure Synapse automatiza a transferência de dados entre um cluster do Azure Databricks e uma instância do Azure Synapse. Para ler dados de uma tabela do Azure Synapse ou consultar ou gravar dados em uma tabela do Azure Synapse, o conector do Azure Synapse cria objetos temporários, incluindo DATABASE SCOPED CREDENTIAL, EXTERNAL DATA SOURCEEXTERNAL FILE FORMAT, e EXTERNAL TABLE nos bastidores. Esses objetos vivem apenas durante toda a duração do trabalho Spark correspondente e devem ser automaticamente descartados em seguida.

Quando um cluster está executando uma consulta usando o conector Sinapse do Azure, se o processo do driver do Spark falhar ou for reiniciado à força, ou se o cluster for encerrado ou reiniciado à força, os objetos temporários poderão não ser descartados. Para facilitar a identificação e a exclusão manual desses objetos, o conector do Azure Synapse prefixa os nomes de todos os objetos temporários intermediários criados na instância do Azure Synapse com uma marca do formulário: tmp_databricks_<yyyy_MM_dd_HH_mm_ss_SSS>_<randomUUID>_<internalObject>.

Recomendamos que você procure periodicamente objetos vazados usando consultas como as seguintes:

  • SELECT * FROM sys.database_scoped_credentials WHERE name LIKE 'tmp_databricks_%'
  • SELECT * FROM sys.external_data_sources WHERE name LIKE 'tmp_databricks_%'
  • SELECT * FROM sys.external_file_formats WHERE name LIKE 'tmp_databricks_%'
  • SELECT * FROM sys.external_tables WHERE name LIKE 'tmp_databricks_%'

Gerenciamento de tabela de pontos de verificação de streaming

O conector Sinapse do Azure não exclui a tabela de pontos de verificação de streaming que é criada quando uma nova consulta de streaming é iniciada. Esse comportamento é consistente com o checkpointLocation no DBFS. Portanto, recomendamos que você exclua periodicamente as tabelas de pontos de verificação ao mesmo tempo em que remove os locais de pontos de verificação no DBFS para consultas que não serão executadas no futuro ou que já tenham o local do ponto de verificação removido.

Por padrão, todas as tabelas de pontos de verificação têm o nome <prefix>_<query-id>, onde <prefix> é um prefixo configurável com valor databricks_streaming_checkpoint padrão e query_id é um ID de consulta de streaming com _ caracteres removidos. Para localizar todas as tabelas de pontos de verificação para consultas de streaming obsoletas ou excluídas, execute a consulta:

SELECT * FROM sys.tables WHERE name LIKE 'databricks_streaming_checkpoint%'

Você pode configurar o prefixo com a opção spark.databricks.sqldw.streaming.exactlyOnce.checkpointTableNamePrefixde configuração do Spark SQL.

Perguntas mais frequentes (FAQ)

Recebi um erro ao usar o conector Synapse do Azure. Como posso saber se esse erro é do Azure Synapse ou do Azure Databricks?

Para ajudá-lo a depurar erros, qualquer exceção lançada por código específico para o conector Synapse do Azure é encapsulada em uma exceção que estende a SqlDWException característica. As exceções estabelecem igualmente a seguinte distinção:

  • SqlDWConnectorException representa um erro gerado pelo conector Synapse do Azure
  • SqlDWSideException representa um erro gerado pela instância Synapse do Azure conectada

O que devo fazer se minha consulta falhar com o erro "Nenhuma chave de acesso encontrada no conf de sessão ou no conf global do Hadoop"?

Este erro significa que o conector do Azure Synapse não conseguiu encontrar a chave de acesso da conta de armazenamento na configuração da sessão do bloco de notas ou na configuração global do Hadoop para a conta de armazenamento especificada em tempDir. Consulte Uso (lote) para obter exemplos de como configurar o acesso à conta de armazenamento corretamente. Se uma tabela do Spark for criada usando o conector Synapse do Azure, você ainda deverá fornecer as credenciais de acesso da conta de armazenamento para ler ou gravar na tabela do Spark.

Posso utilizar uma Assinatura de Acesso Partilhado (SAS) para aceder ao contentor do Armazenamento de blobs especificado por tempDir?

O Azure Synapse não oferece suporte ao uso de SAS para acessar o armazenamento de Blob. Portanto, o conector Sinapse do Azure não oferece suporte a SAS para acessar o contêiner de armazenamento de Blob especificado pelo tempDir.

Criei uma tabela do Spark usando o conector do Azure Synapse com a dbTable opção, escrevi alguns dados nessa tabela do Spark e larguei essa tabela do Spark. A tabela criada no lado da Sinapse do Azure será descartada?

N.º O Azure Synapse é considerado uma fonte de dados externa. A tabela Azure Synapse com o nome definido não dbTable é descartada quando a tabela Spark é descartada.

Ao escrever um DataFrame no Azure Synapse, por que motivo tenho de indicar .option("dbTable", tableName).save() em vez de apenas .saveAsTable(tableName)?

Isso porque queremos deixar clara a seguinte distinção: .option("dbTable", tableName) refere-se à tabela de banco de dados (ou seja, Azure Synapse), enquanto .saveAsTable(tableName) se refere à tabela Spark. Na verdade, você pode até combinar os dois: df.write. ... .option("dbTable", tableNameDW).saveAsTable(tableNameSpark) que cria uma tabela no Azure Synapse chamada tableNameDW e uma tabela externa no Spark chamada tableNameSpark que é apoiada pela tabela Azure Synapse.

Aviso

Cuidado com a seguinte diferença entre .save() e .saveAsTable():

  • Para df.write. ... .option("dbTable", tableNameDW).mode(writeMode).save(), writeMode atua na tabela Sinapse do Azure, conforme o esperado.
  • Para df.write. ... .option("dbTable", tableNameDW).mode(writeMode).saveAsTable(tableNameSpark), writeMode atua na tabela do Spark, enquanto tableNameDWé substituído silenciosamente se já existir no Azure Synapse.

Esse comportamento não é diferente de gravar em qualquer outra fonte de dados. É apenas uma ressalva da API Spark DataFrameWriter.