Apache Spark-Azure Cosmos DB コネクタを使用したビッグ データ分析の高速化Accelerate big data analytics by using the Apache Spark to Azure Cosmos DB connector

Cosmos DB Spark コネクタを使用して、Azure Cosmos DB に格納されているデータで Spark ジョブを実行できます。You can run Spark jobs with data stored in Azure Cosmos DB using the Cosmos DB Spark connector. Cosmos は、バッチおよびストリーム処理に使用可能であり、低待機時間でのアクセスのためのサービス レイヤーとして使用できます。Cosmos can be used for batch and stream processing, and as a serving layer for low latency access.

このコネクタは、Azure 上でマネージド Spark クラスターを提供する Azure Databricks または Azure HDInsight と一緒に使用できます。You can use the connector with Azure Databricks or Azure HDInsight, which provide managed Spark clusters on Azure. サポートする Spark バージョンを次の表に示します。The following table shows supported Spark versions.

コンポーネントComponent VersionVersion
Apache SparkApache Spark 2.4.x、2.3.x、2.2.x、2.1.x2.4.x, 2.3.x, 2.2.x, and 2.1.x
ScalaScala 2.112.11
Azure Databricks ランタイムのバージョンAzure Databricks runtime version 3.4 より新しいバージョン> 3.4

警告

このコネクタは、Azure Cosmos DB のコア (SQL) API をサポートします。This connector supports the core (SQL) API of Azure Cosmos DB. Cosmos DB for MongoDB API の場合は、MongoDB Spark コネクタを使用してください。For Cosmos DB for MongoDB API, use the MongoDB Spark connector. Cosmos DB Cassandra API の場合は、Cassandra Spark コネクタを使用してください。For Cosmos DB Cassandra API, use the Cassandra Spark connector.

クイック スタートQuickstart

  • Java SDK の開始の手順に従って Cosmos DB アカウントをセットアップし、データを入力してください。Follow the steps at Get started with the Java SDK to set up a Cosmos DB account, and populate some data.
  • Azure Databricks の開始の手順に従って Azure Databricks ワークスペースとクラスターをセットアップします。Follow the steps at Azure Databricks getting started to set up an Azure Databricks workspace and cluster.
  • これで、新しい Notebook を作成し、Cosmos DB コネクタ ライブラリをインポートすることができるようになりました。You can now create new Notebooks, and import the Cosmos DB connector library. ワークスペースのセットアップ方法についての詳細は、Cosmos DB コネクタの操作に関する記事をご覧ください。Jump to Working with the Cosmos DB connector for details on how to set up your workspace.
  • 次のセクションには、コネクタを使用した読み取りと書き込みの方法のスニペットがあります。The following section has snippets on how to read and write using the connector.

Cosmos DB からのバッチ読み取りBatch reads from Cosmos DB

次のスニペットでは、PySpark で Cosmos DB からの読み取りのための Spark DataFrame を作成する方法を示します。The following snippet shows how to create a Spark DataFrame to read from Cosmos DB in PySpark.

# Read Configuration
readConfig = {
    "Endpoint": "https://doctorwho.documents.azure.com:443/",
    "Masterkey": "YOUR-KEY-HERE",
    "Database": "DepartureDelays",
    "Collection": "flights_pcoll",
    "query_custom": "SELECT c.date, c.delay, c.distance, c.origin, c.destination FROM c WHERE c.origin = 'SEA'" // Optional
}

# Connect via azure-cosmosdb-spark to create Spark DataFrame
flights = spark.read.format(
    "com.microsoft.azure.cosmosdb.spark").options(**readConfig).load()
flights.count()

さらに、Scala での同様のコード スニペットを以下に示します。And the same code snippet in Scala:

// Import Necessary Libraries
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark._
import com.microsoft.azure.cosmosdb.spark.config.Config

// Read Configuration
val readConfig = Config(Map(
  "Endpoint" -> "https://doctorwho.documents.azure.com:443/",
  "Masterkey" -> "YOUR-KEY-HERE",
  "Database" -> "DepartureDelays",
  "Collection" -> "flights_pcoll",
  "query_custom" -> "SELECT c.date, c.delay, c.distance, c.origin, c.destination FROM c WHERE c.origin = 'SEA'" // Optional
))

// Connect via azure-cosmosdb-spark to create Spark DataFrame
val flights = spark.read.cosmosDB(readConfig)
flights.count()

Cosmos DB へのバッチ書き込みBatch writes to Cosmos DB

次のスニペットでは、PySpark で Cosmos DB にデータ フレームを書き込む方法を示します。The following snippet shows how to write a data frame to Cosmos DB in PySpark.

# Write configuration
writeConfig = {
    "Endpoint": "https://doctorwho.documents.azure.com:443/",
    "Masterkey": "YOUR-KEY-HERE",
    "Database": "DepartureDelays",
    "Collection": "flights_fromsea",
    "Upsert": "true"
}

