Linee guida per l'ottimizzazione delle prestazioni di Storm in HDInsight e di Azure Data Lake StorePerformance tuning guidance for Storm on HDInsight and Azure Data Lake Store

È opportuno conoscere i fattori da tenere in considerazione quando si ottimizzano le prestazioni di una topologia di Storm in Azure.Understand the factors that should be considered when you tune the performance of an Azure Storm topology. È importante, ad esempio, comprendere le caratteristiche del lavoro svolto da spout e bolt (se il lavoro è I/O o usa intensivamente la memoria).For example, it's important to understand the characteristics of the work done by the spouts and the bolts (whether the work is I/O or memory intensive). Questo articolo illustra una serie di linee guida per l'ottimizzazione delle prestazioni, inclusa la risoluzione dei problemi comuni.This article covers a range of performance tuning guidelines, including troubleshooting common issues.

PrerequisitiPrerequisites

Ottimizzare il parallelismo della topologiaTune the parallelism of the topology

È possibile migliorare le prestazioni aumentando la concorrenza di I/O o da e verso Data Lake Store.You might be able to improve performance by increasing the concurrency of the I/O to and from Data Lake Store. Una topologia Storm consiste in un set di configurazioni che determinano il parallelismo:A Storm topology has a set of configurations that determine the parallelism:

  • Numero di processi di lavoro (i ruoli di lavoro sono distribuiti uniformemente tra le macchine virtuali).Number of worker processes (the workers are evenly distributed across the VMs).
  • Numero di istanze di executor di spout.Number of spout executor instances.
  • Numero di istanze di executor di bolt.Number of bolt executor instances.
  • Numero di attività spout.Number of spout tasks.
  • Numero di attività bolt.Number of bolt tasks.

Ad esempio, in un cluster con 4 macchine virtuali e 4 processi di lavoro, 32 executor di spout e 32 attività spout e 256 executor di bolt e 512 attività bolt, tenere presente quanto segue:For example, on a cluster with 4 VMs and 4 worker processes, 32 spout executors and 32 spout tasks, and 256 bolt executors and 512 bolt tasks, consider the following:

Ogni supervisore, che è un ruolo di lavoro, ha un singolo processo di lavoro JVM (Java Virtual Machine).Each supervisor, which is a worker node, has a single worker Java virtual machine (JVM) process. Questo processo JVM gestisce 4 thread di spout e 64 thread di bolt.This JVM process manages 4 spout threads and 64 bolt threads. In ogni thread le attività vengono eseguite in sequenza.Within each thread, tasks are run sequentially. Con la configurazione precedente, ogni thread di spout ha 1 attività e ogni thread di bolt ha 2 attività.With the preceding configuration, each spout thread has 1 task, and each bolt thread has 2 tasks.

Ecco i vari componenti coinvolti in Storm e il relativo effetto sul livello di parallelismo:In Storm, here are the various components involved, and how they affect the level of parallelism you have:

  • Il nodo head (denominato Nimbus in Storm) viene usato per inviare e gestire i processi.The head node (called Nimbus in Storm) is used to submit and manage jobs. Questi nodi non hanno alcun impatto sul livello di parallelismo.These nodes have no impact on the degree of parallelism.
  • I nodi supervisore.The supervisor nodes. In HDInsight corrispondono a una macchina virtuale di Azure con nodo di lavoro.In HDInsight, this corresponds to a worker node Azure VM.
  • Le attività di lavoro sono processi di Storm in esecuzione sulle macchine virtuali.The worker tasks are Storm processes running in the VMs. Ogni attività di lavoro corrisponde a un'istanza JVM.Each worker task corresponds to a JVM instance. Storm distribuisce il numero di processi di lavoro che vengono specificati ai nodi di lavoro nel modo più uniforme possibile.Storm distributes the number of worker processes you specify to the worker nodes as evenly as possible.
  • Istanze di executor di spout e bolt.Spout and bolt executor instances. Ogni istanza dell'executor corrisponde a un thread in esecuzione nei ruoli di lavoro (JVM).Each executor instance corresponds to a thread running within the workers (JVMs).
  • Attività di Storm.Storm tasks. Si tratta di attività logiche eseguite da ognuno di questi thread.These are logical tasks that each of these threads run. Ciò non modifica il livello di parallelismo, pertanto è necessario valutare se è opportuno disporre di più attività per ogni executor.This does not change the level of parallelism, so you should evaluate if you need multiple tasks per executor or not.

