Conector do Azure Data Explorer para o Apache Spark

Importante

Esse conector pode ser usado na Análise em Tempo Real no Microsoft Fabric. Use as instruções neste artigo com as seguintes exceções:

O Apache Spark é um mecanismo de análise unificado para processamento de dados em grande escala. O Azure Data Explorer é um serviço de análise de dados rápido e totalmente gerenciado para análise em tempo real de grandes volumes de dados.

O conector do Azure Data Explorer para Spark é um projeto de código aberto que pode ser executado em qualquer cluster do Spark. Ele implementa a fonte de dados e o coletor de dados para mover dados entre o Azure Data Explorer e os clusters do Spark. Usando o Azure Data Explorer e o Apache Spark, você pode criar aplicativos rápidos e escalonáveis direcionados a cenários baseados em dados. Por exemplo, ML (aprendizado de máquina), ETL (Extração, Transformação e Carregamento) e Log Analytics. Com o conector, o Azure Data Explorer se torna um armazenamento de dados válido para operações de origem e coletor padrão do Spark, como Gravação, Leitura e writeStream.

Você pode gravar no Azure Data Explorer por meio de ingestão na fila ou ingestão de streaming. A leitura do Azure Data Explorer dá suporte à remoção de colunas e à aplicação de predicado, que filtra os dados no Azure Data Explorer, reduzindo o volume de dados transferidos.

Observação

Para obter informações sobre como trabalhar com o conector do Synapse Spark para o Azure Data Explorer, confira Conectar-se ao Azure Data Explorer usando o Apache Spark para o Azure Synapse Analytics.

Este tópico descreve como instalar e configurar o conector Spark do Azure Data Explorer e mover dados entre os clusters do Azure Data Explorer e Apache Spark.

Observação

Embora alguns dos exemplos abaixo se refiram a um cluster Spark do Azure Databricks, o conector Spark do Azure Data Explorer não tem dependências diretas do Databricks ou qualquer outra distribuição do Spark.

Pré-requisitos

Dica

As versões do Spark 2.3. x também têm suporte, mas podem exigir algumas alterações nas dependências de pom.xml.

Como compilar o conector Spark

A partir da versão 2.3.0 introduzimos novas IDs de artefato substituindo o spark-kusto-connector: kusto-spark_3.0_2.12 direcionando o Spark 3.x e Scala 2.12 e kusto-spark_2.4_2.11 direcionando Spark 2.4.x e scala 2.11.

Observação

As versões anteriores à 2.5.1 não funcionam mais para ingestão em uma tabela existente, atualize para uma versão posterior. Esta etapa é opcional. Se você estiver usando bibliotecas pré-compiladas, por exemplo, Maven, confira Configuração do cluster do Spark.

Pré-requisitos de compilação

  1. Se você não estiver usando bibliotecas pré-compiladas, precisará instalar as bibliotecas listadas em dependências, incluindo as seguintes bibliotecas do SDK do Java Kusto. Para encontrar a versão correta a ser instalada, examine o POM da versão relevante:

  2. Consulte esta fonte para compilar o conector Spark.

  3. Para aplicativos Scala/Java usando definições de projeto Maven, vincule seu aplicativo com o seguinte artefato (a versão mais recente pode ser diferente):

       <dependency>
         <groupId>com.microsoft.azure</groupId>
         <artifactId>kusto-spark_3.0_2.12</artifactId>
         <version>2.5.1</version>
       </dependency>
    

Criar comandos

Para criar o jar e executar todos os testes:

mvn clean package

Para criar o jar, execute todos os testes e instale o jar no seu repositório Maven local:

mvn clean install

Para obter mais informações, confira uso do conector.

Instalação de cluster do Spark

Observação

É recomendável usar a versão mais recente do conector Spark do Azure Data Explorer ao executar as etapas a seguir.

  1. Defina as seguintes configurações de cluster do Spark, com base no cluster Azure Databricks usando Spark 2.4.4 e Scala 2.11 ou Spark 3.0.1 e Scala 2.12:

    Configurações de cluster do Databricks.

  2. Instale a biblioteca mais recente do Spark-Kusto-Connector do Maven:

    Importe as bibliotecas.Selecione Spark-Kusto-Connector.

  3. Verifique se todas as bibliotecas necessárias estão instaladas:

    Verifique as bibliotecas instaladas.

  4. Para a instalação usando um arquivo JAR, verifique se foram instaladas dependências adicionais:

    Adicionar dependências.

