Använda Delta Lake-ändringsdataflöde i Azure Databricks

Kommentar

Med ändringsdataflöde kan Azure Databricks spåra ändringar på radnivå mellan versioner av en Delta-tabell. När den är aktiverad i en Delta-tabell registrerar körningen ändringshändelser för alla data som skrivits till tabellen. Detta inkluderar raddata tillsammans med metadata som anger om den angivna raden infogades, togs bort eller uppdaterades.

Du kan läsa ändringshändelserna i batchfrågor med Spark SQL, Apache Spark DataFrames och Structured Streaming.

Viktigt!

Ändringsdataflödet fungerar tillsammans med tabellhistoriken för att tillhandahålla ändringsinformation. Eftersom kloning av en Delta-tabell skapar en separat historik matchar inte ändringsdataflödet för klonade tabeller den ursprungliga tabellens.

Användningsfall

Ändringsdataflöde är inte aktiverat som standard. Följande användningsfall bör köras när du aktiverar ändringsdataflödet.

  • Silver- och Gold-tabeller: Förbättra Delta Lake-prestanda genom att endast bearbeta ändringar på radnivå efter inledande MERGE, UPDATEeller DELETE åtgärder för att påskynda och förenkla ETL- och ELT-åtgärder.
  • Materialiserade vyer: Skapa uppdaterade, aggregerade vyer av information för användning i BI och analys utan att behöva bearbeta de fullständiga underliggande tabellerna igen, i stället uppdateras endast var ändringarna har kommit.
  • Överför ändringar: Skicka ett ändringsdataflöde till underordnade system som Kafka eller RDBMS som kan använda det för att stegvis bearbeta i senare skeden av datapipelines.
  • Granskningstabell: Samla in ändringsdataflödet som en Delta-tabell ger evig lagring och effektiv frågefunktion för att se alla ändringar över tid, inklusive när borttagningar inträffar och vilka uppdateringar som har gjorts.

Aktivera ändringsdataflöde

Du måste uttryckligen aktivera alternativet ändra dataflöde med någon av följande metoder:

  • Ny tabell: Ange tabellegenskapen CREATE TABLEdelta.enableChangeDataFeed = true i kommandot .

    CREATE TABLE student (id INT, name STRING, age INT) TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • Befintlig tabell: Ange tabellegenskapen ALTER TABLEdelta.enableChangeDataFeed = true i kommandot .

    ALTER TABLE myDeltaTable SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • Alla nya tabeller:

    set spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;
    

Viktigt!

Endast ändringar som görs när du har aktiverat ändringsdataflödet registreras. Tidigare ändringar i en tabell registreras inte.

Ändra datalagring

Azure Databricks registrerar ändringsdata för UPDATE, DELETEoch MERGE åtgärder i _change_data mappen under tabellkatalogen. Vissa åtgärder, till exempel åtgärder med endast infogning och fullständiga partitionsborttagningar, genererar inte data i _change_data katalogen eftersom Azure Databricks effektivt kan beräkna ändringsdataflödet direkt från transaktionsloggen.

Filerna i _change_data mappen följer kvarhållningsprincipen för tabellen. Om du kör kommandot VACUUM tas därför även data för ändringsdataflöde bort.

Läsa ändringar i batchfrågor

Du kan ange antingen version eller tidsstämpel för start och slut. Start- och slutversionerna och tidsstämplarna ingår i frågorna. Om du vill läsa ändringarna från en viss startversion till den senaste versionen av tabellen anger du endast startversionen eller tidsstämpeln.

Du anger en version som ett heltal och en tidsstämplar som en sträng i formatet yyyy-MM-dd[ HH:mm:ss[.SSS]].

Om du anger en lägre version eller en tidsstämpel som är äldre än en som har registrerat ändringshändelser, dvs. när ändringsdataflödet aktiverades, utlöses ett fel som anger att ändringsdataflödet inte har aktiverats.

SQL

-- version as ints or longs e.g. changes from version 0 to 10
SELECT * FROM table_changes('tableName', 0, 10)

-- timestamp as string formatted timestamps
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')

-- providing only the startingVersion/timestamp
SELECT * FROM table_changes('tableName', 0)

-- database/schema names inside the string for table name, with backticks for escaping dots and special characters
SELECT * FROM table_changes('dbName.`dotted.tableName`', '2021-04-21 06:45:46' , '2021-05-21 12:00:00')

-- path based tables
SELECT * FROM table_changes_by_path('\path', '2021-04-21 05:45:46')

Python

# version as ints or longs
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .option("endingVersion", 10) \
  .table("myDeltaTable")

# timestamps as formatted timestamp
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", '2021-04-21 05:45:46') \
  .option("endingTimestamp", '2021-05-21 12:00:00') \
  .table("myDeltaTable")

# providing only the startingVersion/timestamp
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .table("myDeltaTable")

# path based tables
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", '2021-04-21 05:45:46') \
  .load("pathToMyDeltaTable")

Scala

// version as ints or longs
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .option("endingVersion", 10)
  .table("myDeltaTable")

// timestamps as formatted timestamp
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingTimestamp", "2021-04-21 05:45:46")
  .option("endingTimestamp", "2021-05-21 12:00:00")
  .table("myDeltaTable")

// providing only the startingVersion/timestamp
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .table("myDeltaTable")

// path based tables
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingTimestamp", "2021-04-21 05:45:46")
  .load("pathToMyDeltaTable")

Läsa ändringar i strömmande frågor

Python

# providing a starting version
spark.readStream.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .table("myDeltaTable")

# providing a starting timestamp
spark.readStream.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", "2021-04-21 05:35:43") \
  .load("/pathToMyDeltaTable")

# not providing a starting version/timestamp will result in the latest snapshot being fetched first
spark.readStream.format("delta") \
  .option("readChangeFeed", "true") \
  .table("myDeltaTable")