Ottenere prestazioni ottimali da Data Lake StoreGet the best performance from Data Lake Store

Quando si lavora con Data Lake Store, è possibile ottenere prestazioni ottimali procedendo come segue:When working with Data Lake Store, you get the best performance if you do the following:

  • Unire le piccole aggiunte in dimensioni maggiori (idealmente 4 MB).Coalesce your small appends into larger sizes (ideally 4 MB).
  • Eseguire il maggior numero possibile di richieste simultanee.Do as many concurrent requests as you can. Poiché ogni thread di bolt esegue il blocco delle letture, è opportuno avere un punto compreso nell'intervallo tra 8 e 12 thread per core.Because each bolt thread is doing blocking reads, you want to have somewhere in the range of 8-12 threads per core. Questo favorisce un buon utilizzo della scheda di interfaccia di rete e della CPU.This keeps the NIC and the CPU well utilized. Una macchina virtuale più grande consente di eseguire un maggior numero di richieste simultanee.A larger VM enables more concurrent requests.

Topologia di esempioExample topology

Si supponga di avere un cluster con 8 nodi di lavoro e una macchina virtuale di Azure D13v2.Let’s assume you have an 8 worker node cluster with a D13v2 Azure VM. Questa macchina virtuale ha 8 core, quindi negli 8 nodi di lavoro il totale è di 64 core.This VM has 8 cores, so among the 8 worker nodes, you have 64 total cores.

Si supponga di eseguire 8 thread bolt per core.Let’s say we do 8 bolt threads per core. Essendo i core 64, si vuole un totale di 512 istanze dell'executor (ovvero di thread) di bolt.Given 64 cores, that means we want 512 total bolt executor instances (that is, threads). In questo caso, si inizia con una JVM per ogni macchina virtuale e si usa principalmente la concorrenza tra thread nella JVM per ottenere una concorrenza.In this case, let’s say we start with one JVM per VM, and mainly use the thread concurrency within the JVM to achieve concurrency. Ciò significa che sono necessari 8 attività di lavoro (uno per ogni macchina virtuale di Azure) e 512 executor bolt.That means we need 8 worker tasks (one per Azure VM), and 512 bolt executors. Data questa configurazione, Storm prova a distribuire i ruoli di lavoro in modo uniforme tra i nodi di lavoro (chiamati anche nodi supervisore), assegnando a ogni nodo di lavoro 1 JVM.Given this configuration, Storm tries to distribute the workers evenly across worker nodes (also known as supervisor nodes), giving each worker node 1 JVM. A questo punto nei supervisor Storm prova a distribuire gli executor in modo uniforme tra i supervisori, assegnando a ogni supervisore (ovvero a ogni JVM) 8 thread.Now within the supervisors, Storm tries to distribute the executors evenly between supervisors, giving each supervisor (that is, JVM) 8 threads each.

Ottimizzare i parametri aggiuntiviTune additional parameters

