Optimisation des performances pour les clusters Kafka HDInsight Apache

Cet article contient des suggestions pour optimiser les performances de vos charges de travail Apache Kafka dans HDInsight. L’accent est mis sur l’optimisation de la configuration du producteur, du répartiteur et du consommateur. Parfois, vous devez également ajuster les paramètres du système d’exploitation pour optimiser les performances en cas de charge de travail importante. Il existe différentes façons de mesurer les performances, et les optimisations que vous appliquez dépendent de vos besoins métier.

Présentation de l'architecture

Les rubriques Kafka sont utilisées pour organiser des enregistrements. Les producteurs produisent des enregistrements et les consommateurs les consomment. Les producteurs envoient des enregistrements aux répartiteurs Kafka, qui stockent alors les données. Chaque nœud Worker dans votre cluster HDInsight est un répartiteur Kafka.

Les rubriques partitionnent les enregistrements entre les répartiteurs. Au cours de l’utilisation des enregistrements, il est possible d’utiliser jusqu’à un consommateur par partition pour effectuer un traitement parallèle des données.

La réplication est utilisée pour dupliquer des partitions entre les nœuds. Cette partition protège contre les pannes des nœuds (du répartiteur). Une seule partition parmi le groupe de réplicas est désignée comme partition « leader ». Le trafic de producteur est acheminé vers le leader de chaque nœud, en utilisant l’état géré par ZooKeeper.

Identifier votre scénario

Les performances d’Apache Kafka ont deux aspects principaux : le débit et la latence. Le débit est vitesse de traitement maximale des données de votre application. Un débit plus élevé est préférable. La latence est le temps nécessaire pour que les données soient stockées ou récupérées. Une latence plus faible est préférable. Trouver le juste équilibre entre le débit, la latence et le coût de l’infrastructure de l’application peut être difficile. Vos exigences en matière de performances doivent correspondre à une des trois situations courantes suivantes, selon que vous avez besoin d’un débit élevé, d’une latence faible ou des deux :

  • Débit élevé, latence faible. Ce scénario nécessite à la fois un débit élevé et une latence faible (environ 100 millisecondes). La supervision de la disponibilité des services est un exemple de ce type d’application.
  • Débit élevé, latence élevée. Ce scénario nécessite un débit élevé (environ 1,5 Gbits/s), mais peut tolérer une latence plus élevée (< 250 ms). L’ingestion de données de télémétrie pour des processus en quasi temps réel, comme des applications de sécurité et de détection des intrusions, est un exemple de ce type d’application.
  • Débit faible, latence faible. Ce scénario nécessite une latence faible (< 10 ms) pour le traitement en temps réel, mais peut tolérer un débit inférieur. Les vérifications de l’orthographe et de la grammaire en ligne sont un exemple de ce type d’application.

Configurations du producteur

Les sections suivantes sont consacrées à certaines des propriétés génériques de configuration les plus importantes pour optimiser les performances de vos producteurs Kafka. Pour obtenir une explication détaillée de toutes les propriétés de configuration, consultez la documentation Apache Kafka sur les configurations de producteur.

Taille du lot

Les producteurs Apache Kafka assemblent des groupes de messages (appelés lots) qui sont envoyés en tant qu’unité à stocker dans une même partition de stockage. La taille de lot représente le nombre d’octets qui doivent être présents avant que ce groupe soit transmis. Augmenter la valeur du paramètre batch.size peut augmenter le débit, car il réduit la surcharge du traitement due aux demandes du réseau et des E/S. Avec une charge faible, une taille de lot augmentée peut augmenter la latence des envois de Kafka, car le producteur attend qu’un lot soit prêt. Avec une charge importante, il est recommandé d’augmenter la taille de lot pour améliorer le débit et la latence.

Accusés de réception requis par le producteur

