Partilhar via


Aceder ao Azure Cosmos DB para Apache Cassandra a partir do Spark no YARN com o HDInsight

APLICA-SE A: Cassandra

Este artigo aborda como aceder ao Azure Cosmos DB para Apache Cassandra a partir do Spark no YARN com HDInsight-Spark a partir de spark-shell. O HDInsight é o Hortonworks Hadoop PaaS da Microsoft no Azure. Utiliza armazenamento de objetos para HDFS e inclui vários tipos, incluindo o Spark. Embora este artigo se refira ao HDInsight-Spark, aplica-se a todas as distribuições do Hadoop.

Pré-requisitos

Antes de começar, reveja as noções básicas da ligação ao Azure Cosmos DB para Apache Cassandra.

Precisa dos seguintes pré-requisitos:

  • Aprovisionar o Azure Cosmos DB para Apache Cassandra. Veja Criar uma conta de base de dados.

  • Aprovisionar um cluster HDInsight-Spark. Veja Criar um cluster do Apache Spark no Azure HDInsight com o modelo arm.

  • API para configuração do Cassandra no Spark2. O conector do Spark para Cassandra requer que os detalhes de ligação do Cassandra sejam inicializados como parte do contexto do Spark. Quando inicia um bloco de notas do Jupyter, a sessão e o contexto do Spark já são inicializados. Não pare e reinicialize o contexto do Spark, a menos que esteja completo com todas as configurações definidas como parte do arranque do bloco de notas do Jupyter predefinido do HDInsight. Uma solução é adicionar diretamente os detalhes da instância do Cassandra ao Ambari, configuração do serviço Spark2. Esta abordagem é uma atividade única por cluster que requer um reinício do serviço Spark2.

    1. Aceda ao serviço Ambari, Spark2 e selecione configurações.

    2. Aceda a predefinições personalizadas do Spark2, adicione uma nova propriedade com o seguinte e reinicie o serviço Spark2:

    spark.cassandra.connection.host=YOUR_COSMOSDB_ACCOUNT_NAME.cassandra.cosmosdb.azure.com<br>
    spark.cassandra.connection.port=10350<br>
    spark.cassandra.connection.ssl.enabled=true<br>
    spark.cassandra.auth.username=YOUR_COSMOSDB_ACCOUNT_NAME<br>
    spark.cassandra.auth.password=YOUR_COSMOSDB_KEY<br>
    

Pode utilizar cqlsh para validação. Para obter mais informações, veja Connecting to Azure Cosmos DB for Apache Cassandra from Spark (Ligar ao Azure Cosmos DB para Apache Cassandra a partir do Spark).

Aceder ao Azure Cosmos DB para Apache Cassandra a partir da shell do Spark

A shell do Spark é utilizada para testes e exploração.

  • Inicie spark-shell com as dependências do maven necessárias compatíveis com a versão do Spark do cluster.

    spark-shell --packages "com.datastax.spark:spark-cassandra-connector_2.11:2.3.0,com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.0.0"
    
  • Executar algumas operações DDL e DML

    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    import spark.implicits._
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.Column
    import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType,LongType,FloatType,DoubleType, TimestampType}
    import org.apache.spark.sql.cassandra._
    
    //Spark connector
    import com.datastax.spark.connector._
    import com.datastax.spark.connector.cql.CassandraConnector
    
    //CosmosDB library for multiple retry
    import com.microsoft.azure.cosmosdb.cassandra
    
    // Specify connection factory for Cassandra
    spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory")
    
    // Parallelism and throughput configs
    spark.conf.set("spark.cassandra.output.batch.size.rows", "1")
    spark.conf.set("spark.cassandra.connection.connections_per_executor_max", "10")
    spark.conf.set("spark.cassandra.output.concurrent.writes", "100")
    spark.conf.set("spark.cassandra.concurrent.reads", "512")
    spark.conf.set("spark.cassandra.output.batch.grouping.buffer.size", "1000")
    spark.conf.set("spark.cassandra.connection.keep_alive_ms", "60000000") //Increase this number as needed
    
  • Executar operações CRUD

    //1) Create table if it does not exist
    val cdbConnector = CassandraConnector(sc)
    cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT PRIMARY KEY,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT) WITH cosmosdb_provisioned_throughput=4000;"))
    
    //2) Delete data from potential prior runs
    cdbConnector.withSessionDo(session => session.execute("DELETE FROM books_ks.books WHERE book_id IN ('b00300','b00001','b00023','b00501','b09999','b01001','b00999','b03999','b02999','b000009');"))
    
    //3) Generate a few rows
    val booksDF = Seq(
    ("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887,11.33),
    ("b00023", "Arthur Conan Doyle", "A sign of four", 1890,22.45),
    ("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892,19.83),
    ("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893,14.22),
    ("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901,12.25)
    ).toDF("book_id", "book_author", "book_name", "book_pub_year","book_price")
    
    //4) Persist
    booksDF.write.mode("append").format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000")).save()
    
    //5) Read the data in the table
    spark.read.format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks")).load.show
    

Aceder ao Azure Cosmos DB para Apache Cassandra a partir de blocos de notas do Jupyter

HDInsight-Spark inclui os serviços zeppelin e jupyter notebook. Ambos são ambientes de blocos de notas baseados na Web que suportam Scala e Python. Os blocos de notas são ótimos para análises exploratórias interativas e colaboração, mas não para processos operacionais ou de produção.

Os seguintes blocos de notas do Jupyter podem ser carregados para o cluster do HdInsight Spark e fornecer exemplos prontos para trabalhar com o Azure Cosmos DB para Apache Cassandra. Certifique-se de que revê o primeiro bloco 1.0-ReadMe.ipynb de notas para rever a configuração do serviço Spark para ligar ao Azure Cosmos DB para Apache Cassandra.

Transfira os blocos de notas em azure-cosmos-db-cassandra-api-spark-notebooks-jupyter para o seu computador.

Como carregar

Quando iniciar o Jupyter, navegue para Scala. Crie um diretório e, em seguida, carregue os blocos de notas para o diretório. O botão Carregar encontra-se no canto superior direito.

Como executar

Percorra os blocos de notas e cada célula do bloco de notas sequencialmente. Selecione o botão Executar na parte superior de cada bloco de notas para executar todas as células ou Shift+Enter para cada célula.

Aceder ao Azure Cosmos DB para Apache Cassandra a partir do programa Spark Scala

Para processos automatizados em produção, os programas Spark são submetidos para o cluster através do spark-submit.

Passos seguintes