HDInsight を使って YARN で Spark から Azure Cosmos DB for Apache Cassandra にアクセスする

適用対象: Cassandra

この記事では、spark-shell の HDInsight-Spark を使用して、YARN で Spark から Azure Cosmos DB for Apache Cassandra にアクセスする方法について説明します。 HDInsight は、Azure 上の Microsoft の Hortonworks Hadoop PaaS です。 HDFS 用のオブジェクト ストレージが使用され、Spark など、いくつかの種類があります。 この記事では HDInsight-Spark を参照していますが、これはすべての Hadoop ディストリビューションに適用されます。

前提条件

開始する前に、Azure Cosmos DB for Apache Cassandra への接続の基本について確認してください。

次の前提条件を満たす必要があります。

  • Azure Cosmos DB for Apache Cassandra をプロビジョニングする 「データベース アカウントの作成」を参照してください。

  • HDInsight-Spark クラスターをプロビジョニングする。 「ARM テンプレートを使用して Azure HDInsight 内に Apache Spark クラスターを作成する」を参照してください。

  • Spark2 での Cassandra 用 API の構成。 Cassandra の Spark コネクタでは、Spark コンテキストの一部として Cassandra 接続の詳細を初期化する必要があります。 Jupyter Notebook を起動すると、Spark セッションとコンテキストは既に初期化されています。 HDInsight の既定の Jupyter Notebook の起動の一部としてすべての構成セットが完了した場合を除いて、Spark コンテキストは停止および再初期化しないでください。 回避策の 1 つは、Ambari、Spark2 サービス構成に Cassandra インスタンスの詳細を直接追加することです。 この方法は、クラスターあたり 1 回限りのアクティビティで、Spark2 サービスの再起動が必要です。

    1. Ambari、Spark2 サービスに移動し、configs を選択します。

    2. カスタム spark2-defaults に移動し、次を含む新しいプロパティを追加し、Spark2 サービスを再起動します。

    spark.cassandra.connection.host=YOUR_COSMOSDB_ACCOUNT_NAME.cassandra.cosmosdb.azure.com<br>
    spark.cassandra.connection.port=10350<br>
    spark.cassandra.connection.ssl.enabled=true<br>
    spark.cassandra.auth.username=YOUR_COSMOSDB_ACCOUNT_NAME<br>
    spark.cassandra.auth.password=YOUR_COSMOSDB_KEY<br>
    

検証のために cqlsh を使用できます。 詳細については、Spark からの Azure Cosmos DB for Apache Cassandra への接続に関するページをご覧ください。

Spark シェルで Azure Cosmos DB for Apache Cassandra にアクセスする

Spark シェルは、テストと探索のために使用されます。

  • クラスターの Spark バージョンと互換性のある、必須の Maven の依存関係を持つ spark-shell を起動します。

    spark-shell --packages "com.datastax.spark:spark-cassandra-connector_2.11:2.3.0,com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.0.0"
    
  • 一部の DDL および DML 操作を実行します

    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    import spark.implicits._
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.Column
    import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType,LongType,FloatType,DoubleType, TimestampType}
    import org.apache.spark.sql.cassandra._
    
    //Spark connector
    import com.datastax.spark.connector._
    import com.datastax.spark.connector.cql.CassandraConnector
    
    //CosmosDB library for multiple retry
    import com.microsoft.azure.cosmosdb.cassandra
    
    // Specify connection factory for Cassandra
    spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory")
    
    // Parallelism and throughput configs
    spark.conf.set("spark.cassandra.output.batch.size.rows", "1")
    spark.conf.set("spark.cassandra.connection.connections_per_executor_max", "10")
    spark.conf.set("spark.cassandra.output.concurrent.writes", "100")
    spark.conf.set("spark.cassandra.concurrent.reads", "512")
    spark.conf.set("spark.cassandra.output.batch.grouping.buffer.size", "1000")
    spark.conf.set("spark.cassandra.connection.keep_alive_ms", "60000000") //Increase this number as needed
    
  • CRUD 操作を実行します

    //1) Create table if it does not exist
    val cdbConnector = CassandraConnector(sc)
    cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT PRIMARY KEY,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT) WITH cosmosdb_provisioned_throughput=4000;"))
    
    //2) Delete data from potential prior runs
    cdbConnector.withSessionDo(session => session.execute("DELETE FROM books_ks.books WHERE book_id IN ('b00300','b00001','b00023','b00501','b09999','b01001','b00999','b03999','b02999','b000009');"))
    
    //3) Generate a few rows
    val booksDF = Seq(
    ("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887,11.33),
    ("b00023", "Arthur Conan Doyle", "A sign of four", 1890,22.45),
    ("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892,19.83),
    ("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893,14.22),
    ("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901,12.25)
    ).toDF("book_id", "book_author", "book_name", "book_pub_year","book_price")
    
    //4) Persist
    booksDF.write.mode("append").format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000")).save()
    
    //5) Read the data in the table
    spark.read.format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks")).load.show
    

Jupyter ノートブックから Azure Cosmos DB for Apache Cassandra にアクセスする

HDInsight-Spark は、Zeppelin および Jupyter Notebook サービスと共に提供されます。 これらはどちらも Scala と Python をサポートする Web ベースのノートブック環境です。 ノートブックは対話型の探索的分析およびコラボレーションに適していますが、運用または生産プロセスには適していません。

次の Jupyter ノートブックは HDInsight Spark クラスターにアップロードできます。これには、Azure Cosmos DB for Apache Cassandra を操作するためのすぐに使えるサンプルが用意されています。 最初のノートブック 1.0-ReadMe.ipynb で、Azure Cosmos DB for Apache Cassandra に接続するための Spark サービス構成を必ず確認してください。

azure-cosmos-db-cassandra-api-spark-notebooks-jupyter にあるノートブックを、お使いのマシンにダウンロードします。

アップロード方法

Jupyter を起動するときに、Scala に移動します。 ディレクトリを作成し、次にノートブックをディレクトリにアップロードします。 [アップロード] ボタンは上部右側にあります。

実行方法

ノートブックと、各ノートブック セルを順番に実行します。 各ノートブックの上部にある [実行] ボタンを選択してすべてのセルを実行するか、セルごとに Shift+Enter キーを押します。

Spark Scala プログラムから Azure Cosmos DB for Apache Cassandra でアクセスする

運用環境での自動化されたプロセスでは、Spark プログラムが spark-submit を使用してクラスターに送信されます。

次のステップ