Verbinding maken met de Azure Cosmos DB Cassandra-API vanuit Spark

VAN TOEPASSING OP: Cassandra-API

Dit artikel is een van een reeks artikelen over Azure Cosmos DB Cassandra-API integratie van Spark. De artikelen hebben betrekking op connectiviteit, DDL-bewerkingen (Data Definition Language), eenvoudige DML-bewerkingen (Data Manipulation Language) en geavanceerde Azure Cosmos DB Cassandra-API integratie van Spark.

Vereisten

Afhankelijkheden voor connectiviteit

  • Spark-connector voor Cassandra: Spark-connector wordt gebruikt om verbinding te maken met Azure Cosmos DB Cassandra-API. Identificeer en gebruik de versie van de connector die zich in Maven central bevindt en die compatibel is met de Spark- en Scala-versies van uw Spark-omgeving. We raden een omgeving aan die ondersteuning biedt voor Spark 3.0 of hoger en de Spark-connector die beschikbaar is op maven-coördinaten. com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.0.0 Als u Spark 2.x gebruikt, raden we een omgeving aan met Spark-versie 2.4.5, met behulp van spark-connector op maven-coördinaten. com.datastax.spark:spark-cassandra-connector_2.11:2.4.3

  • Azure Cosmos DB helperbibliotheek voor Cassandra-API: Als u een versie van Spark 2.x gebruikt, hebt u naast de Spark-connector een andere bibliotheek nodig met de naam azure-cosmos-cassandra-spark-helper met maven-coördinaten van Azure Cosmos DB om snelheidsbeperking af te com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.2.0 handelen. Deze bibliotheek bevat aangepaste verbindings factory- en beleidklassen voor opnieuw proberen.

    Het beleid voor opnieuw proberen in Azure Cosmos DB is geconfigureerd voor het afhandelen van http-statuscode 429("Request Rate Large")-uitzonderingen. De Azure Cosmos DB Cassandra-API deze uitzonderingen omgezet in overbelaste fouten in het systeemeigen Cassandra-protocol, en u kunt het opnieuw proberen met back-offs. Omdat Azure Cosmos DB een inrichtend doorvoermodel gebruikt, treden er uitzonderingen met een beperking van de aanvraagsnelheid op wanneer de ingress/egress-snelheid toeneemt. Het beleid voor opnieuw proberen beschermt uw Spark-taken tegen gegevenspieken die tijdelijk de doorvoer overschrijden die voor uw container is toegewezen. Als u de Spark 3.x-connector gebruikt, is het implementeren van deze bibliotheek niet vereist.

    Notitie

    Het beleid voor opnieuw proberen kan uw Spark-taken alleen beschermen tegen momentaire pieken. Als u niet voldoende BENODIGDE BENODIGDE R's hebt geconfigureerd om uw workload uit te voeren, is het beleid voor opnieuw proberen niet van toepassing en wordt de uitzondering opnieuw uitgevoerd in de beleidsklasse voor opnieuw proberen.

  • Azure Cosmos DB accountverbindingsgegevens: Uw Azure Cassandra-API accountnaam, account-eindpunt en sleutel.

Doorvoerconfiguratie van Spark-connector optimaliseren

In de volgende sectie worden alle relevante parameters vermeld voor het beheren van de doorvoer met behulp van de Spark-connector voor Cassandra. Om parameters te optimaliseren om de doorvoer voor spark.cassandra.output.concurrent.writes Spark-taken te maximaliseren, moeten de configuraties , en correct worden geconfigureerd om te voorkomen dat er te veel bandbreedte wordt teruggesneld (wat op zijn beurt kan leiden tot een lagere spark.cassandra.concurrent.reads spark.cassandra.input.reads_per_sec doorvoer).

De optimale waarde van deze configuraties is afhankelijk van vier factoren:

  • De hoeveelheid doorvoer (aanvraageenheden) die is geconfigureerd voor de tabel waarin gegevens worden opgenomen.
  • Het aantal werksters in uw Spark-cluster.
  • Het aantal uitvoerders dat is geconfigureerd voor uw Spark-taak (dat kan worden beheerd met of spark.cassandra.connection.connections_per_executor_max spark.cassandra.connection.remoteConnectionsPerExecutor afhankelijk van de Spark-versie)
  • De gemiddelde latentie van elke aanvraag voor cosmos DB, als u zich in hetzelfde datacentrum op dezelfde plaats in de buurt van dezelfde database hebt. Stel dat deze waarde 10 ms is voor schrijf schrijf- en 3 ms voor lees lezen.

