Tablo akışı okumaları ve yazmaları

Delta Lake, ve aracılığıyla Spark Yapılandırılmış Akışı ile derin bir şekilde readStream tümleştirilmiştir. writeStream Delta Lake genellikle aşağıdakiler dahil olmak üzere akış sistemleri ve dosyalarla ilişkili sınırlamaların çoğunun üstesinden gelir:

  • Düşük gecikme süresiyle esning tarafından üretilen küçük dosyaları bir arada

  • Birden fazla akışla (veya eşzamanlı toplu işlerle) "tam olarak bir kez" işlemeyi sürdürme

  • Bir akışın kaynağı olarak dosyaları kullanırken hangi dosyaların yeni olduğunu verimli bir şekilde bulma

Kaynak olarak Delta tablosu

Delta tablosunu akış kaynağı olarak yükp bir akış sorgusunda kullanırken sorgu, hem tabloda mevcut olan tüm verileri hem de akış başlatıldıktan sonra gelen tüm yeni verileri işler.

Hem yolları hem de tabloları akış olarak yükleyebilirsiniz.

spark.readStream.format("delta")
  .load("/mnt/delta/events")

import io.delta.implicits._
spark.readStream.delta("/mnt/delta/events")

veya

import io.delta.implicits._

spark.readStream.format("delta").table("events")

Bu bölümdeki konular:

Giriş oranını sınırlama

Mikro toplu işleri kontrol etmek için aşağıdaki seçenekler kullanılabilir:

  • maxFilesPerTrigger: Her mikro toplu iş için dikkate alınacak yeni dosya sayısı. Varsayılan değer 1000’dir.
  • maxBytesPerTrigger: Her mikro toplu iş için ne kadar veri işleniyor? Bu seçenek bir "soft max" ayarlar, yani bir toplu iş yaklaşık olarak bu miktarda veri işler ve sınırın daha fazlasını işler. Akışınız Trigger.Once için kullanırsanız bu seçenek yoksayılır. Bu varsayılan olarak ayarlanmaz.

ile birlikte maxBytesPerTrigger kullanırsanız, mikro toplu iş veya sınırına maxFilesPerTrigger ulaşıncaya kadar verileri maxFilesPerTrigger maxBytesPerTrigger işler.

Not

Kaynak tablo işlemlerinin yapılandırma nedeniyle temizlenmiş olduğu ve işleme sırasında akış gecikmeleri olduğu durumlarda Delta Lake, kaynak tablonun en son kullanılabilir işlem geçmişine karşılık gelen verileri işler ancak akışı başarısız logRetentionDuration olmaz. Bu, verilerin düşmesine neden olabilir.

Güncelleştirmeleri ve silmeleri yoksayma

Yapılandırılmış Akış, ekleme değil girişi işlemez ve kaynak olarak kullanılan tabloda herhangi bir değişiklik olursa bir özel durum oluşturur. Aşağı akışa otomatik olarak yayılamayacak değişikliklerle ilgilenmek için iki ana strateji vardır:

  • Çıktıyı ve denetim noktasını silebilir ve akışı en baştan yeniden başlatebilirsiniz.
  • Şu iki seçeneklerden birini seçebilirsiniz:
    • ignoreDeletes: Bölüm sınırlarda verileri sen işlemleri yoksayın.
    • ignoreChanges: , , (bölümler içinde) veya gibi veri değiştirme işlemi nedeniyle kaynak tabloda dosyaların yeniden yazılacak olması gerekirse UPDATE MERGE INTO DELETE güncelleştirmeleri yeniden OVERWRITE işle. Değişmeyen satırlar yine de yayılabilir, bu nedenle aşağı akış tüketicileriniz yinelenenleri işleyene kadar devam edecektir. Silmeler aşağı akışa yayılmaz. ignoreChanges alt ignoreDeletes altları. Bu nedenle, ignoreChanges kullanırsanız, akışınız kaynak tablodaki silme veya güncelleştirmeler tarafından kesintiye neden olmaz.

Örnek

Örneğin, , ve sütunları ile user_events bölümlenmiş date bir tablo olduğunu user_email action date varsayalım. Tablodan akışla user_events akışla akar ve GDPR nedeniyle bu tablodan veri silmeniz gerekir.

