Odstraňování, aktualizace a slučování tabulek

Rozdílový Lake podporuje několik příkazů, které usnadňují odstranění dat z rozdílových tabulek a jejich aktualizaci.

Odstranit z tabulky

Můžete odebrat data, která odpovídají predikátu z tabulky Delta. Chcete-li například odstranit všechny události ze služby dřív 2017 , můžete spustit následující:

SQL

DELETE FROM events WHERE date < '2017-01-01'

DELETE FROM delta.`/data/events/` WHERE date < '2017-01-01'

Python

Poznámka

Rozhraní Python API je dostupné v Databricks Runtime 6,1 a novějších.

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, "/data/events/")

deltaTable.delete("date < '2017-01-01'")        # predicate using SQL formatted string

deltaTable.delete(col("date") < "2017-01-01")   # predicate using Spark SQL functions

Scala

Poznámka

Rozhraní Scala API je dostupné v Databricks Runtime 6,0 a novějších.

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/data/events/")

deltaTable.delete("date < '2017-01-01'")        // predicate using SQL formatted string

import org.apache.spark.sql.functions._
import spark.implicits._

deltaTable.delete(col("date") < "2017-01-01")       // predicate using Spark SQL functions and implicits

Java

Poznámka

Rozhraní Java API je dostupné v Databricks Runtime 6,0 a novějších.

import io.delta.tables.*;
import org.apache.spark.sql.functions;

DeltaTable deltaTable = DeltaTable.forPath(spark, "/data/events/");

deltaTable.delete("date < '2017-01-01'");            // predicate using SQL formatted string

deltaTable.delete(functions.col("date").lt(functions.lit("2017-01-01")));   // predicate using Spark SQL functions

Podrobnosti najdete v referenčních informacích k rozhraní API pro rozdílových Lake .

Důležité

delete Odebere data z poslední verze rozdílové tabulky, ale neodebere ji z fyzického úložiště, dokud nebudou staré verze explicitně vyvakuované. Podrobnosti najdete v části vaku .

Tip

Pokud je to možné, poskytněte predikáty sloupce oddílu pro rozdílovou tabulku dělené jako oddíly, protože tyto predikáty můžou operaci významně zrychlit.

Aktualizace tabulky

Data, která se shodují s predikátem, můžete aktualizovat v tabulce rozdílů. Chcete-li například opravit pravopisnou chybu v nástroji eventType , můžete spustit následující:

SQL

UPDATE events SET eventType = 'click' WHERE eventType = 'clck'

UPDATE delta.`/data/events/` SET eventType = 'click' WHERE eventType = 'clck'

Python

Poznámka

Rozhraní Python API je dostupné v Databricks Runtime 6,1 a novějších.

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, "/data/events/")

deltaTable.update("eventType = 'clck'", { "eventType": "'click'" } )   # predicate using SQL formatted string

deltaTable.update(col("eventType") == "clck", { "eventType": lit("click") } )   # predicate using Spark SQL functions

Scala

Poznámka

Rozhraní Scala API je dostupné v Databricks Runtime 6,0 a novějších.

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/data/events/")