Als we bijvoorbeeld 5 werksters en een waarde van = 1 en een waarde van = 1 hebben, dan hebben we 5 werksters die gelijktijdig naar de tabel schrijven, elk met spark.cassandra.output.concurrent.writes spark.cassandra.connection.remoteConnectionsPerExecutor 1 thread. Als het 10 ms duurt om één schrijftijd uit te voeren, kunnen we per thread 100 aanvragen verzenden (1000 milliseconden gedeeld door 10). Met 5 werksters is dit 500 schrijf schrijfwerk per seconde. Tegen een gemiddelde kosten van 5 aanvraageenheden (AANVRAAGeenheden) per schrijfeenheid, moet voor de doeltabel minimaal 2500 aanvraageenheden zijn ingericht (5 AANVRAAGeenheden x 500 schrijf schrijfeenheden per seconde).

Als u het aantal uitvoerders verhoogt, kan het aantal threads in een bepaalde taak toenemen, waardoor de doorvoer kan toenemen. De exacte impact ervan kan echter afhankelijk van de taak variabel zijn, terwijl het beheren van de doorvoer met het aantal werkpersoneel deterministischer is. U kunt ook de exacte kosten van een bepaalde aanvraag bepalen door deze te profileren om de ru-kosten (Request Unit) te krijgen. Dit helpt u nauwkeuriger te zijn bij het inrichten van doorvoer voor uw tabel of keyspace. Bekijk ons artikel hier om te begrijpen hoe u kosten voor aanvraageenheden op aanvraagniveau kunt krijgen.

Doorvoer in de database schalen

De Cassandra Spark-connector zal de doorvoer in Azure Cosmos DB zeer efficiënt verzadigen. Als gevolg hiervan moet u, zelfs bij effectieve nieuwe proberen, ervoor zorgen dat er voldoende doorvoer (RUs) is ingericht op tabel- of keyspaceniveau om fouten met betrekking tot snelheidsbeperking te voorkomen. De minimale instelling van 400 RUs in een bepaalde tabel of keyspace is niet voldoende. Zelfs bij configuratie-instellingen voor minimale doorvoer kan de Spark-connector schrijven met een snelheid die overeenkomt met ongeveer 6000 aanvraageenheden of meer.

Als de RU-instelling die is vereist voor gegevensver movement met behulp van Spark hoger is dan wat is vereist voor uw workload met een stabiele status, kunt u de doorvoer in Azure Cosmos DB eenvoudig systematisch omhoog en omlaag schalen om te voldoen aan de behoeften van uw workload voor een bepaalde periode. Lees ons artikel over elastisch schalen in Cassandra-API inzicht in de verschillende opties voor programmatisch en dynamisch schalen.

Notitie

De bovenstaande richtlijnen gaan uit van een redelijk uniforme verdeling van gegevens. Als er een aanzienlijke scheefheid in de gegevens is (dat wil zeggen, een uitzonderlijk groot aantal lees-/schrijfgegevens naar dezelfde partitiesleutelwaarde), kunnen er nog steeds knelpunten ontstaan, zelfs als u een groot aantal aanvraageenheden hebt ingericht in uw tabel. Aanvraageenheden worden gelijkmatig verdeeld over fysieke partities en grote gegevensverschil kan leiden tot een knelpunt van aanvragen voor één partitie.

Configuratieparameters voor doorvoer van Spark-connector

De volgende tabel bevat Azure Cosmos DB Cassandra-API specifieke configuratieparameters voor doorvoer die door de connector worden geleverd. Zie de configuratiereferentiepagina van de spark Cassandra-connector voor een gedetailleerde lijst met alle configuratieparameters GitHub opslagplaats.

Eigenschapsnaam Standaardwaarde Beschrijving
spark.cassandra.output.batch.size.rows 1 Aantal rijen per batch. Stel deze parameter in op 1. Deze parameter wordt gebruikt om een hogere doorvoer te bereiken voor zware werkbelastingen.
spark.cassandra.connection.connections_per_executor_max (Spark 2.x) spark.cassandra.connection.remoteConnectionsPerExecutor (Spark 3.x) Geen Maximum aantal verbindingen per knooppunt per uitvoerder. 10*n is gelijk aan 10 verbindingen per knooppunt in een Cassandra-cluster met n-knooppunt. Dus als u vijf verbindingen per knooppunt per uitvoerder nodig hebt voor een Cassandra-cluster met vijf knooppunt, moet u deze configuratie instellen op 25. Wijzig deze waarde op basis van de mate van parallelle uitvoering of het aantal uitvoerders waar uw Spark-taken voor zijn geconfigureerd.
spark.cassandra.output.concurrent.writes 100 Hiermee definieert u het aantal parallelle schrijf schrijfingen dat per uitvoerder kan plaatsvinden. Omdat u 'batch.size.rows' in stelt op 1, moet u deze waarde dienovereenkomstig opschalen. Wijzig deze waarde op basis van de mate van parallellisme of de doorvoer die u wilt bereiken voor uw workload.
spark.cassandra.concurrent.reads 512 Hiermee definieert u het aantal parallelle leesingen dat per uitvoerder kan plaatsvinden. Wijzig deze waarde op basis van de mate van parallellisme of de doorvoer die u wilt bereiken voor uw workload
spark.cassandra.output.throughput_mb_per_sec Geen Definieert de totale schrijfdoorvoer per uitvoerder. Deze parameter kan worden gebruikt als een bovengrens voor de doorvoer van uw Spark-taak en deze baseren op de inrichtende doorvoer van uw Cosmos-container.
spark.cassandra.input.reads_per_sec Geen Definieert de totale leesdoorvoer per uitvoerder. Deze parameter kan worden gebruikt als een bovengrens voor de doorvoer van uw Spark-taak en deze baseren op de inrichtende doorvoer van uw Cosmos-container.
spark.cassandra.output.batch.grouping.buffer.size 1000 Hiermee definieert u het aantal batches per spark-taak dat in het geheugen kan worden opgeslagen voordat deze naar Cassandra-API
spark.cassandra.connection.keep_alive_ms 60000 Hiermee definieert u de periode tot welke ongebruikte verbindingen beschikbaar zijn.

