Hatalı kayıtları ve dosyaları işleme

Dosya tabanlı bir veri kaynağından veri okurken, Apache Spark SQL iki tipik hatayla karşı karşıya olur. İlk olarak, dosyalar okunabilir (örneğin, eksik, erişilemez veya bozuk olabilir) olabilir. İkincisi, dosyalar işlenebilir olsa bile bazı kayıtlar ayrıştırılabilir (örneğin, söz dizimi hataları ve şema eşleşmesi nedeniyle) olabilir.

Azure Databricks Spark işlerini kesintiye uğratmadan hatalı kayıtları ve dosyaları işlemeye yaraya birleştirilmiş bir arabirim sağlar. Veri kaynağı seçeneğini ayarerek özel durum günlüklerinden özel durum kayıtlarını/dosyalarını ve nedenlerini elde badRecordsPath edinebilirsiniz. badRecordsPath CSV ve JSON kaynakları için hatalı kayıtlar ve tüm dosya tabanlı yerleşik kaynaklar için hatalı dosyalar (örneğin, Parquet) hakkında bilgi kaydetmeye ilişkin özel durum dosyalarını depolamak için bir yol belirtir.

Ayrıca, dosyaları okurken ağ bağlantısı özel durumu, IO özel durumu gibi geçici hatalar oluşabilir. Bu hatalar yoksayılır ve ayrıca altına kaydedilir badRecordsPath ve Spark görevleri çalıştırmaya devam eder.

Not

Dosya badRecordsPath tabanlı bir veri kaynağında seçeneğini kullanmanın birkaç önemli sınırlaması vardır:

  • İşlemsel olmayan bir işlemdir ve tutarsız sonuçlara neden olabilir.
  • Geçici hatalar hata olarak kabul edilir.

Örnekler

Giriş dosyası bulunamıyor

val df = spark.read
  .option("badRecordsPath", "/tmp/badRecordsPath")
  .parquet("/input/parquetFile")

// Delete the input parquet file '/input/parquetFile'
dbutils.fs.rm("/input/parquetFile")

df.show()

Yukarıdaki örnekte, giriş dosyası bulunamadığından Spark, hatayı kaydetmek için df.show() JSON biçiminde bir özel durum dosyası oluşturur. Örneğin, /tmp/badRecordsPath/20170724T101153/bad_files/xyz özel durum dosyasının yoludur. Bu dosya belirtilen dizinin badRecordsPath altında, /tmp/badRecordsPath . 20170724T101153 , bunun oluşturma DataFrameReader zamanıdır. bad_files , özel durum t type'dır. xyz , hatalı dosyanın yoluna ve özel durum/neden iletisine sahip bir JSON kaydı içeren dosyadır.

Giriş dosyası hatalı kayıt içeriyor

// Creates a json file containing both parsable and corrupted records
Seq("""{"a": 1, "b": 2}""", """{bad-record""").toDF().write.text("/tmp/input/jsonFile")

val df = spark.read
  .option("badRecordsPath", "/tmp/badRecordsPath")
  .schema("a int, b int")
  .json("/tmp/input/jsonFile")

df.show()

Bu örnekte DataFrame yalnızca ilk ayrıştırılabilir kaydı () {"a": 1, "b": 2} içerir. İkinci hatalı kayıt ( {bad-record ) içinde bulunan bir JSON dosyası olan özel durum dosyasına /tmp/badRecordsPath/20170724T114715/bad_records/xyz kaydedilir. Özel durum dosyası hatalı kaydı, kaydı içeren dosyanın yolunu ve özel durum/neden iletiyi içerir. Özel durum dosyalarını bulamaktan sonra, bunları işlemesi için bir JSON okuyucusu kullanabilirsiniz.