Apache Kafka HDInsight クラスターのパフォーマンスの最適化

この記事では、HDInsight で Apache Kafka のワークロードのパフォーマンスを最適化するためのいくつかの提案を示します。 プロデューサー、ブローカー、コンシューマーの構成を調整することに焦点を当てています。 場合によっては、負荷の高いワークロードでパフォーマンスを調整するために OS 設定を調整する必要があります。 パフォーマンスを測定するさまざまな方法があり、適用する最適化はビジネス ニーズによって異なります。

アーキテクチャの概要

Kafka トピックを使用して、レコードを整理します。 プロデューサーによってレコードが生成され、コンシューマーによって使用されます。 プロデューサーは Kafka ブローカーにレコードを送信し、それからデータが格納されます。 HDInsight クラスターの各ワーカー ノードが、Kafka のブローカーです。

トピックは、ブローカー間でレコードを分割します。 レコードの使用時に、パーティションあたり最大 1 つのコンシューマーを使用して、データの並列処理を実現できます。

レプリケーションを使用して、ノード間でパーティションを複製します。 このパーティションにより、ノード (ブローカー) が停止しても保護されます。 レプリカのグループ間で、1 つのパーティションがパーティション リーダーとして指定されます。 プロデューサー トラフィックは、ZooKeeper によって管理された状態に基づいて、各ノードのリーダーにルーティングされます。

シナリオの特定

Apache Kafka のパフォーマンスには、スループットと待ち時間という 2 つの主要な側面があります。 スループットとは、アプリでデータを処理できる最大速度のことです。 スループットが高ければそれだけ良好です。 待ち時間とは、データの格納または取得にかかる時間のことです。 待ち時間が短ければそれだけ良好です。 スループット、待ち時間、およびアプリケーションのインフラストラクチャのコストの間での適切なバランスを見つけることは、困難な場合があります。 高スループット、短い待ち時間、またはその両方が必要かどうかに基づいて、パフォーマンス要件は次の 3 つ一般的な状況の 1 つに当てはまる必要があります。

  • 高スループット、低待ち時間。 このシナリオでは、高スループットと低待ち時間 (最大 100 ミリ秒) の両方が必要です。 このタイプのアプリケーションは、たとえばサービス可用性監視です。
  • 高スループット、高待ち時間。 このシナリオでは、高スループット (最大 1.5 GBps) が必要ですが、長い待ち時間 (< 250 ミリ秒) を許容できます。 このタイプのアプリケーションの例としては、セキュリティや侵入検出アプリケーションなどのほぼリアルタイム プロセスのテレメトリ データ インジェストがあります。
  • 低スループット、低待ち時間。 このシナリオでは、リアルタイム処理に対して低待ち時間 (< 10 ミリ秒) が必要ですが、低スループットを許容できます。 このタイプのアプリケーションの例としては、オンラインのスペル チェックと文法チェックがあります。

プロデューサーの構成

次のセクションでは、Kafka プロデューサーのパフォーマンスを最適化するために、いくつかの最も重要な一般構成プロパティに注目します。 すべての構成プロパティの詳細な説明については、プロデューサーの構成に関する Apache Kafka のドキュメントを参照してください。

バッチ サイズ

Apache Kafka のプロデューサーは、単一のストレージ パーティションに格納される 1 単位として送信される、メッセージのグループ (バッチと呼ばれる) をアセンブルします。 バッチ サイズは、そのグループを送信する前になければならないバイト数を意味します。 batch.size パラメーターを増やすと、ネットワークと IO 要求からのオーバーヘッドの処理が減るため、スループットを向上させることができます。 負荷が低く、バッチ サイズが大きくなると、プロデューサーはバッチの準備が完了するのを待機するため、Kafka の送信待ち時間が増える可能性があります。 負荷が高い場合は、スループットを向上させて待ち時間を減らすために、バッチ サイズを増やすことをお勧めします。

プロデューサーが必要な確認

プロデューサーが必要な acks 構成では、書き込み要求が完了したと見なされる前に、パーティション リーダーによって要求される確認の数を判別します。 この設定はデータの信頼性に影響し、01、または-1 の値を取ります。 値 -1 は、書き込みが完了する前に、確認をすべてのレプリカから受け取る必要があることを意味します。 acks = -1 を設定すると、データ損失に対する保証は高くなりますが、待ち時間が長くなりスループットが低下することにもなります。 アプリケーションの要件によりさらに高いスループットが求められる場合は、acks = 0 または acks = 1 の設定を試みてください。 一部のレプリカを確認しないと、データの信頼性が低くなる可能性があることに留意してください。

[圧縮]

Kafka プロデューサーは、メッセージをブローカーに送信する前に圧縮するように構成できます。 compression.type 設定は、使用する圧縮コーデックを指定します。 サポートされている圧縮コーデックは、「gzip」、「snappy」、「lz4」です。 圧縮には利点があり、ディスク容量の制限がある場合には考慮する必要があります。

