Otomatik yükleyicisindeki Şema çıkarımı ve evrimi

Not

JSON biçimi desteği Databricks Runtime 8,2 ve üzeri sürümlerde kullanılabilir; CSV biçimi desteği Databricks Runtime 8,3 ve üzeri sürümlerde kullanılabilir. Her biçim hakkında ayrıntılı bilgi için bkz. veri biçimleri.

Otomatik yükleyici , verilerinize yeni sütunların giriş işlemini otomatik olarak algılayabilir ve yeniden başlatarak şema değişikliklerinin izlenmesini ve işlenmesini kendiniz yönetmek zorunda kalmazsınız. Otomatik yükleyici, bir JSON blob sütunundaki beklenmeyen verileri (örneğin, farklı veri türleri gibi), daha sonra yarı yapılandırılmış veri erişimi API 'lerikullanarak erişmeyi seçebilmeniz için de "kurtarma" işlemi gerçekleştirebilir.

Şema çıkarımı

Şemayı çıkarsmak için otomatik yükleyici, bulduğu ilk 50 GB veya 1000 dosyayı örneklemeli ve öncelikle bu sınır daha önce çapraz olur. Her akış başlangıcında bu çıkarım maliyetini ortadan kaldırmak ve akış yeniden başlatmaları arasında kararlı bir şema sağlayabilmek için seçeneğini ayarlamanız gerekir cloudFiles.schemaLocation . Otomatik yükleyici _schemas , giriş verilerinde zaman içinde şema değişikliklerini izlemek için bu konumda gizli bir dizin oluşturur. Stream cloudFiles , verileri almak için tek bir kaynak içeriyorsa, kontrol noktası konumunu olarak sağlayabilirsiniz cloudFiles.schemaLocation . Aksi takdirde, bu seçenek için benzersiz bir dizin sağlayın. Giriş verileriniz Stream için beklenmeyen bir şema döndürürse, şema konumunuz yalnızca tek bir otomatik yükleyici kaynağı tarafından kullanıldığından emin olun.

Not

Kullanılan örneğin boyutunu değiştirmek için, SQL yapılandırmasını ayarlayabilirsiniz:

spark.databricks.cloudFiles.schemaInference.sampleSize.numBytes

(örneğin, bayt dizesi 10gb )

ve

spark.databricks.cloudFiles.schemaInference.sampleSize.numFiles

gir

