Azure Data Explorer Connector pro Apache Spark
Apache Spark je jednotný analytický modul pro zpracování velkých objemů dat. Azure Data Explorer je rychlá, plně spravovaná služba analýzy dat pro analýzy velkých objemů dat v reálném čase.
Konektor Azure Data Explorer Pro Spark je projekt open source, který se může spustit na libovolném clusteru Spark. Implementuje zdroj dat a jímku dat pro přesun dat mezi Azure Data Explorer a clustery Spark. Pomocí Azure Data Explorer a Apache Spark můžete vytvářet rychlé a škálovatelné aplikace, které cílí na scénáře řízené daty. Například strojové učení (ML), extrakce, transformace a načítání (ETL) a Log Analytics. Díky konektoru se Azure Data Explorer platné úložiště dat pro standardní operace zdroje a jímky Sparku, jako je zápis, čtení a writeStream.
Do služby můžete zapisovat Azure Data Explorer dávkovém režimu nebo v režimu streamování. Čtení z Azure Data Explorer podporuje vyřazování sloupců a predikátové nabízení, které filtruje data v Azure Data Explorer, čímž snižuje objem přenesených dat.
Toto téma popisuje, jak nainstalovat a nakonfigurovat konektor Azure Data Explorer Spark a přesouvat data mezi Azure Data Explorer a Apache Spark clustery.
Poznámka
Přestože některé z následujících příkladů odkazují na cluster Azure Databricks Spark, konektor Azure Data Explorer Spark nepřidá přímé závislosti na Databricks nebo jiné distribuci Sparku.
Požadavky
- Předplatné Azure. Vytvořte si bezplatný účet Azure.
- Vytvořte cluster a databázi.
- Vytvoření clusteru Spark
- Nainstalujte Azure Data Explorer konektoru:
- Předem vytvořené knihovny pro Spark 2.4 a Scala 2.11 nebo Spark 3+Scala 2.12
- Maven repo
- Nainstalovaný Maven 3.x
Tip
Podporují se také verze Sparku 2.3.x, ale mohou vyžadovat určité změny pom.xml závislostech.
Postup sestavení konektoru Spark
Od verze 2.3.0 zavádíme nová ID artefaktů, která nahrazující konektor spark-kusto: kusto-spark_3.0_2.12 cílený na Spark 3.x a Scala 2.12 a kusto-spark_2.4_2.11 cílený na Spark 2.4.x a Scala 2.11.
Poznámka
Verze starší než 2.5.1 už nefungují pro ingestování do existující tabulky. Aktualizujte prosím na novější verzi. Tento krok je volitelný. Pokud používáte předem vytvořené knihovny, například Maven, podívejte se na nastavení clusteru Spark.
Požadavky na sestavení
Pokud předdefinované knihovny používáte, musíte nainstalovat knihovny uvedené v závislostech, včetně následujících knihoven sady Kusto Java SDK. Správnou verzi k instalaci najdete v pom příslušné verze:
Informace o vytvoření konektoru Spark najdete v tomto zdroji.
V případě aplikací Scala/Java využívajících definice projektů Maven propoojte aplikaci s následujícím artefaktem (nejnovější verze se může lišit):
<dependency> <groupId>com.microsoft.azure</groupId> <artifactId>kusto-spark_3.0_2.12</artifactId> <version>2.5.1</version> </dependency>
Příkazy sestavení
Sestavení souboru JAR a spuštění všech testů:
mvn clean package
Pokud chcete sestavit soubor JAR, spusťte všechny testy a nainstalujte soubor JAR do místního úložiště Maven:
mvn clean install
Další informace najdete v tématu o využití konektoru.
Nastavení clusteru Spark
Poznámka
Při provádění následujících kroků se doporučuje Azure Data Explorer nejnovější verzi konektoru Spark.
Nakonfigurujte následující nastavení clusteru Spark na základě clusteru Azure Databricks pomocí Sparku 2.4.4 a Scaly 2.11 nebo Sparku 3.0.1 a Scaly 2.12:

Nainstalujte nejnovější knihovnu spark-kusto-connector z Mavenu:


