Streamování čtení a zápisů tabulek

Rozdílový Lake je hluboko integrovaný se strukturovaným streamování Sparku prostřednictvím readStream a writeStream . Delta Lake přechází z mnoha omezení typicky spojených se systémy a soubory streamování, včetně:

  • Slučování malých souborů vytvořených při ingestování s nízkou latencí

  • Údržba zpracování "právě jednou" s více než jedním datovým proudem (nebo souběžnými dávkovými úlohami)

  • Efektivně zjišťovat, které soubory jsou při použití souborů jako zdroje pro datový proud nové

Rozdílová tabulka jako zdroj streamu

Když načtete rozdílovou tabulku jako zdroj streamu a použijete ji v dotazu streamování, dotaz zpracuje všechna data v tabulce a také všechna nová data, která dorazí po spuštění streamu.

Můžete načíst jak cesty, tak i tabulky jako datový proud.

spark.readStream.format("delta")
  .load("/mnt/delta/events")

import io.delta.implicits._
spark.readStream.delta("/mnt/delta/events")

nebo

import io.delta.implicits._

spark.readStream.format("delta")
  .table("events")

Můžete také:

  • Nastavte maximální velikost jakékoli mikrodávky, kterou má rozdílový Lake, na streaming nastavením maxFilesPerTrigger Možnosti. Určuje maximální počet nových souborů, které se mají v každé triggeru zohlednit. Výchozí hodnota je 1 000.
  • Rate – omezte množství dat zpracovaných v každé mikrodávce nastavením maxBytesPerTrigger Možnosti. Tím se nastaví "měkké maximum", což znamená, že Batch zpracuje přibližně tento objem dat a může zpracovat více než limit. Pokud používáte Trigger.Once pro streamování, tato možnost se ignoruje. Použijete-li tuto možnost společně s maxFilesPerTrigger , mikrodávka zpracuje data, dokud maxFilesPerTrigger maxBytesPerTrigger nedosáhne limitu nebo.

Poznámka

V případech, kdy se transakce zdrojové tabulky vyčistí kvůli logRetentionDuration konfiguraci a prodlevy streamování, zpracuje rozdílový Lake data odpovídající poslední dostupné historii transakcí zdrojové tabulky, ale datový proud selže. Výsledkem může být zahození dat.

Ignorovat aktualizace a odstranit

Strukturované streamování nezpracovává vstup, který není připojený, a vyvolá výjimku, pokud se v tabulce používané jako zdroj vyskytnou nějaké změny. Existují dvě hlavní strategie pro práci se změnami, které není možné automaticky šířit do podřízeného vztahu:

  • Můžete odstranit výstup a kontrolní bod a restartovat datový proud od začátku.
  • Můžete nastavit jednu z těchto dvou možností:
    • ignoreDeletes: ignoruje transakce, které odstraňují data v hranicích oddílu.
    • ignoreChanges: opětovné zpracování aktualizací, pokud se soubory musely přepsat ve zdrojové tabulce z důvodu operace změny dat, například UPDATE , MERGE INTO , DELETE (v rámci oddílů) nebo OVERWRITE . Stále se nezměněné řádky můžou vysílat, takže příjemci s dalšími příjemci by měli být schopni zpracovávat duplicity. Odstranění se nešíří pro příjem dat. ignoreChanges subsumes ignoreDeletes . Proto pokud použijete ignoreChanges , váš datový proud nebude přerušen buď odstraněním, nebo aktualizací zdrojové tabulky.

Příklad

Předpokládejme například, že máte tabulku user_events se date user_email sloupci, a, action které jsou rozděleny na oddíly date . Vyčerpáte datový proud z user_events tabulky a z nich potřebujete data z GDPR odstranit.

Při odstranění v hranicích oddílu (tj. je WHERE ve sloupci oddílu) jsou soubory již rozděleny podle hodnoty, takže odstranění pouze tyto soubory z metadat odstraní. Pokud tedy chcete jenom odstranit data z některých oddílů, můžete použít:

spark.readStream.format("delta")
  .option("ignoreDeletes", "true")
  .load("/mnt/delta/user_events")

Pokud však potřebujete data odstranit na základě user_email , budete muset použít:

spark.readStream.format("delta")
  .option("ignoreChanges", "true")
  .load("/mnt/delta/user_events")

Pokud aktualizujete user_email UPDATE příkaz pomocí příkazu, bude přepsán soubor obsahující danou user_email otázku. Když použijete ignoreChanges , nový záznam se rozšíří na navazující příjem dat se všemi ostatními nezměněnými záznamy, které se nacházely ve stejném souboru. Vaše logika by měla být schopná zpracovat tyto příchozí duplicitní záznamy.

Zadat počáteční pozici

Poznámka

Tato funkce je k dispozici na Databricks Runtime 7,3 LTS a novějších verzích.

