Werken met delta lake-tabelgeschiedenis

Elke bewerking waarmee een Delta Lake-tabel wordt gewijzigd, maakt een nieuwe tabelversie. U kunt geschiedenisgegevens gebruiken om bewerkingen te controleren, een tabel terug te draaien of een tabel op een bepaald tijdstip op te vragen met behulp van tijdreizen.

Notitie

Databricks raadt het gebruik van de tabelgeschiedenis van Delta Lake niet aan als langetermijnoplossing voor de back-up van gegevensopslag. Databricks raadt aan om alleen de afgelopen 7 dagen te gebruiken voor time travel, tenzij u configuraties voor gegevens- en logboekretentie hebt ingesteld op een hogere waarde.

Delta-tabelgeschiedenis ophalen

U kunt informatie ophalen, waaronder bewerkingen, gebruikers en tijdstempels voor elke schrijfbewerking naar een Delta-tabel door de history opdracht uit te voeren. De bewerkingen worden geretourneerd in omgekeerde chronologische volgorde.

Retentie van tabelgeschiedenis wordt bepaald door de tabelinstelling delta.logRetentionDuration, die standaard 30 dagen is.

Notitie

Time travel en tabelgeschiedenis worden bepaald door verschillende retentiedrempels. Raadpleeg Wat is time travel in Delta Lake?

DESCRIBE HISTORY '/data/events/'          -- get the full history of the table

DESCRIBE HISTORY delta.`/data/events/`

DESCRIBE HISTORY '/data/events/' LIMIT 1  -- get the last operation only

DESCRIBE HISTORY eventsTable

Zie DESCRIBE HISTORY voor meer informatie over de Spark SQL-syntaxis.

Zie de Documentatie voor de Delta Lake-API voor de syntaxis van Scala/Java/Python.

Catalog Explorer biedt een visuele weergave van deze gedetailleerde tabelinformatie en geschiedenis voor Delta-tabellen. Naast het tabelschema en de voorbeeldgegevens kunt u op het tabblad Geschiedenis klikken om de tabelgeschiedenis weer te geven met DESCRIBE HISTORY.

Geschiedenisschema

De uitvoer van de history bewerking heeft de volgende kolommen.

