Condividi tramite


Ottimizzazione delle prestazioni per i cluster Apache Kafka HDInsight

Questo articolo offre alcuni suggerimenti per ottimizzare le prestazioni dei carichi di lavoro Apache Kafka in HDInsight. L'obiettivo è modificare la configurazione di producer, broker e consumer. In alcuni casi, è anche necessario modificare le impostazioni del sistema operativo per ottimizzare le prestazioni con un carico di lavoro elevato. Esistono diversi modi per misurare le prestazioni e le ottimizzazioni applicate dipendono dalle esigenze aziendali.

Panoramica dell'architettura

Gli argomenti Kafka vengono usati per organizzare i record. I produttori producono record e i consumer li usano. I produttori inviano record ai broker Kafka, che archivia quindi i dati. Ogni nodo del ruolo di lavoro nel cluster HDInsight è un broker Kafka.

Record di partizione degli argomenti tra broker. Quando si usano i record, è possibile usare fino a un consumer per partizione, per ottenere l'elaborazione parallela dei dati.

La replica viene usata per duplicare le partizioni tra i nodi. Questa partizione protegge dalle interruzioni del nodo (broker). Una singola partizione tra il gruppo di repliche è designata come leader di partizione. Il traffico dei producer viene indirizzato al leader di ogni nodo, usando lo stato gestito da ZooKeeper.

Identificare lo scenario

Le prestazioni di Apache Kafka hanno due aspetti principali: velocità effettiva e latenza. La velocità effettiva è la velocità massima di elaborazione dei dati. Una velocità effettiva più elevata è migliore. La latenza è il tempo necessario per l'archiviazione o il recupero dei dati. La latenza più bassa è migliore. Trovare il giusto equilibrio tra velocità effettiva, latenza e costo dell'infrastruttura dell'applicazione può risultare complesso. I requisiti di prestazioni devono corrispondere a una delle tre situazioni comuni seguenti, a seconda che sia necessaria una velocità effettiva elevata, una bassa latenza o entrambe:

  • Velocità effettiva elevata, bassa latenza. Questo scenario richiede sia velocità effettiva elevata che bassa latenza (~100 millisecondi). Un esempio di questo tipo di applicazione è il monitoraggio della disponibilità del servizio.
  • Velocità effettiva elevata, latenza elevata. Questo scenario richiede una velocità effettiva elevata (~1,5 GBps), ma può tollerare una latenza più elevata (< 250 ms). Un esempio di questo tipo di applicazione è l'inserimento dei dati di telemetria per processi quasi in tempo reale, ad esempio applicazioni di rilevamento delle intrusioni e sicurezza.
  • Bassa velocità effettiva, bassa latenza. Questo scenario richiede bassa latenza (< 10 ms) per l'elaborazione in tempo reale, ma può tollerare una velocità effettiva inferiore. Un esempio di questo tipo di applicazione è il controllo ortografico e grammaticale online.

Configurazioni del producer

Le sezioni seguenti evidenziano alcune delle proprietà di configurazione generiche più importanti per ottimizzare le prestazioni dei producer Kafka. Per una spiegazione dettagliata di tutte le proprietà di configurazione, vedere la documentazione di Apache Kafka sulle configurazioni producer.

Dimensioni dei batch

I producer Apache Kafka assemblano gruppi di messaggi (denominati batch) che vengono inviati come unità da archiviare in una singola partizione di archiviazione. Dimensioni batch indica il numero di byte che devono essere presenti prima che tale gruppo venga trasmesso. L'aumento del parametro può aumentare la batch.size velocità effettiva, perché riduce il sovraccarico di elaborazione dalle richieste di rete e I/O. Con carico leggero, una maggiore dimensione del batch può aumentare la latenza di invio Kafka quando il producer attende che un batch sia pronto. Con un carico elevato, è consigliabile aumentare le dimensioni del batch per migliorare la velocità effettiva e la latenza.

Riconoscimenti richiesti dal produttore

La configurazione richiesta acks dal producer determina il numero di riconoscimenti richiesti dal leader della partizione prima che venga considerata completata una richiesta di scrittura. Questa impostazione influisce sull'affidabilità dei dati e accetta valori di 0, 1o -1. Il valore di -1 indica che un riconoscimento deve essere ricevuto da tutte le repliche prima del completamento della scrittura. L'impostazione acks = -1 offre garanzie più efficaci rispetto alla perdita di dati, ma comporta anche una latenza più elevata e una velocità effettiva inferiore. Se l'applicazione richiede una velocità effettiva superiore, provare a impostare acks = 0 o acks = 1. Tenere presente che non riconoscere tutte le repliche può ridurre l'affidabilità dei dati.

Compressione

Un producer Kafka può essere configurato per comprimere i messaggi prima di inviarli ai broker. L'impostazione compression.type specifica il codec di compressione da utilizzare. I codec di compressione supportati sono "gzip", "snappy" e "lz4". La compressione è utile e deve essere considerata se esiste una limitazione della capacità del disco.

