Herstellen einer Verbindung mit Azure Cosmos DB for Apache Cassandra in Spark

GILT FÜR: Cassandra

Dieser Artikel gehört zu einer Reihe von Artikeln zur Azure Cosmos DB for Apache Cassandra-Integration in Spark. In den Artikeln werden Konnektivität, DDL-Vorgänge (Data Definition Language), grundlegende DML-Vorgänge (Data Manipulation Language) und die erweiterte Azure Cosmos DB for Apache Cassandra-Integration in Spark behandelt.

Voraussetzungen

Abhängigkeiten für die Konnektivität

  • Spark-Connector für Cassandra: Der Spark-Connector wird verwendet, um eine Verbindung mit Azure Cosmos DB for Apache Cassandra herzustellen. Identifizieren und verwenden Sie die Version des Connectors unter Maven > Central, die mit den Spark- und Scala-Versionen Ihrer Spark-Umgebung kompatibel ist. Wir empfehlen eine Umgebung, die Spark 3.2.1 oder höher unterstützt, und den Spark-Connector, der unter den Maven-Koordinaten com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0 verfügbar ist. Bei Verwendung von Spark 2.x wird eine Umgebung mit Spark-Version 2.4.5 empfohlen, in der der Spark-Connector bei den maven-Koordinaten com.datastax.spark:spark-cassandra-connector_2.11:2.4.3 verwendet wird.

  • Azure Cosmos DB-Hilfsbibliothek für die API für Cassandra: Wenn Sie eine Spark-Version 2.x verwenden, benötigen Sie zusätzlich zum Spark-Connector eine weitere Bibliothek namens azure-cosmos-cassandra-spark-helper mit den Maven-Koordinaten com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.2.0 aus Azure Cosmos DB, um mit Ratenbegrenzung umgehen zu können. Diese Bibliothek enthält benutzerdefinierte Klassen für die Verbindungsfactory und Wiederholungsrichtlinie.

    Die Wiederholungsrichtlinie in Azure Cosmos DB ist für die Verarbeitung von Ausnahmen mit dem HTTP-Statuscode 429 („Anforderungsrate ist groß“) konfiguriert. Azure Cosmos DB for Apache Cassandra übersetzt diese Ausnahmen in Überladungsfehler im nativen Cassandra-Protokoll, und Sie können mit Backoffs wiederholen. Da Azure Cosmos DB das bereitgestellte Durchsatzmodell verwendet, treten Ausnahmen in Bezug auf große Anforderungsraten auf, wenn die Rate der eingehenden/ausgehenden Daten steigt. Durch die Wiederholungsrichtlinie werden Ihre Spark-Aufträge vor Datenspitzen geschützt, die den für Ihren Container zugeordneten Durchsatz vorübergehend überschreiten. Wenn Sie den Spark 3.x-Connector verwenden, ist die Implementierung dieser Bibliothek nicht erforderlich.

    Hinweis

    Die Wiederholungsrichtlinie kann Ihre Spark-Aufträge nur vor temporären Spitzen schützen. Wenn Sie nicht genügend RUs konfiguriert haben, die zum Ausführen Ihrer Workloads erforderlich sind, ist die Wiederholungsrichtlinie nicht anwendbar, und die Richtlinienklasse löst erneut die Ausnahme aus.

  • Verbindungsdetails zum Azure Cosmos DB-Konto: Der Name Ihres API für Cassandra-Kontos, der Kontoendpunkt und der Schlüssel in Azure.

Optimierten der Durchsatzkonfiguration für den Spark-Connector

Im nächsten Abschnitt sind alle relevanten Parameter zum Steuern des Durchsatzes bei Verwendung des Spark-Connectors für Cassandra aufgeführt. Zum Optimieren der Parameter für einen maximalen Durchsatz von Spark-Aufträgen müssen die Konfigurationen spark.cassandra.output.concurrent.writes, spark.cassandra.concurrent.reads und spark.cassandra.input.reads_per_sec ordnungsgemäß konfiguriert werden, um zu hohe Drosselung und Backoff zu vermeiden (was wiederum zu einem geringeren Durchsatz führen kann).

