Share via


Connecteur Azure Data Explorer pour Apache Spark

Important

Ce connecteur peut être utilisé dans l’analyse en temps réel dans Microsoft Fabric. Utilisez les instructions de cet article avec les exceptions suivantes :

Apache Spark est un moteur d’analytique unifié pour le traitement des données à grande échelle. Azure Data Explorer est un service d’analytique données rapide et entièrement managé dédié à l’analyse en temps réel de volumes importants de données de streaming.

Le connecteur Azure Data Explorer pour Spark est un projet open source qui peut s’exécuter sur n’importe quel cluster Spark. Il implémente la source de données et le récepteur de données pour déplacer les données entre les clusters Azure Data Explorer et Spark. À l’aide d’Azure Data Explorer et d’Apache Spark, vous pouvez rapidement créer des applications scalables ciblant des scénarios basés sur les données. Par exemple, les scénarios de machine learning (ML), les scénarios ETL et les scénarios Log Analytics. Avec le connecteur, Azure Data Explorer devient un magasin de données valide pour les opérations de source et de réception Spark standard, telles que write, read et writeStream.

Vous pouvez écrire dans Azure Data Explorer via l’ingestion en file d’attente ou l’ingestion en streaming. La fonctionnalité de lecture dans Azure Data Explorer prend en charge le nettoyage des colonnes et le pushdown des prédicats, ce qui filtre les données d’Azure Data Explorer et réduit le volume de données transférées.

Notes

Pour plus d’informations sur l’utilisation du connecteur Synapse Spark pour Azure Data Explorer, consultez Connexion à Azure Data Explorer à l’aide d’Apache Spark pour Azure Synapse Analytics.

Cette rubrique explique comment installer et configurer le connecteur Azure Data Explorer pour Spark, et comment déplacer des données entre Azure Data Explorer et les clusters Apache Spark.

Notes

Bien que certains des exemples ci-dessous fassent référence à un cluster Spark Azure Databricks, le connecteur Spark Azure Data Explorer ne dépend pas directement de ce cluster, ni d’aucune autre distribution Spark.

Prérequis

Conseil

Les versions 2.3.x de Spark sont également prises en charge, mais il peut s’avérer nécessaire de modifier certaines dépendances du fichier pom.xml.

Comment créer un connecteur Spark

À partir de la version 2.3.0, nous introduisons de nouveaux ID d’artefact qui remplacent spark-kusto-connector : kusto-spark_3.0_2.12 ciblant Spark 3.x et Scala 2.12, et kusto-spark_2.4_2.11 ciblant Spark 2.4.x et Scala 2.11.

Notes

Les versions antérieures à la version 2.5.1 ne fonctionnent plus pour l’ingestion dans une table existante, effectuez une mise à jour vers une version ultérieure. Cette étape est facultative. Si vous utilisez des bibliothèques prédéfinies, par exemple Maven, consultez Configuration des clusters Spark.

Configuration requise

  1. Si vous n’utilisez pas de bibliothèques prédéfinies, vous devez installer les bibliothèques listées dans dépendencies, y compris les bibliothèques Kusto Java SDK suivantes. Pour trouver la bonne version à installer, regardez dans le fichier pom de la version appropriée :

  2. Reportez-vous à cette source pour la création du connecteur Spark.

  3. Pour les applications Scala/Java utilisant des définitions de projet Maven, liez votre application à l’artefact suivant (la dernière version peut être différente) :

       <dependency>
         <groupId>com.microsoft.azure</groupId>
         <artifactId>kusto-spark_3.0_2.12</artifactId>
         <version>2.5.1</version>
       </dependency>
    

Commandes de génération

Pour générer un fichier jar et exécuter tous les tests :

mvn clean package

Pour générer un fichier jar, exécutez tous les tests et installez le fichier jar dans votre référentiel Maven local :

mvn clean install

Pour en savoir plus, consultez la section relative à l’utilisation des connecteurs.

Configuration du cluster Spark

Notes

Il est recommandé d’utiliser la dernière version du connecteur Spark Azure Data Explorer lorsque vous effectuez les étapes suivantes :

  1. Configurez les paramètres suivants du cluster Spark en fonction du cluster Azure Databricks, en utilisant Spark 2.4.4 et Scala 2.11 ou Spark 3.0.1 et Scala 2.12 :

    Paramètres de cluster Databricks.

  2. Installez la dernière bibliothèque spark-kusto-connector à partir de Maven :

    Importer des bibliothèques.Sélectionner Spark-Kusto-Connector.

  3. Vérifiez que toutes les bibliothèques requises sont installées :

    Vérifier l’installation des bibliothèques.

  4. Si vous souhaitez effectuer une installation à l’aide d’un fichier JAR, vérifiez que des dépendances supplémentaires ont été installées :

    Ajoutez des dépendances.

