Ansluta till Azure Cosmos DB Cassandra-API:et från Spark

GÄLLER för: API för Cassandra

Den här artikeln är en av en serie artiklar om Azure Cosmos DB API för Cassandra integrering från Spark. Artiklarna innehåller anslutningar, DDL-åtgärder (Data Definition Language), DML-åtgärder (Basic Data Manipulation Language) och avancerad Azure Cosmos DB API för Cassandra-integrering från Spark.

Förutsättningar

Beroenden för anslutning

  • Spark-anslutningsapp för Cassandra: Spark-anslutningsappen används för att ansluta Azure Cosmos DB API för Cassandra. Identifiera och använd den version av anslutningsappen som finns i Maven central och som är kompatibel med Spark- och Scala-versionerna av din Spark-miljö. Vi rekommenderar en miljö som stöder Spark 3.0 eller senare och Spark-anslutningsappen som är tillgänglig på maven-koordinaterna com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.0.0 . Om du använder Spark 2.x rekommenderar vi en miljö med Spark version 2.4.5 med Spark-anslutningsappen på maven-koordinaterna com.datastax.spark:spark-cassandra-connector_2.11:2.4.3 .

  • Azure Cosmos DB hjälpbibliotek för API för Cassandra: Om du använder en Spark 2.x-version behöver du förutom Spark-anslutningsappen även ett annat bibliotek med namnet azure-cosmos-cassandra-spark-helper med maven-koordinater från Azure Cosmos DB för att kunna hantera com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.2.0 hastighetsbegränsning. Det här biblioteket innehåller anpassade anslutningsfabriker och återförsöksprincipklasser.

    Återförsöksprincipen i den Azure Cosmos DB har konfigurerats för att hantera UNDANTAG med HTTP-statuskod 429("Begärandefrekvens stor"). Den Azure Cosmos DB API för Cassandra översätter dessa undantag till överbelastade fel på det interna Cassandra-protokollet och du kan försöka igen med back-offs. Eftersom Azure Cosmos DB använder en modell för etablerat dataflöde inträffar undantag för begränsning av begärandefrekvens när ingress-/egress-frekvensen ökar. Återförsöksprincipen skyddar dina Spark-jobb mot datatoppar som tillfälligt överskrider det tilldelade dataflödet för containern. Om du använder Spark 3.x-anslutningsappen krävs inte implementering av det här biblioteket.

    Anteckning

    Återförsöksprincipen kan endast skydda dina Spark-jobb mot tillfälliga toppar. Om du inte har konfigurerat tillräckligt med RU:er för att köra arbetsbelastningen är återförsöksprincipen inte tillämplig och återförsöksprincipklassen gör om undantaget.

  • Azure Cosmos DB kontoanslutningsinformation: Ditt Azure API för Cassandra kontonamn, kontoslutpunkt och nyckel.

Optimera dataflödeskonfigurationen för Spark-anslutningsappen

I nästa avsnitt visas alla relevanta parametrar för att styra dataflödet med hjälp av Spark Connector för Cassandra. För att optimera parametrar för att maximera dataflödet för Spark-jobb måste konfigurationerna , och vara korrekt konfigurerade för att undvika för mycket begränsning och back-off (vilket i sin tur kan leda till lägre spark.cassandra.output.concurrent.writes spark.cassandra.concurrent.reads spark.cassandra.input.reads_per_sec dataflöde).

Det optimala värdet för dessa konfigurationer beror på 4 faktorer:

  • Mängden dataflöde (enheter för begäran) som konfigurerats för den tabell som data matas in i.
  • Antalet arbetare i sparkklustret.
  • Antalet utförare som har konfigurerats för spark-jobbet (som kan styras med eller spark.cassandra.connection.connections_per_executor_max spark.cassandra.connection.remoteConnectionsPerExecutor beroende på Spark-version)
  • Den genomsnittliga svarstiden för varje begäran till Cosmos DB om du är samplacerad i samma datacenter. Anta att värdet är 10 ms för skrivningar och 3 ms för läsningar.

Om vi till exempel har 5 arbetare och värdet = 1 och värdet = 1 så har vi 5 arbetare som samtidigt skriver till tabellen, var och en med spark.cassandra.output.concurrent.writes spark.cassandra.connection.remoteConnectionsPerExecutor 1 tråd. Om det tar 10 ms att utföra en enda skrivning kan vi skicka 100 begäranden (1 000 millisekunder dividerat med 10) per sekund och per tråd. Med 5 arbetare skulle detta vara 500 skrivningar per sekund. Med en genomsnittlig kostnad på 5 enheter för begäran (RU:er) per skrivning behöver måltabellen minst 2 500 etablerade enheter för begäran (5 RU:er x 500 skrivningar per sekund).

