Подключение к Azure Cosmos DB для Apache Cassandra из Spark

ПРИМЕНИМО К: Кассандра

Эта статья является одной из серии статей об интеграции Azure Cosmos DB с Apache Cassandra из Spark. В статьях рассматриваются возможности подключения, операции языка определения данных (DDL), базовые операции языка обработки данных (DML) и расширенная интеграция Azure Cosmos DB для Apache Cassandra из Spark.

Предварительные требования

Зависимости для подключения

  • Соединитель Spark для Cassandra: Соединитель Spark используется для подключения к Azure Cosmos DB для Apache Cassandra. Определите и используйте версию соединителя, расположенного в Maven central, которая совместима с версиями Spark и Scala вашей среды Spark. Мы рекомендуем использовать окружение, которое поддерживает оболочку Spark 3.2.1 или более поздней версии, а также соединитель Spark, который доступен по координатам Maven com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0. При использовании Spark 2.x мы рекомендуем применять среду с Spark версии 2.4.5 с помощью соединителя Spark в координатах Maven com.datastax.spark:spark-cassandra-connector_2.11:2.4.3.

  • Вспомогательская библиотека Azure Cosmos DB для API для Cassandra: Если вы используете версию Spark 2.x, то в дополнение к соединителю Spark вам потребуется еще одна библиотека с именем azure-cosmos-cassandra-spark-helper с координатами com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.2.0 Maven из Azure Cosmos DB для обработки ограничения скорости. Эта библиотека содержит классы настраиваемой фабрики подключений и политик повтора.

    Политика повтора в Azure Cosmos DB настроена для обработки исключений кода состояния HTTP 429 ("Высокая частота запросов"). Azure Cosmos DB для Apache Cassandra преобразует эти исключения в перегруженные ошибки по собственному протоколу Cassandra, и вы можете повторить попытку с откатом. Так как Azure Cosmos DB использует модель подготовленной пропускной способности, то исключения ограничения частоты запроса возникают тогда, когда увеличивается скорость входа/выхода. Политика повтора обеспечивает защиту заданий Spark от скачка данных, который кратковременно превышает пропускную способность, выделенную для вашего контейнера. При использовании соединителя Spark версии 3.x реализация этой библиотеки не требуется.

    Примечание

    Политика повтора может защитить задания Spark только от кратковременных скачков. Если вы не настроили достаточное количество ЕЗ, необходимых для выполнения вашей рабочей нагрузки, тогда политика повтора не применима, а класс политики повтора еще раз генерирует исключение.

  • Сведения о подключении учетной записи Azure Cosmos DB: Имя учетной записи, конечная точка и ключ учетной записи Azure API для Cassandra.

Оптимизация конфигурации пропускной способности соединителя Spark

В следующем разделе перечислены все необходимые параметры для управления пропускной способностью с помощью соединителя Spark для Cassandra. Чтобы оптимизировать параметры для увеличения пропускной способности заданий Spark, необходимо правильно настроить конфигурации spark.cassandra.output.concurrent.writes, spark.cassandra.concurrent.reads и spark.cassandra.input.reads_per_sec. Это поможет избежать слишком большого объема регулирования количества запросов и слишком большой задержки (что, в свою очередь, может привести к снижению пропускной способности).

Оптимальное значение этих конфигураций зависит от 4 факторов:

  • Объем пропускной способности (количество единиц запроса), настроенный для таблицы, в которую отправляются данные.
  • Число рабочих ролей в кластере Spark.
  • Число исполнителей, настроенных для задания Spark (которых можно контролировать с помощью spark.cassandra.connection.connections_per_executor_max или spark.cassandra.connection.remoteConnectionsPerExecutor).
  • Средняя задержка каждого запроса к Azure Cosmos DB, если вы находитесь в одном центре обработки данных. Предположим, что это значение равно 10 мс для операций записи и 3 мс для операций чтения.

