Använda Delta Lake-ändringsdataflöde i Azure Databricks
Kommentar
- I den här artikeln beskrivs hur du registrerar och frågar efter ändringsinformation på radnivå för Delta-tabeller med hjälp av funktionen för ändringsdataflöde. Information om hur du uppdaterar tabeller i en Delta Live Tables-pipeline baserat på ändringar i källdata finns i APPLY CHANGES API: Simplifi change data capture in Delta Live Tables (TILLÄMPA ÄNDRINGAR API: Förenkla insamling av ändringsdata i Delta Live Tables).
Med ändringsdataflöde kan Azure Databricks spåra ändringar på radnivå mellan versioner av en Delta-tabell. När den är aktiverad i en Delta-tabell registrerar körningen ändringshändelser för alla data som skrivits till tabellen. Detta inkluderar raddata tillsammans med metadata som anger om den angivna raden infogades, togs bort eller uppdaterades.
Du kan läsa ändringshändelserna i batchfrågor med Spark SQL, Apache Spark DataFrames och Structured Streaming.
Viktigt!
Ändringsdataflödet fungerar tillsammans med tabellhistoriken för att tillhandahålla ändringsinformation. Eftersom kloning av en Delta-tabell skapar en separat historik matchar inte ändringsdataflödet för klonade tabeller den ursprungliga tabellens.
Användningsfall
Ändringsdataflöde är inte aktiverat som standard. Följande användningsfall bör köras när du aktiverar ändringsdataflödet.
- Silver- och Gold-tabeller: Förbättra Delta Lake-prestanda genom att endast bearbeta ändringar på radnivå efter inledande
MERGE
,UPDATE
ellerDELETE
åtgärder för att påskynda och förenkla ETL- och ELT-åtgärder. - Materialiserade vyer: Skapa uppdaterade, aggregerade vyer av information för användning i BI och analys utan att behöva bearbeta de fullständiga underliggande tabellerna igen, i stället uppdateras endast var ändringarna har kommit.
- Överför ändringar: Skicka ett ändringsdataflöde till underordnade system som Kafka eller RDBMS som kan använda det för att stegvis bearbeta i senare skeden av datapipelines.
- Granskningstabell: Samla in ändringsdataflödet som en Delta-tabell ger evig lagring och effektiv frågefunktion för att se alla ändringar över tid, inklusive när borttagningar inträffar och vilka uppdateringar som har gjorts.
Aktivera ändringsdataflöde
Du måste uttryckligen aktivera alternativet ändra dataflöde med någon av följande metoder:
Ny tabell: Ange tabellegenskapen
CREATE TABLE
delta.enableChangeDataFeed = true
i kommandot .CREATE TABLE student (id INT, name STRING, age INT) TBLPROPERTIES (delta.enableChangeDataFeed = true)
Befintlig tabell: Ange tabellegenskapen
ALTER TABLE
delta.enableChangeDataFeed = true
i kommandot .ALTER TABLE myDeltaTable SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
Alla nya tabeller:
set spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;
Viktigt!
Endast ändringar som görs när du har aktiverat ändringsdataflödet registreras. Tidigare ändringar i en tabell registreras inte.
Ändra datalagring
Azure Databricks registrerar ändringsdata för UPDATE
, DELETE
och MERGE
åtgärder i _change_data
mappen under tabellkatalogen. Vissa åtgärder, till exempel åtgärder med endast infogning och fullständiga partitionsborttagningar, genererar inte data i _change_data
katalogen eftersom Azure Databricks effektivt kan beräkna ändringsdataflödet direkt från transaktionsloggen.
Filerna i _change_data
mappen följer kvarhållningsprincipen för tabellen. Om du kör kommandot VACUUM tas därför även data för ändringsdataflöde bort.
Läsa ändringar i batchfrågor
Du kan ange antingen version eller tidsstämpel för start och slut. Start- och slutversionerna och tidsstämplarna ingår i frågorna. Om du vill läsa ändringarna från en viss startversion till den senaste versionen av tabellen anger du endast startversionen eller tidsstämpeln.
Du anger en version som ett heltal och en tidsstämplar som en sträng i formatet yyyy-MM-dd[ HH:mm:ss[.SSS]]
.
Om du anger en lägre version eller en tidsstämpel som är äldre än en som har registrerat ändringshändelser, dvs. när ändringsdataflödet aktiverades, utlöses ett fel som anger att ändringsdataflödet inte har aktiverats.
SQL
-- version as ints or longs e.g. changes from version 0 to 10
SELECT * FROM table_changes('tableName', 0, 10)
-- timestamp as string formatted timestamps
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')
-- providing only the startingVersion/timestamp
SELECT * FROM table_changes('tableName', 0)
-- database/schema names inside the string for table name, with backticks for escaping dots and special characters
SELECT * FROM table_changes('dbName.`dotted.tableName`', '2021-04-21 06:45:46' , '2021-05-21 12:00:00')
-- path based tables
SELECT * FROM table_changes_by_path('\path', '2021-04-21 05:45:46')
Python
# version as ints or longs
spark.read.format("delta") \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.option("endingVersion", 10) \
.table("myDeltaTable")
# timestamps as formatted timestamp
spark.read.format("delta") \
.option("readChangeFeed", "true") \
.option("startingTimestamp", '2021-04-21 05:45:46') \
.option("endingTimestamp", '2021-05-21 12:00:00') \
.table("myDeltaTable")
# providing only the startingVersion/timestamp
spark.read.format("delta") \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.table("myDeltaTable")
# path based tables
spark.read.format("delta") \
.option("readChangeFeed", "true") \
.option("startingTimestamp", '2021-04-21 05:45:46') \
.load("pathToMyDeltaTable")
Scala
// version as ints or longs
spark.read.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.option("endingVersion", 10)
.table("myDeltaTable")
// timestamps as formatted timestamp
spark.read.format("delta")
.option("readChangeFeed", "true")
.option("startingTimestamp", "2021-04-21 05:45:46")
.option("endingTimestamp", "2021-05-21 12:00:00")
.table("myDeltaTable")
// providing only the startingVersion/timestamp
spark.read.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.table("myDeltaTable")
// path based tables
spark.read.format("delta")
.option("readChangeFeed", "true")
.option("startingTimestamp", "2021-04-21 05:45:46")
.load("pathToMyDeltaTable")
Läsa ändringar i strömmande frågor
Python
# providing a starting version
spark.readStream.format("delta") \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.table("myDeltaTable")
# providing a starting timestamp
spark.readStream.format("delta") \
.option("readChangeFeed", "true") \
.option("startingTimestamp", "2021-04-21 05:35:43") \
.load("/pathToMyDeltaTable")
# not providing a starting version/timestamp will result in the latest snapshot being fetched first
spark.readStream.format("delta") \
.option("readChangeFeed", "true") \
.table("myDeltaTable")
Scala
// providing a starting version
spark.readStream.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.table("myDeltaTable")
// providing a starting timestamp
spark.readStream.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", "2021-04-21 05:35:43")
.load("/pathToMyDeltaTable")
// not providing a starting version/timestamp will result in the latest snapshot being fetched first
spark.readStream.format("delta")
.option("readChangeFeed", "true")
.table("myDeltaTable")
Om du vill hämta ändringsdata när du läser tabellen anger du alternativet readChangeFeed
till true
.
Eller startingVersion
startingTimestamp
är valfria och om de inte tillhandahålls returnerar strömmen den senaste ögonblicksbilden av tabellen vid tidpunkten för strömning som en INSERT
och framtida ändringar som ändringsdata.
Alternativ som hastighetsbegränsningar (maxFilesPerTrigger
, maxBytesPerTrigger
) och excludeRegex
stöds även när du läser ändringsdata.
Kommentar
Hastighetsbegränsning kan vara atomiska för andra versioner än den första ögonblicksbildversionen. Det innebär att hela incheckningsversionen kommer att vara frekvensbegränsad eller så returneras hela incheckningen.
Om en användare som standard skickar in en version eller tidsstämpel som överskrider den senaste incheckningen i en tabell utlöses felet timestampGreaterThanLatestCommit
. I Databricks Runtime 11.3 LTS och senare kan ändringsdataflödet hantera versionsfallet för out-of-range om användaren anger följande konfiguration till true
:
set spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled = true;
Om du anger en startversion som är större än den senaste incheckningen i en tabell eller en starttidsstämpel som är nyare än den senaste incheckningen i en tabell returneras ett tomt läsresultat när föregående konfiguration är aktiverad.
Om du anger en slutversion som är större än den senaste incheckningen i en tabell eller en sluttidsstämpel som är nyare än den senaste incheckningen i en tabell, returneras alla ändringar mellan startversionen och den senaste incheckningen när den föregående konfigurationen är aktiverad i batchläsningsläge.
Vad är schemat för ändringsdataflödet?
När du läser från ändringsdataflödet för en tabell används schemat för den senaste tabellversionen.
Kommentar
De flesta schemaändringar och utvecklingsåtgärder stöds fullt ut. Tabell med kolumnmappning aktiverat stöder inte alla användningsfall och visar olika beteende. Se Ändra dataflödesbegränsningar för tabeller med kolumnmappning aktiverat.
Förutom datakolumnerna från schemat i Delta-tabellen innehåller ändringsdataflödet metadatakolumner som identifierar typen av ändringshändelse:
Kolumnnamn | Typ | Värden |
---|---|---|
_change_type |
String | insert , update_preimage , update_postimage , delete (1) |
_commit_version |
Long | Deltaloggen eller tabellversionen som innehåller ändringen. |
_commit_timestamp |
Tidsstämpel | Tidsstämpeln som var associerad när incheckningen skapades. |
(1)preimage
är värdet före uppdateringen, postimage
är värdet efter uppdateringen.
Kommentar
Du kan inte aktivera ändringsdataflöde i en tabell om schemat innehåller kolumner med samma namn som de tillagda kolumnerna. Byt namn på kolumner i tabellen för att lösa den här konflikten innan du försöker aktivera ändringsdataflöde.
Ändra dataflödesbegränsningar för tabeller med kolumnmappning aktiverat
Med kolumnmappning aktiverat i en Delta-tabell kan du släppa eller byta namn på kolumner i tabellen utan att skriva om datafiler för befintliga data. När kolumnmappningen är aktiverad har ändringsdataflödet begränsningar efter att ha utfört icke-additiva schemaändringar som att byta namn på eller släppa en kolumn, ändra datatyp eller nullabilitetsändringar.
Viktigt!
- Du kan inte läsa ändringsdataflöde för en transaktion eller ett intervall där en icke-additiv schemaändring sker med hjälp av batch-semantik.
- I Databricks Runtime 12.2 LTS och nedan har tabeller med kolumnmappning aktiverats som har upplevt icke-additiva schemaändringar inte stöd för strömmande läsningar i ändringsdataflöde. Mer information finns i Strömning med kolumnmappning och schemaändringar.
- I Databricks Runtime 11.3 LTS och nedan kan du inte läsa ändra dataflöde för tabeller med kolumnmappning aktiverat som har fått kolumnbyte eller borttagning.
I Databricks Runtime 12.2 LTS och senare kan du utföra batchläsningar på ändringsdataflöde för tabeller med kolumnmappning aktiverat som har upplevt icke-additiva schemaändringar. I stället för att använda schemat för den senaste versionen av tabellen använder läsåtgärder schemat för slutversionen av tabellen som anges i frågan. Frågor misslyckas fortfarande om det angivna versionsintervallet sträcker sig över en icke-additiv schemaändring.
Vanliga frågor och svar
Vad är kostnaden för att aktivera ändringsdataflödet?
Det finns ingen betydande inverkan. Ändringsdataposterna genereras i rad under frågekörningsprocessen och är vanligtvis mycket mindre än den totala storleken på omskrivna filer.
Vad är kvarhållningsprincipen för ändringsposter?
Ändringsposter följer samma kvarhållningsprincip som inaktuella tabellversioner och rensas via VACUUM om de ligger utanför den angivna kvarhållningsperioden.
När blir nya poster tillgängliga i ändringsdataflödet?
Ändringsdata checkas in tillsammans med Delta Lake-transaktionen och blir tillgängliga samtidigt som nya data är tillgängliga i tabellen.
Notebook-exempel: Sprida ändringar med deltaändringsdataflöde
Den här notebook-filen visar hur du sprider ändringar som gjorts i en silvertabell med absolut antal vaccinationer till en guldtabell med vaccinationsfrekvenser.