Uzyskiwanie dostępu do usługi Azure Cosmos DB dla bazy danych Apache Cassandra z platformy Spark w usłudze YARN przy użyciu usługi HDInsight

DOTYCZY: Cassandra

W tym artykule opisano sposób uzyskiwania dostępu do usługi Azure Cosmos DB dla bazy danych Apache Cassandra z platformy Spark w usłudze YARN przy użyciu HDInsight-Spark z usługi spark-shell. Usługa HDInsight to platforma Hortonworks Hadoop PaaS firmy Microsoft na platformie Azure. Używa magazynu obiektów dla systemu plików HDFS i zawiera kilka odmian, w tym Spark. Ten artykuł dotyczy usługi HDInsight-Spark, ale dotyczy wszystkich dystrybucji usługi Hadoop.

Wymagania wstępne

Przed rozpoczęciem zapoznaj się z podstawami nawiązywania połączenia z usługą Azure Cosmos DB dla bazy danych Apache Cassandra.

Potrzebne są następujące wymagania wstępne:

  • Aprowizuj usługę Azure Cosmos DB dla bazy danych Apache Cassandra. Zobacz Tworzenie konta bazy danych.

  • Aprowizuj klaster HDInsight-Spark. Zobacz Tworzenie klastra Apache Spark w usłudze Azure HDInsight przy użyciu szablonu usługi ARM.

  • Interfejs API dla konfiguracji bazy danych Cassandra na platformie Spark2. Łącznik Spark dla bazy danych Cassandra wymaga zainicjowania szczegółów połączenia Cassandra w ramach kontekstu platformy Spark. Po uruchomieniu notesu Jupyter sesja platformy Spark i kontekst są już inicjowane. Nie należy zatrzymywać i ponownie inicjować kontekstu platformy Spark, chyba że jest on kompletny z każdym zestawem konfiguracji w ramach domyślnego uruchamiania notesu Jupyter usługi HDInsight. Jednym z obejść jest bezpośrednie dodanie szczegółów wystąpienia cassandra do konfiguracji usługi Ambari, Spark2. Takie podejście jest jednorazowym działaniem na klaster, który wymaga ponownego uruchomienia usługi Spark2.

    1. Przejdź do pozycji Ambari, usługa Spark2 i wybierz pozycję Konfiguracje.

    2. Przejdź do niestandardowych wartości spark2-defaults i dodaj nową właściwość z następującymi elementami i uruchom ponownie usługę Spark2:

    spark.cassandra.connection.host=YOUR_COSMOSDB_ACCOUNT_NAME.cassandra.cosmosdb.azure.com<br>
    spark.cassandra.connection.port=10350<br>
    spark.cassandra.connection.ssl.enabled=true<br>
    spark.cassandra.auth.username=YOUR_COSMOSDB_ACCOUNT_NAME<br>
    spark.cassandra.auth.password=YOUR_COSMOSDB_KEY<br>
    

Można użyć cqlsh do weryfikacji. Aby uzyskać więcej informacji, zobacz Connecting to Azure Cosmos DB for Apache Cassandra from Spark (Nawiązywanie połączenia z usługą Azure Cosmos DB dla bazy danych Apache Cassandra z platformy Spark).

Uzyskiwanie dostępu do usługi Azure Cosmos DB dla bazy danych Apache Cassandra z poziomu powłoki Spark

Powłoka Spark jest używana do testowania i eksploracji.

  • Uruchom program spark-shell z wymaganymi zależnościami maven zgodnymi z wersją platformy Spark klastra.

    spark-shell --packages "com.datastax.spark:spark-cassandra-connector_2.11:2.3.0,com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.0.0"
    
  • Wykonywanie niektórych operacji DDL i DML

    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    import spark.implicits._
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.Column
    import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType,LongType,FloatType,DoubleType, TimestampType}
    import org.apache.spark.sql.cassandra._
    
    //Spark connector
    import com.datastax.spark.connector._
    import com.datastax.spark.connector.cql.CassandraConnector
    
    //CosmosDB library for multiple retry
    import com.microsoft.azure.cosmosdb.cassandra
    
    // Specify connection factory for Cassandra
    spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory")
    
    // Parallelism and throughput configs
    spark.conf.set("spark.cassandra.output.batch.size.rows", "1")
    spark.conf.set("spark.cassandra.connection.connections_per_executor_max", "10")
    spark.conf.set("spark.cassandra.output.concurrent.writes", "100")
    spark.conf.set("spark.cassandra.concurrent.reads", "512")
    spark.conf.set("spark.cassandra.output.batch.grouping.buffer.size", "1000")
    spark.conf.set("spark.cassandra.connection.keep_alive_ms", "60000000") //Increase this number as needed
    
  • Uruchamianie operacji CRUD

    //1) Create table if it does not exist
    val cdbConnector = CassandraConnector(sc)
    cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT PRIMARY KEY,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT) WITH cosmosdb_provisioned_throughput=4000;"))
    
    //2) Delete data from potential prior runs
    cdbConnector.withSessionDo(session => session.execute("DELETE FROM books_ks.books WHERE book_id IN ('b00300','b00001','b00023','b00501','b09999','b01001','b00999','b03999','b02999','b000009');"))
    
    //3) Generate a few rows
    val booksDF = Seq(
    ("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887,11.33),
    ("b00023", "Arthur Conan Doyle", "A sign of four", 1890,22.45),
    ("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892,19.83),
    ("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893,14.22),
    ("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901,12.25)
    ).toDF("book_id", "book_author", "book_name", "book_pub_year","book_price")
    
    //4) Persist
    booksDF.write.mode("append").format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000")).save()
    
    //5) Read the data in the table
    spark.read.format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks")).load.show
    

Uzyskiwanie dostępu do usługi Azure Cosmos DB dla bazy danych Apache Cassandra z poziomu notesów Jupyter

HDInsight-Spark są dostarczane z usługami notesów Zeppelin i Jupyter. Są to środowiska notesów internetowych, które obsługują język Scala i Python. Notesy doskonale nadają się do interaktywnej analizy eksploracyjnej i współpracy, ale nie są przeznaczone dla procesów operacyjnych lub produkcyjnych.

Następujące notesy Jupyter można przekazać do klastra HDInsight Spark i udostępnić gotowe przykłady do pracy z usługą Azure Cosmos DB dla usługi Apache Cassandra. Zapoznaj się z pierwszym notesem 1.0-ReadMe.ipynb , aby zapoznać się z konfiguracją usługi Spark na potrzeby nawiązywania połączenia z usługą Azure Cosmos DB dla bazy danych Apache Cassandra.

Pobierz notesy w obszarze azure-cosmos-db-cassandra-api-spark-notebooks-jupyter na maszynę.

Jak przekazać

Po uruchomieniu programu Jupyter przejdź do języka Scala. Utwórz katalog, a następnie przekaż notesy do katalogu. Przycisk Przekaż znajduje się w prawym górnym rogu.

Jak uruchomić

Przejrzyj notesy i każdą komórkę notesu sekwencyjnie. Wybierz przycisk Uruchom w górnej części każdego notesu, aby uruchomić wszystkie komórki, lub klawisz Shift+Enter dla każdej komórki.

Dostęp za pomocą usługi Azure Cosmos DB dla bazy danych Apache Cassandra z poziomu programu Spark Scala

W przypadku zautomatyzowanych procesów w środowisku produkcyjnym programy platformy Spark są przesyłane do klastra przy użyciu funkcji spark-submit.

Następne kroki