La configuration des accusés de réception (acks) requis par le producteur détermine le nombre d’accusés de réception requis par la partition « leader » avant qu’une demande d’écriture soit considérée comme terminée. Ce paramètre affecte la fiabilité des données et il peut prendre les valeurs suivantes : 0, 1 ou -1. La valeur -1 signifie qu’un accusé de réception doit être reçu de tous les réplicas avant que l’écriture soit terminée. Définir cette valeur sur acks = -1 offre de meilleures garanties contre la perte de données, mais elle résulte également en une latence plus élevée et un débit inférieur. Si les spécifications de votre application demandent un débit plus élevé, essayez en définissant acks = 0 ou acks = 1. Gardez à l’esprit que ne pas accuser réception pour tous les réplicas peut réduire la fiabilité des données.

Compression

Un producteur Kafka peut être configuré de façon à compresser les messages avant de les envoyer aux répartiteurs. Le paramètre compression.type spécifie le codec de compression à utiliser. Les codecs de compression pris en charge sont « gzip », « snappy » et « lz4 ». La compression est utile et doit être envisagée si la capacité du disque est limitée.

Parmi les deux codecs de compression couramment utilisés, gzip et snappy, gzip a un taux de compression plus élevé, ce qui entraîne une utilisation inférieure du disque, au prix d’une charge plus élevée pour le processeur. Le codec snappy offre une compression inférieure, avec moins de charge pour le processeur. Vous pouvez décider du codec à utiliser en fonction des limitations du disque du répartiteur ou du processeur du producteur. gzip peut compresser les données à une vitesse cinq fois supérieure à celle de snappy.

La compression des données augmente le nombre d’enregistrements qui peuvent être stockés sur un disque. Elle peut également augmenter la charge du processeur dans les cas où les formats de compression utilisés par le producteur et par le répartiteur ne correspondent pas. Les données doivent en effet être compressées avant l’envoi et décompressées avant leur traitement.

Paramètres du répartiteur

Les sections suivantes sont consacrées à certaines des propriétés les plus importantes pour optimiser les performances de vos répartiteurs Kafka. Pour obtenir une explication détaillée de tous les paramètres des répartiteurs, consultez la documentation Apache Kafka sur les configurations de répartiteur.

Nombre de disques

Les disques de stockage sont sujets à des limitations des IOPS (opérations d’entrée/sortie par seconde) et des octets lus/écrits par seconde. Lors de la création de partitions, Kafka stocke chaque nouvelle partition sur le disque ayant le moins de partitions existantes, de façon à les équilibrer entre les disques disponibles. En dépit de cette stratégie de stockage, lors du traitement de centaines de partitions de réplica sur chaque disque, Kafka peut facilement saturer le débit des disques disponibles. Le compromis se fait ici entre le débit et de coût. Si votre application nécessite un débit supérieur, créer un cluster avec plusieurs disques managés par répartiteur. HDInsight ne prend actuellement pas en charge l’ajout de disques managés à un cluster en cours d’exécution. Pour plus d’informations sur la façon de configurer le nombre de disques managés, consultez Configurer le stockage et la scalabilité pour Apache Kafka sur HDInsight. Vous devez bien comprendre les implications en termes de coût de l’augmentation de l’espace de stockage pour les nœuds de votre cluster.

Nombre de rubriques et de partitions

Les producteurs Kafka écrivent dans des rubriques. Les consommateurs Kafka lisent dans des rubriques. Une rubrique est associée à un journal, qui est une structure de données sur le disque. Kafka ajoute les enregistrements provenant d’un ou plusieurs producteurs à la fin du journal d’une rubrique. Un journal de rubrique se compose de nombreuses partitions qui sont réparties en plusieurs fichiers. Ces fichiers sont à leur tour répartis entre plusieurs nœuds de cluster Kafka. Les consommateurs lisent dans des rubriques Kafka à leur rythme, et ils peuvent choisir leur position (décalage) dans le journal de la rubrique.

