Delta-táblastreamelés olvasása és írása

A Delta Lake mélyen integrálva van a Spark strukturált streameléssel readStream és writeStreama . A Delta Lake leküzdi a streamelési rendszerekhez és fájlokhoz általában kapcsolódó számos korlátozást, többek között a következőket:

  • Kis késésű betöltéssel létrehozott kis fájlok szenesítése.
  • A "pontosan egyszer" feldolgozás fenntartása egynél több streammel (vagy egyidejű kötegelt feladatokkal).
  • Hatékonyan felderítheti, hogy mely fájlok újak, amikor fájlokat használ a stream forrásaként.

Feljegyzés

Ez a cikk a Delta Lake-táblák streamforrásként és fogadóként való használatát ismerteti. Ha tudni szeretné, hogyan tölthet be adatokat streamelő táblákkal a Databricks SQL-ben, olvassa el az Adatok betöltése streamtáblák használatával a Databricks SQL-ben című témakört.

Delta-tábla forrásként

A strukturált streamelés növekményesen olvassa be a Delta-táblákat. Bár a streamelési lekérdezés aktív egy Delta-táblán, a rendszer idempotens módon dolgozza fel az új rekordokat a forrástábla új táblaverzióinak véglegesítéseként.

Az alábbi példakód egy streamelési olvasás konfigurálását mutatja be a táblanév vagy a fájl elérési útja alapján.

Python

spark.readStream.table("table_name")

spark.readStream.load("/path/to/table")

Scala

spark.readStream.table("table_name")

spark.readStream.load("/path/to/table")

Fontos

Ha egy Delta-tábla sémája megváltozik, miután streamelési olvasás kezdődött a táblával, a lekérdezés meghiúsul. A legtöbb sémamódosítás esetén újraindíthatja a streamet a sémaeltérés feloldásához és a feldolgozás folytatásához.

A Databricks Runtime 12.2 LTS-ben és alatta nem streamelhet olyan Delta-táblából, amelyen engedélyezve van az oszlopleképezés, amely nem additív sémafejlődésen ment keresztül, például oszlopok átnevezése vagy elvetése. Részletekért lásd: Streamelés oszlopleképezéssel és sémamódosításokkal.

Bemeneti sebesség korlátozása

A mikro kötegek szabályozásához a következő lehetőségek állnak rendelkezésre:

  • maxFilesPerTrigger: Hány új fájlt kell figyelembe venni minden mikrokötegben. Az alapértelmezett érték 1000.
  • maxBytesPerTrigger: Mennyi adat lesz feldolgozva az egyes mikrokötegekben. Ez a beállítás "soft max" értéket állít be, ami azt jelenti, hogy egy köteg körülbelül ennyi adatot dolgoz fel, és a korlátnál többet is feldolgozhat annak érdekében, hogy a streamlekérdezés előrehaladjon olyan esetekben, amikor a legkisebb bemeneti egység nagyobb ennél a korlátnál. Ez alapértelmezés szerint nincs beállítva.

Ha együtt maxFilesPerTriggerhasználjamaxBytesPerTrigger, a mikroköteg addig dolgozza fel az adatokat, amíg el nem éri a maxFilesPerTrigger korlátot.maxBytesPerTrigger

Feljegyzés

Ha a forrástábla tranzakciói a konfiguráció miatt logRetentionDurationtörlődnek, és a streamelési lekérdezés megpróbálja feldolgozni ezeket a verziókat, a lekérdezés alapértelmezés szerint nem tudja elkerülni az adatvesztést. Beállíthatja, hogy false figyelmen kívül hagyja az failOnDataLoss elveszett adatokat, és folytassa a feldolgozást.

Delta Lake change data capture (CDC) hírcsatorna streamelése

A Delta Lake módosítja az adatcsatorna változásait egy Delta-táblában, beleértve a frissítéseket és a törléseket is. Ha engedélyezve van, streamelhet a változásadatcsatornából és az írási logikából a beszúrások, frissítések és törlések feldolgozásához alsóbb rétegbeli táblákba. Bár a változási adatcsatorna adatkimenete kissé eltér az általa leírt Delta-táblától, ez megoldást nyújt a növekményes változások propagálására az alsóbb rétegbeli táblákra a medallion architektúrában.

Fontos

