Delta-Tabelle: Streaming für Lese- und Schreibvorgänge

Delta Lake ist stark in Spark Structured Streaming über readStream und writeStream integriert. Delta Lake überwindet viele der Einschränkungen, die in der Regel mit Streaming-Systemen und -Dateien verbunden sind, einschließlich:

  • Zusammenführen kleiner Dateien, die durch die Erfassung mit geringer Wartezeit entstanden sind
  • Beibehalten der „Genau einmal“-Verarbeitung mit mehr als einem Stream (oder gleichzeitigen Batchaufträgen)
  • Effizientes Aufdecken, welche Dateien neu sind, wenn Dateien als Quelle für einen Stream verwendet werden

Hinweis

In diesem Artikel wird die Verwendung von Delta Lake-Tabellen als Quellen und Senken für das Streaming beschrieben. Informationen zum Laden von Daten mithilfe von Streamingtabellen in Databricks SQL finden Sie unter Laden von Daten mithilfe von Streamingtabellen in Databricks SQL.

Delta-Tabelle als Quelle

Beim strukturierten Streaming werden Delta-Tabellen inkrementell gelesen. Während eine Streamingabfrage für eine Delta-Tabelle aktiv ist, werden neue Datensätze idempotent verarbeitet, wenn neue Tabellenversionen in die Quelltabelle committet werden.

Die folgenden Codebeispiele zeigen das Konfigurieren eines Streaminglesevorgangs mithilfe des Tabellennamens oder des Dateipfads.

Python

spark.readStream.table("table_name")

spark.readStream.load("/path/to/table")

Scala

spark.readStream.table("table_name")

spark.readStream.load("/path/to/table")

Wichtig

Wenn sich das Schema für eine Delta-Tabelle ändert, nachdem ein Streaminglesevorgang für die Tabelle beginnt, dann schlägt die Abfrage fehl. Bei den meisten Schemaänderungen können Sie den Stream neu starten, um Schemakonflikte zu beheben und die Verarbeitung fortzusetzen.

In Databricks Runtime 13.0 und älteren Versionen können Sie nicht aus einer Delta-Tabelle mit aktivierter Spaltenzuordnung streamen, die eine nicht additive Schemaentwicklung durchlaufen hat, z. B. Umbenennen oder Löschen von Spalten. Ausführliche Informationen finden Sie unter Streaming mit Spaltenzuordnung und Schemaänderungen.

Einschränken der Eingaberate

Die folgenden Optionen stehen zur Steuerung von Microbatchs zur Verfügung:

  • maxFilesPerTrigger: Gibt an, wie viele neue Dateien in jedem Microbatch berücksichtigt werden sollen. Der Standardwert lautet 1000.
  • maxBytesPerTrigger: Gibt an, wie viele Daten in jedem Microbatch verarbeitet werden. Diese Option legt einen „soft max“ fest, was bedeutet, dass ein Batch ungefähr diese Datenmenge verarbeitet und möglicherweise mehr als den Grenzwert verarbeitet, um die Streamingabfrage voranzubringen, wenn die kleinste Eingabeeinheit größer als dieser Grenzwert ist. Dies ist nicht standardmäßig festgelegt.

Wenn Sie maxBytesPerTrigger in Verbindung mit maxFilesPerTrigger verwenden, verarbeitet der Microbatch Daten, bis entweder der Grenzwert maxFilesPerTrigger oder maxBytesPerTrigger erreicht ist.

Hinweis

In Fällen, in denen die Quelltabellentransaktionen aufgrund der logRetentionDurationKonfiguration bereinigt werden und die Streamingabfrage versucht, diese Versionen zu verarbeiten, kann die Abfrage standardmäßig keinen Datenverlust vermeiden. Sie können die Option failOnDataLoss auf false festlegen, um verlorene Daten zu ignorieren und die Verarbeitung fortzusetzen.

Streamen eines Delta Lake-CDC-Feeds (Change Data Capture)