Column Type Description
version long Tabelversie gegenereerd door de bewerking.
timestamp timestamp Wanneer deze versie is doorgevoerd.
userId tekenreeks Id van de gebruiker die de bewerking heeft uitgevoerd.
gebruikersnaam tekenreeks Naam van de gebruiker die de bewerking heeft uitgevoerd.
schakelapparatuur optimaliseren tekenreeks Naam van de bewerking.
operationParameters map Parameters van de bewerking (bijvoorbeeld predicaten.)
taak Struct Details van de taak die de bewerking heeft uitgevoerd.
notebook Struct Details van notebook waaruit de bewerking is uitgevoerd.
clusterId tekenreeks Id van het cluster waarop de bewerking is uitgevoerd.
readVersion long Versie van de tabel die is gelezen om de schrijfbewerking uit te voeren.
isolationLevel tekenreeks Isolatieniveau dat wordt gebruikt voor deze bewerking.
isBlindAppend boolean Of aan deze bewerking gegevens zijn toegevoegd.
operationMetrics map Metrische gegevens van de bewerking (bijvoorbeeld het aantal rijen en bestanden dat is gewijzigd.)
userMetadata tekenreeks Door de gebruiker gedefinieerde doorvoermetagegevens als deze is opgegeven
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|version|          timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|   isolationLevel|isBlindAppend|    operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|      5|2019-07-29 14:07:47|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          4|WriteSerializable|        false|[numTotalRows -> ...|
|      4|2019-07-29 14:07:41|   ###|     ###|   UPDATE|[predicate -> (id...|null|     ###|      ###|          3|WriteSerializable|        false|[numTotalRows -> ...|
|      3|2019-07-29 14:07:29|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          2|WriteSerializable|        false|[numTotalRows -> ...|
|      2|2019-07-29 14:06:56|   ###|     ###|   UPDATE|[predicate -> (id...|null|     ###|      ###|          1|WriteSerializable|        false|[numTotalRows -> ...|
|      1|2019-07-29 14:04:31|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          0|WriteSerializable|        false|[numTotalRows -> ...|
|      0|2019-07-29 14:01:40|   ###|     ###|    WRITE|[mode -> ErrorIfE...|null|     ###|      ###|       null|WriteSerializable|         true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+

Notitie

Sleutels voor metrische gegevens voor bewerking

De history bewerking retourneert een verzameling metrische gegevens voor bewerkingen in de operationMetrics kolomtoewijzing.

In de volgende tabellen worden de toewijzingssleuteldefinities per bewerking weergegeven.

Operation Naam van meetwaarde Beschrijving
SCHRIJVEN, TABEL MAKEN ALS SELECTEREN, TABEL VERVANGEN ALS SELECTEREN, KOPIËREN NAAR
numFiles Aantal geschreven bestanden.
numOutputBytes Grootte in bytes van de geschreven inhoud.
numOutputRows Aantal rijen dat is geschreven.
STREAMINGUPDATE
numAddedFiles Aantal toegevoegde bestanden.
numRemovedFiles Aantal bestanden verwijderd.
numOutputRows Aantal rijen dat is geschreven.
numOutputBytes Grootte van schrijfbewerkingen in bytes.
DELETE
numAddedFiles Aantal toegevoegde bestanden. Niet opgegeven wanneer partities van de tabel worden verwijderd.
numRemovedFiles Aantal bestanden verwijderd.
numDeletedRows Aantal rijen verwijderd. Niet opgegeven wanneer partities van de tabel worden verwijderd.
numCopiedRows Het aantal rijen dat is gekopieerd tijdens het verwijderen van bestanden.
executionTimeMs De tijd die nodig is om de hele bewerking uit te voeren.
scanTimeMs De tijd die nodig is om de bestanden te scannen op overeenkomsten.
rewriteTimeMs De tijd die nodig is om de overeenkomende bestanden te herschrijven.
TRUNCATE
numRemovedFiles Aantal bestanden verwijderd.
executionTimeMs De tijd die nodig is om de hele bewerking uit te voeren.
SAMENVOEGEN
numSourceRows Het aantal rijen in het dataframe van de bron.
numTargetRowsInserted Het aantal rijen dat is ingevoegd in de doeltabel.
numTargetRowsUpdated Het aantal rijen dat is bijgewerkt in de doeltabel.
numTargetRowsDeleted Het aantal rijen dat in de doeltabel is verwijderd.
numTargetRowsCopied Aantal gekopieerde doelrijen.
numOutputRows Totaal aantal rijen dat is geschreven.
numTargetFilesAdded Het aantal bestanden dat is toegevoegd aan de sink(doel).
numTargetFilesRemoved Aantal bestanden verwijderd uit de sink(doel).
executionTimeMs De tijd die nodig is om de hele bewerking uit te voeren.
scanTimeMs De tijd die nodig is om de bestanden te scannen op overeenkomsten.
rewriteTimeMs De tijd die nodig is om de overeenkomende bestanden te herschrijven.
UPDATE
numAddedFiles Aantal toegevoegde bestanden.
numRemovedFiles Aantal bestanden verwijderd.
numUpdatedRows Aantal rijen bijgewerkt.
numCopiedRows Het aantal rijen dat zojuist is gekopieerd tijdens het bijwerken van bestanden.
executionTimeMs De tijd die nodig is om de hele bewerking uit te voeren.
scanTimeMs De tijd die nodig is om de bestanden te scannen op overeenkomsten.
rewriteTimeMs De tijd die nodig is om de overeenkomende bestanden te herschrijven.
FSCK numRemovedFiles Aantal bestanden verwijderd.
CONVERTEREN numConvertedFiles Aantal Parquet-bestanden dat is geconverteerd.
OPTIMIZE
numAddedFiles Aantal toegevoegde bestanden.
numRemovedFiles Het aantal bestanden dat is geoptimaliseerd.
numAddedBytes Het aantal bytes dat is toegevoegd nadat de tabel is geoptimaliseerd.
numRemovedBytes Aantal verwijderde bytes.
minFileSize De grootte van het kleinste bestand nadat de tabel is geoptimaliseerd.
p25FileSize De grootte van het 25e percentielbestand nadat de tabel is geoptimaliseerd.
p50FileSize Mediaanbestandsgrootte nadat de tabel is geoptimaliseerd.
p75FileSize De grootte van het 75e percentielbestand nadat de tabel is geoptimaliseerd.
maxFileSize De grootte van het grootste bestand nadat de tabel is geoptimaliseerd.
CLONE
sourceTableSize Grootte in bytes van de brontabel op de versie die is gekloond.
sourceNumOfFiles Het aantal bestanden in de brontabel op de versie die is gekloond.
numRemovedFiles Het aantal bestanden dat uit de doeltabel is verwijderd als een vorige Delta-tabel is vervangen.
removedFilesSize Totale grootte in bytes van de bestanden die uit de doeltabel zijn verwijderd als een vorige Delta-tabel is vervangen.
numCopiedFiles Het aantal bestanden dat naar de nieuwe locatie is gekopieerd. 0 voor ondiepe klonen.
copiedFilesSize Totale grootte in bytes van de bestanden die zijn gekopieerd naar de nieuwe locatie. 0 voor ondiepe klonen.
HERSTELLEN
tableSizeAfterRestore Tabelgrootte in bytes na herstel.
numOfFilesAfterRestore Aantal bestanden in de tabel na herstel.
numRemovedFiles Het aantal bestanden dat is verwijderd door de herstelbewerking.
numRestoredFiles Aantal bestanden dat is toegevoegd als gevolg van de herstelbewerking.
removedFilesSize Grootte in bytes aan bestanden die zijn verwijderd door de herstelbewerking.
restoredFilesSize Grootte in bytes aan bestanden die zijn toegevoegd door de herstelbewerking.
VACUUM
numDeletedFiles Aantal verwijderde bestanden.
numVacuumedDirectories Aantal gevacueerde mappen.
numFilesToDelete Aantal te verwijderen bestanden.

Wat is Delta Lake time travel?

Delta Lake time travel ondersteunt het uitvoeren van query's op eerdere tabelversies op basis van tijdstempel of tabelversie (zoals vastgelegd in het transactielogboek). U kunt tijdreizen gebruiken voor toepassingen zoals:

  • Analyses, rapporten of uitvoer opnieuw maken (bijvoorbeeld de uitvoer van een machine learning-model). Dit kan nuttig zijn voor foutopsporing of controle, met name in gereguleerde branches.
  • Complexe tijdelijke query's schrijven.
  • Fouten in uw gegevens corrigeren.
  • Het bieden van isolatie van momentopnamen voor een set query's voor snel veranderende tabellen.

Belangrijk

Tabelversies die toegankelijk zijn met tijdreizen, worden bepaald door een combinatie van de bewaardrempel voor transactielogboekbestanden en de frequentie en opgegeven retentie voor VACUUM bewerkingen. Als u dagelijks met de standaardwaarden uitvoert VACUUM , zijn zeven dagen aan gegevens beschikbaar voor tijdreizen.

Delta-syntaxis voor time travel

U kunt een query uitvoeren op een Delta-tabel met tijdreizen door een component toe te voegen na de tabelnaamspecificatie.

  • timestamp_expression kan een van de volgende zijn:
    • '2018-10-18T22:15:12.013Z', dat wil gezegd, een tekenreeks die kan worden gecast naar een tijdstempel
    • cast('2018-10-18 13:36:32 CEST' as timestamp)
    • '2018-10-18', dat wil gezegd, een datumtekenreeks
    • current_timestamp() - interval 12 hours
    • date_sub(current_date(), 1)
    • Elke andere expressie die wel of niet kan worden omgezet in een tijdstempel
  • version is een lange waarde die kan worden verkregen uit de uitvoer van DESCRIBE HISTORY table_spec.

version Geen van beide timestamp_expression subquery's.

Alleen datum- of tijdstempeltekenreeksen worden geaccepteerd. Bijvoorbeeld "2019-01-01" en "2019-01-01T00:00:00.000Z". Zie de volgende code voor voorbeeldsyntaxis:

SQL

SELECT * FROM people10m TIMESTAMP AS OF '2018-10-18T22:15:12.013Z'
SELECT * FROM delta.`/tmp/delta/people10m` VERSION AS OF 123

Python

df1 = spark.read.option("timestampAsOf", "2019-01-01").table("people10m")
df2 = spark.read.option("versionAsOf", 123).load("/tmp/delta/people10m")

U kunt de @ syntaxis ook gebruiken om de tijdstempel of versie op te geven als onderdeel van de tabelnaam. De tijdstempel moet een yyyyMMddHHmmssSSS indeling hebben. U kunt een versie opgeven nadat @ u een v versie wilt toevoegen aan de versie. Zie de volgende code voor voorbeeldsyntaxis:

SQL

SELECT * FROM people10m@20190101000000000
SELECT * FROM people10m@v123

Python

spark.read.table("people10m@20190101000000000")
spark.read.table("people10m@v123")

spark.read.load("/tmp/delta/people10m@20190101000000000")
spark.read.load("/tmp/delta/people10m@v123")

Wat zijn controlepunten voor transactielogboeken?

Delta Lake registreert tabelversies als JSON-bestanden in de _delta_log map, die naast tabelgegevens worden opgeslagen. Om controlepuntquery's te optimaliseren, aggregeert Delta Lake tabelversies naar Parquet-controlepuntbestanden, waardoor alle JSON-versies van de tabelgeschiedenis niet hoeven te worden gelezen. Azure Databricks optimaliseert de frequentie van controlepunten voor gegevensgrootte en workload. Gebruikers hoeven niet rechtstreeks met controlepunten te communiceren. De controlepuntfrequentie kan zonder kennisgeving worden gewijzigd.

Gegevensretentie configureren voor query's voor tijdreizen

Als u een query wilt uitvoeren op een eerdere tabelversie, moet u zowel het logboek als de gegevensbestanden voor die versie behouden.

Gegevensbestanden worden verwijderd wanneer VACUUM ze worden uitgevoerd op een tabel. Delta Lake beheert het automatisch verwijderen van logboekbestanden na de versies van de controlepuntentabel.

Omdat de meeste Delta-tabellen regelmatig op deze tabellen worden VACUUM uitgevoerd, moeten point-in-time-query's de retentiedrempel VACUUMrespecteren, wat standaard 7 dagen is.

Als u de drempelwaarde voor gegevensretentie voor Delta-tabellen wilt verhogen, moet u de volgende tabeleigenschappen configureren:

  • delta.logRetentionDuration = "interval <interval>": bepaalt hoe lang de geschiedenis voor een tabel wordt bewaard. De standaardwaarde is interval 30 days.
  • delta.deletedFileRetentionDuration = "interval <interval>": bepaalt de drempelwaarde VACUUM die wordt gebruikt om gegevensbestanden te verwijderen waarnaar niet meer wordt verwezen in de huidige tabelversie. De standaardwaarde is interval 7 days.

U kunt Delta-eigenschappen opgeven tijdens het maken van een tabel of deze instellen met een ALTER TABLE instructie. Zie naslaginformatie over eigenschappen van Delta-tabellen.

Notitie

U moet beide eigenschappen instellen om ervoor te zorgen dat de tabelgeschiedenis langer wordt bewaard voor tabellen met frequente VACUUM bewerkingen. Bijvoorbeeld voor toegang tot 30 dagen historische gegevens, ingesteld delta.deletedFileRetentionDuration = "interval 30 days" (die overeenkomt met de standaardinstelling voor delta.logRetentionDuration).

Het verhogen van de drempelwaarde voor gegevensretentie kan ertoe leiden dat uw opslagkosten stijgen, omdat er meer gegevensbestanden worden onderhouden.

Een Delta-tabel herstellen naar een eerdere status

U kunt een Delta-tabel herstellen naar de eerdere status met behulp van de RESTORE opdracht. Een Delta-tabel onderhoudt intern historische versies van de tabel, zodat deze kan worden hersteld naar een eerdere status. Een versie die overeenkomt met de eerdere status of een timestamp van wanneer de eerdere status is gemaakt, worden ondersteund als opties door de RESTORE-opdracht .

Belangrijk

  • U kunt een al herstelde tabel herstellen.
  • U kunt een gekloonde tabel herstellen.
  • U moet gemachtigd zijn MODIFY voor de tabel die wordt hersteld.
  • U kunt een tabel niet herstellen naar een oudere versie waarin de gegevensbestanden handmatig of door vacuumzijn verwijderd. Herstellen naar deze versie is gedeeltelijk nog steeds mogelijk als spark.sql.files.ignoreMissingFiles deze is ingesteld op true.
  • De tijdstempelindeling voor het herstellen naar een eerdere status is yyyy-MM-dd HH:mm:ss. Het verstrekken van alleen een tekenreeks voor datum(yyyy-MM-dd) wordt ook ondersteund.
RESTORE TABLE db.target_table TO VERSION AS OF <version>
RESTORE TABLE delta.`/data/target/` TO TIMESTAMP AS OF <timestamp>

Zie RESTORE voor meer informatie over de syntaxis.

Belangrijk

Herstellen wordt beschouwd als een bewerking voor het wijzigen van gegevens. Delta Lake-logboekvermeldingen die door de RESTORE opdracht zijn toegevoegd, bevatten dataChange ingesteld op true. Als er een downstreamtoepassing is, zoals een gestructureerde streamingtaak die de updates naar een Delta Lake-tabel verwerkt, worden de vermeldingen voor gegevenswijzigingslogboeken die door de herstelbewerking zijn toegevoegd, beschouwd als nieuwe gegevensupdates en kunnen deze verwerken tot dubbele gegevens.

Voorbeeld:

Tabelversie Operation Delta-logboekupdates Records in wijzigingenlogboekupdates voor gegevens
0 INSERT AddFile(/path/to/file-1, dataChange = true) (naam = Victor, leeftijd = 29, (naam = George, leeftijd = 55)
1 INSERT AddFile(/path/to/file-2, dataChange = true) (naam = George, leeftijd = 39)
2 OPTIMIZE AddFile(/path/to/file-3, dataChange = false), RemoveFile(/path/to/file-1), RemoveFile(/path/to/file-2) (Geen records omdat de compressie optimaliseren de gegevens in de tabel niet wijzigt)
3 RESTORE(version=1) RemoveFile(/path/to/file-3), AddFile(/path/to/file-1, dataChange = true), AddFile(/path/to/file-2, dataChange = true) (naam = Victor, leeftijd = 29), (naam = George, leeftijd = 55), (naam = George, leeftijd = 39)

In het voorgaande voorbeeld resulteert de RESTORE opdracht in updates die al zijn gezien bij het lezen van versie 0 en 1 van de Delta-tabel. Als een streamingquery deze tabel heeft gelezen, worden deze bestanden beschouwd als nieuw toegevoegde gegevens en worden deze opnieuw verwerkt.

Metrische gegevens herstellen

RESTORE rapporteert de volgende metrische gegevens als één rij DataFrame zodra de bewerking is voltooid:

  • table_size_after_restore: De grootte van de tabel na het herstellen.

  • num_of_files_after_restore: Het aantal bestanden in de tabel na het herstellen.

  • num_removed_files: Het aantal bestanden dat uit de tabel is verwijderd (logisch verwijderd).

  • num_restored_files: Het aantal bestanden dat is hersteld vanwege terugdraaien.

  • removed_files_size: Totale grootte in bytes van de bestanden die uit de tabel worden verwijderd.

  • restored_files_size: Totale grootte in bytes van de bestanden die worden hersteld.

    Voorbeeld van herstel van metrische gegevens

Voorbeelden van het gebruik van time travel in Delta Lake

  • Onopzettelijke verwijderingen in een tabel voor de gebruiker 111herstellen:

    INSERT INTO my_table
      SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1)
      WHERE userId = 111
    
  • Onopzettelijke onjuiste updates voor een tabel oplossen:

    MERGE INTO my_table target
      USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source
      ON source.userId = target.userId
      WHEN MATCHED THEN UPDATE SET *
    
  • Voer een query uit op het aantal nieuwe klanten dat de afgelopen week is toegevoegd.

    SELECT count(distinct userId)
    FROM my_table  - (
      SELECT count(distinct userId)
      FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7))
    

Hoe kan ik de versie van de laatste doorvoering zoeken in de Spark-sessie?

Als u het versienummer van de laatste doorvoering wilt ophalen die door de huidige SparkSession is geschreven in alle threads en alle tabellen, voert u een query uit op de SQL-configuratie spark.databricks.delta.lastCommitVersionInSession.

SQL

SET spark.databricks.delta.lastCommitVersionInSession

Python

spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")

Scala

spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")

Als er geen doorvoeringen zijn uitgevoerd door de SparkSessionsleutel, retourneert een query op de sleutel een lege waarde.

Notitie

Als u hetzelfde SparkSession deelt over meerdere threads, is het vergelijkbaar met het delen van een variabele over meerdere threads. U kunt racevoorwaarden tegenkomen wanneer de configuratiewaarde gelijktijdig wordt bijgewerkt.