A Databricks Runtime 12.2 LTS-ben és alatta nem streamelhet a változásadatcsatornából olyan Delta-tábla esetében, amelynek oszlopleképezése engedélyezve van, és amely nem additív sémafejlődésen ment keresztül, például oszlopok átnevezése vagy elvetése. Lásd: Streamelés oszlopleképezéssel és sémamódosításokkal.

Frissítések és törlések mellőzése

A strukturált streamelés nem kezeli a nem hozzáfűző bemeneteket, és kivételt okoz, ha bármilyen módosítás történik a forrásként használt táblán. Az alsóbb rétegben nem automatikusan propagált változások kezelésére két fő stratégia létezik:

  • Törölheti a kimenetet és az ellenőrzőpontot, és az elejétől újraindíthatja a streamet.
  • A következő két lehetőség közül választhat:
    • ignoreDeletes: figyelmen kívül hagyja azokat a tranzakciókat, amelyek adatokat törölnek a partícióhatárokon.
    • skipChangeCommits: hagyja figyelmen kívül a meglévő rekordokat törlő vagy módosító tranzakciókat. skipChangeCommits alösszegek ignoreDeletes.

Feljegyzés

A Databricks Runtime 12.2 LTS-ben és újabb skipChangeCommits verziókban elavult az előző beállítás ignoreChanges. A Databricks Runtime 11.3 LTS-ben és az alacsonyabb ignoreChanges verzióban ez az egyetlen támogatott lehetőség.

A szemantikája ignoreChanges jelentősen eltér a skipChangeCommits. Ha ignoreChanges engedélyezve van, a forrástáblában lévő újraírt adatfájlok újra ki lesznek bocsátva egy adatmódosítási művelet után, például UPDATE: , MERGE INTODELETE (partíciókon belül) vagy OVERWRITE. A változatlan sorokat gyakran új sorok mellett bocsátják ki, így az alsóbb rétegbeli fogyasztóknak képesnek kell lenniük az ismétlődések kezelésére. A rendszer nem propagálja a törléseket az alsóbb rétegben. ignoreChanges alösszegek ignoreDeletes.

skipChangeCommits teljes mértékben figyelmen kívül hagyja a fájlmódosítási műveleteket. Azok az adatfájlok, amelyeket a forrástáblában az adatmódosítási művelet ( például UPDATEa , MERGE INTO, ) DELETEmiatt írnak át, és OVERWRITE teljes mértékben figyelmen kívül hagyják. A felsőbb rétegbeli forrástáblák változásainak tükrözéséhez külön logikát kell implementálnia a módosítások propagálásához.

A konfigurált számítási feladatok továbbra is ismert szemantikával ignoreChanges működnek, de a Databricks az összes új számítási feladat használatát skipChangeCommits javasolja. A számítási feladatok ignoreChanges migrálása újrabontási logikát skipChangeCommits igényel.

Példa

Tegyük fel például, hogy van egy táblája dateuser_events , user_emailés action az oszlopokat particionáltadate. A táblázatból user_events streamelhet, és a GDPR miatt törölnie kell belőle az adatokat.

Ha partícióhatárokon töröl (vagyis WHERE egy partícióoszlopon van), a fájlok már érték szerint vannak szegmentálva, így a törlés egyszerűen eltávolítja ezeket a fájlokat a metaadatokból. Ha egy teljes adatpartíciót töröl, az alábbiakat használhatja:

spark.readStream.format("delta")
  .option("ignoreDeletes", "true")
  .load("/tmp/delta/user_events")

Ha több partícióban töröl adatokat (ebben a példában a szűrésre user_email), használja a következő szintaxist:

spark.readStream.format("delta")
  .option("skipChangeCommits", "true")
  .load("/tmp/delta/user_events")

Ha módosít egy utasítást user_emailUPDATE , a szóban forgó fájlt tartalmazó user_email fájl újra lesz írva. A módosított adatfájlok figyelmen kívül hagyása skipChangeCommits .

Kezdeti pozíció megadása

