Conexión con Cassandra API de Azure Cosmos DB desde Spark

SE APLICA A: Cassandra API

Este artículo es uno de los que se encuentra entre la serie de artículos sobre la integración de Cassandra API de Azure Cosmos DB desde Spark. En los artículos se explica la conectividad, las operaciones de lenguaje de definición de datos (DDL), las operaciones básicas de lenguaje de manipulación de datos (DML) y la integración avanzada de Cassandra API de Azure Cosmos DB desde Spark.

Requisitos previos

Dependencias de conectividad

  • Conector de Spark para Cassandra: el conector de Spark se usa para establecer conexión con Cassandra API de Azure Cosmos DB. Identifique y use la versión del conector que se encuentra en la central de Maven que sea compatible con las versiones de Spark y Scala de su entorno de Spark. Se recomienda un entorno que admita Spark 3.0 o una versión posterior y el conector de Spark disponible en las coordenadas de Maven com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.0.0. Si usa Spark 2.x, se recomienda un entorno con la versión de Spark 2.4.5, con el conector de Spark en las coordenadas de Maven com.datastax.spark:spark-cassandra-connector_2.11:2.4.3.

  • Biblioteca auxiliar de Azure Cosmos DB para Cassandra API: si usa una versión de Spark 2.x, además del conector de Spark, necesita otra biblioteca llamada azure-cosmos-cassandra-spark-helper con las coordenadas de Maven com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.2.0 de Azure Cosmos DB para administrar el límite de velocidad. Esta biblioteca contiene un generador de conexión y clases de directivas de reintentos personalizados.

    La directiva de reintentos de Azure Cosmos DB está configurada para controlar las excepciones del código de estado HTTP 429 ("Tasa de solicitud grande"). Cassandra API de Azure Cosmos DB traslada estas excepciones como errores sobrecargados en el protocolo nativo de Cassandra y el usuario puede volver a intentarlo con interrupciones. Dado que Azure Cosmos DB usa el modelo de rendimiento aprovisionado, se producen excepciones de limitación de tasas de solicitud al aumentar las tasas de entrada/salida. La directiva de reintentos protege los trabajos de Spark frente a picos de datos que superan momentáneamente el rendimiento asignado al contenedor. Si usa el conector de Spark 3.x, no es necesario implementar esta biblioteca.

    Nota

    La directiva de reintentos solo puede proteger los trabajos de Spark frente a picos momentáneos. Si no ha configurado suficientes RU necesarias para ejecutar la carga de trabajo, la directiva de reintentos no es aplicable y la clase de directiva de reintentos vuelve a producir la excepción.

  • Detalles de conexión de la cuenta de Azure Cosmos DB: nombre de la cuenta, punto de conexión de la cuenta y clave de Cassandra API de Azure.

Optimización de la configuración de rendimiento del conector de Spark

En la sección siguiente se muestran todos los parámetros pertinentes para controlar el rendimiento mediante el conector de Spark para Cassandra. Para optimizar los parámetros a fin de maximizar el rendimiento de los trabajos de Spark, las configuraciones de spark.cassandra.output.concurrent.writes, spark.cassandra.concurrent.reads y spark.cassandra.input.reads_per_sec deben ser las correctas para así evitar demasiados límites e interrupciones (lo que, a su vez, puede dar lugar a un menor rendimiento).

El valor óptimo de estas configuraciones depende de cuatro factores:

  • La cantidad de rendimiento (unidades de solicitud) configurada para la tabla en la que se ingieren los datos.
  • El número de trabajos en el clúster de Spark.
  • El número de ejecutores configurados para el trabajo de Spark (que se puede controlar mediante spark.cassandra.connection.connections_per_executor_max o spark.cassandra.connection.remoteConnectionsPerExecutor, según la versión de Spark).
  • La latencia media de cada solicitud a Cosmos DB, si se le coloca en el mismo centro de datos. Suponga que este valor es de 10 ms para escrituras y 3 ms para lecturas.

Por ejemplo, si tenemos 5 trabajos y un valor de spark.cassandra.output.concurrent.writes= 1 y un valor de spark.cassandra.connection.remoteConnectionsPerExecutor= 1, tenemos 5 trabajos que escriben simultáneamente en la tabla, cada uno con 1 subproceso. Si se tardan 10 ms en realizar una sola escritura, podemos enviar 100 solicitudes (1000 milisegundos divididos por 10) por segundo, por subproceso. Con 5 trabajos, serían 500 escrituras por segundo. Con un costo medio de 5 unidades de solicitud (RU) por escritura, la tabla de destino necesitaría 2500 unidades de solicitud aprovisionadas como mínimo (5 RU x 500 escrituras por segundo).

