Aceder ao Azure Cosmos DB para Apache Cassandra a partir do Spark no YARN com o HDInsight
APLICA-SE A: Cassandra
Este artigo aborda como aceder ao Azure Cosmos DB para Apache Cassandra a partir do Spark no YARN com HDInsight-Spark a partir de spark-shell
. O HDInsight é o Hortonworks Hadoop PaaS da Microsoft no Azure. Utiliza armazenamento de objetos para HDFS e inclui vários tipos, incluindo o Spark. Embora este artigo se refira ao HDInsight-Spark, aplica-se a todas as distribuições do Hadoop.
Pré-requisitos
Antes de começar, reveja as noções básicas da ligação ao Azure Cosmos DB para Apache Cassandra.
Precisa dos seguintes pré-requisitos:
Aprovisionar o Azure Cosmos DB para Apache Cassandra. Veja Criar uma conta de base de dados.
Aprovisionar um cluster HDInsight-Spark. Veja Criar um cluster do Apache Spark no Azure HDInsight com o modelo arm.
API para configuração do Cassandra no Spark2. O conector do Spark para Cassandra requer que os detalhes de ligação do Cassandra sejam inicializados como parte do contexto do Spark. Quando inicia um bloco de notas do Jupyter, a sessão e o contexto do Spark já são inicializados. Não pare e reinicialize o contexto do Spark, a menos que esteja completo com todas as configurações definidas como parte do arranque do bloco de notas do Jupyter predefinido do HDInsight. Uma solução é adicionar diretamente os detalhes da instância do Cassandra ao Ambari, configuração do serviço Spark2. Esta abordagem é uma atividade única por cluster que requer um reinício do serviço Spark2.
Aceda ao serviço Ambari, Spark2 e selecione configurações.
Aceda a predefinições personalizadas do Spark2, adicione uma nova propriedade com o seguinte e reinicie o serviço Spark2:
spark.cassandra.connection.host=YOUR_COSMOSDB_ACCOUNT_NAME.cassandra.cosmosdb.azure.com<br> spark.cassandra.connection.port=10350<br> spark.cassandra.connection.ssl.enabled=true<br> spark.cassandra.auth.username=YOUR_COSMOSDB_ACCOUNT_NAME<br> spark.cassandra.auth.password=YOUR_COSMOSDB_KEY<br>
Pode utilizar cqlsh
para validação. Para obter mais informações, veja Connecting to Azure Cosmos DB for Apache Cassandra from Spark (Ligar ao Azure Cosmos DB para Apache Cassandra a partir do Spark).
Aceder ao Azure Cosmos DB para Apache Cassandra a partir da shell do Spark
A shell do Spark é utilizada para testes e exploração.
Inicie
spark-shell
com as dependências do maven necessárias compatíveis com a versão do Spark do cluster.spark-shell --packages "com.datastax.spark:spark-cassandra-connector_2.11:2.3.0,com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.0.0"
Executar algumas operações DDL e DML
import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} import spark.implicits._ import org.apache.spark.sql.functions._ import org.apache.spark.sql.Column import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType,LongType,FloatType,DoubleType, TimestampType} import org.apache.spark.sql.cassandra._ //Spark connector import com.datastax.spark.connector._ import com.datastax.spark.connector.cql.CassandraConnector //CosmosDB library for multiple retry import com.microsoft.azure.cosmosdb.cassandra // Specify connection factory for Cassandra spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory") // Parallelism and throughput configs spark.conf.set("spark.cassandra.output.batch.size.rows", "1") spark.conf.set("spark.cassandra.connection.connections_per_executor_max", "10") spark.conf.set("spark.cassandra.output.concurrent.writes", "100") spark.conf.set("spark.cassandra.concurrent.reads", "512") spark.conf.set("spark.cassandra.output.batch.grouping.buffer.size", "1000") spark.conf.set("spark.cassandra.connection.keep_alive_ms", "60000000") //Increase this number as needed
Executar operações CRUD
//1) Create table if it does not exist val cdbConnector = CassandraConnector(sc) cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT PRIMARY KEY,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT) WITH cosmosdb_provisioned_throughput=4000;")) //2) Delete data from potential prior runs cdbConnector.withSessionDo(session => session.execute("DELETE FROM books_ks.books WHERE book_id IN ('b00300','b00001','b00023','b00501','b09999','b01001','b00999','b03999','b02999','b000009');")) //3) Generate a few rows val booksDF = Seq( ("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887,11.33), ("b00023", "Arthur Conan Doyle", "A sign of four", 1890,22.45), ("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892,19.83), ("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893,14.22), ("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901,12.25) ).toDF("book_id", "book_author", "book_name", "book_pub_year","book_price") //4) Persist booksDF.write.mode("append").format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000")).save() //5) Read the data in the table spark.read.format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks")).load.show
Aceder ao Azure Cosmos DB para Apache Cassandra a partir de blocos de notas do Jupyter
HDInsight-Spark inclui os serviços zeppelin e jupyter notebook. Ambos são ambientes de blocos de notas baseados na Web que suportam Scala e Python. Os blocos de notas são ótimos para análises exploratórias interativas e colaboração, mas não para processos operacionais ou de produção.
Os seguintes blocos de notas do Jupyter podem ser carregados para o cluster do HdInsight Spark e fornecer exemplos prontos para trabalhar com o Azure Cosmos DB para Apache Cassandra. Certifique-se de que revê o primeiro bloco 1.0-ReadMe.ipynb
de notas para rever a configuração do serviço Spark para ligar ao Azure Cosmos DB para Apache Cassandra.
Transfira os blocos de notas em azure-cosmos-db-cassandra-api-spark-notebooks-jupyter para o seu computador.
Como carregar
Quando iniciar o Jupyter, navegue para Scala. Crie um diretório e, em seguida, carregue os blocos de notas para o diretório. O botão Carregar encontra-se no canto superior direito.
Como executar
Percorra os blocos de notas e cada célula do bloco de notas sequencialmente. Selecione o botão Executar na parte superior de cada bloco de notas para executar todas as células ou Shift+Enter para cada célula.
Aceder ao Azure Cosmos DB para Apache Cassandra a partir do programa Spark Scala
Para processos automatizados em produção, os programas Spark são submetidos para o cluster através do spark-submit.