Der optimale Wert dieser Konfigurationen hängt von vier Faktoren ab:

  • Die Größe des Durchsatzes (Anforderungseinheiten), der für die Tabelle konfiguriert ist, in der Daten erfasst werden.
  • Die Anzahl der Worker im Spark-Cluster.
  • Die Anzahl der für den Spark-Auftrag konfigurierten Executors (die abhängig von der Spark-Version mit spark.cassandra.connection.connections_per_executor_max oder spark.cassandra.connection.remoteConnectionsPerExecutor gesteuert werden können).
  • Die durchschnittliche Latenz der einzelnen Anforderungen an Azure Cosmos DB, wenn Sie sich im selben Rechenzentrum befinden. Als Wert dafür kann 10 ms für Schreibvorgänge und 3 ms für Lesevorgänge angenommen werden.

Wenn beispielsweise fünf Worker vorhanden sind und der Wert spark.cassandra.output.concurrent.writes = 1 sowie der Wert spark.cassandra.connection.remoteConnectionsPerExecutor = 1 ist, gibt es fünf Worker, die gleichzeitig in die Tabelle schreiben, mit jeweils einem Thread. Wenn ein einzelner Schreibvorgang 10 ms dauert, können pro Thread 100 Anforderungen (1000 Millisekunden geteilt durch 10) pro Sekunde gesendet werden. Bei fünf Workern wären dies 500 Schreibvorgänge pro Sekunde. Bei durchschnittlichen Kosten von fünf Anforderungseinheiten (Request Units, RUs) pro Schreibvorgang müssen für die Zieltabelle mindestens 2.500 Anforderungseinheiten (5 RUs · 500 Schreibvorgänge pro Sekunde) bereitgestellt werden.

Eine Erhöhung der Anzahl der Executors kann die Anzahl der Threads in einem bestimmten Auftrag erhöhen, was wiederum den Durchsatz erhöhen kann. Die genauen Auswirkungen können jedoch vom jeweiligen Auftrag abhängen. Demgegenüber ist die Steuerung des Durchsatzes über die Anzahl der Worker deterministischer. Sie können auch die genauen Kosten einer bestimmten Anforderung ermitteln, indem Sie ein Anfroderungsprofil erstellen, um die RU-Gebühr zu erhalten. Auf diese Weise können Sie bei der Bereitstellung des Durchsatzes für Ihre Tabelle oder Ihren Keyspace die Genauigkeit erhöhen. In diesem Artikel erfahren Sie, wie Sie RU-Gebühren auf Anforderungsebene ermitteln.

Skalieren des Durchsatzes in der Datenbank

Der Cassandra Spark-Connector schöpft den Durchsatz in Azure Cosmos DB effizient aus. Daher müssen Sie auch bei effektiven Wiederholungen sicherstellen, dass ausreichend Durchsatz (RUs) auf Tabellen- oder Keyspaceebene bereitgestellt wird, um Ratenbegrenzungsfehler zu vermeiden. Die Mindesteinstellung von 400 RUs in einer Tabelle oder einem Keyspace ist nicht ausreichend. Selbst bei der Mindestkonfigurationseinstellung für den Durchsatz kann der Spark-Connector mit einer Rate von ca. 6.000 Anforderungseinheiten oder mehr schreiben.

Wenn für die Datenverschiebung mit Spark eine höhere RU-Einstellung benötigt wird als für Ihre Workload im aktiven Zustand erforderlich ist, können Sie den Durchsatz in Azure Cosmos DB problemlos systematisch auf- und abskalieren, um die Anforderungen Ihrer Workload für einen bestimmten Zeitraum zu erfüllen. Lesen Sie den Artikel zur elastischen Skalierung in der API für Cassandra, um die verschiedenen Optionen für die programmgesteuerte und dynamische Skalierung zu verstehen.