Att öka antalet utförare kan öka antalet trådar i ett visst jobb, vilket i sin tur kan öka dataflödet. Den exakta effekten av detta kan dock variera beroende på jobbet, medan det är mer deterministiskt att kontrollera dataflödet med antalet arbetare. Du kan också fastställa den exakta kostnaden för en viss begäran genom att profilera den för att få kostnaden för enheter för begäran (RU). Detta hjälper dig att bli mer exakt när du etablerar dataflöde för din tabell eller ditt nyckelutrymme. Ta en titt på vår artikel här för att förstå hur du hämtar avgifter för enheter för begäran på begärandenivå.

Skala dataflöde i databasen

Cassandra Spark-anslutningsappen kommer att mätta dataflödet i Azure Cosmos DB mycket effektivt. Därför måste du, även med effektiva återförsök, se till att du har tillräckligt med dataflöde (RU:er) etablerat på tabell- eller nyckelutrymmesnivå för att förhindra hastighetsbegränsningsrelaterade fel. Den minsta inställningen på 400 RU:er i en viss tabell eller ett visst nyckelutrymme räcker inte. Även vid minsta konfigurationsinställningar för dataflöde kan Spark-anslutningsappen skriva med en hastighet som motsvarar cirka 6 000 enheter för programbegäran eller mer.

Om RU-inställningen som krävs för dataförflyttning med Spark är högre än vad som krävs för arbetsbelastningen med stabilt tillstånd kan du enkelt skala upp och ned dataflödet systematiskt i Azure Cosmos DB för att uppfylla behoven för din arbetsbelastning under en viss tidsperiod. Läs vår artikel om elastisk skalning API för Cassandra att förstå de olika alternativen för att skala programmatiskt och dynamiskt.

Anteckning

Riktlinjerna ovan förutsätter en någorlunda enhetlig fördelning av data. Om du har en betydande skeva data (det vill säga ett ovanligt stort antal läsningar/skrivningar till samma partitionsnyckelvärde) kan du fortfarande uppleva flaskhalsar, även om du har ett stort antal enheter för programbegäran etablerade i tabellen. Enheter för programbegäran delas jämnt mellan fysiska partitioner, och tung dataskevheter kan orsaka en flaskhals av begäranden till en enda partition.

Konfigurationsparametrar för dataflöde för Spark-anslutningsappen

I följande tabell visas Azure Cosmos DB API för Cassandra specifika konfigurationsparametrar för dataflöde som tillhandahålls av anslutningsappen. En detaljerad lista över alla konfigurationsparametrar finns på konfigurationsreferenssidan för Spark Cassandra Connector GitHub lagringsplats.

Egenskapens namn Standardvärdet Beskrivning
spark.cassandra.output.batch.size.rows 1 Antal rader per enskild batch. Ange den här parametern till 1. Den här parametern används för att uppnå högre dataflöde för tunga arbetsbelastningar.
spark.cassandra.connection.connections_per_executor_max (Spark 2.x) spark.cassandra.connection.remoteConnectionsPerExecutor (Spark 3.x) Ingen Maximalt antal anslutningar per nod per utförare. 10*n motsvarar 10 anslutningar per nod i ett Cassandra-kluster med n noder. Så om du behöver 5 anslutningar per nod per utförare för ett Cassandra-kluster med 5 noder bör du ange den här konfigurationen till 25. Ändra det här värdet baserat på graden av parallellitet eller antalet utförare som dina Spark-jobb har konfigurerats för.
spark.cassandra.output.concurrent.writes 100 Definierar antalet parallella skrivningar som kan ske per utförare. Eftersom du anger "batch.size.rows" till 1 skalar du upp det här värdet i enlighet med detta. Ändra det här värdet baserat på graden av parallellitet eller det dataflöde som du vill uppnå för din arbetsbelastning.
spark.cassandra.concurrent.reads 512 Definierar antalet parallella läsningar som kan ske per utförare. Ändra det här värdet baserat på graden av parallellitet eller det dataflöde som du vill uppnå för din arbetsbelastning
spark.cassandra.output.throughput_mb_per_sec Ingen Definierar det totala skrivgenomflödet per utförare. Den här parametern kan användas som en övre gräns för spark-jobbets dataflöde och basera den på det etablerade dataflödet i Cosmos-containern.
spark.cassandra.input.reads_per_sec Ingen Definierar det totala läsgenomflödet per utförare. Den här parametern kan användas som en övre gräns för spark-jobbets dataflöde och basera den på det etablerade dataflödet i Cosmos-containern.
spark.cassandra.output.batch.grouping.buffer.size 1000 Definierar antalet batchar per enskild Spark-uppgift som kan lagras i minnet innan den skickas till API för Cassandra
spark.cassandra.connection.keep_alive_ms 60000 Definierar hur lång tid som oanvända anslutningar är tillgängliga.

