Optimalizace úloh Apache Sparku ve službě Azure Synapse Analytics

Zjistěte, jak optimalizovat konfiguraci clusteru Apache Spark pro konkrétní úlohu. Nejběžnější výzvou je zatížení paměti kvůli nesprávným konfiguracím (zejména kvůli nesprávně nastavené velikosti exekutorů), dlouhotrvajícím operacím a úlohám, které vedou ke kartézským operacím. Úlohy můžete urychlit pomocí vhodného ukládání do mezipaměti a povolením nerovnoměrné distribuce dat. Pro zajištění nejlepšího výkonu monitorujte a zkontrolujte dlouhotrvající spouštění úloh Sparku s využitím prostředků.

Následující části popisují běžné optimalizace úloh Sparku a jejich doporučení.

Volba abstrakce dat

Starší verze Sparku používají sady RDD k abstrakci dat, Spark 1.3 a 1.6 zavedl datové rámce a datové sady v uvedeném pořadí. Zvažte následující relativní výhody:

  • Datové rámce
    • Nejlepší volba ve většině situací.
    • Poskytuje optimalizaci dotazů prostřednictvím Catalyst.
    • Generování kódu v celé fázi.
    • Přímý přístup do paměti.
    • Nízká režijní náklady na uvolňování paměti
    • Nejsou tak vhodné pro vývojáře jako datové sady, protože neexistují žádné kontroly kompilace ani programování objektů domény.
  • Soubory
    • Vhodné ve složitých kanálech ETL, kde je dopad na výkon přijatelný.
    • Není dobré v agregacích, kde může být dopad na výkon značný.
    • Poskytuje optimalizaci dotazů prostřednictvím Catalyst.
    • Vhodné pro vývojáře tím, že poskytuje programování objektů domény a kontroly v době kompilace.
    • Přidá režii serializace/deserializace.
    • Vysoká režie uvolňování paměti.
    • Přeruší generování kódu celé fáze.
  • Sady RDD
    • Sady RDD nemusíte používat, pokud nepotřebujete vytvořit novou vlastní sadu RDD.
    • Žádná optimalizace dotazů prostřednictvím Catalyst.
    • Žádné generování kódu celé fáze.
    • Vysoká režie uvolňování paměti.
    • Musí používat starší verze rozhraní API Sparku 1.x.

Použití optimálního formátu dat

Spark podporuje mnoho formátů, jako jsou csv, json, xml, parquet, orc a avro. Spark je možné rozšířit tak, aby podporoval mnoho dalších formátů s externími zdroji dat – další informace najdete v tématu Balíčky Apache Spark.

Nejlepší formát pro zajištění výkonu je parquet s kompresí snappy, což je výchozí hodnota ve Sparku 2.x. Parquet ukládá data ve sloupcových formátech a je vysoce optimalizovaný ve Sparku. Kromě toho může komprese snappy způsobit větší soubory, než je například komprese gzip. Vzhledem k dělitelné povaze těchto souborů se dekomprimují rychleji.

Použití mezipaměti

Spark poskytuje vlastní nativní mechanismy ukládání do mezipaměti, které je možné použít různými metodami, jako .persist()jsou , .cache()a CACHE TABLE. Toto nativní ukládání do mezipaměti je efektivní u malých datových sad a také v kanálech ETL, kde potřebujete ukládat do mezipaměti mezilehlé výsledky. Nativní ukládání do mezipaměti Sparku ale v současné době nefunguje dobře s dělením, protože tabulka uložená v mezipaměti neuchovává data dělení.

Efektivní využití paměti

Spark funguje tak, že umísťuje data do paměti, takže správa paměťových prostředků je klíčovým aspektem optimalizace provádění úloh Sparku. Existuje několik technik, které můžete použít k efektivnímu využití paměti clusteru.

  • Ve strategii dělení preferujte menší datové oddíly a účty pro velikost, typy a distribuci dat.
  • Zvažte novější, efektivnější serializaci dat Kryo, spíše než výchozí serializace Java.
  • Monitorování a ladění nastavení konfigurace Sparku

Pro referenci jsou na následujícím obrázku znázorněny struktura paměti Sparku a některé parametry paměti exekutoru klíčů.