Im Änderungsdatenfeed von Delta Lake werden Änderungen an einer Delta-Tabelle aufgezeichnet, einschließlich Updates und Löschvorgängen. Wenn der Feed aktiviert ist, können Sie aus einem Änderungsdatenfeed streamen und Logik schreiben, um Einfüge-, Aktualisierungs- und Löschvorgänge in Downstreamtabellen zu verarbeiten. Obwohl sich die Datenausgabe des Änderungsdatenfeeds geringfügig von der beschriebenen Delta-Tabelle unterscheidet, stellt dies eine Lösung für die Weitergabe inkrementeller Änderungen an Downstreamtabellen in einer Medaillon-Architektur dar.

Wichtig

In Databricks Runtime 13.0 und älteren Versionen können Sie nicht aus dem Änderungsdatenfeed für eine Delta-Tabelle mit aktivierter Spaltenzuordnung streamen, die eine nicht additive Schemaentwicklung durchlaufen hat, z. B. Umbenennen oder Löschen von Spalten. Weitere Informationen finden Sie unter Streaming mit Spaltenzuordnung und Schemaänderungen.

Ignorieren von Updates und Löschungen

Strukturiertes Streaming verarbeitet keine Eingabe, die keine Anfügung ist, und löst eine Ausnahme aus, wenn Änderungen an der Tabelle vorgenommen werden, die als Quelle verwendet wird. Es gibt zwei Hauptstrategien für den Umgang mit Änderungen, die nicht automatisch nachgeschaltet propagiert werden können:

  • Sie können die Ausgabe und den Prüfpunkt löschen und den Stream von Anfang an neu starten.
  • Sie können eine der beiden Optionen festlegen:
    • ignoreDeletes: Transaktionen ignorieren, die Daten an Partitionsgrenzen löschen.
    • skipChangeCommits: Transaktionen ignorieren, die vorhandene Datensätze löschen oder ändern. skipChangeCommits subsumiert ignoreDeletes.

Hinweis

In Databricks Runtime 12.1 und höher wird skipChangeCommits die vorherige Einstellung ignoreChanges als veraltet festlegen. In Databricks Runtime 12.0 und niedriger ist ignoreChanges die einzige unterstützte Option.

Die Semantik für ignoreChanges unterscheidet sich sehr stark von skipChangeCommits. Wenn ignoreChanges aktiviert ist, werden neu geschriebene Datendateien in der Quelltabelle nach einem Datenänderungsvorgang wie UPDATE, MERGE INTO, DELETE (innerhalb von Partitionen) oder OVERWRITE erneut ausgegeben. Unveränderte Zeilen werden häufig zusammen mit neuen Zeilen ausgegeben, sodass Downstream-Consumer in der Lage sein müssen, mit Duplikaten umzugehen. Löschungen werden nicht nachgeschaltet propagiert. ignoreChanges subsumiert ignoreDeletes.

skipChangeCommits ignoriert Dateiänderungsvorgänge vollständig. Datendateien, die in der Quelltabelle aufgrund von Datenänderungsvorgängen wie UPDATE, MERGE INTO, DELETE und OVERWRITE neu geschrieben werden, werden vollständig ignoriert. Um Änderungen in Upstream-Quelltabellen widerzuspiegeln, müssen Sie eine separate Logik implementieren, um diese Änderungen weiterzuverbreiten.

Workloads, die mit ignoreChanges konfiguriert sind, werden weiterhin mit bekannter Semantik funktionieren, aber Databricks empfiehlt die Verwendung von skipChangeCommits für alle neuen Workloads. Das Migrieren von Workloads mittels ignoreChanges für skipChangeCommits erfordert Refactoringlogik.

Beispiel

Angenommen, Sie haben eine Tabelle user_events mit den Spalten date, user_email und action, die bis date partitioniert ist. Sie streamen aus der Tabelle user_events und müssen Daten aus dieser Tabelle aufgrund der DSGVO löschen.

Wenn Sie an Partitionsgrenzen löschen (d. h. WHERE befindet sich in einer Partitionsspalte), sind die Dateien bereits nach Wert segmentiert, sodass diese Dateien beim Löschen einfach aus den Metadaten gelöscht werden. Wenn Sie eine gesamte Datenpartition löschen, können Sie Folgendes verwenden:

