Работа клиента Kafka завершается исключением OffsetOutOfRangeException

Проблема

У вас есть Apache Spark приложение, которое пытается получить сообщения из источника Apache Kafka, когда оно завершается kafkashaded.org.apache.kafka.clients.consumer.OffsetOutOfRangeException сообщением об ошибке.

Причина

Приложение Spark пытается получить смещения данных с истекшим сроком действия от Kafka.

Как правило, это видно в следующих двух сценариях:

Сценарий 1

Приложение Spark прерывается во время обработки данных. При перезапуске приложения Spark он пытается получить данные на основе ранее вычисленных смещенных данных. Если срок действия какого либо смещения данных истек во время завершения работы приложения Spark, это может привести к возникновению этой проблемы.

Сценарий 2

Для политики хранения задано более короткий интервал времени, чем требуется для обработки пакета. К моменту обработки пакета истек срок действия некоторых смещений секций Kafka. Смещение вычисляется для следующего пакета, и если в метаданных контрольной точки обнаружено несовпадение из-за просроченных смещений, может возникнуть эта проблема.

Решение

Сценарий 1. вариант 1

Удалите существующую контрольную точку перед перезапуском приложения Spark. Создается новое смещение контрольной точки со сведениями о вновь полученном смещении.

Недостаток этого подхода заключается в том, что некоторые данные могут быть пропущены, так как срок действия смещений истек в Kafka.

Сценарий 1. вариант 2

Увеличьте политику хранения Kafka в разделе, чтобы она была длиннее, чем время работы приложения Spark в автономном режиме.

В этом решении отсутствуют данные, так как смещение не истекло до перезапуска приложения Spark.

Политики хранения бывают двух видов.

  • Хранение на основе времени. Этот тип политики определяет период времени, в течение которого будет храниться сегмент журнала, прежде чем он будет автоматически удален. По умолчанию окно хранения данных на основе времени для всех разделов составляет семь дней. Дополнительные сведения см. в документации по Kafka для журнала. retention. Hours, log. retention. минути log.retention.MS .
  • Хранение на основе размера. Этот тип политики определяет объем данных, которые должны храниться в журнале для каждого раздела раздела. Это ограничение для отдельных секций. По умолчанию это значение не ограничено. Дополнительные сведения см. в документации по Kafka для log. retention. bytes .

Примечание

Если задано несколько политик хранения, более с более узким набором элементов управления. Это может быть переопределено для каждого раздела.

Дополнительные сведения о задании переопределения для каждого раздела см. в разделе Конфигурация на уровне раздела Kafka.

Сценарий 2. вариант 1

Увеличьте политику хранения секции. Это выполняется так же, как и решение для сценария 1 — вариант 2.

Сценарий 2. вариант 2

Увеличьте число параллельных рабочих ролей, настроив .option("minPartitions",<X>) для readStream .

Параметр minPartitions определяет минимальное число секций, считываемых из Kafka. По умолчанию Spark использует сопоставление "один к одному" для секций разделов Kafka с секциями Spark при использовании данных из Kafka. Если для параметра задано minPartitions значение, превышающее число разделов раздела Kafka, Spark разделяет разделы Kafka на небольшие части.

Этот параметр рекомендуется использовать в периоды смещения данных, пиковых нагрузок, а также в случае, если ваш поток падает позади. Установка этого значения, превышающего значение по умолчанию, приводит к инициализации потребителей Kafka в каждом триггере. Это может повлиять на производительность, если при подключении к Kafka используется SSL.