Überwachung strukturierter Streaming-Abfragen auf Azure Databricks

Azure Databricks bietet über die Spark-Benutzeroberfläche auf der Registerkarte Streaming eine integrierte Überwachung für strukturierte Streaming-Anwendungen.

Unterscheiden Sie strukturierte Streaming-Abfragen in der Spark-Benutzeroberfläche

Geben Sie Ihren Streams einen eindeutigen Abfragenamen, indem Sie .queryName(<query-name>) zu Ihrem writeStream-Code hinzufügen, damit Sie in der Spark-Benutzeroberfläche leicht unterscheiden können, welche Metriken zu welchem Stream gehören.

Pushen Sie strukturierte Streaming-Metriken an externe Dienste

Streamingmetriken können unter Verwendung der Streamingabfragelistener-Schnittstelle von Apache Spark an externe Dienste für Warnungs- oder Dashboardanwendungsfälle übertragen werden. Ab Databricks Runtime 11.3 LTS ist der Streamingabfragelistener in Python und Scala verfügbar.

Wichtig

Anmeldeinformationen und Objekte, die von Unity Catalog verwaltet werden, können nicht in der StreamingQueryListener-Logik verwendet werden.

Hinweis

Die Verarbeitungslatenz im Zusammenhang mit Listenern kann die Abfrageverarbeitung beeinträchtigen. Databricks empfiehlt, die Verarbeitungslogik in diesen Listenern zu minimieren und in Senken mit geringer Latenz (beispielsweise Kafka) zu schreiben.

Der folgende Code enthält einfache Syntaxbeispiele für die Implementierung eines Listeners:

Scala

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

val myListener = new StreamingQueryListener {

  /**
    * Called when a query is started.
    * @note This is called synchronously with
    *       [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]].
    *       `onQueryStart` calls on all listeners before
    *       `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
    *        Do not block this method, as it blocks your query.
    */
  def onQueryStarted(event: QueryStartedEvent): Unit = {}

  /**
    * Called when there is some status update (ingestion rate updated, etc.)
    *
    * @note This method is asynchronous. The status in [[StreamingQuery]] returns the
    *       latest status, regardless of when this method is called. The status of [[StreamingQuery]]
    *       may change before or when you process the event. For example, you may find [[StreamingQuery]]
    *       terminates when processing `QueryProgressEvent`.
    */
  def onQueryProgress(event: QueryProgressEvent): Unit = {}

  /**
    * Called when a query is stopped, with or without error.
    */
  def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}

Python

class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        """
        Called when a query is started.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This is called synchronously with
        meth:`pyspark.sql.streaming.DataStreamWriter.start`,
        that is, ``onQueryStart`` will be called on all listeners before
        ``DataStreamWriter.start()`` returns the corresponding
        :class:`pyspark.sql.streaming.StreamingQuery`.
        Do not block in this method as it will block your query.
        """
        pass

    def onQueryProgress(self, event):
        """
        Called when there is some status update (ingestion rate updated, etc.)

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This method is asynchronous. The status in
        :class:`pyspark.sql.streaming.StreamingQuery` returns the
        most recent status, regardless of when this method is called. The status
        of :class:`pyspark.sql.streaming.StreamingQuery`.
        may change before or when you process the event.
        For example, you may find :class:`StreamingQuery`
        terminates when processing `QueryProgressEvent`.
        """
        pass

    def onQueryTerminated(self, event):
        """
        Called when a query is stopped, with or without error.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
            The properties are available as the same as Scala API.
        """
        pass

my_listener = MyListener()

Definieren von beobachtbaren Metriken in strukturiertem Streaming

Beobachtbare Metriken werden als beliebige Aggregatfunktionen bezeichnet, die in einer Abfrage (DataFrame) definiert werden können. Sobald die Ausführung eines DataFrames einen Abschlusspunkt erreicht (d. h. eine Batchabfrage beendet oder eine Streamingepoche erreicht), wird ein benanntes Ereignis ausgegeben, das die Metriken für die seit dem letzten Abschlusspunkt verarbeiteten Daten enthält.

