Connecteur Azure Data Explorer pour Apache Spark
Important
Ce connecteur peut être utilisé dans l’analyse en temps réel dans Microsoft Fabric. Utilisez les instructions de cet article avec les exceptions suivantes :
- Si nécessaire, créez des bases de données en suivant les instructions fournies dans Créer une base de données KQL.
- Si nécessaire, créez des tables en suivant les instructions fournies dans Créer une table vide.
- Obtenez des URI de requête ou d’ingestion à l’aide des instructions fournies dans Copier l’URI.
- Exécutez des requêtes dans un ensemble de requêtes KQL.
Apache Spark est un moteur d’analytique unifié pour le traitement des données à grande échelle. Azure Data Explorer est un service d’analytique données rapide et entièrement managé dédié à l’analyse en temps réel de volumes importants de données de streaming.
Le connecteur Azure Data Explorer pour Spark est un projet open source qui peut s’exécuter sur n’importe quel cluster Spark. Il implémente la source de données et le récepteur de données pour déplacer les données entre les clusters Azure Data Explorer et Spark. À l’aide d’Azure Data Explorer et d’Apache Spark, vous pouvez rapidement créer des applications scalables ciblant des scénarios basés sur les données. Par exemple, les scénarios de machine learning (ML), les scénarios ETL et les scénarios Log Analytics. Avec le connecteur, Azure Data Explorer devient un magasin de données valide pour les opérations de source et de réception Spark standard, telles que write, read et writeStream.
Vous pouvez écrire dans Azure Data Explorer via l’ingestion en file d’attente ou l’ingestion en streaming. La fonctionnalité de lecture dans Azure Data Explorer prend en charge le nettoyage des colonnes et le pushdown des prédicats, ce qui filtre les données d’Azure Data Explorer et réduit le volume de données transférées.
Notes
Pour plus d’informations sur l’utilisation du connecteur Synapse Spark pour Azure Data Explorer, consultez Connexion à Azure Data Explorer à l’aide d’Apache Spark pour Azure Synapse Analytics.
Cette rubrique explique comment installer et configurer le connecteur Azure Data Explorer pour Spark, et comment déplacer des données entre Azure Data Explorer et les clusters Apache Spark.
Notes
Bien que certains des exemples ci-dessous fassent référence à un cluster Spark Azure Databricks, le connecteur Spark Azure Data Explorer ne dépend pas directement de ce cluster, ni d’aucune autre distribution Spark.
Prérequis
- Un abonnement Azure. Créez un compte Azure gratuit.
- Un cluster et une base de données Azure Data Explorer. Créez un cluster et une base de données.
- Un cluster Spark
- Installez la bibliothèque du connecteur Azure Data Explorer :
- Bibliothèques prédéfinies pour Spark 2.4+Scala 2.11 ou Spark 3+scala 2.12
- Dépôt Maven
- Installation de Maven 3.x
Conseil
Les versions 2.3.x de Spark sont également prises en charge, mais il peut s’avérer nécessaire de modifier certaines dépendances du fichier pom.xml.
Comment créer un connecteur Spark
À partir de la version 2.3.0, nous introduisons de nouveaux ID d’artefact qui remplacent spark-kusto-connector : kusto-spark_3.0_2.12 ciblant Spark 3.x et Scala 2.12, et kusto-spark_2.4_2.11 ciblant Spark 2.4.x et Scala 2.11.
Notes
Les versions antérieures à la version 2.5.1 ne fonctionnent plus pour l’ingestion dans une table existante, effectuez une mise à jour vers une version ultérieure. Cette étape est facultative. Si vous utilisez des bibliothèques prédéfinies, par exemple Maven, consultez Configuration des clusters Spark.
Configuration requise
Si vous n’utilisez pas de bibliothèques prédéfinies, vous devez installer les bibliothèques listées dans dépendencies, y compris les bibliothèques Kusto Java SDK suivantes. Pour trouver la bonne version à installer, regardez dans le fichier pom de la version appropriée :
Reportez-vous à cette source pour la création du connecteur Spark.
Pour les applications Scala/Java utilisant des définitions de projet Maven, liez votre application à l’artefact suivant (la dernière version peut être différente) :
<dependency> <groupId>com.microsoft.azure</groupId> <artifactId>kusto-spark_3.0_2.12</artifactId> <version>2.5.1</version> </dependency>
Commandes de génération
Pour générer un fichier jar et exécuter tous les tests :
mvn clean package
Pour générer un fichier jar, exécutez tous les tests et installez le fichier jar dans votre référentiel Maven local :
mvn clean install
Pour en savoir plus, consultez la section relative à l’utilisation des connecteurs.
Configuration du cluster Spark
Notes
Il est recommandé d’utiliser la dernière version du connecteur Spark Azure Data Explorer lorsque vous effectuez les étapes suivantes :
Configurez les paramètres suivants du cluster Spark en fonction du cluster Azure Databricks, en utilisant Spark 2.4.4 et Scala 2.11 ou Spark 3.0.1 et Scala 2.12 :
Installez la dernière bibliothèque spark-kusto-connector à partir de Maven :
Vérifiez que toutes les bibliothèques requises sont installées :
Si vous souhaitez effectuer une installation à l’aide d’un fichier JAR, vérifiez que des dépendances supplémentaires ont été installées :
Authentification
Le connecteur Spark Azure Data Explorer vous permet de vous authentifier avec Microsoft Entra ID à l’aide de l’une des méthodes suivantes :
- Une application Microsoft Entra
- Jeton d’accès Microsoft Entra
- Avec l’authentification de l’appareil (pour les scénarios autres que les scénarios de production)
- Un coffre de clés Azure Key Vault Pour accéder à la ressource Key Vault, installez le package azure-keyvault et entrez les informations d’identification de l’application.
Authentification de l’application Microsoft Entra
Microsoft Entra’authentification d’application est la méthode d’authentification la plus simple et la plus courante et est recommandée pour le connecteur Azure Data Explorer Spark.
Propriétés | Chaîne d’option | Description |
---|---|---|
KUSTO_AAD_APP_ID | kustoAadAppId | Microsoft Entra’identificateur d’application (client). |
KUSTO_AAD_AUTHORITY_ID | kustoAadAuthorityID | Microsoft Entra autorité d’authentification. MICROSOFT ENTRA’ID d’annuaire (locataire). Facultatif : par défaut, microsoft.com. Pour plus d’informations, consultez Microsoft Entra autorité. |
KUSTO_AAD_APP_SECRET | kustoAadAppSecret | Microsoft Entra clé d’application pour le client. |
Notes
Les anciennes versions d’API (inférieures à 2.0.0) ont le nommage suivant : "kustoAADClientID", "kustoClientAADClientPassword", "kustoAADAuthorityID"
Privilèges Azure Data Explorer
Accordez les privilèges suivants sur un cluster Azure Data Explorer :
- Pour la lecture (source de données), l’identité Microsoft Entra doit avoir des privilèges de visionneuse sur la base de données cible ou des privilèges d’administrateur sur la table cible.
- Pour l’écriture (récepteur de données), l’identité Microsoft Entra doit avoir des privilèges d’ingestion sur la base de données cible. Elle doit également bénéficier de privilèges d’utilisateur sur la base de données cible pour pouvoir créer des tables. Si la table cible existe déjà, vous devez configurer des privilèges d’administrateur pour celle-ci.
Pour plus d’informations sur azure Data Explorer rôles principaux, consultez Contrôle d’accès en fonction du rôle. Pour savoir comment gérer les rôles de sécurité, consultez la section relative à la gestion des rôles de sécurité.
Récepteur Spark : écriture de données dans Azure Data Explorer
Configurez les paramètres du récepteur :
val KustoSparkTestAppId = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppId") val KustoSparkTestAppKey = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppKey") val appId = KustoSparkTestAppId val appKey = KustoSparkTestAppKey val authorityId = "72f988bf-86f1-41af-91ab-2d7cd011db47" // Optional - defaults to microsoft.com val cluster = "Sparktest.eastus2" val database = "TestDb" val table = "StringAndIntTable"
Écrivez la trame de données Spark dans le cluster Azure Data Explorer en tant que lot :
import com.microsoft.kusto.spark.datasink.KustoSinkOptions import org.apache.spark.sql.{SaveMode, SparkSession} df.write .format("com.microsoft.kusto.spark.datasource") .option(KustoSinkOptions.KUSTO_CLUSTER, cluster) .option(KustoSinkOptions.KUSTO_DATABASE, database) .option(KustoSinkOptions.KUSTO_TABLE, "Demo3_spark") .option(KustoSinkOptions.KUSTO_AAD_APP_ID, appId) .option(KustoSinkOptions.KUSTO_AAD_APP_SECRET, appKey) .option(KustoSinkOptions.KUSTO_AAD_AUTHORITY_ID, authorityId) .option(KustoSinkOptions.KUSTO_TABLE_CREATE_OPTIONS, "CreateIfNotExist") .mode(SaveMode.Append) .save()
Ou utilisez la syntaxe simplifiée :
import com.microsoft.kusto.spark.datasink.SparkIngestionProperties import com.microsoft.kusto.spark.sql.extension.SparkExtension._ val sparkIngestionProperties = Some(new SparkIngestionProperties()) // Optional, use None if not needed df.write.kusto(cluster, database, table, conf, sparkIngestionProperties)
Écrivez les données de diffusion en continu :
import org.apache.spark.sql.streaming.Trigger import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit import org.apache.spark.sql.streaming.Trigger // Set up a checkpoint and disable codeGen. spark.conf.set("spark.sql.streaming.checkpointLocation", "/FileStore/temp/checkpoint") // Write to a Kusto table from a streaming source val kustoQ = df .writeStream .format("com.microsoft.kusto.spark.datasink.KustoSinkProvider") .options(conf) .trigger(Trigger.ProcessingTime(TimeUnit.SECONDS.toMillis(10))) // Sync this with the ingestionBatching policy of the database .start()
Source Spark : lecture des données depuis Azure Data Explorer
Lors de la lecture de petites quantités de données, définissez la requête de données :
import com.microsoft.kusto.spark.datasource.KustoSourceOptions import org.apache.spark.SparkConf import org.apache.spark.sql._ import com.microsoft.azure.kusto.data.ClientRequestProperties val query = s"$table | where (ColB % 1000 == 0) | distinct ColA" val conf: Map[String, String] = Map( KustoSourceOptions.KUSTO_AAD_APP_ID -> appId, KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey ) val df = spark.read.format("com.microsoft.kusto.spark.datasource"). options(conf). option(KustoSourceOptions.KUSTO_QUERY, query). option(KustoSourceOptions.KUSTO_DATABASE, database). option(KustoSourceOptions.KUSTO_CLUSTER, cluster). load() // Simplified syntax flavor import com.microsoft.kusto.spark.sql.extension.SparkExtension._ val cpr: Option[ClientRequestProperties] = None // Optional val df2 = spark.read.kusto(cluster, database, query, conf, cpr) display(df2)
Facultatif : Si c’est vous qui fournissez le stockage des objets blob temporaires (et non Azure Data Explorer), les objets blob seront créés sous la responsabilité de l’appelant. Cela comprend le provisionnement du stockage, la rotation des clés d’accès et la suppression des artefacts temporaires. Le module KustoBlobStorageUtils contient des fonctions d’assistance permettant de supprimer des objets blob en fonction des coordonnées du compte et du conteneur, et des informations d’identification du compte, ou en fonction d’une URL SAS complète disposant d’autorisations d’écriture, de lecture et de liste. Lorsque le jeu de données RDD correspondant n’est plus nécessaire, chaque transaction stocke les artefacts d’objets blob temporaires dans un répertoire distinct. Ce répertoire est capturé dans les journaux des transactions de lecture signalés sur le nœud du pilote Spark.
// Use either container/account-key/account name, or container SaS val container = dbutils.secrets.get(scope = "KustoDemos", key = "blobContainer") val storageAccountKey = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountKey") val storageAccountName = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountName") // val storageSas = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageSasUrl")
Dans l’exemple ci-dessus, le coffre de clés n’est pas accessible à l’aide de l’interface du connecteur. Pour l’utilisation des secrets Databricks, une méthode plus simple est utilisée.
Lisez les données dans Azure Data Explorer.
Si c’est vous qui fournissez le stockage des objets blob temporaires, lisez les données à partir d’Azure Data Explorer de la façon suivante :
val conf3 = Map( KustoSourceOptions.KUSTO_AAD_APP_ID -> appId, KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey KustoSourceOptions.KUSTO_BLOB_STORAGE_SAS_URL -> storageSas) val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3) val dfFiltered = df2 .where(df2.col("ColA").startsWith("row-2")) .filter("ColB > 12") .filter("ColB <= 21") .select("ColA") display(dfFiltered)
Si c’est Azure Data Explorer qui fournit le stockage des objets blob temporaires, lisez les données à partir d’Azure Data Explorer de la façon suivante :
val conf3 = Map( KustoSourceOptions.KUSTO_AAD_CLIENT_ID -> appId, KustoSourceOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey) val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3) val dfFiltered = df2 .where(df2.col("ColA").startsWith("row-2")) .filter("ColB > 12") .filter("ColB <= 21") .select("ColA") display(dfFiltered)
Contenu connexe
Commentaires
https://aka.ms/ContentUserFeedback.
Bientôt disponible : Tout au long de 2024, nous allons supprimer progressivement GitHub Issues comme mécanisme de commentaires pour le contenu et le remplacer par un nouveau système de commentaires. Pour plus d’informations, consultezEnvoyer et afficher des commentaires pour