Varsayılan olarak, otomatik yükleyici sütunları CSV ve JSON gibi metin tabanlı dosya biçimlerinde sütunlar olarak algılar string . JSON veri kümelerinde, iç içe geçmiş sütunlar da sütun olarak algılanır string . JSON ve CSV verileri kendi kendini açıkladığından ve birçok veri türünü destekleyebileceğinden, verileri dize olarak göstermek, sayısal tür uyuşmazlıkları (tamsayılar, Long 'ler, float) gibi şema evrimini önlemenize yardımcı olabilir. Özgün Spark şeması çıkarımı davranışını sürdürmek istiyorsanız, seçeneğini cloudFiles.inferColumnTypes olarak ayarlayın true .

Not

Büyük/küçük harf duyarlılığı etkin değilse, foo Foo ve sütunları FOO aynı sütun olarak değerlendirilir. Sütunun hangi durumda temsil edileceğini rastgele ve örneklenen verilere göre değişir. Hangi durumun kullanılması gerektiğini zorlamak için şema ipuçlarını kullanabilirsiniz.

Otomatik yükleyici, veriler Hive stili bölümlemesinde düzenlense de, verilerin temel alınan dizin yapısından bölüm sütunları çıkarmaya çalışır. Örneğin, gibi bir dosya yolu, base_path/event=click/date=2021-04-01/f0.json date ve bölüm sütunlarının çıkarımı ile sonuçlanacaktır event . True olarak ayarlamadığınız müddetçe bu sütunların veri türleri dizeler olacaktır cloudFiles.inferColumnTypes . Temeldeki dizin yapısı çakışan Hive bölümleri içeriyorsa veya Hive stili bölümlendirme içermiyorsa, Bölüm sütunları yok sayılır. cloudFiles.partitionColumnsBu sütunlar dizin yapınızdan çiftler olarak mevcutsa, her zaman dosya yolundan verilen sütunları her zaman denemek ve ayrıştırmak için virgülle ayrılmış sütun adları listesi olarak seçeneğini sağlayabilirsiniz key=value .

Otomatik yükleyici şemayı kullanırken, şemaya bir veri sütunu otomatik olarak eklenir _rescued_data . Ayrıntılar için, veri sütunu ve şema evrimini bölümüne bakın.

Not

İkili dosya ( binaryFile ) ve text dosya biçimleri, sabit veri şemalarına sahiptir, ancak bölüm sütununun çıkarımını da destekler. Bölüm sütunları, belirtmediğiniz takdirde her akış yeniden başlatıldığında algılanır cloudFiles.schemaLocation . Herhangi bir olası hata veya bilgi kaybını önlemek için, Databricks bu cloudFiles.schemaLocation cloudFiles.partitionColumns cloudFiles.schemaLocation biçimler için gerekli bir seçenek olmadığından, bu dosya biçimleri için seçenek veya seçenek olarak ayar yapmanızı önerir.

Şema ipuçları

Çıkarılan veri türleri her zaman tam olarak aradığınız şeydir. Şema ipuçlarını kullanarak, bildiğiniz ve gösterilen ve tahmin edilen bir şema üzerinde beklemiş olan bilgileri oluşturabilirsiniz.

Apache Spark, varsayılan olarak, veri sütunlarının türünü göstermek için standart bir yaklaşıma sahiptir. Örneğin, iç içe geçmiş JSON 'u as 'ler olarak yapı ve tamsayılar olarak algılar. Buna karşılık, otomatik yükleyici tüm sütunları dizeler olarak değerlendirir. Bir sütunun belirli bir veri türünde olduğunu veya daha fazla genel veri türü (örneğin, bir tamsayı yerine bir Double) seçmek istiyorsanız, sütun veri türleri için rastgele sayıda ipucu sağlayabilirsiniz:

.option("cloudFiles.schemaHints", "tags map<string,string>, version int")

Desteklenen veri türleri listesi için veri türleri hakkındaki belgelere bakın.

Akışın başlangıcında bir sütun yoksa, bu sütunu çıkarılan şemaya eklemek için de şema ipuçlarını kullanabilirsiniz.

Şema ipuçlarının davranışını görmek için gösterilen bir şemanın örneği aşağıda verilmiştir. Çıkarılan şema:

|-- date: string
|-- quantity: int
|-- user_info: struct
|    |-- id: string
|    |-- name: string
|    |-- dob: string
|-- purchase_options: struct
|    |-- delivery_address: string

Aşağıdaki şema ipuçlarını belirterek:

.option("cloudFiles.schemaHints", "date DATE, user_info.dob DATE, purchase_options MAP<STRING,STRING>, time TIMESTAMP")

şunları alacaksınız:

|-- date: string -> date
|-- quantity: int
|-- user_info: struct
|    |-- id: string
|    |-- name: string
|    |-- dob: string -> date
|-- purchase_options: struct -> map<string,string>
|-- time: timestamp

Not

Şema ipuçları yalnızca otomatik yükleyici için bir şema sağlamazsanız kullanılır . Şema ipuçlarını cloudFiles.inferColumnTypes etkin veya devre dışı olup olmadığını kullanabilirsiniz.

Şema evrimi

Otomatik yükleyici, verilerinizi işlerken yeni sütunların eklenmesini algılar. Varsayılan olarak, yeni bir sütunun eklenmesi akışlarınızın bir ile durdurulmasına neden olur UnknownFieldException . Stream Bu hatayı oluşturmadan önce, otomatik yükleyici, en son mikro-toplu veri üzerinde şema çıkarımı gerçekleştirir ve şema konumunu en son şemayla güncelleştirir. Yeni sütunlar şemanın sonuna birleştirilir. Mevcut sütunların veri türleri değişmeden kalır. Otomatik yükleyici akışınızı bir Azure Databricks işi içinde ayarlayarak, bu şema değişikliklerinden sonra akışınızı otomatik olarak yeniden başlatılacak şekilde alabilirsiniz.

Otomatik yükleyici, bu, seçeneğinde ayarladığınız şema evrimi için aşağıdaki modları destekler cloudFiles.schemaEvolutionMode :

  • addNewColumns: Otomatik yükleyici için bir şema sağlanmadıysa varsayılan mod. Akış işi ile başarısız olur UnknownFieldException . Şemaya yeni sütunlar eklenir. Mevcut sütunlar veri türlerini geliştirmez. addNewColumns akışın şeması sağlandığında izin verilmez. Bunun yerine, bu modu kullanmak istiyorsanız şemanızı bir şema ipucu olarak sağlayabilirsiniz.
  • failOnNewColumns: Otomatik yükleyici yeni bir sütun algılarsa, akış başarısız olur. Belirtilen şema güncellenmediği veya sorunlu veri dosyası kaldırıldığı takdirde bu işlem yeniden başlatmaz.
  • rescue: Akış, ilk çıkarılan veya belirtilen şemayla çalışır. Eklenen herhangi bir veri türü değişikliği veya yeni sütun, Stream 'in şemasına otomatik olarak eklenen veri sütununda yeniden oluşturulur _rescued_data . Bu modda, akış, şema değişiklikleri nedeniyle başarısız olmaz.
  • none: Bir şema sağlandığında varsayılan mod. Şemayı geliştirmez, yeni sütunlar yok sayılır ve veri sütunu bir seçenek olarak ayrı olarak sağlanmadığı sürece veriler yeniden alınmaz.

Şema evrimi için bölüm sütunları dikkate alınmıyor. Gibi bir ilk dizin yapınız varsa base_path/event=click/date=2021-04-01/f0.json ve sonra yeni dosyaları almaya başladıysanız, base_path/event=click/date=2021-04-01/hour=01/f1.json saat sütunu yok sayılır. Yeni bölüm sütunlarının bilgilerini yakalamak için olarak ayarlayın cloudFiles.partitionColumns event,date,hour .

Veri sütunu kurtarıldı

Veri sütunu, ETL sırasında verileri hiçbir şekilde kaybetmenizi veya kaçırmanızı sağlar. Rescued Data sütunu, belirtilen şemada eksik olduğu veya bir tür uyumsuzluğu olduğu veya kayıttaki veya dosyadaki sütunun büyük küçük harfleri şemadaki türle eşleşmediğinden, ayrıştırılmamış herhangi bir veriyi içeriyor. Oluşturulan veri sütunu, daha önce oluşturulmuş sütunları içeren bir JSON blobu olarak döndürülür ve kaydın kaynak dosya yolu (kaynak dosya yolu Databricks Runtime 8,3 ve üzeri sürümlerde kullanılabilir). Rescued Data sütunu, şema Çıkarsanan varsayılan olarak otomatik yükleyici tarafından döndürülen şemanın bir parçasıdır _rescued_data . Sütunu yeniden adlandırabilir veya seçeneğini ayarlayarak bir şemayı sağladığınız durumlarda ekleyebilirsiniz rescuedDataColumn .

Varsayılan değeri cloudFiles.inferColumnTypes olduğu false ve cloudFiles.schemaEvolutionMode addNewColumns şema çıkarsanma olduğu için, rescuedDataColumn yalnızca şemadan farklı bir servis talebine sahip sütunları yakalar.

JSON ve CSV ayrıştırıcıları, kayıtları ayrıştırırken üç modu destekler: PERMISSIVE , DROPMALFORMED ve FAILFAST . İle birlikte kullanıldığında rescuedDataColumn , veri türü uyuşmazlıkları kayıtların modda bırakılmasına neden olmaz DROPMALFORMED veya modda bir hata oluşturur FAILFAST . Yalnızca bozuk kayıtlar (tamamlanmamış veya hatalı biçimlendirilmiş JSON veya CSV), bırakılır veya hata oluşturur. badRecordsPathJSON veya CSV ayrıştırılırken kullanıyorsanız, veri türü uyuşmazlıkları, kullanılırken bozuk kayıtlar olarak değerlendirilmez rescuedDataColumn . Yalnızca tamamlanmamış ve hatalı biçimlendirilmiş JSON veya CSV kayıtları içinde depolanır badRecordsPath .

Veri biçimleri

Sınırlamalar

  • Şema evrimi, Databricks Runtime 8,2 ve 8,3 ' de çalışan Python uygulamalarında desteklenmez foreachBatch . foreachBatchBunun yerine Scala 'da kullanabilirsiniz.

Kullanım örnekleri

Kolay ETL 'yi etkinleştir

Verilerin kaybedilmesi gerekmeden Delta Gölü 'a veri almanın kolay bir yolu, aşağıdaki düzeni kullanmak ve otomatik yükleyici ile şema çıkarımını etkinleştirmek olur. Databricks, kaynak verilerinizin şeması değiştiğinde akışınızı otomatik olarak yeniden başlatmak için bir Azure Databricks işinde aşağıdaki kodu çalıştırmayı önerir. Varsayılan olarak, şema dize türleri olarak algılanır, herhangi bir ayrıştırma hatası (her şey bir dize olarak kalırsa) öğesine gidecektir _rescued_data ve tüm yeni sütunlar akışa hata vererek şemayı geliştirebilir.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  .option("cloudFiles.schemaLocation", "<path_to_checkpoint>") \
  .load("<path_to_source_data>") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path_to_checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", "<path_to_checkpoint>")
  .load("<path_to_source_data>")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path_to_checkpoint>")
  .start("<path_to_target")

İyi yapılandırılmış verilerde veri kaybını önleme

Şemanızı biliyorsanız ancak beklenmeyen veriler her geldiğinde bunu bilmek istediğinizde Databricks, 'nin kullanılması rescuedDataColumn önerilir.

Python

spark.readStream.format("cloudFiles") \
  .schema(expected_schema) \
  .option("cloudFiles.format", "json") \
  # will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue") \
  .load("<path_to_source_data>") \
  .writeStream \
  .option("checkpointLocation", "<path_to_checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .schema(expected_schema)
  .option("cloudFiles.format", "json")
  // will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load("<path_to_source_data>")
  .writeStream
  .option("checkpointLocation", "<path_to_checkpoint>")
  .start("<path_to_target")

Şemanız ile eşleşmez yeni bir alan ortaya çıktı ise akışınızı işlemeyi durdurmak için şunları ebilirsiniz:

.option("cloudFiles.schemaEvolutionMode", "failOnNewColumns")

Esnek yarı yapılandırılmış veri işlem hatlarını etkinleştirme

Bir satıcıdan sağ yaptıkları bilgilerle ilgili yeni sütunlar sağlayan verileri alırken, tam olarak ne zaman yaptıkları hakkında bilginiz yok olabilir veya veri işlem hattınızı güncelleştirmek için bant genişliğine sahip olmadığınız olabilir. Artık şema evrimini kullanarak akışı yeniden başlatabilirsiniz ve Otomatik Yükleyici'nin ertelenen şemayı otomatik olarak güncelleştirmesine izin veabilirsiniz. Ayrıca satıcının schemaHints sağlamakta olduğu "şemasız" alanlardan bazıları için de yararlanabilirsiniz.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  // will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT") \
  .load("/api/requests") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path_to_checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT")
  .load("/api/requests")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path_to_checkpoint>")
  .start("<path_to_target")

Sık sorulan sorular (SSS)

Otomatik Yükleyici şemayı nasıl çıkar?

DataFrame ilk kez tanımlandığı zaman, Otomatik Yükleyici kaynak dizininizi listeler ve en son (dosya değiştirme zaman aralığına göre) 50 GB veya 1000 dosyayı seçer ve veri şemanızı çıkarımk için bu dosyaları kullanır.

Otomatik Yükleyici ayrıca kaynak dizin yapısını inceler ve yapıyı içeren dosya yollarını inceler ve bölüm sütunlarını /key=value/ çıkartır. Kaynak dizin tutarsız bir yapıya sahipse, örneğin:

base/path/partition=1/date=2020-12-31/file1.json
// inconsistent because date and partition directories are in different orders
base/path/date=2020-12-31/partition=2/file2.json
// inconsistent because the date directory is missing
base/path/partition=3/file3.json

Otomatik Yükleyici, bölüm sütunlarını boş olarak gösterir. Dizin cloudFiles.partitionColumns yapısından sütunları açıkça ayrıştırmak için kullanın.

Kaynak klasör boş olduğunda Otomatik Yükleyici nasıl davranır?

Kaynak dizin boşsa, çıkarım gerçekleştirecek veri yoksa Otomatik Yükleyici bir şema sağlamanız gerekir.

Otomatik yükleyici şemayı ne zaman çıkartır? Her mikro toplu iş sonrasında otomatik olarak gelişiyor mu?

DataFrame kodunuz içinde ilk kez tanımlandığı zaman şemayı gösterir. Her mikro toplu iş sırasında şema değişiklikleri çalışma sırasında değerlendirilir; Bu nedenle performans isabetleri konusunda endişelenmeniz gerek yok. Akış yeniden başlatıldığında, şema konumdan gelişmiş şemayı alır ve çıkarım ek yükü olmadan yürütmeye başlar.

Otomatik Yükleyici şema çıkarıcısı kullanılırken verilerin verisini alan performans etkisi nedir?

İlk şema çıkarı sırasında çok büyük kaynak dizinleri için şema çıkarmı birkaç dakika beklemelisiniz. Akış yürütme sırasında aksi takdirde önemli performans isabetleri gözlemlenmeyin. Kodunuzu bir not defterinde Azure Databricks, Otomatik Yükleyici'nin veri şemanızı örneklemek ve çıkarım için dizininizi listelemek için ne zaman listele olacağını belirten durum güncelleştirmelerini görebilir.

Bir hata nedeniyle hatalı bir dosya şemamı önemli ölçüde değiştirdi. Şema değişikliğini geri almak için ne yapabilirim?

Yardım için Databricks desteğine başvurun.