Azure Data Explorer-Connector für Apache Spark

Wichtig

Dieser Connector kann in Echtzeitanalyse in Microsoft Fabric verwendet werden. Verwenden Sie die Anweisungen in diesem Artikel mit den folgenden Ausnahmen:

Apache Spark ist eine vereinheitlichte Engine zur Verarbeitung von umfangreichen Daten. Azure Data Explorer ist ein schneller, vollständig verwalteter Datenanalysedienst für Echtzeitanalysen großer Mengen an Daten.

Der Azure Data Explorer-Connector für Spark ist ein Open Source-Projekt, das auf jedem Spark-Cluster ausgeführt werden kann. Er implementiert Datenquellen und Datensenken zum Verschieben von Daten zwischen Azure Data Explorer und Spark-Clustern. Mit Azure Data Explorer und Apache Spark können Sie schnelle und skalierbare Anwendungen für datengesteuerte Szenarien erstellen. Beispiele dafür sind maschinelles Lernen (Machine Learning, ML), Extrahieren, Transformieren und Laden (Extract-Transform-Load, ETL) und Protokollanalysen (Log Analytics). Durch den Connector wird Azure Data Explorer ein gültiger Datenspeicher für Spark-Standardvorgänge für Quellen und Senken wie„write“, „read“ und „writeStream“.

Sie können über die Erfassung in die Warteschlange oder die Streamingerfassung in Azure Data Explorer schreiben. Das Lesen aus Azure Data Explorer unterstützt das Löschen von Spalten und die Prädikatweitergabe. Dabei werden die Daten in Azure Data Explorer gefiltert, wodurch die Menge der übertragenen Daten verringert wird.

Hinweis

Informationen zum Arbeiten mit dem Synapse Spark-Connector für Azure Data Explorer finden Sie unter Verbinden mit Azure Data Explorer mithilfe von Apache Spark für Azure Synapse Analytics.

In diesem Thema wird beschrieben, wie Sie den Azure Data Explorer-Connector für Spark installieren und konfigurieren und Daten zwischen Azure Data Explorer und Apache Spark-Clustern verschieben.

Hinweis

Obwohl sich einige der unten stehenden Beispiele auf einen Azure Databricks Spark-Cluster beziehen, akzeptiert Azure Data Explorer-Connector für Spark keine direkten Abhängigkeiten von Databricks oder einer anderen Spark-Distribution.

Voraussetzungen

Tipp

Spark 2.3.x-Versionen werden ebenfalls unterstützt, erfordern aber möglicherweise Änderungen in den pom.xml-Abhängigkeiten.

Erstellen des Spark-Connectors

Ab Version 2.3.0 werden neue Artefakt-IDs eingeführt, die „spark-kusto-connector“ ersetzen: kusto-spark_3.0_2.12 für Spark 3.x und Scala 2.12 sowie kusto-spark_2.4_2.11 für Spark 2.4.x und Scala 2.11.

Hinweis

Versionen vor 2.5.1 funktionieren nicht mehr für die Erfassung in einer vorhandenen Tabelle. Führen Sie die Aktualisierung auf eine höhere Version durch. Dieser Schritt ist optional. Wenn Sie vorgefertigte Bibliotheken (etwa Maven) verwenden, finden Sie weitere Informationen unter Einrichtung des Spark-Clusters.

Buildvoraussetzungen

  1. Wenn Sie keine vorgefertigten Bibliotheken verwenden, müssen Sie die unter Abhängigkeiten aufgeführten Bibliotheken installieren, einschließlich der folgenden Kusto Java SDK-Bibliotheken. Die richtige Version für die Installation finden Sie in der POM-Datei des entsprechenden Releases:

  2. Ziehe Sie diese Quelle zum Erstellen des Spark-Connectors zurate.

  3. Wenn Sie Scala-/Java-Anwendungen mit Maven-Projektdefinitionen verwenden, verknüpfen Sie Ihre Anwendung mit dem folgenden Artefakt (die neueste Version kann abweichen):

       <dependency>
         <groupId>com.microsoft.azure</groupId>
         <artifactId>kusto-spark_3.0_2.12</artifactId>
         <version>2.5.1</version>
       </dependency>
    

Erstellen von Befehlen

Führen Sie den folgenden Befehl aus, um die JAR-Datei zu erstellen und alle Tests auszuführen:

mvn clean package

Um die JAR-Datei zu erstellen, führen Sie alle Tests aus, und installieren Sie die JAR-Datei in Ihrem lokalen Maven-Repository:

mvn clean install

Weitere Informationen finden Sie unter Connectorverwendung.

Einrichtung des Spark-Clusters

Hinweis

