Azure Data Explorer Connector för Apache Spark

Apache Spark är en enhetlig analysmotor för storskalig databearbetning. Azure Data Explorer är en snabb, fullständigt hanterad dataanalystjänst för realtidsanalys av stora mängder data.

Anslutningsappen Azure Data Explorer Spark är ett projekt med öppen källkod som kan köras i alla Spark-kluster. Den implementerar datakälla och data mottagare för att flytta data Azure Data Explorer och Spark-kluster. Med Azure Data Explorer och Apache Spark kan du skapa snabba och skalbara program som riktar in sig på datadrivna scenarier. Till exempel kan maskininlärning (ML), Extract-Transform-Load (ETL) och Log Analytics. Med anslutningsappen blir Azure Data Explorer ett giltigt datalager för standardåtgärder för Spark-källa och -mottagare, till exempel skrivning, läsning och writeStream.

Du kan skriva till Azure Data Explorer i batch- eller strömningsläge. Läsning från Azure Data Explorer har stöd för kolumnens rensning och predikat-pushdown, vilket filtrerar data i Azure Data Explorer, vilket minskar mängden överförda data.

Det här avsnittet beskriver hur du installerar och konfigurerar Azure Data Explorer Spark-anslutningsappen och flyttar data mellan Azure Data Explorer och Apache Spark kluster.

Anteckning

Även om vissa av exemplen nedan refererar till ett Azure Databricks Spark-kluster, tar Azure Data Explorer Spark-anslutningsappen inte direkta beroenden av Databricks eller någon annan Spark-distribution.

Förutsättningar

Tips

Spark 2.3.x-versioner stöds också, men kan kräva vissa ändringar pom.xml beroenden.

Så här skapar du Spark-anslutningsappen

Från och med version 2.3.0 introducerar vi nya artefakt-ID:n som ersätter spark-kusto-connector: kusto-spark_3.0_2.12 för Spark 3.x och Scala 2.12 och kusto-spark_2.4_2.11 med Spark 2.4.x och scala 2.11 som mål.

Anteckning

Versioner före 2.5.1 fungerar inte längre för inmatning till en befintlig tabell. Uppdatera till en senare version. Det här är valfritt. Om du använder förbyggda bibliotek, till exempel Maven, kan du se Spark-klusterkonfiguration.

Skapa förutsättningar

  1. Om du inte använder förbyggda bibliotek måste du installera biblioteken som anges i beroenden, inklusive följande Kusto Java SDK-bibliotek. Du hittar rätt version att installera genom att titta i den relevanta versionens pom:

  2. Se den här källan för att skapa Spark-anslutningsappen.

  3. För Scala-/Java-program som använder Maven-projektdefinitioner länkar du programmet till följande artefakt (den senaste versionen kan skilja sig):

       <dependency>
         <groupId>com.microsoft.azure</groupId>
         <artifactId>kusto-spark_3.0_2.12</artifactId>
         <version>2.5.1</version>
       </dependency>
    

Build-kommandon

Så här skapar du jar och kör alla tester:

mvn clean package

Skapa jar genom att köra alla tester och installera jar på din lokala Maven-lagringsplats:

mvn clean install

Mer information finns i användning av anslutningsappar.

Konfiguration av Spark-kluster

Anteckning

Vi rekommenderar att du använder den senaste versionen Azure Data Explorer Spark-anslutningsappen när du utför följande steg.

  1. Konfigurera följande Spark-klusterinställningar baserat på Azure Databricks-kluster med Spark 2.4.4 och Scala 2.11 eller Spark 3.0.1 och Scala 2.12:

    Databricks-klusterinställningar.

  2. Installera det senaste spark-kusto-connector-biblioteket från Maven:

    Importera bibliotek.Välj Spark-Kusto-Connector.

  3. Kontrollera att alla nödvändiga bibliotek är installerade:

    Kontrollera att bibliotek har installerats.

  4. Kontrollera att ytterligare beroenden har installerats för installation med hjälp av en JAR-fil:

    Lägg till beroenden.