Dopo aver creato la topologia di base, è possibile valutare se modificare uno dei parametri:After you have the basic topology, you can consider whether you want to tweak any of the parameters:

  • Number of JVMs per worker node (Numero di JVM per nodo di lavoro).Number of JVMs per worker node. Se si ha una struttura dei dati di grandi dimensioni (ad esempio, tabella di ricerca) che si vuole ospitare in memoria, ogni JVM richiede una copia separata.If you have a large data structure (for example, a lookup table) that you host in memory, each JVM requires a separate copy. In alternativa, è possibile usare la struttura dei dati in diversi thread se si hanno meno JVM.Alternatively, you can use the data structure across many threads if you have fewer JVMs. Per l'I/O del bolt, il numero di JVM non definisce una differenza significativa come il numero di thread aggiunti su tali JVM.For the bolt’s I/O, the number of JVMs does not make as much of a difference as the number of threads added across those JVMs. Per semplicità, è consigliabile avere una sola JVM per ruolo di lavoro.For simplicity, it's a good idea to have one JVM per worker. A seconda delle operazioni eseguite dal bolt o dall'elaborazione dell'applicazione richiesta, potrebbe tuttavia essere necessario modificare questo numero.Depending on what your bolt is doing or what application processing you require, though, you may need to change this number.
  • Number of spout executors (Numero di executor spout).Number of spout executors. Poiché l'esempio precedente usa bolt per la scrittura in Data Lake Store, il numero di spout non è strettamente correlato alle prestazioni del bolt.Because the preceding example uses bolts for writing to Data Lake Store, the number of spouts is not directly relevant to the bolt performance. Tuttavia, a seconda della quantità di elaborazione o I/O in corso nello spout, è consigliabile ottimizzare gli spout per ottenere le massime prestazioni.However, depending on the amount of processing or I/O happening in the spout, it's a good idea to tune the spouts for best performance. Assicurarsi di avere sufficienti spout per poter mantenere occupati i bolt.Ensure that you have enough spouts to be able to keep the bolts busy. Le velocità di output degli spout devono corrispondere alla velocità effettiva dei bolt.The output rates of the spouts should match the throughput of the bolts. La configurazione effettiva varia a seconda dello spout.The actual configuration depends on the spout.
  • Number of tasks (Numero di attività).Number of tasks. Ogni bolt viene eseguito come thread singolo.Each bolt runs as a single thread. Attività aggiuntive sui bolt non forniscono concorrenza aggiuntiva.Additional tasks per bolt don't provide any additional concurrency. Esse risultano utili solo se il processo di riconoscimento della tupla occupa una porzione importante del tempo di esecuzione del bolt.The only time they are of benefit is if your process of acknowledging the tuple takes a large proportion of your bolt execution time. È consigliabile raggruppare più tuple in un'aggiunta di maggiori dimensioni prima di inviare un acknowledgement dal bolt.It's a good idea to group many tuples into a larger append before you send an acknowledgement from the bolt. Nella maggior parte dei casi quindi più attività non offrono vantaggi aggiuntivi.So, in most cases, multiple tasks provide no additional benefit.
  • Local or shuffle grouping (Raggruppamento locale o casuale).Local or shuffle grouping. Quando questa impostazione è abilitata, le tuple vengono inviate ai bolt nello stesso processo di lavoro.When this setting is enabled, tuples are sent to bolts within the same worker process. In questo modo si riduce il numero di comunicazioni tra processi e chiamate di rete.This reduces inter-process communication and network calls. Tale operazione è consigliata per la maggior parte delle topologie.This is recommended for most topologies.

Questo scenario di base è un buon punto di partenza.This basic scenario is a good starting point. Usare i propri dati per un test per perfezionare i parametri precedenti e raggiungere così prestazioni ottimali.Test with your own data to tweak the preceding parameters to achieve optimal performance.

Ottimizzare lo spoutTune the spout

È possibile modificare le impostazioni seguenti per ottimizzare lo spout.You can modify the following settings to tune the spout.

  • Tuple timeout: topology.message.timeout.secs (Timeout tuple: topology.message.timeout.secs).Tuple timeout: topology.message.timeout.secs. Questa impostazione determina il tempo impiegato da un messaggio per completare e ricevere l'acknowledgement prima che questo abbia esito negativo.This setting determines the amount of time a message takes to complete, and receive acknowledgement, before it is considered failed.

  • Max memory per worker process: worker.childopts (Memoria massima per processo di lavoro: worker.childopts).Max memory per worker process: worker.childopts. Questa impostazione consente di specificare i parametri della riga di comando aggiuntivi per ruoli di lavoro Java.This setting lets you specify additional command-line parameters to the Java workers. L'impostazione più comunemente usata in questo caso è XmX, che determina la memoria massima allocata nell'heap di JVM.The most commonly used setting here is XmX, which determines the maximum memory allocated to a JVM’s heap.

  • Max spout pending: topology.max.spout.pending (Spout massimo in sospeso: topology.max.spout.pending).Max spout pending: topology.max.spout.pending. Questa configurazione determina il numero di tuple che possono essere in esecuzione (non ancora riconosciute in tutti i nodi della topologia) per thread di spout in qualsiasi momento.This setting determines the number of tuples that can in be flight (not yet acknowledged at all nodes in the topology) per spout thread at any time.

    Un buon calcolo da eseguire è la stima delle dimensioni di ogni tupla.A good calculation to do is to estimate the size of each of your tuples. Quindi capire di quanta memoria dispone un thread spout.Then figure out how much memory one spout thread has. La memoria totale allocata in un thread, divisa per questo valore, fornisce il limite superiore per il parametro di spout massimo in sospeso.The total memory allocated to a thread, divided by this value, should give you the upper bound for the max spout pending parameter.