spark.readStream.format("delta")
  .option("ignoreDeletes", "true")
  .load("/tmp/delta/user_events")

Wenn Sie Daten in mehreren Partitionen löschen (in diesem Beispiel filtern nach user_email), verwenden Sie die folgende Syntax:

spark.readStream.format("delta")
  .option("skipChangeCommits", "true")
  .load("/tmp/delta/user_events")

Wenn Sie eine user_email mit der Anweisung UPDATE aktualisieren, wird die Datei mit der entsprechenden user_email umgeschrieben. Verwenden Sie skipChangeCommits, um die geänderten Datendateien zu ignorieren.

Angeben der Anfangsposition

Sie können die folgenden Optionen verwenden, um den Ausgangspunkt der Delta Lake-Streamingquelle anzugeben, ohne die gesamte Tabelle zu verarbeiten.

  • startingVersion: Die Delta Lake-Version, ab der gestartet werden soll. Databricks empfiehlt, diese Option für die meisten Workloads wegzulassen. Wird die Option nicht festgelegt, beginnt der Stream mit der neuesten verfügbaren Version, einschließlich einer vollständigen Momentaufnahme der Tabelle zu diesem Zeitpunkt.

    Wird die Option angegeben, liest der Stream ab der angegebenen Version (einschließlich) alle Änderungen an der Delta-Tabelle. Wenn die angegebene Version nicht mehr verfügbar ist, kann der Stream nicht gestartet werden. Sie können die Commit-Versionen aus der Spalte version des Befehlsverlaufs der DESCRIBE HISTORY abrufen.

    Geben Sie latest in Databricks Runtime 7.4 und höher an, um nur die neuesten Änderungen zurückzugeben.

  • startingTimestamp: Der Zeitstempel, ab dem gestartet werden soll. Alle Tabellenänderungen, die zum oder nach dem Zeitstempel (einschließlich) vorgenommen wurden, werden vom Streamingleser gelesen. Wenn der angegebene Zeitstempel allen Tabellencommits vorangestellt ist, beginnt der Streaminglesevorgang mit dem frühesten verfügbaren Zeitstempel. Eine der folgenden Optionen:

    • Eine Zeitstempelzeichenfolge. Beispiel: "2019-01-01T00:00:00.000Z".
    • Eine Datumszeichenfolge. Beispiel: "2019-01-01".

Sie können nicht beide Optionen gleichzeitig festlegen. Sie werden nur wirksam, wenn eine neue Streamingabfrage gestartet wird. Wenn eine Streamingabfrage gestartet wurde und der Fortschritt im Prüfpunkt aufgezeichnet wurde, werden diese Optionen ignoriert.

Wichtig

Obwohl Sie die Streamingquelle von einer angegebenen Version oder einem angegebenen Zeitstempel starten können, ist das Schema der Streamingquelle immer das neueste Schema der Delta-Tabelle. Sie müssen sicherstellen, dass es nach der angegebenen Version oder dem angegebenen Zeitstempel keine inkompatible Schemaänderung an der Delta-Tabelle gibt. Andernfalls gibt die Streamingquelle möglicherweise falsche Ergebnisse zurück, wenn die Daten mit einem falschen Schema gelesen werden.

Beispiel

Beispiel: Sie haben eine Tabelle user_events. Wenn Sie Änderungen ab Version 5 lesen möchten, verwenden Sie:

spark.readStream.format("delta")
  .option("startingVersion", "5")
  .load("/tmp/delta/user_events")

Wenn Sie Änderungen ab Version 2018-10-18 lesen möchten, verwenden Sie:

spark.readStream.format("delta")
  .option("startingTimestamp", "2018-10-18")
  .load("/tmp/delta/user_events")

Verarbeiten der Anfangsmomentaufnahme, ohne dass Daten gelöscht werden

Hinweis

Dieses Feature ist in Databricks Runtime 11.1 und höher verfügbar. Dieses Feature befindet sich in der Public Preview.

