Azure Veri Gezgini için Apache Spark Bağlayıcısı
Apache Spark, büyük ölçekli veri işleme için birleşik bir analiz altyapısıdır. Azure Veri Gezgini büyük miktarda veri üzerinde gerçek zamanlı analiz yapmaya yönelik hızlı ve tam olarak yönetilen bir veri analizi hizmetidir.
Spark Azure Veri Gezgini bağlayıcısı, herhangi bir Spark kümesinde çalıştırabilirsiniz açık kaynak bir projedir. Veri kaynağını ve veri havuzlarını, verileri Azure Veri Gezgini Spark kümeleri arasında taşımayı sağlar. Veri Azure Veri Gezgini ve Apache Spark kullanarak, veri odaklı senaryoları hedef alan hızlı ve ölçeklenebilir uygulamalar derlemeniz gerekir. Örneğin, makine öğrenmesi (ML), Ayıkla-Dönüştür-Yükle (ETL) ve Log Analytics. Bağlayıcı ile Azure Veri Gezgini, yazma, okuma ve yazmaStream gibi standart Spark kaynak ve havuz işlemleri için geçerli bir veri deposu haline gelir.
Toplu iş Azure Veri Gezgini akış modundaki bir veri akışına yazabilirsiniz. Verilerden Azure Veri Gezgini sütun ayıklamayı ve önkate itmeyi destekler. Bu da Azure Veri Gezgini veri hacmini azaltarak verileri filtreler.
Bu konu başlığında, Azure Veri Gezgini Spark bağlayıcısı yükleme ve yapılandırma ve Azure Veri Gezgini kümeler arasında Apache Spark açıklanmıştır.
Not
Aşağıdaki örneklerden bazıları bir Azure Databricks Spark kümesine başvurulsa da, Azure Veri Gezgini Spark bağlayıcısı Databricks'e veya diğer Spark dağıtımına doğrudan bağımlılıkları almaz.
Önkoşullar
- Azure aboneliği. Ücretsiz bir Azure hesabı oluşturun.
- Bir küme ve veritabanı oluşturun.
- Spark kümesi oluşturma
- Bağlayıcı Azure Veri Gezgini kitaplığını yükleyin:
- Spark 2.4+Scala 2.11 veya Spark 3+scala 2.12 için önceden yerleşik kitaplıklar
- Maven repo
- Maven 3.x yüklü
İpucu
Spark 2.3.x sürümleri de de desteklese de, bu sürümlerde pom.xml gerekli olabilir.
Spark bağlayıcısı oluşturma
2.3.0 sürümünden itibaren Spark 3.x ve Scala 2.12'yi ve kusto-spark_2.4_2.11'i hedef alan Spark 2.4.x ve scala 2.11'i hedef alan kusto-spark_3.0_2.12 spark-kusto-connector'ın yerini alan yeni yapıt kimlikleri tanıtıyor.
Not
2.5.1'den önceki sürümler artık mevcut bir tabloya alınarak çalışmıyor, lütfen sonraki bir sürüme güncelleştirin. Bu adım isteğe bağlıdır. Maven gibi önceden yerleşik kitaplıklar kullanıyorsanız bkz. Spark kümesi kurulumu.
Derleme önkoşulları
Önceden yerleşik kitaplıkları kullanasanız, aşağıdaki Kusto Java SDK kitaplıkları da dahil olmak üzere bağımlılıklarda listelenen kitaplıkları yüklemeniz gerekir. Doğru sürümü yüklemek için ilgili sürümün pom'una bakın:
Spark Bağlayıcısı'nın inşası için bu kaynağı kullanın.
Maven proje tanımlarını kullanan Scala/Java uygulamaları için, aşağıdaki yapıtla (en son sürüm farklılık gösterebilir) uygulamanıza bağlantı yapın:
<dependency> <groupId>com.microsoft.azure</groupId> <artifactId>kusto-spark_3.0_2.12</artifactId> <version>2.5.1</version> </dependency>
Derleme komutları
Jar derlemek ve tüm testleri çalıştırmak için:
mvn clean package
Jar derlemek için tüm testleri çalıştırın ve jar'ı yerel Maven deponıza yükleyin:
mvn clean install
Daha fazla bilgi için bkz. bağlayıcı kullanımı.
Spark kümesi kurulumu
Not
Aşağıdaki adımları gerçekleştirerek en son Azure Veri Gezgini Spark bağlayıcı sürümünün kullanılması önerilir.
Spark 2.4.4 ve Scala 2.11 veya Spark 3.0.1 ve Scala 2.12 kullanarak Azure Databricks kümesine bağlı olarak aşağıdaki Spark kümesi ayarlarını yapılandırma:

Maven'dan en son spark-kusto-connector kitaplığını yükleyin:


Tüm gerekli kitaplıkların yüklü olduğunu doğrulayın:

JAR dosyası kullanarak yükleme için ek bağımlılıkların yük olduğunu doğrulayın:

Kimlik Doğrulaması
Azure Veri Gezgini Spark bağlayıcısı, aşağıdaki yöntemlerden birini Azure Active Directory (Azure AD) ile kimlik doğrulamasına olanak sağlar:
- Azure AD uygulaması
- Azure AD erişim belirteci
- Cihaz kimlik doğrulaması (üretim dışı senaryolar için)
- Azure Key Vault Kaynağına erişmek Key Vault azure-keyvault paketini yükleyin ve uygulama kimlik bilgilerini girin.
Azure AD uygulama kimlik doğrulaması
Azure AD uygulama kimlik doğrulaması en basit ve en yaygın kimlik doğrulama yöntemidir ve Spark bağlayıcısı Azure Veri Gezgini önerilir.
| Özellikler | Seçenek Dizesi | Description |
|---|---|---|
| KUSTO_AAD_APP_ID | kustoAadAppId | Azure AD uygulama (istemci) tanımlayıcısı. |
| KUSTO_AAD_AUTHORITY_ID | kustoAadAuthorityID | Azure AD kimlik doğrulama yetkilisi. Azure AD Dizini (kiracı) kimliği. |
| KUSTO_AAD_APP_SECRET | kustoAadAppSecret | İstemci için Azure AD uygulama anahtarı. |
Not
Eski API sürümleri (2.0.0'dan küçük) şu adlandırmaya sahiptir: "kustoAADClientID", "kustoClientAADClientPassword", "kustoAADAuthorityID"
Azure Veri Gezgini ayrıcalıkları
Bir kümede aşağıdaki Azure Veri Gezgini ver:
- Okumak için (veri kaynağı), Azure AD kimliğinin hedef veritabanında görüntüleyici ayrıcalıklarına veya hedef tabloda yönetici ayrıcalıklarına sahip olması gerekir.
- Yazma (veri havuzu) için Azure AD kimliğinin hedef veritabanında veri alma ayrıcalıklarına sahip olması gerekir. Ayrıca yeni tablolar oluşturmak için hedef veritabanında kullanıcı ayrıcalıklarına da sahip olmalıdır. Hedef tablo zaten varsa, hedef tabloda yönetici ayrıcalıklarını yapılandırmalısınız.
Rol tabanlı rol Azure Veri Gezgini daha fazla bilgi için bkz. rol tabanlı yetkilendirme. Güvenlik rollerini yönetmek için bkz. güvenlik rolleri yönetimi.
Spark havuzu: Azure Veri Gezgini
Havuz parametrelerini ayarlayın:
val KustoSparkTestAppId = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppId") val KustoSparkTestAppKey = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppKey") val appId = KustoSparkTestAppId val appKey = KustoSparkTestAppKey val authorityId = "72f988bf-86f1-41af-91ab-2d7cd011db47" // Optional - defaults to microsoft.com val cluster = "Sparktest.eastus2" val database = "TestDb" val table = "StringAndIntTable"Kümeyi toplu olarak Azure Veri Gezgini Spark DataFrame yazın:
import com.microsoft.kusto.spark.datasink.KustoSinkOptions import org.apache.spark.sql.{SaveMode, SparkSession} df.write .format("com.microsoft.kusto.spark.datasource") .option(KustoSinkOptions.KUSTO_CLUSTER, cluster) .option(KustoSinkOptions.KUSTO_DATABASE, database) .option(KustoSinkOptions.KUSTO_TABLE, "Demo3_spark") .option(KustoSinkOptions.KUSTO_AAD_APP_ID, appId) .option(KustoSinkOptions.KUSTO_AAD_APP_SECRET, appKey) .option(KustoSinkOptions.KUSTO_AAD_AUTHORITY_ID, authorityId) .option(KustoSinkOptions.KUSTO_TABLE_CREATE_OPTIONS, "CreateIfNotExist") .mode(SaveMode.Append) .save()Basitleştirilmiş söz dizimlerini de kullanabilirsiniz:
import com.microsoft.kusto.spark.datasink.SparkIngestionProperties import com.microsoft.kusto.spark.sql.extension.SparkExtension._ val sparkIngestionProperties = Some(new SparkIngestionProperties()) // Optional, use None if not needed df.write.kusto(cluster, database, table, conf, sparkIngestionProperties)Akış verileri yazma:
import org.apache.spark.sql.streaming.Trigger import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit import org.apache.spark.sql.streaming.Trigger // Set up a checkpoint and disable codeGen. spark.conf.set("spark.sql.streaming.checkpointLocation", "/FileStore/temp/checkpoint") // Write to a Kusto table from a streaming source val kustoQ = df .writeStream .format("com.microsoft.kusto.spark.datasink.KustoSinkProvider") .options(conf) .trigger(Trigger.ProcessingTime(TimeUnit.SECONDS.toMillis(10))) // Sync this with the ingestionBatching policy of the database .start()
Spark kaynağı: Azure Veri Gezgini'den okuma
Küçük miktarlardaki verileri okurkenveri sorgusunu tanımlayın:
import com.microsoft.kusto.spark.datasource.KustoSourceOptions import org.apache.spark.SparkConf import org.apache.spark.sql._ import com.microsoft.azure.kusto.data.ClientRequestProperties val query = s"$table | where (ColB % 1000 == 0) | distinct ColA" val conf: Map[String, String] = Map( KustoSourceOptions.KUSTO_AAD_APP_ID -> appId, KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey ) val df = spark.read.format("com.microsoft.kusto.spark.datasource"). options(conf). option(KustoSourceOptions.KUSTO_QUERY, query). option(KustoSourceOptions.KUSTO_DATABASE, database). option(KustoSourceOptions.KUSTO_CLUSTER, cluster). load() // Simplified syntax flavor import com.microsoft.kusto.spark.sql.extension.SparkExtension._ val cpr: Option[ClientRequestProperties] = None // Optional val df2 = spark.read.kusto(cluster, database, query, conf, cpr) display(df2)İsteğe bağlı: Geçici blob depolama alanını sağlarsanız (Azure Veri Gezgini değil) bloblar çağıranın sorumluluğu altında oluşturulur. Buna depolama alanı sağlama, erişim anahtarlarını döndürme ve geçici yapıtları silme dahildir. KustoBlobStorageUtils modülü, hesap ve kapsayıcı koordinatları ile hesap kimlik bilgilerine göre blobları silmek için yardımcı işlevler ya da yazma, okuma ve liste izinlerine sahip tam bir SAS URL'si içerir. Karşılık gelen RDD artık gerekli olmadığı zaman, her işlem geçici blob yapıtlarını ayrı bir dizinde depolar. Bu dizin, Spark Sürücüsü düğümünde bildirilen okuma işlemi bilgi günlüklerinin bir parçası olarak yakalanır.
// Use either container/account-key/account name, or container SaS val container = dbutils.secrets.get(scope = "KustoDemos", key = "blobContainer") val storageAccountKey = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountKey") val storageAccountName = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountName") // val storageSas = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageSasUrl")Yukarıdaki örnekte, Key Vault arabirimi kullanılarak erişilemez; Databricks gizli dizilerini kullanmanın daha basit bir yöntemi kullanılır.
Okuma Azure Veri Gezgini.
Geçici blob depolamayı sağlarsanız, aşağıdaki gibi Azure Veri Gezgini okuyun:
val conf3 = Map( KustoSourceOptions.KUSTO_AAD_APP_ID -> appId, KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey KustoSourceOptions.KUSTO_BLOB_STORAGE_SAS_URL -> storageSas) val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3) val dfFiltered = df2 .where(df2.col("ColA").startsWith("row-2")) .filter("ColB > 12") .filter("ColB <= 21") .select("ColA") display(dfFiltered)Geçici Azure Veri Gezgini blob depolamayı sağlarsa, aşağıdaki gibi Azure Veri Gezgini okuma yapın:
val conf3 = Map( KustoSourceOptions.KUSTO_AAD_CLIENT_ID -> appId, KustoSourceOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey) val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3) val dfFiltered = df2 .where(df2.col("ColA").startsWith("row-2")) .filter("ColB > 12") .filter("ColB <= 21") .select("ColA") display(dfFiltered)