Delta Lake change data feed használata az Azure Databricksben

Feljegyzés

  • Ez a cikk bemutatja, hogyan rögzíthet és kérdezhet le sorszintű változásadatokat a Delta-táblákhoz a változásadatcsatorna funkcióval. Ha meg szeretné tudni, hogyan frissíthet táblákat a Delta Live Tables-folyamatokban a forrásadatok változásai alapján, olvassa el az APPLY CHANGES API: Change Data Capture egyszerűsítése a Delta Live Tablesben című témakört.

A változásadatcsatorna lehetővé teszi, hogy az Azure Databricks nyomon kövesse a Delta-tábla verziói közötti sorszintű változásokat. Ha egy Delta-táblában engedélyezve van, a futtatókörnyezeti rekordok megváltoztatják a táblába írt összes adat eseményeit . Ide tartoznak a soradatok, valamint a metaadatok, amelyek jelzik, hogy a megadott sor be lett-e szúrva, törölve vagy frissítve.

A batch-lekérdezések változási eseményeit a Spark SQL, az Apache Spark DataFrames és a strukturált streamelés használatával olvashatja el.

Fontos

A változásadatcsatorna a táblaelőzményekkel együtt működik a változásadatok megadásához. Mivel a Delta-tábla klónozása külön előzményt hoz létre, a klónozott táblák változási adatcsatornája nem egyezik meg az eredeti táblával.

Használati esetek

Az adatcsatorna módosítása alapértelmezés szerint nincs engedélyezve. A változásadatcsatorna engedélyezésekor az alábbi használati eseteknek kell vezetnie.

  • Ezüst- és aranytáblák: A Delta Lake teljesítményének javítása az ETL- és ELT-műveletek felgyorsítása és egyszerűsítése érdekében a kezdeti MERGE, UPDATEilletve DELETE a műveletek után csak sorszintű módosítások feldolgozásával.
  • Materializált nézetek: A BI-ban és az elemzésekben használható információk naprakész, összesített nézeteit hozhatja létre anélkül, hogy újra kellene dolgoznia a teljes mögöttes táblát, ehelyett csak ott kell frissítenie, ahol a változások történtek.
  • Módosítások továbbítása: Változásadatcsatorna küldése olyan alsóbb rétegbeli rendszereknek, mint a Kafka vagy az RDBMS, amelyek segítségével az adatfolyamok későbbi szakaszaiban növekményesen feldolgozhatók.
  • Naplótábla: A változásadatcsatorna rögzítése Delta-táblaként folyamatos tárolást és hatékony lekérdezési képességet biztosít az összes változás időbeli megtekintéséhez, beleértve a törlések időpontját és a frissítések elvégzését.

Változásadatcsatorna engedélyezése

Explicit módon engedélyeznie kell a változásadatcsatorna-beállítást az alábbi módszerek egyikével:

  • Új tábla: A táblatulajdonság delta.enableChangeDataFeed = true beállítása a CREATE TABLE parancsban.

    CREATE TABLE student (id INT, name STRING, age INT) TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • Meglévő tábla: A táblatulajdonság delta.enableChangeDataFeed = true beállítása a ALTER TABLE parancsban.

    ALTER TABLE myDeltaTable SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • Minden új tábla:

    set spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;
    

Fontos

A rendszer csak a változásadatcsatorna engedélyezése után végrehajtott módosításokat rögzíti, a tábla korábbi módosításait nem rögzíti a rendszer.

Adattárolás módosítása

Az Azure Databricks-rekordok a táblakönyvtár alatti mappában módosítják a , és a műveletek adataitUPDATEDELETE._change_dataMERGE Egyes műveletek, például a csak beszúrási műveletek és a teljes partíció törlések nem hoznak létre adatokat a címtárban, mert az _change_data Azure Databricks hatékonyan tudja kiszámítani a változásadatcsatornát közvetlenül a tranzakciónaplóból.

A mappában lévő _change_data fájlok a tábla adatmegőrzési szabályzatát követik. Ezért a VÁKUUM parancs futtatásakor az adatcsatornák adatainak módosítása is törlődik.

Módosítások olvasása kötegelt lekérdezésekben

A kezdéshez és a befejezéshez megadhat verziót vagy időbélyeget. A lekérdezésekben a kezdő és a befejező verziók és az időbélyegek szerepelnek. Ha egy adott kezdő verzióról a tábla legújabb verziójára szeretné beolvasni a módosításokat, csak a kezdő verziót vagy az időbélyeget adja meg.

A verziót egész számként, az időbélyegeket pedig sztringként adja meg.yyyy-MM-dd[ HH:mm:ss[.SSS]]