Chaque partition Kafka est un fichier journal sur le système, et les threads des producteurs peuvent écrire simultanément dans plusieurs journaux. De même, comme chaque thread de consommateur lit les messages à partir d’une seule partition, la consommation à partir de plusieurs partitions est également gérée en parallèle.

L’augmentation de la densité de partitions (le nombre de partitions par répartiteur) ajoute une charge liée aux opérations de métadonnées et aux demandes/réponses par partition entre la partition « leader » et celles qui la suivent. Même en l’absence de données en transition, les réplicas de partition continuent d’extraire des données des « leaders », ce qui aboutit à un traitement supplémentaire pour envoyer et recevoir des demandes sur le réseau.

Pour les clusters Apache Kafka 2.1, 2.4, comme indiqué précédemment dans HDInsight, nous vous recommandons d’avoir un maximum de 2000 partitions, y compris les réplicas, par répartiteur. L’augmentation du nombre de partitions par répartiteur diminue le débit et peut également entraîner l’indisponibilité de rubriques. Pour plus d’informations sur la prise en charge des partitions dans Kafka, consultez le billet de blog Apache Kafka officiel sur l’augmentation du nombre de partitions prises en charge dans la version 1.1.0. Pour plus d’informations sur la modification des rubriques, consultez Apache Kafka : modification des rubriques.

Nombre de réplicas

Un facteur de réplication plus élevé entraîne des demandes supplémentaires entre la partition « leader » et celles qui la suivent. Par conséquent, un facteur de réplication supérieur consomme plus de disque et de processeur pour gérer les demandes supplémentaires, en augmentant la latence des écritures et en diminuant le débit.

Nous vous recommandons d’utiliser au moins une réplication 3x pour Kafka dans Azure HDInsight. La plupart des régions Azure ont trois domaines d’erreur, mais dans les régions avec seulement deux domaines d’erreur, les utilisateurs doivent utiliser une réplication 4x.

Pour plus d’informations sur la réplication, consultez Apache Kafka : réplication et Apache Kafka : augmentation du facteur de réplication.

Configurations des consommateurs

La section suivante présente certaines configurations génériques importantes permettant d’optimiser les performances de vos consommateurs Kafka. Pour obtenir une explication détaillée de toutes les propriétés de configuration, consultez la documentation Apache Kafka concernant les configurations de producteur.

Nombre de consommateurs

Il est conseillé d’avoir autant de partitions que de consommateurs. Si le nombre de consommateurs est inférieur au nombre de partitions, certains consommateurs lisent les données de plusieurs partitions, ce qui augmente la latence des consommateurs.

Si le nombre de consommateurs est supérieur au nombre de partitions, vous gaspillez les ressources consommateurs, puisque certains consommateurs sont inactifs.

Éviter un rééquilibrage trop fréquent des consommateurs

Le rééquilibrage des consommateurs est déclenché par la modification de la propriété de la partition (c’est-à-dire, en cas de scale-out ou de scale-down des consommateurs), par un incident de répartiteur (puisque les répartiteurs agissent comme des coordinateurs pour les groupes de consommateurs), par un incident de consommateur, par l’ajout d’une nouvelle rubrique ou par l’ajout de nouvelles partitions. Lors du rééquilibrage, les consommateurs ne peuvent pas consommer, ce qui a pour effet d’augmenter la latence.

Les consommateurs sont considérés comme actifs s’ils peuvent envoyer une pulsation à un répartiteur dans le délai défini (session.timeout.ms). Dans le cas contraire, les consommateurs sont considérés comme inactifs ou comme ayant échoué. Ce délai entraîne un rééquilibrage du consommateur. Plus le consommateur session.timeout.ms est faible, plus rapidement nous pouvons détecter ces défaillances.