Tra i due codec di compressione comunemente usati, gzip e snappy, gzip ha un rapporto di compressione più elevato, che comporta una riduzione dell'utilizzo del disco a un costo maggiore del carico della CPU. Il snappy codec offre meno compressione con un sovraccarico inferiore della CPU. È possibile decidere quale codec usare in base alle limitazioni della CPU del disco broker o del producer. gzip può comprimere i dati a una velocità cinque volte superiore a snappy.

La compressione dei dati aumenta il numero di record che possono essere archiviati in un disco. Può anche aumentare il sovraccarico della CPU nei casi in cui si verifica una mancata corrispondenza tra i formati di compressione usati dal producer e dal broker. come i dati devono essere compressi prima dell'invio e quindi decompressi prima dell'elaborazione.

Impostazioni broker

Le sezioni seguenti evidenziano alcune delle impostazioni più importanti per ottimizzare le prestazioni dei broker Kafka. Per una spiegazione dettagliata di tutte le impostazioni del broker, vedere la documentazione di Apache Kafka sulle configurazioni del broker.

Numero di dischi

Archiviazione dischi hanno operazioni di I/O al secondo limitate (operazioni di input/output al secondo) e byte di lettura/scrittura al secondo. Quando si creano nuove partizioni, Kafka archivia ogni nuova partizione sul disco con il minor numero di partizioni esistenti per bilanciarle tra i dischi disponibili. Nonostante la strategia di archiviazione, durante l'elaborazione di centinaia di repliche di partizione in ogni disco, Kafka può facilmente saturare la velocità effettiva del disco disponibile. Il compromesso è tra velocità effettiva e costo. Se l'applicazione richiede una maggiore velocità effettiva, creare un cluster con più dischi gestiti per broker. HDInsight attualmente non supporta l'aggiunta di dischi gestiti a un cluster in esecuzione. Per altre informazioni su come configurare il numero di dischi gestiti, vedere Configurare l'archiviazione e la scalabilità per Apache Kafka in HDInsight. Comprendere le implicazioni sui costi dell'aumento dello spazio di archiviazione per i nodi nel cluster.

Numero di argomenti e partizioni

I produttori Kafka scrivono negli argomenti. I consumer Kafka leggono da argomenti. Un argomento è associato a un log, ovvero una struttura di dati su disco. Kafka aggiunge i record da un producer alla fine di un log di argomenti. Un log di argomenti è costituito da molte partizioni distribuite su più file. Questi file sono, a loro volta, distribuiti tra più nodi del cluster Kafka. I consumer leggono gli argomenti di Kafka alla frequenza e possono selezionare la posizione (offset) nel log degli argomenti.

Ogni partizione Kafka è un file di log nel sistema e i thread producer possono scrivere in più log contemporaneamente. Analogamente, poiché ogni thread consumer legge i messaggi da una partizione, anche l'utilizzo di più partizioni viene gestito in parallelo.

Aumentando la densità di partizione (il numero di partizioni per broker) viene aggiunto un sovraccarico correlato alle operazioni di metadati e alla richiesta/risposta di partizione tra il leader della partizione e i relativi follower. Anche in assenza del flusso di dati attraverso, le repliche di partizione recuperano ancora i dati dai leader, che comportano un'elaborazione aggiuntiva per l'invio e la ricezione di richieste in rete.

Per i cluster Apache Kafka 2.1 e 2.4 e come indicato in HDInsight, è consigliabile avere un massimo di 2000 partizioni per broker, incluse le repliche. L'aumento del numero di partizioni per broker riduce la velocità effettiva e può anche causare l'indisponibilità degli argomenti. Per altre informazioni sul supporto delle partizioni Kafka, vedere il post di blog ufficiale di Apache Kafka sull'aumento del numero di partizioni supportate nella versione 1.1.0. Per informazioni dettagliate sulla modifica degli argomenti, vedere Gli argomenti relativi alla modifica di Apache Kafka.

Numero di repliche

Un fattore di replica più elevato comporta richieste aggiuntive tra il leader della partizione e i follower. Di conseguenza, un fattore di replica più elevato utilizza più disco e CPU per gestire richieste aggiuntive, aumentando la latenza di scrittura e riducendo la velocità effettiva.

È consigliabile usare almeno 3 volte la replica per Kafka in Azure HDInsight. La maggior parte delle aree di Azure ha tre domini di errore, ma nelle aree con solo due domini di errore gli utenti devono usare la replica 4x.

Per altre informazioni sulla replica, vedere Apache Kafka: replica e Apache Kafka: aumento del fattore di replica.

Configurazioni consumer

La sezione seguente illustra alcune importanti configurazioni generiche per ottimizzare le prestazioni dei consumer Kafka. Per una spiegazione dettagliata di tutte le configurazioni, vedere la documentazione di Apache Kafka sulle configurazioni consumer.

Numero di consumer

