Partilhar via


Ligar ao Azure Cosmos DB para Apache Cassandra a partir do Spark

APLICA-SE A: Cassandra

Este artigo está entre uma série de artigos sobre a integração do Azure Cosmos DB para Apache Cassandra a partir do Spark. Os artigos abrangem a conectividade, as operações DDL (Data Definition Language), as operações básicas da Linguagem de Manipulação de Dados (DML) e a integração avançada do Azure Cosmos DB para Apache Cassandra a partir do Spark.

Pré-requisitos

Dependências de conectividade

  • Conector do Spark para Cassandra: O conector do Spark é utilizado para ligar ao Azure Cosmos DB para Apache Cassandra. Identifique e utilize a versão do conector localizado no maven central que é compatível com as versões spark e Scala do seu ambiente do Spark. Recomendamos um ambiente que suporte o Spark 3.2.1 ou superior e o conector spark disponível nas coordenadas com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0do Maven. Se utilizar o Spark 2.x, recomendamos um ambiente com a versão 2.4.5 do Spark, com o conector spark nas coordenadas com.datastax.spark:spark-cassandra-connector_2.11:2.4.3do Maven.

  • Biblioteca de programa auxiliar do Azure Cosmos DB para a API para Cassandra: Se estiver a utilizar uma versão do Spark 2.x, além do conector do Spark, precisa de outra biblioteca chamada azure-cosmos-cassandra-spark-helper com coordenadas com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.2.0 maven do Azure Cosmos DB para processar a limitação de taxa. Esta biblioteca contém classes de política de repetição e fábrica de ligações personalizadas.

    A política de repetição no Azure Cosmos DB está configurada para processar exceções do código de estado HTTP 429("Taxa de Pedidos Grande"). O Azure Cosmos DB para Apache Cassandra traduz estas exceções em erros sobrecarregados no protocolo nativo do Cassandra e pode tentar novamente com back-offs. Uma vez que o Azure Cosmos DB utiliza o modelo de débito aprovisionado, as exceções de limitação da taxa de pedidos ocorrem quando as taxas de entrada/saída aumentam. A política de repetição protege os trabalhos do spark contra picos de dados que excedem momentaneamente o débito alocado ao contentor. Se estiver a utilizar o conector spark 3.x, a implementação desta biblioteca não é necessária.

    Nota

    A política de repetição pode proteger as tarefas do spark apenas contra picos momentâneos. Se não tiver configurado RUs suficientes necessárias para executar a carga de trabalho, a política de repetição não é aplicável e a classe de política de repetição volta a criar a exceção.

  • Detalhes da ligação da conta do Azure Cosmos DB: O nome da conta, o ponto final da conta e a chave da sua API do Azure para Cassandra.

Otimizar a configuração do débito do conector do Spark

Listados na secção seguinte encontram-se todos os parâmetros relevantes para controlar o débito com o Conector spark para Cassandra. Para otimizar os parâmetros para maximizar o débito das tarefas do Spark, as spark.cassandra.output.concurrent.writesconfigurações , spark.cassandra.concurrent.readse spark.cassandra.input.reads_per_sec têm de ser configuradas corretamente, de modo a evitar demasiada limitação e ativação (o que, por sua vez, pode levar a um débito mais baixo).

O valor ideal destas configurações depende de quatro fatores:

  • A quantidade de débito (Unidades de Pedido) configurada para a tabela na qual os dados estão a ser ingeridos.
  • O número de trabalhos no cluster do Spark.
  • O número de executores configurados para a tarefa do Spark (que podem ser controlados com spark.cassandra.connection.connections_per_executor_max ou spark.cassandra.connection.remoteConnectionsPerExecutor dependendo da versão do Spark)
  • A latência média de cada pedido para o Azure Cosmos DB, se for colocado no mesmo DataCenter. Suponha que este valor é 10 ms para escritas e 3 ms para leituras.

Por exemplo, se tivermos cinco trabalhadores e um valor de spark.cassandra.output.concurrent.writes= 1 e um valor de spark.cassandra.connection.remoteConnectionsPerExecutor = 1, temos cinco trabalhadores que estão simultaneamente a escrever na tabela, cada um com um thread. Se demorar 10 ms a efetuar uma única escrita, podemos enviar 100 pedidos (1000 milissegundos divididos por 10) por segundo, por thread. Com cinco trabalhadores, seriam 500 escritas por segundo. Com um custo médio de cinco unidades de pedido (RUs) por escrita, a tabela de destino precisaria de um mínimo de 2500 unidades de pedido aprovisionadas (5 RUs x 500 escritas por segundo).