Sie können diese Metriken beobachten, indem Sie einen Listener an die Spark-Sitzung anfügen. Der Listener hängt vom Ausführungsmodus ab:

  • Batchmodus: Verwenden Sie QueryExecutionListener.

    QueryExecutionListener wird aufgerufen, wenn die Abfrage abgeschlossen ist. Greifen Sie mithilfe der QueryExecution.observedMetrics-Karte auf die Metriken zu.

  • Streaming oder Microbatch: Verwenden Sie StreamingQueryListener.

    StreamingQueryListener wird aufgerufen, wenn die Streamingabfrage eine Epoche abgeschlossen hat. Greifen Sie mithilfe der StreamingQueryProgress.observedMetrics-Karte auf die Metriken zu. Azure Databricks unterstützt kein fortlaufendes Ausführungsstreaming.

Beispiel:

Scala

// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()

// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    event.progress.observedMetrics.get("my_event").foreach { row =>
      // Trigger if the number of errors exceeds 5 percent
      val num_rows = row.getAs[Long]("rc")
      val num_error_rows = row.getAs[Long]("erc")
      val ratio = num_error_rows.toDouble / num_rows
      if (ratio > 0.05) {
        // Trigger alert
      }
    }
  }
})

Python

# Observe metric
observed_df = df.observe("metric", count(lit(1)).as("cnt"), count(col("error")).as("malformed"))
observed_df.writeStream.format("...").start()

# Define my listener.
class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        print(f"'{event.name}' [{event.id}] got started!")
    def onQueryProgress(self, event):
        row = event.progress.observedMetrics.get("metric")
        if row is not None:
            if row.malformed / row.cnt > 0.5:
                print("ALERT! Ouch! there are too many malformed "
                      f"records {row.malformed} out of {row.cnt}!")
            else:
                print(f"{row.cnt} rows processed!")
    def onQueryTerminated(self, event):
        print(f"{event.id} got terminated!")

# Add my listener.
spark.streams.addListener(MyListener())

StreamingQueryListener-Objektmetriken

Metrik Beschreibung
id Eindeutige Abfrage-ID, die über Neustarts hinweg bestehen bleibt. Siehe StreamingQuery.id().
runId Eindeutige Abfrage-ID für jeden Start oder Neustart. Siehe StreamingQuery.runId().
name Benutzerdefinierter Name der Abfrage. Null, wenn nicht angegeben.
timestamp Zeitstempel für die Ausführung des Micro-Batch.
batchId Eindeutige ID für den aktuellen Datenbatch der verarbeiteten Daten. Beachten Sie, dass im Falle von Wiederholungsversuchen nach einem Fehler eine bestimmte Batch-ID mehr als einmal ausgeführt werden kann. Wenn keine Daten zu verarbeiten sind, wird die Batch-ID nicht erhöht.
numInputRows Gesamtzahl der in einem Trigger verarbeiteten Datensätze (für alle Quellen hinweg).
inputRowsPerSecond Aggregierte Rate der eingehenden Daten (über alle Quellen hinweg).
processedRowsPerSecond Aggregierte Geschwindigkeit, mit der Spark Daten verarbeitet (über alle Quellen hinweg).

durationMs-Objekt

Informationen über die Zeit, die benötigt wird, um die verschiedenen Phasen der Ausführung des Micro-Batch-Prozesses abzuschließen.

Metrik Beschreibung
durationMs.addBatch Zeit, die für die Ausführung des Mikro-Batch benötigt wird. Dies schließt die Zeit aus, die Spark für die Planung des Mikro-Batch benötigt.
durationMs.getBatch Zeit, die benötigt wird, um die Metadaten zu den Offsets von der Quelle abzurufen.
durationMs.latestOffset Zuletzt verbrauchter Offset für das Mikro-Batch. Dieses Fortschrittsobjekt bezieht sich auf die Zeit, die benötigt wird, um den letzten Offset aus den Quellen abzurufen.
durationMs.queryPlanning Zeit, die für die Erstellung des Ausführungsplans benötigt wird.
durationMs.triggerExecution Zeit, die für die Planung und Ausführung des Mikro-Batch benötigt wird.
durationMs.walCommit Zeit, die für die Übertragung der neuen verfügbaren Offsets benötigt wird.

eventTime-Objekt

Informationen über den Zeitwert des Ereignisses in den Daten, die im Micro-Batch verarbeitet werden. Diese Daten werden vom Wasserzeichen verwendet, um herauszufinden, wie der Zustand für die Verarbeitung von zustandsbehafteten Aggregationen, die im Auftrag für strukturiertes Streaming definiert sind, gekürzt werden kann.

