Azure Data Explorer Connector pro Apache Spark

Apache Spark je jednotný analytický modul pro zpracování velkých objemů dat. Azure Data Explorer je rychlá, plně spravovaná služba analýzy dat pro analýzy velkých objemů dat v reálném čase.

Konektor Azure Data Explorer Pro Spark je projekt open source, který se může spustit na libovolném clusteru Spark. Implementuje zdroj dat a jímku dat pro přesun dat mezi Azure Data Explorer a clustery Spark. Pomocí Azure Data Explorer a Apache Spark můžete vytvářet rychlé a škálovatelné aplikace, které cílí na scénáře řízené daty. Například strojové učení (ML), extrakce, transformace a načítání (ETL) a Log Analytics. Díky konektoru se Azure Data Explorer platné úložiště dat pro standardní operace zdroje a jímky Sparku, jako je zápis, čtení a writeStream.

Do služby můžete zapisovat Azure Data Explorer dávkovém režimu nebo v režimu streamování. Čtení z Azure Data Explorer podporuje vyřazování sloupců a predikátové nabízení, které filtruje data v Azure Data Explorer, čímž snižuje objem přenesených dat.

Toto téma popisuje, jak nainstalovat a nakonfigurovat konektor Azure Data Explorer Spark a přesouvat data mezi Azure Data Explorer a Apache Spark clustery.

Poznámka

Přestože některé z následujících příkladů odkazují na cluster Azure Databricks Spark, konektor Azure Data Explorer Spark nepřidá přímé závislosti na Databricks nebo jiné distribuci Sparku.

Požadavky

Tip

Podporují se také verze Sparku 2.3.x, ale mohou vyžadovat určité změny pom.xml závislostech.

Postup sestavení konektoru Spark

Od verze 2.3.0 zavádíme nová ID artefaktů, která nahrazující konektor spark-kusto: kusto-spark_3.0_2.12 cílený na Spark 3.x a Scala 2.12 a kusto-spark_2.4_2.11 cílený na Spark 2.4.x a Scala 2.11.

Poznámka

Verze starší než 2.5.1 už nefungují pro ingestování do existující tabulky. Aktualizujte prosím na novější verzi. Tento krok je volitelný. Pokud používáte předem vytvořené knihovny, například Maven, podívejte se na nastavení clusteru Spark.

Požadavky na sestavení

  1. Pokud předdefinované knihovny používáte, musíte nainstalovat knihovny uvedené v závislostech, včetně následujících knihoven sady Kusto Java SDK. Správnou verzi k instalaci najdete v pom příslušné verze:

  2. Informace o vytvoření konektoru Spark najdete v tomto zdroji.

  3. V případě aplikací Scala/Java využívajících definice projektů Maven propoojte aplikaci s následujícím artefaktem (nejnovější verze se může lišit):

       <dependency>
         <groupId>com.microsoft.azure</groupId>
         <artifactId>kusto-spark_3.0_2.12</artifactId>
         <version>2.5.1</version>
       </dependency>
    

Příkazy sestavení

Sestavení souboru JAR a spuštění všech testů:

mvn clean package

Pokud chcete sestavit soubor JAR, spusťte všechny testy a nainstalujte soubor JAR do místního úložiště Maven:

mvn clean install

Další informace najdete v tématu o využití konektoru.

Nastavení clusteru Spark

Poznámka

Při provádění následujících kroků se doporučuje Azure Data Explorer nejnovější verzi konektoru Spark.

  1. Nakonfigurujte následující nastavení clusteru Spark na základě clusteru Azure Databricks pomocí Sparku 2.4.4 a Scaly 2.11 nebo Sparku 3.0.1 a Scaly 2.12:

    Nastavení clusteru Databricks.

  2. Nainstalujte nejnovější knihovnu spark-kusto-connector z Mavenu:

    Import knihoven.Vyberte Spark-Kusto-Connector.

  3. Ověřte, že jsou nainstalované všechny požadované knihovny:

    Ověřte nainstalované knihovny.

  4. V případě instalace pomocí souboru JAR ověřte, že byly nainstalovány další závislosti:

    Přidejte závislosti.