Pomocí následujících možností můžete určit výchozí bod zdroje rozdílového streamování, aniž byste museli zpracovat celou tabulku.

  • startingVersion: Rozdílová verze jezera, ze které se má začít. Všechny změny tabulky počínaje touto verzí (včetně) budou čteny zdrojem streamování. Verze potvrzení můžete získat ze version sloupce popsání výstupu příkazu v historii.

    V Databricks Runtime 7,4 a novějším, chcete-li vrátit pouze nejnovější změny, zadejte latest .

  • startingTimestamp: Časové razítko, ze kterého se má začít. Všechny změny tabulky potvrzené v nebo po časovém razítku (včetně) budou čteny zdrojem streamování. Jedna z těchto:

    • Řetězec časového razítka. Například, "2019-01-01T00:00:00.000Z".
    • Řetězec data. Například, "2019-01-01".

Současně nelze nastavit obě možnosti. můžete použít jenom jeden z nich. Projeví se pouze při spuštění nového dotazu streamování. Pokud byl spuštěn dotaz streamování a v jeho kontrolním rámci byl zaznamenán průběh, budou tyto možnosti ignorovány.

Důležité

I když můžete spustit zdroj streamování ze zadané verze nebo časového razítka, schéma zdroje streamování je vždycky nejnovější schéma rozdílové tabulky. Je nutné zajistit, aby se po zadané verzi nebo časovém razítku nekompatibilní Změna schématu na rozdílovou tabulku. Jinak může zdroj streamování vracet nesprávné výsledky při čtení dat s nesprávným schématem.

Příklad

Předpokládejme například, že máte tabulku user_events . Pokud chcete číst změny od verze 5, použijte:

spark.readStream.format("delta")
  .option("startingVersion", "5")
  .load("/mnt/delta/user_events")

Pokud chcete číst změny od 2018-10-18, použijte:

spark.readStream.format("delta")
  .option("startingTimestamp", "2018-10-18")
  .load("/mnt/delta/user_events")

Rozdílová tabulka jako jímka

Data můžete také zapisovat do rozdílové tabulky pomocí strukturovaného streamování. Protokol transakcí umožňuje, aby rozdílový Lake zajišťoval zaručené zpracování právě jednou, a to i v případě, že jsou v tabulce souběžně spuštěny jiné datové proudy nebo dávkové dotazy.

Metriky

Poznámka

K dispozici v Databricks Runtime 8,1 a novějších.

Počet bajtů a počet souborů, které se ještě mají zpracovat v procesu dotazu streamování jako numBytesOutstanding metrika a, najdete v části numFilesOutstanding . Pokud datový proud spouštíte v poznámkovém bloku, můžete tyto metriky zobrazit na kartě nezpracovaná data na řídicím panelu průběh dotazování streamování:

{
  "sources" : [
    {
      "description" : "DeltaSource[file:/path/to/source]",
      "metrics" : {
        "numBytesOutstanding" : "3456",
        "numFilesOutstanding" : "8"
      },
    }
  ]
}

Režim připojení

Ve výchozím nastavení se streamy spouštějí v režimu připojení, který do tabulky přidává nové záznamy.

Můžete použít metodu Path:

Python

events.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/delta/events/_checkpoints/etl-from-json")
  .start("/delta/events")

Scala

events.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/mnt/delta/events/_checkpoints/etl-from-json")
  .start("/mnt/delta/events")

import io.delta.implicits._
events.writeStream
  .outputMode("append")
  .option("checkpointLocation", "/mnt/delta/events/_checkpoints/etl-from-json")
  .delta("/mnt/delta/events")

nebo metoda tabulky:

Python

events.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/delta/events/_checkpoints/etl-from-json")
  .table("events")

Scala

events.writeStream
  .outputMode("append")
  .option("checkpointLocation", "/mnt/delta/events/_checkpoints/etl-from-json")
  .table("events")

Režim dokončení

Strukturované streamování můžete použít také k nahrazení celé tabulky každou dávkou. Jedním z příkladů případu použití je vypočítat souhrn pomocí agregace:

spark.readStream
  .format("delta")
  .load("/mnt/delta/events")
  .groupBy("customerId")
  .count()
  .writeStream
  .format("delta")
  .outputMode("complete")
  .option("checkpointLocation", "/mnt/delta/eventsByCustomer/_checkpoints/streaming-agg")
  .start("/mnt/delta/eventsByCustomer")

Předchozí příklad průběžně aktualizuje tabulku, která obsahuje agregovaný počet událostí podle zákazníka.

Pro aplikace s dalšími požadavky na latenci mírné můžete ušetřit výpočetní prostředky pomocí jednorázových aktivačních událostí. Pomocí těchto kroků můžete aktualizovat souhrnné tabulky agregace podle daného plánu a zpracovávat pouze nová data, která byla od poslední aktualizace dodána.