Dela via


Övervaka frågor om strukturerad direktuppspelning i Azure Databricks

Azure Databricks tillhandahåller inbyggd övervakning för program för strukturerad direktuppspelning via Spark-användargränssnittet under fliken Direktuppspelning .

Särskilja strukturerade strömningsfrågor i Spark-användargränssnittet

Ge dina strömmar ett unikt frågenamn genom att lägga .queryName(<query-name>) till i koden writeStream för att enkelt skilja vilka mått som tillhör vilken ström i Spark-användargränssnittet.

Skicka mått för strukturerad direktuppspelning till externa tjänster

Mått för direktuppspelning kan skickas till externa tjänster för användningsfall för aviseringar eller instrumentpaneler med hjälp av Apache Sparks gränssnitt för strömningsfrågaslyssnare. I Databricks Runtime 11.3 LTS och senare är lyssnaren för strömningsfråga tillgänglig i Python och Scala.

Viktigt!

Autentiseringsuppgifter och objekt som hanteras av Unity Catalog kan inte användas i StreamingQueryListener logik.

Kommentar

Bearbetningssvarstid som är associerad med lyssnare kan påverka frågebearbetningen negativt. Databricks rekommenderar att du minimerar bearbetningslogiken i dessa lyssnare och skriver till mottagare med låg latens, till exempel Kafka.

Följande kod innehåller grundläggande exempel på syntaxen för att implementera en lyssnare:

Scala

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

val myListener = new StreamingQueryListener {

  /**
    * Called when a query is started.
    * @note This is called synchronously with
    *       [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]].
    *       `onQueryStart` calls on all listeners before
    *       `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
    *        Do not block this method, as it blocks your query.
    */
  def onQueryStarted(event: QueryStartedEvent): Unit = {}

  /**
    * Called when there is some status update (ingestion rate updated, etc.)
    *
    * @note This method is asynchronous. The status in [[StreamingQuery]] returns the
    *       latest status, regardless of when this method is called. The status of [[StreamingQuery]]
    *       may change before or when you process the event. For example, you may find [[StreamingQuery]]
    *       terminates when processing `QueryProgressEvent`.
    */
  def onQueryProgress(event: QueryProgressEvent): Unit = {}

  /**
    * Called when the query is idle and waiting for new data to process.
    */
  def onQueryIdle(event: QueryProgressEvent): Unit = {}

  /**
    * Called when a query is stopped, with or without error.
    */
  def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}

Python

class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        """
        Called when a query is started.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This is called synchronously with
        meth:`pyspark.sql.streaming.DataStreamWriter.start`,
        that is, ``onQueryStart`` will be called on all listeners before
        ``DataStreamWriter.start()`` returns the corresponding
        :class:`pyspark.sql.streaming.StreamingQuery`.
        Do not block in this method as it will block your query.
        """
        pass

    def onQueryProgress(self, event):
        """
        Called when there is some status update (ingestion rate updated, etc.)

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This method is asynchronous. The status in
        :class:`pyspark.sql.streaming.StreamingQuery` returns the
        most recent status, regardless of when this method is called. The status
        of :class:`pyspark.sql.streaming.StreamingQuery`.
        may change before or when you process the event.
        For example, you may find :class:`StreamingQuery`
        terminates when processing `QueryProgressEvent`.
        """
        pass

    def onQueryIdle(self, event):
        """
        Called when the query is idle and waiting for new data to process.
        """
        pass

    def onQueryTerminated(self, event):
        """
        Called when a query is stopped, with or without error.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
            The properties are available as the same as Scala API.
        """
        pass

my_listener = MyListener()

Definiera observerbara mått i strukturerad direktuppspelning

Observerbara mått namnges godtyckliga aggregeringsfunktioner som kan definieras i en fråga (DataFrame). Så snart körningen av en DataFrame når en slutförandepunkt (dvs. slutför en batchfråga eller når en strömmande epok) genereras en namngiven händelse som innehåller måtten för de data som bearbetats sedan den senaste slutförandepunkten.

Du kan observera dessa mått genom att koppla en lyssnare till Spark-sessionen. Lyssnaren är beroende av körningsläget:

  • Batch-läge: Använd QueryExecutionListener.

    QueryExecutionListener anropas när frågan är klar. Få åtkomst till måtten med hjälp av kartan QueryExecution.observedMetrics .

  • Direktuppspelning eller mikrobatch: Använd StreamingQueryListener.

    StreamingQueryListener anropas när strömningsfrågan slutför en epok. Få åtkomst till måtten med hjälp av kartan StreamingQueryProgress.observedMetrics . Azure Databricks stöder inte kontinuerlig körningsströmning.