Wenn eine Deltatabelle als Streamquelle verwendet wird, verarbeitet die Abfrage zunächst alle in der Tabelle vorhandenen Daten. Die Delta-Tabelle in dieser Version wird als Anfangsmomentaufnahme bezeichnet. Standardmäßig werden die Datendateien der Delta-Tabelle danach verarbeitet, welche Datei zuletzt geändert wurde. Der Zeitpunkt der letzten Änderung entspricht jedoch nicht unbedingt der zeitlichen Reihenfolge der Aufzeichnungsereignisse.

In einer zustandsbehafteten Streamingabfrage mit einem definierten Grenzwert kann die Verarbeitung von Dateien nach Änderungszeit dazu führen, dass Datensätze in der falschen Reihenfolge verarbeitet werden. Dies könnte dazu führen, dass Aufzeichnungen als verspätete Ereignisse durch den Grenzwert gelöscht werden.

Sie können das Datenverlustproblem vermeiden, indem Sie die folgende Option aktivieren:

  • withEventTimeOrder: Legt fest, ob die Anfangsmomentaufnahme mit Ereigniszeitreihenfolge verarbeitet werden soll.

Wenn die Ereigniszeitreihenfolge aktiviert ist, wird der Ereigniszeitbereich der Anfangsmomentaufnahme in Zeitbuckets unterteilt. Jeder Mikrobatch verarbeitet einen Bereich, indem er die Daten innerhalb des Zeitbereichs filtert. Die Konfigurationsoptionen „maxFilesPerTrigger“ und „maxBytesPerTrigger“ können weiterhin zur Steuerung der Mikrobatchgröße verwendet werden, jedoch aufgrund der Art der Verarbeitung nur annähernd.

Die folgende Grafik zeigt diesen Prozess:

Initial Snapshot

Wichtige Informationen zu diesem Feature:

  • Das Datenverlustproblem tritt nur auf, wenn die Delta-Anfangsmomentaufnahme einer zustandsbehafteten Streamingabfrage in der Standardreihenfolge verarbeitet wird.
  • Sie können withEventTimeOrder nicht mehr ändern, sobald die Streamabfrage gestartet wurde, während die Anfangsmomentaufnahme noch verarbeitet wird. Um mit geänderter withEventTimeOrder-Einstellung neu zu starten, müssen Sie den Prüfpunkt löschen.
  • Wenn Sie eine Streamabfrage mit aktivierter withEventTimeOrder-Option ausführen, können Sie erst auf eine DBR-Version herabstufen, die diese Funktion nicht unterstützt, wenn die Verarbeitung der Anfangsmomentaufnahme abgeschlossen ist. Wenn Sie ein Downgrade durchführen müssen, können Sie warten, bis die Anfangsmomentaufnahme abgeschlossen ist, oder Sie löschen den Prüfpunkt und starten die Abfrage neu.
  • Dieses Feature wird in den folgenden ungewöhnlichen Szenarien nicht unterstützt:
    • Die Ereigniszeitspalte ist eine generierte Spalte und es gibt nicht projektive Transformationen zwischen der Delta-Quelle und dem Grenzwert.
    • Es gibt einen Grenzwert, der mehr als eine Delta-Quelle in der Streamabfrage hat.
  • Bei aktivierter Ereigniszeitreihenfolge kann die Leistung der Delta-Anfangsmomentaufnahme langsamer sein.
  • Jeder Mikrobatch überprüft die Anfangsmomentaufnahme, um Daten innerhalb des entsprechenden Ereigniszeitbereichs zu filtern. Für eine schnellere Filterung ist es ratsam, eine Delta-Quellenspalte als Ereigniszeit zu verwenden, so dass Daten übersprungen werden können (Informationen zur Anwendbarkeit finden Sie unter Überspringen von Daten für Delta Lake). Darüber hinaus kann die Tabellenpartitionierung entlang der Ereigniszeitspalte die Verarbeitung weiter beschleunigen. Sie können auf der Spark-Benutzeroberfläche nachsehen, wie viele Delta-Dateien für einen bestimmten Mikrobatch gescannt werden.

Beispiel

Angenommen, Sie haben Tabelle user_events mit der Spalte event_time. Ihre Streamingabfrage ist eine Aggregationsabfrage. Wenn Sie sicherstellen möchten, dass während der Verarbeitung der Anfangsmomentaufnahme keine Daten verloren gehen, können Sie Folgendes verwenden:

spark.readStream.format("delta")
  .option("withEventTimeOrder", "true")
  .load("/tmp/delta/user_events")
  .withWatermark("event_time", "10 seconds")

Hinweis

Sie können dies auch mit der Spark-Konfiguration auf dem Cluster aktivieren, was dann für alle Streamingabfragen gilt: spark.databricks.delta.withEventTimeOrder.enabled true

Delta-Tabelle als eine Senke

Daten können auch mithilfe von strukturiertem Streaming in eine Delta-Tabelle geschrieben werden. Durch das Delta Lake-Transaktionsprotokoll wird eine Exactly Once-Verarbeitung garantiert, auch wenn parallel andere Datenströme oder Batchabfragen für die Tabelle ausgeführt werden.

Hinweis

Die Delta Lake-Funktion VACUUM entfernt alle Dateien, die nicht von Delta Lake verwaltet werden. Es werden aber alle Verzeichnisse übersprungen, die mit _ beginnen. Sie können Prüfpunkte mit anderen Daten und Metadaten für eine Delta-Tabelle mit einer Verzeichnisstruktur wie z. B. <table-name>/_checkpoints sicher speichern.

metrics

Sie können die Anzahl von Bytes und die Anzahl der Dateien, die noch verarbeitet werden müssen, in einem Streamingabfrageprozess als Metriken numBytesOutstanding und numFilesOutstanding herausfinden. Weitere Metriken umfassen:

  • numNewListedFiles: Anzahl der Delta Lake-Dateien, die zum Berechnen des Backlogs für diesen Batch aufgelistet wurden.
    • backlogEndOffset: Die Tabellenversion, die zum Berechnen des Backlogs verwendet wird.

Wenn Sie den Stream in einem Notebook ausführen, können Sie diese Metriken auf der Registerkarte Rohdaten im Dashboard für den Fortschritt der Streamingabfrage sehen:

{
  "sources" : [
    {
      "description" : "DeltaSource[file:/path/to/source]",
      "metrics" : {
        "numBytesOutstanding" : "3456",
        "numFilesOutstanding" : "8"
      },
    }
  ]
}

Anfügemodus

Datenströme werden standardmäßig im Anfügemodus ausgeführt, wodurch der Tabelle neue Datensätze hinzugefügt werden.

Sie können die Path-Methode verwenden:

Python

(events.writeStream
   .format("delta")
   .outputMode("append")
   .option("checkpointLocation", "/tmp/delta/_checkpoints/")
   .start("/delta/events")
)

Scala

events.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
  .start("/tmp/delta/events")

oder der toTable-Methode wie folgt:

Python

(events.writeStream
   .format("delta")
   .outputMode("append")
   .option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
   .toTable("events")
)

Scala

events.writeStream
  .outputMode("append")
  .option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
  .toTable("events")

Vollständiger Modus

Sie können strukturiertes Streaming auch verwenden, um die gesamte Tabelle durch jeden Batch zu ersetzen. Ein Beispiel hierfür ist das Berechnen einer Zusammenfassung mit Aggregation:

Python

(spark.readStream
  .format("delta")
  .load("/tmp/delta/events")
  .groupBy("customerId")
  .count()
  .writeStream
  .format("delta")
  .outputMode("complete")
  .option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
  .start("/tmp/delta/eventsByCustomer")
)

Scala

spark.readStream
  .format("delta")
  .load("/tmp/delta/events")
  .groupBy("customerId")
  .count()
  .writeStream
  .format("delta")
  .outputMode("complete")
  .option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
  .start("/tmp/delta/eventsByCustomer")

Im vorherigen Beispiel wird kontinuierlich eine Tabelle aktualisiert, die die aggregierte Anzahl von Ereignissen nach Kunden enthält.

Für Anwendungen mit geringeren Latenzanforderungen können Sie Computingressourcen mit einmaligen Auslösern sparen. Verwenden Sie diese, um zusammenfassende Aggregationstabellen nach einem bestimmten Zeitplan zu aktualisieren und nur neue Daten zu verarbeiten, die seit dem letzten Update eingetroffen sind.