Es wird empfohlen, die aktuelle Version des Azure Data Explorer-Connectors für Spark zu verwenden, wenn Sie die folgenden Schritte ausführen.

  1. Konfigurieren Sie die folgenden Spark-Clustereinstellungen basierend auf dem Azure Databricks-Cluster unter Verwendung von Spark 2.4.4 und Scala 2.11 oder Spark 3.0.1 und Scala 2.12:

    Databricks-Clustereinstellungen

  2. Installieren Sie die neueste Spark-Kusto-Connectorbibliothek von Maven:

    Importieren von BibliothekenAuswählen des Spark-Kusto-Connectors

  3. Überprüfen Sie, ob alle erforderlichen Bibliotheken installiert sind:

    Überprüfen der installierten Bibliotheken

  4. Überprüfen Sie bei der Installation mithilfe einer JAR-Datei, ob zusätzliche Abhängigkeiten installiert wurden:

    Fügen Sie Abhängigkeiten hinzu.

Authentifizierung

Azure Data Explorer Spark-Connector ermöglicht Ihnen die Authentifizierung mit Microsoft Entra-ID mithilfe einer der folgenden Methoden:

Microsoft Entra Anwendungsauthentifizierung

Microsoft Entra Anwendungsauthentifizierung ist die einfachste und am häufigsten verwendete Authentifizierungsmethode und wird für den Azure Data Explorer Spark-Connector empfohlen.

Eigenschaften Optionszeichenfolge BESCHREIBUNG
KUSTO_AAD_APP_ID kustoAadAppId Microsoft Entra Anwendungsbezeichner (Client).
KUSTO_AAD_AUTHORITY_ID kustoAadAuthorityID Microsoft Entra Authentifizierungsberechtigung. Microsoft Entra Verzeichnis-ID (Mandant). Optional: Standardmäßig microsoft.com. Weitere Informationen finden Sie unter Microsoft Entra Autorität.
KUSTO_AAD_APP_SECRET kustoAadAppSecret Microsoft Entra Anwendungsschlüssel für den Client.

Hinweis

Ältere API-Versionen (vor 2.0.0) haben den folgenden Namen: „kustoAADClientID“, „kustoClientAADClientPassword“, „kustoAADAuthorityID“.

Azure Data Explorer-Berechtigungen

Erteilen Sie die folgenden Berechtigungen für einen Azure Data Explorer-Cluster:

  • Zum Lesen (Datenquelle) muss die Microsoft Entra Identität über Viewerberechtigungen für die Zieldatenbank oder Administratorrechte für die Zieltabelle verfügen.
  • Zum Schreiben (Datensenke) muss die Microsoft Entra Identität über Ingestorberechtigungen für die Zieldatenbank verfügen. Außerdem benötigt sie Benutzerberechtigungen für die Zieldatenbank, um neue Tabellen zu erstellen. Wenn die Zieltabelle bereits vorhanden ist, müssen Sie Administratorberechtigungen für die Zieltabelle konfigurieren.

Weitere Informationen zu Azure Data Explorer Prinzipalrollen finden Sie unter Rollenbasierte Zugriffssteuerung. Informationen zum Verwalten von Sicherheitsrollen finden Sie unter Sicherheitsrollenverwaltung.

Spark-Senke: Schreiben in Azure Data Explorer

  1. Einrichten von Senkenparametern:

    val KustoSparkTestAppId = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppId")
    val KustoSparkTestAppKey = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppKey")
    
    val appId = KustoSparkTestAppId
    val appKey = KustoSparkTestAppKey
    val authorityId = "72f988bf-86f1-41af-91ab-2d7cd011db47" // Optional - defaults to microsoft.com
    val cluster = "Sparktest.eastus2"
    val database = "TestDb"
    val table = "StringAndIntTable"
    
  2. Schreiben von Spark DataFrame in Azure Data Explorer-Cluster als Batch:

    import com.microsoft.kusto.spark.datasink.KustoSinkOptions
    import org.apache.spark.sql.{SaveMode, SparkSession}
    
    df.write
      .format("com.microsoft.kusto.spark.datasource")
      .option(KustoSinkOptions.KUSTO_CLUSTER, cluster)
      .option(KustoSinkOptions.KUSTO_DATABASE, database)
      .option(KustoSinkOptions.KUSTO_TABLE, "Demo3_spark")
      .option(KustoSinkOptions.KUSTO_AAD_APP_ID, appId)
      .option(KustoSinkOptions.KUSTO_AAD_APP_SECRET, appKey)
      .option(KustoSinkOptions.KUSTO_AAD_AUTHORITY_ID, authorityId)
      .option(KustoSinkOptions.KUSTO_TABLE_CREATE_OPTIONS, "CreateIfNotExist")
      .mode(SaveMode.Append)
      .save()  
    

    Oder verwenden Sie die vereinfachte Syntax:

    import com.microsoft.kusto.spark.datasink.SparkIngestionProperties
    import com.microsoft.kusto.spark.sql.extension.SparkExtension._
    
    val sparkIngestionProperties = Some(new SparkIngestionProperties()) // Optional, use None if not needed
    df.write.kusto(cluster, database, table, conf, sparkIngestionProperties)
    
  3. Schreiben von Streamingdaten:

    import org.apache.spark.sql.streaming.Trigger
    import java.util.concurrent.TimeUnit
    import java.util.concurrent.TimeUnit
    import org.apache.spark.sql.streaming.Trigger
    
    // Set up a checkpoint and disable codeGen. 
    spark.conf.set("spark.sql.streaming.checkpointLocation", "/FileStore/temp/checkpoint")
    
    // Write to a Kusto table from a streaming source
    val kustoQ = df
      .writeStream
      .format("com.microsoft.kusto.spark.datasink.KustoSinkProvider")
      .options(conf) 
      .trigger(Trigger.ProcessingTime(TimeUnit.SECONDS.toMillis(10))) // Sync this with the ingestionBatching policy of the database
      .start()
    

