Azure Databricks'te Delta Lake değişiklik veri akışını kullanma

Not

  • Bu makalede değişiklik veri akışı özelliği kullanılarak Delta tabloları için satır düzeyi değişiklik bilgilerinin nasıl kaydedileceği ve sorgulandığı açıklanır. Delta Live Tables işlem hattındaki tabloları kaynak verilerdeki değişikliklere göre güncelleştirme hakkında bilgi edinmek için bkz . Delta Live Tablolarında DEĞIŞIKLIKLERI UYGULA API'siyle basitleştirilmiş değişiklik verileri yakalama.

Değişiklik veri akışı, Azure Databricks'in Delta tablosunun sürümleri arasındaki satır düzeyi değişiklikleri izlemesine olanak tanır. Delta tablosunda etkinleştirildiğinde, çalışma zamanı kayıtları tabloya yazılan tüm veriler için olayları değiştirir. Bu, satır verilerinin yanı sıra belirtilen satırın eklendiğini, silindiğini veya güncelleştirildiğini belirten meta verileri içerir.

Spark SQL, Apache Spark DataFrames ve Yapılandırılmış Akış kullanarak toplu iş sorgularındaki değişiklik olaylarını okuyabilirsiniz.

Önemli

Değişiklik veri akışı, değişiklik bilgilerini sağlamak için tablo geçmişiyle birlikte çalışır. Delta tablosunun kopyalanması ayrı bir geçmiş oluşturduğundan, kopyalanan tablolardaki değişiklik veri akışı özgün tablonunkiyle eşleşmiyor.

Kullanım örnekleri

Değişiklik veri akışı varsayılan olarak etkin değildir. Değişiklik veri akışını etkinleştirdiğinizde aşağıdaki kullanım örnekleri yönlendirilmelidir.

  • Silver ve Gold tabloları: ETL ve ELT işlemlerini hızlandırmak ve basitleştirmek için ilk MERGE, UPDATEveya DELETE işlemlerden sonra yalnızca satır düzeyi değişiklikleri işleyerek Delta Lake performansını geliştirin.
  • Gerçekleştirilmiş görünümler: Temel alınan tabloların tamamını yeniden işlemek zorunda kalmadan iş zekası ve analizde kullanılmak üzere bilgilerin güncel, toplu görünümlerini oluşturun; bunun yerine yalnızca değişikliklerin geldiği yerleri güncelleştirin.
  • Değişiklikleri iletme: Kafka veya RDBMS gibi veri işlem hatlarının sonraki aşamalarında artımlı olarak işlemek için kullanabilen aşağı akış sistemlerine bir değişiklik veri akışı gönderin.
  • Denetim izi tablosu: Değişiklik veri akışını delta tablosu olarak yakalamak, silmelerin ne zaman gerçekleştiği ve hangi güncelleştirmelerin yapıldığı da dahil olmak üzere zaman içindeki tüm değişiklikleri görmek için kalıcı depolama ve verimli sorgu özelliği sağlar.

Değişiklik veri akışını etkinleştirme

Aşağıdaki yöntemlerden birini kullanarak veri akışını değiştir seçeneğini açıkça etkinleştirmeniz gerekir:

  • Yeni tablo: komutunda CREATE TABLE table özelliğini delta.enableChangeDataFeed = true ayarlayın.

    CREATE TABLE student (id INT, name STRING, age INT) TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • Varolan tablo: komutunda ALTER TABLE table özelliğini delta.enableChangeDataFeed = true ayarlayın.

    ALTER TABLE myDeltaTable SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • Tüm yeni tablolar:

    set spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;
    

Önemli

Yalnızca değişiklik veri akışını etkinleştirdikten sonra yapılan değişiklikler kaydedilir; tablodaki geçmiş değişiklikler yakalanmaz.

Veri depolama alanını değiştirme

Azure Databricks kayıtları tablo dizininin altındaki klasördeki _change_data , DELETEve MERGE işlemleri için UPDATEdeğişiklik verilerini değiştirir. Yalnızca ekleme işlemleri ve tam bölüm silme işlemleri gibi bazı işlemler dizinde _change_data veri oluşturmaz çünkü Azure Databricks değişiklik veri akışını doğrudan işlem günlüğünden verimli bir şekilde hesaplayabilir.

Klasördeki _change_data dosyalar tablonun bekletme ilkesini izler. Bu nedenle, VACUUM komutunu çalıştırırsanız değişiklik veri akışı verileri de silinir.

Toplu sorgulardaki değişiklikleri okuma