Autenticação

O conector do Azure Data Explorer Spark permite que você se autentique com Microsoft Entra ID usando um dos seguintes métodos:

autenticação de aplicativo Microsoft Entra

Microsoft Entra autenticação de aplicativo é o método de autenticação mais simples e comum e é recomendado para o conector spark do Azure Data Explorer.

Propriedades Cadeia de caracteres de opção Descrição
KUSTO_AAD_APP_ID kustoAadAppId Microsoft Entra identificador de aplicativo (cliente).
KUSTO_AAD_AUTHORITY_ID kustoAadAuthorityID Microsoft Entra autoridade de autenticação. Microsoft Entra ID do Diretório (locatário). Opcional – o padrão é microsoft.com. Para obter mais informações, consulte Microsoft Entra autoridade.
KUSTO_AAD_APP_SECRET kustoAadAppSecret Microsoft Entra chave do aplicativo para o cliente.

Observação

Versões de API mais antigas (menos de 2.0.0) têm a seguinte nomenclatura: "kustoAADClientID", "kustoClientAADClientPassword", "kustoAADAuthorityID"

Privilégios do Azure Data Explorer

Conceda os seguintes privilégios em um cluster do Azure Data Explorer:

  • Para leitura (fonte de dados), a identidade Microsoft Entra deve ter privilégios de visualizador no banco de dados de destino ou privilégios de administrador na tabela de destino.
  • Para gravar (coletor de dados), a identidade Microsoft Entra deve ter privilégios de ingestão no banco de dados de destino. Ele também deve ter privilégios de usuário no banco de dados de destino para criar novas tabelas. Se a tabela de destino já existir, você deverá configurar privilégios de administrador na tabela de destino.

Para obter mais informações sobre as funções principais do Azure Data Explorer, confira Controle de acesso baseado em função. Para gerenciar funções de segurança, consulte Gerenciamento de funções de segurança.

Coletor Spark: gravando no Azure Data Explorer

  1. Configurar parâmetros do coletor:

    val KustoSparkTestAppId = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppId")
    val KustoSparkTestAppKey = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppKey")
    
    val appId = KustoSparkTestAppId
    val appKey = KustoSparkTestAppKey
    val authorityId = "72f988bf-86f1-41af-91ab-2d7cd011db47" // Optional - defaults to microsoft.com
    val cluster = "Sparktest.eastus2"
    val database = "TestDb"
    val table = "StringAndIntTable"
    
  2. Grave o DataFrame do Spark no cluster do Azure Data Explorer como lote:

    import com.microsoft.kusto.spark.datasink.KustoSinkOptions
    import org.apache.spark.sql.{SaveMode, SparkSession}
    
    df.write
      .format("com.microsoft.kusto.spark.datasource")
      .option(KustoSinkOptions.KUSTO_CLUSTER, cluster)
      .option(KustoSinkOptions.KUSTO_DATABASE, database)
      .option(KustoSinkOptions.KUSTO_TABLE, "Demo3_spark")
      .option(KustoSinkOptions.KUSTO_AAD_APP_ID, appId)
      .option(KustoSinkOptions.KUSTO_AAD_APP_SECRET, appKey)
      .option(KustoSinkOptions.KUSTO_AAD_AUTHORITY_ID, authorityId)
      .option(KustoSinkOptions.KUSTO_TABLE_CREATE_OPTIONS, "CreateIfNotExist")
      .mode(SaveMode.Append)
      .save()  
    

    Ou use a sintaxe simplificada:

    import com.microsoft.kusto.spark.datasink.SparkIngestionProperties
    import com.microsoft.kusto.spark.sql.extension.SparkExtension._
    
    val sparkIngestionProperties = Some(new SparkIngestionProperties()) // Optional, use None if not needed
    df.write.kusto(cluster, database, table, conf, sparkIngestionProperties)
    
  3. Grave os dados de streaming:

    import org.apache.spark.sql.streaming.Trigger
    import java.util.concurrent.TimeUnit
    import java.util.concurrent.TimeUnit
    import org.apache.spark.sql.streaming.Trigger
    
    // Set up a checkpoint and disable codeGen. 
    spark.conf.set("spark.sql.streaming.checkpointLocation", "/FileStore/temp/checkpoint")
    
    // Write to a Kusto table from a streaming source
    val kustoQ = df
      .writeStream
      .format("com.microsoft.kusto.spark.datasink.KustoSinkProvider")
      .options(conf) 
      .trigger(Trigger.ProcessingTime(TimeUnit.SECONDS.toMillis(10))) // Sync this with the ingestionBatching policy of the database
      .start()
    

