Share via


Leistungsoptimierung für Apache Kafka HDInsight-Cluster

Dieser Artikel enthält Vorschläge zum Optimieren der Leistung Ihrer Apache Kafka-Workloads in HDInsight. Der Schwerpunkt liegt auf dem Anpassen der Producer-, Broker- und Consumerkonfiguration. Manchmal müssen Sie auch Betriebssystemeinstellungen anpassen, um die Leistung bei einer hohen Arbeitsauslastung zu optimieren. Es gibt verschiedene Möglichkeiten zum Messen der Leistung. Die Optimierungen, die Sie anwenden, hängen von Ihren geschäftlichen Anforderungen ab.

Übersicht über die Architektur

Kafka-Themen dienen zum Organisieren von Datensätzen. Producer produzieren Datensätze und Verbraucher verbrauchen sie. Producer senden Datensätze an Kafka-Broker, die die Daten anschließend speichern. Jeder Workerknoten in Ihrem HDInsight-Cluster ist ein Kafka-Broker.

Themen partitionieren Datensätze Broker übergreifend. Bei der Nutzung von Datensätzen können Sie bis zu einen Consumer pro Partition einsetzen, um parallele Verarbeitung der Daten zu erzielen.

Replikation kommt zum Einsatz, um Partitionen auf Knoten zu duplizieren. Diese Partition dient als Schutz vor Ausfällen von Knoten (Brokern). Eine einzelne Partition in der Gruppe der Replikate wird als übergeordnete Partition festgelegt. Producer-Datenverkehr wird an die führende Komponente jedes Knotens weitergeleitet, indem der von ZooKeeper verwaltete Zustand verwendet wird.

Identifizieren des Szenarios

Hinsichtlich Leistung von Apache Kafka gibt es zwei Hauptaspekte: Durchsatz und Latenz. Durchsatz ist die maximale Rate, mit der Daten verarbeitet werden können. Höherer Durchsatz ist besser. Latenz ist der Zeitaufwand für die Speicherung oder den Abruf von Daten. Niedrigere Latenz ist besser. Die richtige Balance zwischen Durchsatz, Latenz und den Kosten der Anwendungsinfrastruktur zu finden, kann eine echte Herausforderung sein. Ihre Leistungsanforderungen sollten einer der folgenden drei gängigen Situationen entsprechen, je nachdem, ob Sie hohen Durchsatz, geringe Latenz oder beides benötigen:

  • Hoher Durchsatz/Geringe Latenz. Dieses Szenario erfordert sowohl hohen Durchsatz als auch geringe Latenz (~100 Millisekunden). Ein Beispiel für diesen Anwendungstyp ist die Überwachung der Verfügbarkeit von Diensten.
  • Hoher Durchsatz/Hohe Latenz. Dieses Szenario erfordert einen hohen Durchsatz (~1,5 Gbit/s), toleriert jedoch eine höhere Latenz (< 250 ms). Ein Beispiel für diese Art von Anwendung ist die Erfassung von Telemetriedaten für echtzeitnahe Prozesse wie in Anwendungen für Sicherheit und Angriffserkennung.
  • Niedriger Durchsatz/Geringe Latenz. Dieses Szenario erfordert eine geringe Latenz (< 10 ms), kann jedoch einen niedrigeren Durchsatz tolerieren. Ein Beispiel für diesen Anwendungstyp ist die Onlineprüfung von Rechtschreibung und Grammatik.

Konfigurationen von Producern

In den folgenden Abschnitten werden einige der wichtigsten allgemeinen Konfigurationseigenschaften zur Optimierung der Leistung Ihrer Kafka-Producer vorgestellt. Eine ausführliche Erläuterung aller Konfigurationseigenschaften finden Sie in der Apache Kafka-Dokumentation zu Producerkonfigurationen.

Batchgröße

Apache Kafka-Producer stellen Gruppen von Nachrichten (sog. Batches) zusammen, die als Einheit gesendet und in einer einzigen Speicherpartition gespeichert werden. Die Batchgröße bezeichnet die Anzahl der Bytes, die vorhanden sein muss, ehe diese Gruppe übertragen wird. Die Erhöhung des Parameters batch.size kann den Durchsatz steigern, da der Verarbeitungsaufwand für Netzwerk- und E/A-Anforderungen reduziert wird. Bei geringer Last kann eine erhöhte Batchgröße die Sendelatenz von Kafka erhöhen, da der Producer darauf wartet, dass ein Batch fertig wird. Bei hoher Last wird empfohlen, die Batchgröße zu erhöhen, um Durchsatz und Latenz zu verbessern.