Başlangıç ve bitiş için sürüm veya zaman damgası sağlayabilirsiniz. Başlangıç ve bitiş sürümleri ile zaman damgaları sorgularda kapsayıcıdır. Belirli bir başlangıç sürümünden tablonun en son sürümüne yapılan değişiklikleri okumak için yalnızca başlangıç sürümünü veya zaman damgasını belirtin.

Bir sürümü tamsayı ve zaman damgalarını biçiminde yyyy-MM-dd[ HH:mm:ss[.SSS]]bir dize olarak belirtirsiniz.

Değişiklik olaylarını kaydeden bir sürümden daha eski bir sürüm veya zaman damgası sağlarsanız (değişiklik veri akışı etkinleştirildiğinde), değişiklik veri akışının etkinleştirilmediğini belirten bir hata oluşur.

SQL

-- version as ints or longs e.g. changes from version 0 to 10
SELECT * FROM table_changes('tableName', 0, 10)

-- timestamp as string formatted timestamps
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')

-- providing only the startingVersion/timestamp
SELECT * FROM table_changes('tableName', 0)

-- database/schema names inside the string for table name, with backticks for escaping dots and special characters
SELECT * FROM table_changes('dbName.`dotted.tableName`', '2021-04-21 06:45:46' , '2021-05-21 12:00:00')

-- path based tables
SELECT * FROM table_changes_by_path('\path', '2021-04-21 05:45:46')

Python

# version as ints or longs
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .option("endingVersion", 10) \
  .table("myDeltaTable")

# timestamps as formatted timestamp
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", '2021-04-21 05:45:46') \
  .option("endingTimestamp", '2021-05-21 12:00:00') \
  .table("myDeltaTable")

# providing only the startingVersion/timestamp
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .table("myDeltaTable")

# path based tables
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", '2021-04-21 05:45:46') \
  .load("pathToMyDeltaTable")

Scala

// version as ints or longs
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .option("endingVersion", 10)
  .table("myDeltaTable")

// timestamps as formatted timestamp
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingTimestamp", "2021-04-21 05:45:46")
  .option("endingTimestamp", "2021-05-21 12:00:00")
  .table("myDeltaTable")

// providing only the startingVersion/timestamp
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .table("myDeltaTable")

// path based tables
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingTimestamp", "2021-04-21 05:45:46")
  .load("pathToMyDeltaTable")

Akış sorgularındaki değişiklikleri okuma

Python

# providing a starting version
spark.readStream.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .table("myDeltaTable")

# providing a starting timestamp
spark.readStream.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", "2021-04-21 05:35:43") \
  .load("/pathToMyDeltaTable")

# not providing a starting version/timestamp will result in the latest snapshot being fetched first
spark.readStream.format("delta") \
  .option("readChangeFeed", "true") \
  .table("myDeltaTable")

Scala

// providing a starting version
spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .table("myDeltaTable")

// providing a starting timestamp
spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", "2021-04-21 05:35:43")
  .load("/pathToMyDeltaTable")

// not providing a starting version/timestamp will result in the latest snapshot being fetched first
spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .table("myDeltaTable")

Tabloyu okurken değişiklik verilerini almak için seçeneğini readChangeFeed olarak trueayarlayın. startingVersion veya startingTimestamp isteğe bağlıdır ve sağlanmadıysa akış, akış sırasında tablonun en son anlık görüntüsünü döndürür ve değişiklik verileri olarak INSERT gelecekteki değişiklikler olur. Hız sınırları (maxFilesPerTrigger, maxBytesPerTrigger) gibi excludeRegex seçenekler değişiklik verileri okunurken de desteklenir.

Not

Hız sınırlama, başlangıç anlık görüntüsü sürümü dışındaki sürümler için atomik olabilir. Diğer bir ifadeyle, işleme sürümünün tamamı hız sınırına sahip olur veya işlemenin tamamı döndürülür.

Varsayılan olarak, bir kullanıcı bir sürümde veya zaman damgasında tablodaki son işlemeyi aşıyorsa hata timestampGreaterThanLatestCommit oluşur. Databricks Runtime 11.3 LTS ve üzeri sürümlerinde, kullanıcı aşağıdaki yapılandırmayı trueolarak ayarlarsa değişiklik veri akışı aralık dışı sürüm durumunu işleyebilir:

set spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled = true;

Bir tablodaki son işlemeden daha büyük bir başlangıç sürümü veya bir tablodaki son işlemeden daha yeni bir başlangıç zaman damgası sağlarsanız, önceki yapılandırma etkinleştirildiğinde boş bir okuma sonucu döndürülür.

Bir tablodaki son işlemeden daha büyük bir bitiş sürümü veya tablodaki son işlemeden daha yeni bir bitiş zaman damgası sağlarsanız, önceki yapılandırma toplu okuma modunda etkinleştirildiğinde, başlangıç sürümü ile son işleme arasındaki tüm değişiklikler döndürülür.