Autentisering

Azure Data Explorer Spark-anslutningsappen kan du autentisera med Azure Active Directory (Azure AD) med någon av följande metoder:

Autentisering av Azure AD-program

Azure AD-programautentisering är den enklaste och vanligaste autentiseringsmetoden och rekommenderas för Azure Data Explorer Spark-anslutningsappen.

Egenskaper Alternativsträng Beskrivning
KUSTO_AAD_APP_ID kustoAadAppId Identifierare för Azure AD-program (klient).
KUSTO_AAD_AUTHORITY_ID kustoAadAuthorityID Azure AD-autentiseringsutfärdare. Id för Azure AD-katalog (klientorganisation).
KUSTO_AAD_APP_SECRET kustoAadAppSecret Azure AD-programnyckel för klienten.

Anteckning

Äldre API-versioner (mindre än 2.0.0) har följande namngivning: "kustoAADClientID", "kustoClientAADClientPassword", "kustoAADAuthorityID"

Azure Data Explorer behörigheter

Bevilja följande behörigheter på ett Azure Data Explorer kluster:

  • För läsning (datakälla) måste Azure AD-identiteten ha visningsbehörighet på måldatabasen eller administratörsbehörigheter i måltabellen.
  • För skrivning (data mottagare) måste Azure AD-identiteten ha behörighet som ingestor på måldatabasen. Den måste också ha användarbehörighet för måldatabasen för att skapa nya tabeller. Om måltabellen redan finns måste du konfigurera administratörsbehörigheter för måltabellen.

Mer information om hur Azure Data Explorer huvudroller finns i rollbaserad auktorisering. Information om hur du hanterar säkerhetsroller finns i hantering av säkerhetsroller.

Spark-mottagare: skriva till Azure Data Explorer

  1. Konfigurera parametrar för mottagare:

    val KustoSparkTestAppId = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppId")
    val KustoSparkTestAppKey = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppKey")
    
    val appId = KustoSparkTestAppId
    val appKey = KustoSparkTestAppKey
    val authorityId = "72f988bf-86f1-41af-91ab-2d7cd011db47" // Optional - defaults to microsoft.com
    val cluster = "Sparktest.eastus2"
    val database = "TestDb"
    val table = "StringAndIntTable"
    
  2. Skriv Spark DataFrame för Azure Data Explorer kluster som batch:

    import com.microsoft.kusto.spark.datasink.KustoSinkOptions
    import org.apache.spark.sql.{SaveMode, SparkSession}
    
    df.write
      .format("com.microsoft.kusto.spark.datasource")
      .option(KustoSinkOptions.KUSTO_CLUSTER, cluster)
      .option(KustoSinkOptions.KUSTO_DATABASE, database)
      .option(KustoSinkOptions.KUSTO_TABLE, "Demo3_spark")
      .option(KustoSinkOptions.KUSTO_AAD_APP_ID, appId)
      .option(KustoSinkOptions.KUSTO_AAD_APP_SECRET, appKey)
      .option(KustoSinkOptions.KUSTO_AAD_AUTHORITY_ID, authorityId)
      .option(KustoSinkOptions.KUSTO_TABLE_CREATE_OPTIONS, "CreateIfNotExist")
      .mode(SaveMode.Append)
      .save()  
    

    Eller använd den förenklade syntaxen:

    import com.microsoft.kusto.spark.datasink.SparkIngestionProperties
    import com.microsoft.kusto.spark.sql.extension.SparkExtension._
    
    val sparkIngestionProperties = Some(new SparkIngestionProperties()) // Optional, use None if not needed
    df.write.kusto(cluster, database, table, conf, sparkIngestionProperties)
    
  3. Skriva strömmande data:

    import org.apache.spark.sql.streaming.Trigger
    import java.util.concurrent.TimeUnit
    import java.util.concurrent.TimeUnit
    import org.apache.spark.sql.streaming.Trigger
    
    // Set up a checkpoint and disable codeGen. 
    spark.conf.set("spark.sql.streaming.checkpointLocation", "/FileStore/temp/checkpoint")
    
    // Write to a Kusto table from a streaming source
    val kustoQ = df
      .writeStream
      .format("com.microsoft.kusto.spark.datasink.KustoSinkProvider")
      .options(conf) 
      .trigger(Trigger.ProcessingTime(TimeUnit.SECONDS.toMillis(10))) // Sync this with the ingestionBatching policy of the database
      .start()
    

