Connettore azure Esplora dati per Apache Spark
Importante
Questo connettore può essere usato in Analisi in tempo reale in Microsoft Fabric. Usare le istruzioni in questo articolo con le eccezioni seguenti:
- Se necessario, creare database usando le istruzioni in Creare un database KQL.
- Se necessario, creare tabelle usando le istruzioni in Creare una tabella vuota.
- Ottenere gli URI di query o di inserimento usando le istruzioni in URI di copia.
- Eseguire query in un set di query KQL.
Apache Spark è un motore di analisi unificato per l'elaborazione di dati su larga scala. Esplora dati di Azure è un servizio di analisi dei dati veloce e completamente gestito per l'analisi in tempo reale di volumi elevati di dati.
Il connettore Azure Esplora dati per Spark è un progetto di open source che può essere eseguito in qualsiasi cluster Spark. Implementa l'origine dati e il sink di dati per lo spostamento di dati tra cluster di Esplora dati di Azure e Spark. Con Azure Esplora dati e Apache Spark è possibile creare applicazioni veloci e scalabili destinate a scenari basati sui dati. Ad esempio, Machine Learning (ML), Extract-Transform-Load (ETL) e Log Analytics. Con il connettore, Azure Esplora dati diventa un archivio dati valido per le operazioni di origine e sink Spark standard, ad esempio scrittura, lettura e scritturaStream.
È possibile scrivere in Azure Esplora dati tramite l'inserimento in coda o l'inserimento in streaming. La lettura da Azure Esplora dati supporta l'eliminazione delle colonne e il push dei predicati, che filtra i dati in Azure Esplora dati, riducendo il volume di dati trasferiti.
Nota
Per informazioni sull'uso del connettore Synapse Spark per Azure Esplora dati, vedere Connettersi ad Azure Esplora dati usando Apache Spark per Azure Synapse Analytics.
Questo argomento descrive come installare e configurare il connettore Azure Esplora dati Spark e spostare i dati tra cluster di Azure Esplora dati e Apache Spark.
Nota
Anche se alcuni degli esempi seguenti fanno riferimento a un cluster Spark di Azure Databricks, il connettore Azure Esplora dati Spark non assume dipendenze dirette da Databricks o da altre distribuzioni spark.
Prerequisiti
- Una sottoscrizione di Azure. Creare un account Azure gratuito.
- Un cluster e un database di Esplora dati di Azure. Creare un cluster e un database.
- Un cluster Spark
- Installare la libreria del connettore di Azure Esplora dati:
- Librerie predefinite per Spark 2.4+Scala 2.11 o Spark 3+scala 2.12
- Repository Maven
- Maven 3.x installato
Suggerimento
Sono supportate anche le versioni di Spark 2.3.x, ma potrebbero essere necessarie alcune modifiche nelle dipendenze pom.xml.
Come compilare il connettore Spark
A partire dalla versione 2.3.0 vengono introdotti nuovi ID artefatti che sostituiscono spark-kusto-connector: kusto-spark_3.0_2.12 destinati a Spark 3.x e Scala 2.12 e kusto-spark_2.4_2.11 destinati a Spark 2.4.x e scala 2.11.
Nota
Le versioni precedenti alla 2.5.1 non funzionano più per l'inserimento in una tabella esistente. Eseguire l'aggiornamento a una versione successiva. Questo passaggio è facoltativo. Se si usano librerie predefinite, ad esempio Maven, vedere Configurazione del cluster Spark.
Prerequisiti di compilazione
Se non si usano librerie predefinite, è necessario installare le librerie elencate nelle dipendenze , incluse le librerie Kusto Java SDK seguenti. Per trovare la versione corretta da installare, cercare il pom della versione pertinente:
Fare riferimento a questa origine per la compilazione del connettore Spark.
Per le applicazioni Scala/Java che usano definizioni di progetto Maven, collegare l'applicazione con l'artefatto seguente (la versione più recente può essere diversa):
<dependency> <groupId>com.microsoft.azure</groupId> <artifactId>kusto-spark_3.0_2.12</artifactId> <version>2.5.1</version> </dependency>
Comandi di compilazione
Per compilare file con estensione jar ed eseguire tutti i test:
mvn clean package
Per compilare il file JAR, eseguire tutti i test e installare jar nel repository Maven locale:
mvn clean install
Per altre informazioni, vedere Utilizzo del connettore.
Configurazione del cluster Spark
Nota
È consigliabile usare la versione più recente del connettore Azure Esplora dati Spark quando si eseguono i passaggi seguenti.
Configurare le impostazioni del cluster Spark seguenti, in base al cluster Azure Databricks usando Spark 2.4.4 e Scala 2.11 o Spark 3.0.1 e Scala 2.12:
Installare la libreria spark-kusto-connector più recente da Maven:
Verificare che tutte le librerie necessarie siano installate:
Per l'installazione usando un file JAR, verificare che siano state installate dipendenze aggiuntive:
Authentication
Il connettore Azure Esplora dati Spark consente di eseguire l'autenticazione con Microsoft Entra ID usando uno dei metodi seguenti:
- Un'applicazione Microsoft Entra
- Un token di accesso Microsoft Entra
- Autenticazione del dispositivo (per scenari non di produzione)
- Azure Key Vault Per accedere alla risorsa Key Vault, installare il pacchetto azure-keyvault e fornire le credenziali dell'applicazione.
autenticazione dell'applicazione Microsoft Entra
Microsoft Entra'autenticazione dell'applicazione è il metodo di autenticazione più semplice e comune ed è consigliato per il connettore Azure Esplora dati Spark.
Proprietà | Stringa di opzione | Descrizione |
---|---|---|
KUSTO_AAD_APP_ID | kustoAadAppId | Microsoft Entra identificatore dell'applicazione (client). |
KUSTO_AAD_AUTHORITY_ID | kustoAadAuthorityID | Microsoft Entra'autorità di autenticazione. MICROSOFT ENTRA ID directory (tenant). Facoltativo: il valore predefinito è microsoft.com. Per altre informazioni, vedere autorità di Microsoft Entra. |
KUSTO_AAD_APP_SECRET | kustoAadAppSecret | Microsoft Entra chiave dell'applicazione per il client. |
Nota
Le versioni precedenti dell'API (minori di 2.0.0) hanno la denominazione seguente: "kustoAADClientID", "kustoClientAADClientPassword", "kustoAADAuthorityID"
Privilegi di Azure Esplora dati
Concedere i privilegi seguenti in un cluster Esplora dati di Azure:
- Per la lettura (origine dati), l'identità Microsoft Entra deve avere privilegi di visualizzatore per il database di destinazione o privilegi di amministratore nella tabella di destinazione.
- Per la scrittura (sink di dati), l'identità Microsoft Entra deve disporre dei privilegi di ingestor nel database di destinazione. Per creare nuove tabelle, è necessario disporre anche dei privilegi utente per il database di destinazione. Se la tabella di destinazione esiste già, è necessario configurare i privilegi di amministratore nella tabella di destinazione.
Per altre informazioni sui ruoli dell'entità di Esplora dati di Azure, vedere Controllo degli accessi in base al ruolo. Per la gestione dei ruoli di sicurezza, vedere Gestione dei ruoli di sicurezza.
Sink Spark: scrittura in Azure Esplora dati
Configurare i parametri sink:
val KustoSparkTestAppId = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppId") val KustoSparkTestAppKey = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppKey") val appId = KustoSparkTestAppId val appKey = KustoSparkTestAppKey val authorityId = "72f988bf-86f1-41af-91ab-2d7cd011db47" // Optional - defaults to microsoft.com val cluster = "Sparktest.eastus2" val database = "TestDb" val table = "StringAndIntTable"
Scrivere un dataframe Spark nel cluster Esplora dati di Azure come batch:
import com.microsoft.kusto.spark.datasink.KustoSinkOptions import org.apache.spark.sql.{SaveMode, SparkSession} df.write .format("com.microsoft.kusto.spark.datasource") .option(KustoSinkOptions.KUSTO_CLUSTER, cluster) .option(KustoSinkOptions.KUSTO_DATABASE, database) .option(KustoSinkOptions.KUSTO_TABLE, "Demo3_spark") .option(KustoSinkOptions.KUSTO_AAD_APP_ID, appId) .option(KustoSinkOptions.KUSTO_AAD_APP_SECRET, appKey) .option(KustoSinkOptions.KUSTO_AAD_AUTHORITY_ID, authorityId) .option(KustoSinkOptions.KUSTO_TABLE_CREATE_OPTIONS, "CreateIfNotExist") .mode(SaveMode.Append) .save()
In alternativa, usare la sintassi semplificata:
import com.microsoft.kusto.spark.datasink.SparkIngestionProperties import com.microsoft.kusto.spark.sql.extension.SparkExtension._ val sparkIngestionProperties = Some(new SparkIngestionProperties()) // Optional, use None if not needed df.write.kusto(cluster, database, table, conf, sparkIngestionProperties)
Scrivere dati di streaming:
import org.apache.spark.sql.streaming.Trigger import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit import org.apache.spark.sql.streaming.Trigger // Set up a checkpoint and disable codeGen. spark.conf.set("spark.sql.streaming.checkpointLocation", "/FileStore/temp/checkpoint") // Write to a Kusto table from a streaming source val kustoQ = df .writeStream .format("com.microsoft.kusto.spark.datasink.KustoSinkProvider") .options(conf) .trigger(Trigger.ProcessingTime(TimeUnit.SECONDS.toMillis(10))) // Sync this with the ingestionBatching policy of the database .start()
Origine Spark: lettura da Azure Esplora dati
Quando si leggono piccole quantità di dati, definire la query sui dati:
import com.microsoft.kusto.spark.datasource.KustoSourceOptions import org.apache.spark.SparkConf import org.apache.spark.sql._ import com.microsoft.azure.kusto.data.ClientRequestProperties val query = s"$table | where (ColB % 1000 == 0) | distinct ColA" val conf: Map[String, String] = Map( KustoSourceOptions.KUSTO_AAD_APP_ID -> appId, KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey ) val df = spark.read.format("com.microsoft.kusto.spark.datasource"). options(conf). option(KustoSourceOptions.KUSTO_QUERY, query). option(KustoSourceOptions.KUSTO_DATABASE, database). option(KustoSourceOptions.KUSTO_CLUSTER, cluster). load() // Simplified syntax flavor import com.microsoft.kusto.spark.sql.extension.SparkExtension._ val cpr: Option[ClientRequestProperties] = None // Optional val df2 = spark.read.kusto(cluster, database, query, conf, cpr) display(df2)
Facoltativo: se si specifica l'archivio BLOB temporaneo (e non Azure Esplora dati) i BLOB vengono creati sotto la responsabilità del chiamante. Ciò include il provisioning dell'archiviazione, la rotazione delle chiavi di accesso e l'eliminazione di artefatti temporanei. Il modulo KustoBlobStorageUtils contiene funzioni helper per l'eliminazione di BLOB in base alle coordinate di account e contenitori e alle credenziali dell'account oppure a un URL di firma di accesso condiviso completo con autorizzazioni di scrittura, lettura ed elenco. Quando il set di dati RDD corrispondente non è più necessario, ogni transazione archivia gli artefatti BLOB temporanei in una directory separata. Questa directory viene acquisita come parte dei log delle informazioni sulle transazioni di lettura segnalati nel nodo Driver Spark.
// Use either container/account-key/account name, or container SaS val container = dbutils.secrets.get(scope = "KustoDemos", key = "blobContainer") val storageAccountKey = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountKey") val storageAccountName = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountName") // val storageSas = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageSasUrl")
Nell'esempio precedente, l'Key Vault non è accessibile usando l'interfaccia del connettore. Viene usato un metodo più semplice per l'uso dei segreti di Databricks.
Leggere da Azure Esplora dati.
Se si specifica l'archiviazione BLOB temporanea, leggere da Azure Esplora dati come indicato di seguito:
val conf3 = Map( KustoSourceOptions.KUSTO_AAD_APP_ID -> appId, KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey KustoSourceOptions.KUSTO_BLOB_STORAGE_SAS_URL -> storageSas) val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3) val dfFiltered = df2 .where(df2.col("ColA").startsWith("row-2")) .filter("ColB > 12") .filter("ColB <= 21") .select("ColA") display(dfFiltered)
Se Azure Esplora dati fornisce l'archivio BLOB temporaneo, leggere da Azure Esplora dati come indicato di seguito:
val conf3 = Map( KustoSourceOptions.KUSTO_AAD_CLIENT_ID -> appId, KustoSourceOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey) val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3) val dfFiltered = df2 .where(df2.col("ColA").startsWith("row-2")) .filter("ColB > 12") .filter("ColB <= 21") .select("ColA") display(dfFiltered)
Contenuti correlati
Commenti e suggerimenti
https://aka.ms/ContentUserFeedback.
Presto disponibile: Nel corso del 2024 verranno gradualmente disattivati i problemi di GitHub come meccanismo di feedback per il contenuto e ciò verrà sostituito con un nuovo sistema di feedback. Per altre informazioni, vedereInvia e visualizza il feedback per