Değişiklik veri akışının şeması nedir?

Bir tablonun değişiklik veri akışından okuduğunuzda, en son tablo sürümünün şeması kullanılır.

Not

Çoğu şema değişikliği ve evrim işlemi tam olarak desteklenir. Sütun eşlemesi etkinleştirilmiş tablo tüm kullanım örneklerini desteklemez ve farklı davranış gösterir. Bkz . Sütun eşlemesi etkin tablolar için veri akışı sınırlamalarını değiştirme.

Değişiklik veri akışı, Delta tablosunun şemasındaki veri sütunlarına ek olarak değişiklik olayının türünü tanımlayan meta veri sütunlarını içerir:

Sütun adı Tür Değerler
_change_type String insert, update_preimage , update_postimage, delete(1)
_commit_version Uzun Değişikliği içeren Delta günlüğü veya tablo sürümü.
_commit_timestamp Zaman damgası İşleme oluşturulduğunda ilişkili zaman damgası.

(1)preimage güncelleştirmeden önceki değerdir, postimage güncelleştirmeden sonraki değerdir.

Not

Şemada bu eklenen sütunlarla aynı adlara sahip sütunlar varsa tablodaki değişiklik veri akışını etkinleştiremezsiniz. Değişiklik veri akışını etkinleştirmeye çalışmadan önce bu çakışmayı çözmek için tablodaki sütunları yeniden adlandırın.

Sütun eşlemesi etkin tablolar için veri akışı sınırlamalarını değiştirme

Delta tablosunda sütun eşlemesi etkinleştirildiğinde, var olan veriler için veri dosyalarını yeniden yazmadan tablodaki sütunları bırakabilir veya yeniden adlandırabilirsiniz. Sütun eşleme etkinleştirildiğinde, sütun yeniden adlandırma veya bırakma, veri türünü değiştirme veya null atanabilirlik değişiklikleri gibi eklemesiz şema değişiklikleri gerçekleştirdikten sonra değişiklik veri akışında sınırlamalar vardır.

Önemli

  • Toplu iş semantiği kullanılarak eklemesiz şema değişikliğinin gerçekleştiği bir işlem veya aralık için değişiklik veri akışını okuyamazsınız.
  • Databricks Runtime 12.2 LTS ve altında, sütun eşlemesi etkin olan ve eklemesiz şema değişiklikleriyle karşılaşan tablolar, değişiklik veri akışında akış okumalarını desteklemez. Bkz. Sütun eşleme ve şema değişiklikleriyle akış yapma.
  • Databricks Runtime 11.3 LTS ve altında, sütun eşlemesi etkin olan ve sütun yeniden adlandırma veya bırakma işlemiyle karşılaşmış olan tabloların değişiklik veri akışını okuyamazsınız.

Databricks Runtime 12.2 LTS ve üzeri sürümlerde, sütun eşlemesi etkin olan ve eklemesiz şema değişiklikleriyle karşılaşan tablolar için değişiklik veri akışında toplu okuma gerçekleştirebilirsiniz. Okuma işlemleri, tablonun en son sürümünün şemasını kullanmak yerine sorguda belirtilen tablonun son sürümünün şemasını kullanır. Belirtilen sürüm aralığı eklemeli olmayan bir şema değişikliğine yayılsa da sorgular başarısız olur.

Sık sorulan sorular (SSS)

Değişiklik veri akışını etkinleştirmenin ek yükü nedir?

Önemli bir etki yoktur. Değişiklik verileri kayıtları sorgu yürütme işlemi sırasında satır içinde oluşturulur ve genellikle yeniden yazılan dosyaların toplam boyutundan çok daha küçüktür.

Değişiklik kayıtları için bekletme ilkesi nedir?

Değişiklik kayıtları, güncel olmayan tablo sürümleriyle aynı bekletme ilkesini izler ve belirtilen bekletme süresinin dışındaysa VACUUM aracılığıyla temizlenir.

Değişiklik veri akışında yeni kayıtlar ne zaman kullanılabilir hale gelir?

Değişiklik verileri Delta Lake işlemiyle birlikte işlenir ve yeni veriler tabloda kullanılabilir hale gelir.

Not defteri örneği: Delta değişiklik veri akışı ile değişiklikleri yayma

Bu not defteri, mutlak sayıda aşı içeren gümüş bir tabloda yapılan değişikliklerin aşı oranlarının altın renkli bir tablosuna nasıl yayılarak yapıldığını gösterir.

Veri akışı not defterini değiştirme

Not defterini alma