Например, если у нас есть 5 рабочих ролей и значение spark.cassandra.output.concurrent.writes равно 1, а значение spark.cassandra.connection.remoteConnectionsPerExecutor также равно 1, то у нас есть 5 рабочих ролей, которые одновременно записываются в таблицу, каждая с 1 потоком. Если для выполнения одной операции записи требуется 10 мс, мы можем отправить 100 запросов (1000 миллисекунд разделить на 10) в секунду на один поток. При использовании 5 рабочих ролей получится 500 операций записи в секунду. При средней стоимости 5 единиц запросов на одну операцию записи целевой таблице потребуется минимум 2500 подготовленных единиц запросов (5 единиц запросов умножить на 500 операций записи в секунду).

При увеличении числа исполнителей может увеличиться количество потоков в этом задании, что, в свою очередь, может увеличить пропускную способность. Однако точное воздействие этого может быть переменчивым в зависимости от задания, а управление пропускной способностью с помощью количества рабочих ролей — более детерминированным. Вы также можете определить точную стоимость запроса, выполнив его профилирование для получения платы за единицу запроса. С помощью этого вы получите более точные сведения при подготовке пропускной способности для таблицы или ключевого пространства. Ознакомьтесь с этой статьей, чтобы понять, как получить плату за единицу запросов на уровне запроса.

Масштабирование пропускной способности в базе данных

Соединитель Cassandra Spark эффективно повышает пропускную способность Azure Cosmos DB. В результате даже при эффективных повторных попытках необходимо обеспечить достаточную пропускную способность (ЕЗ) на уровне таблицы или пространства ключей, чтобы предотвратить ошибки, связанные с ограничением скорости. Минимального значения 400 ЕЗ в данной таблице или пространстве ключей будет недостаточно. Даже при минимальной конфигурации пропускной способности соединитель Spark может записывать данные с частотой не менее 6000 единиц запросов.

Если значение параметра ЕЗ, необходимое для перемещения данных с помощью Spark, выше, чем требуется для рабочей нагрузки в стабильном состоянии, можно легко масштабировать пропускную способность в Azure Cosmos DB в соответствии с потребностями рабочей нагрузки для конкретного периода. Ознакомьтесь со статьей об эластичном масштабировании в API для Cassandra , чтобы понять различные варианты масштабирования программным и динамическим способом.

Примечание

В приведенных выше рекомендациях предполагается достаточно равномерное распределение данных. При значительном неравномерном распределении данных (т. е. большом количестве операций чтения или записи для одного значения ключа секции) могут возникнуть узкие места, даже если в таблице подготовлено большое количество единиц запросов. Единицы запросов делятся между физическими секциями, а чрезмерно неравномерное распределение данных может привести к узким местам запросов к одной секции.

Параметры конфигурации пропускной способности соединителя Spark

В следующей таблице перечислены параметры конфигурации пропускной способности Apache Cassandra для Azure Cosmos DB, предоставляемые соединителем. Подробный список всех параметров конфигурации см. на странице Справочник по конфигурации соединителя Cassandra Spark в репозитории GitHub.

Имя свойства Значение по умолчанию Описание
spark.cassandra.output.batch.size.rows 1 Количество строк в одном пакете. Задайте для параметра значение 1. Этот параметр используется для улучшения пропускной способности для больших рабочих нагрузок.
spark.cassandra.connection.connections_per_executor_max (Spark 2.x) spark.cassandra.connection.remoteConnectionsPerExecutor (Spark 3.x) Нет Максимальное число подключений на узел для каждого исполнителя. 10*n равно 10 подключениям на узел для кластера Cassandra с n узлов. Итак, если вам требуется 5 подключений на узел для каждого исполнителя для кластера Cassandra с 5 узлами, то вы должны настроить эту конфигурацию на 25. Меняйте это значение в зависимости от степени параллелизма или количества исполнителей, для которых настроены ваши задания Spark.
spark.cassandra.output.concurrent.writes 100 Определяет количество параллельных записей, которые могут происходить на каждом исполнителе. Поскольку "batch.size.rows" равно 1, убедитесь, что вы увеличили масштаб этого значения соответствующим образом. Меняйте это значение в зависимости от степени параллелизма или пропускной способности, которую вы хотите получить для рабочей нагрузки.
spark.cassandra.concurrent.reads 512 Определяет количество параллельных процессов операций чтения, которые могут происходить на каждом исполнителе. Меняйте это значение в зависимости от степени параллелизма или пропускной способности, которую вы хотите получить для рабочей нагрузки
spark.cassandra.output.throughput_mb_per_sec Нет Определяет общую пропускную способность записи для каждого исполнителя. Этот параметр можно использовать в качестве верхнего предела пропускной способности задания Spark и основывать его на подготовленной пропускной способности контейнера Azure Cosmos DB.
spark.cassandra.input.reads_per_sec Нет Определяет общую пропускную способность чтения для каждого исполнителя. Этот параметр можно использовать в качестве верхнего предела пропускной способности задания Spark и основывать его на подготовленной пропускной способности контейнера Azure Cosmos DB.
spark.cassandra.output.batch.grouping.buffer.size 1000 Определяет количество пакетов для одной задачи Spark, которые можно сохранить в памяти перед отправкой в API для Cassandra.
spark.cassandra.connection.keep_alive_ms 60 000 Определяет период времени, до которого доступны соединения, которые не используются.