Metrik Beschreibung
eventTime.avg Durchschnittliche Zeit des Ereignisses im Auslöser.
eventTime.max Maximale Zeit des Ereignisses im Auslöser.
eventTime.min Minimale Zeit des Ereignisses im Auslöser.
eventTime.watermark Wert des Wasserzeichens, das im Auslöser verwendet wird.

stateOperators-Objekt

Informationen über die zustandsbehafteten Vorgänge, die im Auftrag für strukturiertes Streaming definiert sind, und die daraus resultierenden Aggregationen.

Metrik Beschreibung
stateOperators.operatorName Name des zustandsbehafteten Operators, auf den sich die Metriken beziehen. Beispiel: symmetricHashJoin, dedupe, stateStoreSave.
stateOperators.numRowsTotal Anzahl der Zeilen im Zustand als Ergebnis des zustandsbehafteten Operators oder der Aggregation.
stateOperators.numRowsUpdated Anzahl der Zeilen, die als Ergebnis des zustandsbehafteten Operators oder der Aggregation im Zustand aktualisiert wurden.
stateOperators.numRowsRemoved Anzahl der Zeilen, die als Ergebnis des zustandsbehafteten Operators oder der Aggregation aus dem Zustand entfernt wurden.
stateOperators.commitTimeMs Zeit, die benötigt wird, um alle Aktualisierungen (hinzugefügt und entfernt) zu übertragen und eine neue Version zu erstellen.
stateOperators.memoryUsedBytes Vom Zustandsspeicher verwendeter Speicher.
stateOperators.numRowsDroppedByWatermark Anzahl der Zeilen, die als zu spät angesehen werden, um in die zustandsbehaftete Aggregation einbezogen zu werden. Gilt nur für Streamingaggregationen: Anzahl der Zeilen, die nach der Aggregation verworfen wurden, und keine unformatierten Eingabezeilen. Die Zahl ist nicht genau, aber sie kann darauf hinweisen, dass verspätete Daten verworfen werden.
stateOperators.numShufflePartitions Anzahl der Shuffle-Partitionen für diesen zustandsbehafteten Operator.
stateOperators.numStateStoreInstances Tatsächliche Zustandsspeicherinstanz, die der Operator initialisiert und verwaltet hat. In vielen zustandsbehafteten Operatoren entspricht dies der Anzahl der Partitionen, aber die Stream-Stream-Verknüpfung initialisiert vier Zustandsspeicherinstanzen pro Partition.

stateOperators.customMetrics-Objekt

Von RocksDB gesammelte Informationen, die Metriken über die Leistung und die Vorgänge in Bezug auf die zustandsbehafteten Werte erfassen, die für den strukturierten Streaming-Auftrag verwaltet werden. Weitere Informationen finden Sie unter Konfigurieren Sie den RocksDB-Statusspeicher auf Azure Databricks.

