Ottimizzare i processi di Apache Spark in Azure Synapse Analytics

Completato

Dopo aver controllato la scheda Monitoraggio nell'ambiente Azure Synapse Studio e aver deciso di migliorare le prestazioni dell'esecuzione di un pool di Apache Spark, è necessario considerare diversi aspetti, tra cui:

  • Scelta dell'astrazione dei dati
  • Usare il formato dati ottimale
  • Usare l'opzione della cache
  • Controllare l'efficienza della memoria
  • Usare il bucketing
  • Ottimizzare i join e le riproduzioni casuali
  • Ottimizzare l'esecuzione del processo

Per ottimizzare i processi di Apache Spark in Azure Synapse Analytics, è necessario tenere in considerazione la configurazione del cluster per il carico di lavoro in esecuzione in tale cluster. È possibile che si verifichino problemi, ad esempio l'utilizzo elevato della memoria (se non configurata correttamente scegliendo le dimensioni non appropriate per gli executor), operazioni a esecuzione prolungata e altre attività, che potrebbero dare origine a operazioni cartesiane.

Per velocizzare i processi, è necessario configurare in modo appropriato la memorizzazione nella cache per tale attività e verificare i join e le riproduzioni casuali in relazione all'asimmetria dei dati. È quindi indispensabile monitorare ed esaminare le esecuzioni del processo Apache Spark prolungate o con utilizzo elevato di risorse. Ecco alcune raccomandazioni per ottimizzare il processo di Apache Spark:

Scelta dell'astrazione dei dati

Alcune delle versioni precedenti di Apache Spark usano i set di dati distribuiti resilienti (RDD, Resilient Distributed DataSet) per astrarre i dati. In Apache Spark 1.3 e 1.6 è stato introdotto l'uso di dataframe e set di dati. I vantaggi relativi seguenti potrebbero essere d'aiuto per l'ottimizzazione dell'astrazione dei dati:

Dataframe

L'uso dei dataframe è un ottimo punto di partenza. I dataframe assicurano l'ottimizzazione query tramite Catalyst. Includono anche una generazione di codice whole-stage con accesso diretto alla memoria. Per ottenere un'esperienza di sviluppo il più intuitiva possibile, potrebbe essere preferibile usare i set di dati, perché non sono previsti controlli in fase di compilazione o programmazione di oggetti di dominio.

I set di dati sono utili per l'ottimizzazione di pipeline ETL complesse dove l'effetto sulle prestazioni è accettabile. I set di dati devono però essere usati con cautela nelle aggregazioni, perché potrebbero compromettere le prestazioni. Assicurano l'ottimizzazione query tramite Catalyst e un ambiente di sviluppo intuitivo grazie alla programmazione di oggetti e ai controlli in fase di compilazione.

Set di dati RDD

Non è necessario usare i set di dati RDD a meno che non si voglia o non sia necessario compilare un nuovo set di dati RDD personalizzato. Non è tuttavia prevista l'ottimizzazione query tramite Catalyst né la generazione di codice whole-stage e inoltre il sovraccarico di Garbage Collection (GC) sarebbe elevato. L'unico modo per usare i set di dati RDD è con le API legacy di Apache Spark 1.x.

Usare il formato dati ottimale

Apache Spark supporta diversi formati di dati. I formati che è possibile usare sono CSV, JSON, XML, parquet e così via. Sono previsti anche altri formati con origini dati esterne. Un suggerimento che può essere utile è quello di usare Parquet con la compressione snappy (che è anche l'impostazione predefinita in Apache Spark 2.x) perché archivia i dati in un formato a colonne, è compresso e altamente ottimizzato in Apache Spark ed è divisibile per la decompressione.

Usare l'opzione della cache

Per quanto riguarda la memorizzazione nella cache, è disponibile un meccanismo predefinito nativo di Apache Spark, che può essere usato con metodi diversi, ad esempio: .persist(), .cache() e CACHE TABLE. Potrebbe essere efficace quando si usano set di dati di piccole dimensioni.

Può essere utile anche nelle pipeline ETL in cui è necessario memorizzare nella cache i risultati intermedi. Tenere presente che, quando è necessario eseguire il partizionamento, il meccanismo nativo di memorizzazione nella cache di Apache Spark potrebbe presentare alcuni svantaggi perché una tabella memorizzata nella cache non conserverà i dati di partizionamento.

Controllare l'efficienza della memoria

È anche fondamentale comprendere come usare la memoria in modo efficiente. Apache Spark funziona inserendo i dati in memoria, quindi la gestione delle risorse di memoria è un aspetto dell'ottimizzazione delle esecuzioni dei processi di Apache Spark.

Un modo per gestire le risorse della memoria può essere il controllo delle partizioni di dati più piccole e il controllo di dimensioni, tipi e distribuzioni dei dati quando si formula una strategia di partizionamento. Per ottimizzare le esecuzioni, è anche possibile prendere in considerazione la serializzazione dei dati di Kryo invece della serializzazione Java predefinita. Si ricordi di continuare a monitorare e ottimizzare le impostazioni di configurazione di Apache Spark.

Usare il bucketing

Il bucketing corrisponde in pratica al partizionamento dei dati, con la differenza che un bucket include un set di valori di colonna invece di uno. Potrebbe funzionare bene quando si esegue il partizionamento di valori elevati (milioni o più), ad esempio gli identificatori di prodotto. Un bucket è determinato dall'hashing della chiave del bucket di una riga. Le tabelle bucket sono ottimizzate perché si tratta di un'operazione di metadati sulla modalità di bucket e ordinamento dei dati.

Ecco alcune funzionalità bucket avanzate:

  • Ottimizzazione query in base alla ripartizione in bucket di metainformazioni
  • Aggregazioni ottimizzate
  • Join ottimizzati

Il bucketing, tuttavia, non esclude il partizionamento, ma è possibile usare contemporaneamente il partizionamento e il bucketing.

Ottimizzare join e riproduzioni con sequenza casuale

Il rallentamento delle prestazioni dei processi di join o di riproduzione casuale può essere causato dall'asimmetria dei dati. L'asimmetrica dei dati è causata dai dati che vengono archiviati in modo asimmetrico nel sistema. Un processo, ad esempio, in genere richiede solo 20 secondi, ma se si esegue lo stesso processo eseguendo il join e la riproduzione casuale dei dati, potrebbero essere necessarie ore.

Per correggere tale asimmetria dei dati, è possibile effettuare il salting dell'intera chiave o usare un salt isolato soltanto per alcuni subset di chiavi. Un'altra opzione da prendere in esame potrebbe essere l'introduzione di una colonna bucket o di dati preaggregati nei bucket.

Il rallentamento delle prestazioni, tuttavia, non è causato esclusivamente dai join. La causa potrebbe anche essere il tipo di join.

Apache Spark usa il tipo di join SortMerge. Questo tipo di join è ideale per grandi set di dati, mentre in caso contrario risulta oneroso, perché è prima di tutto necessario ordinare i lati sinistro e destro dei dati prima di unirli. Un join Broadcast potrebbe essere quindi più adatto per i set di dati più piccoli o per quelli in cui un lato del join è molto più piccolo dell'altro.

È possibile modificare il tipo di join nella configurazione impostando spark.sql.autoBroadcastJoinThreshold. In alternativa, è possibile impostare un join hint usando le API del frame di dati (dataframe.join(broadcast(df2))), come illustrato nel codice seguente.

// Option 1
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 1*1024*1024*1024)

