共用方式為


透過 HDInsight 從 Spark on YARN 存取 Azure Cosmos DB for Apache Cassandra

適用於: Cassandra

此文章說明如何透過 spark-shell 中的 HDInsight-Spark,從 Spark on YARN 存取 Azure Cosmos DB for Apache Cassandra。 HDInsight 為 Microsoft 在 Azure 上的 Hortonworks Hadoop PaaS。 其會使用 HDFS 的物件儲存體,並隨附數種變體,包括 Spark。 雖然本文指的是 HDInsight-Spark,但其適用於所有 Hadoop 散發套件。

必要條件

在您開始之前,請檢閱連線至 Azure Cosmos DB for Apache Cassandra 的基本概念

您必須符合下列必要條件:

  • 佈建 Azure Cosmos DB for Apache Cassandra。 參閱建立資料庫帳戶

  • 佈建 HDInsight-Spark 叢集。 參閱使用 ARM 範本在 Azure HDInsight 中建立 Apache Spark 叢集

  • 在 Spark2 中的 API for Cassandra 設定。 適用於 Cassandra 的 Spark 連接器需要將 Cassandra 連線詳細資料初始化以作為 Spark 內容的一部分。 當您啟動 Jupyter notebook 時,Spark 工作階段和內容已經初始化。 請不要停止並重新初始化 Spark 內容,除非其已完成所有組態集合為 HDInsight 預設 Jupyter notebook 啟動的一部分。 有個解決方法是將 Cassandra 執行個體的詳細資料直接新增至 Ambari (Spark2) 服務組態。 此方法為個別叢集的一次性活動,在完成後需要重新啟動 Spark2 服務。

    1. 移至 Ambari、Spark2 服務並選取 [設定]。

    2. 移至自訂 spark2-defaults 並以下列方式新增屬性,然後重新啟動 Spark2 服務:

    spark.cassandra.connection.host=YOUR_COSMOSDB_ACCOUNT_NAME.cassandra.cosmosdb.azure.com<br>
    spark.cassandra.connection.port=10350<br>
    spark.cassandra.connection.ssl.enabled=true<br>
    spark.cassandra.auth.username=YOUR_COSMOSDB_ACCOUNT_NAME<br>
    spark.cassandra.auth.password=YOUR_COSMOSDB_KEY<br>
    

您可以使用 cqlsh 進行驗證。 如需詳細資訊,請參閱從 Spark 連線至 Azure Cosmos DB for Apache Cassandra

從 Spark 殼層存取 Azure Cosmos DB for Apache Cassandra

Spark Shell 可供測試及探索之用。

  • 使用與叢集的 Spark 版本相容的必要 Maven 相依性啟動 spark-shell

    spark-shell --packages "com.datastax.spark:spark-cassandra-connector_2.11:2.3.0,com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.0.0"
    
  • 執行某些 DDL 和 DML 作業

    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    import spark.implicits._
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.Column
    import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType,LongType,FloatType,DoubleType, TimestampType}
    import org.apache.spark.sql.cassandra._
    
    //Spark connector
    import com.datastax.spark.connector._
    import com.datastax.spark.connector.cql.CassandraConnector
    
    //CosmosDB library for multiple retry
    import com.microsoft.azure.cosmosdb.cassandra
    
    // Specify connection factory for Cassandra
    spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory")
    
    // Parallelism and throughput configs
    spark.conf.set("spark.cassandra.output.batch.size.rows", "1")
    spark.conf.set("spark.cassandra.connection.connections_per_executor_max", "10")
    spark.conf.set("spark.cassandra.output.concurrent.writes", "100")
    spark.conf.set("spark.cassandra.concurrent.reads", "512")
    spark.conf.set("spark.cassandra.output.batch.grouping.buffer.size", "1000")
    spark.conf.set("spark.cassandra.connection.keep_alive_ms", "60000000") //Increase this number as needed
    
  • 執行 CRUD 作業

    //1) Create table if it does not exist
    val cdbConnector = CassandraConnector(sc)
    cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT PRIMARY KEY,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT) WITH cosmosdb_provisioned_throughput=4000;"))
    
    //2) Delete data from potential prior runs
    cdbConnector.withSessionDo(session => session.execute("DELETE FROM books_ks.books WHERE book_id IN ('b00300','b00001','b00023','b00501','b09999','b01001','b00999','b03999','b02999','b000009');"))
    
    //3) Generate a few rows
    val booksDF = Seq(
    ("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887,11.33),
    ("b00023", "Arthur Conan Doyle", "A sign of four", 1890,22.45),
    ("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892,19.83),
    ("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893,14.22),
    ("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901,12.25)
    ).toDF("book_id", "book_author", "book_name", "book_pub_year","book_price")
    
    //4) Persist
    booksDF.write.mode("append").format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000")).save()
    
    //5) Read the data in the table
    spark.read.format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks")).load.show
    

從 Jupyter 筆記本存取 Azure Cosmos DB for Apache Cassandra

HDInsight-Spark 隨附 Zeppelin 和 Jupyter Notebook 服務。 兩者都是以 Web 為基礎的 Notebook 環境,且都支援 Scala 及 Python。 Notebook 非常適用於互動式探勘分析與共同作業,但不適用於作業或生產流程。

下列 Jupyter 筆記本可上傳至您的 HDInsight Spark 叢集,並提供可與 Azure Cosmos DB for Apache Cassandra 搭配使用的現成範例。 請務必檢閱第一個筆記本 1.0-ReadMe.ipynb,以檢閱連線至 Azure Cosmos DB for Apache Cassandra 的 Spark 服務設定。

請將 azure-cosmos-db-cassandra-api-spark-notebooks-jupyter 下方的 Notebook 下載到您的電腦。

如何上傳

當您啟動 Jupyter 時,請瀏覽至 Scala。 建立目錄,然後將 Notebook 上傳至目錄。 上傳按鈕位於右上方。

如何執行

循序執行 Notebook 和每個 Notebook 資料格。 選取每個 Notebook 頂端的 [執行] 按鈕以執行所有資料格,或按下 [shift+enter] 以執行個別資料格。

從 Spark Scala 程式使用 Azure Cosmos DB for Apache Cassandra 進行存取

在生產環境中執行自動化程序時,會使用 spark-submit 將 Spark 程式提交至叢集。

下一步