Kafka クライアントが OffsetOutOfRangeException で終了するKafka client terminated with OffsetOutOfRangeException

問題Problem

Apache Spark アプリケーションが、エラーメッセージで終了したときに Apache Kafka ソースからメッセージをフェッチしようとしてい kafkashaded.org.apache.kafka.clients.consumer.OffsetOutOfRangeException ます。You have an Apache Spark application that is trying to fetch messages from an Apache Kafka source when it is terminated with a kafkashaded.org.apache.kafka.clients.consumer.OffsetOutOfRangeException error message.

原因Cause

Spark アプリケーションが、Kafka から期限切れのデータオフセットをフェッチしようとしています。Your Spark application is trying to fetch expired data offsets from Kafka.

通常、次の2つのシナリオでこれを確認できます。We generally see this in these two scenarios:

シナリオ 1Scenario 1

データの処理中に、Spark アプリケーションが終了します。The Spark application is terminated while processing data. Spark アプリケーションは、再起動すると、以前に計算されたデータオフセットに基づいてデータをフェッチしようとします。When the Spark application is restarted, it tries to fetch data based on previously calculated data offsets. Spark アプリケーションの終了時にいずれかのデータオフセットの有効期限が切れた場合、この問題が発生する可能性があります。If any of the data offsets have expired during the time the Spark application was terminated, this issue can occur.

シナリオ 2Scenario 2

保持ポリシーが、バッチの処理に必要な時間より短い時間に設定されています。Your retention policy is set to a shorter time than the time require to process the batch. バッチの処理が完了するまでに、一部の Kafka パーティションのオフセットの有効期限が切れています。By the time the batch is done processing, some of the Kafka partition offsets have expired. オフセットは次のバッチに対して計算されます。有効期限が切れたオフセットが原因でチェックポイントメタデータに不一致がある場合は、この問題が発生する可能性があります。The offsets are calculated for the next batch, and if there is a mismatch in the checkpoint metadata due to the expired offsets, this issue can occur.

解決策Solution

シナリオ 1-オプション1Scenario 1 - Option 1

Spark アプリケーションを再起動する前に、既存のチェックポイントを削除します。Delete the existing checkpoint before restarting the Spark application. 新しくフェッチされたオフセットの詳細を使用して、新しいチェックポイントオフセットが作成されます。A new checkpoint offset is created with the details of the newly fetched offset.

この方法の欠点は、Kafka でオフセットの有効期限が切れているため、一部のデータが失われる可能性があることです。The downside to this approach is that some of the data may be missed, because the offsets have expired in Kafka.

シナリオ 1-オプション2Scenario 1 - Option 2

トピックの Kafka 保持ポリシーを増やして、Spark アプリケーションがオフラインになる時間より長いようにします。Increase the Kafka retention policy of the topic so that it is longer than the time the Spark application is offline.

このソリューションではデータが欠落していません。これは、Spark アプリケーションを再起動する前にオフセットの有効期限が切れていないためです。No data is missed with this solution, because no offsets have expired before the Spark application is restarted.

保持ポリシーには 2 つの種類があります。There are two types of retention policies:

  • 時間ベースの保有期間-この種類のポリシーは、ログセグメントが自動的に削除されるまでの時間を定義します。Time based retention - This type of policy defines the amount of time to keep a log segment before it is automatically deleted. すべてのトピックの既定の時間ベースのデータリテンション期間は7日間です。The default time based data retention window for all topics is seven days. 詳細については、Kafkaのドキュメントの「 log.retention.ms 」、「リテンション期間 (分)」、および「」を参照してください。You can review the Kafka documentation for log.retention.hours, log.retention.minutes, and log.retention.ms for more information.
  • サイズベースの保有期間-この種類のポリシーは、各トピックのログに保持するデータの量を定義します-パーティション。Size based retention - This type of policy defines the amount of data to retain in the log for each topic-partition. この上限はパーティションごとに制限されます。This limit is per-partition. 既定では、この値は無制限です。This value is unlimited by default. 詳細については、 ログの保存 に関する Kafka のドキュメントを参照してください。You can review the Kafka documentation for log.retention.bytes for more information.

注意

複数の保持ポリシーが設定されている場合、より制限の厳しい1つのコントロールです。If multiple retention policies are set, the more restrictive one controls. これは、トピックごとにオーバーライドできます。This can be overridden on a per topic basis.

トピックごとの上書きを設定する方法の詳細については、「Kafka の トピックレベルの構成」 を参照してください。Review Kafka’s Topic-level configuration for more information on how to set a per topic override.

シナリオ 2-オプション1Scenario 2 - Option 1

パーティションの保持ポリシーを増やします。Increase the retention policy of the partition. これは、シナリオ1オプション2のソリューションと同じ方法で実現されます。This is accomplished in the same way as the solution for Scenario 1 - Option 2.

シナリオ 2-オプション2Scenario 2 - Option 2

にを構成して、並列ワーカーの数を増やし .option("minPartitions",<X>) readStream ます。Increase the number of parallel workers by configuring .option("minPartitions",<X>) for readStream.

オプションは、 minPartitions Kafka から読み取るパーティションの最小数を定義します。The option minPartitions defines the minimum number of partitions to read from Kafka. 既定では、Spark は Kafka からのデータを使用するときに、Kafka トピックパーティションの1対1のマッピングを Spark パーティションに対して使用します。By default, Spark uses a one-to-one mapping of Kafka topic partitions to Spark partitions when consuming data from Kafka. このオプションを minPartitions kafka トピックパーティションの数よりも大きい値に設定すると、Spark は Kafka トピックパーティションをより小さな部分に分割します。If you set the option minPartitions to a value greater than the number of your Kafka topic partitions, Spark separates the Kafka topic partitions into smaller pieces.

このオプションは、データのスキュー、ピーク時の読み込み、およびストリームの遅延が発生した場合に推奨されます。This option is recommended at times of data skew, peak loads, and if your stream is falling behind. この値を既定値より大きく設定すると、各トリガーで Kafka コンシューマーが初期化されます。Setting this value greater than the default results in the initialization of Kafka consumers at each trigger. Kafka への接続時に SSL を使用すると、パフォーマンスが低下する可能性があります。This can impact performance if you use SSL when connecting to Kafka.