Origem do Spark: lendo do Azure Data Explorer

  1. Ao ler pequenas quantidades de dados, defina a consulta de dados:

    import com.microsoft.kusto.spark.datasource.KustoSourceOptions
    import org.apache.spark.SparkConf
    import org.apache.spark.sql._
    import com.microsoft.azure.kusto.data.ClientRequestProperties
    
    val query = s"$table | where (ColB % 1000 == 0) | distinct ColA"
    val conf: Map[String, String] = Map(
          KustoSourceOptions.KUSTO_AAD_APP_ID -> appId,
          KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey
        )
    
    val df = spark.read.format("com.microsoft.kusto.spark.datasource").
      options(conf).
      option(KustoSourceOptions.KUSTO_QUERY, query).
      option(KustoSourceOptions.KUSTO_DATABASE, database).
      option(KustoSourceOptions.KUSTO_CLUSTER, cluster).
      load()
    
    // Simplified syntax flavor
    import com.microsoft.kusto.spark.sql.extension.SparkExtension._
    
    val cpr: Option[ClientRequestProperties] = None // Optional
    val df2 = spark.read.kusto(cluster, database, query, conf, cpr)
    display(df2)
    
  2. Opcional: se você fornecer o armazenamento de blob transitório (e não o Azure Data Explorer), os blobs serão criados sob a responsabilidade do chamador. Isso inclui o provisionamento do armazenamento, a rotação de chaves de acesso e a exclusão de artefatos transitórios. O módulo KustoBlobStorageUtils contém funções auxiliares para excluir blobs com base em coordenadas de conta e contêiner e credenciais de conta, ou uma URL SAS completa com permissões de gravação, leitura e lista. Quando o RDD correspondente não é mais necessário, cada transação armazena artefatos de blob transitórios em um diretório separado. Esse diretório é capturado como parte dos logs de informações da transação de leitura relatados no nó do driver Spark.

    // Use either container/account-key/account name, or container SaS
    val container = dbutils.secrets.get(scope = "KustoDemos", key = "blobContainer")
    val storageAccountKey = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountKey")
    val storageAccountName = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountName")
    // val storageSas = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageSasUrl")
    

    No exemplo acima, o Key Vault não é acessado usando a interface do conector; um método mais simples de usar os segredos do Databricks é usado.

  3. Leia do Azure Data Explorer.

    • Se você fornecer o armazenamento de blob transitório, leia do Azure Data Explorer da seguinte maneira:

       val conf3 = Map(
            KustoSourceOptions.KUSTO_AAD_APP_ID -> appId,
            KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey
            KustoSourceOptions.KUSTO_BLOB_STORAGE_SAS_URL -> storageSas)
      val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3)
      
      val dfFiltered = df2
        .where(df2.col("ColA").startsWith("row-2"))
        .filter("ColB > 12")
        .filter("ColB <= 21")
        .select("ColA")
      
      display(dfFiltered)
      
    • Se o Azure Data Explorer fornecer o armazenamento de blob transitório, leia do Azure Data Explorer da seguinte maneira:

      val conf3 = Map(
        KustoSourceOptions.KUSTO_AAD_CLIENT_ID -> appId,
        KustoSourceOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey)
      val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3)
      
      val dfFiltered = df2
        .where(df2.col("ColA").startsWith("row-2"))
        .filter("ColB > 12")
        .filter("ColB <= 21")
        .select("ColA")
      
      display(dfFiltered)