Metrik Beschreibung
customMetrics.rocksdbBytesCopied Anzahl der kopierten Bytes, die vom RocksDB File Manager nachverfolgt werden.
customMetrics.rocksdbCommitCheckpointLatency Zeit in Millisekunden, um eine Momentaufnahme der nativen RocksDB zu machen und sie in ein lokales Verzeichnis zu schreiben.
customMetrics.rocksdbCompactLatency Zeit in Millisekunden für die Komprimierung (optional) während des Prüfpunktcommits.
customMetrics.rocksdbCommitFileSyncLatencyMs Zeit in Millisekunden für die Synchronisierung der Dateien für die Momentaufnahme der nativen RocksDB-Instanz mit einem externen Speicher (Prüfpunktspeicherort).
customMetrics.rocksdbCommitFlushLatency Zeit in Millisekunden für das Leeren der In-Memory-Änderungen in RocksDB auf Ihren lokalen Datenträger.
customMetrics.rocksdbCommitPauseLatency Zeit in Millisekunden zum Anhalten der Hintergrund-Arbeitsthreads (z. B. für die Komprimierung) als Teil des Prüfpunktcommits.
customMetrics.rocksdbCommitWriteBatchLatency Zeit in Millisekunden, um die gestaffelten Schreibvorgänge in der In-Memory-Struktur (WriteBatch) auf die native RocksDB-Instanz anzuwenden.
customMetrics.rocksdbFilesCopied Anzahl der kopierten Dateien, die vom RocksDB File Manager nachverfolgt werden.
customMetrics.rocksdbFilesReused Anzahl der wiederverwendeten Dateien, die vom RocksDB File Manager nachverfolgt werden.
customMetrics.rocksdbGetCount Anzahl der get-Aufrufe an die DB (dies beinhaltet nicht gets von WriteBatch: In-Memory-Batch, der für Staging-Schreibvorgänge verwendet wird).
customMetrics.rocksdbGetLatency Durchschnittliche Zeit in Nanosekunden pro zugrunde liegendem nativen RocksDB::Get-Aufruf.
customMetrics.rocksdbReadBlockCacheHitCount Wie viel vom Block-Cache in RocksDB nützlich ist oder nicht und wie man lokale Datentägerlesevorgänge vermeiden kann.
customMetrics.rocksdbReadBlockCacheMissCount Wie viel vom Block-Cache in RocksDB nützlich ist oder nicht und wie man lokale Datentägerlesevorgänge vermeiden kann.
customMetrics.rocksdbSstFileSize Größe aller SST-Dateien. SST steht für Static Sorted Table (statisch sortierte Tabelle). Dies ist die tabellarische Struktur, die RocksDB zum Speichern von Daten verwendet.
customMetrics.rocksdbTotalBytesRead Anzahl unkomprimierter Bytes, die durch get-Vorgänge gelesen wurden.
customMetrics.rocksdbTotalBytesReadByCompaction Anzahl der Bytes, die der Komprimierungsprozess vom Datenträger liest.
customMetrics.rocksdbTotalBytesReadThroughIterator Einige der zustandsbehafteten Vorgänge (z. B. Timeout-Verarbeitung in FlatMapGroupsWithState und Wasserzeichen) erfordern das Lesen von Daten in der DB über einen Iterator. Diese Metrik stellt die Größe der unkomprimierten Daten dar, die mit dem Iterator gelesen werden.
customMetrics.rocksdbTotalBytesWritten Anzahl der unkomprimierten Bytes, die durch put-Vorgänge geschrieben wurden.
customMetrics.rocksdbTotalBytesWrittenByCompaction Anzahl der Bytes, die der Komprimierungsprozess auf den Datenträger schreibt.
customMetrics.rocksdbTotalCompactionLatencyMs Zeit in Millisekunden für die Komprimierung von RocksDB, einschließlich der Komprimierung im Hintergrund und der optionalen Komprimierung, die während des Commits eingeleitet wird.
customMetrics.rocksdbTotalFlushLatencyMs Leerungsdauer, einschließlich Hintergrundleerung. Leerungsvorgänge sind Vorgänge, mit denen die MemTable in den Speicher geleert wird, sobald sie voll ist. MemTables sind die erste Ebene, auf der Daten in RocksDB gespeichert werden.
customMetrics.rocksdbZipFileBytesUncompressed RocksDB File Manager verwaltet die Nutzung und Löschung des physischen SST-Dateispeichers. Diese Metrik stellt die unkomprimierten Zip-Dateien in Bytes dar, wie vom Dateimanager gemeldet.

sources-Objekt (Kafka)

Metrik Beschreibung
sources.description Name der Quelle, aus der die Streaming-Abfrage gelesen wird. Beispiel: “KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]”.
sources.startOffset-Objekt Start-Offset-Nummer innerhalb des Kafka-Themas, mit dem der Streaming-Auftrag begonnen hat.
sources.endOffset-Objekt Letzter vom Mikro-Batch verarbeiteter Offset. Bei einer laufenden Mikro-Batch-Ausführung könnte dies gleich latestOffset sein.
sources.latestOffset-Objekt Der letzte vom Mikrobatch ermittelte Wert. Wenn es eine Drosselung gibt, kann es sein, dass der Micro-Batching-Prozess nicht alle Offsets verarbeitet, so dass endOffset und latestOffset voneinander abweichen.
sources.numInputRows Anzahl der verarbeiteten Eingabezeilen aus dieser Quelle.
sources.inputRowsPerSecond Rate, mit der die Daten für diese Quelle zur Verarbeitung ankommen.
sources.processedRowsPerSecond Rate, mit der Spark die Daten für diese Quelle verarbeitet.

sources.metrics-Objekt (Kafka)