Ha egy korábbi verziót vagy időbélyeget ad meg, mint amelyik változáseseményeket rögzített – vagyis amikor a változásadatcsatorna engedélyezve volt – hibaüzenet jelenik meg, amely azt jelzi, hogy a változásadatcsatorna nem volt engedélyezve.

SQL

-- version as ints or longs e.g. changes from version 0 to 10
SELECT * FROM table_changes('tableName', 0, 10)

-- timestamp as string formatted timestamps
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')

-- providing only the startingVersion/timestamp
SELECT * FROM table_changes('tableName', 0)

-- database/schema names inside the string for table name, with backticks for escaping dots and special characters
SELECT * FROM table_changes('dbName.`dotted.tableName`', '2021-04-21 06:45:46' , '2021-05-21 12:00:00')

-- path based tables
SELECT * FROM table_changes_by_path('\path', '2021-04-21 05:45:46')

Python

# version as ints or longs
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .option("endingVersion", 10) \
  .table("myDeltaTable")

# timestamps as formatted timestamp
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", '2021-04-21 05:45:46') \
  .option("endingTimestamp", '2021-05-21 12:00:00') \
  .table("myDeltaTable")

# providing only the startingVersion/timestamp
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .table("myDeltaTable")

# path based tables
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", '2021-04-21 05:45:46') \
  .load("pathToMyDeltaTable")

Scala

// version as ints or longs
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .option("endingVersion", 10)
  .table("myDeltaTable")

// timestamps as formatted timestamp
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingTimestamp", "2021-04-21 05:45:46")
  .option("endingTimestamp", "2021-05-21 12:00:00")
  .table("myDeltaTable")

// providing only the startingVersion/timestamp
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .table("myDeltaTable")

// path based tables
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingTimestamp", "2021-04-21 05:45:46")
  .load("pathToMyDeltaTable")

Változások olvasása streamelési lekérdezésekben

Python

# providing a starting version
spark.readStream.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .table("myDeltaTable")

# providing a starting timestamp
spark.readStream.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", "2021-04-21 05:35:43") \
  .load("/pathToMyDeltaTable")

# not providing a starting version/timestamp will result in the latest snapshot being fetched first
spark.readStream.format("delta") \
  .option("readChangeFeed", "true") \
  .table("myDeltaTable")

Scala

// providing a starting version
spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .table("myDeltaTable")

// providing a starting timestamp
spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", "2021-04-21 05:35:43")
  .load("/pathToMyDeltaTable")

// not providing a starting version/timestamp will result in the latest snapshot being fetched first
spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .table("myDeltaTable")

Ha a táblázat olvasása közben szeretné lekérni a módosítási adatokat, állítsa a beállítást readChangeFeed a következőre true: . Az startingVersion vagy startingTimestamp nem kötelező, és ha a stream nem adja meg, a stream a táblázat legújabb pillanatképét adja vissza a streameléskor, és INSERT a jövőben változásadatokként változik. Az olyan lehetőségek, mint a sebességkorlátok (maxFilesPerTrigger), maxBytesPerTriggerés excludeRegex a változásadatok olvasásakor is támogatottak.

Feljegyzés

A sebességkorlátozás a kezdő pillanatkép-verziótól eltérő verziók esetében lehet atomi. Ez azt jelzi, hogy a véglegesítés teljes verziója korlátozott lesz, vagy a teljes véglegesítés vissza lesz adva.

Alapértelmezés szerint, ha egy felhasználó egy táblán az utolsó véglegesítést meghaladó verziót vagy időbélyeget ad át, a rendszer timestampGreaterThanLatestCommit hibát jelez. A Databricks Runtime 11.3 LTS és újabb verzióiban az adatcsatorna módosítása képes kezelni a tartományon kívüli verzió esetét, ha a felhasználó a következő konfigurációt trueállítja be:

set spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled = true;

Ha egy táblában az utolsó véglegesítésnél nagyobb kezdőverziót vagy egy tábla utolsó véglegesítésénél újabb kezdési időbélyeget ad meg, akkor az előző konfiguráció engedélyezésekor a rendszer üres olvasási eredményt ad vissza.

Ha egy táblában az utolsó véglegesítésnél nagyobb végverziót vagy egy tábla utolsó véglegesítésénél újabb befejezési időbélyeget ad meg, akkor amikor az előző konfiguráció engedélyezve van kötegelt olvasási módban, a rendszer visszaadja a kezdő verzió és az utolsó véglegesítés közötti összes módosítást.

