Azure Cosmos DB Bağlayıcısı için Apache Spark'ı kullanarak büyük veri analizi hızlandırınAccelerate big data analytics by using the Apache Spark to Azure Cosmos DB connector

Spark işlerini, Cosmos DB Spark bağlayıcısını kullanarak Azure Cosmos DB depolanan verilerle çalıştırabilirsiniz.You can run Spark jobs with data stored in Azure Cosmos DB using the Cosmos DB Spark connector. Cosmos, toplu işlem ve akış işleme için ve düşük gecikmeli erişim için bir hizmet katmanı olarak kullanılabilir.Cosmos can be used for batch and stream processing, and as a serving layer for low latency access.

Bağlayıcıyı Azure 'da yönetilen Spark kümeleri sağlayan Azure Databricks veya Azure HDInsightile kullanabilirsiniz.You can use the connector with Azure Databricks or Azure HDInsight, which provide managed Spark clusters on Azure. Aşağıdaki tabloda desteklenen Spark sürümleri gösterilmektedir.The following table shows supported Spark versions.

BileşenComponent SürümVersion
Apache SparkApache Spark 2.4. x, 2.3. x, 2.2. x ve 2.1. x2.4.x, 2.3.x, 2.2.x, and 2.1.x
ScalaScala 2.112.11
Azure Databricks çalışma zamanı sürümüAzure Databricks runtime version > 3.4> 3.4

Uyarı

Bu bağlayıcı Azure Cosmos DB Çekirdek (SQL) API 'sini destekler.This connector supports the core (SQL) API of Azure Cosmos DB. MongoDB API 'SI için Cosmos DB için MongoDB Spark bağlayıcısınıkullanın.For Cosmos DB for MongoDB API, use the MongoDB Spark connector. Cosmos DB Cassandra API için Cassandra Spark bağlayıcısınıkullanın.For Cosmos DB Cassandra API, use the Cassandra Spark connector.

Hızlı BaşlangıçQuickstart

  • Cosmos DB hesabı ayarlamak ve bazı verileri doldurmak için Java SDK 'sını kullanmaya başlama bölümündeki adımları izleyin.Follow the steps at Get started with the Java SDK to set up a Cosmos DB account, and populate some data.
  • Azure Databricks bir çalışma alanı ve küme ayarlamaya Başlarken Azure Databricks adımları izleyin.Follow the steps at Azure Databricks getting started to set up an Azure Databricks workspace and cluster.
  • Artık yeni not defterleri oluşturabilir ve Cosmos DB bağlayıcı kitaplığını içeri aktarabilirsiniz.You can now create new Notebooks, and import the Cosmos DB connector library. Çalışma alanınızı ayarlama hakkında ayrıntılar için Cosmos DB Bağlayıcısı Ile çalışmaya atlayın.Jump to Working with the Cosmos DB connector for details on how to set up your workspace.
  • Aşağıdaki bölümde, bağlayıcıyı kullanarak okuma ve yazma ile ilgili kod parçacıkları vardır.The following section has snippets on how to read and write using the connector.

Cosmos DB 'ten toplu iş okumaBatch reads from Cosmos DB

Aşağıdaki kod parçacığında, PySpark içinde Cosmos DB okumak için Spark veri çerçevesinin nasıl oluşturulacağı gösterilmektedir.The following snippet shows how to create a Spark DataFrame to read from Cosmos DB in PySpark.

# Read Configuration
readConfig = {
    "Endpoint": "https://doctorwho.documents.azure.com:443/",
    "Masterkey": "YOUR-KEY-HERE",
    "Database": "DepartureDelays",
    "Collection": "flights_pcoll",
    "query_custom": "SELECT c.date, c.delay, c.distance, c.origin, c.destination FROM c WHERE c.origin = 'SEA'" // Optional
}

# Connect via azure-cosmosdb-spark to create Spark DataFrame
flights = spark.read.format(
    "com.microsoft.azure.cosmosdb.spark").options(**readConfig).load()