Aumentar o número de executores pode aumentar o número de threads numa determinada tarefa, o que pode, por sua vez, aumentar o débito. No entanto, o impacto exato desta situação pode ser variável consoante o trabalho, ao passo que controlar o débito com o número de trabalhadores é mais determinista. Também pode determinar o custo exato de um determinado pedido através da criação de perfis para obter o custo da Unidade de Pedido (RU). Isto irá ajudá-lo a ser mais preciso ao aprovisionar o débito para a sua tabela ou espaço de chaves. Veja o nosso artigo aqui para compreender como obter os custos da unidade de pedido a um nível por pedido.

Dimensionar débito na base de dados

O conector do Apache Spark para Cassandra saturará o débito no Azure Cosmos DB de forma eficiente. Como resultado, mesmo com repetições efetivas, terá de garantir que tem débito (RUs) suficiente aprovisionado ao nível da tabela ou do espaço de chaves para evitar erros relacionados com a limitação da taxa. A definição mínima de 400 RUs numa determinada tabela ou espaço de chaves não será suficiente. Mesmo com as definições mínimas de configuração de débito, o conector do Spark pode escrever a uma taxa correspondente a cerca de 6000 unidades de pedido ou mais.

Se a definição de RUs necessária para o movimento de dados com o Spark for superior ao necessário para a carga de trabalho de estado estável, pode aumentar e reduzir verticalmente de forma sistemática o débito no Azure Cosmos DB para satisfazer as necessidades da carga de trabalho durante um determinado período de tempo. Leia o nosso artigo sobre dimensionamento elástico na API para Cassandra para compreender as diferentes opções de dimensionamento de forma programática e dinâmica.

Nota

A documentação de orientação acima pressupõe uma distribuição razoavelmente uniforme de dados. Se tiver uma distorção significativa nos dados (ou seja, um número excessivamente grande de leituras/escritas no mesmo valor de chave de partição), poderá continuar a deparar-se com estrangulamentos, mesmo que tenha um grande número de unidades de pedido aprovisionadas na tabela. As unidades de pedido são divididas de forma igual entre partições físicas e a distorção de dados pesada pode causar um estrangulamento de pedidos para uma única partição.

Parâmetros de configuração do débito do conector spark

A tabela seguinte lista os parâmetros de configuração de débito específicos do Azure Cosmos DB para Apache Cassandra fornecidos pelo conector. Para obter uma lista detalhada de todos os parâmetros de configuração, veja a página de referência de configuração do repositório gitHub do Conector do Apache Cassandra do Spark.

Nome da Propriedade Valor predefinido Descrição
spark.cassandra.output.batch.size.rows 1 Número de linhas por único lote. Defina este parâmetro como 1. Este parâmetro é utilizado para obter um débito mais elevado para cargas de trabalho pesadas.
spark.cassandra.connection.connections_per_executor_max (Spark 2.x) spark.cassandra.connection.remoteConnectionsPerExecutor (Spark 3.x) Nenhuma Número máximo de ligações por nó por executor. 10*n é equivalente a 10 ligações por nó num cluster do Cassandra de n nós. Por isso, se precisar de cinco ligações por nó por executor para um cluster cassandra de cinco nós, deve definir esta configuração como 25. Modifique este valor com base no grau de paralelismo ou no número de executores para os quais as tarefas do Spark estão configuradas.
spark.cassandra.output.concurrent.writes 100 Define o número de escritas paralelas que podem ocorrer por executor. Uma vez que define "batch.size.rows" como 1, certifique-se de que aumenta verticalmente este valor em conformidade. Modifique este valor com base no grau de paralelismo ou no débito que pretende alcançar para a carga de trabalho.
spark.cassandra.concurrent.reads 512 Define o número de leituras paralelas que podem ocorrer por executor. Modifique este valor com base no grau de paralelismo ou no débito que pretende alcançar para a carga de trabalho
spark.cassandra.output.throughput_mb_per_sec Nenhuma Define o débito total de escrita por executor. Este parâmetro pode ser utilizado como um limite superior para o débito da tarefa do Spark e baseá-lo no débito aprovisionado do contentor do Azure Cosmos DB.
spark.cassandra.input.reads_per_sec Nenhuma Define o débito de leitura total por executor. Este parâmetro pode ser utilizado como um limite superior para o débito da tarefa do Spark e baseá-lo no débito aprovisionado do contentor do Azure Cosmos DB.
spark.cassandra.output.batch.grouping.buffer.size 1000 Define o número de lotes por única tarefa do Spark que podem ser armazenados na memória antes de enviar para a API para Cassandra
spark.cassandra.connection.keep_alive_ms 60000 Define o período de tempo até que as ligações não utilizadas estejam disponíveis.