Al aumentar el número de ejecutores, puede aumentar el número de subprocesos de un trabajo determinado, lo que a su vez puede aumentar el rendimiento. Sin embargo, el efecto exacto de esta acción puede ser variable en función del trabajo, mientras que el control del rendimiento con el número de trabajos es más determinista. También puede determinar el costo exacto de una solicitud determinada mediante la generación de perfiles para obtener el cargo de unidad de solicitud (RU). Esta opción le ayudará a ser más preciso al aprovisionar el rendimiento de la tabla o el espacio de claves. Consulte nuestro artículo aquí para comprender cómo obtener los cargos por unidad de solicitud en un nivel de solicitud.

Escalado del rendimiento en la base de datos

El conector de Cassandra Spark saturará el rendimiento en Azure Cosmos DB de forma muy eficaz. Como resultado, incluso con reintentos efectivos, deberá asegurarse de que tiene suficiente rendimiento (RU) aprovisionado en el nivel de tabla o espacio de claves para evitar errores relacionados con la limitación de velocidad. La configuración mínima de 400 RU en una tabla o un espacio de claves determinados no será suficiente. Incluso con las opciones de configuración de rendimiento mínimo, el conector de Spark puede escribir a una velocidad correspondiente a aproximadamente 6000 unidades de solicitud o más.

Si la configuración de RU necesaria para el movimiento de datos mediante Spark es mayor que la necesaria para la carga de trabajo de estado estable, puede escalar y reducir verticalmente fácilmente el rendimiento de forma sistemática en Azure Cosmos DB para satisfacer las necesidades de la carga de trabajo durante un período de tiempo determinado. Lea nuestro artículo sobre el escalado elástico en Cassandra API para conocer las distintas opciones de escalado mediante programación y dinámica.

Nota

En la guía anterior se supone una distribución de datos razonablemente uniforme. Si tiene un sesgo significativo en los datos (es decir, un número desmesuradamente grande de lecturas y escrituras para el mismo valor de clave de partición), es posible que siga experimentando cuellos de botella, incluso si tiene un gran número de unidades de solicitud aprovisionadas en la tabla. Las unidades de solicitud se dividen equitativamente entre las particiones físicas y, una asimetría de datos intensiva puede causar un cuello de botella de las solicitudes en una única partición.

Parámetros de configuración de rendimiento del conector de Spark

En la tabla siguiente se enumeran los parámetros de configuración de rendimiento específicos de Cassandra API de Azure Cosmos DB proporcionados por el conector. Para obtener una lista detallada de todos los parámetros de configuración, consulte la página de referencia de configuración del repositorio de GitHub del conector de Cassandra de Spark.

Nombre de la propiedad Valor predeterminado Descripción
spark.cassandra.output.batch.size.rows 1 Número de filas por lote único. Establezca este parámetro en 1. Este parámetro se utiliza para lograr un mayor rendimiento para las cargas de trabajo altas.
spark.cassandra.connection.connections_per_executor_max (Spark 2.x) spark.cassandra.connection.remoteConnectionsPerExecutor (Spark 3.x) None Número máximo de conexiones por nodo y ejecutor. 10*n equivale a diez conexiones por nodo en un clúster de Cassandra con n nodos. Por lo tanto, si necesita cinco conexiones por nodo y ejecutor para un clúster de Cassandra de cinco nodos, debe establecer esta configuración en 25. Modifique este valor según el grado de paralelismo o el número de ejecutores configurados para los trabajos de Spark.
spark.cassandra.output.concurrent.writes 100 Define el número de escrituras paralelas que pueden producirse por ejecutor. Dado que ha establecido "batch.size.rows" en 1, asegúrese de escalar verticalmente este valor en consecuencia. Modifique este valor según el grado de paralelismo o el rendimiento que desea lograr para la carga de trabajo.
spark.cassandra.concurrent.reads 512 Define el número de lecturas en paralelo que pueden producirse por ejecutor. Modifique este valor según el grado de paralelismo o el rendimiento que desea lograr para la carga de trabajo.
spark.cassandra.output.throughput_mb_per_sec None Define el rendimiento de escritura total por ejecutor. Este parámetro puede usarse como límite superior para el rendimiento de trabajo de Spark y basarse en el rendimiento aprovisionado del contenedor de Cosmos.
spark.cassandra.input.reads_per_sec None Define el rendimiento de lectura total por ejecutor. Este parámetro puede usarse como límite superior para el rendimiento de trabajo de Spark y basarse en el rendimiento aprovisionado del contenedor de Cosmos.
spark.cassandra.output.batch.grouping.buffer.size 1000 Define el número de lotes por cada tarea única de Spark que se pueden almacenar en la memoria antes de enviarlos a Cassandra API.
spark.cassandra.connection.keep_alive_ms 60000 Define el período de tiempo hasta el que están disponibles las conexiones no utilizadas.