Authentification

Le connecteur Spark Azure Data Explorer vous permet de vous authentifier avec Microsoft Entra ID à l’aide de l’une des méthodes suivantes :

Authentification de l’application Microsoft Entra

Microsoft Entra’authentification d’application est la méthode d’authentification la plus simple et la plus courante et est recommandée pour le connecteur Azure Data Explorer Spark.

Propriétés Chaîne d’option Description
KUSTO_AAD_APP_ID kustoAadAppId Microsoft Entra’identificateur d’application (client).
KUSTO_AAD_AUTHORITY_ID kustoAadAuthorityID Microsoft Entra autorité d’authentification. MICROSOFT ENTRA’ID d’annuaire (locataire). Facultatif : par défaut, microsoft.com. Pour plus d’informations, consultez Microsoft Entra autorité.
KUSTO_AAD_APP_SECRET kustoAadAppSecret Microsoft Entra clé d’application pour le client.

Notes

Les anciennes versions d’API (inférieures à 2.0.0) ont le nommage suivant : "kustoAADClientID", "kustoClientAADClientPassword", "kustoAADAuthorityID"

Privilèges Azure Data Explorer

Accordez les privilèges suivants sur un cluster Azure Data Explorer :

  • Pour la lecture (source de données), l’identité Microsoft Entra doit avoir des privilèges de visionneuse sur la base de données cible ou des privilèges d’administrateur sur la table cible.
  • Pour l’écriture (récepteur de données), l’identité Microsoft Entra doit avoir des privilèges d’ingestion sur la base de données cible. Elle doit également bénéficier de privilèges d’utilisateur sur la base de données cible pour pouvoir créer des tables. Si la table cible existe déjà, vous devez configurer des privilèges d’administrateur pour celle-ci.

Pour plus d’informations sur azure Data Explorer rôles principaux, consultez Contrôle d’accès en fonction du rôle. Pour savoir comment gérer les rôles de sécurité, consultez la section relative à la gestion des rôles de sécurité.

Récepteur Spark : écriture de données dans Azure Data Explorer

  1. Configurez les paramètres du récepteur :

    val KustoSparkTestAppId = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppId")
    val KustoSparkTestAppKey = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppKey")
    
    val appId = KustoSparkTestAppId
    val appKey = KustoSparkTestAppKey
    val authorityId = "72f988bf-86f1-41af-91ab-2d7cd011db47" // Optional - defaults to microsoft.com
    val cluster = "Sparktest.eastus2"
    val database = "TestDb"
    val table = "StringAndIntTable"
    
  2. Écrivez la trame de données Spark dans le cluster Azure Data Explorer en tant que lot :

    import com.microsoft.kusto.spark.datasink.KustoSinkOptions
    import org.apache.spark.sql.{SaveMode, SparkSession}
    
    df.write
      .format("com.microsoft.kusto.spark.datasource")
      .option(KustoSinkOptions.KUSTO_CLUSTER, cluster)
      .option(KustoSinkOptions.KUSTO_DATABASE, database)
      .option(KustoSinkOptions.KUSTO_TABLE, "Demo3_spark")
      .option(KustoSinkOptions.KUSTO_AAD_APP_ID, appId)
      .option(KustoSinkOptions.KUSTO_AAD_APP_SECRET, appKey)
      .option(KustoSinkOptions.KUSTO_AAD_AUTHORITY_ID, authorityId)
      .option(KustoSinkOptions.KUSTO_TABLE_CREATE_OPTIONS, "CreateIfNotExist")
      .mode(SaveMode.Append)
      .save()  
    

    Ou utilisez la syntaxe simplifiée :

    import com.microsoft.kusto.spark.datasink.SparkIngestionProperties
    import com.microsoft.kusto.spark.sql.extension.SparkExtension._
    
    val sparkIngestionProperties = Some(new SparkIngestionProperties()) // Optional, use None if not needed
    df.write.kusto(cluster, database, table, conf, sparkIngestionProperties)
    
  3. Écrivez les données de diffusion en continu :

    import org.apache.spark.sql.streaming.Trigger
    import java.util.concurrent.TimeUnit
    import java.util.concurrent.TimeUnit
    import org.apache.spark.sql.streaming.Trigger
    
    // Set up a checkpoint and disable codeGen. 
    spark.conf.set("spark.sql.streaming.checkpointLocation", "/FileStore/temp/checkpoint")
    
    // Write to a Kusto table from a streaming source
    val kustoQ = df
      .writeStream
      .format("com.microsoft.kusto.spark.datasink.KustoSinkProvider")
      .options(conf) 
      .trigger(Trigger.ProcessingTime(TimeUnit.SECONDS.toMillis(10))) // Sync this with the ingestionBatching policy of the database
      .start()
    