Spark-Senke: Lesen aus Azure Data Explorer

  1. Wenn kleine Datenmengen gelesen werden, definieren Sie die Datenabfrage:

    import com.microsoft.kusto.spark.datasource.KustoSourceOptions
    import org.apache.spark.SparkConf
    import org.apache.spark.sql._
    import com.microsoft.azure.kusto.data.ClientRequestProperties
    
    val query = s"$table | where (ColB % 1000 == 0) | distinct ColA"
    val conf: Map[String, String] = Map(
          KustoSourceOptions.KUSTO_AAD_APP_ID -> appId,
          KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey
        )
    
    val df = spark.read.format("com.microsoft.kusto.spark.datasource").
      options(conf).
      option(KustoSourceOptions.KUSTO_QUERY, query).
      option(KustoSourceOptions.KUSTO_DATABASE, database).
      option(KustoSourceOptions.KUSTO_CLUSTER, cluster).
      load()
    
    // Simplified syntax flavor
    import com.microsoft.kusto.spark.sql.extension.SparkExtension._
    
    val cpr: Option[ClientRequestProperties] = None // Optional
    val df2 = spark.read.kusto(cluster, database, query, conf, cpr)
    display(df2)
    
  2. Optional: Wenn Sie den temporären Blobspeicher (und nicht Azure Data Explorer) bereitstellen, werden die Blobs in der Verantwortung des Aufrufers erstellt. Dies umfasst das Bereitstellen des Speichers, das Rotieren von Zugriffsschlüsseln und das Löschen temporärer Artefakte. Das KustoBlobStorageUtils-Modul enthält Hilfsfunktionen zum Löschen von Blobs, die entweder auf Konto- und Containerkoordinaten und Kontoanmeldeinformationen oder einer vollständigen SAS-URL mit Schreib-, Lese- und Listenberechtigungen basieren. Wenn das entsprechende RDD nicht mehr benötigt wird, speichert jede Transaktion temporäre Blobartefakte in einem separaten Verzeichnis. Dieses Verzeichnis wird als Teil der Lesetransaktionsinformationsprotokolle erfasst, die auf dem Spark-Treiberknoten gemeldet werden.

    // Use either container/account-key/account name, or container SaS
    val container = dbutils.secrets.get(scope = "KustoDemos", key = "blobContainer")
    val storageAccountKey = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountKey")
    val storageAccountName = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountName")
    // val storageSas = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageSasUrl")
    

    Im obigen Beispiel erfolgt der Zugriff auf den Schlüsseltresor nicht über die Connectorschnittstelle. Es wird eine einfachere Methode der Verwendung der Databricks-Geheimnisse verwendet.

  3. Lesen aus Azure Data Explorer.

    • Wenn Sie den temporären Blobspeicher bereitstellen, lesen Sie die Daten aus Azure Data Explorer wie folgt:

       val conf3 = Map(
            KustoSourceOptions.KUSTO_AAD_APP_ID -> appId,
            KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey
            KustoSourceOptions.KUSTO_BLOB_STORAGE_SAS_URL -> storageSas)
      val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3)
      
      val dfFiltered = df2
        .where(df2.col("ColA").startsWith("row-2"))
        .filter("ColB > 12")
        .filter("ColB <= 21")
        .select("ColA")
      
      display(dfFiltered)
      
    • Wenn Azure Data Explorer den temporären Blobspeicher bereitstellt, lesen Sie die Daten aus Azure Data Explorer wie folgt:

      val conf3 = Map(
        KustoSourceOptions.KUSTO_AAD_CLIENT_ID -> appId,
        KustoSourceOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey)
      val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3)
      
      val dfFiltered = df2
        .where(df2.col("ColA").startsWith("row-2"))
        .filter("ColB > 12")
        .filter("ColB <= 21")
        .select("ColA")
      
      display(dfFiltered)