Důležité informace o paměti Sparku

Apache Spark v Azure Synapse používá YARN Apache Hadoop YARN, YARN řídí maximální součet paměti využité všemi kontejnery na každém uzlu Sparku. Následující diagram znázorňuje klíčové objekty a jejich vztahy.

Správa paměti Sparku YARN

Pokud chcete vyřešit zprávy typu Nedostatek paměti, zkuste:

  • Projděte si náhodné prohazování správy DAG. Redukce snížením počtu zdrojových dat na straně mapy, před rozdělením zdrojových dat do oddílů (nebo rozdělením do oddílů), maximalizací jednotlivých náhodného prohazování a snížením množství odesílaných dat.
  • Přednost s ReduceByKey pevným limitem paměti pro GroupByKey, který poskytuje agregace, práci s okny a další funkce, ale má limit paměti bez vazby.
  • Upřednostněte TreeReduce, která provádí více práce na exekutorech nebo oddílech, před Reduce, která všechno funguje na ovladači.
  • Místo objektů RDD nižší úrovně využijte datové rámce.
  • Vytvořte objekty ComplexType, které zapouzdřují akce, například "Prvních N", různé agregace nebo operace s okny.

Optimalizace serializace dat

Úlohy Sparku se distribuují, takže pro nejlepší výkon je důležitá odpovídající serializace dat. Pro Spark existují dvě možnosti serializace:

  • Výchozí je serializace Java.
  • Serializace Kryo je novější formát a může vést k rychlejší a kompaktnější serializaci než Java. Kryo vyžaduje, abyste zaregistrovali třídy v programu, a to ještě nepodporuje všechny serializovatelné typy.

Použití rozdělování do kbelíků

Vytváření kontejnerů je podobné dělení dat, ale každý kbelík může obsahovat sadu hodnot sloupců, a ne jenom jednu. Dělení do oddílů funguje dobře u velkých (v milionech a více) počtech hodnot, jako jsou identifikátory produktů. Kbelík se určí hashováním klíče kbelíku řádku. Tabulky v intervalech nabízejí jedinečné optimalizace, protože ukládají metadata o tom, jak byly roztříděny a seřazeny.

Mezi pokročilé funkce dělení do kontejnerů patří:

  • Optimalizace dotazů na základě rozdělení metainformací.
  • Optimalizované agregace.
  • Optimalizovaná spojení.

Můžete použít dělení na oddíly a dělení na oddíly současně.

Optimalizace spojení a náhodného prohazování metodou shuffle

Pokud máte pomalé úlohy ve spojení nebo náhodném prohazování, příčinou je pravděpodobně nerovnoměrná distribuce dat, což je asymetrie v datech úlohy. Například úloha mapování může trvat 20 sekund, ale spuštění úlohy, ve které jsou data spojená nebo prohazovaná, trvá hodiny. Pokud chcete vyřešit nerovnoměrnou distribuci dat, měli byste celý klíč zasolit nebo použít izolovanou sůl jenom pro některé podmnožinu klíčů. Pokud používáte izolovanou sůl, měli byste dále filtrovat a izolovat podmnožinu slaných klíčů ve spojeních s mapami. Další možností je nejprve zavést sloupec kbelíku a předběžnou agregaci v kontejnerech.

Dalším faktorem, který způsobuje pomalé spojení, může být typ spojení. Spark ve výchozím nastavení používá SortMerge typ spojení. Tento typ spojení je nejvhodnější pro velké datové sady, ale jinak je výpočetně náročný, protože před sloučením musí nejprve seřadit levou a pravou stranu dat.

Spojení Broadcast je nejvhodnější pro menší datové sady nebo tam, kde je jedna strana spojení mnohem menší než druhá strana. Tento typ spojení vysílá jednu stranu všem exekutorům, a proto obecně vyžaduje více paměti pro všesměrová vysílání.

Typ spojení v konfiguraci můžete změnit nastavením spark.sql.autoBroadcastJoinThresholdnebo můžete nastavit nápovědu ke spojení pomocí rozhraní API datového rámce (dataframe.join(broadcast(df2))).

