Conector do Azure Data Explorer para o Apache Spark
Importante
Esse conector pode ser usado na Análise em Tempo Real no Microsoft Fabric. Use as instruções neste artigo com as seguintes exceções:
- Se necessário, crie bancos de dados usando as instruções em Criar um banco de dados KQL.
- Se necessário, crie tabelas usando as instruções em Criar uma tabela vazia.
- Obtenha URIs de consulta ou ingestão usando as instruções em Copiar URI.
- Execute consultas em um conjunto de consultas KQL.
O Apache Spark é um mecanismo de análise unificado para processamento de dados em grande escala. O Azure Data Explorer é um serviço de análise de dados rápido e totalmente gerenciado para análise em tempo real de grandes volumes de dados.
O conector do Azure Data Explorer para Spark é um projeto de código aberto que pode ser executado em qualquer cluster do Spark. Ele implementa a fonte de dados e o coletor de dados para mover dados entre o Azure Data Explorer e os clusters do Spark. Usando o Azure Data Explorer e o Apache Spark, você pode criar aplicativos rápidos e escalonáveis direcionados a cenários baseados em dados. Por exemplo, ML (aprendizado de máquina), ETL (Extração, Transformação e Carregamento) e Log Analytics. Com o conector, o Azure Data Explorer se torna um armazenamento de dados válido para operações de origem e coletor padrão do Spark, como Gravação, Leitura e writeStream.
Você pode gravar no Azure Data Explorer por meio de ingestão na fila ou ingestão de streaming. A leitura do Azure Data Explorer dá suporte à remoção de colunas e à aplicação de predicado, que filtra os dados no Azure Data Explorer, reduzindo o volume de dados transferidos.
Observação
Para obter informações sobre como trabalhar com o conector do Synapse Spark para o Azure Data Explorer, confira Conectar-se ao Azure Data Explorer usando o Apache Spark para o Azure Synapse Analytics.
Este tópico descreve como instalar e configurar o conector Spark do Azure Data Explorer e mover dados entre os clusters do Azure Data Explorer e Apache Spark.
Observação
Embora alguns dos exemplos abaixo se refiram a um cluster Spark do Azure Databricks, o conector Spark do Azure Data Explorer não tem dependências diretas do Databricks ou qualquer outra distribuição do Spark.
Pré-requisitos
- Uma assinatura do Azure. Criar uma conta gratuita do Azure.
- Um cluster e um banco de dados do Azure Data Explorer. Crie um cluster e um banco de dados.
- Um cluster do Spark
- Instale a biblioteca do conector de Data Explorer do Azure:
- Bibliotecas pré-compiladas para Spark 2.4+Scala 2.11 ou Spark 3+scala 2.12
- Repositório Maven
- Maven 3.x instalado
Dica
As versões do Spark 2.3. x também têm suporte, mas podem exigir algumas alterações nas dependências de pom.xml.
Como compilar o conector Spark
A partir da versão 2.3.0 introduzimos novas IDs de artefato substituindo o spark-kusto-connector: kusto-spark_3.0_2.12 direcionando o Spark 3.x e Scala 2.12 e kusto-spark_2.4_2.11 direcionando Spark 2.4.x e scala 2.11.
Observação
As versões anteriores à 2.5.1 não funcionam mais para ingestão em uma tabela existente, atualize para uma versão posterior. Esta etapa é opcional. Se você estiver usando bibliotecas pré-compiladas, por exemplo, Maven, confira Configuração do cluster do Spark.
Pré-requisitos de compilação
Se você não estiver usando bibliotecas pré-compiladas, precisará instalar as bibliotecas listadas em dependências, incluindo as seguintes bibliotecas do SDK do Java Kusto. Para encontrar a versão correta a ser instalada, examine o POM da versão relevante:
Consulte esta fonte para compilar o conector Spark.
Para aplicativos Scala/Java usando definições de projeto Maven, vincule seu aplicativo com o seguinte artefato (a versão mais recente pode ser diferente):
<dependency> <groupId>com.microsoft.azure</groupId> <artifactId>kusto-spark_3.0_2.12</artifactId> <version>2.5.1</version> </dependency>
Criar comandos
Para criar o jar e executar todos os testes:
mvn clean package
Para criar o jar, execute todos os testes e instale o jar no seu repositório Maven local:
mvn clean install
Para obter mais informações, confira uso do conector.
Instalação de cluster do Spark
Observação
É recomendável usar a versão mais recente do conector Spark do Azure Data Explorer ao executar as etapas a seguir.
Defina as seguintes configurações de cluster do Spark, com base no cluster Azure Databricks usando Spark 2.4.4 e Scala 2.11 ou Spark 3.0.1 e Scala 2.12:
Instale a biblioteca mais recente do Spark-Kusto-Connector do Maven:
Verifique se todas as bibliotecas necessárias estão instaladas:
Para a instalação usando um arquivo JAR, verifique se foram instaladas dependências adicionais:
Autenticação
O conector do Azure Data Explorer Spark permite que você se autentique com Microsoft Entra ID usando um dos seguintes métodos:
- Um aplicativo Microsoft Entra
- Um token de acesso Microsoft Entra
- Autenticação do dispositivo (para cenários de produção)
- Um Azure Key Vault acessar o recurso Key Vault, instalar o pacote azure-keyvault e fornecer credenciais de aplicativo.
autenticação de aplicativo Microsoft Entra
Microsoft Entra autenticação de aplicativo é o método de autenticação mais simples e comum e é recomendado para o conector spark do Azure Data Explorer.
Propriedades | Cadeia de caracteres de opção | Descrição |
---|---|---|
KUSTO_AAD_APP_ID | kustoAadAppId | Microsoft Entra identificador de aplicativo (cliente). |
KUSTO_AAD_AUTHORITY_ID | kustoAadAuthorityID | Microsoft Entra autoridade de autenticação. Microsoft Entra ID do Diretório (locatário). Opcional – o padrão é microsoft.com. Para obter mais informações, consulte Microsoft Entra autoridade. |
KUSTO_AAD_APP_SECRET | kustoAadAppSecret | Microsoft Entra chave do aplicativo para o cliente. |
Observação
Versões de API mais antigas (menos de 2.0.0) têm a seguinte nomenclatura: "kustoAADClientID", "kustoClientAADClientPassword", "kustoAADAuthorityID"
Privilégios do Azure Data Explorer
Conceda os seguintes privilégios em um cluster do Azure Data Explorer:
- Para leitura (fonte de dados), a identidade Microsoft Entra deve ter privilégios de visualizador no banco de dados de destino ou privilégios de administrador na tabela de destino.
- Para gravar (coletor de dados), a identidade Microsoft Entra deve ter privilégios de ingestão no banco de dados de destino. Ele também deve ter privilégios de usuário no banco de dados de destino para criar novas tabelas. Se a tabela de destino já existir, você deverá configurar privilégios de administrador na tabela de destino.
Para obter mais informações sobre as funções principais do Azure Data Explorer, confira Controle de acesso baseado em função. Para gerenciar funções de segurança, consulte Gerenciamento de funções de segurança.
Coletor Spark: gravando no Azure Data Explorer
Configurar parâmetros do coletor:
val KustoSparkTestAppId = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppId") val KustoSparkTestAppKey = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppKey") val appId = KustoSparkTestAppId val appKey = KustoSparkTestAppKey val authorityId = "72f988bf-86f1-41af-91ab-2d7cd011db47" // Optional - defaults to microsoft.com val cluster = "Sparktest.eastus2" val database = "TestDb" val table = "StringAndIntTable"
Grave o DataFrame do Spark no cluster do Azure Data Explorer como lote:
import com.microsoft.kusto.spark.datasink.KustoSinkOptions import org.apache.spark.sql.{SaveMode, SparkSession} df.write .format("com.microsoft.kusto.spark.datasource") .option(KustoSinkOptions.KUSTO_CLUSTER, cluster) .option(KustoSinkOptions.KUSTO_DATABASE, database) .option(KustoSinkOptions.KUSTO_TABLE, "Demo3_spark") .option(KustoSinkOptions.KUSTO_AAD_APP_ID, appId) .option(KustoSinkOptions.KUSTO_AAD_APP_SECRET, appKey) .option(KustoSinkOptions.KUSTO_AAD_AUTHORITY_ID, authorityId) .option(KustoSinkOptions.KUSTO_TABLE_CREATE_OPTIONS, "CreateIfNotExist") .mode(SaveMode.Append) .save()
Ou use a sintaxe simplificada:
import com.microsoft.kusto.spark.datasink.SparkIngestionProperties import com.microsoft.kusto.spark.sql.extension.SparkExtension._ val sparkIngestionProperties = Some(new SparkIngestionProperties()) // Optional, use None if not needed df.write.kusto(cluster, database, table, conf, sparkIngestionProperties)
Grave os dados de streaming:
import org.apache.spark.sql.streaming.Trigger import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit import org.apache.spark.sql.streaming.Trigger // Set up a checkpoint and disable codeGen. spark.conf.set("spark.sql.streaming.checkpointLocation", "/FileStore/temp/checkpoint") // Write to a Kusto table from a streaming source val kustoQ = df .writeStream .format("com.microsoft.kusto.spark.datasink.KustoSinkProvider") .options(conf) .trigger(Trigger.ProcessingTime(TimeUnit.SECONDS.toMillis(10))) // Sync this with the ingestionBatching policy of the database .start()
Origem do Spark: lendo do Azure Data Explorer
Ao ler pequenas quantidades de dados, defina a consulta de dados:
import com.microsoft.kusto.spark.datasource.KustoSourceOptions import org.apache.spark.SparkConf import org.apache.spark.sql._ import com.microsoft.azure.kusto.data.ClientRequestProperties val query = s"$table | where (ColB % 1000 == 0) | distinct ColA" val conf: Map[String, String] = Map( KustoSourceOptions.KUSTO_AAD_APP_ID -> appId, KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey ) val df = spark.read.format("com.microsoft.kusto.spark.datasource"). options(conf). option(KustoSourceOptions.KUSTO_QUERY, query). option(KustoSourceOptions.KUSTO_DATABASE, database). option(KustoSourceOptions.KUSTO_CLUSTER, cluster). load() // Simplified syntax flavor import com.microsoft.kusto.spark.sql.extension.SparkExtension._ val cpr: Option[ClientRequestProperties] = None // Optional val df2 = spark.read.kusto(cluster, database, query, conf, cpr) display(df2)
Opcional: se você fornecer o armazenamento de blob transitório (e não o Azure Data Explorer), os blobs serão criados sob a responsabilidade do chamador. Isso inclui o provisionamento do armazenamento, a rotação de chaves de acesso e a exclusão de artefatos transitórios. O módulo KustoBlobStorageUtils contém funções auxiliares para excluir blobs com base em coordenadas de conta e contêiner e credenciais de conta, ou uma URL SAS completa com permissões de gravação, leitura e lista. Quando o RDD correspondente não é mais necessário, cada transação armazena artefatos de blob transitórios em um diretório separado. Esse diretório é capturado como parte dos logs de informações da transação de leitura relatados no nó do driver Spark.
// Use either container/account-key/account name, or container SaS val container = dbutils.secrets.get(scope = "KustoDemos", key = "blobContainer") val storageAccountKey = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountKey") val storageAccountName = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountName") // val storageSas = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageSasUrl")
No exemplo acima, o Key Vault não é acessado usando a interface do conector; um método mais simples de usar os segredos do Databricks é usado.
Leia do Azure Data Explorer.
Se você fornecer o armazenamento de blob transitório, leia do Azure Data Explorer da seguinte maneira:
val conf3 = Map( KustoSourceOptions.KUSTO_AAD_APP_ID -> appId, KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey KustoSourceOptions.KUSTO_BLOB_STORAGE_SAS_URL -> storageSas) val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3) val dfFiltered = df2 .where(df2.col("ColA").startsWith("row-2")) .filter("ColB > 12") .filter("ColB <= 21") .select("ColA") display(dfFiltered)
Se o Azure Data Explorer fornecer o armazenamento de blob transitório, leia do Azure Data Explorer da seguinte maneira:
val conf3 = Map( KustoSourceOptions.KUSTO_AAD_CLIENT_ID -> appId, KustoSourceOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey) val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3) val dfFiltered = df2 .where(df2.col("ColA").startsWith("row-2")) .filter("ColB > 12") .filter("ColB <= 21") .select("ColA") display(dfFiltered)
Conteúdo relacionado
Comentários
https://aka.ms/ContentUserFeedback.
Em breve: Ao longo de 2024, eliminaremos os problemas do GitHub como o mecanismo de comentários para conteúdo e o substituiremos por um novo sistema de comentários. Para obter mais informações, consulteEnviar e exibir comentários de