# Write to Cosmos DB from the flights DataFrame
flights.write.format("com.microsoft.azure.cosmosdb.spark").options(
    **writeConfig).save()

さらに、Scala での同様のコード スニペットを以下に示します。And the same code snippet in Scala:

// Write configuration

val writeConfig = Config(Map(
  "Endpoint" -> "https://doctorwho.documents.azure.com:443/",
  "Masterkey" -> "YOUR-KEY-HERE",
  "Database" -> "DepartureDelays",
  "Collection" -> "flights_fromsea",
  "Upsert" : "true"
))

// Write to Cosmos DB from the flights DataFrame
import org.apache.spark.sql.SaveMode
flights.write.mode(SaveMode.Overwrite).cosmosDB(writeConfig)

Cosmos DB からのストリーミング読み取りStreaming reads from Cosmos DB

次のスニペットは、Azure Cosmos DB 変更フィードに接続し、そこから読み取る方法を示します。The following snippet shows how to connect to and read from Azure Cosmos DB Change Feed.

# Read Configuration
readConfig = {
    "Endpoint": "https://doctorwho.documents.azure.com:443/",
    "Masterkey": "YOUR-KEY-HERE",
    "Database": "DepartureDelays",
    "Collection": "flights_pcoll",
    "ReadChangeFeed": "true",
    "ChangeFeedQueryName": "Departure-Delays",
    "ChangeFeedStartFromTheBeginning": "false",
    "InferStreamSchema": "true",
    "ChangeFeedCheckpointLocation": "dbfs:/Departure-Delays"
}


# Open a read stream to the Cosmos DB Change Feed via azure-cosmosdb-spark to create Spark DataFrame
changes = (spark
           .readStream
           .format("com.microsoft.azure.cosmosdb.spark.streaming.CosmosDBSourceProvider")
           .options(**readConfig)
           .load())

さらに、Scala での同様のコード スニペットを以下に示します。And the same code snippet in Scala:

// Import Necessary Libraries
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark._
import com.microsoft.azure.cosmosdb.spark.config.Config

// Read Configuration
val readConfig = Config(Map(
  "Endpoint" -> "https://doctorwho.documents.azure.com:443/",
  "Masterkey" -> "YOUR-KEY-HERE",
  "Database" -> "DepartureDelays",
  "Collection" -> "flights_pcoll",
  "ReadChangeFeed" -> "true",
  "ChangeFeedQueryName" -> "Departure-Delays",
  "ChangeFeedStartFromTheBeginning" -> "false",
  "InferStreamSchema" -> "true",
  "ChangeFeedCheckpointLocation" -> "dbfs:/Departure-Delays"
))

// Open a read stream to the Cosmos DB Change Feed via azure-cosmosdb-spark to create Spark DataFrame
val df = spark.readStream.format(classOf[CosmosDBSourceProvider].getName).options(readConfig).load()

Cosmos DB へのストリーミング書き込みStreaming writes to Cosmos DB

次のスニペットでは、PySpark で Cosmos DB にデータ フレームを書き込む方法を示します。The following snippet shows how to write a data frame to Cosmos DB in PySpark.

# Write configuration
writeConfig = {
    "Endpoint": "https://doctorwho.documents.azure.com:443/",
    "Masterkey": "YOUR-KEY-HERE",
    "Database": "DepartureDelays",
    "Collection": "flights_fromsea",
    "Upsert": "true",
    "WritingBatchSize": "500",
    "CheckpointLocation": "/checkpointlocation_write1"
}

# Write to Cosmos DB from the flights DataFrame
changeFeed = (changes
              .writeStream
              .format("com.microsoft.azure.cosmosdb.spark.streaming.CosmosDBSinkProvider")
              .outputMode("append")
              .options(**writeconfig)
              .start())

さらに、Scala での同様のコード スニペットを以下に示します。And the same code snippet in Scala:

// Write configuration

val writeConfig = Config(Map(
  "Endpoint" -> "https://doctorwho.documents.azure.com:443/",
  "Masterkey" -> "YOUR-KEY-HERE",
  "Database" -> "DepartureDelays",
  "Collection" -> "flights_fromsea",
  "Upsert" -> "true",
  "WritingBatchSize" -> "500",
  "CheckpointLocation" -> "/checkpointlocation_write1"
))

// Write to Cosmos DB from the flights DataFrame
df
.writeStream
.format(classOf[CosmosDBSinkProvider].getName)
.options(writeConfig)
.start()

その他のスニペットやエンド ツー エンドのサンプルは、Jupyter を参照してください。More more snippets and end to end samples, see Jupyter.

コネクタの操作Working with the connector

以下のリンクで、GitHub でソースからコネクタを作成したり、Maven から uber jar をダウンロードしたりすることができます。You can build the connector from source in GitHub, or download the uber jars from Maven in the links below.