Authentication

Azure Data Explorer Spark umožňuje ověřování pomocí Azure Active Directory (Azure AD) pomocí jedné z následujících metod:

Ověřování aplikací Azure AD

Ověřování aplikací Azure AD je nejjednodušší a nejběžnější metodou ověřování a doporučuje se pro Azure Data Explorer Spark.

Vlastnosti Řetězec možnosti Popis
KUSTO_AAD_APP_ID KustoAadAppId Identifikátor aplikace Azure AD (klienta).
KUSTO_AAD_AUTHORITY_ID kustoAadAuthorityID Ověřovací autorita Azure AD. ID adresáře azure AD (tenanta).
KUSTO_AAD_APP_SECRET kustoAadAppSecret Klíč aplikace Azure AD pro klienta.

Poznámka

Starší verze rozhraní API (méně než 2.0.0) mají následující názvy: kustoAADClientID, kustoClientAADClientPassword, kustoAADAuthorityID.

Azure Data Explorer oprávnění

Udělte clusteru s podporou Azure Data Explorer oprávnění:

  • Pro čtení (zdroj dat) musí mít identita Azure AD oprávnění diváka k cílové databázi nebo oprávnění správce cílové tabulky.
  • Pro zápis (jímka dat) musí mít identita Azure AD oprávnění ingestoru k cílové databázi. Aby bylo možné vytvářet nové tabulky, musí mít také uživatelská oprávnění k cílové databázi. Pokud cílová tabulka již existuje, musíte pro cílovou tabulku nakonfigurovat oprávnění správce.

Další informace o rolích Azure Data Explorer najdete v tématu Autorizace na základě rolí. Informace o správě rolí zabezpečení najdete v tématu Správa rolí zabezpečení.

Jímka Sparku: Zápis do Azure Data Explorer

  1. Nastavení parametrů jímky:

    val KustoSparkTestAppId = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppId")
    val KustoSparkTestAppKey = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppKey")
    
    val appId = KustoSparkTestAppId
    val appKey = KustoSparkTestAppKey
    val authorityId = "72f988bf-86f1-41af-91ab-2d7cd011db47" // Optional - defaults to microsoft.com
    val cluster = "Sparktest.eastus2"
    val database = "TestDb"
    val table = "StringAndIntTable"
    
  2. Zápis sparkového datového rámce do Azure Data Explorer clusteru jako dávky:

    import com.microsoft.kusto.spark.datasink.KustoSinkOptions
    import org.apache.spark.sql.{SaveMode, SparkSession}
    
    df.write
      .format("com.microsoft.kusto.spark.datasource")
      .option(KustoSinkOptions.KUSTO_CLUSTER, cluster)
      .option(KustoSinkOptions.KUSTO_DATABASE, database)
      .option(KustoSinkOptions.KUSTO_TABLE, "Demo3_spark")
      .option(KustoSinkOptions.KUSTO_AAD_APP_ID, appId)
      .option(KustoSinkOptions.KUSTO_AAD_APP_SECRET, appKey)
      .option(KustoSinkOptions.KUSTO_AAD_AUTHORITY_ID, authorityId)
      .option(KustoSinkOptions.KUSTO_TABLE_CREATE_OPTIONS, "CreateIfNotExist")
      .mode(SaveMode.Append)
      .save()  
    

    Nebo použijte zjednodušenou syntaxi:

    import com.microsoft.kusto.spark.datasink.SparkIngestionProperties
    import com.microsoft.kusto.spark.sql.extension.SparkExtension._
    
    val sparkIngestionProperties = Some(new SparkIngestionProperties()) // Optional, use None if not needed
    df.write.kusto(cluster, database, table, conf, sparkIngestionProperties)
    
  3. Zápis streamovaných dat:

    import org.apache.spark.sql.streaming.Trigger
    import java.util.concurrent.TimeUnit
    import java.util.concurrent.TimeUnit
    import org.apache.spark.sql.streaming.Trigger
    
    // Set up a checkpoint and disable codeGen. 
    spark.conf.set("spark.sql.streaming.checkpointLocation", "/FileStore/temp/checkpoint")
    
    // Write to a Kusto table from a streaming source
    val kustoQ = df
      .writeStream
      .format("com.microsoft.kusto.spark.datasink.KustoSinkProvider")
      .options(conf) 
      .trigger(Trigger.ProcessingTime(TimeUnit.SECONDS.toMillis(10))) // Sync this with the ingestionBatching policy of the database
      .start()
    

