Upsert in een Delta Lake-tabel met behulp van samenvoegen

U kunt gegevens uit een brontabel, weergave of DataFrame upsert in een delta-doeltabel met behulp van de MERGE SQL-bewerking. Delta Lake biedt ondersteuning voor invoegingen, updates en verwijderingen en biedt ondersteuning voor uitgebreide syntaxis buiten de SQL-standaarden om geavanceerde gebruiksvoorbeelden MERGEmogelijk te maken.

Stel dat u een brontabel hebt met de naam people10mupdates of een bronpad waarop nieuwe gegevens voor een doeltabel met de naam people10m of het doelpad /tmp/delta/people-10m-updates staan/tmp/delta/people-10m. Sommige van deze nieuwe records zijn mogelijk al aanwezig in de doelgegevens. Als u de nieuwe gegevens wilt samenvoegen, wilt u rijen bijwerken waar de persoon id al aanwezig is en de nieuwe rijen invoegen waar geen overeenkomende id items aanwezig zijn. U kunt de volgende query uitvoeren:

SQL

MERGE INTO people10m
USING people10mupdates
ON people10m.id = people10mupdates.id
WHEN MATCHED THEN
  UPDATE SET
    id = people10mupdates.id,
    firstName = people10mupdates.firstName,
    middleName = people10mupdates.middleName,
    lastName = people10mupdates.lastName,
    gender = people10mupdates.gender,
    birthDate = people10mupdates.birthDate,
    ssn = people10mupdates.ssn,
    salary = people10mupdates.salary
WHEN NOT MATCHED
  THEN INSERT (
    id,
    firstName,
    middleName,
    lastName,
    gender,
    birthDate,
    ssn,
    salary
  )
  VALUES (
    people10mupdates.id,
    people10mupdates.firstName,
    people10mupdates.middleName,
    people10mupdates.lastName,
    people10mupdates.gender,
    people10mupdates.birthDate,
    people10mupdates.ssn,
    people10mupdates.salary
  )

Python

from delta.tables import *

deltaTablePeople = DeltaTable.forPath(spark, '/tmp/delta/people-10m')
deltaTablePeopleUpdates = DeltaTable.forPath(spark, '/tmp/delta/people-10m-updates')

dfUpdates = deltaTablePeopleUpdates.toDF()

deltaTablePeople.alias('people') \
  .merge(
    dfUpdates.alias('updates'),
    'people.id = updates.id'
  ) \
  .whenMatchedUpdate(set =
    {
      "id": "updates.id",
      "firstName": "updates.firstName",
      "middleName": "updates.middleName",
      "lastName": "updates.lastName",
      "gender": "updates.gender",
      "birthDate": "updates.birthDate",
      "ssn": "updates.ssn",
      "salary": "updates.salary"
    }
  ) \
  .whenNotMatchedInsert(values =
    {
      "id": "updates.id",
      "firstName": "updates.firstName",
      "middleName": "updates.middleName",
      "lastName": "updates.lastName",
      "gender": "updates.gender",
      "birthDate": "updates.birthDate",
      "ssn": "updates.ssn",
      "salary": "updates.salary"
    }
  ) \
  .execute()

Scala

import io.delta.tables._
import org.apache.spark.sql.functions._

val deltaTablePeople = DeltaTable.forPath(spark, "/tmp/delta/people-10m")
val deltaTablePeopleUpdates = DeltaTable.forPath(spark, "tmp/delta/people-10m-updates")
val dfUpdates = deltaTablePeopleUpdates.toDF()

deltaTablePeople
  .as("people")
  .merge(
    dfUpdates.as("updates"),
    "people.id = updates.id")
  .whenMatched
  .updateExpr(
    Map(
      "id" -> "updates.id",
      "firstName" -> "updates.firstName",
      "middleName" -> "updates.middleName",
      "lastName" -> "updates.lastName",
      "gender" -> "updates.gender",
      "birthDate" -> "updates.birthDate",
      "ssn" -> "updates.ssn",
      "salary" -> "updates.salary"
    ))
  .whenNotMatched
  .insertExpr(
    Map(
      "id" -> "updates.id",
      "firstName" -> "updates.firstName",
      "middleName" -> "updates.middleName",
      "lastName" -> "updates.lastName",
      "gender" -> "updates.gender",
      "birthDate" -> "updates.birthDate",
      "ssn" -> "updates.ssn",
      "salary" -> "updates.salary"
    ))
  .execute()

