Überwachung strukturierter Streaming-Abfragen auf Azure Databricks
Azure Databricks bietet über die Spark-Benutzeroberfläche auf der Registerkarte Streaming eine integrierte Überwachung für strukturierte Streaming-Anwendungen.
Unterscheiden Sie strukturierte Streaming-Abfragen in der Spark-Benutzeroberfläche
Geben Sie Ihren Streams einen eindeutigen Abfragenamen, indem Sie .queryName(<query-name>)
zu Ihrem writeStream
-Code hinzufügen, damit Sie in der Spark-Benutzeroberfläche leicht unterscheiden können, welche Metriken zu welchem Stream gehören.
Pushen Sie strukturierte Streaming-Metriken an externe Dienste
Streamingmetriken können unter Verwendung der Streamingabfragelistener-Schnittstelle von Apache Spark an externe Dienste für Warnungs- oder Dashboardanwendungsfälle übertragen werden. Ab Databricks Runtime 11.3 LTS ist der Streamingabfragelistener in Python und Scala verfügbar.
Wichtig
Anmeldeinformationen und Objekte, die von Unity Catalog verwaltet werden, können nicht in der StreamingQueryListener
-Logik verwendet werden.
Hinweis
Die Verarbeitungslatenz im Zusammenhang mit Listenern kann die Abfrageverarbeitung beeinträchtigen. Databricks empfiehlt, die Verarbeitungslogik in diesen Listenern zu minimieren und in Senken mit geringer Latenz (beispielsweise Kafka) zu schreiben.
Der folgende Code enthält einfache Syntaxbeispiele für die Implementierung eines Listeners:
Scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._
val myListener = new StreamingQueryListener {
/**
* Called when a query is started.
* @note This is called synchronously with
* [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]].
* `onQueryStart` calls on all listeners before
* `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
* Do not block this method, as it blocks your query.
*/
def onQueryStarted(event: QueryStartedEvent): Unit = {}
/**
* Called when there is some status update (ingestion rate updated, etc.)
*
* @note This method is asynchronous. The status in [[StreamingQuery]] returns the
* latest status, regardless of when this method is called. The status of [[StreamingQuery]]
* may change before or when you process the event. For example, you may find [[StreamingQuery]]
* terminates when processing `QueryProgressEvent`.
*/
def onQueryProgress(event: QueryProgressEvent): Unit = {}
/**
* Called when a query is stopped, with or without error.
*/
def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}
Python
class MyListener(StreamingQueryListener):
def onQueryStarted(self, event):
"""
Called when a query is started.
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
The properties are available as the same as Scala API.
Notes
-----
This is called synchronously with
meth:`pyspark.sql.streaming.DataStreamWriter.start`,
that is, ``onQueryStart`` will be called on all listeners before
``DataStreamWriter.start()`` returns the corresponding
:class:`pyspark.sql.streaming.StreamingQuery`.
Do not block in this method as it will block your query.
"""
pass
def onQueryProgress(self, event):
"""
Called when there is some status update (ingestion rate updated, etc.)
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
The properties are available as the same as Scala API.
Notes
-----
This method is asynchronous. The status in
:class:`pyspark.sql.streaming.StreamingQuery` returns the
most recent status, regardless of when this method is called. The status
of :class:`pyspark.sql.streaming.StreamingQuery`.
may change before or when you process the event.
For example, you may find :class:`StreamingQuery`
terminates when processing `QueryProgressEvent`.
"""
pass
def onQueryTerminated(self, event):
"""
Called when a query is stopped, with or without error.
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
The properties are available as the same as Scala API.
"""
pass
my_listener = MyListener()
Definieren von beobachtbaren Metriken in strukturiertem Streaming
Beobachtbare Metriken werden als beliebige Aggregatfunktionen bezeichnet, die in einer Abfrage (DataFrame) definiert werden können. Sobald die Ausführung eines DataFrames einen Abschlusspunkt erreicht (d. h. eine Batchabfrage beendet oder eine Streamingepoche erreicht), wird ein benanntes Ereignis ausgegeben, das die Metriken für die seit dem letzten Abschlusspunkt verarbeiteten Daten enthält.
Sie können diese Metriken beobachten, indem Sie einen Listener an die Spark-Sitzung anfügen. Der Listener hängt vom Ausführungsmodus ab:
Batchmodus: Verwenden Sie
QueryExecutionListener
.QueryExecutionListener
wird aufgerufen, wenn die Abfrage abgeschlossen ist. Greifen Sie mithilfe derQueryExecution.observedMetrics
-Karte auf die Metriken zu.Streaming oder Microbatch: Verwenden Sie
StreamingQueryListener
.StreamingQueryListener
wird aufgerufen, wenn die Streamingabfrage eine Epoche abgeschlossen hat. Greifen Sie mithilfe derStreamingQueryProgress.observedMetrics
-Karte auf die Metriken zu. Azure Databricks unterstützt kein fortlaufendes Ausführungsstreaming.
Beispiel:
Scala
// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()
// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
override def onQueryProgress(event: QueryProgressEvent): Unit = {
event.progress.observedMetrics.get("my_event").foreach { row =>
// Trigger if the number of errors exceeds 5 percent
val num_rows = row.getAs[Long]("rc")
val num_error_rows = row.getAs[Long]("erc")
val ratio = num_error_rows.toDouble / num_rows
if (ratio > 0.05) {
// Trigger alert
}
}
}
})
Python
# Observe metric
observed_df = df.observe("metric", count(lit(1)).as("cnt"), count(col("error")).as("malformed"))
observed_df.writeStream.format("...").start()
# Define my listener.
class MyListener(StreamingQueryListener):
def onQueryStarted(self, event):
print(f"'{event.name}' [{event.id}] got started!")
def onQueryProgress(self, event):
row = event.progress.observedMetrics.get("metric")
if row is not None:
if row.malformed / row.cnt > 0.5:
print("ALERT! Ouch! there are too many malformed "
f"records {row.malformed} out of {row.cnt}!")
else:
print(f"{row.cnt} rows processed!")
def onQueryTerminated(self, event):
print(f"{event.id} got terminated!")
# Add my listener.
spark.streams.addListener(MyListener())
StreamingQueryListener-Objektmetriken
Metrik | Beschreibung |
---|---|
id |
Eindeutige Abfrage-ID, die über Neustarts hinweg bestehen bleibt. Siehe StreamingQuery.id(). |
runId |
Eindeutige Abfrage-ID für jeden Start oder Neustart. Siehe StreamingQuery.runId(). |
name |
Benutzerdefinierter Name der Abfrage. Null, wenn nicht angegeben. |
timestamp |
Zeitstempel für die Ausführung des Micro-Batch. |
batchId |
Eindeutige ID für den aktuellen Datenbatch der verarbeiteten Daten. Beachten Sie, dass im Falle von Wiederholungsversuchen nach einem Fehler eine bestimmte Batch-ID mehr als einmal ausgeführt werden kann. Wenn keine Daten zu verarbeiten sind, wird die Batch-ID nicht erhöht. |
numInputRows |
Gesamtzahl der in einem Trigger verarbeiteten Datensätze (für alle Quellen hinweg). |
inputRowsPerSecond |
Aggregierte Rate der eingehenden Daten (über alle Quellen hinweg). |
processedRowsPerSecond |
Aggregierte Geschwindigkeit, mit der Spark Daten verarbeitet (über alle Quellen hinweg). |
durationMs-Objekt
Informationen über die Zeit, die benötigt wird, um die verschiedenen Phasen der Ausführung des Micro-Batch-Prozesses abzuschließen.
Metrik | Beschreibung |
---|---|
durationMs.addBatch |
Zeit, die für die Ausführung des Mikro-Batch benötigt wird. Dies schließt die Zeit aus, die Spark für die Planung des Mikro-Batch benötigt. |
durationMs.getBatch |
Zeit, die benötigt wird, um die Metadaten zu den Offsets von der Quelle abzurufen. |
durationMs.latestOffset |
Zuletzt verbrauchter Offset für das Mikro-Batch. Dieses Fortschrittsobjekt bezieht sich auf die Zeit, die benötigt wird, um den letzten Offset aus den Quellen abzurufen. |
durationMs.queryPlanning |
Zeit, die für die Erstellung des Ausführungsplans benötigt wird. |
durationMs.triggerExecution |
Zeit, die für die Planung und Ausführung des Mikro-Batch benötigt wird. |
durationMs.walCommit |
Zeit, die für die Übertragung der neuen verfügbaren Offsets benötigt wird. |
eventTime-Objekt
Informationen über den Zeitwert des Ereignisses in den Daten, die im Micro-Batch verarbeitet werden. Diese Daten werden vom Wasserzeichen verwendet, um herauszufinden, wie der Zustand für die Verarbeitung von zustandsbehafteten Aggregationen, die im Auftrag für strukturiertes Streaming definiert sind, gekürzt werden kann.
Metrik | Beschreibung |
---|---|
eventTime.avg |
Durchschnittliche Zeit des Ereignisses im Auslöser. |
eventTime.max |
Maximale Zeit des Ereignisses im Auslöser. |
eventTime.min |
Minimale Zeit des Ereignisses im Auslöser. |
eventTime.watermark |
Wert des Wasserzeichens, das im Auslöser verwendet wird. |
stateOperators-Objekt
Informationen über die zustandsbehafteten Vorgänge, die im Auftrag für strukturiertes Streaming definiert sind, und die daraus resultierenden Aggregationen.
Metrik | Beschreibung |
---|---|
stateOperators.operatorName |
Name des zustandsbehafteten Operators, auf den sich die Metriken beziehen. Beispiel: symmetricHashJoin , dedupe , stateStoreSave . |
stateOperators.numRowsTotal |
Anzahl der Zeilen im Zustand als Ergebnis des zustandsbehafteten Operators oder der Aggregation. |
stateOperators.numRowsUpdated |
Anzahl der Zeilen, die als Ergebnis des zustandsbehafteten Operators oder der Aggregation im Zustand aktualisiert wurden. |
stateOperators.numRowsRemoved |
Anzahl der Zeilen, die als Ergebnis des zustandsbehafteten Operators oder der Aggregation aus dem Zustand entfernt wurden. |
stateOperators.commitTimeMs |
Zeit, die benötigt wird, um alle Aktualisierungen (hinzugefügt und entfernt) zu übertragen und eine neue Version zu erstellen. |
stateOperators.memoryUsedBytes |
Vom Zustandsspeicher verwendeter Speicher. |
stateOperators.numRowsDroppedByWatermark |
Anzahl der Zeilen, die als zu spät angesehen werden, um in die zustandsbehaftete Aggregation einbezogen zu werden. Gilt nur für Streamingaggregationen: Anzahl der Zeilen, die nach der Aggregation verworfen wurden, und keine unformatierten Eingabezeilen. Die Zahl ist nicht genau, aber sie kann darauf hinweisen, dass verspätete Daten verworfen werden. |
stateOperators.numShufflePartitions |
Anzahl der Shuffle-Partitionen für diesen zustandsbehafteten Operator. |
stateOperators.numStateStoreInstances |
Tatsächliche Zustandsspeicherinstanz, die der Operator initialisiert und verwaltet hat. In vielen zustandsbehafteten Operatoren entspricht dies der Anzahl der Partitionen, aber die Stream-Stream-Verknüpfung initialisiert vier Zustandsspeicherinstanzen pro Partition. |
stateOperators.customMetrics-Objekt
Von RocksDB gesammelte Informationen, die Metriken über die Leistung und die Vorgänge in Bezug auf die zustandsbehafteten Werte erfassen, die für den strukturierten Streaming-Auftrag verwaltet werden. Weitere Informationen finden Sie unter Konfigurieren Sie den RocksDB-Statusspeicher auf Azure Databricks.
Metrik | Beschreibung |
---|---|
customMetrics.rocksdbBytesCopied |
Anzahl der kopierten Bytes, die vom RocksDB File Manager nachverfolgt werden. |
customMetrics.rocksdbCommitCheckpointLatency |
Zeit in Millisekunden, um eine Momentaufnahme der nativen RocksDB zu machen und sie in ein lokales Verzeichnis zu schreiben. |
customMetrics.rocksdbCompactLatency |
Zeit in Millisekunden für die Komprimierung (optional) während des Prüfpunktcommits. |
customMetrics.rocksdbCommitFileSyncLatencyMs |
Zeit in Millisekunden für die Synchronisierung der Dateien für die Momentaufnahme der nativen RocksDB-Instanz mit einem externen Speicher (Prüfpunktspeicherort). |
customMetrics.rocksdbCommitFlushLatency |
Zeit in Millisekunden für das Leeren der In-Memory-Änderungen in RocksDB auf Ihren lokalen Datenträger. |
customMetrics.rocksdbCommitPauseLatency |
Zeit in Millisekunden zum Anhalten der Hintergrund-Arbeitsthreads (z. B. für die Komprimierung) als Teil des Prüfpunktcommits. |
customMetrics.rocksdbCommitWriteBatchLatency |
Zeit in Millisekunden, um die gestaffelten Schreibvorgänge in der In-Memory-Struktur (WriteBatch ) auf die native RocksDB-Instanz anzuwenden. |
customMetrics.rocksdbFilesCopied |
Anzahl der kopierten Dateien, die vom RocksDB File Manager nachverfolgt werden. |
customMetrics.rocksdbFilesReused |
Anzahl der wiederverwendeten Dateien, die vom RocksDB File Manager nachverfolgt werden. |
customMetrics.rocksdbGetCount |
Anzahl der get -Aufrufe an die DB (dies beinhaltet nicht gets von WriteBatch : In-Memory-Batch, der für Staging-Schreibvorgänge verwendet wird). |
customMetrics.rocksdbGetLatency |
Durchschnittliche Zeit in Nanosekunden pro zugrunde liegendem nativen RocksDB::Get -Aufruf. |
customMetrics.rocksdbReadBlockCacheHitCount |
Wie viel vom Block-Cache in RocksDB nützlich ist oder nicht und wie man lokale Datentägerlesevorgänge vermeiden kann. |
customMetrics.rocksdbReadBlockCacheMissCount |
Wie viel vom Block-Cache in RocksDB nützlich ist oder nicht und wie man lokale Datentägerlesevorgänge vermeiden kann. |
customMetrics.rocksdbSstFileSize |
Größe aller SST-Dateien. SST steht für Static Sorted Table (statisch sortierte Tabelle). Dies ist die tabellarische Struktur, die RocksDB zum Speichern von Daten verwendet. |
customMetrics.rocksdbTotalBytesRead |
Anzahl unkomprimierter Bytes, die durch get -Vorgänge gelesen wurden. |
customMetrics.rocksdbTotalBytesReadByCompaction |
Anzahl der Bytes, die der Komprimierungsprozess vom Datenträger liest. |
customMetrics.rocksdbTotalBytesReadThroughIterator |
Einige der zustandsbehafteten Vorgänge (z. B. Timeout-Verarbeitung in FlatMapGroupsWithState und Wasserzeichen) erfordern das Lesen von Daten in der DB über einen Iterator. Diese Metrik stellt die Größe der unkomprimierten Daten dar, die mit dem Iterator gelesen werden. |
customMetrics.rocksdbTotalBytesWritten |
Anzahl der unkomprimierten Bytes, die durch put -Vorgänge geschrieben wurden. |
customMetrics.rocksdbTotalBytesWrittenByCompaction |
Anzahl der Bytes, die der Komprimierungsprozess auf den Datenträger schreibt. |
customMetrics.rocksdbTotalCompactionLatencyMs |
Zeit in Millisekunden für die Komprimierung von RocksDB, einschließlich der Komprimierung im Hintergrund und der optionalen Komprimierung, die während des Commits eingeleitet wird. |
customMetrics.rocksdbTotalFlushLatencyMs |
Leerungsdauer, einschließlich Hintergrundleerung. Leerungsvorgänge sind Vorgänge, mit denen die MemTable in den Speicher geleert wird, sobald sie voll ist. MemTables sind die erste Ebene, auf der Daten in RocksDB gespeichert werden. |
customMetrics.rocksdbZipFileBytesUncompressed |
RocksDB File Manager verwaltet die Nutzung und Löschung des physischen SST-Dateispeichers. Diese Metrik stellt die unkomprimierten Zip-Dateien in Bytes dar, wie vom Dateimanager gemeldet. |
sources-Objekt (Kafka)
Metrik | Beschreibung |
---|---|
sources.description |
Name der Quelle, aus der die Streaming-Abfrage gelesen wird. Beispiel: “KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]” . |
sources.startOffset -Objekt |
Start-Offset-Nummer innerhalb des Kafka-Themas, mit dem der Streaming-Auftrag begonnen hat. |
sources.endOffset -Objekt |
Letzter vom Mikro-Batch verarbeiteter Offset. Bei einer laufenden Mikro-Batch-Ausführung könnte dies gleich latestOffset sein. |
sources.latestOffset -Objekt |
Der letzte vom Mikrobatch ermittelte Wert. Wenn es eine Drosselung gibt, kann es sein, dass der Micro-Batching-Prozess nicht alle Offsets verarbeitet, so dass endOffset und latestOffset voneinander abweichen. |
sources.numInputRows |
Anzahl der verarbeiteten Eingabezeilen aus dieser Quelle. |
sources.inputRowsPerSecond |
Rate, mit der die Daten für diese Quelle zur Verarbeitung ankommen. |
sources.processedRowsPerSecond |
Rate, mit der Spark die Daten für diese Quelle verarbeitet. |
sources.metrics-Objekt (Kafka)
Metrik | Beschreibung |
---|---|
sources.metrics.avgOffsetsBehindLatest |
Durchschnittliche Anzahl der Offsets, um die die Streaming-Anfrage unter allen abonnierten Themen hinter dem letzten verfügbaren Offset zurückliegt. |
sources.metrics.estimatedTotalBytesBehindLatest |
Geschätzte Anzahl der Bytes, die der Abfrageprozess nicht von den abonnierten Themen verbraucht hat. |
sources.metrics.maxOffsetsBehindLatest |
Maximale Anzahl der Offsets, um die die Streaming-Anfrage hinter dem letzten verfügbaren Offset aller abonnierten Themen zurückliegt. |
sources.metrics.minOffsetsBehindLatest |
Mindestanzahl der Offsets, um die die Streaming-Anfrage hinter dem letzten verfügbaren Offset aller abonnierten Themen zurückliegt. |
sink-Objekt (Kafka)
Metrik | Beschreibung |
---|---|
sink.description |
Name der Senke, in die die Streaming-Abfrage schreibt. Beispiel: “org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100” . |
sink.numOutputRows |
Anzahl der Zeilen, die als Teil des Mikro-Batch in die Ausgabetabelle oder Senke geschrieben wurden. In einigen Situationen kann dieser Wert „-1“ sein und kann im Allgemeinen als „unbekannt“ interpretiert werden. |
sources-Objekt (Delta Lake)
Metrik | Beschreibung |
---|---|
sources.description |
Name der Quelle, aus der die Streaming-Abfrage gelesen wird. Beispiel: “DeltaSource[table]” . |
sources.[startOffset/endOffset].sourceVersion |
Version der Serialisierung, mit der dieser Offset kodiert ist. |
sources.[startOffset/endOffset].reservoirId |
ID der Tabelle, aus der Sie lesen. Dies wird verwendet, um Fehlkonfigurationen beim Neustart einer Abfrage zu erkennen. |
sources.[startOffset/endOffset].reservoirVersion |
Version der Tabelle, die Sie gerade bearbeiten. |
sources.[startOffset/endOffset].index |
Index in der Sequenz von AddFiles in dieser Version. Dies wird verwendet, um große Commits in mehrere Batches aufzuteilen. Dieser Index wird durch Sortieren nach modificationTimestamp und path erstellt. |
sources.[startOffset/endOffset].isStartingVersion |
Gibt an, ob dieser Offset eine Abfrage bezeichnet, die gerade beginnt, anstatt Änderungen zu verarbeiten. Wenn Sie eine neue Abfrage starten, werden zunächst alle Daten verarbeitet, die sich zu Beginn in der Tabelle befinden, und dann die neu hinzugekommenen Daten. |
sources.latestOffset |
Letzter vom Mikro-Batch verarbeiteter Offset. |
sources.numInputRows |
Anzahl der verarbeiteten Eingabezeilen aus dieser Quelle. |
sources.inputRowsPerSecond |
Rate, mit der die Daten für diese Quelle zur Verarbeitung ankommen. |
sources.processedRowsPerSecond |
Rate, mit der Spark die Daten für diese Quelle verarbeitet. |
sources.metrics.numBytesOutstanding |
Gesamtgröße der ausstehenden Dateien (von RocksDB nachverfolgte Dateien). Dies ist die Backlog-Metrik für Delta und Autoloader als Streaming-Quelle. |
sources.metrics.numFilesOutstanding |
Anzahl der ausstehenden Dateien, die verarbeitet werden müssen. Dies ist die Backlog-Metrik für Delta und Autoloader als Streaming-Quelle. |
sink-Objekt (Delta Lake)
Metrik | Beschreibung |
---|---|
sink.description |
Name der Senke, in die die Streaming-Abfrage schreibt. Beispiel: “DeltaSink[table]” . |
sink.numOutputRows |
Die Anzahl der Zeilen in dieser Metrik ist „-1“, da Spark keine Ausgabezeilen für DSv1-Senken ableiten kann, was die Klassifizierung für die Delta Lake-Senke ist. |
Beispiele
Beispielereignis Kafka-zu-Kafka StreamingQueryListener
{
"id" : "3574feba-646d-4735-83c4-66f657e52517",
"runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
"name" : "STREAMING_QUERY_NAME_UNIQUE",
"timestamp" : "2022-10-31T20:09:30.455Z",
"batchId" : 1377,
"numInputRows" : 687,
"inputRowsPerSecond" : 32.13433743393049,
"processedRowsPerSecond" : 34.067241892293964,
"durationMs" : {
"addBatch" : 18352,
"getBatch" : 0,
"latestOffset" : 31,
"queryPlanning" : 977,
"triggerExecution" : 20165,
"walCommit" : 342
},
"eventTime" : {
"avg" : "2022-10-31T20:09:18.070Z",
"max" : "2022-10-31T20:09:30.125Z",
"min" : "2022-10-31T20:09:09.793Z",
"watermark" : "2022-10-31T20:08:46.355Z"
},
"stateOperators" : [ {
"operatorName" : "stateStoreSave",
"numRowsTotal" : 208,
"numRowsUpdated" : 73,
"allUpdatesTimeMs" : 434,
"numRowsRemoved" : 76,
"allRemovalsTimeMs" : 515,
"commitTimeMs" : 0,
"memoryUsedBytes" : 167069743,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 20,
"customMetrics" : {
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 222,
"rocksdbGetLatency" : 0,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 165,
"rocksdbReadBlockCacheMissCount" : 41,
"rocksdbSstFileSize" : 232729,
"rocksdbTotalBytesRead" : 12844,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 161238,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
}, {
"operatorName" : "dedupe",
"numRowsTotal" : 2454744,
"numRowsUpdated" : 73,
"allUpdatesTimeMs" : 4155,
"numRowsRemoved" : 0,
"allRemovalsTimeMs" : 0,
"commitTimeMs" : 0,
"memoryUsedBytes" : 137765341,
"numRowsDroppedByWatermark" : 34,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 20,
"customMetrics" : {
"numDroppedDuplicateRows" : 193,
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 146,
"rocksdbGetLatency" : 0,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 3,
"rocksdbReadBlockCacheMissCount" : 3,
"rocksdbSstFileSize" : 78959140,
"rocksdbTotalBytesRead" : 0,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
}, {
"operatorName" : "symmetricHashJoin",
"numRowsTotal" : 2583,
"numRowsUpdated" : 682,
"allUpdatesTimeMs" : 9645,
"numRowsRemoved" : 508,
"allRemovalsTimeMs" : 46,
"commitTimeMs" : 21,
"memoryUsedBytes" : 668544484,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 80,
"customMetrics" : {
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 4218,
"rocksdbGetLatency" : 3,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 3425,
"rocksdbReadBlockCacheMissCount" : 149,
"rocksdbSstFileSize" : 742827,
"rocksdbTotalBytesRead" : 866864,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
} ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
"startOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706380
}
},
"endOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706672
}
},
"latestOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706672
}
},
"numInputRows" : 292,
"inputRowsPerSecond" : 13.65826278123392,
"processedRowsPerSecond" : 14.479817514628582,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"estimatedTotalBytesBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
}, {
"description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
"startOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"endOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"latestOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
} ],
"sink" : {
"description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100",
"numOutputRows" : 76
}
}
Beispielereignis Delta Lake-to-Delta Lake StreamingQueryListener
{
"id" : "aeb6bc0f-3f7d-4928-a078-ba2b304e2eaf",
"runId" : "35d751d9-2d7c-4338-b3de-6c6ae9ebcfc2",
"name" : "silverTransformFromBronze",
"timestamp" : "2022-11-01T18:21:29.500Z",
"batchId" : 4,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"durationMs" : {
"latestOffset" : 62,
"triggerExecution" : 62
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "DeltaSource[dbfs:/FileStore/max.fisher@databricks.com/ctc/stateful-trade-analysis-demo/table]",
"startOffset" : {
"sourceVersion" : 1,
"reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
"reservoirVersion" : 3216,
"index" : 3214,
"isStartingVersion" : true
},
"endOffset" : {
"sourceVersion" : 1,
"reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
"reservoirVersion" : 3216,
"index" : 3214,
"isStartingVersion" : true
},
"latestOffset" : null,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"metrics" : {
"numBytesOutstanding" : "0",
"numFilesOutstanding" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_silver_delta_demo2]",
"numOutputRows" : -1
}
}
Beispielereignis Raten-Quelle zu Delta Lake StreamingQueryListener
{
"id" : "912ebdc1-edf2-48ec-b9fb-1a9b67dd2d9e",
"runId" : "85de73a5-92cc-4b7f-9350-f8635b0cf66e",
"name" : "dataGen",
"timestamp" : "2022-11-01T18:28:20.332Z",
"batchId" : 279,
"numInputRows" : 300,
"inputRowsPerSecond" : 114.15525114155251,
"processedRowsPerSecond" : 158.9825119236884,
"durationMs" : {
"addBatch" : 1771,
"commitOffsets" : 54,
"getBatch" : 0,
"latestOffset" : 0,
"queryPlanning" : 4,
"triggerExecution" : 1887,
"walCommit" : 58
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "RateStreamV2[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=default",
"startOffset" : 560,
"endOffset" : 563,
"latestOffset" : 563,
"numInputRows" : 300,
"inputRowsPerSecond" : 114.15525114155251,
"processedRowsPerSecond" : 158.9825119236884
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_bronze_delta_demo]",
"numOutputRows" : -1
}
}