Kafka istemcisi OffsetOutOfRangeException ile sonlandırıldı

Sorun

Bir hata iletisiyle sonlandırıldığı zaman bir Apache Kafka kaynağından iletileri getirmeye çalışan bir Apache Spark uygulamanız vardır kafkashaded.org.apache.kafka.clients.consumer.OffsetOutOfRangeException .

Nedeni

Spark uygulamanız, Kafka ' den geçen süre dolma veri farklarını getirmeye çalışıyor.

Bunu genellikle bu iki senaryoda görtik:

1. Senaryo

Spark uygulaması verileri işlerken sonlandırılır. Spark uygulaması yeniden başlatıldığında, verileri önceden hesaplanan veri uzaklıkları temelinde getirmeye çalışır. Spark uygulamasının sonlandırıldığı sırada veri uzaklıklardır süresi dolmuşsa, bu sorun oluşabilir.

2. Senaryo

Bekletme ilkeniz, Batch 'in işlenmesi için gereken süreden daha kısa bir süre ayarlanır. Toplu işlemin işlem tamamlandığında, bazı Kafka bölüm uzaklıklarıdır. Uzaklıklar bir sonraki toplu iş için hesaplanır ve süresi dolan uzaklıklar nedeniyle denetim noktası meta verilerinde uyuşmazlık varsa, bu sorun oluşabilir.

Çözüm

Senaryo 1-seçenek 1

Spark uygulamasını yeniden başlatmadan önce mevcut kontrol noktasını silin. Yeni getirilen kaydırın ayrıntılarıyla yeni bir kontrol noktası kayması oluşturulur.

Bu yaklaşımda, uzaklıklardan bazıları Kafka ' de zaman aşımına uğradığından bazı veriler eksik olabilir.

Senaryo 1-seçenek 2

Spark uygulamasının çevrimdışı olduğu zamandan daha uzun olması için konunun Kafka bekletme ilkesini artırın.

Spark uygulaması yeniden başlatılmadan önce hiçbir uzaklık dolmadığından, bu çözümle ilgili hiçbir veri kaçırmadı.

İki tür bekletme ilkesi vardır:

  • Zamana dayalı bekletme-Bu ilke türü, bir günlük segmentinin otomatik olarak silinmeden önce saklanacağı süreyi tanımlar. Tüm konular için varsayılan zaman tabanlı veri saklama penceresi yedi gündür. Daha fazla bilgi için log. bekletme. hours, log. bekletme. minutesve log.retention.MS için Kafka belgelerini gözden geçirebilirsiniz.
  • Boyut tabanlı bekletme-Bu ilke türü, her konu bölümü için günlükte tutulacak veri miktarını tanımlar. Bu sınır, bölüm başına. Bu değer varsayılan olarak sınırsızdır. Daha fazla bilgi için log. bekletme. Bytes için Kafka belgelerini gözden geçirebilirsiniz.

Not

Birden fazla bekletme ilkesi ayarlandıysa, daha kısıtlayıcı bir denetim vardır. Bu, konuya göre geçersiz kılınabilir.

Konu başına geçersiz kılmayı ayarlama hakkında daha fazla bilgi için Kafka 'in Konu düzeyi yapılandırmasını gözden geçirin.

Senaryo 2-seçenek 1

Bölümün bekletme ilkesini artırın. Bu, senaryo 1-seçenek 2 çözümüyle aynı şekilde gerçekleştirilir.

Senaryo 2-seçenek 2

İçin yapılandırarak paralel çalışan sayısını artırın .option("minPartitions",<X>) readStream .

Seçeneği, minPartitions Kafka öğesinden okunacak en az bölüm sayısını tanımlar. Spark, Kafka ' den veri tükettiği durumlarda Spark bölümlerine Kafka konu bölümlerinin bire bir eşlemesini kullanır. Seçeneğini minPartitions Kafka konu bölümlerinizin sayısından daha büyük bir değere ayarlarsanız Spark, Kafka konu bölümlerini daha küçük parçalara ayırır.

Bu seçenek veri eğme, en yüksek yüklerin ve akışlarınızın arkasında kaldığı durumlarda önerilir. Bu değer, her tetikleyicide Kafka tüketicilerinin başlatılmasında varsayılan sonuçlardan daha büyük olarak ayarlanıyor. Bu, Kafka 'e bağlanırken SSL kullanıyorsanız performansı etkileyebilir.