HDInsight를 통해 YARN에서 Spark의 Azure Cosmos DB for Apache Cassandra에 액세스

적용 대상: Cassandra

이 문서에서는 spark-shell에서 HDInsight-Spark를 통해 YARN에서 Spark의 Azure Cosmos DB for Apache Cassandra에 액세스하는 방법을 알아봅니다. HDInsight는 Azure 기반 Microsoft의 Hortonworks Hadoop PaaS입니다. HDFS용 개체 스토리지를 사용하며 Spark를 비롯한 여러 가지 버전으로 제공됩니다. 이 문서에서는 HDInsight-Spark를 참조하지만 모든 Hadoop 배포에 적용됩니다.

필수 조건

시작하기 전에 Azure Cosmos DB for Apache Cassandra에 연결하기 위한 기본 사항을 검토합니다.

다음 필수 조건이 필요합니다.

  • Azure Cosmos DB for Apache Cassandra를 프로비저닝합니다. 데이터베이스 계정 만들기를 참조하세요.

  • HDInsight-Spark 클러스터를 프로비저닝합니다. ARM 템플릿을 사용하여 Azure HDInsight에서 Apache Spark 클러스터 만들기를 참조하세요.

  • Spark2의 API for Cassandra 구성. Cassandra용 Spark 커넥터를 사용하려면 Cassandra 연결 세부 정보를 Spark 컨텍스트의 일부로 초기화해야 합니다. Jupyter Notebook을 시작하면 Spark 세션과 컨텍스트가 이미 초기화됩니다. HDInsight 기본 Jupyter Notebook 시작의 일부로 설정된 모든 구성이 완료되지 않은 경우 Spark 컨텍스트를 중지하고 다시 초기화하지 마세요. 해결 방법은 Cassandra 인스턴스 세부 정보를 Spark2 서비스 구성인 Ambari에 직접 추가하는 것입니다. 이 방법은 Spark2 서비스를 다시 시작해야 하는 클러스터당 일회용 작업입니다.

    1. Spark2 서비스인 Ambari로 이동하고 구성을 선택합니다.

    2. 사용자 지정 spark2-defaults로 이동하고 다음을 사용하여 새 속성을 추가한 후, Spark2 서비스를 다시 시작합니다.

    spark.cassandra.connection.host=YOUR_COSMOSDB_ACCOUNT_NAME.cassandra.cosmosdb.azure.com<br>
    spark.cassandra.connection.port=10350<br>
    spark.cassandra.connection.ssl.enabled=true<br>
    spark.cassandra.auth.username=YOUR_COSMOSDB_ACCOUNT_NAME<br>
    spark.cassandra.auth.password=YOUR_COSMOSDB_KEY<br>
    

유효성 검사에 cqlsh를 사용할 수 있습니다. 자세한 내용은 Spark에서 Azure Cosmos DB for Apache Cassandra에 연결을 참조하세요.

Spark 셸에서 Azure Cosmos DB for Apache Cassandra에 액세스

Spark 셸은 테스트 및 탐색에 사용됩니다.

  • 클러스터의 Spark 버전과 호환되는 필수 maven 종속성을 사용하여 spark-shell을 시작합니다.

    spark-shell --packages "com.datastax.spark:spark-cassandra-connector_2.11:2.3.0,com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.0.0"
    
  • 일부 DDL 및 DML 작업 실행

    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    import spark.implicits._
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.Column
    import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType,LongType,FloatType,DoubleType, TimestampType}
    import org.apache.spark.sql.cassandra._
    
    //Spark connector
    import com.datastax.spark.connector._
    import com.datastax.spark.connector.cql.CassandraConnector
    
    //CosmosDB library for multiple retry
    import com.microsoft.azure.cosmosdb.cassandra
    
    // Specify connection factory for Cassandra
    spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory")
    
    // Parallelism and throughput configs
    spark.conf.set("spark.cassandra.output.batch.size.rows", "1")
    spark.conf.set("spark.cassandra.connection.connections_per_executor_max", "10")
    spark.conf.set("spark.cassandra.output.concurrent.writes", "100")
    spark.conf.set("spark.cassandra.concurrent.reads", "512")
    spark.conf.set("spark.cassandra.output.batch.grouping.buffer.size", "1000")
    spark.conf.set("spark.cassandra.connection.keep_alive_ms", "60000000") //Increase this number as needed
    
  • CRUD 작업 실행

    //1) Create table if it does not exist
    val cdbConnector = CassandraConnector(sc)
    cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT PRIMARY KEY,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT) WITH cosmosdb_provisioned_throughput=4000;"))
    
    //2) Delete data from potential prior runs
    cdbConnector.withSessionDo(session => session.execute("DELETE FROM books_ks.books WHERE book_id IN ('b00300','b00001','b00023','b00501','b09999','b01001','b00999','b03999','b02999','b000009');"))
    
    //3) Generate a few rows
    val booksDF = Seq(
    ("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887,11.33),
    ("b00023", "Arthur Conan Doyle", "A sign of four", 1890,22.45),
    ("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892,19.83),
    ("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893,14.22),
    ("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901,12.25)
    ).toDF("book_id", "book_author", "book_name", "book_pub_year","book_price")
    
    //4) Persist
    booksDF.write.mode("append").format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000")).save()
    
    //5) Read the data in the table
    spark.read.format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks")).load.show
    

Jupyter Notebook에서 Azure Cosmos DB for Apache Cassandra에 액세스

HDInsight-Spark는 Zeppelin 및 Jupyter Notebook 서비스와 함께 제공됩니다. 모두 Scala 및 Python을 지원하는 웹 기반 Notebook 환경입니다. Notebooks는 대화형 탐색 분석 및 협업에 적합하지만 운영 또는 프로덕션 프로세스에는 적합하지 않습니다.

다음 Jupyter Notebooks는 HDInsight Spark 클러스터에 업로드할 수 있으며, Azure Cosmos DB for Apache Cassandra를 사용하기 위한 준비 샘플을 제공합니다. 첫 번째 Notebook 1.0-ReadMe.ipynb를 검토하여 Azure Cosmos DB for Apache Cassandra에 연결하기 위한 Spark 서비스 구성을 검토해 보세요.

머신에서 이러한 Notebook을 azure-cosmos-db-cassandra-api-spark-notebooks-jupyter에서 다운로드하세요.

업로드하는 방법

Jupyter를 시작하는 경우 Scala로 이동합니다. 디렉터리를 만든 다음 Notebooks를 디렉터리에 업로드합니다. 업로드 단추는 오른쪽 위에 있습니다.

실행 방법

Notebook 및 각 Notebook 셀을 순차적으로 살펴봅니다. 각 Notebook 상단의 실행 단추를 선택하여 모든 셀을 실행하거나 각 셀에 대해 Shift+Enter를 선택합니다.

Spark Scala 프로그램에서 Azure Cosmos DB for Apache Cassandra로 액세스

프로덕션의 자동화된 프로세스의 경우 Spark 프로그램은 spark-submit을 사용하여 클러스터에 제출됩니다.

다음 단계