hızlı başlangıç: SQL apı için Azure Cosmos DB Spark 3 OLTP bağlayıcısı ile verileri yönetme

UYGULANDıĞı YER: SQL API

bu öğretici, Cosmos DB okuma veya yazma için Cosmos DB Spark bağlayıcısını nasıl kullanacağınızı gösteren hızlı bir başlangıç kılavuzudur. spark Connector, spark 3.1. x ' i temel alır Cosmos DB.

bu hızlı öğreticide, spark 3.1.1 ile Azure Databricks Runtime 8,0 ' i ve Cosmos DB spark bağlayıcısının nasıl kullanılacağını gösteren bir Jupyter Notebook güveniyoruz.

Başka bir Spark 3.1.1 Spark teklifini de kullanabilirsiniz. Ayrıca, Spark (PySpark, Scala, Java, vb.) tarafından desteklenen herhangi bir dili veya bildiğiniz herhangi bir Spark arabirimini (Jupyter Notebook, Livy, vb.) kullanabilirsiniz.

Önkoşullar

DOLAYıSıYLA SLF4J yalnızca, günlüğü kullanmayı planlıyorsanız gerekir, Ayrıca, DOLAYıSıYLA SLF4J API 'sini tercih ettiğiniz günlük uygulamasıyla bağlayacaktır. Daha fazla bilgi için dolayısıyla slf4j Kullanıcı kılavuzuna bakın.

spark kümenizde Cosmos DB spark bağlayıcısını ( azure-cosmos-spark_3 -1_2-12-4.3.1. jar ) yükleyip

Başlangıç Kılavuzu, PySpark 'a dayalıdır, ancak eşdeğer Scala sürümünü de kullanabilirsiniz ve aşağıdaki kod parçacığını bir Azure Databricks PySpark not defterinde çalıştırabilirsiniz.

Veritabanları ve kapsayıcılar oluşturma

ilk olarak, Cosmos DB hesabı kimlik bilgilerini ve Cosmos DB veritabanı adını ve kapsayıcı adını ayarlayın.

cosmosEndpoint = "https://REPLACEME.documents.azure.com:443/"
cosmosMasterKey = "REPLACEME"
cosmosDatabaseName = "sampleDB"
cosmosContainerName = "sampleContainer"

cfg = {
  "spark.cosmos.accountEndpoint" : cosmosEndpoint,
  "spark.cosmos.accountKey" : cosmosMasterKey,
  "spark.cosmos.database" : cosmosDatabaseName,
  "spark.cosmos.container" : cosmosContainerName,
}

daha sonra, Spark aracılığıyla bir Cosmos DB veritabanı ve kapsayıcı oluşturmak için yeni katalog apı 'sini kullanabilirsiniz.

# Configure Catalog Api to be used
spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", cosmosEndpoint)
spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", cosmosMasterKey)

# create a cosmos database using catalog api
spark.sql("CREATE DATABASE IF NOT EXISTS cosmosCatalog.{};".format(cosmosDatabaseName))

# create a cosmos container using catalog api
spark.sql("CREATE TABLE IF NOT EXISTS cosmosCatalog.{}.{} using cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/id', manualThroughput = '1100')".format(cosmosDatabaseName, cosmosContainerName))

Katalog API 'SI ile kapsayıcılar oluştururken, oluşturulacak kapsayıcının aktarım hızını ve bölüm anahtarı yolunu ayarlayabilirsiniz.

Daha fazla bilgi için bkz. tam Katalog API 'si belgeleri.

Veriyi çekme

veri kaynağının adı cosmos.oltp ve aşağıdaki örnekte, Cosmos DB için iki öğeden oluşan bir bellek veri çerçevesini nasıl yazabileceğiniz gösterilmektedir:

spark.createDataFrame((("cat-alive", "Schrodinger cat", 2, True), ("cat-dead", "Schrodinger cat", 2, False)))\
  .toDF("id","name","age","isAlive") \
   .write\
   .format("cosmos.oltp")\
   .options(**cfg)\
   .mode("APPEND")\
   .save()

idCosmos DB için zorunlu bir alan olduğunu unutmayın.

Verilerle ilgili daha fazla bilgi almak için bkz. tam yazma yapılandırması belgeleri.

Verileri sorgulama

Aynı cosmos.oltp veri kaynağını kullanarak, filtreleri göndermek için verileri sorgulayabilir ve kullanabilirsiniz filter :

from pyspark.sql.functions import col