Till exempel:

Scala

// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()

// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    event.progress.observedMetrics.get("my_event").foreach { row =>
      // Trigger if the number of errors exceeds 5 percent
      val num_rows = row.getAs[Long]("rc")
      val num_error_rows = row.getAs[Long]("erc")
      val ratio = num_error_rows.toDouble / num_rows
      if (ratio > 0.05) {
        // Trigger alert
      }
    }
  }
})

Python

# Observe metric
observed_df = df.observe("metric", count(lit(1)).as("cnt"), count(col("error")).as("malformed"))
observed_df.writeStream.format("...").start()

# Define my listener.
class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        print(f"'{event.name}' [{event.id}] got started!")
    def onQueryProgress(self, event):
        row = event.progress.observedMetrics.get("metric")
        if row is not None:
            if row.malformed / row.cnt > 0.5:
                print("ALERT! Ouch! there are too many malformed "
                      f"records {row.malformed} out of {row.cnt}!")
            else:
                print(f"{row.cnt} rows processed!")
    def onQueryTerminated(self, event):
        print(f"{event.id} got terminated!")

# Add my listener.
spark.streams.addListener(MyListener())

Mått för StreamingQueryListener-objekt

Mätvärde Beskrivning
id Unikt fråge-ID som bevaras mellan omstarter. Se StreamingQuery.id().
runId Unikt fråge-ID för varje start eller omstart. Se StreamingQuery.runId().
name Användarens angivna namn på frågan. Null om det inte anges.
timestamp Tidsstämpel för körning av mikrobatchen.
batchId Unikt ID för den aktuella batchen med data som bearbetas. Observera att vid återförsök efter ett fel kan ett visst batch-ID köras mer än en gång. På samma sätt ökar inte batch-ID:t när det inte finns några data som ska bearbetas.
numInputRows Aggregera (över alla källor) antalet poster som bearbetas i en utlösare.
inputRowsPerSecond Aggregera (över alla källor) frekvensen för ankommande data.
processedRowsPerSecond Aggregera (över alla källor) hastighet med vilken Spark bearbetar data.

durationMs-objekt

Information om den tid det tar att slutföra olika steg i mikrobatchkörningsprocessen.

Mätvärde Beskrivning
durationMs.addBatch Tiden det tar att köra mikrobatchen. Detta utesluter den tid spark tar att planera mikrobatchen.
durationMs.getBatch Tiden det tar att hämta metadata om förskjutningarna från källan.
durationMs.latestOffset Senaste förskjutning som förbrukats för mikrobatchen. Det här förloppsobjektet refererar till den tid det tar att hämta den senaste förskjutningen från källor.
durationMs.queryPlanning Tiden det tar att generera körningsplanen.
durationMs.triggerExecution Tiden det tar att planera och köra mikrobatchen.
durationMs.walCommit Tiden det tar att checka in de nya tillgängliga förskjutningarna.

eventTime-objekt

Information om händelsetidsvärdet som visas i de data som bearbetas i mikrobatchen. Dessa data används av vattenstämpeln för att ta reda på hur tillståndet kan trimmas för bearbetning av tillståndskänsliga aggregeringar som definierats i structured streaming-jobbet.

Mätvärde Beskrivning
eventTime.avg Genomsnittlig händelsetid som visas i utlösaren.
eventTime.max Maximal händelsetid som visas i utlösaren.
eventTime.min Minsta händelsetid som visas i utlösaren.
eventTime.watermark Värdet för vattenstämpeln som används i utlösaren.

stateOperators-objekt

Information om tillståndskänsliga åtgärder som definieras i structured streaming-jobbet och de sammansättningar som skapas från dem.

Mätvärde Beskrivning
stateOperators.operatorName Namnet på den tillståndskänsliga operator som måtten är relaterade till. Till exempel symmetricHashJoin, dedupe, stateStoreSave.
stateOperators.numRowsTotal Antal rader i tillståndet som ett resultat av den tillståndskänsliga operatorn eller aggregeringen.
stateOperators.numRowsUpdated Antal rader som uppdaterats i tillståndet till följd av den tillståndskänsliga operatorn eller aggregeringen.
stateOperators.numRowsRemoved Antal rader som tagits bort från tillståndet till följd av den tillståndskänsliga operatorn eller aggregeringen.
stateOperators.commitTimeMs Tiden det tar att checka in alla uppdateringar (placerar och tar bort) och returnerar en ny version.
stateOperators.memoryUsedBytes Minne som används av tillståndsarkivet.
stateOperators.numRowsDroppedByWatermark Antal rader som anses vara för sena för att inkluderas i den tillståndskänsliga aggregeringen. Endast strömmande aggregeringar: Antal rader som släppts efter aggregering och inte råa indatarader. Talet är inte exakt, men det kan tyda på att sena data tas bort.
stateOperators.numShufflePartitions Antal shuffle-partitioner för den här tillståndskänsliga operatorn.
stateOperators.numStateStoreInstances Faktisk tillståndslagerinstans som operatorn har initierat och underhållit. I många tillståndskänsliga operatorer är detta samma som antalet partitioner, men stream-stream join initierar fyra tillståndslagerinstanser per partition.