Zie de Documentatie voor de Delta Lake-API voor de syntaxis van Scala en Python. Zie MERGE INTO voor meer informatie over DE SQL-syntaxis

Alle niet-overeenkomende rijen wijzigen met behulp van samenvoegen

In Databricks SQL en Databricks Runtime 12.2 LTS en hoger kunt u de WHEN NOT MATCHED BY SOURCE component gebruiken voor UPDATE of DELETE records in de doeltabel zonder bijbehorende records in de brontabel. Databricks raadt aan een optionele voorwaardelijke component toe te voegen om te voorkomen dat de doeltabel volledig wordt herschreven.

In het volgende codevoorbeeld ziet u de basissyntaxis van het gebruik hiervan voor verwijderingen, waarbij de doeltabel wordt overschreven met de inhoud van de brontabel en niet-overeenkomende records in de doeltabel worden verwijderd. Zie Incrementeel Delta-tabel synchroniseren met bron voor een meer schaalbaar patroon voor tabellen waarin bronupdates en verwijderingen tijdsgebonden zijn.

Python

(targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .whenNotMatchedBySourceDelete()
  .execute()
)

Scala

targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatched()
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .whenNotMatchedBySource()
  .delete()
  .execute()

SQL

MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *
WHEN NOT MATCHED BY SOURCE THEN
  DELETE

In het volgende voorbeeld worden voorwaarden aan de WHEN NOT MATCHED BY SOURCE component toegevoegd en worden waarden opgegeven die moeten worden bijgewerkt in niet-overeenkomende doelrijen.

Python

(targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatchedUpdate(
    set = {"target.lastSeen": "source.timestamp"}
  )
  .whenNotMatchedInsert(
    values = {
      "target.key": "source.key",
      "target.lastSeen": "source.timestamp",
      "target.status": "'active'"
    }
  )
  .whenNotMatchedBySourceUpdate(
    condition="target.lastSeen >= (current_date() - INTERVAL '5' DAY)",
    set = {"target.status": "'inactive'"}
  )
  .execute()
)

Scala

targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatched()
  .updateExpr(Map("target.lastSeen" -> "source.timestamp"))
  .whenNotMatched()
  .insertExpr(Map(
    "target.key" -> "source.key",
    "target.lastSeen" -> "source.timestamp",
    "target.status" -> "'active'",
    )
  )
  .whenNotMatchedBySource("target.lastSeen >= (current_date() - INTERVAL '5' DAY)")
  .updateExpr(Map("target.status" -> "'inactive'"))
  .execute()

SQL

MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
  UPDATE SET target.lastSeen = source.timestamp
WHEN NOT MATCHED THEN
  INSERT (key, lastSeen, status) VALUES (source.key,  source.timestamp, 'active')
WHEN NOT MATCHED BY SOURCE AND target.lastSeen >= (current_date() - INTERVAL '5' DAY) THEN
  UPDATE SET target.status = 'inactive'

Semantiek van bewerking samenvoegen