df = spark.read.format("cosmos.oltp").options(**cfg)\
 .option("spark.cosmos.read.inferSchema.enabled", "true")\
 .load()

df.filter(col("isAlive") == True)\
 .show()

Verileri sorgulama hakkında daha fazla bilgi için bkz. tam sorgu yapılandırması belgeleri.

Şema çıkarımı

Verileri sorgularken Spark Bağlayıcısı, üzerine ayarlayarak varolan öğeleri örneklemeye göre şemayı çıkarsçıkarabilir spark.cosmos.read.inferSchema.enabled true .

df = spark.read.format("cosmos.oltp").options(**cfg)\
 .option("spark.cosmos.read.inferSchema.enabled", "true")\
 .load()
 
df.printSchema()

Alternatif olarak, verileri okumak için kullanmak istediğiniz özel şemayı geçirebilirsiniz:

customSchema = StructType([
      StructField("id", StringType()),
      StructField("name", StringType()),
      StructField("type", StringType()),
      StructField("age", IntegerType()),
      StructField("isAlive", BooleanType())
    ])

df = spark.read.schema(schema).format("cosmos.oltp").options(**cfg)\
 .load()
 
df.printSchema()

Özel şema belirtilmemişse ve Şema çıkarımı devre dışı bırakılmışsa, sonuçta elde edilen veriler öğelerin ham JSON içeriğini döndürür:

df = spark.read.format("cosmos.oltp").options(**cfg)\
 .load()
 
df.printSchema()

Şema çıkarımı ile ilgili daha fazla bilgi için bkz. tam Şema çıkarımı yapılandırması belgeleri.

Yapılandırma başvurusu

Genel yapılandırma

Yapılandırma özelliği adı Varsayılan Açıklama
spark.cosmos.accountEndpoint Hiçbiri Cosmos DB hesabı uç noktası urı 'si
spark.cosmos.accountKey Hiçbiri Cosmos DB hesap anahtarı
spark.cosmos.database Hiçbiri Cosmos DB veritabanı adı
spark.cosmos.container Hiçbiri Cosmos DB kapsayıcı adı

Ek ayarlama

Yapılandırma özelliği adı Varsayılan Açıklama
spark.cosmos.useGatewayMode false İstemci işlemleri için ağ geçidi modunu kullan
spark.cosmos.read.forceEventualConsistency true İstemcinin varsayılan hesap düzeyi tutarlılığını kullanmak yerine okuma işlemleri için nihai tutarlılığı kullanmasını sağlar
spark.cosmos.applicationName Hiçbiri Uygulama adı
spark.cosmos.preferredRegionsList Hiçbiri çok bölgeli Cosmos DB hesap için kullanılacak tercih edilen bölge listesi. Bu, virgülle ayrılmış bir değerdir (örneğin, [East US, West US] veya), East US, West US sunulan tercih edilen bölgeler ipucu olarak kullanılacaktır. Cosmos DB hesabınızla birlikte bulunan bir spark kümesi kullanmalı ve spark küme bölgesini tercih edilen bölge olarak geçitirsiniz. Bkz . Azurebölgelerinin listesi. spark.cosmos.preferredRegionsAs diğer adı da kullanabilirsiniz
spark.cosmos.diagnostics Hiçbiri Daha ayrıntılı tanılamayı etkinleştirmek için kullanılabilir. Şu anda desteklenen tek seçenek, bu özelliği, simple ek günlüklerin INFO sürücü ve yürütücü günlüklerinde Günlükler olarak yayılmasıyla sonuçlanacaktır.

Yazma yapılandırması

