Připojení k rozhraní API Cassandra služby Azure Cosmos DB ze Sparku
PLATÍ PRO:
rozhraní API Cassandra
tento článek je jedním z řady článků o Azure Cosmos DB rozhraní API Cassandra integraci z sparku. články zahrnují připojení, operace jazyka DDL (data Definition language), operace DML (basic data prohledává jazyk) a rozšířené Azure Cosmos DB rozhraní API Cassandra integraci z sparku.
Požadavky
Zajištění výběru prostředí Spark [Azure Databricks | Azure HDInsight – Spark | Ostatní].
Závislosti pro připojení
Konektor Spark pro Cassandra: konektor Spark slouží k připojení k Azure Cosmos DB rozhraní API Cassandra. Identifikujte a používejte verzi konektoru, která se nachází v Maven Central , která je kompatibilní s verzemi Spark a Scala vašeho prostředí Spark. Doporučujeme prostředí, které podporuje Spark 3,0 nebo vyšší, a konektor Spark dostupný na mavench souřadnicích
com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.0.0. Pokud používáte Spark 2. x, doporučujeme prostředí s Sparkem verze 2.4.5 pomocí konektoru Spark na souřadnicích Mavencom.datastax.spark:spark-cassandra-connector_2.11:2.4.3.Azure Cosmos DB pomocná knihovna pro rozhraní API Cassandra: pokud používáte verzi spark 2. x a pak kromě konektoru Spark, budete potřebovat další knihovnu s názvem azure-cosmos-cassandra-Spark-helper s maven souřadnicemi
com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.2.0od Azure Cosmos DB, aby bylo možné zpracovat omezení rychlosti. Tato knihovna obsahuje třídy zásad vlastního vytváření připojení a opakování.zásady opakování v Azure Cosmos DB jsou nakonfigurované tak, aby zpracovávala stavový kód HTTP 429 ("četnost požadavků"). Azure Cosmos DB rozhraní API Cassandra tyto výjimky překládá na chyby přetížení v nativním protokolu Cassandra a můžete je opakovat pomocí back-startů. vzhledem k tomu, že Azure Cosmos DB používá zřízený model propustnosti, při nárůstu počtu vstupních/výstupních rychlostí dojde k výjimkám omezení rychlosti požadavků. Zásady opakování chrání vaše úlohy Spark proti špičkám dat, které dokončují propustnost přidělené pro váš kontejner. Pokud používáte konektor Spark 3. x, implementace této knihovny není nutná.
Poznámka
Zásady opakování můžou chránit vaše úlohy Sparku jenom v momentech pouze těch špiček. Pokud jste nenakonfigurovali dostatek ru potřebných ke spuštění úlohy, pak se zásady opakování nepoužijí a třída zásad opakování vyvolá výjimku znovu.
podrobnosti připojení Azure Cosmos DB účtu: Název účtu Azure rozhraní API Cassandra, koncový bod účtu a klíč.
Optimalizace konfigurace propustnosti Spark Connectoru
V další části jsou uvedeny všechny relevantní parametry pro řízení propustnosti pomocí konektoru Spark pro Cassandra. Aby bylo možné optimalizovat parametry pro maximalizaci propustnosti úloh Spark, spark.cassandra.output.concurrent.writes spark.cassandra.concurrent.reads spark.cassandra.input.reads_per_sec musí být správně nakonfigurovány konfigurace, a, aby nedošlo k příliš velkému omezení a bylo možné je snížit (což může vést k nižší propustnosti).
Optimální hodnota těchto konfigurací závisí na 4 faktorech:
- Množství propustnosti (jednotky požadavků) nakonfigurované pro tabulku, do které se data ingestují.
- Počet pracovních procesů v clusteru Spark.
- Počet prováděcích modulů konfigurovaných pro vaši úlohu Spark (který se dá řídit pomocí
spark.cassandra.connection.connections_per_executor_maxnebospark.cassandra.connection.remoteConnectionsPerExecutorv závislosti na verzi Sparku) - Průměrná latence jednotlivých požadavků na Cosmos DB, pokud jste společně umístěného ve stejném datovém centru. Předpokládat, že tato hodnota je 10 MS pro zápisy a 3 MS pro čtení.
Například pokud máme 5 pracovních procesů a hodnotu spark.cassandra.output.concurrent.writes = 1 a hodnotu = spark.cassandra.connection.remoteConnectionsPerExecutor 1, potom máme 5 pracovních procesů, které jsou souběžně zapsány do tabulky, každý s 1 vláknem. Pokud k provedení jediného zápisu trvá 10 MS, můžeme poslat žádosti 100 (1000 MS) za sekundu za vlákno. S 5 pracovními procesy by to bylo 500 zápisů za sekundu. V rámci průměrné ceny 5 jednotek žádostí (ru) na zápis musí cílová tabulka vyžadovat minimálně 2500 jednotek žádostí (5 ru x 500 zápisů za sekundu).
Zvýšení počtu prováděcích modulů může zvýšit počet vláken v dané úloze, což může zvýšit propustnost. Přesný dopad tohoto může však být proměnlivý v závislosti na úloze a řízení propustnosti s počtem pracovních procesů je deterministické. Můžete také určit přesné náklady na daný požadavek tím, že profilujte, abyste získali poplatek za jednotku žádosti (RU). To vám pomůže přesnější při zřizování propustnosti pro vaši tabulku nebo pro místo na disku. Podíváme se na náš článek, abyste se seznámili s tím, jak získat poplatky za jednotky žádostí na úrovni jednotlivých žádostí.
Škálování propustnosti v databázi
konektor Spark Cassandra bude efektivně nasycen propustnost v Azure Cosmos DB. V důsledku toho bude nutné zajistit, abyste měli jistotu, že máte dostatečnou propustnost (ru), která je zřízená na úrovni tabulky nebo prostoru, aby nedocházelo k chybám souvisejícím s omezením rychlosti. Minimální nastavení 400 ru v dané tabulce nebo v prostoru klíčů nebude dostatečné. I při nastavení konfigurace minimální propustnosti může konektor Spark zapisovat rychlostí odpovídající přibližně 6000 jednotkám žádosti nebo více.
pokud je nastavení RU vyžadované pro přesun dat pomocí sparku vyšší než to, co je potřeba pro zatížení s ustáleným stavem, můžete snadno škálovat propustnost nahoru a dolů v Azure Cosmos DB tak, aby splňovala požadavky vaší zátěže za dané časové období. Přečtěte si náš článek o elastickém škálování v rozhraní API Cassandra , abyste pochopili různé možnosti pro programové a dynamické škálování.
Poznámka
Výše uvedené pokyny předpokládají rozumně jednotnou distribuci dat. Pokud máte v datech významné zkosení (tj. inordinately velký počet čtení/zápisů na stejnou hodnotu klíče oddílu), může stále docházet k kritickým bodům, a to i v případě, že máte v tabulce zřízen velký počet jednotek požadavků . Jednotky žádostí jsou rozdělené rovnoměrně mezi fyzické oddíly a vysoké zešikmení dat může způsobit kritické body požadavků na jeden oddíl.
Parametry konfigurace propustnosti Spark Connectoru
v následující tabulce jsou uvedeny Azure Cosmos DB parametry konfigurace propustnosti specifické pro rozhraní API Cassandra od konektoru. podrobný seznam všech parametrů konfigurace najdete v tématu referenční dokumentace ke konfiguraci konektoru Spark Cassandra GitHub úložiště.
| Název vlastnosti | Výchozí hodnota | Popis |
|---|---|---|
| spark.cassandra.output.batch. Size. Rows | 1 | Počet řádků na jednu dávku Nastavte tento parametr na hodnotu 1. Tento parametr slouží k dosažení vyšší propustnosti pro náročné úlohy. |
| spark.cassandra.connection.connections_per_executor_max (Spark 2. x) Spark. Cassandra. Connection. remoteConnectionsPerExecutor (Spark 3. x) | Žádné | Maximální počet připojení na uzel na jeden prováděcí modul. 10 * n je ekvivalentem 10 připojení na uzel v clusteru Cassandra n-Node. Pokud tedy budete potřebovat 5 připojení na uzel na jeden prováděcí modul pro cluster Cassandra s pěti uzly, měli byste nastavit tuto konfiguraci na 25. Změňte tuto hodnotu na základě stupně paralelismu nebo počtu prováděcích modulů, pro které jsou úlohy Spark nakonfigurované. |
| Spark. Cassandra. Output. souběžné. zápisy | 100 | Definuje počet paralelních zápisů, které mohou být provedeny na vykonavateli. Vzhledem k tomu, že jste nastavili "Batch. Size. Rows" na hodnotu 1, nezapomeňte odpovídajícím způsobem škálovat tuto hodnotu. Tuto hodnotu upravte na základě úrovně paralelismu nebo propustnosti, kterou chcete pro své zatížení dosáhnout. |
| Spark. Cassandra. souběžné. čtení | 512 | Definuje počet paralelních čtení, ke kterým může dojít na vykonavatel. Změňte tuto hodnotu na základě úrovně paralelismu nebo propustnosti, kterou chcete pro své zatížení dosáhnout. |
| spark.cassandra.output.throughput_mb_per_sec | Žádné | Definuje celkovou propustnost zápisu na vykonavatele. tento parametr se dá použít jako horní limit pro propustnost úloh sparku a založit ho na zřízené propustnosti vašeho kontejneru Cosmos. |
| spark.cassandra.input.reads_per_sec | Žádné | Definuje celkovou propustnost čtení na vykonavatele. tento parametr se dá použít jako horní limit pro propustnost úloh sparku a založit ho na zřízené propustnosti vašeho kontejneru Cosmos. |
| spark.cassandra.output.batch. Grouping. Buffer. Size | 1000 | Definuje počet dávek na jednu úlohu Spark, které mohou být před odesláním do rozhraní API Cassandra uloženy v paměti. |
| spark.cassandra.connection.keep_alive_ms | 60000 | Definuje časovou prodlevu, po kterou nejsou k dispozici žádná nepoužívaná připojení. |
nastavte propustnost a stupeň paralelismu těchto parametrů na základě zatížení, které očekáváte pro úlohy spark, a propustnosti, kterou jste zřídili pro váš účet Cosmos DB.
Připojení k rozhraní API Cassandra služby Azure Cosmos DB ze Sparku
cqlsh
Následující příkazy podrobně popisují, jak se připojit k Azure CosmosDB rozhraní API Cassandra z cqlsh. To je užitečné při ověřování při spuštění ukázek ve Sparku.
Ze systému Linux/UNIX/Mac:
export SSL_VERSION=TLSv1_2
export SSL_VALIDATE=false
cqlsh.py YOUR-COSMOSDB-ACCOUNT-NAME.cassandra.cosmosdb.azure.com 10350 -u YOUR-COSMOSDB-ACCOUNT-NAME -p YOUR-COSMOSDB-ACCOUNT-KEY --ssl
1. Azure Databricks
níže uvedený článek popisuje Azure Databricks zřízení clusteru, konfiguraci clusteru pro připojení k Azure Cosmos DB rozhraní API Cassandra a několik ukázkových poznámkových bloků, které zahrnují operace DDL, operace DML a další.
práce s Azure Cosmos DB rozhraní API Cassandra z Azure Databricks
2. HDInsight-Spark Azure
níže uvedený článek popisuje HDinsight-Spark služby, zřizování, konfiguraci clusteru pro připojení k Azure Cosmos DB rozhraní API Cassandra a několik ukázkových poznámkových bloků, které zahrnují operace DDL, operace DML a další.
práce s Azure Cosmos DB rozhraní API Cassandra z Azure HDInsight – Spark
3. sparkové prostředí obecně
I když byly výše uvedené části specifické pro služby PaaS založené na Azure Spark, Tato část se zabývá všemi obecnými prostředími Spark. Závislosti konektorů, importů a konfigurace relací Spark jsou podrobně popsané níže. Oddíl "další kroky" obsahuje ukázky kódu pro operace DDL, operace DML a další.
Závislosti konektoru:
- Přidáním souřadnic Maven získáte konektor Cassandra pro Spark .
- přidání souřadnic maven pro pomocnou knihovnu Azure Cosmos DB pro rozhraní API Cassandra
Objem
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
//CosmosDB library for multiple retry
import com.microsoft.azure.cosmosdb.cassandra
Konfigurace relace Spark:
//Connection-related
spark.conf.set("spark.cassandra.connection.host","YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com")
spark.conf.set("spark.cassandra.connection.port","10350")
spark.conf.set("spark.cassandra.connection.ssl.enabled","true")
spark.conf.set("spark.cassandra.auth.username","YOUR_ACCOUNT_NAME")
spark.conf.set("spark.cassandra.auth.password","YOUR_ACCOUNT_KEY")
spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory")
//Throughput-related. You can adjust the values as needed
spark.conf.set("spark.cassandra.output.batch.size.rows", "1")
//spark.conf.set("spark.cassandra.connection.connections_per_executor_max", "10") // Spark 2.x
spark.conf.set("spark.cassandra.connection.remoteConnectionsPerExecutor", "10") // Spark 3.x
spark.conf.set("spark.cassandra.output.concurrent.writes", "1000")
spark.conf.set("spark.cassandra.concurrent.reads", "512")
spark.conf.set("spark.cassandra.output.batch.grouping.buffer.size", "1000")
spark.conf.set("spark.cassandra.connection.keep_alive_ms", "600000000")
Další kroky
následující články ukazují integraci sparku s Azure Cosmos DB rozhraní API Cassandra.