stateOperators.customMetrics-objekt

Information som samlas in från RocksDB som samlar in mått om dess prestanda och åtgärder med avseende på de tillståndskänsliga värden som den upprätthåller för structured streaming-jobbet. Mer information finns i Konfigurera RocksDB-tillståndslager på Azure Databricks.

Mätvärde Beskrivning
customMetrics.rocksdbBytesCopied Antal byte som kopierats enligt spårning av RocksDB-filhanteraren.
customMetrics.rocksdbCommitCheckpointLatency Tid i millisekunder för att ta en ögonblicksbild av inbyggda RocksDB och skriva den till en lokal katalog.
customMetrics.rocksdbCompactLatency Tid i millisekunder för komprimering (valfritt) under kontrollpunktsincheckningen.
customMetrics.rocksdbCommitFileSyncLatencyMs Tid i millisekunder för att synkronisera de inbyggda RocksDB-ögonblicksbildsrelaterade filerna till en extern lagringsplats (kontrollpunktsplats).
customMetrics.rocksdbCommitFlushLatency Tid i millisekunder för att rensa rocksDB-minnesinterna ändringar till din lokala disk.
customMetrics.rocksdbCommitPauseLatency Tid i millisekunder för att stoppa bakgrundsarbetstrådarna (till exempel för komprimering) som en del av kontrollpunktsincheckningen.
customMetrics.rocksdbCommitWriteBatchLatency Tid i millisekunder för att tillämpa mellanlagrade skrivningar i minnesintern struktur (WriteBatch) på interna RocksDB.
customMetrics.rocksdbFilesCopied Antal filer som kopierats som spårade av RocksDB-filhanteraren.
customMetrics.rocksdbFilesReused Antal filer som återanvänds enligt spåras av RocksDB-filhanteraren.
customMetrics.rocksdbGetCount Antal get anrop till databasen (detta inkluderar gets inte från WriteBatch: Minnesintern batch som används för mellanlagring av skrivningar).
customMetrics.rocksdbGetLatency Genomsnittlig tid i nanosekunder för det underliggande interna RocksDB::Get anropet.
customMetrics.rocksdbReadBlockCacheHitCount Hur mycket av blockcacheminnet i RocksDB som är användbart eller inte och undvika lokala diskläsningar.
customMetrics.rocksdbReadBlockCacheMissCount Hur mycket av blockcacheminnet i RocksDB som är användbart eller inte och undvika lokala diskläsningar.
customMetrics.rocksdbSstFileSize Storleken på alla SST-filer. SST står för Static Sorted Table, som är den tabellstruktur som RocksDB använder för att lagra data.
customMetrics.rocksdbTotalBytesRead Antal okomprimerade byte som lästs av get åtgärder.
customMetrics.rocksdbTotalBytesReadByCompaction Antal byte som komprimeringsprocessen läser från disken.
customMetrics.rocksdbTotalBytesReadThroughIterator Vissa tillståndskänsliga åtgärder (till exempel timeoutbearbetning i FlatMapGroupsWithState och vattenstämpling) kräver att data läss i DB via en iterator. Det här måttet representerar storleken på okomprimerade data som läss med iteratorn.
customMetrics.rocksdbTotalBytesWritten Antal okomprimerade byte som skrivits av put åtgärder.
customMetrics.rocksdbTotalBytesWrittenByCompaction Antal byte som komprimeringsprocessen skriver till disken.
customMetrics.rocksdbTotalCompactionLatencyMs Tids millisekunder för RocksDB-komprimering, inklusive bakgrundskomprimeringar och den valfria komprimering som initierades under incheckningen.
customMetrics.rocksdbTotalFlushLatencyMs Tömningstid, inklusive bakgrundsspolning. Tömningsåtgärder är processer med vilka MemTable töms till lagring när den är full. MemTables är den första nivån där data lagras i RocksDB.
customMetrics.rocksdbZipFileBytesUncompressed RocksDB File Manager hanterar den fysiska SST-filens diskutrymmesanvändning och borttagning. Det här måttet representerar de okomprimerade zip-filerna i byte som rapporterats av Filhanteraren.