// Option 2
val df1 = spark.table("FactTableA")
val df2 = spark.table("dimMP")
df1.join(broadcast(df2), Seq("PK")).
    createOrReplaceTempView("V_JOIN")

sql("SELECT col1, col2 FROM V_JOIN")

Se si è deciso di usare le tabelle inserite in bucket, è disponibile un terzo tipo di join, il join Merge. Un set di dati correttamente pre-partizionato e preordinato ignorerà la fase costosa di ordinamento da un join SortMerge. Un altro aspetto da tenere presente è che l'ordine dei diversi tipi di join è importante, soprattutto nelle query complesse. È pertanto consigliabile iniziare con i join più selettivi. Inoltre, quando è possibile, provare a spostare dopo le aggregazioni i join che fanno aumentare il numero di righe.

Ottimizzare l'esecuzione del processo

Esaminando il dimensionamento degli executor per aumentare le prestazioni nel processo di Apache Spark, è possibile osservare l'overhead della Java Garbage Collection (GC) e usare i fattori seguenti per ridurre le dimensioni degli executor:

  • Ridurre le dimensioni heap al di sotto di 32 GB per mantenere un overhead di GC < 10%.
  • Ridurre il numero di core per mantenere un overhead di GC < 10%.

Oppure considerare i fattori seguenti per aumentare le dimensioni degli executor:

  • Ridurre l'overhead di comunicazione tra executor.
  • Ridurre il numero di connessioni aperte tra executor (N2) nei cluster di grandi dimensioni (>100 executor).
  • Aumentare le dimensioni dell'heap per consentire un uso intensivo della memoria.
  • ridurre l'overhead della memoria per ogni executor.
  • aumentare la concorrenza e l'uso mediante sovrascrittura della CPU.

Come regola generale, quando si selezionano le dimensioni degli executor:

  • Iniziare con 30 GB per executor e distribuire i core disponibili sul computer.
  • Aumentare il numero di core per executor per i cluster di grandi dimensioni (> 100 executor).
  • Modificare le dimensioni in base alle esecuzioni di prova e ai fattori precedenti, ad esempio l'overhead di Garbage Collection.

Durante l'esecuzione di query simultanee, considerare quanto segue:

  • Iniziare con 30 GB per ogni executor e per tutti i core del computer.
  • Creare più applicazioni Apache Spark sottoscrivendo una capacità superiore a quella della CPU (miglioramento della latenza del 30% circa).
  • Distribuire le query tra applicazioni parallele.
  • Modificare le dimensioni in base alle esecuzioni di prova e ai fattori precedenti, ad esempio l'overhead di Garbage Collection.

Come indicato in precedenza, è importante continuare a monitorare le prestazioni, soprattutto gli outlier, usando la visualizzazione della cronologia, il grafo SQL, le statistiche dei processi e così via. A volte uno degli executor può essere più lento degli altri, soprattutto nei cluster di grandi dimensioni (più di 30 nodi). Si può quindi valutare l'opportunità di dividere il lavoro in più attività in modo che l'utilità di pianificazione possa compensare le attività più lente.

Se è necessario ottimizzare l'esecuzione di un processo, tenere presente la memorizzazione nella cache (ad esempio, è possibile usare i dati due volte, ma memorizzandoli nella cache). Se si trasmettono variabili in tutti gli executor configurati, poiché le variabili vengono serializzate solo una volta, le ricerche saranno più veloci.

Oppure si potrebbe usare il pool di thread in esecuzione sul driver, che può velocizzare le operazioni per molte attività.