Optimering av databearbetning för Apache Spark

Den här artikeln beskriver hur du optimerar konfigurationen av ditt Apache Spark-kluster för bästa prestanda i Azure HDInsight.

Översikt

Om du har långsamma jobb på en Koppling eller Shuffle är orsaken förmodligen datasnedställning. Dataskevhet är asymmetri i dina jobbdata. Ett kartjobb kan till exempel ta 20 sekunder. Men att köra ett jobb där data är anslutna eller blandade tar flera timmar. Om du vill åtgärda datasnedställning bör du salta hela nyckeln eller använda ett isolerat salt för endast en delmängd nycklar. Om du använder ett isolerat salt bör du filtrera ytterligare för att isolera delmängden av saltade nycklar i kartkopplingar. Ett annat alternativ är att introducera en bucketkolumn och föraggregera i bucketar först.

En annan faktor som orsakar långsamma kopplingar kan vara kopplingstypen. Som standard använder Spark kopplingstypen SortMerge . Den här typen av koppling passar bäst för stora datamängder. Men är annars beräkningsmässigt dyrt eftersom det först måste sortera vänster och höger sida av data innan de sammanfogas.

En Broadcast koppling passar bäst för mindre datamängder, eller där ena sidan av kopplingen är mycket mindre än den andra sidan. Den här typen av koppling sänder en sida till alla utförare och kräver därför mer minne för sändningar i allmänhet.

Du kan ändra kopplingstypen i konfigurationen genom att ange spark.sql.autoBroadcastJoinThreshold, eller så kan du ange ett kopplingstips med hjälp av DataFrame-API:erna (dataframe.join(broadcast(df2))).

// Option 1
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 1*1024*1024*1024)

// Option 2
val df1 = spark.table("FactTableA")
val df2 = spark.table("dimMP")
df1.join(broadcast(df2), Seq("PK")).
    createOrReplaceTempView("V_JOIN")

sql("SELECT col1, col2 FROM V_JOIN")

Om du använder bucketade tabeller har du en tredje kopplingstyp, Merge kopplingen. En korrekt förpartitionerad och försorterad datauppsättning hoppar över den dyra sorteringsfasen från en SortMerge koppling.

Ordningen på kopplingar är viktig, särskilt i mer komplexa frågor. Börja med de mest selektiva kopplingarna. Flytta även kopplingar som ökar antalet rader efter aggregeringar när det är möjligt.

Om du vill hantera parallellitet för kartesiska kopplingar kan du lägga till kapslade strukturer, fönster och kanske hoppa över ett eller flera steg i ditt Spark-jobb.

Optimera jobbkörningen

  • Cachelagrade efter behov, till exempel om du använder data två gånger, och cachelagrade dem sedan.
  • Sända variabler till alla utförare. Variablerna serialiseras bara en gång, vilket resulterar i snabbare sökningar.
  • Använd trådpoolen på drivrutinen, vilket resulterar i snabbare åtgärder för många uppgifter.

Övervaka dina jobb som körs regelbundet för prestandaproblem. Om du behöver mer insikt i vissa problem kan du överväga något av följande verktyg för prestandaprofilering:

Nyckeln till Spark 2.x-frågeprestanda är tungstenmotorn, som är beroende av kodgenerering i hela stadiet. I vissa fall kan kodgenerering i hela steget inaktiveras. Om du till exempel använder en icke-föränderlig typ (string) i aggregeringsuttrycket SortAggregate visas i stället för HashAggregate. Om du till exempel vill ha bättre prestanda kan du prova följande och sedan återaktivera kodgenereringen:

MAX(AMOUNT) -> MAX(cast(AMOUNT as DOUBLE))

Nästa steg