Vom Producer angeforderte Bestätigungen

Die vom Producer geforderte Konfiguration von acks bestimmt die Anzahl der Bestätigungen, die die übergeordnete Partition verlangt, ehe eine Schreibanforderung als abgeschlossen gilt. Diese Einstellung wirkt sich auf die Datenzuverlässigkeit aus und lässt die Werte 0, 1 oder -1 zu. Der Wert -1 bedeutet, dass von allen Replikaten eine Bestätigung empfangen werden muss, bevor der Schreibvorgang abgeschlossen wird. Die Einstellung acks = -1 bietet mehr Garantien gegen Datenverlust, führt aber auch zu mehr Latenz und weniger Durchsatz. Wenn Ihre Anwendung einen höheren Durchsatz erfordert, legen Sie acks = 0 oder acks = 1 fest. Beachten Sie, dass wenn nicht alle Replikate bestätigt werden, die Datenzuverlässigkeit abnehmen kann.

Komprimierung

Ein Kafka-Producer kann so konfiguriert werden, dass Nachrichten komprimiert werden, ehe sie an Broker gesendet werden. Die Einstellung compression.type gibt den Komprimierungscodec an, der verwendet werden soll. Unterstützte Komprimierungscodecs sind „gzip“, „snappy“ und „lz4“. Die Komprimierung ist vorteilhaft und sollte erwogen werden, wenn die Datenträgerkapazität limitiert ist.

Bei den beiden häufig verwendeten Komprimierungscodecs gzip und snappy hat gzip ein höheres Komprimierungsverhältnis, was zu einer geringeren Datenträgerauslastung auf Kosten einer höheren CPU-Last führt. Der Codec snappy bietet weniger Komprimierung bei weniger CPU-Aufwand. Sie können basierend auf den Einschränkungen des Datenträgers des Brokers oder der CPU des Producers entscheiden, welchen Codec Sie verwenden möchten. gzip bietet eine fünfmal höhere Komprimierungsrate als snappy.

Die Datenkomprimierung erhöht die Anzahl der Datensätze, die auf einem Datenträger gespeichert werden können. Sie kann auch den CPU-Aufwand erhöhen, wenn die vom Producer und Broker verwendeten Komprimierungsformate nicht übereinstimmen. Denn die Daten müssen vor dem Senden komprimiert und vor der Verarbeitung dekomprimiert werden.

Brokereinstellungen

In den folgenden Abschnitten werden einige der wichtigsten Einstellungen zur Optimierung der Leistung Ihrer Kafka-Broker vorgestellt. Eine ausführliche Erläuterung aller Brokereinstellungen finden Sie in der Apache Kafka-Dokumentation zu Brokerkonfigurationen.

Anzahl der Datenträger

Speicherdatenträger bieten begrenzte Kapazität für IOPS (Ein-/Ausgabevorgänge pro Sekunde) und Lese-/Schreibvorgänge von Bytes pro Sekunde. Beim Erstellen neuer Partitionen speichert Kafka jede neue Partition auf dem Datenträger mit den wenigsten vorhandenen Partitionen, um sie gleichmäßig auf die verfügbaren Datenträger zu verteilen. Trotz Speicherungsstrategie kann Kafka bei der Verarbeitung von Hunderten von Partitionsreplikaten auf jedem Datenträger den verfügbaren Datenträgerdurchsatz leicht komplett ausschöpfen. Zwischen Durchsatz und Kosten liegt hier der Kompromiss. Wenn Ihre Anwendung mehr Durchsatz erfordert, erstellen Sie einen Cluster mit mehr verwalteten Datenträgern pro Broker. HDInsight unterstützt derzeit nicht das Hinzufügen verwalteter Datenträger zu einem aktiven Cluster. Weitere Informationen zum Konfigurieren der Anzahl verwalteter Datenträger finden Sie unter Konfigurieren von Speicher und Skalierbarkeit von Apache Kafka in HDInsight. Machen Sie sich damit vertraut, wie sich die Vergrößerung des Speicherplatzes für die Knoten in Ihrem Cluster auf die Kosten auswirkt.

Anzahl von Themen und Partitionen

Kafka-Producer schreiben Daten in Themen. Kafka-Consumer lesen Daten aus Themen. Einem Thema ist ein Protokoll zugeordnet, das eine Datenstruktur auf dem Datenträger darstellt. Kafka fügt Datensätze von Producern am Ende eines Themenprotokolls an. Ein Themenprotokoll besteht aus vielen Partitionen, die auf mehrere Dateien verteilt sind. Diese Dateien sind wiederum auf mehrere Kafka-Clusterknoten verteilt. Consumer lesen mit der gewählten Taktung Daten aus Kafka-Themen und können ihre Position (Offset) im Themenprotokoll wählen.