Ajuste o débito e o grau de paralelismo destes parâmetros com base na carga de trabalho esperada para os trabalhos do Spark e no débito que aprovisionou para a sua conta do Azure Cosmos DB.

Ligar ao Azure Cosmos DB para Apache Cassandra a partir do Spark

cqlsh

Os comandos seguintes detalham como ligar ao Azure Cosmos DB para Apache Cassandra a partir do cqlsh. Isto é útil para validação à medida que percorre os exemplos no Spark.
No Linux/Unix/Mac:

export SSL_VERSION=TLSv1_2
export SSL_VALIDATE=false
cqlsh.py YOUR-COSMOSDB-ACCOUNT-NAME.cassandra.cosmosdb.azure.com 10350 -u YOUR-COSMOSDB-ACCOUNT-NAME -p YOUR-COSMOSDB-ACCOUNT-KEY --ssl

1. Azure Databricks

O artigo abaixo abrange o aprovisionamento de clusters do Azure Databricks, a configuração do cluster para ligar ao Azure Cosmos DB para Apache Cassandra e vários blocos de notas de exemplo que abrangem operações de DDL, operações de DML e muito mais.
Trabalhar com o Azure Cosmos DB para Apache Cassandra a partir do Azure Databricks

2. Azure HDInsight-Spark

O artigo abaixo abrange HDinsight-Spark serviço, aprovisionamento, configuração do cluster para ligar ao Azure Cosmos DB para Apache Cassandra e vários blocos de notas de exemplo que abrangem operações de DDL, operações de DML e muito mais.
Trabalhar com o Azure Cosmos DB para Apache Cassandra a partir do Azure HDInsight-Spark

3. Ambiente do Spark em geral

Embora as secções acima tenham sido específicas dos serviços PaaS baseados no Azure Spark, esta secção abrange qualquer ambiente geral do Spark. As dependências do conector, as importações e a configuração da sessão do Spark estão detalhadas abaixo. A secção "Passos seguintes" abrange exemplos de código para operações DDL, operações DML e muito mais.

Dependências do conector:

  1. Adicionar as coordenadas do maven para obter o conector do Cassandra para Spark
  2. Adicionar as coordenadas do maven para a biblioteca de programa auxiliar do Azure Cosmos DB para a API para Cassandra

Importações:

import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector

//CosmosDB library for multiple retry
import com.microsoft.azure.cosmosdb.cassandra

Configuração da sessão do Spark:

 spark.cassandra.connection.host  YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com  
 spark.cassandra.connection.port  10350  
 spark.cassandra.connection.ssl.enabled  true  
 spark.cassandra.auth.username  YOUR_ACCOUNT_NAME  
 spark.cassandra.auth.password  YOUR_ACCOUNT_KEY  
// if using Spark 2.x
// spark.cassandra.connection.factory  com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory  

//Throughput-related...adjust as needed
 spark.cassandra.output.batch.size.rows  1  
// spark.cassandra.connection.connections_per_executor_max  10   // Spark 2.x
 spark.cassandra.connection.remoteConnectionsPerExecutor  10   // Spark 3.x
 spark.cassandra.output.concurrent.writes  1000  
 spark.cassandra.concurrent.reads  512  
 spark.cassandra.output.batch.grouping.buffer.size  1000  
 spark.cassandra.connection.keep_alive_ms  600000000 

Passos seguintes

Os seguintes artigos demonstram a integração do Spark com o Azure Cosmos DB para Apache Cassandra.