Přístup ke službě Azure Cosmos DB for Apache Cassandra ze Sparku na YARN pomocí HDInsight

PLATÍ PRO: Cassandra

Tento článek popisuje, jak získat přístup ke službě Azure Cosmos DB for Apache Cassandra ze Sparku na YARN s HDInsight-Spark z spark-shell. HDInsight je Hortonworks Hadoop PaaS od Microsoftu v Azure. Používá úložiště objektů pro HDFS a má několik variant, včetně Sparku. I když tento článek odkazuje na HDInsight-Spark, platí pro všechny distribuce Hadoop.

Požadavky

Než začnete, projděte si základy připojení ke službě Azure Cosmos DB for Apache Cassandra.

Potřebujete následující požadavky:

  • Zřízení služby Azure Cosmos DB pro Apache Cassandra Viz Vytvoření databázového účtu.

  • Zřiďte cluster HDInsight-Spark. Viz Vytvoření clusteru Apache Spark ve službě Azure HDInsight pomocí šablony ARM.

  • Konfigurace rozhraní API pro Cassandra ve Spark2. Konektor Spark pro Cassandra vyžaduje inicializaci podrobností připojení Cassandra jako součást kontextu Sparku. Když spustíte poznámkový blok Jupyter, relace Sparku a kontext jsou již inicializovány. Nezastavujte a znovu inicializujte kontext Sparku, pokud není dokončený s každou sadou konfigurace v rámci výchozího spuštění poznámkového bloku Jupyter ve službě HDInsight. Jedním z alternativních řešení je přidat podrobnosti o instanci Cassandra přímo do konfigurace služby Ambari, Spark2. Tento přístup je jednorázová aktivita na cluster, která vyžaduje restartování služby Spark2.

    1. Přejděte na Ambari, službu Spark2 a vyberte konfigurace.

    2. Přejděte na vlastní výchozí hodnoty spark2 a přidejte novou vlastnost s následujícími vlastnostmi a restartujte službu Spark2:

    spark.cassandra.connection.host=YOUR_COSMOSDB_ACCOUNT_NAME.cassandra.cosmosdb.azure.com<br>
    spark.cassandra.connection.port=10350<br>
    spark.cassandra.connection.ssl.enabled=true<br>
    spark.cassandra.auth.username=YOUR_COSMOSDB_ACCOUNT_NAME<br>
    spark.cassandra.auth.password=YOUR_COSMOSDB_KEY<br>
    

Můžete použít cqlsh k ověření. Další informace najdete v tématu Připojení ke službě Azure Cosmos DB for Apache Cassandra ze Sparku.

Přístup ke službě Azure Cosmos DB for Apache Cassandra z prostředí Spark

Prostředí Spark se používá k testování a zkoumání.

  • Spusťte spark-shell s požadovanými závislostmi mavenu kompatibilními s verzí Sparku vašeho clusteru.

    spark-shell --packages "com.datastax.spark:spark-cassandra-connector_2.11:2.3.0,com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.0.0"
    
  • Provádění některých operací DDL a DML

    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    import spark.implicits._
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.Column
    import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType,LongType,FloatType,DoubleType, TimestampType}
    import org.apache.spark.sql.cassandra._
    
    //Spark connector
    import com.datastax.spark.connector._
    import com.datastax.spark.connector.cql.CassandraConnector
    
    //CosmosDB library for multiple retry
    import com.microsoft.azure.cosmosdb.cassandra
    
    // Specify connection factory for Cassandra
    spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory")
    
    // Parallelism and throughput configs
    spark.conf.set("spark.cassandra.output.batch.size.rows", "1")
    spark.conf.set("spark.cassandra.connection.connections_per_executor_max", "10")
    spark.conf.set("spark.cassandra.output.concurrent.writes", "100")
    spark.conf.set("spark.cassandra.concurrent.reads", "512")
    spark.conf.set("spark.cassandra.output.batch.grouping.buffer.size", "1000")
    spark.conf.set("spark.cassandra.connection.keep_alive_ms", "60000000") //Increase this number as needed
    
  • Spuštění operací CRUD

    //1) Create table if it does not exist
    val cdbConnector = CassandraConnector(sc)
    cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT PRIMARY KEY,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT) WITH cosmosdb_provisioned_throughput=4000;"))
    
    //2) Delete data from potential prior runs
    cdbConnector.withSessionDo(session => session.execute("DELETE FROM books_ks.books WHERE book_id IN ('b00300','b00001','b00023','b00501','b09999','b01001','b00999','b03999','b02999','b000009');"))
    
    //3) Generate a few rows
    val booksDF = Seq(
    ("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887,11.33),
    ("b00023", "Arthur Conan Doyle", "A sign of four", 1890,22.45),
    ("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892,19.83),
    ("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893,14.22),
    ("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901,12.25)
    ).toDF("book_id", "book_author", "book_name", "book_pub_year","book_price")
    
    //4) Persist
    booksDF.write.mode("append").format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000")).save()
    
    //5) Read the data in the table
    spark.read.format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks")).load.show
    

Přístup ke službě Azure Cosmos DB for Apache Cassandra z poznámkových bloků Jupyter

HDInsight-Spark se dodává se službami poznámkových bloků Zeppelin a Jupyter. Obě jsou to webová prostředí poznámkových bloků, která podporují Jazyk Scala a Python. Poznámkové bloky jsou skvělé pro interaktivní průzkumné analýzy a spolupráci, ale nejsou určené pro provozní nebo produkční procesy.

Následující poznámkové bloky Jupyter můžete nahrát do clusteru HDInsight Spark a poskytnout připravené ukázky pro práci se službou Azure Cosmos DB for Apache Cassandra. Nezapomeňte si projít první poznámkový blok 1.0-ReadMe.ipynb a projít si konfiguraci služby Spark pro připojení ke službě Azure Cosmos DB pro Apache Cassandra.

Stáhněte si poznámkové bloky v části azure-cosmos-db-cassandra-api-spark-notebooks-jupyter do svého počítače.

Jak nahrát

Když spustíte Jupyter, přejděte na Scala. Vytvořte adresář a pak do adresáře nahrajte poznámkové bloky. Tlačítko Nahrát je v pravém horním rohu.

Postup spuštění

Projděte si poznámkové bloky a jednotlivé buňky poznámkového bloku postupně. Vyberte tlačítko Spustit v horní části každého poznámkového bloku, aby se spustily všechny buňky, nebo stiskněte Klávesu+Enter pro každou buňku.

Přístup pomocí služby Azure Cosmos DB for Apache Cassandra z programu Spark Scala

V případě automatizovaných procesů v produkčním prostředí se programy Sparku odesílají do clusteru pomocí spark-submit.

Další kroky