SparkSpark ScalaScala 最新バージョンLatest version
2.4.02.4.0 2.112.11 azure-cosmosdb-spark_2.4.0_2.11_1.4.0azure-cosmosdb-spark_2.4.0_2.11_1.4.0
2.3.02.3.0 2.112.11 azure-cosmosdb-spark_2.3.0_2.11_1.3.3azure-cosmosdb-spark_2.3.0_2.11_1.3.3
2.2.02.2.0 2.112.11 azure-cosmosdb-spark_2.2.0_2.11_1.1.1azure-cosmosdb-spark_2.2.0_2.11_1.1.1
2.1.02.1.0 2.112.11 azure-cosmosdb-spark_2.1.0_2.11_1.2.2azure-cosmosdb-spark_2.1.0_2.11_1.2.2

Databricks ノートブックの使用Using Databricks notebooks

Azure Databricks ガイドのガイダンスで Azure Cosmos DB Spark コネクタの使用に関する記事に従って、Databricks ワークスペースでライブラリを作成しますCreate a library using your Databricks workspace by following the guidance in the Azure Databricks Guide > Use the Azure Cosmos DB Spark connector

注意

Azure Cosmos DB Spark コネクタの使用に関するページは現在最新ではありませんのでご注意ください。Note, the Use the Azure Cosmos DB Spark Connector page is currently not up-to-date. 6 つの別々の jar を 6 つの異なるライブラリにダウンロードするのではなく、 https://search.maven.org/artifact/com.microsoft.azure/azure-cosmosdb-spark_2.4.0_2.11/1.4.0/jar) の Maven から uber jar をダウンロードして、ライブラリごとに 1 つの jar をインストールすることができます。Instead of downloading the six separate jars into six different libraries, you can download the uber jar from maven at https://search.maven.org/artifact/com.microsoft.azure/azure-cosmosdb-spark_2.4.0_2.11/1.4.0/jar) and install this one jar/library.

spark-cli の使用Using spark-cli

spark-cli (つまり spark-shellpysparkspark-submit) を使用してコネクタを操作するために、--packages パラメーターをコネクタの Maven 座標と一緒に使用することができます。To work with the connector using the spark-cli (that is, spark-shell, pyspark, spark-submit), you can use the --packages parameter with the connector's maven coordinates.

spark-shell --master yarn --packages "com.microsoft.azure:azure-cosmosdb-spark_2.4.0_2.11:1.4.0"

Jupyter ノートブックの使用Using Jupyter notebooks

HDInsight 内で Jupyter ノートブックを使用している場合は、spark-magic %%configure セルを使用してコネクタの Maven 座標を指定できます。If you're using Jupyter notebooks within HDInsight, you can use spark-magic %%configure cell to specify the connector's maven coordinates.

{ "name":"Spark-to-Cosmos_DB_Connector",
  "conf": {
    "spark.jars.packages": "com.microsoft.azure:azure-cosmosdb-spark_2.4.0_2.11:1.4.0",
    "spark.jars.excludes": "org.scala-lang:scala-reflect"
   }
   ...
}

spark.jars.excludes を含めるのは、コネクタ、Apache Spark、Livy の間の潜在的な競合を削除するためです。Note, the inclusion of the spark.jars.excludes is specific to remove potential conflicts between the connector, Apache Spark, and Livy.

コネクタを作成しますBuild the connector

現在、このコネクタ プロジェクトでは maven を使用しているため、依存関係なしのビルドを行うには、次のコマンドを実行します。Currently, this connector project uses maven so to build without dependencies, you can run:

mvn clean package

サンプルの実行Working with our samples

Cosmos DB Spark GitHub リポジトリでは、以下のサンプル ノートブックとスクリプトをお試しいただけます。The Cosmos DB Spark GitHub repository has the following sample notebooks and scripts that you can try.

詳細情報More Information

次の点に関する詳細情報は azure-cosmosdb-spark Wiki にあります。We have more information in the azure-cosmosdb-spark wiki including:

構成とセットアップConfiguration and Setup

トラブルシューティングTroubleshooting

パフォーマンスPerformance

変更フィードChange Feed

監視Monitoring

次の手順Next steps

まだ行っていない場合は、azure-cosmosdb-spark GitHub リポジトリから Spark-Azure Cosmos DB コネクタをダウンロードします。If you haven't already, download the Spark to Azure Cosmos DB connector from the azure-cosmosdb-spark GitHub repository. リポジトリ内で次の追加リソースを探します。Explore the following additional resources in the repo:

また、Apache Spark SQL、DataFrames、データセット ガイドAzure HDInsight 上の Apache Spark に関する記事を確認することもできます。You might also want to review the Apache Spark SQL, DataFrames, and Datasets Guide, and the Apache Spark on Azure HDInsight article.