Tablo silmeleri, güncelleştirmeleri ve birleştirmeleri

Delta Gölü, Delta tablolarında verilerin silinmesini ve güncelleştirilmesini kolaylaştırmak için çeşitli deyimleri destekler.

Tablodan Sil

Bir Delta tablosundan bir koşul ile eşleşen verileri kaldırabilirsiniz. Örneğin, öncesinde tüm olayları silmek için 2017 aşağıdakileri çalıştırabilirsiniz:

SQL

DELETE FROM events WHERE date < '2017-01-01'

DELETE FROM delta.`/data/events/` WHERE date < '2017-01-01'

Python

Not

Python API 'SI Databricks Runtime 6,1 ve üzeri sürümlerde kullanılabilir.

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, "/data/events/")

deltaTable.delete("date < '2017-01-01'")        # predicate using SQL formatted string

deltaTable.delete(col("date") < "2017-01-01")   # predicate using Spark SQL functions

Scala

Not

Scala API 'SI Databricks Runtime 6,0 ve üzeri sürümlerde kullanılabilir.

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/data/events/")

deltaTable.delete("date < '2017-01-01'")        // predicate using SQL formatted string

import org.apache.spark.sql.functions._
import spark.implicits._

deltaTable.delete(col("date") < "2017-01-01")       // predicate using Spark SQL functions and implicits

Java

Not

Java API 'SI Databricks Runtime 6,0 ve üzeri sürümlerde kullanılabilir.

import io.delta.tables.*;
import org.apache.spark.sql.functions;

DeltaTable deltaTable = DeltaTable.forPath(spark, "/data/events/");

deltaTable.delete("date < '2017-01-01'");            // predicate using SQL formatted string

deltaTable.delete(functions.col("date").lt(functions.lit("2017-01-01")));   // predicate using Spark SQL functions

Ayrıntılar için bkz. Delta Lake API başvurusu .

Önemli

delete verileri Delta tablosunun en son sürümünden kaldırır, ancak eski sürümler açıkça vacuumed kadar fiziksel depolamadan kaldırmaz. Ayrıntılar için bkz. vakum .

İpucu

Mümkün olduğunda, bölümlenmiş bir Delta tablosu için bölüm sütunlarında koşullar sağlayın ve bu koşullar işlemi önemli ölçüde hızlandırabilirler.

Tablo güncelleştirme

Bir Delta tablosunda bir koşula uyan verileri güncelleştirebilirsiniz. Örneğin, içinde bir yazım hatalarını onarmak için eventType aşağıdakileri çalıştırabilirsiniz:

SQL

UPDATE events SET eventType = 'click' WHERE eventType = 'clck'

UPDATE delta.`/data/events/` SET eventType = 'click' WHERE eventType = 'clck'

Python

Not

Python API 'SI Databricks Runtime 6,1 ve üzeri sürümlerde kullanılabilir.

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, "/data/events/")

deltaTable.update("eventType = 'clck'", { "eventType": "'click'" } )   # predicate using SQL formatted string

deltaTable.update(col("eventType") == "clck", { "eventType": lit("click") } )   # predicate using Spark SQL functions

Scala

Not

Scala API 'SI Databricks Runtime 6,0 ve üzeri sürümlerde kullanılabilir.

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/data/events/")