Hier volgt een gedetailleerde beschrijving van de semantiek van de merge programmatische bewerking.

  • Er kan een willekeurig aantal whenMatched componenten zijn whenNotMatched .

  • whenMatched componenten worden uitgevoerd wanneer een bronrij overeenkomt met een doeltabelrij op basis van de voorwaarde van de overeenkomst. Deze componenten hebben de volgende semantiek.

    • whenMatched componenten kunnen maximaal één update en één delete actie hebben. Met update de actie worden merge alleen de opgegeven kolommen (vergelijkbaar met de updatebewerking) van de overeenkomende doelrij bijgewerkt. Met delete de actie wordt de overeenkomende rij verwijderd.

    • Elke whenMatched component kan een optionele voorwaarde hebben. Als deze componentvoorwaarde bestaat, wordt de update of delete actie alleen uitgevoerd voor een overeenkomend brondoelrijpaar wanneer aan de componentvoorwaarde wordt voldaan.

    • Als er meerdere whenMatched componenten zijn, worden ze geëvalueerd in de volgorde waarin ze zijn opgegeven. Alle whenMatched componenten, met uitzondering van de laatste, moeten voorwaarden hebben.

    • Als geen van de whenMatched voorwaarden waar is voor een bron- en doelrijpaar dat overeenkomt met de samenvoegvoorwaarde, blijft de doelrij ongewijzigd.

    • Als u alle kolommen van de doel-Delta-tabel wilt bijwerken met de bijbehorende kolommen van de brongegevensset, gebruikt u whenMatched(...).updateAll(). Dit komt overeen met:

      whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
      

      voor alle kolommen van de doel-Delta-tabel. Daarom gaat deze actie ervan uit dat de brontabel dezelfde kolommen heeft als die in de doeltabel, anders genereert de query een analysefout.

      Notitie

      Dit gedrag verandert wanneer automatische schemamigratie is ingeschakeld. Zie de automatische schemaontwikkeling voor meer informatie.

  • whenNotMatched componenten worden uitgevoerd wanneer een bronrij niet overeenkomt met een doelrij op basis van de overeenkomstvoorwaarde. Deze componenten hebben de volgende semantiek.

    • whenNotMatched componenten kunnen alleen de insert actie hebben. De nieuwe rij wordt gegenereerd op basis van de opgegeven kolom en bijbehorende expressies. U hoeft niet alle kolommen in de doeltabel op te geven. Voor niet-opgegeven doelkolommen wordt NULL ingevoegd.

    • Elke whenNotMatched component kan een optionele voorwaarde hebben. Als de componentvoorwaarde aanwezig is, wordt alleen een bronrij ingevoegd als die voorwaarde waar is voor die rij. Anders wordt de bronkolom genegeerd.

    • Als er meerdere whenNotMatched componenten zijn, worden ze geëvalueerd in de volgorde waarin ze zijn opgegeven. Alle whenNotMatched componenten, met uitzondering van de laatste, moeten voorwaarden hebben.

    • Als u alle kolommen van de doel-Delta-tabel wilt invoegen met de bijbehorende kolommen van de brongegevensset, gebruikt u whenNotMatched(...).insertAll(). Dit komt overeen met:

      whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
      

      voor alle kolommen van de doel-Delta-tabel. Daarom gaat deze actie ervan uit dat de brontabel dezelfde kolommen heeft als die in de doeltabel, anders genereert de query een analysefout.

      Notitie

      Dit gedrag verandert wanneer automatische schemamigratie is ingeschakeld. Zie de automatische schemaontwikkeling voor meer informatie.

  • whenNotMatchedBySource componenten worden uitgevoerd wanneer een doelrij niet overeenkomt met een bronrij op basis van de samenvoegvoorwaarde. Deze componenten hebben de volgende semantiek.

    • whenNotMatchedBySource componenten kunnen opgeven delete en update acties uitvoeren.
    • Elke whenNotMatchedBySource component kan een optionele voorwaarde hebben. Als de componentvoorwaarde aanwezig is, wordt alleen een doelrij gewijzigd als die voorwaarde waar is voor die rij. Anders blijft de doelrij ongewijzigd.
    • Als er meerdere whenNotMatchedBySource componenten zijn, worden ze geëvalueerd in de volgorde waarin ze zijn opgegeven. Alle whenNotMatchedBySource componenten, met uitzondering van de laatste, moeten voorwaarden hebben.
    • Componenten hebben per definitie whenNotMatchedBySource geen bronrij waaruit kolomwaarden kunnen worden opgehaald, zodat er niet naar bronkolommen kan worden verwezen. Voor elke kolom die moet worden gewijzigd, kunt u een letterlijke waarde opgeven of een actie uitvoeren op de doelkolom, zoals SET target.deleted_count = target.deleted_count + 1.

Belangrijk

  • Een merge bewerking kan mislukken als meerdere rijen van de brongegevensset overeenkomen en de samenvoegbewerking probeert dezelfde rijen van de delta-doeltabel bij te werken. Volgens de SQL-semantiek van samenvoegen is een dergelijke updatebewerking dubbelzinnig omdat het onduidelijk is welke bronrij moet worden gebruikt om de overeenkomende doelrij bij te werken. U kunt de brontabel vooraf verwerken om de mogelijkheid van meerdere overeenkomsten te elimineren.
  • U kunt een SQL-bewerking MERGE alleen toepassen op een SQL-WEERGAVE als de weergave is gedefinieerd als CREATE VIEW viewName AS SELECT * FROM deltaTable.

Gegevensontdubbeling bij het schrijven naar Delta-tabellen

Een veelvoorkomend ETL-gebruiksvoorbeeld is het verzamelen van logboeken in de Delta-tabel door ze toe te voegen aan een tabel. Vaak kunnen de bronnen echter dubbele logboekrecords genereren en downstreamontdubbelingsstappen nodig zijn om ze te kunnen verzorgen. Met mergekunt u voorkomen dat u de dubbele records invoegt.