// Option 1
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 1*1024*1024*1024)

// Option 2
val df1 = spark.table("FactTableA")
val df2 = spark.table("dimMP")
df1.join(broadcast(df2), Seq("PK")).
    createOrReplaceTempView("V_JOIN")

sql("SELECT col1, col2 FROM V_JOIN")

Pokud používáte tabulky s rozdělením do intervalů, máte třetí typ spojení– Merge spojení. Správně předem rozdělená a předem seřazená datová sada přeskočí náročnou SortMerge fázi řazení ze spojení.

Pořadí spojení záleží, zejména u složitějších dotazů. Začněte s nejselektivnějšími spojeními. Pokud je to možné, přesouvejte spojení, která zvyšují počet řádků po agregaci.

Pokud chcete spravovat paralelismus pro kartézská spojení, můžete přidat vnořené struktury, práci s okny a možná přeskočit jeden nebo více kroků v úloze Sparku.

Vyberte správnou velikost exekutoru.

Při rozhodování o konfiguraci exekutoru vezměte v úvahu režijní náklady na uvolňování paměti Javy.

  • Faktory pro zmenšení velikosti exekutoru:

    • Zmenšete velikost haldy pod 32 GB, abyste zachovali režii uvolňování paměti < o 10 %.
    • Snižte počet jader, abyste zachovali režii uvolňování < paměti o 10 %.
  • Faktory pro zvětšení velikosti exekutoru:

    • Snižte režii komunikace mezi exekutory.
    • Snižte počet otevřených připojení mezi exekutory (N2) na větších clusterech (>100 exekutorů).
    • Zvětšete velikost haldy, aby se pojala pro úlohy náročné na paměť.
    • Volitelné: Snižte režii paměti na exekutor.
    • Volitelné: Zvýšení využití a souběžnosti nadměrným odběrem procesoru

Obecně platí při výběru velikosti exekutoru:

  • Začněte s 30 GB na exekutor a distribuujte dostupná jádra počítače.
  • Zvyšte počet jader exekutoru pro větší clustery (> 100 exekutorů).
  • Upravte velikost na základě zkušebních spuštění a předchozích faktorů, jako je režie GC.

Při spouštění souběžných dotazů zvažte následující:

  • Začněte s 30 GB na exekutor a všechna jádra počítače.
  • Vytváření více paralelních aplikací Sparku nadměrným odběrem procesoru (přibližně o 30 % zvýšení latence).
  • Distribuujte dotazy napříč paralelními aplikacemi.
  • Upravte velikost na základě zkušebních spuštění a předchozích faktorů, jako je režie GC.

Sledujte výkon dotazů z hlediska odlehlých hodnot nebo jiných problémů s výkonem– podívejte se na zobrazení časové osy, graf SQL, statistiky úloh atd. Někdy je jeden nebo několik exekutorů pomalejší než ostatní a provádění úkolů trvá mnohem déle. K tomu často dochází ve větších clusterech (> 30 uzlů). V takovém případě rozdělte práci na větší počet úkolů, aby plánovač mohl kompenzovat pomalé úkoly.

Například mít alespoň dvakrát tolik úkolů, než je počet jader exekutoru v aplikaci. Můžete také povolit spekulativní provádění úloh pomocí conf: spark.speculation = true.

Optimalizace spouštění úloh

  • Podle potřeby do mezipaměti, například pokud data používáte dvakrát, pak je do mezipaměti.
  • Proměnné vysílání do všech exekutorů Proměnné jsou serializovány pouze jednou, což má za následek rychlejší vyhledávání.
  • Použijte fond vláken na ovladači, což má za následek rychlejší provoz pro mnoho úloh.

Klíčem k výkonu dotazů Sparku 2.x je modul Tungsten, který závisí na generování kódu celé fáze. V některých případech může být generování kódu v celé fázi zakázané.

Pokud například ve výrazu SortAggregate agregace použijete neměnitelný typ (string), zobrazí se místo HashAggregate. Pokud chcete například zlepšit výkon, vyzkoušejte následující postup a potom znovu povolte generování kódu:

MAX(AMOUNT) -> MAX(cast(AMOUNT as DOUBLE))

Další kroky