Ověřte, že jsou nainstalované všechny požadované knihovny:

V případě instalace pomocí souboru JAR ověřte, že byly nainstalovány další závislosti:

Authentication
Azure Data Explorer Spark umožňuje ověřování pomocí Azure Active Directory (Azure AD) pomocí jedné z následujících metod:
- Aplikace Azure AD
- Přístupový token Azure AD
- Ověřování zařízení (pro neprodukní scénáře)
- Správce Azure Key Vault Přístup k prostředku Key Vault, nainstalujte balíček azure-keyvault a zadejte přihlašovací údaje aplikace.
Ověřování aplikací Azure AD
Ověřování aplikací Azure AD je nejjednodušší a nejběžnější metodou ověřování a doporučuje se pro Azure Data Explorer Spark.
| Vlastnosti | Řetězec možnosti | Popis |
|---|---|---|
| KUSTO_AAD_APP_ID | KustoAadAppId | Identifikátor aplikace Azure AD (klienta). |
| KUSTO_AAD_AUTHORITY_ID | kustoAadAuthorityID | Ověřovací autorita Azure AD. ID adresáře azure AD (tenanta). |
| KUSTO_AAD_APP_SECRET | kustoAadAppSecret | Klíč aplikace Azure AD pro klienta. |
Poznámka
Starší verze rozhraní API (méně než 2.0.0) mají následující názvy: kustoAADClientID, kustoClientAADClientPassword, kustoAADAuthorityID.
Azure Data Explorer oprávnění
Udělte clusteru s podporou Azure Data Explorer oprávnění:
- Pro čtení (zdroj dat) musí mít identita Azure AD oprávnění diváka k cílové databázi nebo oprávnění správce cílové tabulky.
- Pro zápis (jímka dat) musí mít identita Azure AD oprávnění ingestoru k cílové databázi. Aby bylo možné vytvářet nové tabulky, musí mít také uživatelská oprávnění k cílové databázi. Pokud cílová tabulka již existuje, musíte pro cílovou tabulku nakonfigurovat oprávnění správce.
Další informace o rolích Azure Data Explorer najdete v tématu Autorizace na základě rolí. Informace o správě rolí zabezpečení najdete v tématu Správa rolí zabezpečení.
Jímka Sparku: Zápis do Azure Data Explorer
Nastavení parametrů jímky:
val KustoSparkTestAppId = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppId") val KustoSparkTestAppKey = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppKey") val appId = KustoSparkTestAppId val appKey = KustoSparkTestAppKey val authorityId = "72f988bf-86f1-41af-91ab-2d7cd011db47" // Optional - defaults to microsoft.com val cluster = "Sparktest.eastus2" val database = "TestDb" val table = "StringAndIntTable"Zápis sparkového datového rámce do Azure Data Explorer clusteru jako dávky:
import com.microsoft.kusto.spark.datasink.KustoSinkOptions import org.apache.spark.sql.{SaveMode, SparkSession} df.write .format("com.microsoft.kusto.spark.datasource") .option(KustoSinkOptions.KUSTO_CLUSTER, cluster) .option(KustoSinkOptions.KUSTO_DATABASE, database) .option(KustoSinkOptions.KUSTO_TABLE, "Demo3_spark") .option(KustoSinkOptions.KUSTO_AAD_APP_ID, appId) .option(KustoSinkOptions.KUSTO_AAD_APP_SECRET, appKey) .option(KustoSinkOptions.KUSTO_AAD_AUTHORITY_ID, authorityId) .option(KustoSinkOptions.KUSTO_TABLE_CREATE_OPTIONS, "CreateIfNotExist") .mode(SaveMode.Append) .save()Nebo použijte zjednodušenou syntaxi:
import com.microsoft.kusto.spark.datasink.SparkIngestionProperties import com.microsoft.kusto.spark.sql.extension.SparkExtension._ val sparkIngestionProperties = Some(new SparkIngestionProperties()) // Optional, use None if not needed df.write.kusto(cluster, database, table, conf, sparkIngestionProperties)Zápis streamovaných dat:
import org.apache.spark.sql.streaming.Trigger import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit import org.apache.spark.sql.streaming.Trigger // Set up a checkpoint and disable codeGen. spark.conf.set("spark.sql.streaming.checkpointLocation", "/FileStore/temp/checkpoint") // Write to a Kusto table from a streaming source val kustoQ = df .writeStream .format("com.microsoft.kusto.spark.datasink.KustoSinkProvider") .options(conf) .trigger(Trigger.ProcessingTime(TimeUnit.SECONDS.toMillis(10))) // Sync this with the ingestionBatching policy of the database .start()
Zdroj Spark: čtení z Azure Data Explorer
Při čtení malých objemů datdefinujte dotaz na data:
import com.microsoft.kusto.spark.datasource.KustoSourceOptions import org.apache.spark.SparkConf import org.apache.spark.sql._ import com.microsoft.azure.kusto.data.ClientRequestProperties val query = s"$table | where (ColB % 1000 == 0) | distinct ColA" val conf: Map[String, String] = Map( KustoSourceOptions.KUSTO_AAD_APP_ID -> appId, KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey ) val df = spark.read.format("com.microsoft.kusto.spark.datasource"). options(conf). option(KustoSourceOptions.KUSTO_QUERY, query). option(KustoSourceOptions.KUSTO_DATABASE, database). option(KustoSourceOptions.KUSTO_CLUSTER, cluster). load() // Simplified syntax flavor import com.microsoft.kusto.spark.sql.extension.SparkExtension._ val cpr: Option[ClientRequestProperties] = None // Optional val df2 = spark.read.kusto(cluster, database, query, conf, cpr) display(df2)Volitelné: Pokud poskytnete přechodné úložiště objektů blob (a Azure Data Explorer), objekty blob se vytvoří pod zodpovědností volajícího. To zahrnuje zřízení úložiště, obměnou přístupových klíčů a odstranění přechodných artefaktů. Modul KustoBlobStorageUtils obsahuje pomocné funkce pro odstraňování objektů blob na základě souřadnic účtu a kontejneru a přihlašovacích údajů účtu nebo úplné adresy URL SAS s oprávněními k zápisu, čtení a zobrazení seznamu. Pokud už odpovídající rdd nepotřebujete, každá transakce ukládá přechodné artefakty objektů blob v samostatném adresáři. Tento adresář se zachycuje jako součást protokolů s informacemi o transakcích pro čtení hlášených v uzlu ovladače Sparku.
// Use either container/account-key/account name, or container SaS val container = dbutils.secrets.get(scope = "KustoDemos", key = "blobContainer") val storageAccountKey = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountKey") val storageAccountName = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountName") // val storageSas = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageSasUrl")Ve výše uvedeném příkladu se Key Vault k souboru přistupovat pomocí rozhraní konektoru. Používá se jednodušší metoda použití tajných kódů Databricks.
Číst z Azure Data Explorer.
Pokud poskytnete přechodné úložiště objektů blob, přečtěte si Azure Data Explorer následujícím způsobem:
val conf3 = Map( KustoSourceOptions.KUSTO_AAD_APP_ID -> appId, KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey KustoSourceOptions.KUSTO_BLOB_STORAGE_SAS_URL -> storageSas) val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3) val dfFiltered = df2 .where(df2.col("ColA").startsWith("row-2")) .filter("ColB > 12") .filter("ColB <= 21") .select("ColA") display(dfFiltered)Pokud Azure Data Explorer poskytuje přechodné úložiště objektů blob, přečtěte si Azure Data Explorer následujícím způsobem:
val conf3 = Map( KustoSourceOptions.KUSTO_AAD_CLIENT_ID -> appId, KustoSourceOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey) val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3) val dfFiltered = df2 .where(df2.col("ColA").startsWith("row-2")) .filter("ColB > 12") .filter("ColB <= 21") .select("ColA") display(dfFiltered)
Další kroky
- Zjistěte, jak používat další možnosti konektoru Azure Data Explorer Spark.
- Ukázkový kód pro Scalu a Python