Delta Lake-wijzigingenfeed gebruiken in Azure Databricks

Notitie

Met de gegevensfeed voor wijzigingen kan Azure Databricks wijzigingen op rijniveau bijhouden tussen versies van een Delta-tabel. Wanneer deze optie is ingeschakeld voor een Delta-tabel, worden in de runtime gebeurtenissen vastgelegd voor alle gegevens die in de tabel zijn geschreven. Dit omvat de rijgegevens, samen met metagegevens die aangeven of de opgegeven rij is ingevoegd, verwijderd of bijgewerkt.

U kunt de wijzigingsgebeurtenissen in batchquery's lezen met behulp van Spark SQL, Apache Spark DataFrames en Structured Streaming.

Belangrijk

Wijzigingen in de gegevensfeed werken samen met de tabelgeschiedenis om wijzigingsinformatie te bieden. Omdat het klonen van een Delta-tabel een afzonderlijke geschiedenis maakt, komt de wijzigingsgegevensfeed op gekloonde tabellen niet overeen met die van de oorspronkelijke tabel.

Gebruiksgevallen

Wijzigingenfeed is niet standaard ingeschakeld. De volgende use cases moeten worden aangedreven wanneer u de wijzigingengegevensfeed inschakelt.

  • Silver- en Gold-tabellen: Verbeter de prestaties van Delta Lake door alleen wijzigingen op rijniveau te verwerken na de eerste MERGE, UPDATEof DELETE bewerkingen om ETL- en ELT-bewerkingen te versnellen en te vereenvoudigen.
  • Gerealiseerde weergaven: Maak up-to-date, geaggregeerde weergaven van informatie voor gebruik in BI en analyses zonder dat u de volledige onderliggende tabellen opnieuw hoeft te verwerken, in plaats daarvan alleen bij te werken waar wijzigingen zijn doorgevoerd.
  • Wijzigingen verzenden: verzend een wijzigingengegevensfeed naar downstreamsystemen zoals Kafka of RDBMS die deze kunnen gebruiken om incrementeel te verwerken in latere fasen van gegevenspijplijnen.
  • Audittrailtabel: Leg de wijzigingengegevensfeed vast als een Delta-tabel en biedt permanente opslag en efficiënte querymogelijkheden om alle wijzigingen in de loop van de tijd te bekijken, inclusief wanneer verwijderingen plaatsvinden en welke updates zijn aangebracht.

Wijzigingenfeed inschakelen

U moet de optie voor de wijzigingengegevensfeed expliciet inschakelen met behulp van een van de volgende methoden:

  • Nieuwe tabel: Stel de tabeleigenschap delta.enableChangeDataFeed = true in de CREATE TABLE opdracht in.

    CREATE TABLE student (id INT, name STRING, age INT) TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • Bestaande tabel: Stel de tabeleigenschap delta.enableChangeDataFeed = true in de ALTER TABLE opdracht in.

    ALTER TABLE myDeltaTable SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • Alle nieuwe tabellen:

    set spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;
    

Belangrijk

Alleen wijzigingen die zijn aangebracht nadat u de wijzigingenfeed hebt ingeschakeld, worden vastgelegd; eerdere wijzigingen in een tabel worden niet vastgelegd.

Gegevensopslag wijzigen

Azure Databricks-records wijzigen gegevens voor UPDATEen DELETEMERGE bewerkingen in de _change_data map onder de tabelmap. Sommige bewerkingen, zoals bewerkingen die alleen invoegbewerkingen en volledige partities worden verwijderd, genereren geen gegevens in de _change_data directory, omdat Azure Databricks de wijzigingsgegevensfeed efficiënt kan berekenen vanuit het transactielogboek.

De bestanden in de _change_data map volgen het bewaarbeleid van de tabel. Als u de opdracht VACUUM uitvoert, worden gegevensfeedgegevens daarom ook verwijderd.

Wijzigingen in batchquery's lezen

U kunt versie of tijdstempel opgeven voor het begin en einde. De begin- en eindversies en tijdstempels zijn inclusief in de query's. Als u de wijzigingen van een bepaalde beginversie naar de nieuwste versie van de tabel wilt lezen, geeft u alleen de beginversie of tijdstempel op.

U geeft een versie op als een geheel getal en een tijdstempel als een tekenreeks in de notatie yyyy-MM-dd[ HH:mm:ss[.SSS]].