Ajuste el rendimiento y el grado de paralelismo de estos parámetros en función de la carga de trabajo que espera para los trabajos de Spark y el rendimiento que se ha aprovisionado para la cuenta de Cosmos DB.

Conexión con Cassandra API de Azure Cosmos DB desde Spark

cqlsh

Los comandos siguientes proporcionan información detallada acerca de cómo conectarse a Cassandra API de Azure CosmosDB desde cqlsh. Esto es útil para la validación mientras se ejecuta a través de los ejemplos de Spark.
Desde Linux/Unix/Mac:

export SSL_VERSION=TLSv1_2
export SSL_VALIDATE=false
cqlsh.py YOUR-COSMOSDB-ACCOUNT-NAME.cassandra.cosmosdb.azure.com 10350 -u YOUR-COSMOSDB-ACCOUNT-NAME -p YOUR-COSMOSDB-ACCOUNT-KEY --ssl

1. Azure Databricks

En el siguiente artículo se contempla el aprovisionamiento del clúster de Azure Databricks, la configuración del clúster para conectarse a Cassandra API de Azure Cosmos DB y varios cuadernos de ejemplo que abarcan operaciones DDL, operaciones DML y mucho más.
Uso de Cassandra API de Azure Cosmos DB desde Azure Databricks

2. Spark de Azure HDInsight

En el siguiente artículo se contempla el servicio de Spark de Azure HDInsight, el aprovisionamiento del clúster de Azure Databricks, la configuración del clúster para conectarse a Cassandra API de Azure Cosmos DB y varios cuadernos de ejemplo que abarcan operaciones DDL, operaciones DML y mucho más.
Uso de Cassandra API de Azure Cosmos DB desde Spark de Azure HDInsight

3. Entorno de Spark en general

Mientras que las secciones anteriores eran específicas de servicios de PaaS basados en Spark de Azure, esta sección trata cualquier entorno general de Spark. A continuación se proporciona información detallada acerca de las dependencias del conector, las importaciones y la configuración de sesión de Spark. En la sección "Siguientes pasos" se muestran ejemplos de código para operaciones DDL, operaciones DML y mucho más.

Dependencias del conector:

  1. Agregue las coordenadas de Maven para obtener el conector de Cassandra para Spark.
  2. Agregue las coordenadas de Maven para la biblioteca auxiliar de Azure Cosmos DB para Cassandra API.

Importaciones:

import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector

//CosmosDB library for multiple retry
import com.microsoft.azure.cosmosdb.cassandra

Configuración de la sesión de Spark:

//Connection-related
spark.conf.set("spark.cassandra.connection.host","YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com")
spark.conf.set("spark.cassandra.connection.port","10350")
spark.conf.set("spark.cassandra.connection.ssl.enabled","true")
spark.conf.set("spark.cassandra.auth.username","YOUR_ACCOUNT_NAME")
spark.conf.set("spark.cassandra.auth.password","YOUR_ACCOUNT_KEY")
spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory")

//Throughput-related. You can adjust the values as needed
spark.conf.set("spark.cassandra.output.batch.size.rows", "1")
//spark.conf.set("spark.cassandra.connection.connections_per_executor_max", "10") // Spark 2.x
spark.conf.set("spark.cassandra.connection.remoteConnectionsPerExecutor", "10") // Spark 3.x
spark.conf.set("spark.cassandra.output.concurrent.writes", "1000")
spark.conf.set("spark.cassandra.concurrent.reads", "512")
spark.conf.set("spark.cassandra.output.batch.grouping.buffer.size", "1000")
spark.conf.set("spark.cassandra.connection.keep_alive_ms", "600000000")

Pasos siguientes

En los artículos siguientes se muestra la integración de Spark con Cassandra API de Azure Cosmos DB.