Настройте пропускную способность и степень параллелизма этих параметров в зависимости от ожидаемой рабочей нагрузки для заданий Spark и пропускной способности, подготовленной для учетной записи Azure Cosmos DB.

Подключение к Azure Cosmos DB для Apache Cassandra из Spark

cqlsh

В следующих командах подробно описано, как подключиться к Azure Cosmos DB для Apache Cassandra из cqlsh. Это можно использовать для проверки во время запуска образцов в оболочке Spark.
Из Linux, Unix и Mac:

export SSL_VERSION=TLSv1_2
export SSL_VALIDATE=false
cqlsh.py YOUR-COSMOSDB-ACCOUNT-NAME.cassandra.cosmosdb.azure.com 10350 -u YOUR-COSMOSDB-ACCOUNT-NAME -p YOUR-COSMOSDB-ACCOUNT-KEY --ssl

1. Azure Databricks

В следующей статье рассматриваются подготовка кластера Azure Databricks, конфигурация кластера для подключения к Azure Cosmos DB для Apache Cassandra, а также несколько примеров записных книжек, которые охватывают операции DDL, операции DML и многое другое.
Работа с Azure Cosmos DB для Apache Cassandra из Azure Databricks

2. Azure HDInsight — Spark

В следующей статье рассматриваются HDinsight-Spark службы, подготовки, конфигурации кластера для подключения к Azure Cosmos DB для Apache Cassandra, а также несколько примеров записных книжек, охватывающих операции DDL, операции DML и многое другое.
Работа с Azure Cosmos DB для Apache Cassandra из Azure HDInsight-Spark

3. Общие сведения о среде Spark

Хотя приведенные выше разделы концентрировались на службах PaaS, основанных на Azure Spark, этот раздел охватывает общие сведения о среде Spark. Зависимости соединителя, импорт и конфигурация сеанса Spark приведены ниже. В разделе Next steps (Дальнейшие действия) рассматриваются примеры кода для операций DDL, DML и других.

Зависимости соединителя:

  1. Добавить координаты Maven, чтобы получить соединитель Cassandra для Spark
  2. Добавление координат maven для вспомогательной библиотеки Azure Cosmos DB для API для Cassandra

Импорт:

import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector

//CosmosDB library for multiple retry
import com.microsoft.azure.cosmosdb.cassandra

Конфигурация сеанса Spark:

 spark.cassandra.connection.host  YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com  
 spark.cassandra.connection.port  10350  
 spark.cassandra.connection.ssl.enabled  true  
 spark.cassandra.auth.username  YOUR_ACCOUNT_NAME  
 spark.cassandra.auth.password  YOUR_ACCOUNT_KEY  
// if using Spark 2.x
// spark.cassandra.connection.factory  com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory  

//Throughput-related...adjust as needed
 spark.cassandra.output.batch.size.rows  1  
// spark.cassandra.connection.connections_per_executor_max  10   // Spark 2.x
 spark.cassandra.connection.remoteConnectionsPerExecutor  10   // Spark 3.x
 spark.cassandra.output.concurrent.writes  1000  
 spark.cassandra.concurrent.reads  512  
 spark.cassandra.output.batch.grouping.buffer.size  1000  
 spark.cassandra.connection.keep_alive_ms  600000000 

Дальнейшие действия

В следующих статьях демонстрируется интеграция Spark с Azure Cosmos DB для Apache Cassandra.