Metrik Beschreibung
sources.metrics.avgOffsetsBehindLatest Durchschnittliche Anzahl der Offsets, um die die Streaming-Anfrage unter allen abonnierten Themen hinter dem letzten verfügbaren Offset zurückliegt.
sources.metrics.estimatedTotalBytesBehindLatest Geschätzte Anzahl der Bytes, die der Abfrageprozess nicht von den abonnierten Themen verbraucht hat.
sources.metrics.maxOffsetsBehindLatest Maximale Anzahl der Offsets, um die die Streaming-Anfrage hinter dem letzten verfügbaren Offset aller abonnierten Themen zurückliegt.
sources.metrics.minOffsetsBehindLatest Mindestanzahl der Offsets, um die die Streaming-Anfrage hinter dem letzten verfügbaren Offset aller abonnierten Themen zurückliegt.

sink-Objekt (Kafka)

Metrik Beschreibung
sink.description Name der Senke, in die die Streaming-Abfrage schreibt. Beispiel: “org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100”.
sink.numOutputRows Anzahl der Zeilen, die als Teil des Mikro-Batch in die Ausgabetabelle oder Senke geschrieben wurden. In einigen Situationen kann dieser Wert „-1“ sein und kann im Allgemeinen als „unbekannt“ interpretiert werden.

sources-Objekt (Delta Lake)

Metrik Beschreibung
sources.description Name der Quelle, aus der die Streaming-Abfrage gelesen wird. Beispiel: “DeltaSource[table]”.
sources.[startOffset/endOffset].sourceVersion Version der Serialisierung, mit der dieser Offset kodiert ist.
sources.[startOffset/endOffset].reservoirId ID der Tabelle, aus der Sie lesen. Dies wird verwendet, um Fehlkonfigurationen beim Neustart einer Abfrage zu erkennen.
sources.[startOffset/endOffset].reservoirVersion Version der Tabelle, die Sie gerade bearbeiten.
sources.[startOffset/endOffset].index Index in der Sequenz von AddFiles in dieser Version. Dies wird verwendet, um große Commits in mehrere Batches aufzuteilen. Dieser Index wird durch Sortieren nach modificationTimestamp und path erstellt.
sources.[startOffset/endOffset].isStartingVersion Gibt an, ob dieser Offset eine Abfrage bezeichnet, die gerade beginnt, anstatt Änderungen zu verarbeiten. Wenn Sie eine neue Abfrage starten, werden zunächst alle Daten verarbeitet, die sich zu Beginn in der Tabelle befinden, und dann die neu hinzugekommenen Daten.
sources.latestOffset Letzter vom Mikro-Batch verarbeiteter Offset.
sources.numInputRows Anzahl der verarbeiteten Eingabezeilen aus dieser Quelle.
sources.inputRowsPerSecond Rate, mit der die Daten für diese Quelle zur Verarbeitung ankommen.
sources.processedRowsPerSecond Rate, mit der Spark die Daten für diese Quelle verarbeitet.
sources.metrics.numBytesOutstanding Gesamtgröße der ausstehenden Dateien (von RocksDB nachverfolgte Dateien). Dies ist die Backlog-Metrik für Delta und Autoloader als Streaming-Quelle.
sources.metrics.numFilesOutstanding Anzahl der ausstehenden Dateien, die verarbeitet werden müssen. Dies ist die Backlog-Metrik für Delta und Autoloader als Streaming-Quelle.

sink-Objekt (Delta Lake)

Metrik Beschreibung
sink.description Name der Senke, in die die Streaming-Abfrage schreibt. Beispiel: “DeltaSink[table]”.
sink.numOutputRows Die Anzahl der Zeilen in dieser Metrik ist „-1“, da Spark keine Ausgabezeilen für DSv1-Senken ableiten kann, was die Klassifizierung für die Delta Lake-Senke ist.

Beispiele

Beispielereignis Kafka-zu-Kafka StreamingQueryListener

