Optimalizace úloh Apache Sparku ve službě Azure Synapse Analytics
Zjistěte, jak optimalizovat konfiguraci clusteru Apache Spark pro konkrétní úlohu. Nejběžnější výzvou je zatížení paměti kvůli nesprávným konfiguracím (zejména kvůli nesprávně nastavené velikosti exekutorů), dlouhotrvajícím operacím a úlohám, které vedou ke kartézským operacím. Úlohy můžete urychlit pomocí vhodného ukládání do mezipaměti a povolením nerovnoměrné distribuce dat. Pro zajištění nejlepšího výkonu monitorujte a zkontrolujte dlouhotrvající spouštění úloh Sparku s využitím prostředků.
Následující části popisují běžné optimalizace úloh Sparku a jejich doporučení.
Volba abstrakce dat
Starší verze Sparku používají sady RDD k abstrakci dat, Spark 1.3 a 1.6 zavedl datové rámce a datové sady v uvedeném pořadí. Zvažte následující relativní výhody:
- Datové rámce
- Nejlepší volba ve většině situací.
- Poskytuje optimalizaci dotazů prostřednictvím Catalyst.
- Generování kódu v celé fázi.
- Přímý přístup do paměti.
- Nízká režijní náklady na uvolňování paměti
- Nejsou tak vhodné pro vývojáře jako datové sady, protože neexistují žádné kontroly kompilace ani programování objektů domény.
- Soubory
- Vhodné ve složitých kanálech ETL, kde je dopad na výkon přijatelný.
- Není dobré v agregacích, kde může být dopad na výkon značný.
- Poskytuje optimalizaci dotazů prostřednictvím Catalyst.
- Vhodné pro vývojáře tím, že poskytuje programování objektů domény a kontroly v době kompilace.
- Přidá režii serializace/deserializace.
- Vysoká režie uvolňování paměti.
- Přeruší generování kódu celé fáze.
- Sady RDD
- Sady RDD nemusíte používat, pokud nepotřebujete vytvořit novou vlastní sadu RDD.
- Žádná optimalizace dotazů prostřednictvím Catalyst.
- Žádné generování kódu celé fáze.
- Vysoká režie uvolňování paměti.
- Musí používat starší verze rozhraní API Sparku 1.x.
Použití optimálního formátu dat
Spark podporuje mnoho formátů, jako jsou csv, json, xml, parquet, orc a avro. Spark je možné rozšířit tak, aby podporoval mnoho dalších formátů s externími zdroji dat – další informace najdete v tématu Balíčky Apache Spark.
Nejlepší formát pro zajištění výkonu je parquet s kompresí snappy, což je výchozí hodnota ve Sparku 2.x. Parquet ukládá data ve sloupcových formátech a je vysoce optimalizovaný ve Sparku. Kromě toho může komprese snappy způsobit větší soubory, než je například komprese gzip. Vzhledem k dělitelné povaze těchto souborů se dekomprimují rychleji.
Použití mezipaměti
Spark poskytuje vlastní nativní mechanismy ukládání do mezipaměti, které je možné použít různými metodami, jako .persist()
jsou , .cache()
a CACHE TABLE
. Toto nativní ukládání do mezipaměti je efektivní u malých datových sad a také v kanálech ETL, kde potřebujete ukládat do mezipaměti mezilehlé výsledky. Nativní ukládání do mezipaměti Sparku ale v současné době nefunguje dobře s dělením, protože tabulka uložená v mezipaměti neuchovává data dělení.
Efektivní využití paměti
Spark funguje tak, že umísťuje data do paměti, takže správa paměťových prostředků je klíčovým aspektem optimalizace provádění úloh Sparku. Existuje několik technik, které můžete použít k efektivnímu využití paměti clusteru.
- Ve strategii dělení preferujte menší datové oddíly a účty pro velikost, typy a distribuci dat.
- Zvažte novější, efektivnější serializaci dat Kryo, spíše než výchozí serializace Java.
- Monitorování a ladění nastavení konfigurace Sparku
Pro referenci jsou na následujícím obrázku znázorněny struktura paměti Sparku a některé parametry paměti exekutoru klíčů.
Důležité informace o paměti Sparku
Apache Spark v Azure Synapse používá YARN Apache Hadoop YARN, YARN řídí maximální součet paměti využité všemi kontejnery na každém uzlu Sparku. Následující diagram znázorňuje klíčové objekty a jejich vztahy.
Pokud chcete vyřešit zprávy typu Nedostatek paměti, zkuste:
- Projděte si náhodné prohazování správy DAG. Redukce snížením počtu zdrojových dat na straně mapy, před rozdělením zdrojových dat do oddílů (nebo rozdělením do oddílů), maximalizací jednotlivých náhodného prohazování a snížením množství odesílaných dat.
- Přednost s
ReduceByKey
pevným limitem paměti proGroupByKey
, který poskytuje agregace, práci s okny a další funkce, ale má limit paměti bez vazby. - Upřednostněte
TreeReduce
, která provádí více práce na exekutorech nebo oddílech, předReduce
, která všechno funguje na ovladači. - Místo objektů RDD nižší úrovně využijte datové rámce.
- Vytvořte objekty ComplexType, které zapouzdřují akce, například "Prvních N", různé agregace nebo operace s okny.
Optimalizace serializace dat
Úlohy Sparku se distribuují, takže pro nejlepší výkon je důležitá odpovídající serializace dat. Pro Spark existují dvě možnosti serializace:
- Výchozí je serializace Java.
- Serializace Kryo je novější formát a může vést k rychlejší a kompaktnější serializaci než Java. Kryo vyžaduje, abyste zaregistrovali třídy v programu, a to ještě nepodporuje všechny serializovatelné typy.
Použití rozdělování do kbelíků
Vytváření kontejnerů je podobné dělení dat, ale každý kbelík může obsahovat sadu hodnot sloupců, a ne jenom jednu. Dělení do oddílů funguje dobře u velkých (v milionech a více) počtech hodnot, jako jsou identifikátory produktů. Kbelík se určí hashováním klíče kbelíku řádku. Tabulky v intervalech nabízejí jedinečné optimalizace, protože ukládají metadata o tom, jak byly roztříděny a seřazeny.
Mezi pokročilé funkce dělení do kontejnerů patří:
- Optimalizace dotazů na základě rozdělení metainformací.
- Optimalizované agregace.
- Optimalizovaná spojení.
Můžete použít dělení na oddíly a dělení na oddíly současně.
Optimalizace spojení a náhodného prohazování metodou shuffle
Pokud máte pomalé úlohy ve spojení nebo náhodném prohazování, příčinou je pravděpodobně nerovnoměrná distribuce dat, což je asymetrie v datech úlohy. Například úloha mapování může trvat 20 sekund, ale spuštění úlohy, ve které jsou data spojená nebo prohazovaná, trvá hodiny. Pokud chcete vyřešit nerovnoměrnou distribuci dat, měli byste celý klíč zasolit nebo použít izolovanou sůl jenom pro některé podmnožinu klíčů. Pokud používáte izolovanou sůl, měli byste dále filtrovat a izolovat podmnožinu slaných klíčů ve spojeních s mapami. Další možností je nejprve zavést sloupec kbelíku a předběžnou agregaci v kontejnerech.
Dalším faktorem, který způsobuje pomalé spojení, může být typ spojení. Spark ve výchozím nastavení používá SortMerge
typ spojení. Tento typ spojení je nejvhodnější pro velké datové sady, ale jinak je výpočetně náročný, protože před sloučením musí nejprve seřadit levou a pravou stranu dat.
Spojení Broadcast
je nejvhodnější pro menší datové sady nebo tam, kde je jedna strana spojení mnohem menší než druhá strana. Tento typ spojení vysílá jednu stranu všem exekutorům, a proto obecně vyžaduje více paměti pro všesměrová vysílání.
Typ spojení v konfiguraci můžete změnit nastavením spark.sql.autoBroadcastJoinThreshold
nebo můžete nastavit nápovědu ke spojení pomocí rozhraní API datového rámce (dataframe.join(broadcast(df2))
).
// Option 1
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 1*1024*1024*1024)
// Option 2
val df1 = spark.table("FactTableA")
val df2 = spark.table("dimMP")
df1.join(broadcast(df2), Seq("PK")).
createOrReplaceTempView("V_JOIN")
sql("SELECT col1, col2 FROM V_JOIN")
Pokud používáte tabulky s rozdělením do intervalů, máte třetí typ spojení– Merge
spojení. Správně předem rozdělená a předem seřazená datová sada přeskočí náročnou SortMerge
fázi řazení ze spojení.
Pořadí spojení záleží, zejména u složitějších dotazů. Začněte s nejselektivnějšími spojeními. Pokud je to možné, přesouvejte spojení, která zvyšují počet řádků po agregaci.
Pokud chcete spravovat paralelismus pro kartézská spojení, můžete přidat vnořené struktury, práci s okny a možná přeskočit jeden nebo více kroků v úloze Sparku.
Vyberte správnou velikost exekutoru.
Při rozhodování o konfiguraci exekutoru vezměte v úvahu režijní náklady na uvolňování paměti Javy.
Faktory pro zmenšení velikosti exekutoru:
- Zmenšete velikost haldy pod 32 GB, abyste zachovali režii uvolňování paměti < o 10 %.
- Snižte počet jader, abyste zachovali režii uvolňování < paměti o 10 %.
Faktory pro zvětšení velikosti exekutoru:
- Snižte režii komunikace mezi exekutory.
- Snižte počet otevřených připojení mezi exekutory (N2) na větších clusterech (>100 exekutorů).
- Zvětšete velikost haldy, aby se pojala pro úlohy náročné na paměť.
- Volitelné: Snižte režii paměti na exekutor.
- Volitelné: Zvýšení využití a souběžnosti nadměrným odběrem procesoru
Obecně platí při výběru velikosti exekutoru:
- Začněte s 30 GB na exekutor a distribuujte dostupná jádra počítače.
- Zvyšte počet jader exekutoru pro větší clustery (> 100 exekutorů).
- Upravte velikost na základě zkušebních spuštění a předchozích faktorů, jako je režie GC.
Při spouštění souběžných dotazů zvažte následující:
- Začněte s 30 GB na exekutor a všechna jádra počítače.
- Vytváření více paralelních aplikací Sparku nadměrným odběrem procesoru (přibližně o 30 % zvýšení latence).
- Distribuujte dotazy napříč paralelními aplikacemi.
- Upravte velikost na základě zkušebních spuštění a předchozích faktorů, jako je režie GC.
Sledujte výkon dotazů z hlediska odlehlých hodnot nebo jiných problémů s výkonem– podívejte se na zobrazení časové osy, graf SQL, statistiky úloh atd. Někdy je jeden nebo několik exekutorů pomalejší než ostatní a provádění úkolů trvá mnohem déle. K tomu často dochází ve větších clusterech (> 30 uzlů). V takovém případě rozdělte práci na větší počet úkolů, aby plánovač mohl kompenzovat pomalé úkoly.
Například mít alespoň dvakrát tolik úkolů, než je počet jader exekutoru v aplikaci. Můžete také povolit spekulativní provádění úloh pomocí conf: spark.speculation = true
.
Optimalizace spouštění úloh
- Podle potřeby do mezipaměti, například pokud data používáte dvakrát, pak je do mezipaměti.
- Proměnné vysílání do všech exekutorů Proměnné jsou serializovány pouze jednou, což má za následek rychlejší vyhledávání.
- Použijte fond vláken na ovladači, což má za následek rychlejší provoz pro mnoho úloh.
Klíčem k výkonu dotazů Sparku 2.x je modul Tungsten, který závisí na generování kódu celé fáze. V některých případech může být generování kódu v celé fázi zakázané.
Pokud například ve výrazu SortAggregate
agregace použijete neměnitelný typ (string
), zobrazí se místo HashAggregate
. Pokud chcete například zlepšit výkon, vyzkoušejte následující postup a potom znovu povolte generování kódu:
MAX(AMOUNT) -> MAX(cast(AMOUNT as DOUBLE))