Azure Data Explorer-connector voor Apache Spark

Apache Spark is een geïntegreerde analyse-engine voor gegevensverwerking op grote schaal. Azure Data Explorer is een snelle, volledig beheerde service voor gegevensanalyses waarmee grote hoeveelheden gegevens in real-time kunnen worden geanalyseerd.

De Azure Data Explorer-connector voor Spark is een open source-project dat kan worden uitgevoerd op elk Spark-cluster. Het implementeert gegevensbron en gegevenss sink voor het verplaatsen van gegevens Azure Data Explorer Spark-clusters. Met Azure Data Explorer en Apache Spark kunt u snelle en schaalbare toepassingen bouwen die zijn gericht op gegevensgestuurde scenario's. Bijvoorbeeld machine learning (ML), Extract-Transform-Load (ETL) en Log Analytics. Met de connector wordt Azure Data Explorer een geldig gegevensopslag voor standaard Spark-bron- en sinkbewerkingen, zoals schrijven, lezen en writeStream.

U kunt schrijven naar Azure Data Explorer in de batch- of streamingmodus. Lezen uit Azure Data Explorer ondersteunt het verwijderen van kolommen en het predicaat pushdown, waarmee de gegevens in Azure Data Explorer worden gefilterd, waardoor het volume van overgedragen gegevens wordt verkleind.

In dit onderwerp wordt beschreven hoe u de Spark Azure Data Explorer connector installeert en configureert en gegevens verplaatst tussen Azure Data Explorer en Apache Spark clusters.

Notitie

Hoewel sommige van de onderstaande voorbeelden verwijzen naar een Azure Databricks Spark-cluster, is Azure Data Explorer Spark-connector niet rechtstreeks afhankelijk van Databricks of een andere Spark-distributie.

Vereisten

Tip

Spark 2.3.x-versies worden ook ondersteund, maar vereisen mogelijk enkele wijzigingen in pom.xml afhankelijkheden.

De Spark-connector bouwen

Vanaf versie 2.3.0 introduceren we nieuwe artefact-id's ter vervanging van spark-kusto-connector: kusto-spark_3.0_2.12 die zijn gericht op Spark 3.x en Scala 2.12 en kusto-spark_2.4_2.11 voor Spark 2.4.x en scala 2.11.

Notitie

Versies vóór 2.5.1 werken niet meer voor opname in een bestaande tabel. Werk bij naar een nieuwere versie. Deze stap is optioneel. Zie Installatie van Spark-clusterals u vooraf gebouwde bibliotheken gebruikt, bijvoorbeeld Maven.

Vereisten voor bouwen

  1. Als u geen vooraf gebouwde bibliotheken gebruikt, moet u de bibliotheken installeren die worden vermeld in afhankelijkheden, waaronder de volgende Kusto Java SDK-bibliotheken. Als u de juiste versie wilt vinden om te installeren, kijkt u in de pom van de relevante release:

  2. Raadpleeg deze bron voor het bouwen van de Spark-connector.

  3. Voor Scala-/Java-toepassingen die gebruikmaken van Maven-projectdefinities, koppelt u uw toepassing aan het volgende artefact (de nieuwste versie kan verschillen):

       <dependency>
         <groupId>com.microsoft.azure</groupId>
         <artifactId>kusto-spark_3.0_2.12</artifactId>
         <version>2.5.1</version>
       </dependency>
    

Build-opdrachten

Jar bouwen en alle tests uitvoeren:

mvn clean package

Als u jar wilt bouwen, moet u alle tests uitvoeren en jar installeren in uw lokale Maven-opslagplaats:

mvn clean install

Zie connectorgebruik voor meer informatie.

Installatie van Spark-cluster

Notitie

Het is raadzaam om de nieuwste versie van Azure Data Explorer Spark-connector te gebruiken bij het uitvoeren van de volgende stappen.

  1. Configureer de volgende Spark-clusterinstellingen op basis van Azure Databricks-cluster met behulp van Spark 2.4.4 en Scala 2.11 of Spark 3.0.1 en Scala 2.12:

    Databricks-clusterinstellingen.

  2. Installeer de nieuwste spark-kusto-connector-bibliotheek vanuit Maven:

    Importeer bibliotheken.Selecteer Spark-Kusto-Connector.

  3. Controleer of alle vereiste bibliotheken zijn geïnstalleerd:

    Controleer of bibliotheken zijn geïnstalleerd.

  4. Voor installatie met behulp van een JAR-bestand controleert u of er aanvullende afhankelijkheden zijn geïnstalleerd:

    Afhankelijkheden toevoegen.