Pas de doorvoer en mate van parallellisme van deze parameters aan op basis van de workload die u verwacht voor uw Spark-taken en de doorvoer die u hebt ingericht voor uw Cosmos DB account.

Verbinding maken met de Azure Cosmos DB Cassandra-API vanuit Spark

cqlsh

De volgende opdrachten beschrijven hoe u verbinding maakt met Azure CosmosDB Cassandra-API cqlsh. Dit is handig voor validatie wanneer u de voorbeelden in Spark doorloop.
Van Linux/Unix/Mac:

export SSL_VERSION=TLSv1_2
export SSL_VALIDATE=false
cqlsh.py YOUR-COSMOSDB-ACCOUNT-NAME.cassandra.cosmosdb.azure.com 10350 -u YOUR-COSMOSDB-ACCOUNT-NAME -p YOUR-COSMOSDB-ACCOUNT-KEY --ssl

1. Azure Databricks

In het onderstaande artikel Azure Databricks cluster inrichten, clusterconfiguratie voor verbinding maken met Azure Cosmos DB Cassandra-API en verschillende voorbeeldnote notebooks die betrekking hebben op DDL-bewerkingen, DML-bewerkingen en meer.
Werken met Azure Cosmos DB Cassandra-API van Azure Databricks

2. Azure HDInsight-Spark

In het onderstaande artikel worden HDinsight-Spark-service, inrichting, clusterconfiguratie voor verbinding met Azure Cosmos DB Cassandra-API en verschillende voorbeeldnote notebooks beschreven die betrekking hebben op DDL-bewerkingen, DML-bewerkingen en meer.
Werken met Azure Cosmos DB Cassandra-API van Azure HDInsight Spark

3. Spark-omgeving in het algemeen

Hoewel de bovenstaande secties specifiek zijn voor PaaS-services op basis van Azure Spark, wordt in deze sectie elke algemene Spark-omgeving beschreven. Hieronder vindt u informatie over connectorafhankelijkheden, importen en Spark-sessieconfiguratie. De sectie Volgende stappen bevat codevoorbeelden voor DDL-bewerkingen, DML-bewerkingen en meer.

Connectorafhankelijkheden:

  1. De Maven-coördinaten toevoegen om de Cassandra-connector voor Spark op te halen
  2. Voeg de Maven-coördinaten toe voor Azure Cosmos DB helperbibliotheek voor Cassandra-API

Invoer:

import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector

//CosmosDB library for multiple retry
import com.microsoft.azure.cosmosdb.cassandra

Configuratie van Spark-sessie:

//Connection-related
spark.conf.set("spark.cassandra.connection.host","YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com")
spark.conf.set("spark.cassandra.connection.port","10350")
spark.conf.set("spark.cassandra.connection.ssl.enabled","true")
spark.conf.set("spark.cassandra.auth.username","YOUR_ACCOUNT_NAME")
spark.conf.set("spark.cassandra.auth.password","YOUR_ACCOUNT_KEY")
spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory")

//Throughput-related. You can adjust the values as needed
spark.conf.set("spark.cassandra.output.batch.size.rows", "1")
//spark.conf.set("spark.cassandra.connection.connections_per_executor_max", "10") // Spark 2.x
spark.conf.set("spark.cassandra.connection.remoteConnectionsPerExecutor", "10") // Spark 3.x
spark.conf.set("spark.cassandra.output.concurrent.writes", "1000")
spark.conf.set("spark.cassandra.concurrent.reads", "512")
spark.conf.set("spark.cassandra.output.batch.grouping.buffer.size", "1000")
spark.conf.set("spark.cassandra.connection.keep_alive_ms", "600000000")

Volgende stappen

In de volgende artikelen wordt spark-integratie met Azure Cosmos DB Cassandra-API.