Yapılandırma özelliği adı Varsayılan Açıklama
spark.cosmos.write.strategy ItemOverwrite Cosmos DB öğeleri yazma stratejisi: ItemOverwrite (upsert kullanılarak), ItemAppend (önceden var olan öğeleri oluştur, var olan öğeleri sil, çakışmalar), ( ItemDelete tüm belgeleri sil), ItemDeleteIfNotModified (etag 'in değiştiği tüm belgeleri sil)
spark.cosmos.write.maxRetryCount 10 yeniden denenebilir hatalarda yazma sayısı üst sınırı Cosmos DB (örneğin, bağlantı hatası)
spark.cosmos.write.point.maxConcurrency Hiçbiri Cosmos DB öğe yazma en fazla eşzamanlılık. Belirtilmemişse, Spark yürütücü VM boyutu temel alınarak belirlenir
spark.cosmos.write.bulk.maxPendingOperations Hiçbiri Cosmos DB öğe yazma toplu modu maksimum bekleyen işlemler. Aynı anda işlenen toplu işlemlerin limitini tanımlar. Belirtilmemişse, Spark yürütücü VM boyutu temel alınarak belirlenir. Veri hacmi hedef kapsayıcıda sağlanan aktarım hızı için büyükse, bu ayar şu tahmine göre ayarlanabilir 1000 x Cores
spark.cosmos.write.bulk.enabled true Cosmos DB öğe yazma toplu etkin

Sorgu yapılandırması

Yapılandırma özelliği adı Varsayılan Açıklama
spark.cosmos.read.customQuery Hiçbiri sağlandığı zaman, sorgu, koşul push aracılığıyla dinamik olarak oluşturmak yerine Cosmos uç noktasına karşı işlenir. Genellikle, sorgu planına bağlı olarak en verimli filtre kümesini oluşturmaya izin verecek olduğundan, Spark 'ın koşul gönderimi ' nı kullanmanız önerilir. Ancak toplamalara (en azından Spark 3,1 ' de) gönderilmemiş olan toplamalar (Count, group by, Ort, Sum vb.) gibi çeşitli koşullar vardır. bu nedenle, özel sorgu Cosmos gönderilen sorguya gönderilmesine izin veren bir geri dönüş olur. Belirtilmişse, şema çıkarımı etkinken özel sorgu da şemayı çıkarsanacak şekilde kullanılacaktır.
spark.cosmos.read.maxItemCount 1000 Tek bir sorgu veya değişiklik akışı isteği için döndürülebilecek en fazla belge sayısını geçersiz kılar. Varsayılan değer 1000 -Bu, yalnızca 1 KB 'tan küçük ortalama belge boyutları için artırmayı düşünün veya projeksiyon, sorgularda seçilen özelliklerin sayısını önemli ölçüde (örneğin, yalnızca "kimliği", yalnızca "kimlik" olarak seçerken) azaltır.

Şema çıkarımı yapılandırması

Okuma işlemleri yaparken, kullanıcılar özel bir şema belirtebilir veya bağlayıcının onu çıkarması için izin verebilir. Şema çıkarımı varsayılan olarak etkindir.

Yapılandırma Özelliği Adı Varsayılan Açıklama
spark.cosmos.read.inferSchema.enabled true Şema çıkarı devre dışı bırakılmıştır ve kullanıcı bir şema sağlamazsa ham json döndürülür.
spark.cosmos.read.inferSchema.query SELECT * FROM r Şema çıkarı etkinleştirildiğinde, bunu çıkarımk için özel sorgu olarak kullanılır. Örneğin, bir kapsayıcı içinde farklı şemalara sahip birden çok varlık depolarsanız ve çıkarımın yalnızca belirli belge türlerine bakması veya yalnızca belirli sütunların projesini yapmak istediğiniz durumlarda.
spark.cosmos.read.inferSchema.samplingSize 1000 Şema çıkarken ve sorgu kullanmazken kullanmak üzere örnekleme boyutu.
spark.cosmos.read.inferSchema.includeSystemProperties false Şema çıkarı etkinleştirildiğinde, sonuçta elde edilen şemanın tüm veritabanı Cosmos dahil olup olmadığı.
spark.cosmos.read.inferSchema.includeTimestamp false Şema çıkarı etkinleştirildiğinde, sonuçta elde edilen şemanın Zaman Damgası ( ) belgesini içerecek olup _ts olmadığı. Etkinse spark.cosmos.read.inferSchema.includeSystemProperties gerekli değildir çünkü zaten tüm sistem özelliklerini içerecektir.
spark.cosmos.read.inferSchema.forceNullableProperties true Şema çıkarı etkinleştirildiğinde, sonuçta elde edilen şemanın tüm sütunları null değere döndürebilir olup olmadığı. Varsayılan olarak, örnek küme içindeki tüm satırlar null olmayan değerlere sahip olsa bile tüm sütunlar (Cosmos sistem özellikleri hariç) null değer olarak kabul edilir. Devre dışı bırakılarak, örnek küme içindeki herhangi bir kaydın bir sütun içinde null-değerlere sahip olup olmadığı bağlı olarak, alınan sütunlar null değer olarak kabul edilir.

Serileştirme yapılandırması

JSON serileştirme/seriden serileştirme davranışını etkilemek için kullanılır

Yapılandırma Özelliği Adı Varsayılan Açıklama
spark.cosmos.serialization.inclusionMode Always Null/varsayılan değerlerin json'a seri hale getirilecek veya null/varsayılan değere sahip özelliklerin atlanacak olup olmadığını belirler. Davranış, Zaman'ın JsonInclude.Include ile aynı fikirleri izler. Always , json özelliklerinin null ve varsayılan değerler için bile oluşturulacak olduğu anlamına gelir. NonNull , açık null değerler için hiçbir json özelliği oluşturulmayacak anlamına gelir. NonEmpty , json özelliklerinin boş dize değerleri veya boş diziler/mpas için oluşturulmayacak olduğu anlamına gelir. NonDefault , json özelliklerinin yalnızca null/boş için değil aynı zamanda sayısal özellikler için varsayılan değerle aynı olduğunda 0 atlan anlamına gelir.

Değişiklik akışı (yalnızca Spark-Streaming kaynağı kullanan veri cosmos.oltp.changeFeed kaynağı için) yapılandırma

Yapılandırma Özelliği Adı Varsayılan Açıklama
spark.cosmos.changeFeed.startFrom Beginning ChangeFeed Start from settings ( veya belirli bir zaman Now Beginning noktası (UTC) 2020-02-10T14:15:03 örneğinde ) - varsayılan değer Beginning değeridir. Yazma yapılandırması bir içeriyorsa ve herhangi bir denetim noktası varsa, akış her zaman ayarlardan bağımsız olarak devam eder. Amaç bu ise akışı yeniden başlatmak için denetim noktalarını değiştirmeniz veya checkpointLocation spark.cosmos.changeFeed.startFrom checkpointLocation silmeniz gerekir.
spark.cosmos.changeFeed.mode Incremental ChangeFeed modu ( Incremental veya FullFidelity ) - NOT: şu anda deneysel FullFidelity durumda. Aboneliğin/hesabın özel önizleme için etkinleştirilmesini ve için hataya neden olan değişiklikler (döndürülen belgelerin şeması) FullFidelity olduğunu gerektirir. Bu noktada yalnızca üretim FullFidelity dışı senaryolar için kullanılması önerilir.
spark.cosmos.changeFeed.itemCountPerTriggerHint Hiçbiri Her mikro toplu iş/tetikleyici için değişiklik akışından okunan yaklaşık maksimum öğe sayısı

Json dönüştürme yapılandırması

Yapılandırma Özelliği Adı Varsayılan Açıklama
spark.cosmos.read.schemaConversionMode Relaxed Şema dönüştürme davranışı ( Relaxed , Strict ). JSON belgelerini okurken, bir belge şema türüyle eş kullanmayan bir öznitelik içeriyorsa, kullanıcı bir değer mi (Gevşek) yoksa özel durum null mu (Katı) kullanmaya karar verir.

Bölümleme stratejisi yapılandırması

Yapılandırma Özelliği Adı Varsayılan Açıklama
spark.cosmos.read.partitioning.strategy Default Kullanılan bölümleme stratejisi (Varsayılan, Özel, Kısıtlayıcı veya Agresif)
spark.cosmos.partitioning.targetedCount Hiçbiri Hedeflenen Bölüm Sayısı. Strategy==Custom kullanılmadıkça bu parametre isteğe bağlıdır ve yoksayılır. Bu durumda Spark Bağlayıcısı bölüm sayısını dinamik olarak hesaplamaz ancak bu değere bağlı kalmaz.

Aktarım hızı denetimi yapılandırması

Yapılandırma Özelliği Adı Varsayılan Açıklama
spark.cosmos.throughputControl.enabled false Aktarım hızı denetimi etkin olup olmadığı
spark.cosmos.throughputControl.name Hiçbiri Aktarım hızı denetim grubu adı
spark.cosmos.throughputControl.targetThroughput Hiçbiri Aktarım hızı denetim grubu hedef aktarım hızı
spark.cosmos.throughputControl.targetThroughputThreshold Hiçbiri Aktarım hızı denetim grubu hedef aktarım hızı eşiği
spark.cosmos.throughputControl.globalControl.database Hiçbiri Veritabanı: Aktarım hızı genel denetimi için kullanılacak
spark.cosmos.throughputControl.globalControl.container Hiçbiri Kapsayıcı: Aktarım hızı genel denetimi için kullanılacak
spark.cosmos.throughputControl.globalControl.renewIntervalInMS 5s İstemcinin kendi aktarım hızı kullanımını ne sıklıkta güncelleştiriyor?
spark.cosmos.throughputControl.globalControl.expireIntervalInMS 11s Çevrimdışı bir istemcinin ne kadar hızlı algılanır?

Sonraki adımlar