Justera dataflödet och graden av parallellitet för dessa parametrar baserat på den arbetsbelastning som du förväntar dig för dina Spark-jobb och det dataflöde som du har etablerat för ditt Cosmos DB konto.

Ansluta till Azure Cosmos DB Cassandra-API:et från Spark

cqlsh

Följande kommandon beskriver hur du ansluter till Azure CosmosDB-API för Cassandra från cqlsh. Detta är användbart för validering när du kör exemplen i Spark.
Från Linux/Unix/Mac:

export SSL_VERSION=TLSv1_2
export SSL_VALIDATE=false
cqlsh.py YOUR-COSMOSDB-ACCOUNT-NAME.cassandra.cosmosdb.azure.com 10350 -u YOUR-COSMOSDB-ACCOUNT-NAME -p YOUR-COSMOSDB-ACCOUNT-KEY --ssl

1. Azure Databricks

Artikeln nedan beskriver Azure Databricks klusteretablering, klusterkonfiguration för att ansluta till Azure Cosmos DB API för Cassandra och flera exempelanteckningsböcker som täcker DDL-åtgärder, DML-åtgärder med mera.
Arbeta med Azure Cosmos DB API för Cassandra från Azure Databricks

2. Azure HDInsight-Spark

Artikeln nedan beskriver HDinsight-Spark, etablering, klusterkonfiguration för att ansluta till Azure Cosmos DB API för Cassandra och flera exempelanteckningsböcker som omfattar DDL-åtgärder, DML-åtgärder med mera.
Arbeta med Azure Cosmos DB API för Cassandra från Azure HDInsight-Spark

3. Spark-miljö i allmänhet

Avsnitten ovan var specifika för Azure Spark-baserade PaaS-tjänster, men det här avsnittet beskriver alla allmänna Spark-miljöer. Anslutningsberoenden, importer och Spark-sessionskonfiguration beskrivs nedan. Avsnittet Nästa steg beskriver kodexempel för DDL-åtgärder, DML-åtgärder med mera.

Anslutningsberoenden:

  1. Lägg till maven-koordinaterna för att hämta Cassandra-anslutningsappen för Spark
  2. Lägg till maven-koordinaterna för Azure Cosmos DB hjälpbibliotek för API för Cassandra

Import:

import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector

//CosmosDB library for multiple retry
import com.microsoft.azure.cosmosdb.cassandra

Konfiguration av Spark-session:

//Connection-related
spark.conf.set("spark.cassandra.connection.host","YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com")
spark.conf.set("spark.cassandra.connection.port","10350")
spark.conf.set("spark.cassandra.connection.ssl.enabled","true")
spark.conf.set("spark.cassandra.auth.username","YOUR_ACCOUNT_NAME")
spark.conf.set("spark.cassandra.auth.password","YOUR_ACCOUNT_KEY")
spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory")

//Throughput-related. You can adjust the values as needed
spark.conf.set("spark.cassandra.output.batch.size.rows", "1")
//spark.conf.set("spark.cassandra.connection.connections_per_executor_max", "10") // Spark 2.x
spark.conf.set("spark.cassandra.connection.remoteConnectionsPerExecutor", "10") // Spark 3.x
spark.conf.set("spark.cassandra.output.concurrent.writes", "1000")
spark.conf.set("spark.cassandra.concurrent.reads", "512")
spark.conf.set("spark.cassandra.output.batch.grouping.buffer.size", "1000")
spark.conf.set("spark.cassandra.connection.keep_alive_ms", "600000000")

Nästa steg

Följande artiklar visar Spark-integrering med Azure Cosmos DB API för Cassandra.