Övervaka frågor om strukturerad direktuppspelning i Azure Databricks
Azure Databricks tillhandahåller inbyggd övervakning för program för strukturerad direktuppspelning via Spark-användargränssnittet under fliken Direktuppspelning .
Särskilja strukturerade strömningsfrågor i Spark-användargränssnittet
Ge dina strömmar ett unikt frågenamn genom att lägga .queryName(<query-name>)
till i koden writeStream
för att enkelt skilja vilka mått som tillhör vilken ström i Spark-användargränssnittet.
Skicka mått för strukturerad direktuppspelning till externa tjänster
Mått för direktuppspelning kan skickas till externa tjänster för användningsfall för aviseringar eller instrumentpaneler med hjälp av Apache Sparks gränssnitt för strömningsfrågaslyssnare. I Databricks Runtime 11.3 LTS och senare är lyssnaren för strömningsfråga tillgänglig i Python och Scala.
Viktigt!
Autentiseringsuppgifter och objekt som hanteras av Unity Catalog kan inte användas i StreamingQueryListener
logik.
Kommentar
Bearbetningssvarstid som är associerad med lyssnare kan påverka frågebearbetningen negativt. Databricks rekommenderar att du minimerar bearbetningslogiken i dessa lyssnare och skriver till mottagare med låg latens, till exempel Kafka.
Följande kod innehåller grundläggande exempel på syntaxen för att implementera en lyssnare:
Scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._
val myListener = new StreamingQueryListener {
/**
* Called when a query is started.
* @note This is called synchronously with
* [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]].
* `onQueryStart` calls on all listeners before
* `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
* Do not block this method, as it blocks your query.
*/
def onQueryStarted(event: QueryStartedEvent): Unit = {}
/**
* Called when there is some status update (ingestion rate updated, etc.)
*
* @note This method is asynchronous. The status in [[StreamingQuery]] returns the
* latest status, regardless of when this method is called. The status of [[StreamingQuery]]
* may change before or when you process the event. For example, you may find [[StreamingQuery]]
* terminates when processing `QueryProgressEvent`.
*/
def onQueryProgress(event: QueryProgressEvent): Unit = {}
/**
* Called when the query is idle and waiting for new data to process.
*/
def onQueryIdle(event: QueryProgressEvent): Unit = {}
/**
* Called when a query is stopped, with or without error.
*/
def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}
Python
class MyListener(StreamingQueryListener):
def onQueryStarted(self, event):
"""
Called when a query is started.
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
The properties are available as the same as Scala API.
Notes
-----
This is called synchronously with
meth:`pyspark.sql.streaming.DataStreamWriter.start`,
that is, ``onQueryStart`` will be called on all listeners before
``DataStreamWriter.start()`` returns the corresponding
:class:`pyspark.sql.streaming.StreamingQuery`.
Do not block in this method as it will block your query.
"""
pass
def onQueryProgress(self, event):
"""
Called when there is some status update (ingestion rate updated, etc.)
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
The properties are available as the same as Scala API.
Notes
-----
This method is asynchronous. The status in
:class:`pyspark.sql.streaming.StreamingQuery` returns the
most recent status, regardless of when this method is called. The status
of :class:`pyspark.sql.streaming.StreamingQuery`.
may change before or when you process the event.
For example, you may find :class:`StreamingQuery`
terminates when processing `QueryProgressEvent`.
"""
pass
def onQueryIdle(self, event):
"""
Called when the query is idle and waiting for new data to process.
"""
pass
def onQueryTerminated(self, event):
"""
Called when a query is stopped, with or without error.
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
The properties are available as the same as Scala API.
"""
pass
my_listener = MyListener()
Definiera observerbara mått i strukturerad direktuppspelning
Observerbara mått namnges godtyckliga aggregeringsfunktioner som kan definieras i en fråga (DataFrame). Så snart körningen av en DataFrame når en slutförandepunkt (dvs. slutför en batchfråga eller når en strömmande epok) genereras en namngiven händelse som innehåller måtten för de data som bearbetats sedan den senaste slutförandepunkten.
Du kan observera dessa mått genom att koppla en lyssnare till Spark-sessionen. Lyssnaren är beroende av körningsläget:
Batch-läge: Använd
QueryExecutionListener
.QueryExecutionListener
anropas när frågan är klar. Få åtkomst till måtten med hjälp av kartanQueryExecution.observedMetrics
.Direktuppspelning eller mikrobatch: Använd
StreamingQueryListener
.StreamingQueryListener
anropas när strömningsfrågan slutför en epok. Få åtkomst till måtten med hjälp av kartanStreamingQueryProgress.observedMetrics
. Azure Databricks stöder inte kontinuerlig körningsströmning.
Till exempel:
Scala
// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()
// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
override def onQueryProgress(event: QueryProgressEvent): Unit = {
event.progress.observedMetrics.get("my_event").foreach { row =>
// Trigger if the number of errors exceeds 5 percent
val num_rows = row.getAs[Long]("rc")
val num_error_rows = row.getAs[Long]("erc")
val ratio = num_error_rows.toDouble / num_rows
if (ratio > 0.05) {
// Trigger alert
}
}
}
})
Python
# Observe metric
observed_df = df.observe("metric", count(lit(1)).as("cnt"), count(col("error")).as("malformed"))
observed_df.writeStream.format("...").start()
# Define my listener.
class MyListener(StreamingQueryListener):
def onQueryStarted(self, event):
print(f"'{event.name}' [{event.id}] got started!")
def onQueryProgress(self, event):
row = event.progress.observedMetrics.get("metric")
if row is not None:
if row.malformed / row.cnt > 0.5:
print("ALERT! Ouch! there are too many malformed "
f"records {row.malformed} out of {row.cnt}!")
else:
print(f"{row.cnt} rows processed!")
def onQueryTerminated(self, event):
print(f"{event.id} got terminated!")
# Add my listener.
spark.streams.addListener(MyListener())
Mått för StreamingQueryListener-objekt
Mätvärde | Beskrivning |
---|---|
id |
Unikt fråge-ID som bevaras mellan omstarter. Se StreamingQuery.id(). |
runId |
Unikt fråge-ID för varje start eller omstart. Se StreamingQuery.runId(). |
name |
Användarens angivna namn på frågan. Null om det inte anges. |
timestamp |
Tidsstämpel för körning av mikrobatchen. |
batchId |
Unikt ID för den aktuella batchen med data som bearbetas. Observera att vid återförsök efter ett fel kan ett visst batch-ID köras mer än en gång. På samma sätt ökar inte batch-ID:t när det inte finns några data som ska bearbetas. |
numInputRows |
Aggregera (över alla källor) antalet poster som bearbetas i en utlösare. |
inputRowsPerSecond |
Aggregera (över alla källor) frekvensen för ankommande data. |
processedRowsPerSecond |
Aggregera (över alla källor) hastighet med vilken Spark bearbetar data. |
durationMs-objekt
Information om den tid det tar att slutföra olika steg i mikrobatchkörningsprocessen.
Mätvärde | Beskrivning |
---|---|
durationMs.addBatch |
Tiden det tar att köra mikrobatchen. Detta utesluter den tid spark tar att planera mikrobatchen. |
durationMs.getBatch |
Tiden det tar att hämta metadata om förskjutningarna från källan. |
durationMs.latestOffset |
Senaste förskjutning som förbrukats för mikrobatchen. Det här förloppsobjektet refererar till den tid det tar att hämta den senaste förskjutningen från källor. |
durationMs.queryPlanning |
Tiden det tar att generera körningsplanen. |
durationMs.triggerExecution |
Tiden det tar att planera och köra mikrobatchen. |
durationMs.walCommit |
Tiden det tar att checka in de nya tillgängliga förskjutningarna. |
eventTime-objekt
Information om händelsetidsvärdet som visas i de data som bearbetas i mikrobatchen. Dessa data används av vattenstämpeln för att ta reda på hur tillståndet kan trimmas för bearbetning av tillståndskänsliga aggregeringar som definierats i structured streaming-jobbet.
Mätvärde | Beskrivning |
---|---|
eventTime.avg |
Genomsnittlig händelsetid som visas i utlösaren. |
eventTime.max |
Maximal händelsetid som visas i utlösaren. |
eventTime.min |
Minsta händelsetid som visas i utlösaren. |
eventTime.watermark |
Värdet för vattenstämpeln som används i utlösaren. |
stateOperators-objekt
Information om tillståndskänsliga åtgärder som definieras i structured streaming-jobbet och de sammansättningar som skapas från dem.
Mätvärde | Beskrivning |
---|---|
stateOperators.operatorName |
Namnet på den tillståndskänsliga operator som måtten är relaterade till. Till exempel symmetricHashJoin , dedupe , stateStoreSave . |
stateOperators.numRowsTotal |
Antal rader i tillståndet som ett resultat av den tillståndskänsliga operatorn eller aggregeringen. |
stateOperators.numRowsUpdated |
Antal rader som uppdaterats i tillståndet till följd av den tillståndskänsliga operatorn eller aggregeringen. |
stateOperators.numRowsRemoved |
Antal rader som tagits bort från tillståndet till följd av den tillståndskänsliga operatorn eller aggregeringen. |
stateOperators.commitTimeMs |
Tiden det tar att checka in alla uppdateringar (placerar och tar bort) och returnerar en ny version. |
stateOperators.memoryUsedBytes |
Minne som används av tillståndsarkivet. |
stateOperators.numRowsDroppedByWatermark |
Antal rader som anses vara för sena för att inkluderas i den tillståndskänsliga aggregeringen. Endast strömmande aggregeringar: Antal rader som släppts efter aggregering och inte råa indatarader. Talet är inte exakt, men det kan tyda på att sena data tas bort. |
stateOperators.numShufflePartitions |
Antal shuffle-partitioner för den här tillståndskänsliga operatorn. |
stateOperators.numStateStoreInstances |
Faktisk tillståndslagerinstans som operatorn har initierat och underhållit. I många tillståndskänsliga operatorer är detta samma som antalet partitioner, men stream-stream join initierar fyra tillståndslagerinstanser per partition. |
stateOperators.customMetrics-objekt
Information som samlas in från RocksDB som samlar in mått om dess prestanda och åtgärder med avseende på de tillståndskänsliga värden som den upprätthåller för structured streaming-jobbet. Mer information finns i Konfigurera RocksDB-tillståndslager på Azure Databricks.
Mätvärde | Beskrivning |
---|---|
customMetrics.rocksdbBytesCopied |
Antal byte som kopierats enligt spårning av RocksDB-filhanteraren. |
customMetrics.rocksdbCommitCheckpointLatency |
Tid i millisekunder för att ta en ögonblicksbild av inbyggda RocksDB och skriva den till en lokal katalog. |
customMetrics.rocksdbCompactLatency |
Tid i millisekunder för komprimering (valfritt) under kontrollpunktsincheckningen. |
customMetrics.rocksdbCommitFileSyncLatencyMs |
Tid i millisekunder för att synkronisera de inbyggda RocksDB-ögonblicksbildsrelaterade filerna till en extern lagringsplats (kontrollpunktsplats). |
customMetrics.rocksdbCommitFlushLatency |
Tid i millisekunder för att rensa rocksDB-minnesinterna ändringar till din lokala disk. |
customMetrics.rocksdbCommitPauseLatency |
Tid i millisekunder för att stoppa bakgrundsarbetstrådarna (till exempel för komprimering) som en del av kontrollpunktsincheckningen. |
customMetrics.rocksdbCommitWriteBatchLatency |
Tid i millisekunder för att tillämpa mellanlagrade skrivningar i minnesintern struktur (WriteBatch ) på interna RocksDB. |
customMetrics.rocksdbFilesCopied |
Antal filer som kopierats som spårade av RocksDB-filhanteraren. |
customMetrics.rocksdbFilesReused |
Antal filer som återanvänds enligt spåras av RocksDB-filhanteraren. |
customMetrics.rocksdbGetCount |
Antal get anrop till databasen (detta inkluderar gets inte från WriteBatch : Minnesintern batch som används för mellanlagring av skrivningar). |
customMetrics.rocksdbGetLatency |
Genomsnittlig tid i nanosekunder för det underliggande interna RocksDB::Get anropet. |
customMetrics.rocksdbReadBlockCacheHitCount |
Hur mycket av blockcacheminnet i RocksDB som är användbart eller inte och undvika lokala diskläsningar. |
customMetrics.rocksdbReadBlockCacheMissCount |
Hur mycket av blockcacheminnet i RocksDB som är användbart eller inte och undvika lokala diskläsningar. |
customMetrics.rocksdbSstFileSize |
Storleken på alla SST-filer. SST står för Static Sorted Table, som är den tabellstruktur som RocksDB använder för att lagra data. |
customMetrics.rocksdbTotalBytesRead |
Antal okomprimerade byte som lästs av get åtgärder. |
customMetrics.rocksdbTotalBytesReadByCompaction |
Antal byte som komprimeringsprocessen läser från disken. |
customMetrics.rocksdbTotalBytesReadThroughIterator |
Vissa tillståndskänsliga åtgärder (till exempel timeoutbearbetning i FlatMapGroupsWithState och vattenstämpling) kräver att data läss i DB via en iterator. Det här måttet representerar storleken på okomprimerade data som läss med iteratorn. |
customMetrics.rocksdbTotalBytesWritten |
Antal okomprimerade byte som skrivits av put åtgärder. |
customMetrics.rocksdbTotalBytesWrittenByCompaction |
Antal byte som komprimeringsprocessen skriver till disken. |
customMetrics.rocksdbTotalCompactionLatencyMs |
Tids millisekunder för RocksDB-komprimering, inklusive bakgrundskomprimeringar och den valfria komprimering som initierades under incheckningen. |
customMetrics.rocksdbTotalFlushLatencyMs |
Tömningstid, inklusive bakgrundsspolning. Tömningsåtgärder är processer med vilka MemTable töms till lagring när den är full. MemTables är den första nivån där data lagras i RocksDB. |
customMetrics.rocksdbZipFileBytesUncompressed |
RocksDB File Manager hanterar den fysiska SST-filens diskutrymmesanvändning och borttagning. Det här måttet representerar de okomprimerade zip-filerna i byte som rapporterats av Filhanteraren. |
källobjekt (Kafka)
Mätvärde | Beskrivning |
---|---|
sources.description |
Namnet på källan som strömningsfrågan läser från. Exempel: “KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]” |
sources.startOffset Objekt |
Starta förskjutningsnummer i Kafka-ämnet som strömningsjobbet startade på. |
sources.endOffset Objekt |
Den senaste förskjutningen som bearbetas av mikrobatchen. Detta kan vara lika med latestOffset för en pågående mikrobatchkörning. |
sources.latestOffset Objekt |
Den senaste förskjutningen räknas av mikrobatchen. När det finns begränsningar kanske mikrobatchprocessen inte bearbetar alla förskjutningar, vilket orsakar endOffset och latestOffset skiljer sig åt. |
sources.numInputRows |
Antal indatarader som bearbetas från den här källan. |
sources.inputRowsPerSecond |
Hastighet med vilken data anländer för bearbetning för den här källan. |
sources.processedRowsPerSecond |
Hastighet med vilken Spark bearbetar data för den här källan. |
sources.metrics-objekt (Kafka)
Mätvärde | Beskrivning |
---|---|
sources.metrics.avgOffsetsBehindLatest |
Genomsnittligt antal förskjutningar som strömningsfrågan ligger bakom den senaste tillgängliga förskjutningen bland alla prenumerationsämnen. |
sources.metrics.estimatedTotalBytesBehindLatest |
Beräknat antal byte som frågeprocessen inte har förbrukat från de prenumererade ämnena. |
sources.metrics.maxOffsetsBehindLatest |
Maximalt antal förskjutningar som strömningsfrågan ligger bakom den senaste tillgängliga förskjutningen bland alla prenumerationsämnen. |
sources.metrics.minOffsetsBehindLatest |
Minsta antal förskjutningar som strömningsfrågan ligger bakom den senaste tillgängliga förskjutningen bland alla prenumerationsämnen. |
mottagarobjekt (Kafka)
Mätvärde | Beskrivning |
---|---|
sink.description |
Namnet på mottagaren som strömningsfrågan skriver till. Exempel: “org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100” |
sink.numOutputRows |
Antal rader som skrivits till utdatatabellen eller mottagaren som en del av mikrobatchen. I vissa situationer kan det här värdet vara "-1" och kan i allmänhet tolkas som "okänt". |
källobjekt (Delta Lake)
Mätvärde | Beskrivning |
---|---|
sources.description |
Namnet på källan som strömningsfrågan läser från. Exempel: “DeltaSource[table]” |
sources.[startOffset/endOffset].sourceVersion |
Version av serialisering som den här förskjutningen kodas med. |
sources.[startOffset/endOffset].reservoirId |
ID för tabellen som du läser från. Detta används för att identifiera felkonfiguration vid omstart av en fråga. |
sources.[startOffset/endOffset].reservoirVersion |
Version av tabellen som du bearbetar för närvarande. |
sources.[startOffset/endOffset].index |
Index i sekvensen AddFiles i den här versionen. Detta används för att dela upp stora incheckningar i flera batchar. Det här indexet skapas genom sortering på modificationTimestamp och path . |
sources.[startOffset/endOffset].isStartingVersion |
Om den här förskjutningen anger en fråga som startar i stället för att bearbeta ändringar. När du startar en ny fråga bearbetas alla data som finns i tabellen i början och sedan nya data som har anlänt. |
sources.latestOffset |
Den senaste förskjutningen som bearbetas av mikrobatchfrågan. |
sources.numInputRows |
Antal indatarader som bearbetas från den här källan. |
sources.inputRowsPerSecond |
Hastighet med vilken data anländer för bearbetning för den här källan. |
sources.processedRowsPerSecond |
Hastighet med vilken Spark bearbetar data för den här källan. |
sources.metrics.numBytesOutstanding |
Storleken på de utestående filerna (filer som spåras av RocksDB) tillsammans. Det här är måttet för kvarvarande uppgifter för Delta och Auto Loader som strömningskälla. |
sources.metrics.numFilesOutstanding |
Antal utestående filer som ska bearbetas. Det här är måttet för kvarvarande uppgifter för Delta och Auto Loader som strömningskälla. |
mottagarobjekt (Delta Lake)
Mätvärde | Beskrivning |
---|---|
sink.description |
Namnet på mottagaren som den strömmande frågan skriver till. Exempel: “DeltaSink[table]” |
sink.numOutputRows |
Antalet rader i det här måttet är "-1" eftersom Spark inte kan härleda utdatarader för DSv1-mottagare, vilket är klassificeringen för Delta Lake-mottagare. |
Exempel
Exempel på Kafka-to-Kafka StreamingQueryListener-händelse
{
"id" : "3574feba-646d-4735-83c4-66f657e52517",
"runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
"name" : "STREAMING_QUERY_NAME_UNIQUE",
"timestamp" : "2022-10-31T20:09:30.455Z",
"batchId" : 1377,
"numInputRows" : 687,
"inputRowsPerSecond" : 32.13433743393049,
"processedRowsPerSecond" : 34.067241892293964,
"durationMs" : {
"addBatch" : 18352,
"getBatch" : 0,
"latestOffset" : 31,
"queryPlanning" : 977,
"triggerExecution" : 20165,
"walCommit" : 342
},
"eventTime" : {
"avg" : "2022-10-31T20:09:18.070Z",
"max" : "2022-10-31T20:09:30.125Z",
"min" : "2022-10-31T20:09:09.793Z",
"watermark" : "2022-10-31T20:08:46.355Z"
},
"stateOperators" : [ {
"operatorName" : "stateStoreSave",
"numRowsTotal" : 208,
"numRowsUpdated" : 73,
"allUpdatesTimeMs" : 434,
"numRowsRemoved" : 76,
"allRemovalsTimeMs" : 515,
"commitTimeMs" : 0,
"memoryUsedBytes" : 167069743,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 20,
"customMetrics" : {
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 222,
"rocksdbGetLatency" : 0,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 165,
"rocksdbReadBlockCacheMissCount" : 41,
"rocksdbSstFileSize" : 232729,
"rocksdbTotalBytesRead" : 12844,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 161238,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
}, {
"operatorName" : "dedupe",
"numRowsTotal" : 2454744,
"numRowsUpdated" : 73,
"allUpdatesTimeMs" : 4155,
"numRowsRemoved" : 0,
"allRemovalsTimeMs" : 0,
"commitTimeMs" : 0,
"memoryUsedBytes" : 137765341,
"numRowsDroppedByWatermark" : 34,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 20,
"customMetrics" : {
"numDroppedDuplicateRows" : 193,
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 146,
"rocksdbGetLatency" : 0,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 3,
"rocksdbReadBlockCacheMissCount" : 3,
"rocksdbSstFileSize" : 78959140,
"rocksdbTotalBytesRead" : 0,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
}, {
"operatorName" : "symmetricHashJoin",
"numRowsTotal" : 2583,
"numRowsUpdated" : 682,
"allUpdatesTimeMs" : 9645,
"numRowsRemoved" : 508,
"allRemovalsTimeMs" : 46,
"commitTimeMs" : 21,
"memoryUsedBytes" : 668544484,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 80,
"customMetrics" : {
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 4218,
"rocksdbGetLatency" : 3,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 3425,
"rocksdbReadBlockCacheMissCount" : 149,
"rocksdbSstFileSize" : 742827,
"rocksdbTotalBytesRead" : 866864,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
} ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
"startOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706380
}
},
"endOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706672
}
},
"latestOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706672
}
},
"numInputRows" : 292,
"inputRowsPerSecond" : 13.65826278123392,
"processedRowsPerSecond" : 14.479817514628582,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"estimatedTotalBytesBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
}, {
"description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
"startOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"endOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"latestOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
} ],
"sink" : {
"description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100",
"numOutputRows" : 76
}
}
Exempel på Delta Lake-to-Delta Lake StreamingQueryListener-händelse
{
"id" : "aeb6bc0f-3f7d-4928-a078-ba2b304e2eaf",
"runId" : "35d751d9-2d7c-4338-b3de-6c6ae9ebcfc2",
"name" : "silverTransformFromBronze",
"timestamp" : "2022-11-01T18:21:29.500Z",
"batchId" : 4,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"durationMs" : {
"latestOffset" : 62,
"triggerExecution" : 62
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "DeltaSource[dbfs:/FileStore/max.fisher@databricks.com/ctc/stateful-trade-analysis-demo/table]",
"startOffset" : {
"sourceVersion" : 1,
"reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
"reservoirVersion" : 3216,
"index" : 3214,
"isStartingVersion" : true
},
"endOffset" : {
"sourceVersion" : 1,
"reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
"reservoirVersion" : 3216,
"index" : 3214,
"isStartingVersion" : true
},
"latestOffset" : null,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"metrics" : {
"numBytesOutstanding" : "0",
"numFilesOutstanding" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_silver_delta_demo2]",
"numOutputRows" : -1
}
}
Exempel på frekvenskälla till Delta Lake StreamingQueryListener-händelse
{
"id" : "912ebdc1-edf2-48ec-b9fb-1a9b67dd2d9e",
"runId" : "85de73a5-92cc-4b7f-9350-f8635b0cf66e",
"name" : "dataGen",
"timestamp" : "2022-11-01T18:28:20.332Z",
"batchId" : 279,
"numInputRows" : 300,
"inputRowsPerSecond" : 114.15525114155251,
"processedRowsPerSecond" : 158.9825119236884,
"durationMs" : {
"addBatch" : 1771,
"commitOffsets" : 54,
"getBatch" : 0,
"latestOffset" : 0,
"queryPlanning" : 4,
"triggerExecution" : 1887,
"walCommit" : 58
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "RateStreamV2[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=default",
"startOffset" : 560,
"endOffset" : 563,
"latestOffset" : 563,
"numInputRows" : 300,
"inputRowsPerSecond" : 114.15525114155251,
"processedRowsPerSecond" : 158.9825119236884
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_bronze_delta_demo]",
"numOutputRows" : -1
}
}
Feedback
https://aka.ms/ContentUserFeedback.
Kommer snart: Under hela 2024 kommer vi att fasa ut GitHub-problem som feedbackmekanism för innehåll och ersätta det med ett nytt feedbacksystem. Mer information finns i:Skicka och visa feedback för