Jede Kafka-Partition ist eine Protokolldatei im System. Producerthreads können gleichzeitig in mehrere Protokolle schreiben. Da jeder Consumerthread Nachrichten aus einer Partition liest, wird auch die Nutzung von Daten aus mehreren Partitionen parallel verarbeitet.

Die Erhöhung der Partitionsdichte (also der Anzahl von Partitionen pro Broker) führt zu mehr Verarbeitungsaufwand im Zusammenhang mit Metadatenvorgängen und pro Partitionsanfordung/-antwort zwischen der übergeordneten Partition und ihren untergeordneten Partitionen. Selbst wenn keine Daten durchfließen, rufen Partitionsreplikate dennoch Daten aus der übergeordneten Partition ab, was zu einer zusätzlichen Verarbeitung von Sende- und Empfangsanforderungen über das Netzwerk führt.

Für Apache Kafka-Cluster (wie zuvor erwähnt) mit den Versionen 2.1 und 2.4 und höher in HDInsight empfehlen wir maximal 2000 Partitionen pro Broker einschließlich Replikaten. Die Erhöhung der Partitionsanzahl pro Broker verringert den Durchsatz und kann auch zur Nichtverfügbarkeit von Themen führen. Weitere Informationen zur Unterstützung von Kafka-Partitionen finden Sie im offiziellen Apache Kafka-Blogbeitrag zur Erhöhung der Anzahl unterstützter Partitionen in Version 1.1.0. Einzelheiten zum Ändern von Themen finden Sie unter Apache Kafka: Modifying topics (Apache Kafka: Ändern von Themen).

Anzahl von Replikaten

Ein höherer Replikationsfaktor führt zu zusätzlichen Anforderungen zwischen der übergeordneten und den untergeordneten Partitionen. Folglich beansprucht ein höherer Replikationsfaktor mehr Speicherplatz und CPU-Leistung, um zusätzliche Anforderungen zu verarbeiten, was die Schreiblatenz erhöht und den Durchsatz verringert.

Wir empfehlen, in Azure HDInsight mindestens die Dreifachreplikation für Kafka zu verwenden. Die meisten Azure-Regionen haben drei Fehlerdomänen, aber in Regionen mit nur zwei Fehlerdomänen sollten Benutzer die Vierfachreplikation verwenden.

Weitere Informationen zur Replikation finden Sie unter Apache Kafka: Replication (Apache Kafka: Replikation) und Apache Kafka: Increasing replication factor (Apache Kafka: Erhöhen des Replikationsfaktors).

Consumerkonfigurationen

Im folgenden Abschnitt werden einige wichtige allgemeine Konfigurationen hervorgehoben, um die Leistung Ihrer Kafka-Consumer zu optimieren. Eine ausführliche Erläuterung aller Konfigurationen finden Sie in der Apache Kafka-Dokumentation zu Consumerkonfigurationen.

Anzahl der Consumer

Es empfiehlt sich, die Anzahl der Partitionen mit der Anzahl der Consumer zu teilen. Wenn die Anzahl der Consumer kleiner als die Anzahl der Partitionen ist, lesen einige der Consumer aus mehreren Partitionen, wodurch sich die Wartezeit der Consumer erhöht.

Wenn die Anzahl der Consumer größer als die Anzahl der Partitionen ist, verschwenden Sie Ihre Consumerressourcen, da diese Consumer nicht genutzt werden.

Vermeiden häufiger Neuausgleiche von Consumern

Die Neuausgleichung der Consumer wird ausgelöst durch eine Änderung der Partitionseigentümerschaft (d. h., die Consumer werden verkleinert oder verkleinert), einen Brokerabsturz (da Broker die Gruppenkoordinatoren für Consumergruppen sind), einen Consumerabsturz, das Hinzufügen eines neuen Themas oder das Hinzufügen neuer Partitionen. Während des Neuausgleichs können die Consumer nicht konsumieren, wodurch sich die Latenzzeit erhöht.

Consumer werden als aktiv betrachtet, wenn sie einen Heartbeat an einen Broker insession.timeout.ms senden können. Andernfalls wird der Consumer als tot oder gescheitert betrachtet. Diese Verzögerung führt zu einem Ausgleich des Verbrauchers. Senken Sie den Verbraucher session.timeout.ms, desto schneller können wir diese Fehler erkennen.