flights.count()

Ve Scala 'da aynı kod parçacığı:And the same code snippet in Scala:

// Import Necessary Libraries
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark._
import com.microsoft.azure.cosmosdb.spark.config.Config

// Read Configuration
val readConfig = Config(Map(
  "Endpoint" -> "https://doctorwho.documents.azure.com:443/",
  "Masterkey" -> "YOUR-KEY-HERE",
  "Database" -> "DepartureDelays",
  "Collection" -> "flights_pcoll",
  "query_custom" -> "SELECT c.date, c.delay, c.distance, c.origin, c.destination FROM c WHERE c.origin = 'SEA'" // Optional
))

// Connect via azure-cosmosdb-spark to create Spark DataFrame
val flights = spark.read.cosmosDB(readConfig)
flights.count()

Cosmos DB için toplu işlem yazmaBatch writes to Cosmos DB

Aşağıdaki kod parçacığında, PySpark içinde Cosmos DB bir veri çerçevesinin nasıl yazılacağı gösterilmektedir.The following snippet shows how to write a data frame to Cosmos DB in PySpark.

# Write configuration
writeConfig = {
    "Endpoint": "https://doctorwho.documents.azure.com:443/",
    "Masterkey": "YOUR-KEY-HERE",
    "Database": "DepartureDelays",
    "Collection": "flights_fromsea",
    "Upsert": "true"
}

# Write to Cosmos DB from the flights DataFrame
flights.write.format("com.microsoft.azure.cosmosdb.spark").options(
    **writeConfig).save()

Ve Scala 'da aynı kod parçacığı:And the same code snippet in Scala:

// Write configuration

val writeConfig = Config(Map(
  "Endpoint" -> "https://doctorwho.documents.azure.com:443/",
  "Masterkey" -> "YOUR-KEY-HERE",
  "Database" -> "DepartureDelays",
  "Collection" -> "flights_fromsea",
  "Upsert" : "true"
))

// Write to Cosmos DB from the flights DataFrame
import org.apache.spark.sql.SaveMode
flights.write.mode(SaveMode.Overwrite).cosmosDB(writeConfig)

Cosmos DB akış okumaStreaming reads from Cosmos DB

Aşağıdaki kod parçacığında Azure Cosmos DB değişiklik akışına bağlanma ve buradan okuma işlemlerinin nasıl yapılacağı gösterilmektedir.The following snippet shows how to connect to and read from Azure Cosmos DB Change Feed.

# Read Configuration
readConfig = {
    "Endpoint": "https://doctorwho.documents.azure.com:443/",
    "Masterkey": "YOUR-KEY-HERE",
    "Database": "DepartureDelays",
    "Collection": "flights_pcoll",
    "ReadChangeFeed": "true",
    "ChangeFeedQueryName": "Departure-Delays",
    "ChangeFeedStartFromTheBeginning": "false",
    "InferStreamSchema": "true",
    "ChangeFeedCheckpointLocation": "dbfs:/Departure-Delays"
}


# Open a read stream to the Cosmos DB Change Feed via azure-cosmosdb-spark to create Spark DataFrame
changes = (spark
           .readStream
           .format("com.microsoft.azure.cosmosdb.spark.streaming.CosmosDBSourceProvider")
           .options(**readConfig)
           .load())

Ve Scala 'da aynı kod parçacığı:And the same code snippet in Scala:

// Import Necessary Libraries
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark._
import com.microsoft.azure.cosmosdb.spark.config.Config

// Read Configuration
val readConfig = Config(Map(
  "Endpoint" -> "https://doctorwho.documents.azure.com:443/",
  "Masterkey" -> "YOUR-KEY-HERE",
  "Database" -> "DepartureDelays",
  "Collection" -> "flights_pcoll",
  "ReadChangeFeed" -> "true",
  "ChangeFeedQueryName" -> "Departure-Delays",
  "ChangeFeedStartFromTheBeginning" -> "false",
  "InferStreamSchema" -> "true",
  "ChangeFeedCheckpointLocation" -> "dbfs:/Departure-Delays"
))