Zdroj Spark: čtení z Azure Data Explorer

  1. Při čtení malých objemů datdefinujte dotaz na data:

    import com.microsoft.kusto.spark.datasource.KustoSourceOptions
    import org.apache.spark.SparkConf
    import org.apache.spark.sql._
    import com.microsoft.azure.kusto.data.ClientRequestProperties
    
    val query = s"$table | where (ColB % 1000 == 0) | distinct ColA"
    val conf: Map[String, String] = Map(
          KustoSourceOptions.KUSTO_AAD_APP_ID -> appId,
          KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey
        )
    
    val df = spark.read.format("com.microsoft.kusto.spark.datasource").
      options(conf).
      option(KustoSourceOptions.KUSTO_QUERY, query).
      option(KustoSourceOptions.KUSTO_DATABASE, database).
      option(KustoSourceOptions.KUSTO_CLUSTER, cluster).
      load()
    
    // Simplified syntax flavor
    import com.microsoft.kusto.spark.sql.extension.SparkExtension._
    
    val cpr: Option[ClientRequestProperties] = None // Optional
    val df2 = spark.read.kusto(cluster, database, query, conf, cpr)
    display(df2)
    
  2. Volitelné: Pokud poskytnete přechodné úložiště objektů blob (a Azure Data Explorer), objekty blob se vytvoří pod zodpovědností volajícího. To zahrnuje zřízení úložiště, obměnou přístupových klíčů a odstranění přechodných artefaktů. Modul KustoBlobStorageUtils obsahuje pomocné funkce pro odstraňování objektů blob na základě souřadnic účtu a kontejneru a přihlašovacích údajů účtu nebo úplné adresy URL SAS s oprávněními k zápisu, čtení a zobrazení seznamu. Pokud už odpovídající rdd nepotřebujete, každá transakce ukládá přechodné artefakty objektů blob v samostatném adresáři. Tento adresář se zachycuje jako součást protokolů s informacemi o transakcích pro čtení hlášených v uzlu ovladače Sparku.

    // Use either container/account-key/account name, or container SaS
    val container = dbutils.secrets.get(scope = "KustoDemos", key = "blobContainer")
    val storageAccountKey = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountKey")
    val storageAccountName = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountName")
    // val storageSas = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageSasUrl")
    

    Ve výše uvedeném příkladu se Key Vault k souboru přistupovat pomocí rozhraní konektoru. Používá se jednodušší metoda použití tajných kódů Databricks.

  3. Číst z Azure Data Explorer.

    • Pokud poskytnete přechodné úložiště objektů blob, přečtěte si Azure Data Explorer následujícím způsobem:

       val conf3 = Map(
            KustoSourceOptions.KUSTO_AAD_APP_ID -> appId,
            KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey
            KustoSourceOptions.KUSTO_BLOB_STORAGE_SAS_URL -> storageSas)
      val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3)
      
      val dfFiltered = df2
        .where(df2.col("ColA").startsWith("row-2"))
        .filter("ColB > 12")
        .filter("ColB <= 21")
        .select("ColA")
      
      display(dfFiltered)
      
    • Pokud Azure Data Explorer poskytuje přechodné úložiště objektů blob, přečtěte si Azure Data Explorer následujícím způsobem:

      val conf3 = Map(
        KustoSourceOptions.KUSTO_AAD_CLIENT_ID -> appId,
        KustoSourceOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey)
      val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3)
      
      val dfFiltered = df2
        .where(df2.col("ColA").startsWith("row-2"))
        .filter("ColB > 12")
        .filter("ColB <= 21")
        .select("ColA")
      
      display(dfFiltered)
      

Další kroky