hızlı başlangıç: SQL apı için Azure Cosmos DB Spark 3 OLTP bağlayıcısı ile verileri yönetme
UYGULANDıĞı YER:
SQL API
bu öğretici, Cosmos DB okuma veya yazma için Cosmos DB Spark bağlayıcısını nasıl kullanacağınızı gösteren hızlı bir başlangıç kılavuzudur. spark Connector, spark 3.1. x ' i temel alır Cosmos DB.
bu hızlı öğreticide, spark 3.1.1 ile Azure Databricks Runtime 8,0 ' i ve Cosmos DB spark bağlayıcısının nasıl kullanılacağını gösteren bir Jupyter Notebook güveniyoruz.
Başka bir Spark 3.1.1 Spark teklifini de kullanabilirsiniz. Ayrıca, Spark (PySpark, Scala, Java, vb.) tarafından desteklenen herhangi bir dili veya bildiğiniz herhangi bir Spark arabirimini (Jupyter Notebook, Livy, vb.) kullanabilirsiniz.
Önkoşullar
Etkin bir Azure hesabı. Bir aboneliğiniz yoksa ücretsiz bir hesap için kaydolabilirsiniz. alternatif olarak, geliştirme ve test için Azure Cosmos DB kullan Emulator kullanabilirsiniz.
Spark 3.1.1 ile çalışma zamanı 8,0 Azure Databricks .
Seçim Dolayısıyla slf4j Binding , belirli bir günlük çerçevesini dolayısıyla slf4j ile ilişkilendirmek için kullanılır.
DOLAYıSıYLA SLF4J yalnızca, günlüğü kullanmayı planlıyorsanız gerekir, Ayrıca, DOLAYıSıYLA SLF4J API 'sini tercih ettiğiniz günlük uygulamasıyla bağlayacaktır. Daha fazla bilgi için dolayısıyla slf4j Kullanıcı kılavuzuna bakın.
spark kümenizde Cosmos DB spark bağlayıcısını ( azure-cosmos-spark_3 -1_2-12-4.3.1. jar ) yükleyip
Başlangıç Kılavuzu, PySpark 'a dayalıdır, ancak eşdeğer Scala sürümünü de kullanabilirsiniz ve aşağıdaki kod parçacığını bir Azure Databricks PySpark not defterinde çalıştırabilirsiniz.
Veritabanları ve kapsayıcılar oluşturma
ilk olarak, Cosmos DB hesabı kimlik bilgilerini ve Cosmos DB veritabanı adını ve kapsayıcı adını ayarlayın.
cosmosEndpoint = "https://REPLACEME.documents.azure.com:443/"
cosmosMasterKey = "REPLACEME"
cosmosDatabaseName = "sampleDB"
cosmosContainerName = "sampleContainer"
cfg = {
"spark.cosmos.accountEndpoint" : cosmosEndpoint,
"spark.cosmos.accountKey" : cosmosMasterKey,
"spark.cosmos.database" : cosmosDatabaseName,
"spark.cosmos.container" : cosmosContainerName,
}
daha sonra, Spark aracılığıyla bir Cosmos DB veritabanı ve kapsayıcı oluşturmak için yeni katalog apı 'sini kullanabilirsiniz.
# Configure Catalog Api to be used
spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", cosmosEndpoint)
spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", cosmosMasterKey)
# create a cosmos database using catalog api
spark.sql("CREATE DATABASE IF NOT EXISTS cosmosCatalog.{};".format(cosmosDatabaseName))
# create a cosmos container using catalog api
spark.sql("CREATE TABLE IF NOT EXISTS cosmosCatalog.{}.{} using cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/id', manualThroughput = '1100')".format(cosmosDatabaseName, cosmosContainerName))
Katalog API 'SI ile kapsayıcılar oluştururken, oluşturulacak kapsayıcının aktarım hızını ve bölüm anahtarı yolunu ayarlayabilirsiniz.
Daha fazla bilgi için bkz. tam Katalog API 'si belgeleri.
Veriyi çekme
veri kaynağının adı cosmos.oltp ve aşağıdaki örnekte, Cosmos DB için iki öğeden oluşan bir bellek veri çerçevesini nasıl yazabileceğiniz gösterilmektedir:
spark.createDataFrame((("cat-alive", "Schrodinger cat", 2, True), ("cat-dead", "Schrodinger cat", 2, False)))\
.toDF("id","name","age","isAlive") \
.write\
.format("cosmos.oltp")\
.options(**cfg)\
.mode("APPEND")\
.save()
idCosmos DB için zorunlu bir alan olduğunu unutmayın.
Verilerle ilgili daha fazla bilgi almak için bkz. tam yazma yapılandırması belgeleri.
Verileri sorgulama
Aynı cosmos.oltp veri kaynağını kullanarak, filtreleri göndermek için verileri sorgulayabilir ve kullanabilirsiniz filter :
from pyspark.sql.functions import col
df = spark.read.format("cosmos.oltp").options(**cfg)\
.option("spark.cosmos.read.inferSchema.enabled", "true")\
.load()
df.filter(col("isAlive") == True)\
.show()
Verileri sorgulama hakkında daha fazla bilgi için bkz. tam sorgu yapılandırması belgeleri.
Şema çıkarımı
Verileri sorgularken Spark Bağlayıcısı, üzerine ayarlayarak varolan öğeleri örneklemeye göre şemayı çıkarsçıkarabilir spark.cosmos.read.inferSchema.enabled true .
df = spark.read.format("cosmos.oltp").options(**cfg)\
.option("spark.cosmos.read.inferSchema.enabled", "true")\
.load()
df.printSchema()
Alternatif olarak, verileri okumak için kullanmak istediğiniz özel şemayı geçirebilirsiniz:
customSchema = StructType([
StructField("id", StringType()),
StructField("name", StringType()),
StructField("type", StringType()),
StructField("age", IntegerType()),
StructField("isAlive", BooleanType())
])
df = spark.read.schema(schema).format("cosmos.oltp").options(**cfg)\
.load()
df.printSchema()
Özel şema belirtilmemişse ve Şema çıkarımı devre dışı bırakılmışsa, sonuçta elde edilen veriler öğelerin ham JSON içeriğini döndürür:
df = spark.read.format("cosmos.oltp").options(**cfg)\
.load()
df.printSchema()
Şema çıkarımı ile ilgili daha fazla bilgi için bkz. tam Şema çıkarımı yapılandırması belgeleri.
Yapılandırma başvurusu
Genel yapılandırma
| Yapılandırma özelliği adı | Varsayılan | Açıklama |
|---|---|---|
spark.cosmos.accountEndpoint |
Hiçbiri | Cosmos DB hesabı uç noktası urı 'si |
spark.cosmos.accountKey |
Hiçbiri | Cosmos DB hesap anahtarı |
spark.cosmos.database |
Hiçbiri | Cosmos DB veritabanı adı |
spark.cosmos.container |
Hiçbiri | Cosmos DB kapsayıcı adı |
Ek ayarlama
| Yapılandırma özelliği adı | Varsayılan | Açıklama |
|---|---|---|
spark.cosmos.useGatewayMode |
false |
İstemci işlemleri için ağ geçidi modunu kullan |
spark.cosmos.read.forceEventualConsistency |
true |
İstemcinin varsayılan hesap düzeyi tutarlılığını kullanmak yerine okuma işlemleri için nihai tutarlılığı kullanmasını sağlar |
spark.cosmos.applicationName |
Hiçbiri | Uygulama adı |
spark.cosmos.preferredRegionsList |
Hiçbiri | çok bölgeli Cosmos DB hesap için kullanılacak tercih edilen bölge listesi. Bu, virgülle ayrılmış bir değerdir (örneğin, [East US, West US] veya), East US, West US sunulan tercih edilen bölgeler ipucu olarak kullanılacaktır. Cosmos DB hesabınızla birlikte bulunan bir spark kümesi kullanmalı ve spark küme bölgesini tercih edilen bölge olarak geçitirsiniz. Bkz . Azurebölgelerinin listesi. spark.cosmos.preferredRegionsAs diğer adı da kullanabilirsiniz |
spark.cosmos.diagnostics |
Hiçbiri | Daha ayrıntılı tanılamayı etkinleştirmek için kullanılabilir. Şu anda desteklenen tek seçenek, bu özelliği, simple ek günlüklerin INFO sürücü ve yürütücü günlüklerinde Günlükler olarak yayılmasıyla sonuçlanacaktır. |
Yazma yapılandırması
| Yapılandırma özelliği adı | Varsayılan | Açıklama |
|---|---|---|
spark.cosmos.write.strategy |
ItemOverwrite |
Cosmos DB öğeleri yazma stratejisi: ItemOverwrite (upsert kullanılarak), ItemAppend (önceden var olan öğeleri oluştur, var olan öğeleri sil, çakışmalar), ( ItemDelete tüm belgeleri sil), ItemDeleteIfNotModified (etag 'in değiştiği tüm belgeleri sil) |
spark.cosmos.write.maxRetryCount |
10 |
yeniden denenebilir hatalarda yazma sayısı üst sınırı Cosmos DB (örneğin, bağlantı hatası) |
spark.cosmos.write.point.maxConcurrency |
Hiçbiri | Cosmos DB öğe yazma en fazla eşzamanlılık. Belirtilmemişse, Spark yürütücü VM boyutu temel alınarak belirlenir |
spark.cosmos.write.bulk.maxPendingOperations |
Hiçbiri | Cosmos DB öğe yazma toplu modu maksimum bekleyen işlemler. Aynı anda işlenen toplu işlemlerin limitini tanımlar. Belirtilmemişse, Spark yürütücü VM boyutu temel alınarak belirlenir. Veri hacmi hedef kapsayıcıda sağlanan aktarım hızı için büyükse, bu ayar şu tahmine göre ayarlanabilir 1000 x Cores |
spark.cosmos.write.bulk.enabled |
true |
Cosmos DB öğe yazma toplu etkin |
Sorgu yapılandırması
| Yapılandırma özelliği adı | Varsayılan | Açıklama |
|---|---|---|
spark.cosmos.read.customQuery |
Hiçbiri | sağlandığı zaman, sorgu, koşul push aracılığıyla dinamik olarak oluşturmak yerine Cosmos uç noktasına karşı işlenir. Genellikle, sorgu planına bağlı olarak en verimli filtre kümesini oluşturmaya izin verecek olduğundan, Spark 'ın koşul gönderimi ' nı kullanmanız önerilir. Ancak toplamalara (en azından Spark 3,1 ' de) gönderilmemiş olan toplamalar (Count, group by, Ort, Sum vb.) gibi çeşitli koşullar vardır. bu nedenle, özel sorgu Cosmos gönderilen sorguya gönderilmesine izin veren bir geri dönüş olur. Belirtilmişse, şema çıkarımı etkinken özel sorgu da şemayı çıkarsanacak şekilde kullanılacaktır. |
spark.cosmos.read.maxItemCount |
1000 |
Tek bir sorgu veya değişiklik akışı isteği için döndürülebilecek en fazla belge sayısını geçersiz kılar. Varsayılan değer 1000 -Bu, yalnızca 1 KB 'tan küçük ortalama belge boyutları için artırmayı düşünün veya projeksiyon, sorgularda seçilen özelliklerin sayısını önemli ölçüde (örneğin, yalnızca "kimliği", yalnızca "kimlik" olarak seçerken) azaltır. |
Şema çıkarımı yapılandırması
Okuma işlemleri yaparken, kullanıcılar özel bir şema belirtebilir veya bağlayıcının onu çıkarması için izin verebilir. Şema çıkarımı varsayılan olarak etkindir.
| Yapılandırma Özelliği Adı | Varsayılan | Açıklama |
|---|---|---|
spark.cosmos.read.inferSchema.enabled |
true |
Şema çıkarı devre dışı bırakılmıştır ve kullanıcı bir şema sağlamazsa ham json döndürülür. |
spark.cosmos.read.inferSchema.query |
SELECT * FROM r |
Şema çıkarı etkinleştirildiğinde, bunu çıkarımk için özel sorgu olarak kullanılır. Örneğin, bir kapsayıcı içinde farklı şemalara sahip birden çok varlık depolarsanız ve çıkarımın yalnızca belirli belge türlerine bakması veya yalnızca belirli sütunların projesini yapmak istediğiniz durumlarda. |
spark.cosmos.read.inferSchema.samplingSize |
1000 |
Şema çıkarken ve sorgu kullanmazken kullanmak üzere örnekleme boyutu. |
spark.cosmos.read.inferSchema.includeSystemProperties |
false |
Şema çıkarı etkinleştirildiğinde, sonuçta elde edilen şemanın tüm veritabanı Cosmos dahil olup olmadığı. |
spark.cosmos.read.inferSchema.includeTimestamp |
false |
Şema çıkarı etkinleştirildiğinde, sonuçta elde edilen şemanın Zaman Damgası ( ) belgesini içerecek olup _ts olmadığı. Etkinse spark.cosmos.read.inferSchema.includeSystemProperties gerekli değildir çünkü zaten tüm sistem özelliklerini içerecektir. |
spark.cosmos.read.inferSchema.forceNullableProperties |
true |
Şema çıkarı etkinleştirildiğinde, sonuçta elde edilen şemanın tüm sütunları null değere döndürebilir olup olmadığı. Varsayılan olarak, örnek küme içindeki tüm satırlar null olmayan değerlere sahip olsa bile tüm sütunlar (Cosmos sistem özellikleri hariç) null değer olarak kabul edilir. Devre dışı bırakılarak, örnek küme içindeki herhangi bir kaydın bir sütun içinde null-değerlere sahip olup olmadığı bağlı olarak, alınan sütunlar null değer olarak kabul edilir. |
Serileştirme yapılandırması
JSON serileştirme/seriden serileştirme davranışını etkilemek için kullanılır
| Yapılandırma Özelliği Adı | Varsayılan | Açıklama |
|---|---|---|
spark.cosmos.serialization.inclusionMode |
Always |
Null/varsayılan değerlerin json'a seri hale getirilecek veya null/varsayılan değere sahip özelliklerin atlanacak olup olmadığını belirler. Davranış, Zaman'ın JsonInclude.Include ile aynı fikirleri izler. Always , json özelliklerinin null ve varsayılan değerler için bile oluşturulacak olduğu anlamına gelir. NonNull , açık null değerler için hiçbir json özelliği oluşturulmayacak anlamına gelir. NonEmpty , json özelliklerinin boş dize değerleri veya boş diziler/mpas için oluşturulmayacak olduğu anlamına gelir. NonDefault , json özelliklerinin yalnızca null/boş için değil aynı zamanda sayısal özellikler için varsayılan değerle aynı olduğunda 0 atlan anlamına gelir. |
Değişiklik akışı (yalnızca Spark-Streaming kaynağı kullanan veri cosmos.oltp.changeFeed kaynağı için) yapılandırma
| Yapılandırma Özelliği Adı | Varsayılan | Açıklama |
|---|---|---|
| spark.cosmos.changeFeed.startFrom | Beginning |
ChangeFeed Start from settings ( veya belirli bir zaman Now Beginning noktası (UTC) 2020-02-10T14:15:03 örneğinde ) - varsayılan değer Beginning değeridir. Yazma yapılandırması bir içeriyorsa ve herhangi bir denetim noktası varsa, akış her zaman ayarlardan bağımsız olarak devam eder. Amaç bu ise akışı yeniden başlatmak için denetim noktalarını değiştirmeniz veya checkpointLocation spark.cosmos.changeFeed.startFrom checkpointLocation silmeniz gerekir. |
| spark.cosmos.changeFeed.mode | Incremental |
ChangeFeed modu ( Incremental veya FullFidelity ) - NOT: şu anda deneysel FullFidelity durumda. Aboneliğin/hesabın özel önizleme için etkinleştirilmesini ve için hataya neden olan değişiklikler (döndürülen belgelerin şeması) FullFidelity olduğunu gerektirir. Bu noktada yalnızca üretim FullFidelity dışı senaryolar için kullanılması önerilir. |
| spark.cosmos.changeFeed.itemCountPerTriggerHint | Hiçbiri | Her mikro toplu iş/tetikleyici için değişiklik akışından okunan yaklaşık maksimum öğe sayısı |
Json dönüştürme yapılandırması
| Yapılandırma Özelliği Adı | Varsayılan | Açıklama |
|---|---|---|
spark.cosmos.read.schemaConversionMode |
Relaxed |
Şema dönüştürme davranışı ( Relaxed , Strict ). JSON belgelerini okurken, bir belge şema türüyle eş kullanmayan bir öznitelik içeriyorsa, kullanıcı bir değer mi (Gevşek) yoksa özel durum null mu (Katı) kullanmaya karar verir. |
Bölümleme stratejisi yapılandırması
| Yapılandırma Özelliği Adı | Varsayılan | Açıklama |
|---|---|---|
spark.cosmos.read.partitioning.strategy |
Default |
Kullanılan bölümleme stratejisi (Varsayılan, Özel, Kısıtlayıcı veya Agresif) |
spark.cosmos.partitioning.targetedCount |
Hiçbiri | Hedeflenen Bölüm Sayısı. Strategy==Custom kullanılmadıkça bu parametre isteğe bağlıdır ve yoksayılır. Bu durumda Spark Bağlayıcısı bölüm sayısını dinamik olarak hesaplamaz ancak bu değere bağlı kalmaz. |
Aktarım hızı denetimi yapılandırması
| Yapılandırma Özelliği Adı | Varsayılan | Açıklama |
|---|---|---|
spark.cosmos.throughputControl.enabled |
false |
Aktarım hızı denetimi etkin olup olmadığı |
spark.cosmos.throughputControl.name |
Hiçbiri | Aktarım hızı denetim grubu adı |
spark.cosmos.throughputControl.targetThroughput |
Hiçbiri | Aktarım hızı denetim grubu hedef aktarım hızı |
spark.cosmos.throughputControl.targetThroughputThreshold |
Hiçbiri | Aktarım hızı denetim grubu hedef aktarım hızı eşiği |
spark.cosmos.throughputControl.globalControl.database |
Hiçbiri | Veritabanı: Aktarım hızı genel denetimi için kullanılacak |
spark.cosmos.throughputControl.globalControl.container |
Hiçbiri | Kapsayıcı: Aktarım hızı genel denetimi için kullanılacak |
spark.cosmos.throughputControl.globalControl.renewIntervalInMS |
5s |
İstemcinin kendi aktarım hızı kullanımını ne sıklıkta güncelleştiriyor? |
spark.cosmos.throughputControl.globalControl.expireIntervalInMS |
11s |
Çevrimdışı bir istemcinin ne kadar hızlı algılanır? |
Sonraki adımlar
- Azure Cosmos DB Apache Spark 3 Çekirdek için OLTP Bağlayıcısı (SQL) API'si: Sürüm notları ve kaynaklar
- Apache Spark hakkında daha fazla bilgi edinin.