deltaTable.updateExpr(            // predicate and update expressions using SQL formatted string
  "eventType = 'clck'",
  Map("eventType" -> "'click'")

import org.apache.spark.sql.functions._
import spark.implicits._

deltaTable.update(                // predicate using Spark SQL functions and implicits
  col("eventType") === "clck",
  Map("eventType" -> lit("click")));

Java

Poznámka

Rozhraní Scala API je dostupné v Databricks Runtime 6,0 a novějších.

import io.delta.tables.*;
import org.apache.spark.sql.functions;
import java.util.HashMap;

DeltaTable deltaTable = DeltaTable.forPath(spark, "/data/events/");

deltaTable.updateExpr(            // predicate and update expressions using SQL formatted string
  "eventType = 'clck'",
  new HashMap<String, String>() {{
    put("eventType", "'click'");
  }}
);

deltaTable.update(                // predicate using Spark SQL functions
  functions.col(eventType).eq("clck"),
  new HashMap<String, Column>() {{
    put("eventType", functions.lit("click"));
  }}
);

Podrobnosti najdete v referenčních informacích k rozhraní API pro rozdílových Lake .

Tip

Podobně jako při odstraňování můžou operace aktualizace získat významné zrychlení s predikáty na oddílech.

Upsert se do tabulky pomocí sloučení

Můžete Upsert data ze zdrojové tabulky, zobrazení nebo dataframe do cílové rozdílové tabulky pomocí merge operace. Tato operace je podobná příkazu SQL MERGE INTO , ale má další podporu pro odstranění a dodatečné podmínky v části aktualizace, vkládání a odstraňování.

Předpokládejme, že máte Spark dataframe, který obsahuje nová data pro události s eventId . Některé z těchto událostí již mohou být v tabulce k dispozici events . Chcete-li sloučit nová data do events tabulky, je třeba aktualizovat odpovídající řádky (tj. eventId již přítomné) a vložit nové řádky (tj eventId . nejsou k dispozici). Můžete spustit následující:

SQL

MERGE INTO events
USING updates
ON events.eventId = updates.eventId
WHEN MATCHED THEN
  UPDATE SET events.data = updates.data
WHEN NOT MATCHED
  THEN INSERT (date, eventId, data) VALUES (date, eventId, data)

Podrobné informace o syntaxi naleznete v tématu.

Python

from delta.tables import *

deltaTable = DeltaTable.forPath(spark, "/data/events/")

deltaTable.alias("events").merge(
    updatesDF.alias("updates"),
    "events.eventId = updates.eventId") \
  .whenMatchedUpdate(set = { "data" : "updates.data" } ) \
  .whenNotMatchedInsert(values =
    {
      "date": "updates.date",
      "eventId": "updates.eventId",
      "data": "updates.data"
    }
  ) \
  .execute()

Scala

import io.delta.tables._
import org.apache.spark.sql.functions._

val updatesDF = ...  // define the updates DataFrame[date, eventId, data]

DeltaTable.forPath(spark, "/data/events/")
  .as("events")
  .merge(
    updatesDF.as("updates"),
    "events.eventId = updates.eventId")
  .whenMatched
  .updateExpr(
    Map("data" -> "updates.data"))
  .whenNotMatched
  .insertExpr(
    Map(
      "date" -> "updates.date",
      "eventId" -> "updates.eventId",
      "data" -> "updates.data"))
  .execute()

Java

import io.delta.tables.*;
import org.apache.spark.sql.functions;
import java.util.HashMap;

Dataset<Row> updatesDF = ...  // define the updates DataFrame[date, eventId, data]

DeltaTable.forPath(spark, "/data/events/")
  .as("events")
  .merge(
    updatesDF.as("updates"),
    "events.eventId = updates.eventId")
  .whenMatched()
  .updateExpr(
    new HashMap<String, String>() {{
      put("data", "events.data");
    }})
  .whenNotMatched()
  .insertExpr(
    new HashMap<String, String>() {{
      put("date", "updates.date");
      put("eventId", "updates.eventId");
      put("data", "updates.data");
    }})
  .execute();

Podrobnosti o syntaxi Scala, Java a Pythonu najdete v referenčních informacích k rozhraní Lake Lake API .

Sémantika operace

Tady je podrobný popis merge programové operace.

  • Může existovat libovolný počet whenMatched whenNotMatched klauzulí a.

    Poznámka

    V Databricks Runtime 7,2 a níže merge může mít maximálně dvě whenMatched klauzule a nejvýše 1 whenNotMatched klauzuli.

  • whenMatched klauzule jsou spouštěny, když zdrojový řádek odpovídá řádku cílové tabulky na základě podmínky shody. Tyto klauzule mají následující sémantiku.

    • whenMatched klauzule mohou mít nejvýše jednu update a jednu delete akci. updateAkce v rámci merge aktualizuje pouze zadané sloupce (podobně jako update operace) odpovídajícího cílového řádku. deleteAkce odstraní odpovídající řádek.

    • Každá whenMatched klauzule může mít volitelnou podmínku. Pokud tato podmínka klauzule existuje, update akce nebo delete se spustí pro všechny párové dvojice řádků cílového zdroje pouze v případě, že je podmínka klauzule true.

    • Pokud existuje více whenMatched klauzulí, jsou vyhodnocovány v pořadí, v jakém jsou zadány (tj. pořadí klauzulí). Všechny whenMatched klauzule s výjimkou posledního z nich musí mít podmínky.

    • Pokud whenMatched má více klauzulí podmínky a žádná z podmínek není pravdivá pro odpovídající dvojici řádků cílového zdroje, pak odpovídající cílový řádek zůstane beze změny.

    • Chcete-li aktualizovat všechny sloupce cílové rozdílové tabulky s odpovídajícími sloupci zdrojové datové sady, použijte whenMatched(...).updateAll() . Jedná se o ekvivalent:

      whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
      

      pro všechny sloupce cílové rozdílové tabulky. Proto tato akce předpokládá, že zdrojová tabulka má stejné sloupce jako v cílové tabulce, jinak dotaz vyvolá chybu analýzy.

      Poznámka

      Toto chování se změní, když je povolena Automatická migrace schématu. Podrobnosti najdete v tématu Automatický vývoj schématu .

  • whenNotMatched klauzule jsou spouštěny, pokud zdrojový řádek neodpovídá žádnému cílovému řádku na základě podmínky shody. Tyto klauzule mají následující sémantiku.

    • whenNotMatched klauzule můžou mít jenom tuto insert akci. Nový řádek je vygenerován na základě zadaného sloupce a odpovídajících výrazů. Nemusíte zadávat všechny sloupce v cílové tabulce. Pro neurčené cílové sloupce NULL je vložena.

      Poznámka

      V Databricks Runtime 6,5 a níže musíte zadat všechny sloupce v cílové tabulce pro INSERT akci.

    • Každá whenNotMatched klauzule může mít volitelnou podmínku. Pokud je splněna podmínka klauzule, je zdrojový řádek vložen pouze v případě, že je tato podmínka pro daný řádek pravdivá. V opačném případě se zdrojový sloupec ignoruje.

    • Pokud existuje více whenNotMatched klauzulí, jsou vyhodnocovány v pořadí, v jakém jsou zadány (tj. pořadí klauzulí). Všechny whenNotMatched klauzule s výjimkou posledního z nich musí mít podmínky.

    • Chcete-li vložit všechny sloupce cílové tabulky Delta s odpovídajícími sloupci zdrojové datové sady, použijte whenNotMatched(...).insertAll() . Jedná se o ekvivalent:

      whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
      

      pro všechny sloupce cílové rozdílové tabulky. Proto tato akce předpokládá, že zdrojová tabulka má stejné sloupce jako v cílové tabulce, jinak dotaz vyvolá chybu analýzy.

      Poznámka

      Toto chování se změní, když je povolena Automatická migrace schématu. Podrobnosti najdete v tématu Automatický vývoj schématu .

Důležité

mergeOperace může selhat, pokud se shoduje více řádků zdrojové datové sady a pokusí se aktualizovat stejné řádky cílové rozdílové tabulky. Vzhledem k sémantikě SQL sloučení je taková operace aktualizace nejednoznačná, protože není jasné, který zdrojový řádek by měl být použit k aktualizaci odpovídajícího cílového řádku. Zdrojovou tabulku můžete předzpracovat, aby se vyloučila možnost více shod. Podívejte se na příklad zachycení dat – příklad, který předzpracovává datovou sadu změn (tj. zdrojovou datovou sadu), aby zachovala pouze poslední změnu pro každý klíč předtím, než použije tuto změnu do cílové tabulky Delta.

Poznámka

V Databricks Runtime 7,3 LTS a vyšších jsou povoleny vícenásobné shody, pokud se shody nepodmíněně odstraní (protože nepodmíněné odstranění není jednoznačné i v případě, že existuje více shod).

Ověřování schématu

merge automaticky ověří, že schéma dat generovaných výrazy INSERT a Update je kompatibilní se schématem tabulky. K určení, zda je operace kompatibilní, používá následující pravidla merge :

  • updateV případě insert akcí a musí v cílové tabulce rozdílů existovat zadané cílové sloupce.
  • Pro updateAll insertAll akce a musí mít zdrojová datová sada všechny sloupce cílové rozdílové tabulky. Zdrojová datová sada může obsahovat nadbytečné sloupce a ignoruje se.
  • Pro všechny akce, pokud datový typ generovaný výrazy vytvářející cílové sloupce se liší od odpovídajících sloupců v cílové tabulce rozdílů, se merge pokusí je přetypovat na typy v tabulce.

Automatický vývoj schématu

Poznámka

Vývoj schématu v nástroji merge je k dispozici v Databricks Runtime 6,6 a vyšších.

Ve výchozím nastavení updateAll a insertAll přiřaďte všechny sloupce v cílové tabulce rozdílů se sloupci stejného názvu ze zdrojové datové sady. Všechny sloupce ve zdrojové datové sadě, které se neshodují se sloupci v cílové tabulce, se ignorují. V některých případech použití je však žádoucí automaticky přidat zdrojové sloupce do cílové tabulky Delta. Chcete-li automaticky aktualizovat schéma tabulky během merge operace s updateAll a insertAll (alespoň jedním z nich), můžete nastavit konfiguraci relace Spark spark.databricks.delta.schema.autoMerge.enabled na true před spuštěním merge operace.

Poznámka

  • K vývoji schématu dochází pouze v případě, že je k dispozici buď updateAll UPDATE SET * Akce () nebo insertAll ( INSERT * ), nebo obojí.
  • update a insert Akce nemůžou explicitně odkazovat na cílové sloupce, které ještě neexistují v cílové tabulce (i když jsou updateAll nebo insertAll jako jedna z klauzulí). Podívejte se na následující příklad:

Poznámka

V Databricks Runtime 7,4 a níže merge podporuje vývoj schématu pouze pro sloupce nejvyšší úrovně a nikoli pro vnořené sloupce.

Tady je několik příkladů vlivu merge operace s vývojem schématu i bez něj.

Sloupce Dotaz (v Scala) Chování bez vývoje schématu (výchozí) Chování při vývoji schématu
Cílové sloupce: key, value

Zdrojové sloupce: key, value, newValue
targetDeltaTable.alias("t")
.merge(
sourceDataFrame.alias("s"),
"t.key = s.key")
.whenMatched().updateAll()
.whenNotMatched().insertAll()
.execute()
Schéma tabulky zůstane beze změny. key value Aktualizováno/vloženy jsou pouze sloupce. Schéma tabulky je změněno na (key, value, newValue) . updateAll aktualizuje sloupce value a newValue a insertAll vloží řádky (key, value, newValue) .
Cílové sloupce: key, oldValue

Zdrojové sloupce: key, newValue
targetDeltaTable.alias("t")
.merge(
sourceDataFrame.alias("s"),
"t.key = s.key")
.whenMatched().updateAll()
.whenNotMatched().insertAll()
.execute()
updateAll````insertAllakce a vyvolávají chybu, protože cílový sloupec není oldValue ve zdroji. Schéma tabulky je změněno na (key, oldValue, newValue) . updateAll aktualizuje sloupce key a newValue ponechává oldValue nezměněné insertAll řádky a vloží řádky (key, NULL, newValue) (to znamená, že oldValue jsou vloženy jako NULL ).
Cílové sloupce: key, oldValue

Zdrojové sloupce: key, newValue
targetDeltaTable.alias("t")
.merge(
sourceDataFrame.alias("s"),
"t.key = s.key")
.whenMatched().update(Map(
"newValue" -> col("s.newValue")))
.whenNotMatched().insertAll()
.execute()
update vyvolá chybu, protože sloupec newValue v cílové tabulce neexistuje. update stále vyvolá chybu, protože sloupec newValue v cílové tabulce neexistuje.
Cílové sloupce: key, oldValue

Zdrojové sloupce: key, newValue
targetDeltaTable.alias("t")
.merge(
sourceDataFrame.alias("s"),
"t.key = s.key")
.whenMatched().updateAll()
.whenNotMatched().insert(Map(
"key" -> col("s.key"),
"newValue" -> col("s.newValue")))
.execute()
insert vyvolá chybu, protože sloupec newValue v cílové tabulce neexistuje. insert stále vyvolá chybu, protože sloupec newValue v cílové tabulce neexistuje.

Ladění výkonu

Dobu trvání sloučení můžete zkrátit pomocí následujících přístupů:

  • Zmenšení hledaného prostoru pro shody: ve výchozím nastavení provede merge operace hledání shod ve zdrojové tabulce v celé tabulce rozdílů. Jedním ze způsobů, jak zrychlit, merge je omezit prostor pro hledání přidáním známých omezení do podmínky shody. Předpokládejme například, že máte tabulku, která je rozdělená na oddíly country a date a chcete použít merge k aktualizaci informací za poslední den a určitou zemi. Přidání podmínky

    events.date = current_date() AND events.country = 'USA'
    

    provede dotaz rychleji, protože hledá shody pouze v příslušných oddílech. Kromě toho také sníží pravděpodobnost konfliktu s ostatními souběžnými operacemi. Další podrobnosti naleznete v tématu řízení souběžnosti .

  • Kompaktní soubory: Pokud jsou data uložená v mnoha malých souborech, může dojít k pomalému načítání dat pro hledání shody. Pro zlepšení propustnosti čtení můžete zkomprimovat malé soubory do větších souborů. Podrobnosti najdete v části kompaktní soubory .

  • Řídit náhodné oddíly pro zápisy: operace rozdělí merge data vícekrát do výpočtu a zapíše aktualizovaná data. Počet úloh, které se používají k náhodnému vyřízení, je řízen konfigurací relace Spark spark.sql.shuffle.partitions . Nastavení tohoto parametru neovládá pouze paralelismus, ale také určuje počet výstupních souborů. Zvýšení hodnoty zvyšuje paralelismus, ale také generuje větší počet menších datových souborů.

  • Povolit optimalizované zápisy: u dělených tabulek merge může vytvořit mnohem větší počet malých souborů, než je počet náhodně rozmístěných oddílů. Důvodem je to, že každý náhodný úkol může zapisovat více souborů ve více oddílech a může se stát kritickým výkonem. To můžete optimalizovat povolením optimalizovaných zápisů.

Příklady sloučení

Tady je několik příkladů použití merge v různých scénářích.

V této části:

Odstranění duplicitních dat při psaní do rozdílových tabulek

Běžným případem použití ETL je shromažďování protokolů do rozdílových tabulek jejich připojením k tabulce. Zdroje ale často můžou generovat duplicitní záznamy protokolů a kroky odstranění duplicitních dat, které je potřeba, aby se postarly. Pomocí nástroje se merge můžete vyhnout vkládání duplicitních záznamů.

SQL

MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId
WHEN NOT MATCHED
  THEN INSERT *

Python

deltaTable.alias("logs").merge(
    newDedupedLogs.alias("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId") \
  .whenNotMatchedInsertAll() \
  .execute()

Scala

deltaTable
  .as("logs")
  .merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId")
  .whenNotMatched()
  .insertAll()
  .execute()

Java

deltaTable
  .as("logs")
  .merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId")
  .whenNotMatched()
  .insertAll()
  .execute();

Poznámka

Datová sada obsahující nové protokoly musí být v rámci sebe Odstraněná. Díky sémantikě SQL sloučení se shoduje a odřadí nová data s existujícími daty v tabulce, pokud ale v rámci nové datové sady existují duplicitní data, je vložena. Proto před sloučením do tabulky odduplikujte nová data.

Pokud víte, že můžete získat duplicitní záznamy jenom za pár dní, můžete svůj dotaz lépe optimalizovat rozdělením tabulky podle data a zadáním rozsahu dat cílové tabulky, na které se bude shodovat.

SQL

MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS
  THEN INSERT *

Python

deltaTable.alias("logs").merge(
    newDedupedLogs.alias("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS") \
  .whenNotMatchedInsertAll("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS") \
  .execute()

Scala

deltaTable.as("logs").merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
  .whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
  .insertAll()
  .execute()

Java

deltaTable.as("logs").merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
  .whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
  .insertAll()
  .execute();

To je efektivnější než předchozí příkaz, protože vyhledává duplicity jenom za posledních 7 dní protokolů, ne pro celou tabulku. Kromě toho můžete pomocí tohoto vloženého sloučení se strukturovaným streamem provádět průběžné odstraňování duplicitních dat v protokolech.

  • V dotazu streamování můžete pomocí operace sloučení v aplikaci foreachBatch průběžně zapisovat všechna streamovaná data do tabulky Delta s odstraněním duplicit. Další informace o najdete v následujícím příkladu streamování foreachBatch .
  • V jiném dotazu pro streamování můžete data s odstraněnými duplicitními daty průběžně číst z této rozdílové tabulky. To je možné, protože sloučení jen pro vložení připojí pouze nová data do tabulky Delta.

Poznámka

Sloučení jen pro vložení je optimalizované tak, aby přidávalo data jenom v Databricks Runtime 6,2 a vyšších. V Databricks Runtime 6,1 a níže se zápisy z operací sloučení jen pro vložení nedají číst jako datový proud.

Operace s pomalou změnou dat (SCD) typu 2 na rozdílové tabulky

Další běžnou operací je SCD typ 2, který uchovává historii všech změn provedených u každého klíče v tabulce s dimenzemi. Tyto operace vyžadují aktualizaci stávajících řádků, aby bylo možné označit předchozí hodnoty klíčů jako staré, a vložit nové řádky jako nejnovější hodnoty. Vzhledem ke zdrojové tabulce s aktualizacemi a cílovou tabulkou s multidimenzionálními daty lze SCD typ 2 vyjádřit pomocí merge .

Tady je konkrétní příklad údržby historie adres pro zákazníka spolu s aktivním rozsahem kalendářních dat každé adresy. Když je potřeba aktualizovat adresu zákazníka, musíte označit předchozí adresu jako neaktuální, aktualizovat její aktivní rozsah dat a přidat novou adresu jako aktuální.

SCD typ 2 s použitím slučovacího poznámkového bloku

Získat poznámkový blok

Zápis dat změn do tabulky Delta

Podobně jako SCD, Další běžný případ použití, často označovaný Change Data Capture (CDC), je použít všechny změny dat vygenerované z externí databáze na rozdílovou tabulku. Jinými slovy je potřeba použít sadu aktualizací, které se odstraní a vloží do externí tabulky, a to pro rozdílovou tabulku. Můžete to udělat pomocí merge následujícího postupu.

Zápis dat o změnách pomocí poznámkového bloku sloučení

Získat poznámkový blok

Upsert z dotazů streamování pomocíforeachBatch

merge foreachBatch K zápisu komplexních upsertuje z dotazu streamování do tabulky Delta můžete použít kombinaci a (Další informace najdete v tématu foreachbatch ). Příklad:

  • Zápisy streamování v režimu aktualizace: to je mnohem efektivnější než úplný režim.
  • Zapsat datový proud změn databáze do rozdílové tabulky: slučovací dotaz pro zápis dat změn lze použít v foreachBatch k nepřetržitému použití datového proudu změn v tabulce rozdílů.
  • Zápis dat streamu do rozdílové tabulky s odstraněním duplicit: pomocí slučovacího dotazu , který se dá použít k odstranění duplicitních dat, můžete v foreachBatch nástroji průběžně zapisovat data (s duplicitními daty) do rozdílové tabulky s automatickým odstraněním duplicit.

Poznámka

  • Ujistěte se, že merge je váš příkaz uvnitř foreachBatch idempotentní jako restart dotazu streamování, může operaci použít u stejné dávky dat víckrát.
  • Při merge použití v systému se foreachBatch míra vstupních dat dotazu streamování (v StreamingQueryProgress grafu míry poznámkových bloků) dá ohlásit jako násobek skutečné sazby, při které se data generují ve zdroji. Důvodem je to merge , že čtení vstupních dat víckrát způsobuje vynásobení vstupních metrik. Pokud je toto kritické místo, můžete datový rámec Batch ukládat do mezipaměti dřív merge a pak ho po merge .

Zápis streamování do režimu aktualizace pomocí sloučení a foreachBatch poznámkového bloku

Získat poznámkový blok