Bölüm sınırlarından silebilirsiniz (yani bölüm sütunundadır), dosyalar zaten değere göre segmentlere ayrılmıştır ve bu nedenle silme yalnızca bu dosyaları meta WHERE verilerden siler. Bu nedenle, yalnızca bazı bölümlere veri silmek için şunları kullanabilirsiniz:

spark.readStream.format("delta")
  .option("ignoreDeletes", "true")
  .load("/mnt/delta/user_events")

Ancak, verileri temel alarak silmeniz user_email gerekirse şunları kullansanız gerekir:

spark.readStream.format("delta")
  .option("ignoreChanges", "true")
  .load("/mnt/delta/user_events")

deyimiyle user_email UPDATE güncelleştirin, söz konusu dosyayı içeren dosya yeniden user_email yazılır. 'ı kullanırsanız, yeni kayıt aynı dosyada yer alan diğer ignoreChanges tüm değiştirilmemiş kayıtlarla aşağı akışa yayma. Mantığınız bu gelen yinelenen kayıtları işleyene kadar devam etmek zorunda.

İlk konumu belirtme

Not

Bu özellik, Databricks Runtime 7.3 LTS ve üzerinde kullanılabilir.

Tablonun tamamını işlemeden Delta Lake akış kaynağının başlangıç noktasını belirtmek için aşağıdaki seçenekleri kullanabilirsiniz.

  • startingVersion: Başlangıç olarak Delta Lake sürümü. Bu sürümden (dahil) başlayarak yapılan tüm tablo değişiklikleri akış kaynağı tarafından okunur. Commit sürümlerini DESCRIBE HISTORY komut version çıktısı sütunundan edinebilirsiniz.

    7 Databricks Runtime 7.4 ve üzeri sürümlerde yalnızca en son değişiklikleri dönmek için latest belirtin.

  • startingTimestamp: Başlangıç zaman damgası. Zaman damgasında veya sonrasında işlenen tüm tablo değişiklikleri (dahil) akış kaynağı tarafından okunur. Bunlardan biri:

    • Zaman damgası dizesi. Örneğin, "2019-01-01T00:00:00.000Z".
    • Tarih dizesi. Örneğin, "2019-01-01".

İki seçeneği de aynı anda ayaramaz; bunlardan yalnızca birini kullanabilirsiniz. Yalnızca yeni bir akış sorgusu başlatıyorken etkili olur. Akış sorgusu başlatıldı ve ilerleme durumu denetim noktasına kaydedildikçe bu seçenekler yoksayılır.

Önemli

Akış kaynağını belirtilen bir sürümden veya zaman damgasından başlatabilirsiniz ancak akış kaynağının şeması her zaman Delta tablonun en son şemasıdır. Belirtilen sürüm veya zaman damgasının ardından Delta tablosunda uyumsuz şema değişikliği olmadığını emin olun. Aksi takdirde, akış kaynağı, verileri yanlış bir şemayla okurken yanlış sonuçlar dönüşe neden olabilir.

Örnek

Örneğin, bir tablo olduğunu user_events varsayalım. Sürüm 5'ten itibaren değişiklikleri okumak için şunları kullanın:

spark.readStream.format("delta")
  .option("startingVersion", "5")
  .load("/mnt/delta/user_events")

2018-10-18'den bu yana yapılan değişiklikleri okumak için şunları kullanın:

spark.readStream.format("delta")
  .option("startingTimestamp", "2018-10-18")
  .load("/mnt/delta/user_events")

Havuz olarak delta tablosu

Yapılandırılmış Akış kullanarak delta tablosuna veri de yazabilirsiniz. İşlem günlüğü, tabloda eşzamanlı olarak çalışan başka akışlar veya toplu sorgular olsa bile Delta Lake'in tam olarak bir kez işlemeyi garantilesini sağlar.

Bu bölümdeki konular:

Ölçümler

Not

8.1 Databricks Runtime ve üzeri bir şekilde kullanılabilir.

Bir akış sorgusu işleminde ve ölçümleri olarak işlenecek bayt sayısını ve dosya sayısını numBytesOutstanding numFilesOutstanding bulabilirsiniz. Akışı bir not defterinde çalıştırıyorsanız, bu ölçümleri akış sorgusu ilerleme durumu panosunun Ham Veriler sekmesinde görebilirsiniz:

{
  "sources" : [
    {
      "description" : "DeltaSource[file:/path/to/source]",
      "metrics" : {
        "numBytesOutstanding" : "3456",
        "numFilesOutstanding" : "8"
      },
    }
  ]
}

Ekleme modu

Akışlar varsayılan olarak ekleme modunda çalışır ve tabloya yeni kayıtlar ekler.

Yol yöntemini kullanabilirsiniz:

Python

events.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/delta/events/_checkpoints/etl-from-json")
  .start("/delta/events")

Scala

events.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/mnt/delta/events/_checkpoints/etl-from-json")
  .start("/mnt/delta/events")

import io.delta.implicits._
events.writeStream
  .outputMode("append")
  .option("checkpointLocation", "/mnt/delta/events/_checkpoints/etl-from-json")
  .delta("/mnt/delta/events")

veya tablo yöntemi:

Python

events.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/delta/events/_checkpoints/etl-from-json")
  .table("events")

Scala

events.writeStream
  .outputMode("append")
  .option("checkpointLocation", "/mnt/delta/events/_checkpoints/etl-from-json")
  .table("events")

Tamamlama modu

Tablonun tamamını her toplu işle değiştirmek için Yapılandırılmış Akış da kullanabilirsiniz. Örnek kullanım örneklerinden biri, toplama kullanarak bir özeti hesaplamaktır:

spark.readStream
  .format("delta")
  .load("/mnt/delta/events")
  .groupBy("customerId")
  .count()
  .writeStream
  .format("delta")
  .outputMode("complete")
  .option("checkpointLocation", "/mnt/delta/eventsByCustomer/_checkpoints/streaming-agg")
  .start("/mnt/delta/eventsByCustomer")

Yukarıdaki örnek, müşteriye göre toplam olay sayısını içeren bir tabloyu sürekli olarak güncelleştirir.

Daha uzun gecikme süresi gereksinimleri olan uygulamalar için, tek seferlik tetikleyicilerle bilgi işlem kaynaklarından tasarruf sabilirsiniz. Özet toplama tablolarını belirli bir zaman çizelgesine göre güncelleştirmek ve yalnızca son güncelleştirmeden bu yana gelen yeni verileri işerek bunları kullanın.

Bire bir etkili çok tablolu yazmalar

Not

8.4 ve üzeri Databricks Runtime kullanılabilir.

Bu bölümde foreachBatch komutunu kullanarak birden çok tabloya yazma açıklanıyor. foreachBatch akış sorgusunda yer alan her mikro toplu iş çıkışının birden çok hedef hedefe yazıldığına izin verir. Ancak, bu yazma denemeleri toplu iş yeniden yürütülse de yürütülse de yürütülmez bilgisi eksik olduğu için bu yazma işlemleri bir kez foreachBatch etkili olmaz. Örneğin, başarısız bir toplu işi yeniden çalıştırmak yinelenen veri yazma işlemlerine neden olabilir.

Delta tabloları bunu ele alan aşağıdaki DataFrameWriter seçeneklerini destekler ve yazmaları birmpotent hale getirir:

  • txnAppId: Her yazmada geçilemedik benzersiz bir DataFrame dize. Örneğin StreamingQuery kimliğini olarak txnAppId kullanabilirsiniz.
  • txnVersion: İşlem sürümü olarak hareket etmek için monoton olarak artan bir sayı.

Delta Lake, yinelenen yazmaları tanımlamak txnAppId txnVersion ve yoksaymak için ve birleşimini kullanır.

Bir toplu iş yazma işlemi hatayla kesilirse, toplu iş yeniden çalıştırılan iş aynı uygulamayı ve toplu iş kimliğini kullanır. Bu, çalışma zamanının yinelenen yazmaları doğru şekilde tanımlaması ve bunları yoksayması konusunda yardımcı olur. Uygulama Kimliği ( txnAppId ) kullanıcı tarafından oluşturulan herhangi bir benzersiz dize olabilir ve akış kimliğiyle ilgili olması gerek değildir.

Uyarı

Akış denetim noktasını silebilir ve sorguyu yeniden başlatacak olursanız farklı bir sağlamanız gerekir; aksi takdirde, yeniden başlatılan sorgudan yazma işlemleri yoksayılır çünkü aynı olur ve toplu iş kimliği appId txnAppId 0'dan başlar.

Örnek

appId = ... // A unique string that is used as application ID.
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...)  // location 1
  batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...)  // location 2
}