Verificatie

Azure Data Explorer Spark-connector kunt u verifiëren met Azure Active Directory (Azure AD) met behulp van een van de volgende methoden:

Azure AD-toepassingsverificatie

Azure AD-toepassingsverificatie is de eenvoudigste en meest voorkomende verificatiemethode en wordt aanbevolen voor de Azure Data Explorer Spark-connector.

Eigenschappen Optiereeks Beschrijving
KUSTO_AAD_APP_ID kustoAadAppId Id van de Azure AD-toepassing (client).
KUSTO_AAD_AUTHORITY_ID kustoAadAuthorityID Azure AD-verificatie-instantie. Id van Azure AD Directory (tenant).
KUSTO_AAD_APP_SECRET kustoAadAppSecret Azure AD-toepassingssleutel voor de client.

Notitie

Oudere API-versies (minder dan 2.0.0) hebben de volgende naamgeving: kustoAADClientID, kustoClientAADClientPassword, kustoAADAuthorityID

Azure Data Explorer bevoegdheden

Verleen de volgende bevoegdheden op een Azure Data Explorer cluster:

  • Voor het lezen (gegevensbron) moet de Azure AD-identiteit viewer-bevoegdheden hebben voor de doeldatabase of beheerdersbevoegdheden voor de doeltabel.
  • Voor het schrijven (data sink) moet de Azure AD-identiteit ingestor-bevoegdheden hebben voor de doeldatabase. Het moet ook gebruikersbevoegdheden hebben voor de doeldatabase om nieuwe tabellen te maken. Als de doeltabel al bestaat, moet u beheerdersbevoegdheden voor de doeltabel configureren.

Zie autorisatie op Azure Data Explorer basis van rollen voor meer informatie over de belangrijkste rollen. Zie Beveiligingsrollen beheren voor het beheren van beveiligingsrollen.

Spark-sink: schrijven naar Azure Data Explorer

  1. Sinkparameters instellen:

    val KustoSparkTestAppId = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppId")
    val KustoSparkTestAppKey = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppKey")
    
    val appId = KustoSparkTestAppId
    val appKey = KustoSparkTestAppKey
    val authorityId = "72f988bf-86f1-41af-91ab-2d7cd011db47" // Optional - defaults to microsoft.com
    val cluster = "Sparktest.eastus2"
    val database = "TestDb"
    val table = "StringAndIntTable"
    
  2. Schrijf Spark DataFrame naar Azure Data Explorer cluster als batch:

    import com.microsoft.kusto.spark.datasink.KustoSinkOptions
    import org.apache.spark.sql.{SaveMode, SparkSession}
    
    df.write
      .format("com.microsoft.kusto.spark.datasource")
      .option(KustoSinkOptions.KUSTO_CLUSTER, cluster)
      .option(KustoSinkOptions.KUSTO_DATABASE, database)
      .option(KustoSinkOptions.KUSTO_TABLE, "Demo3_spark")
      .option(KustoSinkOptions.KUSTO_AAD_APP_ID, appId)
      .option(KustoSinkOptions.KUSTO_AAD_APP_SECRET, appKey)
      .option(KustoSinkOptions.KUSTO_AAD_AUTHORITY_ID, authorityId)
      .option(KustoSinkOptions.KUSTO_TABLE_CREATE_OPTIONS, "CreateIfNotExist")
      .mode(SaveMode.Append)
      .save()  
    

    Of gebruik de vereenvoudigde syntaxis:

    import com.microsoft.kusto.spark.datasink.SparkIngestionProperties
    import com.microsoft.kusto.spark.sql.extension.SparkExtension._
    
    val sparkIngestionProperties = Some(new SparkIngestionProperties()) // Optional, use None if not needed
    df.write.kusto(cluster, database, table, conf, sparkIngestionProperties)
    
  3. Streaminggegevens schrijven:

    import org.apache.spark.sql.streaming.Trigger
    import java.util.concurrent.TimeUnit
    import java.util.concurrent.TimeUnit
    import org.apache.spark.sql.streaming.Trigger
    
    // Set up a checkpoint and disable codeGen. 
    spark.conf.set("spark.sql.streaming.checkpointLocation", "/FileStore/temp/checkpoint")
    
    // Write to a Kusto table from a streaming source
    val kustoQ = df
      .writeStream
      .format("com.microsoft.kusto.spark.datasink.KustoSinkProvider")
      .options(conf) 
      .trigger(Trigger.ProcessingTime(TimeUnit.SECONDS.toMillis(10))) // Sync this with the ingestionBatching policy of the database
      .start()
    