En revanche, si le session.timeout.ms est trop court, un consommateur peut subir des rééquilibrages non nécessaires de façon répétée, par exemple, si le traitement d’un lot de messages ou le nettoyage de la mémoire d’une machine virtuelle Java prennent plus de temps que prévu. Si un consommateur passe plus de temps que prévu au traitement des messages, vous pouvez augmenter la durée pendant laquelle un consommateur peut être inactif avant de récupérer des enregistrements supplémentaires avec max.poll.interval.ms, ou réduire la taille maximale des lots retournés avec le paramètre de configuration max.poll.records.

Traitement par lot

Comme avec les producteurs, nous pouvons ajouter le traitement par lot pour les consommateurs. La quantité de données que les consommateurs peuvent obtenir dans chaque demande de récupération peut être configurée en modifiant la configuration fetch.min.bytes. Ce paramètre définit le nombre minimal d’octets qui sont attendus de la part d’une réponse de récupération d’un consommateur. L’augmentation de cette valeur réduit le nombre de demandes de récupération effectuées sur le répartiteur, ce qui réduit les frais supplémentaires. Par défaut, cette valeur est définie sur 1. De même, il existe une autre configuration fetch.max.wait.ms. Si une demande de récupération ne dispose pas de suffisamment de messages par rapport à la taille de fetch.min.bytes, elle attend l’expiration du délai d’attente qui est défini dans cette configuration fetch.max.wait.ms.

Remarque

Dans certains scénarios, les consommateurs peuvent sembler lents, alors qu’en réalité, ils ne parviennent pas à traiter les messages. Si vous ne validez pas le décalage après une exception, le consommateur sera bloqué au niveau d’un décalage particulier dans une boucle infinie et ne pourra plus avancer, ce qui aura pour effet d’augmenter le décalage côté client.

Réglage du système d’exploitation Linux en cas de charge de travail importante

Cartes mémoire

vm.max_map_count définit le nombre maximal de mmap qu’un processus peut avoir. Par défaut, sur la machine virtuelle Linux du cluster HDInsight Apache Kafka, la valeur est de 65535.

Dans Apache Kafka, chaque segment de journal nécessite une paire de fichiers index/timeindex, et chacun de ces fichiers consomme un mmap. En d’autres termes, chaque segment de journal utilise deux mmap. Par conséquent, si chaque partition héberge un seul segment de journal, elle nécessite au minimum deux mmap. Le nombre de segments de journal par partition varie en fonction de la taille du segment, de l’intensité de la charge, de la stratégie de conservation et de la période de roulement. En général, une partition comprend plusieurs segments de journal. Mmap value = 2*((partition size)/(segment size))*(partitions)

Si la valeur mmap nécessaire dépasse vm.max_map_count, le répartiteur lève l’exception « Map failed » (Échec de la carte).

Pour éviter cette exception, utilisez les commandes ci-dessous afin de vérifier la taille de mmap sur la machine virtuelle, et augmentez la taille si nécessaire sur chaque nœud Worker.

# command to find number of index files:
find . -name '*index' | wc -l

# command to view vm.max_map_count for a process:
cat /proc/[kafka-pid]/maps | wc -l

# command to set the limit of vm.max_map_count:
sysctl -w vm.max_map_count=<new_mmap_value>

# This will make sure value remains, even after vm is rebooted:
echo 'vm.max_map_count=<new_mmap_value>' >> /etc/sysctl.conf
sysctl -p

Notes

Veillez à ne pas définir une valeur trop élevée, car cela utilise de la mémoire sur la machine virtuelle. La quantité de mémoire pouvant être utilisée par la machine virtuelle Java sur les cartes mémoire est déterminée par le paramètre MaxDirectMemory. La valeur par défaut est de 64 Mo. Il est possible que cette valeur ait été atteinte. Vous pouvez augmenter cette valeur en ajoutant -XX:MaxDirectMemorySize=amount of memory used aux paramètres de la machine virtuelle Java via Ambari. Vous devez connaître la quantité de mémoire utilisée sur le nœud et savoir si la quantité de mémoire RAM disponible est suffisante.

Étapes suivantes