deltaTable.updateExpr(            // predicate and update expressions using SQL formatted string
  "eventType = 'clck'",
  Map("eventType" -> "'click'")

import org.apache.spark.sql.functions._
import spark.implicits._

deltaTable.update(                // predicate using Spark SQL functions and implicits
  col("eventType") === "clck",
  Map("eventType" -> lit("click")));

Java

Not

Scala API 'SI Databricks Runtime 6,0 ve üzeri sürümlerde kullanılabilir.

import io.delta.tables.*;
import org.apache.spark.sql.functions;
import java.util.HashMap;

DeltaTable deltaTable = DeltaTable.forPath(spark, "/data/events/");

deltaTable.updateExpr(            // predicate and update expressions using SQL formatted string
  "eventType = 'clck'",
  new HashMap<String, String>() {{
    put("eventType", "'click'");
  }}
);

deltaTable.update(                // predicate using Spark SQL functions
  functions.col(eventType).eq("clck"),
  new HashMap<String, Column>() {{
    put("eventType", functions.lit("click"));
  }}
);

Ayrıntılar için bkz. Delta Lake API başvurusu .

İpucu

Silmeye benzer şekilde, güncelleştirme işlemleri, bölümlerdeki koşullara göre önemli bir hızlı işlem yapabilir.

Birleştirme kullanarak bir tabloya bir yukarı Ekle

kaynak tablo, görünüm veya veri çerçevesindeki verileri SQL işlemini kullanarak hedef Delta tablosuna taşıyabilirsiniz MERGE . Delta gölü ekleme, güncelleştirme ve silme işlemlerini destekler MERGE ve gelişmiş kullanım durumlarını kolaylaştırmak için SQL standartlarının ötesinde genişletilmiş sözdizimini destekler.

İle olaylar için yeni veri içeren bir Spark DataFrame olduğunu varsayalım eventId . Bu olaylardan bazıları tabloda zaten mevcut olabilir events . Yeni verileri tabloyla birleştirmek için events , eşleşen satırları (yani, eventId zaten var olan) güncelleştirmek ve yeni satırları (yani, yok) eklemek istersiniz eventId . Aşağıdakileri çalıştırabilirsiniz:

SQL

MERGE INTO events
USING updates
ON events.eventId = updates.eventId
WHEN MATCHED THEN
  UPDATE SET events.data = updates.data
WHEN NOT MATCHED
  THEN INSERT (date, eventId, data) VALUES (date, eventId, data)

Sözdizimi ayrıntıları için bkz.

Python

from delta.tables import *

deltaTable = DeltaTable.forPath(spark, "/data/events/")

deltaTable.alias("events").merge(
    updatesDF.alias("updates"),
    "events.eventId = updates.eventId") \
  .whenMatchedUpdate(set = { "data" : "updates.data" } ) \
  .whenNotMatchedInsert(values =
    {
      "date": "updates.date",
      "eventId": "updates.eventId",
      "data": "updates.data"
    }
  ) \
  .execute()

Scala

import io.delta.tables._
import org.apache.spark.sql.functions._

val updatesDF = ...  // define the updates DataFrame[date, eventId, data]

DeltaTable.forPath(spark, "/data/events/")
  .as("events")
  .merge(
    updatesDF.as("updates"),
    "events.eventId = updates.eventId")
  .whenMatched
  .updateExpr(
    Map("data" -> "updates.data"))
  .whenNotMatched
  .insertExpr(
    Map(
      "date" -> "updates.date",
      "eventId" -> "updates.eventId",
      "data" -> "updates.data"))
  .execute()

Java

import io.delta.tables.*;
import org.apache.spark.sql.functions;
import java.util.HashMap;

Dataset<Row> updatesDF = ...  // define the updates DataFrame[date, eventId, data]

DeltaTable.forPath(spark, "/data/events/")
  .as("events")
  .merge(
    updatesDF.as("updates"),
    "events.eventId = updates.eventId")
  .whenMatched()
  .updateExpr(
    new HashMap<String, String>() {{
      put("data", "events.data");
    }})
  .whenNotMatched()
  .insertExpr(
    new HashMap<String, String>() {{
      put("date", "updates.date");
      put("eventId", "updates.eventId");
      put("data", "updates.data");
    }})
  .execute();

Scala, Java ve Python sözdizimi ayrıntıları için Delta Lake API başvurusuna bakın.

İşlem semantiği

Programlı işlemin ayrıntılı bir açıklaması aşağıda verilmiştir merge .

  • Herhangi bir sayıda whenMatched ve whenNotMatched yan tümce olabilir.

    Not

    Databricks Runtime 7,2 ve altında, merge en fazla 2 whenMatched yan tümce ve en fazla 1 whenNotMatched yan tümce bulunabilir.

  • whenMatched yan tümceler, bir kaynak satırı eşleşme koşuluna göre bir hedef tablo satırıyla eşleştiğinde yürütülür. Bu yan tümcelerde aşağıdaki semantiği vardır.

    • whenMatched yan tümcelerinde en fazla bir update ve bir delete eylem olabilir. updateİçindeki eylem, merge eşleşen hedef satırın yalnızca belirtilen sütunlarını ( update işlemebenzer) güncelleştirir. deleteEylem eşleşen satırı siler.

    • Her whenMatched yan tümce isteğe bağlı bir koşula sahip olabilir. Bu yan tümce koşulu varsa, update ya da delete eylem yalnızca yan tümce koşulu true olduğunda, eşleşen herhangi bir kaynak-hedef satır çifti için yürütülür.

    • Birden çok whenMatched yan tümce varsa, bunlar belirtildikleri sırayla değerlendirilir (yani yan tümcelerinin sırası). whenMatchedSon diğeri hariç tüm yan tümcelerinin koşulları olmalıdır.

    • Birden çok whenMatched yan tümce koşullara sahiptir ve eşleşen kaynak hedefi satır çifti için koşulların hiçbiri doğru değilse, eşleşen hedef satırı değişmeden bırakılır.

    • Hedef Delta tablosunun tüm sütunlarını kaynak veri kümesinin karşılık gelen sütunlarıyla güncelleştirmek için kullanın whenMatched(...).updateAll() . Bu eşdeğerdir:

      whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
      

      hedef Delta tablosunun tüm sütunları için. Bu nedenle, bu eylem kaynak tabloda hedef tablodaki sütunların bulunduğu varsayılır, aksi takdirde sorgu bir çözümleme hatası oluşturur.

      Not

      Bu davranış, otomatik şema geçişi etkin olduğunda değişir. Ayrıntılar için bkz. Otomatik şema evrimi .

  • whenNotMatched yan tümceler, bir kaynak satırı eşleşme koşulunu temel alan herhangi bir hedef satırla eşleşmediği zaman yürütülür. Bu yan tümcelerde aşağıdaki semantiği vardır.

    • whenNotMatched yan tümcelerinde yalnızca insert eylem olabilir. Yeni satır, belirtilen sütun ve karşılık gelen ifadeler temel alınarak oluşturulur. Hedef tablodaki tüm sütunları belirtmeniz gerekmez. Belirtilmeyen hedef sütunlar için NULL eklenir.

      Not

      Databricks Runtime 6,5 ve altında, eylem için hedef tablodaki tüm sütunları sağlamanız gerekir INSERT .

    • Her whenNotMatched yan tümce isteğe bağlı bir koşula sahip olabilir. Yan tümce koşulu varsa, yalnızca söz konusu satır için bu koşul doğru olduğunda bir kaynak satır eklenir. Aksi takdirde, kaynak sütun yok sayılır.

    • Birden çok whenNotMatched yan tümce varsa, bunlar belirtildikleri sırayla değerlendirilir (yani yan tümcelerinin sırası). whenNotMatchedSon diğeri hariç tüm yan tümcelerinin koşulları olmalıdır.

    • Hedef Delta tablosunun tüm sütunlarını, kaynak veri kümesinin karşılık gelen sütunlarına eklemek için kullanın whenNotMatched(...).insertAll() . Bu eşdeğerdir:

      whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
      

      hedef Delta tablosunun tüm sütunları için. Bu nedenle, bu eylem kaynak tabloda hedef tablodaki sütunların bulunduğu varsayılır, aksi takdirde sorgu bir çözümleme hatası oluşturur.

      Not

      Bu davranış, otomatik şema geçişi etkin olduğunda değişir. Ayrıntılar için bkz. Otomatik şema evrimi .

Önemli

  • mergeKaynak veri kümesinin birden çok satırı eşleşiyorsa ve birleştirme, hedef Delta tablosunun aynı satırlarını güncelleştirmeye çalışırsa bir işlem başarısız olabilir. birleştirme SQL semantiğine göre, bu tür bir güncelleştirme işlemi, eşleşen hedef satırı güncelleştirmek için hangi kaynak satırın kullanılması gerektiği anlaşılır olduğundan belirsizdir. Birden çok eşleşme olasılığını ortadan kaldırmak için kaynak tabloyu ön işlemden çıkarabilirsiniz. Değişiklik verilerini yakalama örneği: Bu değişikliği hedef Delta tablosuna uygulamadan önce, değişiklik veri kümesinin (yani, kaynak veri kümesinin) yalnızca her bir anahtarın en son değişikliğini bekletmek üzere nasıl önceden işleyeceğini gösterir.
  • mergeKaynak veri kümesi belirleyici değilse, bir işlem hatalı sonuçlar üretebilir. Bunun nedeni, merge kaynak veri kümesinin iki taramasını gerçekleştirebileceğinden ve iki tarama tarafından üretilen veriler farklıysa, tabloda yapılan son değişiklikler yanlış olabilir. Kaynaktaki belirleyici olmayan bir ISM birçok şekilde oluşabilir. Bunlardan bazıları şunlardır:
    • Delta olmayan tablolardan okuma. Örneğin, temel alınan dosyaların birden çok tarama arasında değiştirebildiği bir CSV tablosundan okuma.
    • Belirleyici olmayan işlemleri kullanma. Örneğin, Dataset.filter() verileri filtrelemek için geçerli zaman damgasını kullanan işlemler birden çok tarama arasında farklı sonuçlar oluşturabilir.
  • bir SQL MERGE işlemini, yalnızca görünüm olarak tanımlanmışsa SQL görünümünde uygulayabilirsiniz CREATE VIEW viewName AS SELECT * FROM deltaTable .

Not

Databricks Runtime 7,3 LTS ve üzeri sürümlerde, eşleşmeler koşulsuz olarak silinirken birden fazla eşleşme yapılmasına izin verilir (birden fazla eşleşme olsa bile koşulsuz silme belirsiz olduğundan).

Şema doğrulaması

merge INSERT ve Update ifadeleri tarafından oluşturulan verilerin şemasının, tablonun şemasıyla uyumlu olduğunu otomatik olarak doğrular. İşlemin uyumlu olup olmadığını anlamak için aşağıdaki kuralları kullanır merge :

  • ve update insert eylemleri için, belirtilen hedef sütunların hedef Delta tablosunda mevcut olması gerekir.
  • ve updateAll insertAll eylemleri için kaynak veri kümesi, hedef Delta tablonun tüm sütunlarına sahip olması gerekir. Kaynak veri kümesi ek sütunlara sahip olabilir ve yoksayılır.
  • Tüm eylemler için, hedef sütunları oluşturan ifadeler tarafından oluşturulan veri türü, hedef Delta tablosunda karşılık gelen sütunlardan farklı ise, bunları tablodaki merge türlere türlere türe türler.

Otomatik şema evrimi

Not

'de şema merge evrimi, Databricks Runtime 6.6 ve üzerinde kullanılabilir.

Varsayılan olarak, hedef Delta tablosunda yer alan ve kaynak veri kümesinden aynı adı alan updateAll insertAll sütunlara sahip tüm sütunları attayabilirsiniz. Kaynak veri kümesinde hedef tablodaki sütunla eşleşmeen tüm sütunlar yoksayılır. Ancak bazı kullanım örnekleri için kaynak sütunların hedef Delta tablosuna otomatik olarak eklemek tercih edilir. ve (en az bir tanesi) ile bir işlem sırasında tablo şemasını otomatik olarak güncelleştirmek için, işlemi çalıştırmadan önce Spark oturum merge updateAll insertAll spark.databricks.delta.schema.autoMerge.enabled true yapılandırmasını olarak merge ayarlayın.

Not

  • Şema evrimi yalnızca bir ( ) veya ( ) eylemi updateAll ya da her ikisi birden olduğunda UPDATE SET * insertAll INSERT * gerçekleşir.
  • update ve eylemleri, hedef tabloda zaten mevcut olmayan hedef sütunlara açıkça başvuramaz (yan tümcelerden biri olsa insert updateAll insertAll bile). Aşağıdaki örneklere bakın.

Not

7 Databricks Runtime 7.4 ve altında, iç içe sütunların değil yalnızca üst düzey sütunların merge şema evrimini destekler.

Şema evrimi ile ve şema değişikliği olmadan işlemi merge etkilerine birkaç örnek aşağıda verilmiştir.

Sütunlar Sorgu (Scala'da) Şema evrimi olmadan davranış (varsayılan) Şema evrimi ile davranış
Hedef sütunlar: key, value

Kaynak sütunlar: key, value, newValue
targetDeltaTable.alias("t")
.merge(
sourceDataFrame.alias("s"),
"t.key = s.key")
.whenMatched().updateAll()
.whenNotMatched().insertAll()
.execute()
Tablo şeması değişmeden kalır; yalnızca sütunları key value güncelleştirilir/eklenir. Tablo şeması olarak (key, value, newValue) değiştirilir. updateAll ve value sütunlarını newValue günceller insertAll ve satırlarını (key, value, newValue) ekler.
Hedef sütunlar: key, oldValue

Kaynak sütunlar: key, newValue
targetDeltaTable.alias("t")
.merge(
sourceDataFrame.alias("s"),
"t.key = s.key")
.whenMatched().updateAll()
.whenNotMatched().insertAll()
.execute()
updateAll ve insertAll eylemleri hata verir çünkü hedef sütun oldValue kaynakta değildir. Tablo şeması olarak (key, oldValue, newValue) değiştirilir. updateAll sütunları güncelleştirme ve değiştirmeden bırakma ve satır key newValue ekleme oldValue insertAll (key, NULL, newValue) (yani olarak oldValue NULL eklenir).
Hedef sütunlar: key, oldValue

Kaynak sütunlar: key, newValue
targetDeltaTable.alias("t")
.merge(
sourceDataFrame.alias("s"),
"t.key = s.key")
.whenMatched().update(Map(
"newValue" -> col("s.newValue")))
.whenNotMatched().insertAll()
.execute()
update hedef tabloda sütun mevcut newValue olduğundan bir hata verir. update yine de bir hata verir çünkü newValue sütun hedef tabloda mevcut değildir.
Hedef sütunlar: key, oldValue

Kaynak sütunlar: key, newValue
targetDeltaTable.alias("t")
.merge(
sourceDataFrame.alias("s"),
"t.key = s.key")
.whenMatched().updateAll()
.whenNotMatched().insert(Map(
"key" -> col("s.key"),
"newValue" -> col("s.newValue")))
.execute()
insert hedef tabloda sütun mevcut newValue olduğundan bir hata verir. insert sütun hedef tabloda mevcut değil newValue gibi bir hataya neden olur.

Performans ayarlama

Aşağıdaki yaklaşımları kullanarak birleştirme işlemiyle zaman azaltabilirsiniz:

  • Eşleşmeler için arama alanı azaltma: Varsayılan olarak, işlem kaynak tablodaki eşleşmeleri bulmak merge için Delta tablonun tamamını arar. Hızı azaltmanın bir merge yolu, eşleşme koşuluna bilinen kısıtlamalar ekleyerek arama alanı azaltmaktır. Örneğin, tarafından bölümlenmiş bir tablo olduğunu ve son güne ve belirli bir ülkeye ilişkin bilgileri güncelleştirmek için country kullanmak istediğiniz bir tablo olduğunu date merge varsayalım. Koşulu ekleme

    events.date = current_date() AND events.country = 'USA'
    

    yalnızca ilgili bölümlerde eşleşmeleri araması sorguyu daha hızlı hale gelecektir. Ayrıca, diğer eşzamanlı işlemlerle çakışma ihtimalini de azaltır. Diğer ayrıntılar için bkz. Eşzamanlılık denetimi.

  • Sıkıştırılmış dosyalar: Veriler çok sayıda küçük dosyada depolanıyorsa, eşleşmeleri aramak için verilerin okunması yavaş olabilir. Okuma aktarım hızını artırmak için küçük dosyaları daha büyük dosyalara sıkıştırabilirsiniz. Ayrıntılar için bkz. Sıkıştırılmış dosyalar.

  • Yazmalar için karıştırma bölümlerini denetleme: İşlem, güncelleştirilmiş verileri hesaplamak ve yazmak için verileri merge birden çok kez karıştırabilir. Karıştırmak için kullanılan görev sayısı Spark oturum yapılandırması tarafından spark.sql.shuffle.partitions denetlenr. Bu parametrenin belirlenmesi paralelliği kontrol etmekle birlikte çıkış dosyalarının sayısını da belirler. Değerin artırılması paralelliği artırır, ancak daha az sayıda veri dosyası da üretir.

  • en iyi duruma getirilmiş yazmaları etkinleştirme: Bölümlenmiş tablolar için karışık bölüm sayısından merge çok daha fazla sayıda küçük dosya üretebilir. Bunun nedeni, her karıştırma görevinin birden çok bölümde birden çok dosya yazabilir ve performans sorununa yol açabilir. İyileştirilmiş Yazma'ya olanak sağlayarak dosya sayısını azaltabilirsiniz.

Not

7 Databricks Runtime 7.4 ve üzeri bir bölümde, İyileştirilmiş Yazma bölümlenmiş merge tablolarda işlemlerde otomatik olarak etkinleştirilir.

  • Tablodaki dosya boyutlarını ayarlama: <DBR 8.2>'den bu yana, Azure Databricks Delta tablosunda dosyaları yeniden yazan sık sık işlemler olup oladığını otomatik olarak algılanabilir ve gelecekte daha fazla dosya yeniden yazma beklentisiyle yeniden yazılan dosyaların boyutunu azaltmayı merge seçebilir. Ayrıntılar için dosya boyutlarını ayarlama bölümüne bakın.

Birleştirme örnekleri

Aşağıda, farklı senaryolarda nasıl kullanabileceğiniz hakkında merge birkaç örnek verilmiştir.

Bu bölümdeki konular:

Delta tablolarına yazarken verileri tekrarlama

Yaygın bir ETL kullanım durumu, günlükleri bir tabloya ek olarak Delta tablosuna toplamaktır. Ancak, genellikle kaynaklar yinelenen günlük kayıtları oluşturamaz ve bunları ilgilendirebilecek aşağı akış yinelenenleri kaldırma adımları gerekir. ile merge yinelenen kayıtları eklemekten kaçınabilirsiniz.

SQL

MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId
WHEN NOT MATCHED
  THEN INSERT *

Python

deltaTable.alias("logs").merge(
    newDedupedLogs.alias("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId") \
  .whenNotMatchedInsertAll() \
  .execute()

Scala

deltaTable
  .as("logs")
  .merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId")
  .whenNotMatched()
  .insertAll()
  .execute()

Java

deltaTable
  .as("logs")
  .merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId")
  .whenNotMatched()
  .insertAll()
  .execute();

Not

Yeni günlükleri içeren veri kümesi kendi içinde deduplicated gerekir. Birleştirmenin SQL, yeni verileri tablodaki mevcut verilerle eşler ve yinelenenleri çıkartır, ancak yeni veri kümesi içinde yinelenen veriler varsa eklenir. Bu nedenle, tabloyla birleştirmeden önce yeni verileri yineler.

Yalnızca birkaç gün boyunca yinelenen kayıtlar alasınız, tabloyu tarihe göre bölümleerek ve ardından eşleşmesi için hedef tablonun tarih aralığını belirterek sorguyu daha da iyileştirilmiş bir şekilde kullanabilirsiniz.

SQL

MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS
  THEN INSERT *

Python

deltaTable.alias("logs").merge(
    newDedupedLogs.alias("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS") \
  .whenNotMatchedInsertAll("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS") \
  .execute()

Scala

deltaTable.as("logs").merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
  .whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
  .insertAll()
  .execute()

Java

deltaTable.as("logs").merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
  .whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
  .insertAll()
  .execute();

Bu, tablonun tamamına değil yalnızca son 7 günlük günü içinde yinelenenleri araması açısından önceki komuttan daha verimlidir. Ayrıca bu yalnızca ekleme birleştirme işlemini Yapılandırılmış Akış ile birlikte kullanarak günlüklerin sürekli olarak kaldırılmasını gerçekleştirebilirsiniz.

  • Bir akış sorgusunda, 'de birleştirme işlemi kullanarak herhangi bir akış verilerini sürekli olarak, tekrarsız kaldırma ile foreachBatch delta tablosuna yazabilirsiniz. hakkında daha fazla bilgi için aşağıdaki akış örneğine foreachBatch bakın.
  • Başka bir akış sorgusunda, bu Delta tablosundan sürekli olarak deduplicated verilerini okuyabilirsiniz. Bunun nedeni, yalnızca ekleme birleştirmenin Delta tablosuna yeni veriler eklemesi olabilir.

Not

Yalnızca ekleme birleştirme, 6.2 ve üzeri Databricks Runtime eklemek için en iyi duruma getirilmiştir. 6.1 Databricks Runtime ve altında, yalnızca ekleme birleştirme işlemlerinden yazma işlemleri akış olarak okunamaz.

Verileri yavaş değiştirme (SCD) Tür 2 işlemi Delta tablolarına

Bir diğer yaygın işlem de, boyutlu bir tablodaki her anahtarda yapılan tüm değişikliklerin geçmişini sürdüren SCD Tür 2'dir. Bu tür işlemler, önceki anahtar değerlerini eski olarak işaretlemek için mevcut satırların güncelleştirilerek yeni satırların en son değerler olarak eklenmesi gerekir. Güncelleştirmeleri ve boyutsal verileri olan hedef tabloyu olan bir kaynak tablo verildi, SCD Tür 2 ile ifade merge edildi.

Müşteri için adreslerin geçmişinin ve her bir adresin etkin tarih aralığının korunmasına somut bir örnek burada verilmektedir. Müşterinin adresinin güncelleştirilmiş olması gerekirken, önceki adresi geçerli adres değil olarak işaretlemeniz, etkin tarih aralığını güncelleştirmeniz ve yeni adresi geçerli adres olarak eklemeniz gerekir.

Birleştirme not defterini kullanarak SCD Tür 2

Not defterini alma

Değişiklik verilerini Delta tablosuna yazma

SCD'ye benzer şekilde, genellikle değişiklik verileri yakalama (CDC) olarak adlandırılan bir diğer yaygın kullanım durumu da dış veritabanından oluşturulan tüm veri değişikliklerini Delta tablosuna uygulamaktır. Başka bir deyişle, bir delta tablosuna dış tabloya uygulanan bir dizi güncelleştirme, silme ve eklemenin uygulanması gerekir. Bunu aşağıdaki gibi merge kullanarak yapabilirsiniz.

MERGE not defterini kullanarak değişiklik verileri yazma

Not defterini alma

kullanarak akış sorgularından upsert foreachBatch

Bir akış sorgusundan Delta tablosuna karmaşık upsert'lar yazmak için ve (daha fazla bilgi için merge foreachBatch bkz. foreachbatch) birleşimini kullanabilirsiniz. Örnek:

  • Güncelleştirme Modunda akış toplamları yazma: Bu, Tamamlama Modundan çok daha verimlidir.
  • Delta tablosuna veritabanı değişiklikleri akışı yazma: Değişiklik verilerini yazmak için birleştirme sorgusu, delta tablosuna sürekli değişiklik akışı uygulamak için foreachBatch kullanılabilir.
  • Yinelenenleri kaldırma ile Delta tablosuna akış verileri yazma: Yinelenenleri kaldırma için yalnızca ekleme birleştirme sorgusu, yinelenen verileri otomatik yinelenenleri kaldırma ile bir Delta tablosuna sürekli olarak foreachBatch yazmak için kullanılabilir.

Not

  • Akış sorgusunun yeniden başlatılması işlemi aynı veri toplu işlerine birden çok kez uygulayaya kadar içindeki deyiminizin bir kez etkili merge foreachBatch olduğundan emin olun.
  • içinde kullanılırken, akış sorgusunun giriş veri hızı (not defteri hızı grafiğinde bildirilen ve not defteri hız grafiğinde görünür) verilerin kaynakta oluşturulma oranının katları olarak merge foreachBatch StreamingQueryProgress bildiriliyor olabilir. Bunun nedeni, merge giriş verilerini birden çok kez okumanın giriş ölçümlerinin çarpımlarına neden olmasıdır. Bu bir performans sorunu ise, batch DataFrame'i önce önbelleğe alıp sonra merge da sonra önbelleğe aldırabilirsiniz. merge

Birleştirme ve foreachBatch not defterini kullanarak güncelleştirme modunda akış toplamları yazma

Not defterini alma