Ottimizzare il boltTune the bolt

Durante la scrittura in Data Lake Store, impostare un criterio di sincronizzazione delle dimensioni (buffer sul lato client) pari a 4 MB.When you're writing to Data Lake Store, set a size sync policy (buffer on the client side) to 4 MB. L'operazione di scaricamento o hsync() viene quindi eseguita solo quando la dimensione del buffer è uguale a questo valore.A flushing or hsync() is then performed only when the buffer size is the at this value. Il driver Data Lake Store nel ruolo di lavoro della macchina virtuale esegue automaticamente il buffering, a meno che non si esegua in modo esplicito hsync().The Data Lake Store driver on the worker VM automatically does this buffering, unless you explicitly perform an hsync().

Il bolt Data Lake Store di Storm predefinito ha un parametro dei criteri di sincronizzazione delle dimensioni (fileBufferSize) che può essere usato per ottimizzare questo parametro.The default Data Lake Store Storm bolt has a size sync policy parameter (fileBufferSize) that can be used to tune this parameter.

Nelle topologie intensive I/O, è consigliabile che ogni thread bolt scriva in un proprio file e impostare un criterio di rotazione del file (fileRotationSize).In I/O-intensive topologies, it's a good idea to have each bolt thread write to its own file, and to set a file rotation policy (fileRotationSize). Quando il file raggiunge una determinata dimensione, il flusso viene allineato automaticamente e viene scritto su un nuovo file.When the file reaches a certain size, the stream is automatically flushed and a new file is written to. La dimensione del file consigliata per la rotazione è 1 GB.The recommended file size for rotation is 1 GB.

Gestire i dati delle tupleHandle tuple data

In Storm lo spout trattiene la tupla fino a quando non viene esplicitamente riconosciuta dal bolt.In Storm, a spout holds on to a tuple until it is explicitly acknowledged by the bolt. Se una tupla è stata letta dal bolt, ma non è ancora stata riconosciuta, lo spout potrebbe non essere permanente nel back-end di Data Lake Store.If a tuple has been read by the bolt but has not been acknowledged yet, the spout might not have persisted into Data Lake Store back end. Dopo l'acknowledgment di una tupla, la persistenza dello spout può essere garantita dal bolt e lo spout può quindi eliminare i dati di origine da qualsiasi origine vengano letti.After a tuple is acknowledged, the spout can be guaranteed persistence by the bolt, and can then delete the source data from whatever source it is reading from.

Per prestazioni ottimali in Data Lake Store, 4 MB del buffer del bolt devono essere disponibili per i dati delle tuple.For best performance on Data Lake Store, have the bolt buffer 4 MB of tuple data. Scrivere quindi nel back-end di Data Lake Store con un'unica operazione di scrittura da 4 MB.Then write to the Data Lake Store back end as one 4-MB write. Dopo aver scritto correttamente i dati nell'archivio, chiamando hflush(), il bolt può riconoscere i dati nello spout.After the data has been successfully written to the store (by calling hflush()), the bolt can acknowledge the data back to the spout. Il bolt di esempio qui fornito si comporta in questo modo.This is what the example bolt supplied here does. È accettabile anche trattenere un numero maggiore di tuple prima che venga chiamato hflush() e le tuple vengano riconosciute.It is also acceptable to hold a larger number of tuples before the hflush() call is made and the tuples acknowledged. In questo modo, tuttavia, aumenta il numero di tuple in esecuzione che lo spout deve contenere e quindi aumenta la quantità di memoria necessaria per JVM.However, this increases the number of tuples in flight that the spout needs to hold, and therefore increases the amount of memory required per JVM.