Wenn session.timeout.ms zu niedrig ist, kann es bei einem Consumer zu wiederholten unnötigen Neuausgleichen kommen, z. B. wenn die Verarbeitung eines Nachrichten-Batchs länger dauert oder eine JVM GC-Pause zu lange dauert. Wenn Sie über einen Consumer verfügen, der zu viel Zeit mit der Verarbeitung von Nachrichten verbringt, können Sie dies entweder durch eine Erhöhung der Obergrenze für die Zeit, die ein Consumer im Leerlauf sein kann, bevor er weitere Datensätze abruft, oder max.poll.interval.ms durch eine Verringerung der maximalen Größe der mit dem Konfigurationsparameter zurückgegebenen Batches max.poll.records beheben.

Batchverarbeitung

Wie bei den Producern können wir auch bei den Consumern eine Batchverarbeitung vornehmen. Die Datenmenge, die Consumer in jeder Abrufanforderung abrufen können, kann durch Ändern der Konfiguration fetch.min.bytes konfiguriert werden. Dieser Parameter definiert die minimalen Bytes, die von einer Abrufantwort eines Consumers erwartet werden. Durch Erhöhen dieses Werts wird die Anzahl der Abrufanforderungen an den Broker reduziert, wodurch der zusätzliche Aufwand reduziert wird. Der Standardwert ist 1. Ebenso gibt es eine weitere Konfiguration fetch.max.wait.ms. Wenn eine Abrufanforderung nicht genügend Nachrichten gemäß der Größe von fetch.min.bytes enthält, wartet sie basierend auf dieser Konfiguration fetch.max.wait.ms bis zum Ablauf der Wartezeit.

Hinweis

In einigen Szenarien scheinen Consumer langsam zu sein, wenn die Nachricht nicht verarbeitet werden kann. Wenn Sie den Offset nach einer Ausnahme nicht committen, bleibt der Consumer an einem bestimmten Offset in einer Endlosschleife hängen und bewegt sich nicht vorwärts, wodurch sich die Verzögerung auf Consumerseite erhöht.

Optimierung von Linus OS mit hoher Arbeitsauslastung

Arbeitsspeicherzuordnungen

vm.max_map_count definiert die maximale Anzahl von MMaps, die ein Prozess haben kann. In HDInsight Apache Kafka Linux-Cluster-VM ist der Wert standardmäßig 65535.

In Apache Kafka erfordert jedes Protokollsegment ein Paar von Index-/Zeitindexdateien, und jede dieser Dateien verbraucht ein mmap. Anders ausgedrückt: Jedes Protokollsegment verwendet zwei mmap. Wenn also jede Partition ein einzelnes Protokollsegment hostet, sind mindestens zwei mmap erforderlich. Die Anzahl der Protokollsegmente pro Partition variiert je nach Segmentgröße, Lastintensivität, Aufbewahrungsrichtlinie, fortlaufendem Zeitraum und liegt im Allgemeinen bei mehr als einem Segment. Mmap value = 2*((partition size)/(segment size))*(partitions)

Wenn der erforderliche mmap-Wert vm.max_map_count überschreitet, würde der Broker die Ausnahme „Zuordnung fehlgeschlagen“ auslösen.

Um diese Ausnahme zu vermeiden, verwenden Sie die folgenden Befehle, um die Größe für mmap in vm zu überprüfen und die Größe bei Bedarf auf jedem Workerknoten zu erhöhen.

# command to find number of index files:
find . -name '*index' | wc -l

# command to view vm.max_map_count for a process:
cat /proc/[kafka-pid]/maps | wc -l

# command to set the limit of vm.max_map_count:
sysctl -w vm.max_map_count=<new_mmap_value>

# This will make sure value remains, even after vm is rebooted:
echo 'vm.max_map_count=<new_mmap_value>' >> /etc/sysctl.conf
sysctl -p

Hinweis

Achten Sie darauf, diese nicht zu hoch festzulegen, da dadurch auf dem VM Arbeitsspeicherplatz beansprucht wird. Die Menge an Arbeitsspeichern, die von JVM auf Arbeitsspeicherzuordnungen verwendet werden darf, wird durch die Einstellung MaxDirectMemory bestimmt. Der Standardwert ist 64MB. Es ist möglich, dass dieser erreicht wird. Sie können diesen Wert erhöhen, indem Sie -XX:MaxDirectMemorySize=amount of memory used den JVM-Einstellungen über Ambari hinzufügen. Achten Sie darauf, wie viel Arbeitsspeicher auf dem Knoten verwendet wird und ob genügend RAM zur Verfügung steht, um dies zu unterstützen.

Nächste Schritte