HDInsight と Azure Data Lake Storage Gen1 上の Storm に対するパフォーマンス チューニング ガイダンス

Azure Storm トポロジのパフォーマンスを調整する際に考慮すべき要素について説明します。 たとえば、スパウトとボルトによる処理 (I/O とメモリのどちらを大量に消費する場合でも) の特性を理解しておくことが重要です。 この記事では、一般的な問題のトラブルシューティングを含む、さまざまなパフォーマンス チューニング ガイドラインについて説明します。

前提条件

トポロジの並列処理のチューニング

Data Lake Storage Gen1 との間で I/O のコンカレンシーを高めることで、パフォーマンスを向上できる場合があります。 Storm トポロジでは、並列処理を決定する一連の設定が用意されています。

  • ワーカー プロセスの数 (ワーカーは VM 間で均等に分散されます)。
  • スパウトの Executor インスタンスの数。
  • ボルトの Executor インスタンスの数。
  • スパウト タスクの数。
  • ボルト タスクの数。

たとえば、クラスターに 4 つの VM と 4 つのワーカー プロセス、32 個のスパウトの Executor と 32 個のスパウト タスク、256 個のボルトの Executor と 512 個のボルト タスクがある場合は、次のことを考慮してください。

ワーカー ノードである各スーパーバイザーでは、単一のワーカー Java 仮想マシン (JVM) プロセスが使用されています。 この JVM プロセスでは、4 つのスパウト スレッドと 64 個のボルト スレッドが管理されます。 各スレッド内では、タスクが順番に実行されます。 前述の構成では、各スパウト スレッドに 1 つのタスク、各ボルト スレッドに 2 つのタスクがあります。

Storm に関連するさまざまなコンポーネントと、それらが並列処理のレベルに与える影響は次のとおりです。

  • ヘッド ノード (Storm では Nimbus と呼ばれます) は、ジョブの送信と管理に使用されます。 これらのノードは、並列処理の次数に影響を与えません。
  • スーパーバイザー ノード。 HDInsight では、ワーカー ノードの Azure VM に対応します。
  • ワーカー タスクは、VM で実行される Storm のプロセスです。 各ワーカー タスクは、JVM インスタンスに対応します。 Storm は、指定したワーカー プロセスの数ができる限り均等になるようにワーカー ノードに分散させます。
  • スパウトとボルトの Executor インスタンス。 各 Executor インスタンスは、ワーカー (JVM) 内で実行されるスレッドに対応します。
  • Storm のタスク。 各スレッドで実行される論理タスクです。 これによって並列処理のレベルが変わることはないので、Executor ごとに複数のタスクが必要かどうかを評価する必要があります。

Data Lake Storage Gen1 でパフォーマンスを最大限に高める

Data Lake Storage Gen1 を使用する場合、次の方法でパフォーマンスを最大限高めることができます。

  • 小さなサイズの追加データを大きなサイズのデータに結合する (4 MB が理想的)。
  • 同時要求をできる限り多く実行する。 各ボルト スレッドではブロッキング読み取りが行われるため、コアごとに 8 ~ 12 個のスレッドが必要です。 これにより、NIC と CPU の使用率が適切に保たれます。 VM が大きくなると、より多くの同時要求が可能になります。

トポロジの例

D13v2 Azure VM を含む 8 つのワーカー ノードから成るクラスターがあるとします。 この VM には 8 コアがあるため、8 つのワーカー ノードで合計 64 個のコアがあります。

たとえば、コアあたり 8 つのボルト スレッドを実行したとします。 64 個のコアがあるため、合計 512 個のボルトの Executor インスタンス (つまりスレッド) が必要になります。 この場合、たとえば、VM ごとに 1 つの JVM を起動し、主に JVM 内でスレッドのコンカレンシーを使用することで、コンカレンシーを実現します。 つまり、8 つのワーカー タスク (Azure VM ごとに 1 つずつ) と 512 個のボルトの Executor が必要です。 この構成下では、Storm は各ワーカー ノードに 1 つの JVM を提供しながら、ワーカーをワーカー ノード (別名スーパーバイザー ノード) 間で均等に分散しようとします。 スーパーバイザー内では、Storm は各スーパーバイザー (つまり JVM) にスレッドを 8 つずつ提供しながら、Executor をスーパーバイザー間で均等に分散させようとします。

追加パラメーターのチューニング