SQL

MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId
WHEN NOT MATCHED
  THEN INSERT *

Python

deltaTable.alias("logs").merge(
    newDedupedLogs.alias("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId") \
  .whenNotMatchedInsertAll() \
  .execute()

Scala

deltaTable
  .as("logs")
  .merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId")
  .whenNotMatched()
  .insertAll()
  .execute()

Java

deltaTable
  .as("logs")
  .merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId")
  .whenNotMatched()
  .insertAll()
  .execute();

Notitie

De gegevensset met de nieuwe logboeken moet op zichzelf worden ontdubbeld. Door de SQL-semantiek van samenvoegen worden de nieuwe gegevens gematcht en ontdubbeld met de bestaande gegevens in de tabel, maar als er dubbele gegevens in de nieuwe gegevensset zijn, wordt deze ingevoegd. Ontdubbel de nieuwe gegevens daarom voordat u deze samenvoegt in de tabel.

Als u weet dat u slechts een paar dagen dubbele records krijgt, kunt u de query verder optimaliseren door de tabel op datum te partitioneren en vervolgens het datumbereik van de doeltabel op te geven waarop moet worden vergeleken.

SQL

MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS
  THEN INSERT *

Python

deltaTable.alias("logs").merge(
    newDedupedLogs.alias("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS") \
  .whenNotMatchedInsertAll("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS") \
  .execute()

Scala

deltaTable.as("logs").merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
  .whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
  .insertAll()
  .execute()

Java

deltaTable.as("logs").merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
  .whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
  .insertAll()
  .execute();

Dit is efficiënter dan de vorige opdracht, omdat er alleen in de afgelopen zeven dagen aan logboeken naar duplicaten wordt gezocht, niet de hele tabel. Bovendien kunt u deze invoegbewerking alleen gebruiken met Structured Streaming om continue ontdubbeling van de logboeken uit te voeren.

  • In een streamingquery kunt u de samenvoegbewerking foreachBatch gebruiken om continu streaminggegevens naar een Delta-tabel te schrijven met ontdubbeling. Zie het volgende streamingvoorbeeld voor meer informatie over foreachBatch.
  • In een andere streamingquery kunt u continu ontdubbelde gegevens uit deze Delta-tabel lezen. Dit is mogelijk omdat met een samenvoegbewerking alleen nieuwe gegevens worden toegevoegd aan de Delta-tabel.

Langzaam wijzigen van gegevens (SCD) en gegevensopname wijzigen (CDC) met Delta Lake

Delta Live Tables biedt systeemeigen ondersteuning voor het bijhouden en toepassen van SCD-type 1 en Type 2. Gebruik APPLY CHANGES INTO met Delta Live Tables om ervoor te zorgen dat records buiten de order correct worden verwerkt bij het verwerken van CDC-feeds. Zie DE WIJZIGINGEN-API TOEPASSEN: het vastleggen van wijzigingen in Delta Live Tables vereenvoudigen.

Delta-tabel incrementeel synchroniseren met bron

In Databricks SQL en Databricks Runtime 12.2 LTS en hoger kunt WHEN NOT MATCHED BY SOURCE u willekeurige voorwaarden maken om een deel van een tabel atomisch te verwijderen en te vervangen. Dit kan met name handig zijn wanneer u een brontabel hebt waarin records enkele dagen na de initiële gegevensinvoer kunnen worden gewijzigd of verwijderd, maar uiteindelijk een definitieve status hebben.

In de volgende query ziet u hoe u dit patroon gebruikt om vijf dagen records uit de bron te selecteren, overeenkomende records in het doel bij te werken, nieuwe records van de bron in te voegen aan het doel en alle niet-overeenkomende records uit de afgelopen 5 dagen in het doel te verwijderen.

MERGE INTO target AS t
USING (SELECT * FROM source WHERE created_at >= (current_date() - INTERVAL '5' DAY)) AS s
ON t.key = s.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
WHEN NOT MATCHED BY SOURCE AND created_at >= (current_date() - INTERVAL '5' DAY) THEN DELETE

Door hetzelfde Booleaanse filter op te geven voor de bron- en doeltabellen, kunt u wijzigingen van uw bron dynamisch doorgeven aan doeltabellen, inclusief verwijderingen.

Notitie

Hoewel dit patroon kan worden gebruikt zonder voorwaardelijke componenten, zou dit leiden tot het volledig herschrijven van de doeltabel die duur kan zijn.