Azure Veri Gezgini için Apache Spark Bağlayıcısı

Apache Spark, büyük ölçekli veri işleme için birleşik bir analiz altyapısıdır. Azure Veri Gezgini büyük miktarda veri üzerinde gerçek zamanlı analiz yapmaya yönelik hızlı ve tam olarak yönetilen bir veri analizi hizmetidir.

Spark Azure Veri Gezgini bağlayıcısı, herhangi bir Spark kümesinde çalıştırabilirsiniz açık kaynak bir projedir. Veri kaynağını ve veri havuzlarını, verileri Azure Veri Gezgini Spark kümeleri arasında taşımayı sağlar. Veri Azure Veri Gezgini ve Apache Spark kullanarak, veri odaklı senaryoları hedef alan hızlı ve ölçeklenebilir uygulamalar derlemeniz gerekir. Örneğin, makine öğrenmesi (ML), Ayıkla-Dönüştür-Yükle (ETL) ve Log Analytics. Bağlayıcı ile Azure Veri Gezgini, yazma, okuma ve yazmaStream gibi standart Spark kaynak ve havuz işlemleri için geçerli bir veri deposu haline gelir.

Toplu iş Azure Veri Gezgini akış modundaki bir veri akışına yazabilirsiniz. Verilerden Azure Veri Gezgini sütun ayıklamayı ve önkate itmeyi destekler. Bu da Azure Veri Gezgini veri hacmini azaltarak verileri filtreler.

Bu konu başlığında, Azure Veri Gezgini Spark bağlayıcısı yükleme ve yapılandırma ve Azure Veri Gezgini kümeler arasında Apache Spark açıklanmıştır.

Not

Aşağıdaki örneklerden bazıları bir Azure Databricks Spark kümesine başvurulsa da, Azure Veri Gezgini Spark bağlayıcısı Databricks'e veya diğer Spark dağıtımına doğrudan bağımlılıkları almaz.

Önkoşullar

İpucu

Spark 2.3.x sürümleri de de desteklese de, bu sürümlerde pom.xml gerekli olabilir.

Spark bağlayıcısı oluşturma

2.3.0 sürümünden itibaren Spark 3.x ve Scala 2.12'yi ve kusto-spark_2.4_2.11'i hedef alan Spark 2.4.x ve scala 2.11'i hedef alan kusto-spark_3.0_2.12 spark-kusto-connector'ın yerini alan yeni yapıt kimlikleri tanıtıyor.

Not

2.5.1'den önceki sürümler artık mevcut bir tabloya alınarak çalışmıyor, lütfen sonraki bir sürüme güncelleştirin. Bu adım isteğe bağlıdır. Maven gibi önceden yerleşik kitaplıklar kullanıyorsanız bkz. Spark kümesi kurulumu.

Derleme önkoşulları

  1. Önceden yerleşik kitaplıkları kullanasanız, aşağıdaki Kusto Java SDK kitaplıkları da dahil olmak üzere bağımlılıklarda listelenen kitaplıkları yüklemeniz gerekir. Doğru sürümü yüklemek için ilgili sürümün pom'una bakın:

  2. Spark Bağlayıcısı'nın inşası için bu kaynağı kullanın.

  3. Maven proje tanımlarını kullanan Scala/Java uygulamaları için, aşağıdaki yapıtla (en son sürüm farklılık gösterebilir) uygulamanıza bağlantı yapın:

       <dependency>
         <groupId>com.microsoft.azure</groupId>
         <artifactId>kusto-spark_3.0_2.12</artifactId>
         <version>2.5.1</version>
       </dependency>
    

Derleme komutları

Jar derlemek ve tüm testleri çalıştırmak için:

mvn clean package

Jar derlemek için tüm testleri çalıştırın ve jar'ı yerel Maven deponıza yükleyin:

mvn clean install

Daha fazla bilgi için bkz. bağlayıcı kullanımı.

Spark kümesi kurulumu

Not

Aşağıdaki adımları gerçekleştirerek en son Azure Veri Gezgini Spark bağlayıcı sürümünün kullanılması önerilir.

  1. Spark 2.4.4 ve Scala 2.11 veya Spark 3.0.1 ve Scala 2.12 kullanarak Azure Databricks kümesine bağlı olarak aşağıdaki Spark kümesi ayarlarını yapılandırma:

    Databricks kümesi ayarları.

  2. Maven'dan en son spark-kusto-connector kitaplığını yükleyin:

    Kitaplıkları içeri aktarma.Spark-Kusto-Connector'ı seçin.

  3. Tüm gerekli kitaplıkların yüklü olduğunu doğrulayın:

    Kitaplıkların yüklü olduğunu doğrulayın.

  4. JAR dosyası kullanarak yükleme için ek bağımlılıkların yük olduğunu doğrulayın:

    Bağımlılıkları ekleyin.

