Få åtkomst till Azure Cosmos DB för Apache Cassandra från Spark på YARN med HDInsight

GÄLLER FÖR: Cassandra

Den här artikeln beskriver hur du kommer åt Azure Cosmos DB för Apache Cassandra från Spark på YARN med HDInsight-Spark från spark-shell. HDInsight är Microsofts Hortonworks Hadoop PaaS på Azure. Den använder objektlagring för HDFS och finns i flera varianter, inklusive Spark. Även om den här artikeln refererar till HDInsight-Spark gäller den för alla Hadoop-distributioner.

Förutsättningar

Innan du börjar bör du läsa grunderna för att ansluta till Azure Cosmos DB för Apache Cassandra.

Du behöver följande krav:

  • Etablera Azure Cosmos DB för Apache Cassandra. Se Skapa ett databaskonto.

  • Etablera ett HDInsight-Spark kluster. Se Skapa Apache Spark-kluster i Azure HDInsight med arm-mall.

  • API för Cassandra-konfiguration i Spark2. Spark-anslutningsappen för Cassandra kräver att Cassandra-anslutningsinformationen initieras som en del av Spark-kontexten. När du startar en Jupyter-anteckningsbok har spark-sessionen och kontexten redan initierats. Stoppa och initiera inte Spark-kontexten igen om den inte är komplett med alla konfigurationsuppsättningar som en del av HDInsight-standardstarten för Jupyter Notebook. En lösning är att lägga till information om Cassandra-instansen i Ambari, Spark2-tjänstkonfiguration, direkt. Den här metoden är en engångsaktivitet per kluster som kräver en omstart av Spark2-tjänsten.

    1. Gå till Ambari, Spark2-tjänsten och välj konfigurationer.

    2. Gå till anpassade spark2-standardvärden och lägg till en ny egenskap med följande och starta om Spark2-tjänsten:

    spark.cassandra.connection.host=YOUR_COSMOSDB_ACCOUNT_NAME.cassandra.cosmosdb.azure.com<br>
    spark.cassandra.connection.port=10350<br>
    spark.cassandra.connection.ssl.enabled=true<br>
    spark.cassandra.auth.username=YOUR_COSMOSDB_ACCOUNT_NAME<br>
    spark.cassandra.auth.password=YOUR_COSMOSDB_KEY<br>
    

Du kan använda cqlsh för validering. Mer information finns i Ansluta till Azure Cosmos DB för Apache Cassandra från Spark.

Få åtkomst till Azure Cosmos DB för Apache Cassandra från Spark Shell

Spark Shell används för testning och utforskning.

  • Starta spark-shell med nödvändiga maven-beroenden som är kompatibla med klustrets Spark-version.

    spark-shell --packages "com.datastax.spark:spark-cassandra-connector_2.11:2.3.0,com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.0.0"
    
  • Köra vissa DDL- och DML-åtgärder

    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    import spark.implicits._
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.Column
    import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType,LongType,FloatType,DoubleType, TimestampType}
    import org.apache.spark.sql.cassandra._
    
    //Spark connector
    import com.datastax.spark.connector._
    import com.datastax.spark.connector.cql.CassandraConnector
    
    //CosmosDB library for multiple retry
    import com.microsoft.azure.cosmosdb.cassandra
    
    // Specify connection factory for Cassandra
    spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory")
    
    // Parallelism and throughput configs
    spark.conf.set("spark.cassandra.output.batch.size.rows", "1")
    spark.conf.set("spark.cassandra.connection.connections_per_executor_max", "10")
    spark.conf.set("spark.cassandra.output.concurrent.writes", "100")
    spark.conf.set("spark.cassandra.concurrent.reads", "512")
    spark.conf.set("spark.cassandra.output.batch.grouping.buffer.size", "1000")
    spark.conf.set("spark.cassandra.connection.keep_alive_ms", "60000000") //Increase this number as needed
    
  • Köra CRUD-åtgärder

    //1) Create table if it does not exist
    val cdbConnector = CassandraConnector(sc)
    cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT PRIMARY KEY,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT) WITH cosmosdb_provisioned_throughput=4000;"))
    
    //2) Delete data from potential prior runs
    cdbConnector.withSessionDo(session => session.execute("DELETE FROM books_ks.books WHERE book_id IN ('b00300','b00001','b00023','b00501','b09999','b01001','b00999','b03999','b02999','b000009');"))
    
    //3) Generate a few rows
    val booksDF = Seq(
    ("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887,11.33),
    ("b00023", "Arthur Conan Doyle", "A sign of four", 1890,22.45),
    ("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892,19.83),
    ("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893,14.22),
    ("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901,12.25)
    ).toDF("book_id", "book_author", "book_name", "book_pub_year","book_price")
    
    //4) Persist
    booksDF.write.mode("append").format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000")).save()
    
    //5) Read the data in the table
    spark.read.format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks")).load.show
    

Få åtkomst till Azure Cosmos DB för Apache Cassandra från Jupyter Notebooks

HDInsight-Spark levereras med Zeppelin- och Jupyter Notebook-tjänster. De är båda webbaserade notebook-miljöer som stöder Scala och Python. Notebook-filer är bra för interaktiv undersökande analys och samarbete, men inte avsedda för drift- eller produktionsprocesser.

Följande Jupyter-notebook-filer kan laddas upp till ditt HDInsight Spark-kluster och tillhandahålla färdiga exempel för att arbeta med Azure Cosmos DB för Apache Cassandra. Se till att granska den första notebook-filen 1.0-ReadMe.ipynb för att granska Spark-tjänstkonfigurationen för anslutning till Azure Cosmos DB för Apache Cassandra.

Ladda ned notebook-filerna under azure-cosmos-db-cassandra-api-spark-notebooks-jupyter till datorn.

Ladda upp

När du startar Jupyter navigerar du till Scala. Skapa en katalog och ladda sedan upp anteckningsböckerna till katalogen. Knappen Ladda upp finns längst upp till höger.

Så här kör du

Gå igenom notebook-filerna och varje notebook-cell sekventiellt. Välj knappen Kör överst i varje anteckningsbok för att köra alla celler eller Skift+retur för varje cell.

Åtkomst med Azure Cosmos DB för Apache Cassandra från ditt Spark Scala-program

För automatiserade processer i produktion skickas Spark-program till klustret med hjälp av spark-submit.

Nästa steg