Öğretici: Spark kullanarak NoSQL için Azure Cosmos DB'ye Bağlan
UYGULANANLAR: NoSQL
Bu öğreticide, NoSQL için Azure Cosmos DB hesabından veri okumak veya yazmak için Azure Cosmos DB Spark bağlayıcısını kullanacaksınız. Bu öğreticide, Spark'tan NoSQL için API ile tümleştirmeyi göstermek için Azure Databricks ve jupyter not defteri kullanılır. Bu öğretici, Spark tarafından desteklenen herhangi bir dili veya arabirimi kullansanız bile Python ve Scala'ya odaklanır.
Bu öğreticide aşağıdakilerin nasıl yapılacağını öğreneceksiniz:
- Spark ve Jupyter not defteri kullanarak NoSQL hesabı için API'ye Bağlan
- Veritabanı ve kapsayıcı kaynakları oluşturma
- Kapsayıcıya veri alma
- Kapsayıcıdaki verileri sorgulama
- Kapsayıcıdaki öğelerde ortak işlemler gerçekleştirme
Önkoşullar
- Mevcut bir NoSQL için Azure Cosmos DB hesabı.
- Mevcut bir Azure aboneliğiniz varsa yeni bir hesap oluşturun.
- Azure aboneliği yok mu? Kredi kartı gerektirmeden Azure Cosmos DB'i ücretsiz deneyebilirsiniz.
- Mevcut bir Azure Databricks çalışma alanı.
Spark ve Jupyter kullanarak Bağlan
NoSQL için Azure Cosmos DB hesabınıza bağlanmak üzere Apache Spark 3.4.x kullanmaya hazır bir işlem kümesi oluşturmak için mevcut Azure Databricks çalışma alanınızı kullanın.
Azure Databricks çalışma alanınızı açın.
Çalışma alanı arabiriminde yeni bir küme oluşturun. Kümeyi en az şu ayarlarla yapılandırın:
Value Çalışma zamanı sürümü 13.3 LTS (Scala 2.12, Spark 3.4.1) Maven Central'danGrup Kimliği ile Maven paketlerini aramak için çalışma alanı arabirimini
com.azure.cosmos.spark
kullanın. Spark 3.4'e özgü paketi kümeye ön ekli bir Yapıt Kimliği ileazure-cosmos-spark_3-4
yükleyin.Son olarak yeni bir not defteri oluşturun.
İpucu
Varsayılan olarak, not defteri son oluşturulan kümeye eklenir.
Not defterinde NoSQL hesap uç noktası, veritabanı adı ve kapsayıcı adı için OLTP yapılandırma ayarlarını yapın.
# Set configuration settings config = { "spark.cosmos.accountEndpoint": "<nosql-account-endpoint>", "spark.cosmos.accountKey": "<nosql-account-key>", "spark.cosmos.database": "cosmicworks", "spark.cosmos.container": "products" }
# Set configuration settings val config = Map( "spark.cosmos.accountEndpoint" -> "<nosql-account-endpoint>", "spark.cosmos.accountKey" -> "<nosql-account-key>", "spark.cosmos.database" -> "cosmicworks", "spark.cosmos.container" -> "products" )
Veritabanı ve kapsayıcı oluşturma
Veritabanları ve kapsayıcılar gibi hesap kaynaklarını yönetmek için Katalog API'sini kullanın. Ardından OLTP'yi kullanarak kapsayıcı kaynağındaki verileri yönetebilirsiniz.[s].
Spark kullanarak NoSQL kaynaklarının API'sini yönetmek için Katalog API'sini yapılandırın.
# Configure Catalog Api spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config["spark.cosmos.accountEndpoint"]) spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config["spark.cosmos.accountKey"])
// Configure Catalog Api spark.conf.set(s"spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config("spark.cosmos.accountEndpoint")) spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config("spark.cosmos.accountKey"))
kullanarak
CREATE DATABASE IF NOT EXISTS
adlıcosmicworks
yeni bir veritabanı oluşturun.# Create a database using the Catalog API spark.sql(f"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
// Create a database using the Catalog API spark.sql(s"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
kullanarak
CREATE TABLE IF NOT EXISTS
adlıproducts
yeni bir kapsayıcı oluşturun. Bölüm anahtarı yolunu/category
olarak ayarladığınızdan ve saniye başına en yüksek istek birimi aktarım hızıyla (RU/sn) otomatik ölçeklendirme aktarım hızını etkinleştirdiğinizden1000
emin olun.# Create a products container using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
// Create a products container using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
, ve
/team
ile/organization
/department
ilgili sırasıyla bölüm anahtarı yolları kümesi olarak hiyerarşik bölüm anahtarı yapılandırması kullanarak adlıemployees
başka bir kapsayıcı oluşturun. Ayrıca, aktarım hızını el ile RU/sn miktarına400
ayarlayın# Create an employees container using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
// Create an employees container using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
Veritabanınızın ve kapsayıcılarınızın NoSQL hesabı için API'nizde oluşturulduğunu doğrulamak için not defteri hücresini [s] çalıştırın .
Verileri alma
Örnek bir veri kümesi oluşturun ve ardından OLTP kullanarak bu verileri NoSQL kapsayıcısı için API'ye alın.
Örnek bir veri kümesi oluşturun.
# Create sample data products = ( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, False), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, True) )
// Create sample data val products = Seq( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, false), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, true) )
Hedef kapsayıcıya örnek veriler eklemek için ve daha önce kaydedilmiş OLTP yapılandırmasını kullanın
spark.createDataFrame
.# Ingest sample data spark.createDataFrame(products) \ .toDF("id", "category", "name", "quantity", "price", "clearance") \ .write \ .format("cosmos.oltp") \ .options(**config) \ .mode("APPEND") \ .save()
// Ingest sample data spark.createDataFrame(products) .toDF("id", "category", "name", "quantity", "price", "clearance") .write .format("cosmos.oltp") .options(config) .mode("APPEND") .save()
Verileri sorgulama
Veriler üzerinde yaygın sorgular gerçekleştirmek için OLTP verilerini bir veri çerçevesine yükleyin. Çeşitli söz dizimleri filtreleyebilir veya verileri sorgulayabilirsiniz.
OLTP verilerini bir veri çerçevesi nesnesine yüklemek için kullanın
spark.read
. Bu öğreticinin önceki bölümlerinde kullanılan yapılandırmanın aynısını kullanın. Ayrıca Spark bağlayıcısının var olan öğeleri örnekleme yoluyla şemayı çıkarmasına izin vermek için true olarak ayarlayınspark.cosmos.read.inferSchema.enabled
.# Load data df = spark.read.format("cosmos.oltp") \ .options(**config) \ .option("spark.cosmos.read.inferSchema.enabled", "true") \ .load()
// Load data val df = spark.read.format("cosmos.oltp") .options(config) .option("spark.cosmos.read.inferSchema.enabled", "true") .load()
kullanarak veri çerçevesine yüklenen verilerin şemasını işleme
printSchema
.# Render schema df.printSchema()
// Render schema df.printSchema()
Veri satırlarını sütunun değerinden
quantity
20
küçük olduğu yerlerde işle. Bu sorguyuwhere
gerçekleştirmek için veshow
işlevlerini kullanın.# Render filtered data df.where("quantity < 20") \ .show()
// Render filtered data df.where("quantity < 20") .show()
Sütunun true olduğu
clearance
ilk veri satırını işleme. Bu sorguyufilter
gerçekleştirmek için işlevini kullanın.# Render 1 row of flitered data df.filter(df.clearance == True) \ .show(1)
// Render 1 row of flitered data df.filter($"clearance" === true) .show(1)
Filtre veya kesme olmadan beş veri satırı işleyin.
show
İşlenen satırların görünümünü ve sayısını özelleştirmek için işlevini kullanın.# Render five rows of unfiltered and untruncated data df.show(5, False)
// Render five rows of unfiltered and untruncated data df.show(5, false)
Bu ham NoSQL sorgu dizesini kullanarak verilerinizi sorgula:
SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800
# Render results of raw query rawQuery = "SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" rawDf = spark.sql(rawQuery) rawDf.show()
// Render results of raw query val rawQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" val rawDf = spark.sql(rawQuery) rawDf.show()
Yaygın işlemler gerçekleştirme
Spark'ta NoSQL verileri için API ile çalışırken kısmi güncelleştirmeler gerçekleştirebilir veya verilerle ham JSON olarak çalışabilirsiniz.
Öğenin kısmi güncelleştirmesini gerçekleştirmek için şu adımları uygulayın:
Mevcut
config
yapılandırma değişkenini kopyalayın ve yeni kopyadaki özellikleri değiştirin. Özellikle; yazma stratejisiniItemPatch
olarak yapılandırın, toplu desteği devre dışı bırakın, sütunları ve eşlenen işlemleri ayarlayın ve son olarak varsayılan işlem türünü olarakSet
ayarlayın.# Copy and modify configuration configPatch = dict(config) configPatch["spark.cosmos.write.strategy"] = "ItemPatch" configPatch["spark.cosmos.write.bulk.enabled"] = "false" configPatch["spark.cosmos.write.patch.defaultOperationType"] = "Set" configPatch["spark.cosmos.write.patch.columnConfigs"] = "[col(name).op(set)]"
// Copy and modify configuration val configPatch = scala.collection.mutable.Map.empty ++ config configPatch ++= Map( "spark.cosmos.write.strategy" -> "ItemPatch", "spark.cosmos.write.bulk.enabled" -> "false", "spark.cosmos.write.patch.defaultOperationType" -> "Set", "spark.cosmos.write.patch.columnConfigs" -> "[col(name).op(set)]" )
Bu düzeltme eki işleminin bir parçası olarak hedeflemek istediğiniz öğe bölüm anahtarı ve benzersiz tanımlayıcı için değişkenler oluşturun.
# Specify target item id and partition key targetItemId = "68719518391" targetItemPartitionKey = "gear-surf-surfboards"
// Specify target item id and partition key val targetItemId = "68719518391" val targetItemPartitionKey = "gear-surf-surfboards"
Hedef öğeyi belirtmek ve değiştirilmesi gereken alanları belirtmek için bir dizi düzeltme eki nesnesi oluşturun.
# Create set of patch diffs patchProducts = [{ "id": f"{targetItemId}", "category": f"{targetItemPartitionKey}", "name": "Yamba New Surfboard" }]
// Create set of patch diffs val patchProducts = Seq( (targetItemId, targetItemPartitionKey, "Yamba New Surfboard") )
Düzeltme eki nesneleri kümesini kullanarak bir veri çerçevesi oluşturun ve düzeltme eki işlemini gerçekleştirmek için kullanın
write
.# Create data frame spark.createDataFrame(patchProducts) \ .write \ .format("cosmos.oltp") \ .options(**configPatch) \ .mode("APPEND") \ .save()
// Create data frame patchProducts .toDF("id", "category", "name") .write .format("cosmos.oltp") .options(configPatch) .mode("APPEND") .save()
Düzeltme eki işleminin sonuçlarını gözden geçirmek için bir sorgu çalıştırın. Öğe artık başka bir değişiklik olmadan adlandırılmalıdır
Yamba New Surfboard
.# Create and run query patchQuery = f"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '{targetItemId}' AND category = '{targetItemPartitionKey}'" patchDf = spark.sql(patchQuery) patchDf.show(1)
// Create and run query val patchQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '$targetItemId' AND category = '$targetItemPartitionKey'" val patchDf = spark.sql(patchQuery) patchDf.show(1)
Ham JSON verileriyle çalışmak için şu adımları gerçekleştirin:
Mevcut
config
yapılandırma değişkenini kopyalayın ve yeni kopyadaki özellikleri değiştirin. Özellikle; hedef kapsayıcıyı olarakemployees
değiştirin ve sütunu/alanı ham JSON verilerini kullanacak şekilde yapılandırıncontacts
.# Copy and modify configuration configRawJson = dict(config) configRawJson["spark.cosmos.container"] = "employees" configRawJson["spark.cosmos.write.patch.columnConfigs"] = "[col(contacts).path(/contacts).op(set).rawJson]"
// Copy and modify configuration val configRawJson = scala.collection.mutable.Map.empty ++ config configRawJson ++= Map( "spark.cosmos.container" -> "employees", "spark.cosmos.write.patch.columnConfigs" -> "[col(contacts).path(/contacts).op(set).rawJson]" )
Kapsayıcıya almak için bir çalışan kümesi oluşturun.
# Create employee data employees = ( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", '[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]'), )
// Create employee data val employees = Seq( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", """[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]""") )
Bir veri çerçevesi oluşturun ve çalışan verilerini almak için kullanın
write
.# Ingest data spark.createDataFrame(employees) \ .toDF("id", "organization", "department", "team", "name", "contacts") \ .write \ .format("cosmos.oltp") \ .options(**configRawJson) \ .mode("APPEND") \ .save()
// Ingest data spark.createDataFrame(employees) .toDF("id", "organization", "department", "team", "name", "contacts") .write .format("cosmos.oltp") .options(configRawJson) .mode("APPEND") .save()
kullanarak veri çerçevesindeki verileri işleme
show
. Sütunun çıkıştacontacts
ham JSON olduğunu gözlemleyin.# Read and render data rawJsonDf = spark.read.format("cosmos.oltp") \ .options(**configRawJson) \ .load() rawJsonDf.show()
// Read and render data val rawJsonDf = spark.read.format("cosmos.oltp") .options(configRawJson) .load() rawJsonDf.show()
İlgili içerik
- Apache Spark
- Azure Cosmos DB Katalog API'si
- Yapılandırma Parametresi Başvurusu
- Örnek "New York City Taxi verileri" not defteri
- Spark 2.4'ten Spark 3'e geçiş.*
- Sürüm Uyumluluğu
- Sürüm notları
- İndirme bağlantıları