{
  "id" : "3574feba-646d-4735-83c4-66f657e52517",
  "runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
  "name" : "STREAMING_QUERY_NAME_UNIQUE",
  "timestamp" : "2022-10-31T20:09:30.455Z",
  "batchId" : 1377,
  "numInputRows" : 687,
  "inputRowsPerSecond" : 32.13433743393049,
  "processedRowsPerSecond" : 34.067241892293964,
  "durationMs" : {
    "addBatch" : 18352,
    "getBatch" : 0,
    "latestOffset" : 31,
    "queryPlanning" : 977,
    "triggerExecution" : 20165,
    "walCommit" : 342
  },
  "eventTime" : {
    "avg" : "2022-10-31T20:09:18.070Z",
    "max" : "2022-10-31T20:09:30.125Z",
    "min" : "2022-10-31T20:09:09.793Z",
    "watermark" : "2022-10-31T20:08:46.355Z"
  },
  "stateOperators" : [ {
    "operatorName" : "stateStoreSave",
    "numRowsTotal" : 208,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 434,
    "numRowsRemoved" : 76,
    "allRemovalsTimeMs" : 515,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 167069743,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 222,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 165,
      "rocksdbReadBlockCacheMissCount" : 41,
      "rocksdbSstFileSize" : 232729,
      "rocksdbTotalBytesRead" : 12844,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 161238,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "dedupe",
    "numRowsTotal" : 2454744,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 4155,
    "numRowsRemoved" : 0,
    "allRemovalsTimeMs" : 0,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 137765341,
    "numRowsDroppedByWatermark" : 34,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "numDroppedDuplicateRows" : 193,
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 146,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3,
      "rocksdbReadBlockCacheMissCount" : 3,
      "rocksdbSstFileSize" : 78959140,
      "rocksdbTotalBytesRead" : 0,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "symmetricHashJoin",
    "numRowsTotal" : 2583,
    "numRowsUpdated" : 682,
    "allUpdatesTimeMs" : 9645,
    "numRowsRemoved" : 508,
    "allRemovalsTimeMs" : 46,
    "commitTimeMs" : 21,
    "memoryUsedBytes" : 668544484,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 80,
    "customMetrics" : {
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 4218,
      "rocksdbGetLatency" : 3,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3425,
      "rocksdbReadBlockCacheMissCount" : 149,
      "rocksdbSstFileSize" : 742827,
      "rocksdbTotalBytesRead" : 866864,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  } ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
    "startOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706380
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "numInputRows" : 292,
    "inputRowsPerSecond" : 13.65826278123392,
    "processedRowsPerSecond" : 14.479817514628582,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "estimatedTotalBytesBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  }, {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
    "startOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100",
    "numOutputRows" : 76
  }
}

Beispielereignis Delta Lake-to-Delta Lake StreamingQueryListener

{
  "id" : "aeb6bc0f-3f7d-4928-a078-ba2b304e2eaf",
  "runId" : "35d751d9-2d7c-4338-b3de-6c6ae9ebcfc2",
  "name" : "silverTransformFromBronze",
  "timestamp" : "2022-11-01T18:21:29.500Z",
  "batchId" : 4,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "latestOffset" : 62,
    "triggerExecution" : 62
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "DeltaSource[dbfs:/FileStore/max.fisher@databricks.com/ctc/stateful-trade-analysis-demo/table]",
    "startOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "endOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "latestOffset" : null,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "numBytesOutstanding" : "0",
      "numFilesOutstanding" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_silver_delta_demo2]",
    "numOutputRows" : -1
  }
}

Beispielereignis Raten-Quelle zu Delta Lake StreamingQueryListener

{
  "id" : "912ebdc1-edf2-48ec-b9fb-1a9b67dd2d9e",
  "runId" : "85de73a5-92cc-4b7f-9350-f8635b0cf66e",
  "name" : "dataGen",
  "timestamp" : "2022-11-01T18:28:20.332Z",
  "batchId" : 279,
  "numInputRows" : 300,
  "inputRowsPerSecond" : 114.15525114155251,
  "processedRowsPerSecond" : 158.9825119236884,
  "durationMs" : {
    "addBatch" : 1771,
    "commitOffsets" : 54,
    "getBatch" : 0,
    "latestOffset" : 0,
    "queryPlanning" : 4,
    "triggerExecution" : 1887,
    "walCommit" : 58
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "RateStreamV2[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=default",
    "startOffset" : 560,
    "endOffset" : 563,
    "latestOffset" : 563,
    "numInputRows" : 300,
    "inputRowsPerSecond" : 114.15525114155251,
    "processedRowsPerSecond" : 158.9825119236884
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_bronze_delta_demo]",
    "numOutputRows" : -1
  }
}