Nota

Per altre cause non strettamente correlate alle prestazioni, è possibile che le applicazioni presentino un requisito di frequenza più elevata per l'acknowledgment delle tuple (in caso di dimensioni dei dati inferiori a 4 MB).Applications might have a requirement to acknowledge tuples more frequently (at data sizes less than 4 MB) for other non-performance reasons. Questo potrebbe tuttavia influire sulla velocità effettiva di I/O nel back-end di archiviazione.However, that might affect the I/O throughput to the storage back end. Considerare con attenzione questo compromesso in relazione alle prestazioni di I/O del bolt.Carefully weigh this tradeoff against the bolt’s I/O performance.

Se la velocità in ingresso delle tuple non è elevata e quindi il riempimento del buffer da 4 MB richiede molto tempo, considerare la possibilità di risolvere il problema in questo modo:If the incoming rate of tuples is not high, so the 4-MB buffer takes a long time to fill, consider mitigating this by:

  • Riducendo il numero di bolt, per poter avere un numero minore di buffer da riempire.Reducing the number of bolts, so there are fewer buffers to fill.
  • Con un criterio basato sul tempo o sul conteggio, in cui hflush() viene attivato ogni x scaricamenti oppure ogni y millisecondi e le tuple accumulate fino a quel momento vengono riconosciute.Having a time-based or count-based policy, where an hflush() is triggered every x flushes or every y milliseconds, and the tuples accumulated so far are acknowledged back.

Si noti che in questo caso la velocità effettiva è inferiore, ma con una bassa frequenza di eventi, comunque, la velocità effettiva massima non è l'obiettivo principale.Note that the throughput in this case is lower, but with a slow rate of events, maximum throughput is not the biggest objective anyway. Queste mitigazioni consentono di ridurre il tempo totale impiegato da una tupla per il flusso fino alla risorsa di archiviazione.These mitigations help you reduce the total time that it takes for a tuple to flow through to the store. Questo potrebbe essere importante se è necessaria una pipeline in tempo reale anche con una frequenza di eventi bassa.This might matter if you want a real-time pipeline even with a low event rate. Si noti anche che, se la frequenza di tuple in ingresso è bassa, è consigliabile modificare il parametro topology.message.timeout_secs, in modo che non si verifichi il timeout delle tuple durante l'elaborazione o il buffering.Also note that if your incoming tuple rate is low, you should adjust the topology.message.timeout_secs parameter, so the tuples don’t time out while they are getting buffered or processed.

Monitorare la topologia in StormMonitor your topology in Storm

Quando la topologia è in esecuzione, è possibile monitorarla nell'interfaccia utente di Storm.While your topology is running, you can monitor it in the Storm user interface. Ecco i parametri principali da esaminare:Here are the main parameters to look at:

  • Total process execution latency (Latenza totale di esecuzione del processo).Total process execution latency. Tempo medio che la tupla impiega per essere emessa dallo spout, elaborata dal bolt e riconosciuta.This is the average time one tuple takes to be emitted by the spout, processed by the bolt, and acknowledged.

  • Total bolt process latency (Latenza totale del processo dei bolt).Total bolt process latency. Tempo medio impiegato dalla tupla per ricevere un riconoscimento nel bolt.This is the average time spent by the tuple at the bolt until it receives an acknowledgement.

  • Total bolt execute latency (Latenza totale di esecuzione dei bolt).Total bolt execute latency. Tempo medio speso dal bolt nel metodo Execute.This is the average time spent by the bolt in the execute method.

  • Number of failures (Numero di errori).Number of failures. Numero di tuple la cui elaborazione non è stata completata prima del timeout.This refers to the number of tuples that failed to be fully processed before they timed out.

  • Capacity (Capacità).Capacity. Misura del carico di lavoro del sistema.This is a measure of how busy your system is. Se questo numero è 1, i bolt stanno lavorando al massimo della velocità.If this number is 1, your bolts are working as fast as they can. Se è minore di 1, aumentare il parallelismo.If it is less than 1, increase the parallelism. Se è maggiore di 1, ridurre il parallelismo.If it is greater than 1, reduce the parallelism.

