Azure Data Explorer-connector voor Apache Spark
Apache Spark is een geïntegreerde analyse-engine voor gegevensverwerking op grote schaal. Azure Data Explorer is een snelle, volledig beheerde service voor gegevensanalyses waarmee grote hoeveelheden gegevens in real-time kunnen worden geanalyseerd.
De Azure Data Explorer-connector voor Spark is een open source-project dat kan worden uitgevoerd op elk Spark-cluster. Het implementeert gegevensbron en gegevenss sink voor het verplaatsen van gegevens Azure Data Explorer Spark-clusters. Met Azure Data Explorer en Apache Spark kunt u snelle en schaalbare toepassingen bouwen die zijn gericht op gegevensgestuurde scenario's. Bijvoorbeeld machine learning (ML), Extract-Transform-Load (ETL) en Log Analytics. Met de connector wordt Azure Data Explorer een geldig gegevensopslag voor standaard Spark-bron- en sinkbewerkingen, zoals schrijven, lezen en writeStream.
U kunt schrijven naar Azure Data Explorer in de batch- of streamingmodus. Lezen uit Azure Data Explorer ondersteunt het verwijderen van kolommen en het predicaat pushdown, waarmee de gegevens in Azure Data Explorer worden gefilterd, waardoor het volume van overgedragen gegevens wordt verkleind.
In dit onderwerp wordt beschreven hoe u de Spark Azure Data Explorer connector installeert en configureert en gegevens verplaatst tussen Azure Data Explorer en Apache Spark clusters.
Notitie
Hoewel sommige van de onderstaande voorbeelden verwijzen naar een Azure Databricks Spark-cluster, is Azure Data Explorer Spark-connector niet rechtstreeks afhankelijk van Databricks of een andere Spark-distributie.
Vereisten
- Een Azure-abonnement. Maak een gratis Azure-account.
- Maak een cluster en database.
- Een Spark-cluster maken
- Installeer Azure Data Explorer-connectorbibliotheek:
- Vooraf gebouwde bibliotheken voor Spark 2.4+Scala 2.11 of Spark 3+scala 2.12
- Maven-repo
- Maven 3.x geïnstalleerd
Tip
Spark 2.3.x-versies worden ook ondersteund, maar vereisen mogelijk enkele wijzigingen in pom.xml afhankelijkheden.
De Spark-connector bouwen
Vanaf versie 2.3.0 introduceren we nieuwe artefact-id's ter vervanging van spark-kusto-connector: kusto-spark_3.0_2.12 die zijn gericht op Spark 3.x en Scala 2.12 en kusto-spark_2.4_2.11 voor Spark 2.4.x en scala 2.11.
Notitie
Versies vóór 2.5.1 werken niet meer voor opname in een bestaande tabel. Werk bij naar een nieuwere versie. Deze stap is optioneel. Zie Installatie van Spark-clusterals u vooraf gebouwde bibliotheken gebruikt, bijvoorbeeld Maven.
Vereisten voor bouwen
Als u geen vooraf gebouwde bibliotheken gebruikt, moet u de bibliotheken installeren die worden vermeld in afhankelijkheden, waaronder de volgende Kusto Java SDK-bibliotheken. Als u de juiste versie wilt vinden om te installeren, kijkt u in de pom van de relevante release:
Raadpleeg deze bron voor het bouwen van de Spark-connector.
Voor Scala-/Java-toepassingen die gebruikmaken van Maven-projectdefinities, koppelt u uw toepassing aan het volgende artefact (de nieuwste versie kan verschillen):
<dependency> <groupId>com.microsoft.azure</groupId> <artifactId>kusto-spark_3.0_2.12</artifactId> <version>2.5.1</version> </dependency>
Build-opdrachten
Jar bouwen en alle tests uitvoeren:
mvn clean package
Als u jar wilt bouwen, moet u alle tests uitvoeren en jar installeren in uw lokale Maven-opslagplaats:
mvn clean install
Zie connectorgebruik voor meer informatie.
Installatie van Spark-cluster
Notitie
Het is raadzaam om de nieuwste versie van Azure Data Explorer Spark-connector te gebruiken bij het uitvoeren van de volgende stappen.
Configureer de volgende Spark-clusterinstellingen op basis van Azure Databricks-cluster met behulp van Spark 2.4.4 en Scala 2.11 of Spark 3.0.1 en Scala 2.12:

Installeer de nieuwste spark-kusto-connector-bibliotheek vanuit Maven:


Controleer of alle vereiste bibliotheken zijn geïnstalleerd:

Voor installatie met behulp van een JAR-bestand controleert u of er aanvullende afhankelijkheden zijn geïnstalleerd:

Verificatie
Azure Data Explorer Spark-connector kunt u verifiëren met Azure Active Directory (Azure AD) met behulp van een van de volgende methoden:
- Een Azure AD-toepassing
- Een Azure AD-toegangs token
- Apparaatverificatie (voor niet-productiescenario's)
- Een Azure Key Vault Voor toegang tot Key Vault resource installeert u het azure-keyvault-pakket en geeft u toepassingsreferenties op.
Azure AD-toepassingsverificatie
Azure AD-toepassingsverificatie is de eenvoudigste en meest voorkomende verificatiemethode en wordt aanbevolen voor de Azure Data Explorer Spark-connector.
| Eigenschappen | Optiereeks | Beschrijving |
|---|---|---|
| KUSTO_AAD_APP_ID | kustoAadAppId | Id van de Azure AD-toepassing (client). |
| KUSTO_AAD_AUTHORITY_ID | kustoAadAuthorityID | Azure AD-verificatie-instantie. Id van Azure AD Directory (tenant). |
| KUSTO_AAD_APP_SECRET | kustoAadAppSecret | Azure AD-toepassingssleutel voor de client. |
Notitie
Oudere API-versies (minder dan 2.0.0) hebben de volgende naamgeving: kustoAADClientID, kustoClientAADClientPassword, kustoAADAuthorityID
Azure Data Explorer bevoegdheden
Verleen de volgende bevoegdheden op een Azure Data Explorer cluster:
- Voor het lezen (gegevensbron) moet de Azure AD-identiteit viewer-bevoegdheden hebben voor de doeldatabase of beheerdersbevoegdheden voor de doeltabel.
- Voor het schrijven (data sink) moet de Azure AD-identiteit ingestor-bevoegdheden hebben voor de doeldatabase. Het moet ook gebruikersbevoegdheden hebben voor de doeldatabase om nieuwe tabellen te maken. Als de doeltabel al bestaat, moet u beheerdersbevoegdheden voor de doeltabel configureren.
Zie autorisatie op Azure Data Explorer basis van rollen voor meer informatie over de belangrijkste rollen. Zie Beveiligingsrollen beheren voor het beheren van beveiligingsrollen.
Spark-sink: schrijven naar Azure Data Explorer
Sinkparameters instellen:
val KustoSparkTestAppId = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppId") val KustoSparkTestAppKey = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppKey") val appId = KustoSparkTestAppId val appKey = KustoSparkTestAppKey val authorityId = "72f988bf-86f1-41af-91ab-2d7cd011db47" // Optional - defaults to microsoft.com val cluster = "Sparktest.eastus2" val database = "TestDb" val table = "StringAndIntTable"Schrijf Spark DataFrame naar Azure Data Explorer cluster als batch:
import com.microsoft.kusto.spark.datasink.KustoSinkOptions import org.apache.spark.sql.{SaveMode, SparkSession} df.write .format("com.microsoft.kusto.spark.datasource") .option(KustoSinkOptions.KUSTO_CLUSTER, cluster) .option(KustoSinkOptions.KUSTO_DATABASE, database) .option(KustoSinkOptions.KUSTO_TABLE, "Demo3_spark") .option(KustoSinkOptions.KUSTO_AAD_APP_ID, appId) .option(KustoSinkOptions.KUSTO_AAD_APP_SECRET, appKey) .option(KustoSinkOptions.KUSTO_AAD_AUTHORITY_ID, authorityId) .option(KustoSinkOptions.KUSTO_TABLE_CREATE_OPTIONS, "CreateIfNotExist") .mode(SaveMode.Append) .save()Of gebruik de vereenvoudigde syntaxis:
import com.microsoft.kusto.spark.datasink.SparkIngestionProperties import com.microsoft.kusto.spark.sql.extension.SparkExtension._ val sparkIngestionProperties = Some(new SparkIngestionProperties()) // Optional, use None if not needed df.write.kusto(cluster, database, table, conf, sparkIngestionProperties)Streaminggegevens schrijven:
import org.apache.spark.sql.streaming.Trigger import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit import org.apache.spark.sql.streaming.Trigger // Set up a checkpoint and disable codeGen. spark.conf.set("spark.sql.streaming.checkpointLocation", "/FileStore/temp/checkpoint") // Write to a Kusto table from a streaming source val kustoQ = df .writeStream .format("com.microsoft.kusto.spark.datasink.KustoSinkProvider") .options(conf) .trigger(Trigger.ProcessingTime(TimeUnit.SECONDS.toMillis(10))) // Sync this with the ingestionBatching policy of the database .start()
Spark-bron: lezen vanuit Azure Data Explorer
Bij het lezen van kleine hoeveelheden gegevens definieertu de gegevensquery:
import com.microsoft.kusto.spark.datasource.KustoSourceOptions import org.apache.spark.SparkConf import org.apache.spark.sql._ import com.microsoft.azure.kusto.data.ClientRequestProperties val query = s"$table | where (ColB % 1000 == 0) | distinct ColA" val conf: Map[String, String] = Map( KustoSourceOptions.KUSTO_AAD_APP_ID -> appId, KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey ) val df = spark.read.format("com.microsoft.kusto.spark.datasource"). options(conf). option(KustoSourceOptions.KUSTO_QUERY, query). option(KustoSourceOptions.KUSTO_DATABASE, database). option(KustoSourceOptions.KUSTO_CLUSTER, cluster). load() // Simplified syntax flavor import com.microsoft.kusto.spark.sql.extension.SparkExtension._ val cpr: Option[ClientRequestProperties] = None // Optional val df2 = spark.read.kusto(cluster, database, query, conf, cpr) display(df2)Optioneel: Als u de tijdelijke blobopslag (en niet Azure Data Explorer) op te geven, worden de blobs gemaakt onder de verantwoordelijkheid van de aanroeper. Dit omvat het inrichten van de opslag, het roteren van toegangssleutels en het verwijderen van tijdelijke artefacten. De KustoBlobStorageUtils-module bevat helperfuncties voor het verwijderen van blobs op basis van account- en containercoördinaten en accountreferenties, of een volledige SAS-URL met schrijf-, lees- en lijstmachtigingen. Wanneer de bijbehorende RDD niet meer nodig is, slaat elke transactie tijdelijke blobartefacten op in een afzonderlijke map. Deze map wordt vastgelegd als onderdeel van de logboeken met informatie over leestransacties die zijn gerapporteerd op het knooppunt Spark-stuurprogramma.
// Use either container/account-key/account name, or container SaS val container = dbutils.secrets.get(scope = "KustoDemos", key = "blobContainer") val storageAccountKey = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountKey") val storageAccountName = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountName") // val storageSas = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageSasUrl")In het bovenstaande voorbeeld is de Key Vault niet toegankelijk via de connectorinterface; een eenvoudigere methode voor het gebruik van de Databricks-geheimen wordt gebruikt.
Lees uit Azure Data Explorer.
Als u de tijdelijke blobopslag op geeft, leest u Azure Data Explorer als volgt:
val conf3 = Map( KustoSourceOptions.KUSTO_AAD_APP_ID -> appId, KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey KustoSourceOptions.KUSTO_BLOB_STORAGE_SAS_URL -> storageSas) val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3) val dfFiltered = df2 .where(df2.col("ColA").startsWith("row-2")) .filter("ColB > 12") .filter("ColB <= 21") .select("ColA") display(dfFiltered)Als Azure Data Explorer de tijdelijke blobopslag biedt, leest u Azure Data Explorer als volgt:
val conf3 = Map( KustoSourceOptions.KUSTO_AAD_CLIENT_ID -> appId, KustoSourceOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey) val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3) val dfFiltered = df2 .where(df2.col("ColA").startsWith("row-2")) .filter("ColB > 12") .filter("ColB <= 21") .select("ColA") display(dfFiltered)
Volgende stappen
- Meer informatie over het gebruik van meer opties van de Azure Data Explorer Spark-connector
- Voorbeeldcode voor Scala en Python