基本的なトポロジを作成したら、次のパラメーターをチューニングするかどうかを検討できます。

  • ワーカー ノードごとの JVM の数。 大規模なデータ構造 (ルックアップ テーブルなど) をメモリ内でホストしている場合、JVM ごとに個別のコピーが必要になります。 また、JVM の数が少ない場合は、データ構造を多くのスレッド間で使用できます。 JVM の数を変更しても、ボルトの I/O には、これらの JVM 間で追加されたスレッドの数ほどの大きな変化は発生しません。 わかりやすくするために、JVM はワーカーごとに 1 つにしておくことをお勧めします。 ただし、ボルトの処理内容や必要なアプリケーション処理によっては、この数を変更することが必要になる場合があります。
  • スパウトの Executor の数。 前の例では Data Lake Storage Gen1 への書き込みにボルトを使用しているため、スパウトの数は、ボルトのパフォーマンスに直接には関連しません。 ただし、スパウトで発生する処理または I/O の量によっては、最良のパフォーマンスを得るためにスパウトをチューニングすることをお勧めします。 ボルトをビジー状態に保つために十分なスパウトがあることを確認してください。 スパウトの出力レートはボルトのスループットと一致する必要があります。 実際の構成は、スパウトによって異なります。
  • タスクの数。 各ボルトは 1 つのスレッドとして実行されます。 ボルトごとにタスクを追加しても、コンカレンシー数が増えることはありません。 タスクを追加する利点があるのは、タプルの受信確認処理に要する時間がボルトの実行時間の大部分を占める場合のみです。 ボルトから受信確認を送信する前に、数が多いタプルを大きな追加データとしてグループ化することをお勧めします。 したがって、ほとんどの場合、タスクを増やす利点はありません。
  • ローカルまたはシャッフル グループ化。 この設定を有効にすると、タプルは同じワーカー プロセス内のボルトに送信されます。 これにより、プロセス間での通信とネットワーク呼び出しが減少します。 ほとんどのトポロジにおいてこれをお勧めします。

この基本的なシナリオは、出発点として適しています。 最適なパフォーマンスを実現するために、独自のデータでテストして上記のパラメーターを調整してください。

スパウトのチューニング

次の設定を変更してスパウトを調整できます。

  • タプルのタイムアウト: topology.message.timeout.secs。 この設定は、メッセージの完了と受信確認の受信にどれだけの時間がかかったら失敗と見なすかを決定します。

  • ワーカー プロセスごとの最大メモリ: worker.childopts。 この設定では、Java ワーカーに追加のコマンド ライン パラメーターを指定できます。 ここで最もよく使用される設定は、JVM ヒープに割り当てられた最大メモリを決定する XmX です。

  • スパウトでの保留の最大数: topology.max.spout.pending。 この設定は、任意の時点でフライト状態 (トポロジのどのノードでも受信確認されていない状態) でいられる、スパウト スレッドあたりのタプルの数を決定します。

    適切な計算を行うには、まず各タプルのサイズを見積もります。 次に、1 つのスパウト スレッドにどれだけのメモリがあるかを確認します。 1 つのスレッドに割り当てられたメモリの合計をこの値で割ると、スパウトでの保留パラメーターの最大値が得られます。

ボルトのチューニング

Data Lake Storage Gen1 に書き込む場合は、サイズ同期ポリシー (クライアント側のバッファー) を 4 MB に設定します。 これにより、フラッシュまたは hsync() は、バッファーのサイズがこの値である場合にのみ実行されるようになります。 hsync() を明示的に実行しない限り、ワーカー VM 上の Data Lake Storage Gen1 ドライバーはこのバッファー処理を自動的に行います。

既定の Data Lake Storage Gen1 Storm ボルトには、このパラメーターのチューニングに使用できるサイズ同期ポリシー パラメーター (fileBufferSize) が用意されています。

I/O 集中型のトポロジでは、各ボルト スレッドによって独自のファイルへの書き込みとファイルのローテーション ポリシー (fileRotationSize) の設定が行われるようにすることをお勧めします。 ファイルが特定のサイズに達すると、ストリームは自動的にフラッシュされ、新しいファイルに書き込まれます。 ローテーションにあたってのファイルの推奨サイズは 1 GB です。

タプル データの処理

Storm では、タプルがボルトによって明示的に受信確認されるまで、スパウトがそのタプルを保持し続けます。 タプルがボルトによって読み取られた場合でも、まだ受信確認されていない場合は、スパウトが Data Lake Storage Gen1 のバックエンドへの保持の処理を行っていない可能性があります。 タプルが受信確認されると、ボルトによる保持がスパウトに保証されるので、スパウトは読み取り元にかかわらずソース データを削除できます。

Data Lake Storage Gen1 のパフォーマンスを最大化するには、ボルトによって 4 MB のタプル データがバッファー処理されるようにします。 その後、Data Lake Storage Gen1 バックエンドへの書き込みを 1 つの 4 MB の書き込みとして実行します。 (hflush() の呼び出しによって) データがストアに正常に書き込まれると、ボルトはスパウトにデータの受信確認を送信できます。 これが、ここで解説されているサンプルのボルトが行っている処理です。 hflush() の呼び出しとタプルの受信確認までに保持するタプルの数を増やすこともできます。 ただし、この場合、スパウトで保持する必要があるフライト中のタプルの数が増えるので、JVM ごとに必要なメモリの量は増加します。