Mi a változásadatcsatorna sémája?

Amikor egy tábla változásadatcsatornájából olvas, a rendszer a legújabb táblaverzió sémáját használja.

Feljegyzés

A legtöbb sémamódosítási és evolúciós művelet teljes mértékben támogatott. Az oszlopleképezést engedélyező táblázat nem támogatja az összes használati esetet, és eltérő viselkedést mutat. Lásd: Adatcsatornákra vonatkozó korlátozások módosítása az oszlopleképezést engedélyező táblák esetében.

A Változástábla sémájának adatoszlopai mellett a változásadatcsatorna metaadatoszlopokat is tartalmaz, amelyek azonosítják a változásesemény típusát:

Oszlop neve Típus Értékek
_change_type Sztring insert, update_preimage , update_postimagedelete(1)
_commit_version Hosszú A módosítást tartalmazó Delta-napló vagy táblaverzió.
_commit_timestamp Időbélyegző A véglegesítés létrehozásakor társított időbélyeg.

(1)preimage a frissítés előtti érték, postimage a frissítés utáni érték.

Feljegyzés

Nem engedélyezhető a tábla adatcsatornájának módosítása, ha a séma a hozzáadott oszlopokkal azonos nevű oszlopokat tartalmaz. Nevezze át a tábla oszlopait az ütközés feloldásához, mielőtt engedélyezni szeretné az adatcsatorna módosítását.

Adatcsatornákra vonatkozó korlátozások módosítása olyan táblák esetében, amelyeken engedélyezve van az oszlopleképezés

Ha egy Delta-táblában engedélyezve van az oszlopleképezés, a meglévő adatok adatfájljainak újraírása nélkül is elvetheti vagy átnevezheti a tábla oszlopait. Ha az oszlopleképezés engedélyezve van, a változásadatcsatorna korlátozásokkal rendelkezik a nem additív sémamódosítások, például az oszlopok átnevezése vagy elvetése, az adattípus módosítása vagy a nullbilitás módosítása után.

Fontos

  • Nem olvasható be a változásadatcsatorna olyan tranzakcióhoz vagy tartományhoz, amelyben nem additív sémamódosítás történik kötegelt szemantikával.
  • A Databricks Runtime 12.2 LTS-ben és az alábbi verziókban az oszlopleképezést engedélyező táblák, amelyek nem additív sémamódosításokat tapasztaltak, nem támogatják a változásadatcsatorna streamelési olvasásait. Lásd: Streamelés oszlopleképezéssel és sémamódosításokkal.
  • A Databricks Runtime 11.3 LTS-ben és az alábbi verziókban nem olvashatja az oszlopleképezést engedélyező táblák adatcsatornájának módosítását, amelyeknél az oszlopok átnevezése vagy elvetése történt.

A Databricks Runtime 12.2 LTS-ben és újabb verziókban kötegelt beolvasásokat végezhet a változásadatcsatornán olyan táblák esetében, amelyek oszlopleképezése engedélyezve van, és amelyek nem additív sémamódosításokat tapasztaltak. A tábla legújabb verziójának sémája helyett az olvasási műveletek a lekérdezésben megadott tábla végverziójának sémáját használják. A lekérdezések továbbra is sikertelenek maradnak, ha a megadott verziótartomány nem additív sémamódosításra terjed ki.

Gyakori kérdések (GYIK)

Mi a változásadatcsatorna engedélyezésének többlettere?

Nincs jelentős hatása. A módosítási adatrekordok a lekérdezés végrehajtása során sorban jönnek létre, és általában sokkal kisebbek, mint az újraírt fájlok teljes mérete.

Mi az adatmegőrzési szabályzat a rekordok módosításához?

A rekordok módosítása ugyanazt az adatmegőrzési szabályzatot követi, mint a elavult táblaverziók, és a rendszer a VÁKUUM használatával törli őket, ha a megadott megőrzési időszakon kívül vannak.

Mikor válnak elérhetővé az új rekordok a változási adatcsatornában?

A változásadatok a Delta Lake-tranzakcióval együtt lesznek véglegesítettek, és az új adatok a táblában való elérhetővé válásával egyidejűleg válnak elérhetővé.

Példa jegyzetfüzetre: Módosítások propagálása Delta-változási adatcsatornával

Ez a jegyzetfüzet bemutatja, hogyan propagálhatja az abszolút számú vakcinációt tartalmazó ezüsttáblán végrehajtott módosításokat a vakcinázási arányok aranytáblájára.

Adatcsatorna-jegyzetfüzet módosítása

Jegyzetfüzet beszerzése