Als u een lagere versie of tijdstempel opgeeft die ouder is dan een versie die wijzigingengebeurtenissen heeft geregistreerd( dat wil zeggen wanneer de wijzigingenfeed is ingeschakeld), wordt er een fout gegenereerd die aangeeft dat de wijzigingenfeed niet is ingeschakeld.

SQL

-- version as ints or longs e.g. changes from version 0 to 10
SELECT * FROM table_changes('tableName', 0, 10)

-- timestamp as string formatted timestamps
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')

-- providing only the startingVersion/timestamp
SELECT * FROM table_changes('tableName', 0)

-- database/schema names inside the string for table name, with backticks for escaping dots and special characters
SELECT * FROM table_changes('dbName.`dotted.tableName`', '2021-04-21 06:45:46' , '2021-05-21 12:00:00')

-- path based tables
SELECT * FROM table_changes_by_path('\path', '2021-04-21 05:45:46')

Python

# version as ints or longs
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .option("endingVersion", 10) \
  .table("myDeltaTable")

# timestamps as formatted timestamp
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", '2021-04-21 05:45:46') \
  .option("endingTimestamp", '2021-05-21 12:00:00') \
  .table("myDeltaTable")

# providing only the startingVersion/timestamp
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .table("myDeltaTable")

# path based tables
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", '2021-04-21 05:45:46') \
  .load("pathToMyDeltaTable")

Scala

// version as ints or longs
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .option("endingVersion", 10)
  .table("myDeltaTable")

// timestamps as formatted timestamp
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingTimestamp", "2021-04-21 05:45:46")
  .option("endingTimestamp", "2021-05-21 12:00:00")
  .table("myDeltaTable")

// providing only the startingVersion/timestamp
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .table("myDeltaTable")

// path based tables
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingTimestamp", "2021-04-21 05:45:46")
  .load("pathToMyDeltaTable")

Wijzigingen lezen in streamingquery's

Python

# providing a starting version
spark.readStream.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .table("myDeltaTable")

# providing a starting timestamp
spark.readStream.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", "2021-04-21 05:35:43") \
  .load("/pathToMyDeltaTable")

# not providing a starting version/timestamp will result in the latest snapshot being fetched first
spark.readStream.format("delta") \
  .option("readChangeFeed", "true") \
  .table("myDeltaTable")

Scala

// providing a starting version
spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .table("myDeltaTable")

// providing a starting timestamp
spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", "2021-04-21 05:35:43")
  .load("/pathToMyDeltaTable")

// not providing a starting version/timestamp will result in the latest snapshot being fetched first
spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .table("myDeltaTable")

Als u de wijzigingsgegevens wilt ophalen tijdens het lezen van de tabel, stelt u de optie readChangeFeed in op true. De startingVersion of startingTimestamp zijn optioneel en indien niet opgegeven, retourneert de stream de meest recente momentopname van de tabel op het moment van streamen als een INSERT en toekomstige wijzigingen als wijzigingsgegevens. Opties zoals frequentielimieten (maxFilesPerTrigger, maxBytesPerTrigger) en excludeRegex worden ook ondersteund bij het lezen van wijzigingsgegevens.

Notitie

Snelheidsbeperking kan atomisch zijn voor andere versies dan de versie van de beginmomentopname. Dat wil gezegd, de volledige doorvoerversie is beperkt of de volledige doorvoering wordt geretourneerd.

Als een gebruiker standaard een versie of tijdstempel doorgeeft die de laatste doorvoering in een tabel overschrijdt, wordt de fout timestampGreaterThanLatestCommit gegenereerd. In Databricks Runtime 11.3 LTS en hoger kan de gegevensfeed de case van de buiten bereikversie verwerken als de gebruiker de volgende configuratie instelt op true:

set spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled = true;

Als u een beginversie opgeeft die groter is dan de laatste doorvoering in een tabel of een nieuwere begintijdstempel dan de laatste doorvoering in een tabel, wordt er een leeg leesresultaat geretourneerd wanneer de voorgaande configuratie is ingeschakeld.

Als u een eindversie opgeeft die groter is dan de laatste doorvoering in een tabel of een nieuwere eindtijdstempel dan de laatste doorvoering in een tabel, worden alle wijzigingen tussen de beginversie en de laatste doorvoering geretourneerd wanneer de voorgaande configuratie is ingeschakeld in de batchleesmodus.

Wat is het schema voor de wijzigingengegevensfeed?

Wanneer u uit de wijzigingengegevensfeed voor een tabel leest, wordt het schema voor de nieuwste tabelversie gebruikt.

Notitie