// Open a read stream to the Cosmos DB Change Feed via azure-cosmosdb-spark to create Spark DataFrame
val df = spark.readStream.format(classOf[CosmosDBSourceProvider].getName).options(readConfig).load()

Cosmos DB için akış yazmaStreaming writes to Cosmos DB

Aşağıdaki kod parçacığında, PySpark içinde Cosmos DB bir veri çerçevesinin nasıl yazılacağı gösterilmektedir.The following snippet shows how to write a data frame to Cosmos DB in PySpark.

# Write configuration
writeConfig = {
    "Endpoint": "https://doctorwho.documents.azure.com:443/",
    "Masterkey": "YOUR-KEY-HERE",
    "Database": "DepartureDelays",
    "Collection": "flights_fromsea",
    "Upsert": "true",
    "WritingBatchSize": "500",
    "CheckpointLocation": "/checkpointlocation_write1"
}

# Write to Cosmos DB from the flights DataFrame
changeFeed = (changes
              .writeStream
              .format("com.microsoft.azure.cosmosdb.spark.streaming.CosmosDBSinkProvider")
              .outputMode("append")
              .options(**writeconfig)
              .start())

Ve Scala 'da aynı kod parçacığı:And the same code snippet in Scala:

// Write configuration

val writeConfig = Config(Map(
  "Endpoint" -> "https://doctorwho.documents.azure.com:443/",
  "Masterkey" -> "YOUR-KEY-HERE",
  "Database" -> "DepartureDelays",
  "Collection" -> "flights_fromsea",
  "Upsert" -> "true",
  "WritingBatchSize" -> "500",
  "CheckpointLocation" -> "/checkpointlocation_write1"
))

// Write to Cosmos DB from the flights DataFrame
df
.writeStream
.format(classOf[CosmosDBSinkProvider].getName)
.options(writeConfig)
.start()

Daha fazla kod parçacığı ve uçtan uca örnek, bkz. Jupyıter.More more snippets and end to end samples, see Jupyter.

Bağlayıcıyla çalışmaWorking with the connector

Bağlayıcıyı GitHub 'da kaynaktan oluşturabilir veya aşağıdaki bağlantılardan Maven 'ten Uber jar dosyaları dışındaki 'ı indirebilirsiniz.You can build the connector from source in GitHub, or download the uber jars from Maven in the links below.

SparkSpark ScalaScala En son sürümLatest version
2.4.02.4.0 2.112.11 Azure-cosmosdb-spark_ 2.4.0 _ 2.11 _ 1.4.0azure-cosmosdb-spark_2.4.0_2.11_1.4.0
2.3.02.3.0 2.112.11 Azure-cosmosdb-spark_ 2.3.0 _ 2.11 _ 1.3.3azure-cosmosdb-spark_2.3.0_2.11_1.3.3
2.2.02.2.0 2.112.11 Azure-cosmosdb-spark_ 2.2.0 _ 2.11 _ 1.1.1azure-cosmosdb-spark_2.2.0_2.11_1.1.1
2.1.02.1.0 2.112.11 Azure-cosmosdb-spark_ 2.1.0 _ 2.11 _ 1.2.2azure-cosmosdb-spark_2.1.0_2.11_1.2.2

Databricks not defterlerini kullanmaUsing Databricks notebooks

Azure Databricks kılavuzundaki kılavuzu izleyerek Databricks çalışma alanınızı kullanarak bir kitaplık oluşturun > Azure Cosmos DB Spark bağlayıcısını kullanınCreate a library using your Databricks workspace by following the guidance in the Azure Databricks Guide > Use the Azure Cosmos DB Spark connector

Not