Az alábbi beállítások segítségével a Delta Lake streamelési forrásának kiindulópontját a teljes tábla feldolgozása nélkül adhatja meg.

  • startingVersion: A Delta Lake-verzió, amelyből kiindulni szeretne. A Databricks azt javasolja, hogy a legtöbb számítási feladat esetében kihagyja ezt a lehetőséget. Ha nincs beállítva, a stream a legújabb elérhető verziótól indul, beleértve az adott pillanatban a táblázat teljes pillanatképét.

    Ha meg van adva, a stream beolvassa a Delta-tábla minden módosítását a megadott verziótól kezdve (a teljes verziót is beleértve). Ha a megadott verzió már nem érhető el, a stream nem indul el. A véglegesítési verziókat a version DESCRIBE HISTORY parancs kimenetének oszlopából szerezheti be.

    Ha csak a legújabb módosításokat szeretné visszaadni, adja meg a következőt latest: .

  • startingTimestamp: A kezdéshez megadott időbélyeg. Az időbélyegen vagy után véglegesített összes táblamódosítást (beleértve) a streamolvasó felolvassa. Ha a megadott időbélyeg megelőzi az összes tábla véglegesítését, a streamelési olvasás a legkorábbi elérhető időbélyeggel kezdődik. Az alábbiak egyike:

    • Időbélyeg-sztring. Például: "2019-01-01T00:00:00.000Z".
    • Dátumsztring. Például: "2019-01-01".

A két beállítás egyszerre nem állítható be. Ezek csak új streamelési lekérdezés indításakor lépnek érvénybe. Ha egy streamlekérdezés elindult, és a folyamat az ellenőrzőponton lett rögzítve, a rendszer figyelmen kívül hagyja ezeket a beállításokat.

Fontos

Bár a streamforrást elindíthatja egy megadott verzióról vagy időbélyegről, a streamforrás sémája mindig a Delta-tábla legújabb sémája. Győződjön meg arról, hogy a megadott verzió vagy időbélyeg után nem történt inkompatibilis sémamódosítás a Delta táblában. Ellenkező esetben előfordulhat, hogy a streamforrás helytelen eredményeket ad vissza, amikor helytelen sémával olvassa be az adatokat.

Példa

Tegyük fel például, hogy van egy táblája user_events. Ha az 5. verzió óta szeretné elolvasni a módosításokat, használja a következőt:

spark.readStream.format("delta")
  .option("startingVersion", "5")
  .load("/tmp/delta/user_events")

Ha 2018-10-18 óta szeretné elolvasni a módosításokat, használja a következőt:

spark.readStream.format("delta")
  .option("startingTimestamp", "2018-10-18")
  .load("/tmp/delta/user_events")

Kezdeti pillanatkép feldolgozása adatok elvetés nélkül

Feljegyzés

Ez a funkció a Databricks Runtime 11.3 LTS-en és újabb verziókban érhető el. Ez a funkció a nyilvános előzetes verzióban érhető el.

Ha deltatáblát használ streamforrásként, a lekérdezés először feldolgozza a táblában található összes adatot. Az ebben a verzióban található Delta-táblát kezdeti pillanatképnek nevezzük. Alapértelmezés szerint a Delta-tábla adatfájljai a legutóbb módosított fájl alapján lesznek feldolgozva. Az utolsó módosítási idő azonban nem feltétlenül a rekord eseményidejének sorrendjét jelöli.

Egy meghatározott vízjelet tartalmazó állapotalapú streamlekérdezésben a fájlok módosítási idő szerinti feldolgozása azt eredményezheti, hogy a rekordok feldolgozása helytelen sorrendben történik. Ez azt eredményezheti, hogy a rekordok a vízjel késői eseményeiként csökkennek.

Az adatcseppek problémáját az alábbi beállítás engedélyezésével kerülheti el:

  • withEventTimeOrder: Azt jelzi, hogy a kezdeti pillanatképet eseményidőrenddel kell-e feldolgozni.

Ha az esemény időrendje engedélyezve van, a kezdeti pillanatképadatok eseményideje időgyűjtőkre van osztva. Minden mikro köteg feldolgoz egy gyűjtőt az időtartományon belüli adatok szűrésével. A maxFilesPerTrigger és a maxBytesPerTrigger konfigurációs beállítások továbbra is alkalmazhatók a mikrobatch méretének szabályozására, de csak hozzávetőlegesen a feldolgozás jellege miatt.

Az alábbi ábrán ez a folyamat látható:

Kezdeti pillanatkép