2 つの一般的に使用される圧縮コーデック gzipsnappy で、gzip のほうが圧縮率が高いため、CPU 負荷は高くなりますがディスク使用量は少なくなります。 snappy コーデックは圧縮率は低く、CPU のオーバーヘッドは少なくて済みます。 ブローカーのディスクまたはプロデューサーの CPU の制限事項に基づいて、使用するコーデックを決定できます。 gzipsnappy に比べて、5 倍の速度でデータを圧縮できます。

データ圧縮により、ディスクに格納できるレコードの数が増加します。 プロデューサーとブローカーによって使用されている圧縮形式の間で不一致がある場合には、CPU のオーバーヘッドが増える可能性もあります。 データは送信する前に圧縮し、処理する前に圧縮を解除する必要があります。

ブローカー設定

次のセクションでは、Kafka ブローカーのパフォーマンスを最適化するための、いくつかの最も重要な設定に注目します。 すべてのブローカーの設定の詳細な説明については、Apache Kafka ドキュメントのブローカー構成に関するページを参照してください。

ディスクの数

ストレージ ディスクの IOPS (1 秒あたりの入出力操作) および 1 秒あたりの読み取り/書き込みバイト数が限られています。 新しいパーティションを作成する際に、Kafka は、使用可能なディスク間で負荷を分散するために、既存のパーティションが最も少ないディスクに新しい各パーティションを格納します。 ストレージ戦略にも関わらず、各ディスク上で何百ものパーティションのレプリカを処理するときに、Kafka は使用可能なディスク スループットを容易に飽和させてしまう可能性があります。 ここでのトレードオフは、スループットとコストでの間のものです。 アプリケーションがより高いスループットを必要とする場合は、ブローカーごとにより多くのマネージド ディスクを持つクラスターを作成します。 HDInsight では、実行中のクラスターへのマネージド ディスクの追加は現在サポートしていません。 マネージド ディスクの数を構成する方法の詳細については、「HDInsight 上の Apache Kafka 用に記憶域とスケーラビリティを構成する」を参照してください。 クラスター内のノードのストレージ スペースを増やすことでコストがどのような影響を受けるかを理解します。

トピックとパーティションの数。

Kafka のプロデューサーは、トピックに書き込みます。 Kafka のコンシューマーは、トピックから読み取ります。 トピックは、ディスク上のデータ構造であるログに関連付けられます。 Kafka は、プロデューサーからのレコードをトピック ログの末尾に追加します。 トピック ログは、複数ファイルに分散する多数のパーティションで構成されます。 これらのファイルは、同様に、複数の Kafka クラスター ノードに分散します。 コンシューマーは各自の間隔で Kafka トピックから読み取り、トピック ログ内で位置 (オフセット) を選択できます。

Kafka の各パーティションはシステム上のログ ファイルであり、プロデューサー スレッドは同時に複数のログに書き込むことができます。 同様に、各コンシューマー スレッドはメッセージを 1 つのパーティションから読み取るので、複数のパーティションからの使用が並列で処理されます。

パーティション密度 (ブローカーごとのパーティションの数) を増やすことで、メタデータ操作に関連し、パーティションのリーダーとそのフォロワーの間のパーティション要求/応答ごとのオーバーヘッドが追加されます。 まだ流れるデータがない場合であっても、パーティションのレプリカは引き続きリーダーからデータをフェッチします。これによりネットワーク経由での要求の送受信のための追加の処理が発生します。

HDInsight の Apache Kafka クラスター 2.1、2.4、および前述のものでは、レプリカを含め、ブローカーごとに最大 2,000 個のパーティションを持つことをお勧めします。 ブローカーごとのパーティションの数を増やすと、スループットが低下し、トピックが使用不可になる場合もあります。 Kafka パーティションのサポートの詳細については、バージョン 1.1.0 でサポートされるパーティションの数の増加に関する公式の Apache Kafka ブログの投稿を参照してください。 トピックの変更の詳細については、「Apache Kafka: modifying topics」(Apache Kafka: トピックを変更する) を参照してください。

レプリカの数

高いレプリケーション係数により、パーティションのリーダーとフォロワーの間で追加の要求が発生します。 結果として、高いレプリケーション係数によりさらに多くのディスクと CPU が追加の要求の処理に使用されるため、書き込みの待ち時間は長くなりスループットは低下します。

Azure HDInsight では、Kafka には、少なくとも 3 倍のレプリケーションを使用することをお勧めします。 ほとんどの Azure リージョンは 3 つの障害ドメインがありますが、2 つの障害ドメインしかないリージョンでは、ユーザーは 4 倍のレプリケーションを使用する必要があります。

レプリケーションの詳細については、「Apache Kafka: レプリケーション」と「Apache Kafka: レプリケーション係数を増やす」を参照してください。

コンシューマー構成

次のセクションでは、Kafka コンシューマーのパフォーマンスを最適化するためのいくつかの重要な一般構成について注目します。 すべての構成の詳細な説明は、コンシューマー構成に関する Apache Kafka のドキュメントを参照してください。