Spark-källa: läsa från Azure Data Explorer

  1. När du läser små mängder datadefinierar du datafrågan:

    import com.microsoft.kusto.spark.datasource.KustoSourceOptions
    import org.apache.spark.SparkConf
    import org.apache.spark.sql._
    import com.microsoft.azure.kusto.data.ClientRequestProperties
    
    val query = s"$table | where (ColB % 1000 == 0) | distinct ColA"
    val conf: Map[String, String] = Map(
          KustoSourceOptions.KUSTO_AAD_APP_ID -> appId,
          KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey
        )
    
    val df = spark.read.format("com.microsoft.kusto.spark.datasource").
      options(conf).
      option(KustoSourceOptions.KUSTO_QUERY, query).
      option(KustoSourceOptions.KUSTO_DATABASE, database).
      option(KustoSourceOptions.KUSTO_CLUSTER, cluster).
      load()
    
    // Simplified syntax flavor
    import com.microsoft.kusto.spark.sql.extension.SparkExtension._
    
    val cpr: Option[ClientRequestProperties] = None // Optional
    val df2 = spark.read.kusto(cluster, database, query, conf, cpr)
    display(df2)
    
  2. Valfritt: Om du anger tillfällig bloblagring (och inte Azure Data Explorer) skapas blobarna under anroparens ansvar. Detta omfattar att etablera lagringen, rotera åtkomstnycklar och ta bort tillfälliga artefakter. KustoBlobStorageUtils-modulen innehåller hjälpfunktioner för att ta bort blobar baserat på konto- och containerkoordinater och kontoautentiseringsuppgifter, eller en fullständig SAS-URL med skriv-, läs- och listbehörigheter. När motsvarande RDD inte längre behövs lagrar varje transaktion tillfälliga blobartefakter i en separat katalog. Den här katalogen avbildas som en del av lästransaktionsinformationsloggar som rapporteras på Spark-drivrutinsnoden.

    // Use either container/account-key/account name, or container SaS
    val container = dbutils.secrets.get(scope = "KustoDemos", key = "blobContainer")
    val storageAccountKey = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountKey")
    val storageAccountName = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountName")
    // val storageSas = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageSasUrl")
    

    I exemplet ovan kan Key Vault inte nås med anslutningsgränssnittet. en enklare metod för att använda Databricks-hemligheterna används.

  3. Läs från Azure Data Explorer.

    • Om du anger tillfällig bloblagring läser du från Azure Data Explorer följande:

       val conf3 = Map(
            KustoSourceOptions.KUSTO_AAD_APP_ID -> appId,
            KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey
            KustoSourceOptions.KUSTO_BLOB_STORAGE_SAS_URL -> storageSas)
      val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3)
      
      val dfFiltered = df2
        .where(df2.col("ColA").startsWith("row-2"))
        .filter("ColB > 12")
        .filter("ColB <= 21")
        .select("ColA")
      
      display(dfFiltered)
      
    • Om Azure Data Explorer tillhandahåller den tillfälliga bloblagringen läser du från Azure Data Explorer följande:

      val conf3 = Map(
        KustoSourceOptions.KUSTO_AAD_CLIENT_ID -> appId,
        KustoSourceOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey)
      val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3)
      
      val dfFiltered = df2
        .where(df2.col("ColA").startsWith("row-2"))
        .filter("ColB > 12")
        .filter("ColB <= 21")
        .select("ColA")
      
      display(dfFiltered)
      

Nästa steg