Risolvere i problemi comuniTroubleshoot common problems

Ecco alcuni scenari comuni per la risoluzione dei problemi.Here are a few common troubleshooting scenarios.

  • È in corso il timeout di diverse tuple. Esaminare ogni nodo della topologia per trovare il collo di bottiglia.Many tuples are timing out. Look at each node in the topology to determine where the bottleneck is. La causa più comune è che i bolt non riescono a rimanere aggiornati con gli spout,The most common reason for this is that the bolts are not able to keep up with the spouts. di conseguenza le tuple bloccano i buffer interni mentre sono in attesa di essere elaborate.This leads to tuples clogging the internal buffers while waiting to be processed. Considerare l'aumento del valore di timeout o la riduzione dello spout massimo in sospeso.Consider increasing the timeout value or decreasing the max spout pending.

  • La latenza di esecuzione dei processi totale è elevata, ma la latenza dei processi bolt è bassa.There is a high total process execution latency, but a low bolt process latency. In questo caso è possibile che le tuple non siano riconosciute in modo sufficientemente veloce.In this case, it is possible that the tuples are not being acknowledged fast enough. Verificare che sia presente un numero sufficiente di elementi di acknowledgment.Check that there are a sufficient number of acknowledgers. Un'altra possibilità è che rimangano in attesa nella coda per troppo tempo prima che i bolt inizino la loro elaborazione.Another possibility is that they are waiting in the queue for too long before the bolts start processing them. Ridurre lo spout massimo in sospeso.Decrease the max spout pending.

  • La latenza di esecuzione dei bolt è elevata.There is a high bolt execute latency. Indica che il metodo execute() del bolt impiega troppo tempo.This means that the execute() method of your bolt is taking too long. Ottimizzare il codice o esaminare le dimensioni della scrittura e il comportamento di scaricamento.Optimize the code, or look at write sizes and flush behavior.

Limitazione di Data Lake StoreData Lake Store throttling

Se si raggiungono i limiti di larghezza di banda di Data Lake Store, è possibile che le attività abbiano esito negativo.If you hit the limits of bandwidth provided by Data Lake Store, you might see task failures. Cercare errori di limitazione nei log delle attività.Check task logs for throttling errors. È possibile ridurre il parallelismo aumentando la dimensione del contenitore.You can decrease the parallelism by increasing container size.

Per verificare la presenza di limitazioni, abilitare la registrazione di debug sul lato client:To check if you are getting throttled, enable the debug logging on the client side:

  1. In Ambari > Storm > Config (Configurazione) > Advanced storm-worker-log4j (Storm-worker-log4j avanzato), sostituire <root level="info"> con <root level="debug">.In Ambari > Storm > Config > Advanced storm-worker-log4j, change <root level="info"> to <root level=”debug”>. Riavviare tutti i nodi o servizi per rendere effettiva la nuova configurazione.Restart all the nodes/service for the configuration to take effect.
  2. Monitorare i log della topologia Storm sui nodi di lavoro (in /var/log/storm/worker-artifacts/<NomeTopologia>/<porta>/worker.log) per le eccezioni alle limitazioni di Data Lake Store.Monitor the Storm topology logs on worker nodes (under /var/log/storm/worker-artifacts/<TopologyName>/<port>/worker.log) for Data Lake Store throttling exceptions.

Passaggi successiviNext steps

L'ottimizzazione aggiuntiva delle prestazioni di Storm è reperibile in questo blog.Additional performance tuning for Storm can be referenced in this blog.

Per un esempio aggiuntivo da eseguire, vedere questo in GitHub.For an additional example to run, see this one on GitHub.