Azure Cosmos DB Spark bağlayıcısını kullanma sayfasının Şu anda güncel olmadığına not edin.Note, the Use the Azure Cosmos DB Spark Connector page is currently not up-to-date. Altı ayrı jar dosyaları dışındaki 'ı altı farklı kitaplıklara indirmek yerine, Maven https://search.maven.org/artifact/com.microsoft.azure/azure-cosmosdb-spark_2.4.0_2.11/1.4.0/jar) 'den Uber jar 'i indirebilir ve bu tek jar/Library 'yi yükleyebilirsiniz.Instead of downloading the six separate jars into six different libraries, you can download the uber jar from maven at https://search.maven.org/artifact/com.microsoft.azure/azure-cosmosdb-spark_2.4.0_2.11/1.4.0/jar) and install this one jar/library.

Spark-CLI kullanmaUsing spark-cli

Spark-CLI (,,, spark-shell spark-submit) kullanarak bağlayıcı ile çalışmak için, bu pyspark --packages parametreyi bağlayıcının Maven koordinatlarıylabirlikte kullanabilirsiniz.To work with the connector using the spark-cli (that is, spark-shell, pyspark, spark-submit), you can use the --packages parameter with the connector's maven coordinates.

spark-shell --master yarn --packages "com.microsoft.azure:azure-cosmosdb-spark_2.4.0_2.11:1.4.0"

Jupyıter not defterlerini kullanmaUsing Jupyter notebooks

HDInsight içinde jupyıter not defterlerini kullanıyorsanız, bağlayıcının Maven koordinatlarını belirtmek için Spark- %%configure Magic hücresini kullanabilirsiniz.If you're using Jupyter notebooks within HDInsight, you can use spark-magic %%configure cell to specify the connector's maven coordinates.

{ "name":"Spark-to-Cosmos_DB_Connector",
  "conf": {
    "spark.jars.packages": "com.microsoft.azure:azure-cosmosdb-spark_2.4.0_2.11:1.4.0",
    "spark.jars.excludes": "org.scala-lang:scala-reflect"
   }
   ...
}

Öğesinin spark.jars.excludes dahil edilmesi, bağlayıcı, Apache Spark ve Livy arasındaki olası çakışmaları kaldırmak için özeldir.Note, the inclusion of the spark.jars.excludes is specific to remove potential conflicts between the connector, Apache Spark, and Livy.

Bağlayıcıyı oluşturmaBuild the connector

Şu anda bu bağlayıcı proje, maven bağımlılık olmadan derlemek için bu şekilde kullanıyor, şunu çalıştırabilirsiniz:Currently, this connector project uses maven so to build without dependencies, you can run:

mvn clean package

Örneklerimizde çalışmaWorking with our samples

Cosmos DB Spark GitHub deposunda , deneyebileceğiniz aşağıdaki örnek Not defterleri ve betikler bulunur.The Cosmos DB Spark GitHub repository has the following sample notebooks and scripts that you can try.

Daha Fazla BilgiMore Information

azure-cosmosdb-spark Wiki 'de şunlar da dahil olmak üzere daha fazla bilgi sunuyoruz:We have more information in the azure-cosmosdb-spark wiki including:

Yapılandırma ve KurulumConfiguration and Setup

Sorun gidermeTroubleshooting

PerformansPerformance

Değişiklik AkışıChange Feed

İzlemeMonitoring

Sonraki adımlarNext steps

Henüz yapmadıysanız, Spark Azure Cosmos DB Bağlayıcısı'ndan indirin azure cosmosdb spark GitHub deposu.If you haven't already, download the Spark to Azure Cosmos DB connector from the azure-cosmosdb-spark GitHub repository. Aşağıdaki ek kaynaklara depodaki keşfedin:Explore the following additional resources in the repo:

İncelemek isteyebilirsiniz Apache Spark SQL ve DataFrames veri kümeleri Kılavuzuve Azure HDInsight üzerinde Apache Spark makalesi.You might also want to review the Apache Spark SQL, DataFrames, and Datasets Guide, and the Apache Spark on Azure HDInsight article.