källobjekt (Kafka)

Mätvärde Beskrivning
sources.description Namnet på källan som strömningsfrågan läser från. Exempel: “KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]”
sources.startOffset Objekt Starta förskjutningsnummer i Kafka-ämnet som strömningsjobbet startade på.
sources.endOffset Objekt Den senaste förskjutningen som bearbetas av mikrobatchen. Detta kan vara lika med latestOffset för en pågående mikrobatchkörning.
sources.latestOffset Objekt Den senaste förskjutningen räknas av mikrobatchen. När det finns begränsningar kanske mikrobatchprocessen inte bearbetar alla förskjutningar, vilket orsakar endOffset och latestOffset skiljer sig åt.
sources.numInputRows Antal indatarader som bearbetas från den här källan.
sources.inputRowsPerSecond Hastighet med vilken data anländer för bearbetning för den här källan.
sources.processedRowsPerSecond Hastighet med vilken Spark bearbetar data för den här källan.

sources.metrics-objekt (Kafka)

Mätvärde Beskrivning
sources.metrics.avgOffsetsBehindLatest Genomsnittligt antal förskjutningar som strömningsfrågan ligger bakom den senaste tillgängliga förskjutningen bland alla prenumerationsämnen.
sources.metrics.estimatedTotalBytesBehindLatest Beräknat antal byte som frågeprocessen inte har förbrukat från de prenumererade ämnena.
sources.metrics.maxOffsetsBehindLatest Maximalt antal förskjutningar som strömningsfrågan ligger bakom den senaste tillgängliga förskjutningen bland alla prenumerationsämnen.
sources.metrics.minOffsetsBehindLatest Minsta antal förskjutningar som strömningsfrågan ligger bakom den senaste tillgängliga förskjutningen bland alla prenumerationsämnen.

mottagarobjekt (Kafka)

Mätvärde Beskrivning
sink.description Namnet på mottagaren som strömningsfrågan skriver till. Exempel: “org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100”
sink.numOutputRows Antal rader som skrivits till utdatatabellen eller mottagaren som en del av mikrobatchen. I vissa situationer kan det här värdet vara "-1" och kan i allmänhet tolkas som "okänt".

källobjekt (Delta Lake)

Mätvärde Beskrivning
sources.description Namnet på källan som strömningsfrågan läser från. Exempel: “DeltaSource[table]”
sources.[startOffset/endOffset].sourceVersion Version av serialisering som den här förskjutningen kodas med.
sources.[startOffset/endOffset].reservoirId ID för tabellen som du läser från. Detta används för att identifiera felkonfiguration vid omstart av en fråga.
sources.[startOffset/endOffset].reservoirVersion Version av tabellen som du bearbetar för närvarande.
sources.[startOffset/endOffset].index Index i sekvensen AddFiles i den här versionen. Detta används för att dela upp stora incheckningar i flera batchar. Det här indexet skapas genom sortering på modificationTimestamp och path.
sources.[startOffset/endOffset].isStartingVersion Om den här förskjutningen anger en fråga som startar i stället för att bearbeta ändringar. När du startar en ny fråga bearbetas alla data som finns i tabellen i början och sedan nya data som har anlänt.
sources.latestOffset Den senaste förskjutningen som bearbetas av mikrobatchfrågan.
sources.numInputRows Antal indatarader som bearbetas från den här källan.
sources.inputRowsPerSecond Hastighet med vilken data anländer för bearbetning för den här källan.
sources.processedRowsPerSecond Hastighet med vilken Spark bearbetar data för den här källan.
sources.metrics.numBytesOutstanding Storleken på de utestående filerna (filer som spåras av RocksDB) tillsammans. Det här är måttet för kvarvarande uppgifter för Delta och Auto Loader som strömningskälla.
sources.metrics.numFilesOutstanding Antal utestående filer som ska bearbetas. Det här är måttet för kvarvarande uppgifter för Delta och Auto Loader som strömningskälla.

mottagarobjekt (Delta Lake)

Mätvärde Beskrivning
sink.description Namnet på mottagaren som den strömmande frågan skriver till. Exempel: “DeltaSink[table]”
sink.numOutputRows Antalet rader i det här måttet är "-1" eftersom Spark inte kan härleda utdatarader för DSv1-mottagare, vilket är klassificeringen för Delta Lake-mottagare.

Exempel

Exempel på Kafka-to-Kafka StreamingQueryListener-händelse

