Klient Kafka ukončen s OffsetOutOfRangeException

Problém

Máte Apache Spark aplikaci, která se pokouší načíst zprávy ze zdroje Apache Kafka, když je ukončen kafkashaded.org.apache.kafka.clients.consumer.OffsetOutOfRangeException chybovou zprávou.

Příčina

Vaše aplikace Spark se snaží načíst posuny dat s vypršenou platností z Kafka.

Obecně se to zobrazuje v těchto dvou scénářích:

Scénář 1

Aplikace Spark se při zpracování dat ukončí. Po restartování aplikace Spark se pokusí načíst data na základě dříve vypočítaných posunů dat. K tomuto problému může dojít, pokud vypršela platnost některého z posunů dat během doby ukončení aplikace Spark.

Scénář 2

Vaše zásady uchovávání informací jsou nastaveny na kratší dobu, než je doba nutná ke zpracování dávky. V době, kdy byla dávka dokončena, vypršela platnost některých posunů oddílu Kafka. K tomuto problému může dojít, když se vypočítají posunutí pro další dávku a pokud v metadatech kontrolního bodu nedošlo k neshodě z důvodu vypršení platnosti.

Řešení

Scénář 1 – možnost 1

Před restartováním aplikace Spark odstraňte existující kontrolní bod. Vytvoří se nový posun kontrolního bodu s podrobnostmi nově načteného posunu.

Nevýhodou tohoto přístupu je, že některá data mohou být vynechána, protože posunutí vypršela v Kafka.

Scénář 1 – možnost 2

Zvyšte zásady uchovávání informací v Kafka tématu tak, aby byla delší než čas, kdy je aplikace Spark offline.

V tomto řešení neexistují žádná data, protože před restartováním aplikace Spark nevypršela žádná posunutí.

Existují dva typy zásad uchovávání informací:

  • Uchovávání na základě času – tento typ zásad definuje dobu, po kterou se bude segment protokolu uchovávat, než se automaticky odstraní. Výchozí časový interval pro uchovávání dat na základě dat pro všechna témata je sedm dní. Další informace najdete v dokumentaci ke službě Kafka for log. uchování. hours, log. uchování. minuta log.retention.MS .
  • Uchovávání na základě velikosti – tento typ zásad definuje množství dat, která se uchovávají v protokolu pro každé téma – oddíl. Toto omezení je na oddíl. Tato hodnota je ve výchozím nastavení neomezená. Další informace najdete v dokumentaci ke službě Kafka for log. uchování. bajtů .

Poznámka

Pokud je nastavené více zásad uchovávání informací, tím více omezující je jeden ovládací prvek. To může být přepsáno na základě jednotlivých témat.

Další informace o tom, jak nastavit přepsání jednotlivých témat, najdete v části konfigurace na úrovni tématu Kafka.

Scénář 2 – možnost 1

Zvyšte zásady uchovávání informací oddílu. To je provedeno stejným způsobem jako řešení pro scénář 1 – možnost 2.

Scénář 2 – možnost 2

Zvyšte počet paralelních pracovních procesů konfigurací .option("minPartitions",<X>) pro readStream .

Možnost minPartitions definuje minimální počet oddílů pro čtení z Kafka. Ve výchozím nastavení používá Spark při využívání dat z Kafka mapování oddílů tématu Kafka na oddíly Sparku. Pokud nastavíte možnost minPartitions na hodnotu vyšší, než je počet oddílů tématu Kafka, Spark oddělí oddíly tématu Kafka na menší části.

Tato možnost se doporučuje v časech, kdy dojde k překrytí dat, špičky zatížení a v případě, že je váš datový proud padající za. Nastavení této hodnoty větší než výchozí vede k inicializaci Kafkach uživatelů při každém triggeru. To může mít vliv na výkon, pokud při připojování k Kafka používáte protokol SSL.