Kimlik Doğrulaması

Azure Veri Gezgini Spark bağlayıcısı, aşağıdaki yöntemlerden birini Azure Active Directory (Azure AD) ile kimlik doğrulamasına olanak sağlar:

Azure AD uygulama kimlik doğrulaması

Azure AD uygulama kimlik doğrulaması en basit ve en yaygın kimlik doğrulama yöntemidir ve Spark bağlayıcısı Azure Veri Gezgini önerilir.

Özellikler Seçenek Dizesi Description
KUSTO_AAD_APP_ID kustoAadAppId Azure AD uygulama (istemci) tanımlayıcısı.
KUSTO_AAD_AUTHORITY_ID kustoAadAuthorityID Azure AD kimlik doğrulama yetkilisi. Azure AD Dizini (kiracı) kimliği.
KUSTO_AAD_APP_SECRET kustoAadAppSecret İstemci için Azure AD uygulama anahtarı.

Not

Eski API sürümleri (2.0.0'dan küçük) şu adlandırmaya sahiptir: "kustoAADClientID", "kustoClientAADClientPassword", "kustoAADAuthorityID"

Azure Veri Gezgini ayrıcalıkları

Bir kümede aşağıdaki Azure Veri Gezgini ver:

  • Okumak için (veri kaynağı), Azure AD kimliğinin hedef veritabanında görüntüleyici ayrıcalıklarına veya hedef tabloda yönetici ayrıcalıklarına sahip olması gerekir.
  • Yazma (veri havuzu) için Azure AD kimliğinin hedef veritabanında veri alma ayrıcalıklarına sahip olması gerekir. Ayrıca yeni tablolar oluşturmak için hedef veritabanında kullanıcı ayrıcalıklarına da sahip olmalıdır. Hedef tablo zaten varsa, hedef tabloda yönetici ayrıcalıklarını yapılandırmalısınız.

Rol tabanlı rol Azure Veri Gezgini daha fazla bilgi için bkz. rol tabanlı yetkilendirme. Güvenlik rollerini yönetmek için bkz. güvenlik rolleri yönetimi.

Spark havuzu: Azure Veri Gezgini

  1. Havuz parametrelerini ayarlayın:

    val KustoSparkTestAppId = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppId")
    val KustoSparkTestAppKey = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppKey")
    
    val appId = KustoSparkTestAppId
    val appKey = KustoSparkTestAppKey
    val authorityId = "72f988bf-86f1-41af-91ab-2d7cd011db47" // Optional - defaults to microsoft.com
    val cluster = "Sparktest.eastus2"
    val database = "TestDb"
    val table = "StringAndIntTable"
    
  2. Kümeyi toplu olarak Azure Veri Gezgini Spark DataFrame yazın:

    import com.microsoft.kusto.spark.datasink.KustoSinkOptions
    import org.apache.spark.sql.{SaveMode, SparkSession}
    
    df.write
      .format("com.microsoft.kusto.spark.datasource")
      .option(KustoSinkOptions.KUSTO_CLUSTER, cluster)
      .option(KustoSinkOptions.KUSTO_DATABASE, database)
      .option(KustoSinkOptions.KUSTO_TABLE, "Demo3_spark")
      .option(KustoSinkOptions.KUSTO_AAD_APP_ID, appId)
      .option(KustoSinkOptions.KUSTO_AAD_APP_SECRET, appKey)
      .option(KustoSinkOptions.KUSTO_AAD_AUTHORITY_ID, authorityId)
      .option(KustoSinkOptions.KUSTO_TABLE_CREATE_OPTIONS, "CreateIfNotExist")
      .mode(SaveMode.Append)
      .save()  
    

    Basitleştirilmiş söz dizimlerini de kullanabilirsiniz:

    import com.microsoft.kusto.spark.datasink.SparkIngestionProperties
    import com.microsoft.kusto.spark.sql.extension.SparkExtension._
    
    val sparkIngestionProperties = Some(new SparkIngestionProperties()) // Optional, use None if not needed
    df.write.kusto(cluster, database, table, conf, sparkIngestionProperties)
    
  3. Akış verileri yazma:

    import org.apache.spark.sql.streaming.Trigger
    import java.util.concurrent.TimeUnit
    import java.util.concurrent.TimeUnit
    import org.apache.spark.sql.streaming.Trigger
    
    // Set up a checkpoint and disable codeGen. 
    spark.conf.set("spark.sql.streaming.checkpointLocation", "/FileStore/temp/checkpoint")
    
    // Write to a Kusto table from a streaming source
    val kustoQ = df
      .writeStream
      .format("com.microsoft.kusto.spark.datasink.KustoSinkProvider")
      .options(conf) 
      .trigger(Trigger.ProcessingTime(TimeUnit.SECONDS.toMillis(10))) // Sync this with the ingestionBatching policy of the database
      .start()
    

Spark kaynağı: Azure Veri Gezgini'den okuma

  1. Küçük miktarlardaki verileri okurkenveri sorgusunu tanımlayın:

    import com.microsoft.kusto.spark.datasource.KustoSourceOptions
    import org.apache.spark.SparkConf
    import org.apache.spark.sql._
    import com.microsoft.azure.kusto.data.ClientRequestProperties
    
    val query = s"$table | where (ColB % 1000 == 0) | distinct ColA"
    val conf: Map[String, String] = Map(
          KustoSourceOptions.KUSTO_AAD_APP_ID -> appId,
          KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey
        )
    
    val df = spark.read.format("com.microsoft.kusto.spark.datasource").
      options(conf).
      option(KustoSourceOptions.KUSTO_QUERY, query).
      option(KustoSourceOptions.KUSTO_DATABASE, database).
      option(KustoSourceOptions.KUSTO_CLUSTER, cluster).
      load()
    
    // Simplified syntax flavor
    import com.microsoft.kusto.spark.sql.extension.SparkExtension._
    
    val cpr: Option[ClientRequestProperties] = None // Optional
    val df2 = spark.read.kusto(cluster, database, query, conf, cpr)
    display(df2)
    
  2. İsteğe bağlı: Geçici blob depolama alanını sağlarsanız (Azure Veri Gezgini değil) bloblar çağıranın sorumluluğu altında oluşturulur. Buna depolama alanı sağlama, erişim anahtarlarını döndürme ve geçici yapıtları silme dahildir. KustoBlobStorageUtils modülü, hesap ve kapsayıcı koordinatları ile hesap kimlik bilgilerine göre blobları silmek için yardımcı işlevler ya da yazma, okuma ve liste izinlerine sahip tam bir SAS URL'si içerir. Karşılık gelen RDD artık gerekli olmadığı zaman, her işlem geçici blob yapıtlarını ayrı bir dizinde depolar. Bu dizin, Spark Sürücüsü düğümünde bildirilen okuma işlemi bilgi günlüklerinin bir parçası olarak yakalanır.

    // Use either container/account-key/account name, or container SaS
    val container = dbutils.secrets.get(scope = "KustoDemos", key = "blobContainer")
    val storageAccountKey = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountKey")
    val storageAccountName = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountName")
    // val storageSas = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageSasUrl")
    

    Yukarıdaki örnekte, Key Vault arabirimi kullanılarak erişilemez; Databricks gizli dizilerini kullanmanın daha basit bir yöntemi kullanılır.

  3. Okuma Azure Veri Gezgini.

    • Geçici blob depolamayı sağlarsanız, aşağıdaki gibi Azure Veri Gezgini okuyun:

       val conf3 = Map(
            KustoSourceOptions.KUSTO_AAD_APP_ID -> appId,
            KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey
            KustoSourceOptions.KUSTO_BLOB_STORAGE_SAS_URL -> storageSas)
      val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3)
      
      val dfFiltered = df2
        .where(df2.col("ColA").startsWith("row-2"))
        .filter("ColB > 12")
        .filter("ColB <= 21")
        .select("ColA")
      
      display(dfFiltered)
      
    • Geçici Azure Veri Gezgini blob depolamayı sağlarsa, aşağıdaki gibi Azure Veri Gezgini okuma yapın:

      val conf3 = Map(
        KustoSourceOptions.KUSTO_AAD_CLIENT_ID -> appId,
        KustoSourceOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey)
      val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3)
      
      val dfFiltered = df2
        .where(df2.col("ColA").startsWith("row-2"))
        .filter("ColB > 12")
        .filter("ColB <= 21")
        .select("ColA")
      
      display(dfFiltered)
      

Sonraki adımlar