Source Spark : lecture des données depuis Azure Data Explorer

  1. Lors de la lecture de petites quantités de données, définissez la requête de données :

    import com.microsoft.kusto.spark.datasource.KustoSourceOptions
    import org.apache.spark.SparkConf
    import org.apache.spark.sql._
    import com.microsoft.azure.kusto.data.ClientRequestProperties
    
    val query = s"$table | where (ColB % 1000 == 0) | distinct ColA"
    val conf: Map[String, String] = Map(
          KustoSourceOptions.KUSTO_AAD_APP_ID -> appId,
          KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey
        )
    
    val df = spark.read.format("com.microsoft.kusto.spark.datasource").
      options(conf).
      option(KustoSourceOptions.KUSTO_QUERY, query).
      option(KustoSourceOptions.KUSTO_DATABASE, database).
      option(KustoSourceOptions.KUSTO_CLUSTER, cluster).
      load()
    
    // Simplified syntax flavor
    import com.microsoft.kusto.spark.sql.extension.SparkExtension._
    
    val cpr: Option[ClientRequestProperties] = None // Optional
    val df2 = spark.read.kusto(cluster, database, query, conf, cpr)
    display(df2)
    
  2. Facultatif : Si c’est vous qui fournissez le stockage des objets blob temporaires (et non Azure Data Explorer), les objets blob seront créés sous la responsabilité de l’appelant. Cela comprend le provisionnement du stockage, la rotation des clés d’accès et la suppression des artefacts temporaires. Le module KustoBlobStorageUtils contient des fonctions d’assistance permettant de supprimer des objets blob en fonction des coordonnées du compte et du conteneur, et des informations d’identification du compte, ou en fonction d’une URL SAS complète disposant d’autorisations d’écriture, de lecture et de liste. Lorsque le jeu de données RDD correspondant n’est plus nécessaire, chaque transaction stocke les artefacts d’objets blob temporaires dans un répertoire distinct. Ce répertoire est capturé dans les journaux des transactions de lecture signalés sur le nœud du pilote Spark.

    // Use either container/account-key/account name, or container SaS
    val container = dbutils.secrets.get(scope = "KustoDemos", key = "blobContainer")
    val storageAccountKey = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountKey")
    val storageAccountName = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountName")
    // val storageSas = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageSasUrl")
    

    Dans l’exemple ci-dessus, le coffre de clés n’est pas accessible à l’aide de l’interface du connecteur. Pour l’utilisation des secrets Databricks, une méthode plus simple est utilisée.

  3. Lisez les données dans Azure Data Explorer.

    • Si c’est vous qui fournissez le stockage des objets blob temporaires, lisez les données à partir d’Azure Data Explorer de la façon suivante :

       val conf3 = Map(
            KustoSourceOptions.KUSTO_AAD_APP_ID -> appId,
            KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey
            KustoSourceOptions.KUSTO_BLOB_STORAGE_SAS_URL -> storageSas)
      val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3)
      
      val dfFiltered = df2
        .where(df2.col("ColA").startsWith("row-2"))
        .filter("ColB > 12")
        .filter("ColB <= 21")
        .select("ColA")
      
      display(dfFiltered)
      
    • Si c’est Azure Data Explorer qui fournit le stockage des objets blob temporaires, lisez les données à partir d’Azure Data Explorer de la façon suivante :

      val conf3 = Map(
        KustoSourceOptions.KUSTO_AAD_CLIENT_ID -> appId,
        KustoSourceOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey)
      val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3)
      
      val dfFiltered = df2
        .where(df2.col("ColA").startsWith("row-2"))
        .filter("ColB > 12")
        .filter("ColB <= 21")
        .select("ColA")
      
      display(dfFiltered)