{
  "id" : "3574feba-646d-4735-83c4-66f657e52517",
  "runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
  "name" : "STREAMING_QUERY_NAME_UNIQUE",
  "timestamp" : "2022-10-31T20:09:30.455Z",
  "batchId" : 1377,
  "numInputRows" : 687,
  "inputRowsPerSecond" : 32.13433743393049,
  "processedRowsPerSecond" : 34.067241892293964,
  "durationMs" : {
    "addBatch" : 18352,
    "getBatch" : 0,
    "latestOffset" : 31,
    "queryPlanning" : 977,
    "triggerExecution" : 20165,
    "walCommit" : 342
  },
  "eventTime" : {
    "avg" : "2022-10-31T20:09:18.070Z",
    "max" : "2022-10-31T20:09:30.125Z",
    "min" : "2022-10-31T20:09:09.793Z",
    "watermark" : "2022-10-31T20:08:46.355Z"
  },
  "stateOperators" : [ {
    "operatorName" : "stateStoreSave",
    "numRowsTotal" : 208,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 434,
    "numRowsRemoved" : 76,
    "allRemovalsTimeMs" : 515,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 167069743,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 222,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 165,
      "rocksdbReadBlockCacheMissCount" : 41,
      "rocksdbSstFileSize" : 232729,
      "rocksdbTotalBytesRead" : 12844,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 161238,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "dedupe",
    "numRowsTotal" : 2454744,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 4155,
    "numRowsRemoved" : 0,
    "allRemovalsTimeMs" : 0,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 137765341,
    "numRowsDroppedByWatermark" : 34,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "numDroppedDuplicateRows" : 193,
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 146,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3,
      "rocksdbReadBlockCacheMissCount" : 3,
      "rocksdbSstFileSize" : 78959140,
      "rocksdbTotalBytesRead" : 0,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "symmetricHashJoin",
    "numRowsTotal" : 2583,
    "numRowsUpdated" : 682,
    "allUpdatesTimeMs" : 9645,
    "numRowsRemoved" : 508,
    "allRemovalsTimeMs" : 46,
    "commitTimeMs" : 21,
    "memoryUsedBytes" : 668544484,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 80,
    "customMetrics" : {
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 4218,
      "rocksdbGetLatency" : 3,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3425,
      "rocksdbReadBlockCacheMissCount" : 149,
      "rocksdbSstFileSize" : 742827,
      "rocksdbTotalBytesRead" : 866864,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  } ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
    "startOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706380
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "numInputRows" : 292,
    "inputRowsPerSecond" : 13.65826278123392,
    "processedRowsPerSecond" : 14.479817514628582,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "estimatedTotalBytesBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  }, {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
    "startOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100",
    "numOutputRows" : 76
  }
}

Exempel på Delta Lake-to-Delta Lake StreamingQueryListener-händelse

{
  "id" : "aeb6bc0f-3f7d-4928-a078-ba2b304e2eaf",
  "runId" : "35d751d9-2d7c-4338-b3de-6c6ae9ebcfc2",
  "name" : "silverTransformFromBronze",
  "timestamp" : "2022-11-01T18:21:29.500Z",
  "batchId" : 4,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "latestOffset" : 62,
    "triggerExecution" : 62
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "DeltaSource[dbfs:/FileStore/max.fisher@databricks.com/ctc/stateful-trade-analysis-demo/table]",
    "startOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "endOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "latestOffset" : null,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "numBytesOutstanding" : "0",
      "numFilesOutstanding" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_silver_delta_demo2]",
    "numOutputRows" : -1
  }
}

Exempel på frekvenskälla till Delta Lake StreamingQueryListener-händelse

{
  "id" : "912ebdc1-edf2-48ec-b9fb-1a9b67dd2d9e",
  "runId" : "85de73a5-92cc-4b7f-9350-f8635b0cf66e",
  "name" : "dataGen",
  "timestamp" : "2022-11-01T18:28:20.332Z",
  "batchId" : 279,
  "numInputRows" : 300,
  "inputRowsPerSecond" : 114.15525114155251,
  "processedRowsPerSecond" : 158.9825119236884,
  "durationMs" : {
    "addBatch" : 1771,
    "commitOffsets" : 54,
    "getBatch" : 0,
    "latestOffset" : 0,
    "queryPlanning" : 4,
    "triggerExecution" : 1887,
    "walCommit" : 58
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "RateStreamV2[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=default",
    "startOffset" : 560,
    "endOffset" : 563,
    "latestOffset" : 563,
    "numInputRows" : 300,
    "inputRowsPerSecond" : 114.15525114155251,
    "processedRowsPerSecond" : 158.9825119236884
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_bronze_delta_demo]",
    "numOutputRows" : -1
  }
}