A funkcióval kapcsolatos jelentős információk:

  • Az adatcsepp-probléma csak akkor fordul elő, ha egy állapotalapú streamelési lekérdezés kezdeti Delta-pillanatképe az alapértelmezett sorrendben van feldolgozva.
  • A stream lekérdezés elindítása után nem módosítható withEventTimeOrder a kezdeti pillanatkép feldolgozása. Ha módosítani szeretné az withEventTimeOrder újraindítást, törölnie kell az ellenőrzőpontot.
  • Ha olyan stream-lekérdezést futtat, amelyen engedélyezve van az EventTimeOrder, nem állíthatja vissza olyan DBR-verzióra, amely nem támogatja ezt a funkciót, amíg a kezdeti pillanatkép-feldolgozás be nem fejeződik. Ha vissza kell állítania, megvárhatja a kezdeti pillanatkép befejezését, vagy törölheti az ellenőrzőpontot, és újraindíthatja a lekérdezést.
  • Ez a funkció nem támogatott a következő gyakori forgatókönyvekben:
    • Az eseményidő oszlop egy generált oszlop, és nem vetületi átalakítások vannak a Delta-forrás és a vízjel között.
    • Van egy vízjel, amely több Delta-forrással rendelkezik a stream lekérdezésben.
  • Ha az esemény időrendje engedélyezve van, a Delta kezdeti pillanatképének feldolgozása lassabb lehet.
  • Minden mikro köteg megvizsgálja a kezdeti pillanatképet, hogy a megfelelő eseményidőtartományon belül szűrje az adatokat. A gyorsabb szűrőművelet érdekében érdemes delta forrásoszlopot használni eseményidőként, hogy az adatok kihagyása alkalmazható legyen (ellenőrizze , hogy a Delta Lake mikor hagyja ki az adatokat). Emellett a tábla particionálása az eseményidő oszlop mentén tovább felgyorsíthatja a feldolgozást. A Spark felhasználói felületén ellenőrizheti, hogy egy adott mikro köteg hány deltafájlt vizsgál.

Példa

Tegyük fel, hogy van egy oszlopot tartalmazó event_time táblázatauser_events. A streamelési lekérdezés egy összesítő lekérdezés. Ha biztosítani szeretné, hogy a pillanatképek kezdeti feldolgozása során ne csökkenjen az adat, a következőt használhatja:

spark.readStream.format("delta")
  .option("withEventTimeOrder", "true")
  .load("/tmp/delta/user_events")
  .withWatermark("event_time", "10 seconds")

Feljegyzés

Ezt a fürt Spark-konfigurációjával is engedélyezheti, amely az összes streamelési lekérdezésre vonatkozik: spark.databricks.delta.withEventTimeOrder.enabled true

Delta-tábla fogadóként

A Strukturált streamelés használatával adatokat is írhat a Delta-táblába. A tranzakciónapló lehetővé teszi, hogy a Delta Lake pontosan egyszeri feldolgozást garantáljon, még akkor is, ha más streamek vagy kötegelt lekérdezések futnak párhuzamosan a táblán.

Feljegyzés

A Delta Lake VACUUM függvény eltávolítja a Delta Lake által nem kezelt összes fájlt, de kihagyja a _kezdőkönyvtárakat. A Delta-táblákhoz tartozó egyéb adatokkal és metaadatokkal együtt biztonságosan tárolhat ellenőrzőpontokat egy olyan könyvtárstruktúra használatával, mint például <table-name>/_checkpointsa .

Mérőszámok

Megtudhatja, hogy hány bájtot és hány fájlt kell még feldolgozni egy streamelési lekérdezési folyamatban, mint a numBytesOutstanding metrikákat.numFilesOutstanding További metrikák:

  • numNewListedFiles: Azon Delta Lake-fájlok száma, amelyek a köteg hátralékának kiszámításához lettek felsorolva.
    • backlogEndOffset: A hátralék kiszámításához használt táblaverzió.

Ha egy jegyzetfüzetben futtatja a streamet, a streamelési lekérdezés folyamatának irányítópultján, a Nyers adatok lapon láthatja ezeket a metrikákat:

{
  "sources" : [
    {
      "description" : "DeltaSource[file:/path/to/source]",
      "metrics" : {
        "numBytesOutstanding" : "3456",
        "numFilesOutstanding" : "8"
      },
    }
  ]
}

Hozzáfűzési mód

Alapértelmezés szerint a streamek hozzáfűzési módban futnak, amely új rekordokat ad hozzá a táblához.

Használhatja az elérési utat:

Python

(events.writeStream
   .format("delta")
   .outputMode("append")
   .option("checkpointLocation", "/tmp/delta/_checkpoints/")
   .start("/delta/events")
)

Scala

events.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
  .start("/tmp/delta/events")

vagy a toTable módszer, az alábbiak szerint:

Python

(events.writeStream
   .format("delta")
   .outputMode("append")
   .option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
   .toTable("events")
)

Scala

events.writeStream
  .outputMode("append")
  .option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
  .toTable("events")

Kész mód

A strukturált streamelés használatával a teljes táblázatot lecserélheti minden kötegre. Az egyik példahasználati eset egy összegzés kiszámítása összesítéssel:

Python

(spark.readStream
  .format("delta")
  .load("/tmp/delta/events")
  .groupBy("customerId")
  .count()
  .writeStream
  .format("delta")
  .outputMode("complete")
  .option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
  .start("/tmp/delta/eventsByCustomer")
)

Scala

spark.readStream
  .format("delta")
  .load("/tmp/delta/events")
  .groupBy("customerId")
  .count()
  .writeStream
  .format("delta")
  .outputMode("complete")
  .option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
  .start("/tmp/delta/eventsByCustomer")

Az előző példa folyamatosan frissít egy táblát, amely az ügyfelek által összesített események számát tartalmazza.

Az enyhébb késési követelményekkel rendelkező alkalmazások esetében egyszeri eseményindítókkal mentheti a számítási erőforrásokat. Ezekkel frissítheti az összefoglaló aggregációs táblákat egy adott ütemezés szerint, és csak az utolsó frissítés óta érkezett új adatokat dolgozza fel.

Stream-statikus illesztések végrehajtása

Stream-statikus illesztések végrehajtásához támaszkodhat a Delta Lake tranzakciós garanciáira és verziószámozási protokolljára. A stream-statikus illesztések állapot nélküli illesztés használatával csatlakoztatják a Delta-tábla legújabb érvényes verzióját (a statikus adatokat) egy adatfolyamhoz.

Amikor az Azure Databricks stream-statikus illesztésben dolgoz fel egy mikroköteget, a statikus Delta-tábla adatainak legújabb érvényes verziója csatlakozik az aktuális mikrokötegben található rekordokkal. Mivel az illesztés állapot nélküli, nem kell konfigurálnia a vízjelezést, és alacsony késéssel tudja feldolgozni az eredményeket. Az illesztésben használt statikus Delta tábla adatainak lassan kell változnia.

streamingDF = spark.readStream.table("orders")
staticDF = spark.read.table("customers")

query = (streamingDF
  .join(staticDF, streamingDF.customer_id==staticDF.id, "inner")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .table("orders_with_customer_info")
)

Upsert a streamelési lekérdezésekből a következő használatával: foreachBatch

Összetett upsertek kombinációját és írását használhatja mergeforeachBatch egy streamelési lekérdezésből egy Delta-táblába. Lásd: A foreachBatch használata tetszőleges adatelnyelőkbe való íráshoz című témakört.

Ez a minta számos alkalmazást tartalmaz, köztük a következőket:

  • Stream-aggregátumok írása frissítési módban: Ez sokkal hatékonyabb, mint a teljes mód.
  • Adatbázisváltozások adatfolyamának írása Delta-táblába: A változásadatok írására szolgáló egyesítési lekérdezés használatával foreachBatch folyamatosan alkalmazhatja a változásfolyamot egy Delta-táblára.
  • Adatstream írása Delta-táblába deduplikációval: A deduplikációra vonatkozó csak beszúrási egyesítési lekérdezés használatával foreachBatch folyamatosan írhat adatokat (duplikációkkal) egy Delta-táblába automatikus deduplikációval.

Feljegyzés

  • Győződjön meg arról, hogy a merge belső foreachBatch utasítás idempotens, mivel a streamelési lekérdezés újraindításai többször is alkalmazhatják a műveletet ugyanazon az adatkötegen.
  • Amikor merge használatban foreachBatchvan, a streamelési lekérdezés bemeneti adatsebessége (a notebook sebességi grafikonján keresztül StreamingQueryProgress jelentve és látható) a forrásnál az adatok létrehozásának tényleges sebességének többszöröseként jelenhet meg. Ennek az az oka, hogy merge többször olvassa be a bemeneti adatokat, ami a bemeneti metrikák szorzását okozza. Ha szűk keresztmetszetről van szó, gyorsítótárazhatja a kötegelt DataFrame-et a(z) merge előtt , majd merge után törölheti.

Az alábbi példa bemutatja, hogyan használhatja az SQL-t foreachBatch a feladat végrehajtásához:

Scala

// Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
  // Set the dataframe to view name
  microBatchOutputDF.createOrReplaceTempView("updates")

  // Use the view name to apply MERGE
  // NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe
  microBatchOutputDF.sparkSession.sql(s"""
    MERGE INTO aggregates t
    USING updates s
    ON s.key = t.key
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
  """)
}

// Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream
  .format("delta")
  .foreachBatch(upsertToDelta _)
  .outputMode("update")
  .start()

Python

# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):
  # Set the dataframe to view name
  microBatchOutputDF.createOrReplaceTempView("updates")

  # Use the view name to apply MERGE
  # NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe

  # In Databricks Runtime 10.5 and below, you must use the following:
  # microBatchOutputDF._jdf.sparkSession().sql("""
  microBatchOutputDF.sparkSession.sql("""
    MERGE INTO aggregates t
    USING updates s
    ON s.key = t.key
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
  """)

# Write the output of a streaming aggregation query into Delta table
(streamingAggregatesDF.writeStream
  .format("delta")
  .foreachBatch(upsertToDelta)
  .outputMode("update")
  .start()
)

A Delta Lake API-k használatával streamelési upserts-eket is végrehajthat, ahogyan az alábbi példában is látható:

Scala

import io.delta.tables.*

val deltaTable = DeltaTable.forPath(spark, "/data/aggregates")

// Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
  deltaTable.as("t")
    .merge(
      microBatchOutputDF.as("s"),
      "s.key = t.key")
    .whenMatched().updateAll()
    .whenNotMatched().insertAll()
    .execute()
}

// Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream
  .format("delta")
  .foreachBatch(upsertToDelta _)
  .outputMode("update")
  .start()

Python

from delta.tables import *

deltaTable = DeltaTable.forPath(spark, "/data/aggregates")

# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):
  (deltaTable.alias("t").merge(
      microBatchOutputDF.alias("s"),
      "s.key = t.key")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
  )

# Write the output of a streaming aggregation query into Delta table
(streamingAggregatesDF.writeStream
  .format("delta")
  .foreachBatch(upsertToDelta)
  .outputMode("update")
  .start()
)

Idempotens tábla írása foreachBatch

Feljegyzés

A Databricks azt javasolja, hogy minden frissíteni kívánt fogadóhoz külön streamelési írást konfiguráljon. Ha foreachBatch több táblába szeretne írni, szerializálja az írásokat, ami csökkenti a párhuzamosságokat, és növeli az általános késést.

A Delta-táblák az alábbi DataFrameWriter lehetőségeket támogatják, hogy idempotensen belül foreachBatch több táblába is írjanak:

  • txnAppId: Egyedi sztring, amelyet az egyes DataFrame-írások továbbíthatnak. Használhatja például a StreamingQuery azonosítót txnAppId.
  • txnVersion: Egy monoton módon növekvő szám, amely tranzakciós verzióként működik.

A Delta Lake az ismétlődő írások kombinációját txnAppId és txnVersion azonosítását használja, és figyelmen kívül hagyja őket.

Ha egy kötegírás meghiúsul, a köteg újrafuttatása ugyanazzal az alkalmazással és kötegazonosítóval segíti a futtatókörnyezetet az ismétlődő írások helyes azonosításában és figyelmen kívül hagyásában. Az alkalmazásazonosító (txnAppId) bármely felhasználó által létrehozott egyedi sztring lehet, és nem kell kapcsolódnia a streamazonosítóhoz. Lásd: A foreachBatch használata tetszőleges adatelnyelőkbe való íráshoz című témakört.

Figyelmeztetés

Ha törli a streamelési ellenőrzőpontot, és egy új ellenőrzőponttal újraindítja a lekérdezést, egy másikat txnAppIdkell megadnia. Az új ellenőrzőpontok a kötegazonosítóval kezdődnek 0. A Delta Lake a kötegazonosítót használja txnAppId egyedi kulcsként, és kihagyja a már látott értékeket tartalmazó kötegeket.

Az alábbi példakód ezt a mintát mutatja be:

Python

app_id = ... # A unique string that is used as an application ID.

def writeToDeltaLakeTableIdempotent(batch_df, batch_id):
  batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 1
  batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 2

streamingDF.writeStream.foreachBatch(writeToDeltaLakeTableIdempotent).start()

Scala

val appId = ... // A unique string that is used as an application ID.
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...)  // location 1
  batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...)  // location 2
}