È consigliabile avere il numero di partizioni uguale al numero di consumer. Se il numero di consumer è minore del numero di partizioni, alcuni consumer leggono da più partizioni, aumentando la latenza del consumer.

Se il numero di consumer è maggiore del numero di partizioni, si sta sprecando le risorse consumer perché tali consumer sono inattivi.

Evitare frequenti ribilanciamento dei consumatori

Il ribilanciamento del consumer viene attivato dalla modifica della proprietà della partizione (ad esempio, i consumer aumentano o aumentano le prestazioni), un arresto anomalo del broker (poiché i broker sono coordinatori del gruppo per i gruppi di consumer), un arresto anomalo del consumer, l'aggiunta di un nuovo argomento o l'aggiunta di nuove partizioni. Durante il ribilanciamento, i consumer non possono utilizzare, aumentando quindi la latenza.

I consumer vengono considerati attivi se possono inviare un heartbeat a un broker all'interno session.timeout.msdi . In caso contrario, il consumer viene considerato inattivo o non riuscito. Questo ritardo comporta un ribilanciamento del consumatore. Ridurre il consumer session.timeout.ms, più velocemente è possibile rilevare tali errori.

Se l'oggetto session.timeout.ms è troppo basso, un consumer potrebbe riscontrare ripetuti ribilanciamenti non necessari, a causa di scenari come quando un batch di messaggi richiede più tempo per l'elaborazione o quando una pausa GC JVM richiede troppo tempo. Se si dispone di un consumer che impiega troppo tempo per l'elaborazione dei messaggi, è possibile risolvere questo problema aumentando il limite superiore per il tempo di inattività di un consumer prima di recuperare più record con max.poll.interval.ms o riducendo le dimensioni massime dei batch restituiti con il parametro max.poll.recordsdi configurazione .

Batch

Come i produttori, è possibile aggiungere batch per i consumer. È possibile configurare la quantità di consumer di dati in ogni richiesta di recupero modificando la configurazione fetch.min.bytes. Questo parametro definisce i byte minimi previsti da una risposta di recupero di un consumer. L'aumento di questo valore riduce il numero di richieste di recupero effettuate al broker, riducendo quindi il sovraccarico aggiuntivo. Per impostazione predefinita, questo valore è 1. Analogamente, esiste un'altra configurazione fetch.max.wait.ms. Se una richiesta di recupero non dispone di messaggi sufficienti in base alle dimensioni di fetch.min.bytes, attende fino alla scadenza del tempo di attesa in base a questa configurazione fetch.max.wait.ms.

Nota

In alcuni scenari, i consumer potrebbero sembrare lenti, quando non riesce a elaborare il messaggio. Se non si esegue il commit dell'offset dopo un'eccezione, il consumer verrà bloccato in corrispondenza di un determinato offset in un ciclo infinito e non si sposta in avanti, aumentando così il ritardo sul lato consumer.

Ottimizzazione del sistema operativo Linux con un carico di lavoro elevato

Mappe di memoria

vm.max_map_count definisce il numero massimo di mmap che un processo può avere. Per impostazione predefinita, nella macchina virtuale Linux del cluster Apache Kafka di HDInsight il valore è 65535.

In Apache Kafka ogni segmento di log richiede una coppia di file index/timeindex e ognuno di questi file usa un mmap. In altre parole, ogni segmento di log usa due mmap. Pertanto, se ogni partizione ospita un singolo segmento di log, richiede almeno due mmap. Il numero di segmenti di log per partizione varia a seconda delle dimensioni del segmento, dell'intensità del carico, dei criteri di conservazione, del periodo di sequenza e, in genere, tende a essere più di uno. Mmap value = 2*((partition size)/(segment size))*(partitions)

Se il valore mmap richiesto supera , il broker genererà l'eccezione vm.max_map_count"Map failed".

Per evitare questa eccezione, usare i comandi seguenti per controllare le dimensioni di mmap nella macchina virtuale e aumentare le dimensioni, se necessario, in ogni nodo del ruolo di lavoro.

# command to find number of index files:
find . -name '*index' | wc -l

# command to view vm.max_map_count for a process:
cat /proc/[kafka-pid]/maps | wc -l

# command to set the limit of vm.max_map_count:
sysctl -w vm.max_map_count=<new_mmap_value>

# This will make sure value remains, even after vm is rebooted:
echo 'vm.max_map_count=<new_mmap_value>' >> /etc/sysctl.conf
sysctl -p

Nota

Prestare attenzione all'impostazione di questa impostazione troppo elevata perché occupa memoria nella macchina virtuale. La quantità di memoria consentita dalla JVM nelle mappe di memoria è determinata dall'impostazione MaxDirectMemory. Il valore predefinito è 64 MB. È possibile che questo venga raggiunto. È possibile aumentare questo valore aggiungendo -XX:MaxDirectMemorySize=amount of memory used le impostazioni JVM tramite Ambari. Essere consapevoli della quantità di memoria usata nel nodo e, se è disponibile ram sufficiente per supportare questo problema.

Passaggi successivi