Note

アプリケーションによっては、パフォーマンス以外の他の理由から、タプルの受信確認をより頻繁に行わなければならない場合があります (データのサイズが 4 MB 未満)。 ただし、これによってストレージ バックエンドへの I/O のスループットに影響が及ぶ可能性があります。 ボルトの I/O のパフォーマンスとのトレードオフを慎重に評価する必要があります。

タプルの着信レートが高くないために 4 MB のバッファー入力に長い時間がかかる場合は、次の方法で改善を試みることができます。

  • ボルトの数を減らすことによって、バッファーの数を減らす。
  • 時間ベースまたはカウント ベースのポリシーを設定する。このポリシーでは x 回目のフラッシュごとまたは y ミリ秒ごとに hflush() がトリガーされ、これまで蓄積されたタプルについては受信確認が送信されます。

この場合のスループットは比較的低いものの、イベントのレートが低いため、スループットの最大化は最大の目的ではありません。 上記の対応策により、タプルがストアに流れるまでの合計時間を短縮することができます。 これは、イベント レートが低下するとしてもリアルタイム パイプラインを希望する場合に重要になります。 また、着信タプル レートが低い場合は、タプルがバッファー処理または処理されている間にタイムアウトしないように topology.message.timeout_secs パラメーターを調整する必要があることに注意してください。

Storm でのトポロジの監視

実行中のトポロジは、Storm ユーザー インターフェイスで監視することができます。 確認する必要がある主なパラメーターは次のとおりです。

  • プロセスの実行における待ち時間の合計。 1 つのタプルがスパウトによって出力され、ボルトによって処理されて、受信確認されるまでの平均時間です。

  • ボルト プロセスの待ち時間の合計。 ボルトのタプルが受信確認を受信するまでの平均時間です。

  • ボルトの execute の待ち時間の合計。 ボルトの execute メソッドにかかった平均時間です。

  • 失敗の数 タイムアウトするまでに完全に処理できなかったタプルの数を示します。

  • 容量。 システムがどれだけビジーな状態であるかを測る目安です。 この数が 1 である場合、ボルトは最高速度で動作しています。 1 未満である場合は、並列処理を増やします。 1 より大きい場合は、並列処理を減らします。

一般的な問題のトラブルシューティング

ここでは、一般的なトラブルシューティングのシナリオをいくつか紹介します。

  • タイムアウトするタプルの数が多い。 トポロジの各ノードを確認して、ボトルネックの場所を特定します。 この問題の最も一般的な原因として、ボルトがスパウトに追いつくことができないことがあります。 その結果、処理を待機しているタプルで内部バッファーがいっぱいになります。 タイムアウト値を引き上げるか、スパウトでの保留の最大数を引き下げることを検討してください。

  • プロセスの実行における待ち時間の合計が長いが、ボルトでのプロセスの待ち時間は短い。 この場合、いずれかのタプルが十分な速さで処理されていない可能性があります。 十分な数の受信確認があることを確認してください。 考えられる別の原因としては、ボルトが処理を開始するまでにキューで待機する時間が長すぎることが挙げられます。 スパウトでの保留の最大数を減らしてください。

  • ボルトの execute の待ち時間が長い。 これは、ボルトの execute() メソッドに時間がかかりすぎていることを示します。 コードを最適化するか、書き込みのサイズとフラッシュの動作を確認してください。

Data Lake Storage Gen1 の調整

Data Lake Storage Gen1 によって提供される帯域幅の制限に達した場合は、タスクが失敗する可能性があります。 タスク ログで調整エラーを確認してください。 コンテナーのサイズを増やすことで、並列処理を減らせます。

調整されているかどうかを確認するには、クライアント側でデバッグ ログを有効にしてください。

  1. [Ambari]>[Storm]>[構成]>[Advanced storm-worker-log4j (storm-worker-log4j の詳細)] の順に移動し、 <root level="info"><root level="debug"> に変更します。 すべてのノードとサービスを再起動して構成を有効にします。
  2. ワーカー ノード (/var/log/storm/worker-artifacts/<TopologyName>/<ポート>/worker.log の直下) の Storm トポロジ ログで、Data Lake Storage Gen1 調整の例外を監視します。

次のステップ

Storm におけるその他のパフォーマンスのチューニングについては、こちらのブログを参照してください。

実行できる他のサンプルについては、GitHub のこちらのサンプルを参照してください。