De meeste schemawijzigings- en evolutiebewerkingen worden volledig ondersteund. Tabel waarvoor kolomtoewijzing is ingeschakeld, bieden geen ondersteuning voor alle use cases en demonstreren verschillend gedrag. Zie Beperkingen voor gegevensfeeds wijzigen voor tabellen waarvoor kolomtoewijzing is ingeschakeld.

Naast de gegevenskolommen uit het schema van de Delta-tabel bevat wijzigingenfeed metagegevenskolommen waarmee het type wijzigingsevenement wordt geïdentificeerd:

Kolomnaam Type Waarden
_change_type String insert, , update_postimage, deleteupdate_preimage(1)
_commit_version Lang Het Delta-logboek of de tabelversie met de wijziging.
_commit_timestamp Tijdstempel Het tijdstempel dat is gekoppeld aan het moment dat de doorvoering is gemaakt.

(1)preimage is de waarde vóór de update, postimage is de waarde na de update.

Notitie

U kunt de gegevensfeed voor een tabel niet inschakelen als het schema kolommen bevat met dezelfde namen als deze toegevoegde kolommen. Wijzig de naam van kolommen in de tabel om dit conflict op te lossen voordat u de wijzigingenfeed probeert in te schakelen.

Beperkingen voor gegevensfeeds wijzigen voor tabellen waarvoor kolomtoewijzing is ingeschakeld

Als kolomtoewijzing is ingeschakeld voor een Delta-tabel, kunt u kolommen in de tabel verwijderen of de naam ervan wijzigen zonder dat u gegevensbestanden voor bestaande gegevens hoeft te herschrijven. Als kolomtoewijzing is ingeschakeld, heeft wijzigingsgegevensfeed beperkingen na het uitvoeren van niet-additieve schemawijzigingen, zoals het wijzigen van de naam of het verwijderen van een kolom, het wijzigen van het gegevenstype of wijzigingen in null-functionaliteit.

Belangrijk

  • U kunt de wijzigingenfeed niet lezen voor een transactie of bereik waarin een niet-additieve schemawijziging plaatsvindt met behulp van batch-semantiek.
  • In Databricks Runtime 12.2 LTS en lager bieden tabellen met kolomtoewijzing ingeschakeld waarvoor niet-additieve schemawijzigingen zijn ingeschakeld, geen ondersteuning voor streaming-leesbewerkingen voor wijzigingengegevensfeeds. Zie Streaming met kolomtoewijzing en schemawijzigingen.
  • In Databricks Runtime 11.3 LTS en hieronder kunt u de gegevensfeed voor tabellen waarvoor kolomtoewijzing is ingeschakeld, niet lezen waarvoor de naam van kolommen is gewijzigd of verwijderd.

In Databricks Runtime 12.2 LTS en hoger kunt u batchleesbewerkingen uitvoeren op wijzigingengegevensfeeds voor tabellen waarvoor kolomtoewijzing is ingeschakeld waarvoor niet-additieve schemawijzigingen zijn aangebracht. In plaats van het schema van de nieuwste versie van de tabel te gebruiken, gebruiken leesbewerkingen het schema van de eindversie van de tabel die is opgegeven in de query. Query's mislukken nog steeds als het opgegeven versiebereik een niet-additieve schemawijziging omvat.

Veelgestelde vragen

Wat is de overhead voor het inschakelen van de wijzigingengegevensfeed?

Er is geen aanzienlijke impact. De wijzigingsgegevensrecords worden in lijn gegenereerd tijdens het uitvoeringsproces van de query en zijn over het algemeen veel kleiner dan de totale grootte van herschreven bestanden.

Wat is het bewaarbeleid voor wijzigingsrecords?

Wijzigingenrecords volgen hetzelfde bewaarbeleid als verouderde tabelversies en worden opgeschoond via VACUUM als ze buiten de opgegeven bewaarperiode vallen.

Wanneer worden nieuwe records beschikbaar in de wijzigingengegevensfeed?

Wijzigingsgegevens worden samen met de Delta Lake-transactie doorgevoerd en worden beschikbaar op hetzelfde moment als de nieuwe gegevens beschikbaar zijn in de tabel.

Voorbeeld van notitieblok: Wijzigingen doorgeven met Delta-wijzigingenfeed

In dit notitieblok ziet u hoe u wijzigingen in een zilveren tabel met absolute aantallen vaccinaties doorgeeft aan een gouden tabel met vaccinatiepercentages.

Notitieblok voor gegevensfeed wijzigen

Notebook downloaden