Spark-bron: lezen vanuit Azure Data Explorer

  1. Bij het lezen van kleine hoeveelheden gegevens definieertu de gegevensquery:

    import com.microsoft.kusto.spark.datasource.KustoSourceOptions
    import org.apache.spark.SparkConf
    import org.apache.spark.sql._
    import com.microsoft.azure.kusto.data.ClientRequestProperties
    
    val query = s"$table | where (ColB % 1000 == 0) | distinct ColA"
    val conf: Map[String, String] = Map(
          KustoSourceOptions.KUSTO_AAD_APP_ID -> appId,
          KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey
        )
    
    val df = spark.read.format("com.microsoft.kusto.spark.datasource").
      options(conf).
      option(KustoSourceOptions.KUSTO_QUERY, query).
      option(KustoSourceOptions.KUSTO_DATABASE, database).
      option(KustoSourceOptions.KUSTO_CLUSTER, cluster).
      load()
    
    // Simplified syntax flavor
    import com.microsoft.kusto.spark.sql.extension.SparkExtension._
    
    val cpr: Option[ClientRequestProperties] = None // Optional
    val df2 = spark.read.kusto(cluster, database, query, conf, cpr)
    display(df2)
    
  2. Optioneel: Als u de tijdelijke blobopslag (en niet Azure Data Explorer) op te geven, worden de blobs gemaakt onder de verantwoordelijkheid van de aanroeper. Dit omvat het inrichten van de opslag, het roteren van toegangssleutels en het verwijderen van tijdelijke artefacten. De KustoBlobStorageUtils-module bevat helperfuncties voor het verwijderen van blobs op basis van account- en containercoördinaten en accountreferenties, of een volledige SAS-URL met schrijf-, lees- en lijstmachtigingen. Wanneer de bijbehorende RDD niet meer nodig is, slaat elke transactie tijdelijke blobartefacten op in een afzonderlijke map. Deze map wordt vastgelegd als onderdeel van de logboeken met informatie over leestransacties die zijn gerapporteerd op het knooppunt Spark-stuurprogramma.

    // Use either container/account-key/account name, or container SaS
    val container = dbutils.secrets.get(scope = "KustoDemos", key = "blobContainer")
    val storageAccountKey = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountKey")
    val storageAccountName = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountName")
    // val storageSas = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageSasUrl")
    

    In het bovenstaande voorbeeld is de Key Vault niet toegankelijk via de connectorinterface; een eenvoudigere methode voor het gebruik van de Databricks-geheimen wordt gebruikt.

  3. Lees uit Azure Data Explorer.

    • Als u de tijdelijke blobopslag op geeft, leest u Azure Data Explorer als volgt:

       val conf3 = Map(
            KustoSourceOptions.KUSTO_AAD_APP_ID -> appId,
            KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey
            KustoSourceOptions.KUSTO_BLOB_STORAGE_SAS_URL -> storageSas)
      val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3)
      
      val dfFiltered = df2
        .where(df2.col("ColA").startsWith("row-2"))
        .filter("ColB > 12")
        .filter("ColB <= 21")
        .select("ColA")
      
      display(dfFiltered)
      
    • Als Azure Data Explorer de tijdelijke blobopslag biedt, leest u Azure Data Explorer als volgt:

      val conf3 = Map(
        KustoSourceOptions.KUSTO_AAD_CLIENT_ID -> appId,
        KustoSourceOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey)
      val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3)
      
      val dfFiltered = df2
        .where(df2.col("ColA").startsWith("row-2"))
        .filter("ColB > 12")
        .filter("ColB <= 21")
        .select("ColA")
      
      display(dfFiltered)
      

Volgende stappen