Ausführen von Stream-statischen Verknüpfungen

Sie können sich auf die Transaktionsgarantien und das Versionsverwaltungsprotokoll von Delta Lake verlassen, um Stream-statisch-Verknüpfungen durchzuführen. Bei einer Stream-statisch-Verknüpfung wird die neueste gültige Version einer Delta-Tabelle (die statischen Daten) mithilfe einer zustandslosen Verknüpfung mit einem Datenstrom verknüpft.

Wenn Azure Databricks einen Mikrobatch von Daten in einer Stream-statisch-Verknüpfung verarbeitet, wird die neueste gültige Version von Daten aus der statischen Delta-Tabelle mit den Datensätzen verknüpft, die im aktuellen Mikrobatch enthalten sind. Da die Verknüpfung zustandslos ist, müssen Sie keine Wasserzeichen konfigurieren und können die Ergebnisse mit geringer Latenz verarbeiten. Die Daten in der statischen Delta-Tabelle, die in der Verknüpfung verwendet werden, sollten nur selten geändert werden.

streamingDF = spark.readStream.table("orders")
staticDF = spark.read.table("customers")

query = (streamingDF
  .join(staticDF, streamingDF.customer_id==staticDF.id, "inner")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .table("orders_with_customer_info")
)

Ausführen eines Upsert aus Streamingabfragen mithilfe von foreachBatch

Sie können mithilfe einer Kombination aus merge und foreachBatch komplexe Upserts aus einer Streamingabfrage in eine Delta-Tabelle schreiben. Siehe Verwenden von foreachBatch zum Schreiben in beliebige Datensenken.

Dieses Muster hat viele Anwendungsmöglichkeiten, u. a.:

  • Schreiben von Streamingaggregaten im Updatemodus: Dies ist wesentlich effizienter als „Vollständiger Modus“.
  • Schreiben eines Datenstroms von Datenbankänderungen in eine Delta-Tabelle: Die Mergeabfrage zum Schreiben von Änderungsdaten kann in foreachBatch verwendet werden, um einen Datenstrom von Änderungen kontinuierlich auf eine Delta-Tabelle anzuwenden.
  • Schreiben eines Datenstroms in eine Delta-Tabelle mit Deduplizierung: Die „insert-only“-Mergeabfrage für die Deduplizierung kann in foreachBatch verwendet werden, um Daten (mit Duplikaten) kontinuierlich in eine Delta-Tabelle mit automatischer Deduplizierung zu schreiben.

Hinweis

  • Stellen Sie sicher, dass Ihre merge-Anweisung in foreachBatch idempotent ist, da Neustarts der Streamingabfrage den Vorgang mehrmals auf denselben Datenbatch anwenden können.
  • Wenn merge in foreachBatch verwendet wird, wird die Eingabedatenrate der Streamingabfrage (über StreamingQueryProgress gemeldet und im Notebook-Ratendiagramm sichtbar) möglicherweise als ein Vielfaches der tatsächlichen Rate gemeldet, mit der Daten an der Quelle generiert werden. Dies liegt daran, weil merge die Eingabedaten mehrmals liest, wodurch die Eingabemetriken vervielfacht werden. Wenn dies ein Engpass ist, können Sie den Datenrahmen-Batch vor merge zwischenspeichern und nach merge aus dem Zwischenspeicher wieder entfernen.

Im folgenden Beispiel wird veranschaulicht, wie Sie SQL in foreachBatch verwenden können, um diese Aufgabe auszuführen:

Scala

// Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
  // Set the dataframe to view name
  microBatchOutputDF.createOrReplaceTempView("updates")

  // Use the view name to apply MERGE
  // NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe
  microBatchOutputDF.sparkSession.sql(s"""
    MERGE INTO aggregates t
    USING updates s
    ON s.key = t.key
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
  """)
}

// Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream
  .format("delta")
  .foreachBatch(upsertToDelta _)
  .outputMode("update")
  .start()

Python

# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):
  # Set the dataframe to view name
  microBatchOutputDF.createOrReplaceTempView("updates")

  # Use the view name to apply MERGE
  # NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe

  # In Databricks Runtime 10.5 and below, you must use the following:
  # microBatchOutputDF._jdf.sparkSession().sql("""
  microBatchOutputDF.sparkSession.sql("""
    MERGE INTO aggregates t
    USING updates s
    ON s.key = t.key
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
  """)

# Write the output of a streaming aggregation query into Delta table
(streamingAggregatesDF.writeStream
  .format("delta")
  .foreachBatch(upsertToDelta)
  .outputMode("update")
  .start()
)

Sie können auch die Delta Lake-APIs verwenden, um Streamingupserts durchzuführen, wie im folgenden Beispiel gezeigt:

Scala

import io.delta.tables.*

val deltaTable = DeltaTable.forPath(spark, "/data/aggregates")

// Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
  deltaTable.as("t")
    .merge(
      microBatchOutputDF.as("s"),
      "s.key = t.key")
    .whenMatched().updateAll()
    .whenNotMatched().insertAll()
    .execute()
}

// Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream
  .format("delta")
  .foreachBatch(upsertToDelta _)
  .outputMode("update")
  .start()

Python

from delta.tables import *

deltaTable = DeltaTable.forPath(spark, "/data/aggregates")

# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):
  (deltaTable.alias("t").merge(
      microBatchOutputDF.alias("s"),
      "s.key = t.key")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
  )

# Write the output of a streaming aggregation query into Delta table
(streamingAggregatesDF.writeStream
  .format("delta")
  .foreachBatch(upsertToDelta)
  .outputMode("update")
  .start()
)

Idempotente Tabellenschreibvorgänge in foreachBatch

Hinweis

Databricks empfiehlt, für jede Senke, die Sie aktualisieren möchten, einen separaten Streamingschreibvorgang zu konfigurieren. Wenn Sie foreachBatch zum Schreiben in mehrere Tabellen verwenden, werden Schreibvorgänge serialisiert, was die Parallelität reduziert und die Gesamtwartezeit erhöht.

Delta-Tabellen unterstützen die folgenden DataFrameWriter Optionen, um Schreibvorgänge in mehrere Tabellen in foreachBatch idempotent zu erstellen:

  • txnAppId: Eine eindeutige Zeichenfolge, die Sie bei DataFrame-Schreibvorgang übergeben können. Beispielsweise können Sie die StreamingQuery ID als txnAppId verwenden.
  • txnVersion: Eine monoton steigende Zahl, die als Transaktionsversion fungiert.

Delta Lake verwendet die Kombination aus txnAppId und txnVersion, um doppelte Schreibvorgänge zu identifizieren und zu ignorieren.

Wenn ein Batchschreibvorgang mit einem Fehler unterbrochen wird, wird beim erneuten Ausführen des Batches die gleiche Anwendungs- und Batch ID verwendet, wodurch die Laufzeit doppelte Schreibvorgänge ordnungsgemäß identifizieren und ignorieren kann. Application ID (txnAppId) kann eine beliebige vom Benutzer generierte einzigartige Zeichenfolge sein und muss nicht mit der Stream ID verknüpft sein. Siehe Verwenden von foreachBatch zum Schreiben in beliebige Datensenken.

Warnung

Wenn Sie den Streamingprüfpunkt löschen und die Abfrage mit einem neuen Prüfpunkt neu starten, müssen Sie einen anderen txnAppId-Wert angeben. Neue Prüfpunkte beginnen mit der Batch-ID 0. Delta Lake verwendet die Batch-ID und txnAppId als eindeutigen Schlüssel und überspringt Batches mit bereits verwendeten Werten.

Im folgenden Codebeispiel wird dieses Muster veranschaulicht:

Python

app_id = ... # A unique string that is used as an application ID.

def writeToDeltaLakeTableIdempotent(batch_df, batch_id):
  batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 1
  batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 2

streamingDF.writeStream.foreachBatch(writeToDeltaLakeTableIdempotent).start()

Scala

val appId = ... // A unique string that is used as an application ID.
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...)  // location 1
  batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...)  // location 2
}