Acceso a Azure Cosmos DB for Apache Cassandra desde Spark en YARN con HDInsight

SE APLICA A: Cassandra

En este artículo se explica cómo acceder a Azure Cosmos DB for Apache Cassandra desde Spark en YARN con HDInsight-Spark desde spark-shell. HDInsight es un PaaS de Hortonworks Hadoop de Microsoft en Azure. Usa el almacenamiento de objetos para HDFS y se suministra con varios tipos, incluido Spark. Aunque en este artículo se hace referencia a HDInsight-Spark, se aplica a todas las distribuciones de Hadoop.

Requisitos previos

Antes de empezar, revise los conceptos básicos de la conexión a Azure Cosmos DB for Apache Cassandra.

Los siguientes requisitos previos son necesarios:

  • Aprovisionamiento de Azure Cosmos DB for Apache Cassandra. Consulte Creación de una cuenta de base de datos.

  • Aprovisionamiento de un clúster de HDInsight-Spark. Consulte Creación de un clúster de Apache Spark en Azure HDInsight mediante una plantilla de ARM.

  • Configuración de la API para Cassandra en Spark2. El conector de Spark para Cassandra requiere que los detalles de la conexión de Cassandra se inicialicen como parte del contexto de Spark. Al iniciar una instancia de Jupyter Notebook, la sesión y el contexto ya se inicializan. Continúe y reinicialice el contexto de Spark, a menos que se complete con cada configuración establecida como parte del inicio predeterminado de una instancia de Jupyter Notebook por parte de HDInsight. Una solución alternativa es agregar los detalles de la instancia de Cassandra directamente a la configuración del servicio Ambari, Spark2. Este enfoque es una actividad que se realiza una sola vez por clúster y que requiere que se reinicie el servicio Spark2.

    1. Vaya al servicio Ambari, Spark2, y seleccione las configuraciones.

    2. Luego, vaya a spark2-defaults personalizado y agregue una nueva propiedad con lo siguiente, y reinicie el servicio Spark2:

    spark.cassandra.connection.host=YOUR_COSMOSDB_ACCOUNT_NAME.cassandra.cosmosdb.azure.com<br>
    spark.cassandra.connection.port=10350<br>
    spark.cassandra.connection.ssl.enabled=true<br>
    spark.cassandra.auth.username=YOUR_COSMOSDB_ACCOUNT_NAME<br>
    spark.cassandra.auth.password=YOUR_COSMOSDB_KEY<br>
    

Puede usar cqlsh para la validación. Para más información, consulte Conexión a Azure Cosmos DB for Apache Cassandra desde Spark.

Acceso a Azure Cosmos DB for Apache Cassandra desde el shell de Spark

El shell de Spark se usa para la prueba y exploración.

  • Inicie spark-shell con las dependencias de Maven necesarias, compatibles con la versión de Spark de su clúster.

    spark-shell --packages "com.datastax.spark:spark-cassandra-connector_2.11:2.3.0,com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.0.0"
    
  • Ejecute algunas operaciones DDL y DML

    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    import spark.implicits._
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.Column
    import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType,LongType,FloatType,DoubleType, TimestampType}
    import org.apache.spark.sql.cassandra._
    
    //Spark connector
    import com.datastax.spark.connector._
    import com.datastax.spark.connector.cql.CassandraConnector
    
    //CosmosDB library for multiple retry
    import com.microsoft.azure.cosmosdb.cassandra
    
    // Specify connection factory for Cassandra
    spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory")
    
    // Parallelism and throughput configs
    spark.conf.set("spark.cassandra.output.batch.size.rows", "1")
    spark.conf.set("spark.cassandra.connection.connections_per_executor_max", "10")
    spark.conf.set("spark.cassandra.output.concurrent.writes", "100")
    spark.conf.set("spark.cassandra.concurrent.reads", "512")
    spark.conf.set("spark.cassandra.output.batch.grouping.buffer.size", "1000")
    spark.conf.set("spark.cassandra.connection.keep_alive_ms", "60000000") //Increase this number as needed
    
  • Ejecute operaciones CRUD

    //1) Create table if it does not exist
    val cdbConnector = CassandraConnector(sc)
    cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT PRIMARY KEY,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT) WITH cosmosdb_provisioned_throughput=4000;"))
    
    //2) Delete data from potential prior runs
    cdbConnector.withSessionDo(session => session.execute("DELETE FROM books_ks.books WHERE book_id IN ('b00300','b00001','b00023','b00501','b09999','b01001','b00999','b03999','b02999','b000009');"))
    
    //3) Generate a few rows
    val booksDF = Seq(
    ("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887,11.33),
    ("b00023", "Arthur Conan Doyle", "A sign of four", 1890,22.45),
    ("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892,19.83),
    ("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893,14.22),
    ("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901,12.25)
    ).toDF("book_id", "book_author", "book_name", "book_pub_year","book_price")
    
    //4) Persist
    booksDF.write.mode("append").format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000")).save()
    
    //5) Read the data in the table
    spark.read.format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks")).load.show
    

Acceso a Azure Cosmos DB for Apache Cassandra desde cuadernos de Jupyter

HDInsight-Spark incluye los servicios Zeppelin y Jupyter Notebook. Ambos son entornos de cuadernos basados en web que admiten Scala y Python. Los cuadernos son excelentes para la colaboración y el análisis exploratorio interactivo, pero no están pensados para procesos operativos ni de producción.

Los siguientes cuadernos de Jupyter Notebook se pueden cargar en un clúster de HDInsight Spark y proporcionan ejemplos para trabajar con Azure Cosmos DB for Apache Cassandra. Asegúrese de consultar el primer cuaderno 1.0-ReadMe.ipynb para revisar la configuración del servicio de Spark para conectarse a Azure Cosmos DB for Apache Cassandra.

Descargue los cuadernos de azure-cosmos-db-cassandra-api-spark-notebooks-jupyter en su equipo.

Procedimiento para la carga

Al iniciar Jupyter, vaya a Scala. Cree un directorio y, después, cargue los cuadernos en el directorio. El botón Cargar se encuentra en la parte superior derecha.

Procedimiento para la ejecución

Recorra los cuadernos, y cada celda de ellos, de manera secuencial. Seleccione el botón Ejecutar de la parte superior de cada cuaderno para ejecutar todas las celdas, o bien Mayús+Entrar para ejecutar cada una de ellas.

Acceso con Azure Cosmos DB for Apache Cassandra desde el programa Spark Scala

En los procesos automatizados en producción, los programas de Spark se envían al clúster a través de spark-submit.

Pasos siguientes