コンシューマーの数

パーティションの数をコンシューマーの数と同じに設定するのが良い方法です。 コンシューマーの数がパーティションの数より少ない場合、一部のコンシューマーは複数のパーティションから読み取りを行い、コンシューマーの待機時間が延びます。

コンシューマーの数がパーティションの数を超える場合、コンシューマーはアイドル状態になるため、コンシューマー リソースを無駄に消費します。

コンシューマーの頻繁な再調整を避ける

コンシューマーの再調整は、パーティションの所有権の変更 (コンシューマーのスケール アウトまたはスケール ダウンなど)、ブローカーのクラッシュ (ブローカーはコンシューマー グループのグループ コーディネーターであるため)、コンシューマー クラッシュ、新しいトピックの追加、または新しいパーティションの追加によってトリガーされます。 再最適化中は、コンシューマーが使用できないので、待機時間が延びます。

コンシューマーは、session.timeout.ms 内のブローカーにハートビートを送信できる場合、生きていると見なされます。 それ以外の場合、コンシューマーは使用できないまたは、失敗と見なされます。 この遅延は、コンシューマーの再調整につながります。 コンシューマー session.timeout.ms が低くなると、これらのエラーを検出する速度が速くなります。

session.timeout.ms が低すぎると、メッセージのバッチの処理に時間がかかる、または JVM GC の一時停止に時間がかかるなどのシナリオのために、コンシューマーが不要な再調整を繰り返す可能性があります。 メッセージの処理に多くの時間を費やすコンシューマーがある場合は、max.poll.interval.ms でコンシューマーがより多くのレコードを取得するまでにアイドル状態でいられる時間の上限を増やすか、設定パラメーター max.poll.records で返されるバッチの最大サイズを小さくすることで対処できます。

バッチ処理

プロデューサーと同様に、コンシューマーのバッチ処理を追加できます。 各フェッチ要求でコンシューマーが取得できるデータ量は、構成 fetch.min.bytes を変更することで構成できます。 このパラメーターは、コンシューマーのフェッチ応答から予想される最小バイト数を定義します。 この値を大きくすると、ブローカーに対して行われたフェッチ要求の数が減り、余分なオーバーヘッドが削減されます。 既定では、この値は 1 です。 同様に、別の構成 fetch.max.wait.ms があります。 フェッチ要求に fetch.min.bytes のサイズに応じた十分なメッセージがない場合は、この構成 fetch.max.wait.ms に基づいて待機時間の有効期限が切れるまで待機します。

Note

一部のシナリオでは、コンシューマーがメッセージの処理に失敗した場合に、速度が低下しているように見える場合があります。 例外発生後にオフセットをコミットしていない場合、コンシューマーは特定のオフセットで無限ループに陥り、先に進めなくなり、結果としてコンシューマー側のラグが増加します。

負荷の高いワークロードでの Linux OS チューニング

メモリ マップ

vm.max_map_count は、プロセスが持つ mmap の最大数を定義します。 既定では、HDInsight Apache Kafka クラスター Linux VM で値は 65535 です。

Apache Kafka では、各ログ セグメントには index/timeindex ファイルのペアが必要であり、これらのファイルはそれぞれ 1 mmap を消費します。 つまり、各ログ セグメントは 2 mmap を使用します。 したがって、各パーティションが 1 つのログ セグメントをホストする場合は、最小で 2 mmap が必要です。 パーティションごとのログ セグメント数は、セグメント サイズ、負荷の強さ、保持ポリシー、ローリング期間によって異なり、一般的には 1 つ以上になる傾向があります。 Mmap value = 2*((partition size)/(segment size))*(partitions)

要求された mmap の値が vm.max_map_count を超える場合、ブローカーは「Map failed」の例外を発生させます。

この例外を回避するには、次のコマンドを使用して vm の mmap のサイズを確認し、各ワーカー ノードで必要に応じてサイズを増やします。

# command to find number of index files:
find . -name '*index' | wc -l

# command to view vm.max_map_count for a process:
cat /proc/[kafka-pid]/maps | wc -l

# command to set the limit of vm.max_map_count:
sysctl -w vm.max_map_count=<new_mmap_value>

# This will make sure value remains, even after vm is rebooted:
echo 'vm.max_map_count=<new_mmap_value>' >> /etc/sysctl.conf
sysctl -p

注意

VM 上のメモリが必要になるので、この設定が高すぎる場合は注意してください。 JVM がメモリ マップで使用できるメモリの量は、MaxDirectMemory の設定によって決まります。 既定値は 64 MB です。 これは到達している可能性があります。 この値を増やすには、Ambari を使用して JVM 設定に -XX:MaxDirectMemorySize=amount of memory used を追加します。 ノードで使用されているメモリの量と、それをサポートするのに十分な空き RAM があるかどうかを確認してください。

次のステップ