Scala

// providing a starting version
spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .table("myDeltaTable")

// providing a starting timestamp
spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", "2021-04-21 05:35:43")
  .load("/pathToMyDeltaTable")

// not providing a starting version/timestamp will result in the latest snapshot being fetched first
spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .table("myDeltaTable")

Om du vill hämta ändringsdata när du läser tabellen anger du alternativet readChangeFeed till true. Eller startingVersionstartingTimestamp är valfria och om de inte tillhandahålls returnerar strömmen den senaste ögonblicksbilden av tabellen vid tidpunkten för strömning som en INSERT och framtida ändringar som ändringsdata. Alternativ som hastighetsbegränsningar (maxFilesPerTrigger, maxBytesPerTrigger) och excludeRegex stöds även när du läser ändringsdata.

Kommentar

Hastighetsbegränsning kan vara atomiska för andra versioner än den första ögonblicksbildversionen. Det innebär att hela incheckningsversionen kommer att vara frekvensbegränsad eller så returneras hela incheckningen.

Om en användare som standard skickar in en version eller tidsstämpel som överskrider den senaste incheckningen i en tabell utlöses felet timestampGreaterThanLatestCommit . I Databricks Runtime 11.3 LTS och senare kan ändringsdataflödet hantera versionsfallet för out-of-range om användaren anger följande konfiguration till true:

set spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled = true;

Om du anger en startversion som är större än den senaste incheckningen i en tabell eller en starttidsstämpel som är nyare än den senaste incheckningen i en tabell returneras ett tomt läsresultat när föregående konfiguration är aktiverad.

Om du anger en slutversion som är större än den senaste incheckningen i en tabell eller en sluttidsstämpel som är nyare än den senaste incheckningen i en tabell, returneras alla ändringar mellan startversionen och den senaste incheckningen när den föregående konfigurationen är aktiverad i batchläsningsläge.

Vad är schemat för ändringsdataflödet?

När du läser från ändringsdataflödet för en tabell används schemat för den senaste tabellversionen.

Kommentar

De flesta schemaändringar och utvecklingsåtgärder stöds fullt ut. Tabell med kolumnmappning aktiverat stöder inte alla användningsfall och visar olika beteende. Se Ändra dataflödesbegränsningar för tabeller med kolumnmappning aktiverat.

Förutom datakolumnerna från schemat i Delta-tabellen innehåller ändringsdataflödet metadatakolumner som identifierar typen av ändringshändelse:

Kolumnnamn Typ Värden
_change_type String insert, update_preimage , update_postimage, delete(1)
_commit_version Long Deltaloggen eller tabellversionen som innehåller ändringen.
_commit_timestamp Tidsstämpel Tidsstämpeln som var associerad när incheckningen skapades.

(1)preimage är värdet före uppdateringen, postimage är värdet efter uppdateringen.

Kommentar

Du kan inte aktivera ändringsdataflöde i en tabell om schemat innehåller kolumner med samma namn som de tillagda kolumnerna. Byt namn på kolumner i tabellen för att lösa den här konflikten innan du försöker aktivera ändringsdataflöde.

Ändra dataflödesbegränsningar för tabeller med kolumnmappning aktiverat

Med kolumnmappning aktiverat i en Delta-tabell kan du släppa eller byta namn på kolumner i tabellen utan att skriva om datafiler för befintliga data. När kolumnmappningen är aktiverad har ändringsdataflödet begränsningar efter att ha utfört icke-additiva schemaändringar som att byta namn på eller släppa en kolumn, ändra datatyp eller nullabilitetsändringar.

Viktigt!

  • Du kan inte läsa ändringsdataflöde för en transaktion eller ett intervall där en icke-additiv schemaändring sker med hjälp av batch-semantik.
  • I Databricks Runtime 12.2 LTS och nedan har tabeller med kolumnmappning aktiverats som har upplevt icke-additiva schemaändringar inte stöd för strömmande läsningar i ändringsdataflöde. Mer information finns i Strömning med kolumnmappning och schemaändringar.
  • I Databricks Runtime 11.3 LTS och nedan kan du inte läsa ändra dataflöde för tabeller med kolumnmappning aktiverat som har fått kolumnbyte eller borttagning.

I Databricks Runtime 12.2 LTS och senare kan du utföra batchläsningar på ändringsdataflöde för tabeller med kolumnmappning aktiverat som har upplevt icke-additiva schemaändringar. I stället för att använda schemat för den senaste versionen av tabellen använder läsåtgärder schemat för slutversionen av tabellen som anges i frågan. Frågor misslyckas fortfarande om det angivna versionsintervallet sträcker sig över en icke-additiv schemaändring.

Vanliga frågor och svar

Vad är kostnaden för att aktivera ändringsdataflödet?

Det finns ingen betydande inverkan. Ändringsdataposterna genereras i rad under frågekörningsprocessen och är vanligtvis mycket mindre än den totala storleken på omskrivna filer.

Vad är kvarhållningsprincipen för ändringsposter?

Ändringsposter följer samma kvarhållningsprincip som inaktuella tabellversioner och rensas via VACUUM om de ligger utanför den angivna kvarhållningsperioden.

När blir nya poster tillgängliga i ändringsdataflödet?

Ändringsdata checkas in tillsammans med Delta Lake-transaktionen och blir tillgängliga samtidigt som nya data är tillgängliga i tabellen.

Notebook-exempel: Sprida ändringar med deltaändringsdataflöde

Den här notebook-filen visar hur du sprider ändringar som gjorts i en silvertabell med absolut antal vaccinationer till en guldtabell med vaccinationsfrekvenser.

Ändra notebook-fil för dataflöde

Hämta notebook-fil