Hinweis

In der obigen Anleitung wird von einer relativ einheitlichen Verteilung der Daten ausgegangen. Wenn die Daten stark abweichen (d. h. die Anzahl von Lese-/Schreibvorgängen für denselben Partitionsschlüsselwert übermäßig groß ist), treten möglicherweise auch dann, wenn eine große Anzahl von Anforderungseinheiten in der Tabelle bereitgestellt wird, weiterhin Engpässe auf. Anforderungseinheiten werden gleichmäßig auf die physischen Partitionen verteilt werden und eine starke Datenschiefe kann einen Engpass bei Anforderungen an eine einzelne Partition verursachen.

Parameter zur Konfiguration des Durchsatzes für den Spark-Connector

Die folgende Tabelle enthält die für Azure Cosmos DB for Apache Cassandra spezifischen Parameter für die Durchsatzkonfiguration, die vom Connector bereitgestellt werden. Eine detaillierte Liste aller Konfigurationsparameter finden Sie im GitHub-Repository für den Cassandra-Connector für Spark auf der Seite Konfigurationsreferenz.

Eigenschaftenname Standardwert Beschreibung
spark.cassandra.output.batch.size.rows 1 Anzahl der Zeilen pro Batch. Legen Sie diesen Parameter auf „1“ fest. Dieser Parameter wird verwendet, um einen höheren Durchsatz für umfassende Workloads zu erzielen.
spark.cassandra.connection.connections_per_executor_max (Spark 2.x) spark.cassandra.connection.remoteConnectionsPerExecutor (Spark 3.x) Keine Maximale Anzahl von Verbindungen pro Knoten pro Executor. 10 x n entspricht 10 Verbindungen pro Knoten in einem Cassandra-Cluster mit n Knoten. Wenn Sie also fünf Verbindungen pro Knoten pro Executor für einen Cassandra-Cluster mit fünf Knoten benötigen, sollten Sie diese Konfiguration auf „25“ festlegen. Ändern Sie diesen Wert basierend auf dem Parallelitätsgrad oder der Anzahl von Executors, für die Ihre Spark-Aufträge konfiguriert sind.
spark.cassandra.output.concurrent.writes 100 Definiert die Anzahl von parallelen Schreibvorgängen, die pro Executor auftreten können. Da Sie „batch.size.rows“ auf „1“ festlegen, skalieren Sie diesen Wert entsprechend hoch. Ändern Sie diesen Wert basierend auf dem Parallelitätsgrad oder dem Durchsatz, den Sie für Ihren Workload erzielen möchten.
spark.cassandra.concurrent.reads 512 Definiert die Anzahl der parallelen Lesevorgänge, die pro Executor auftreten können. Ändern Sie diesen Wert basierend auf dem Parallelitätsgrad oder dem Durchsatz, den Sie für Ihren Workload erzielen möchten.
spark.cassandra.output.throughput_mb_per_sec Keine Definiert den gesamten Schreibdurchsatz pro Executor. Dieser Parameter kann als oberer Grenzwert für den Durchsatz Ihres Spark-Auftrags verwendet werden und basiert auf dem bereitgestellten Durchsatz Ihres Azure Cosmos DB-Containers.
spark.cassandra.input.reads_per_sec Keine Definiert den gesamten Lesedurchsatz pro Executor. Dieser Parameter kann als oberer Grenzwert für den Durchsatz Ihres Spark-Auftrags verwendet werden und basiert auf dem bereitgestellten Durchsatz Ihres Azure Cosmos DB-Containers.
spark.cassandra.output.batch.grouping.buffer.size 1000 Definiert die Anzahl der Batches pro Spark-Task, die In-Memory gespeichert werden können, bevor diese an die API für Cassandra gesendet werden.
spark.cassandra.connection.keep_alive_ms 60000 Definiert den Zeitraum, in dem nicht verwendete Verbindungen zur Verfügung stehen.

Passen Sie den Durchsatz und den Parallelitätsgrad dieser Parameter basierend auf der Workload an, die Sie für Ihre Spark-Aufträge erwarten, sowie dem Durchsatz, den Sie für Ihr Azure Cosmos DB-Konto bereitgestellt haben.

Herstellen einer Verbindung mit Azure Cosmos DB for Apache Cassandra in Spark

cqlsh

Die folgenden Befehle verdeutlichen, wie eine Verbindung mit Azure Cosmos DB for Apache Cassandra über cqlsh hergestellt werden kann. Dies ist nützlich für die Validierung, während Sie die Beispiele in Spark durchgehen.
Auf Linux/Unix/Mac:

export SSL_VERSION=TLSv1_2
export SSL_VALIDATE=false
cqlsh.py YOUR-COSMOSDB-ACCOUNT-NAME.cassandra.cosmosdb.azure.com 10350 -u YOUR-COSMOSDB-ACCOUNT-NAME -p YOUR-COSMOSDB-ACCOUNT-KEY --ssl

1. Azure Databricks

Im folgenden Artikel werden die Azure Databricks-Clusterbereitstellung, die Clusterkonfiguration für die Verbindung mit Azure Cosmos DB for Apache Cassandra sowie einige Beispielnotebooks behandelt, die u. a. DDL- und DML-Vorgänge abdecken.
Arbeiten mit Azure Cosmos DB for Apache Cassandra in Azure Databricks

2. Azure HDInsight-Spark

Im folgenden Artikel werden der HDinsight Spark-Dienst, die Bereitstellung, die Clusterkonfiguration für die Verbindung mit Azure Cosmos DB for Apache Cassandra sowie einige Beispielnotebooks behandelt, die u. a. DDL- und DML-Vorgänge abdecken.
Arbeiten mit Azure Cosmos DB for Apache Cassandra in Azure HDInsight-Spark

3. Spark-Umgebung im Allgemeinen

Die obigen Abschnitte gelten speziell für Azure Spark-basierte PaaS-Dienste. In diesem Abschnitt wird ein allgemeiner Überblick über die Spark-Umgebung vermittelt. Nachfolgend werden Connectorabhängigkeiten, Importe und die Spark-Sitzungskonfiguration beschrieben. Der Abschnitt „Nächste Schritte“ enthält Codebeispiele für DDL- und DML-Vorgänge und vieles mehr.

Connectorabhängigkeiten:

  1. Fügen Sie die Maven-Koordinaten zum Abrufen des Cassandra-Connectors für Spark hinzu.
  2. Fügen Sie die Maven-Koordinaten für die Bibliothek für das Azure Cosmos DB-Hilfsprogramm für die API für Cassandra hinzu.

Importe:

import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector

//CosmosDB library for multiple retry
import com.microsoft.azure.cosmosdb.cassandra

Spark-Sitzungskonfiguration:

 spark.cassandra.connection.host  YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com  
 spark.cassandra.connection.port  10350  
 spark.cassandra.connection.ssl.enabled  true  
 spark.cassandra.auth.username  YOUR_ACCOUNT_NAME  
 spark.cassandra.auth.password  YOUR_ACCOUNT_KEY  
// if using Spark 2.x
// spark.cassandra.connection.factory  com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory  

//Throughput-related...adjust as needed
 spark.cassandra.output.batch.size.rows  1  
// spark.cassandra.connection.connections_per_executor_max  10   // Spark 2.x
 spark.cassandra.connection.remoteConnectionsPerExecutor  10   // Spark 3.x
 spark.cassandra.output.concurrent.writes  1000  
 spark.cassandra.concurrent.reads  512  
 spark.cassandra.output.batch.grouping.buffer.size  1000  
 spark.cassandra.connection.keep_alive_ms  600000000 

Nächste Schritte

In den folgenden Artikeln wird die Spark-Integration in Azure Cosmos DB for Apache Cassandra behandelt.