Compartilhar via


Monitoramento de consultas de Streaming Estruturado no Azure Databricks

O Azure Databricks fornece monitoramento interno para aplicativos de Fluxo Estruturado por meio da interface do usuário do Spark na guia Fluxo.

Distinguir consultas de Streaming Estruturado na interface do usuário do Spark

Forneça aos fluxos um nome de consulta exclusivo adicionando .queryName(<query-name>) ao código writeStream para distinguir facilmente quais métricas pertencem a qual fluxo na interface do usuário do Spark.

Enviar métricas de Streaming Estruturado por push para serviços externos

As métricas de streaming podem ser enviadas por push para serviços externos a fim de gerar alertas ou criar painéis de casos de uso com a interface do Ouvinte de Consulta de Streaming do Apache Spark. No Databricks Runtime 11.3 LTS e superior, o Streaming Query Listener está disponível em Python e Scala.

Importante

Credenciais e objetos gerenciados pelo Catálogo do Unity não podem ser usados na lógica StreamingQueryListener.

Observação

A latência de processamento associada aos ouvintes pode afetar negativamente o processamento de consulta. O Databricks recomenda minimizar a lógica de processamento nesses ouvintes e gravar em coletores de baixa latência, como Kafka.

O código a seguir fornece exemplos básicos da sintaxe para implementar um ouvinte:

Scala

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

val myListener = new StreamingQueryListener {

  /**
    * Called when a query is started.
    * @note This is called synchronously with
    *       [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]].
    *       `onQueryStart` calls on all listeners before
    *       `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
    *        Do not block this method, as it blocks your query.
    */
  def onQueryStarted(event: QueryStartedEvent): Unit = {}

  /**
    * Called when there is some status update (ingestion rate updated, etc.)
    *
    * @note This method is asynchronous. The status in [[StreamingQuery]] returns the
    *       latest status, regardless of when this method is called. The status of [[StreamingQuery]]
    *       may change before or when you process the event. For example, you may find [[StreamingQuery]]
    *       terminates when processing `QueryProgressEvent`.
    */
  def onQueryProgress(event: QueryProgressEvent): Unit = {}

  /**
    * Called when the query is idle and waiting for new data to process.
    */
  def onQueryIdle(event: QueryProgressEvent): Unit = {}

  /**
    * Called when a query is stopped, with or without error.
    */
  def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}

Python

class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        """
        Called when a query is started.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This is called synchronously with
        meth:`pyspark.sql.streaming.DataStreamWriter.start`,
        that is, ``onQueryStart`` will be called on all listeners before
        ``DataStreamWriter.start()`` returns the corresponding
        :class:`pyspark.sql.streaming.StreamingQuery`.
        Do not block in this method as it will block your query.
        """
        pass

    def onQueryProgress(self, event):
        """
        Called when there is some status update (ingestion rate updated, etc.)

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This method is asynchronous. The status in
        :class:`pyspark.sql.streaming.StreamingQuery` returns the
        most recent status, regardless of when this method is called. The status
        of :class:`pyspark.sql.streaming.StreamingQuery`.
        may change before or when you process the event.
        For example, you may find :class:`StreamingQuery`
        terminates when processing `QueryProgressEvent`.
        """
        pass

    def onQueryIdle(self, event):
        """
        Called when the query is idle and waiting for new data to process.
        """
        pass

    def onQueryTerminated(self, event):
        """
        Called when a query is stopped, with or without error.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
            The properties are available as the same as Scala API.
        """
        pass

my_listener = MyListener()

Definindo métricas observáveis no Streaming Estruturado

As métricas observáveis são chamadas de funções de agregação arbitrárias e podem ser definidas em uma consulta (DataFrame). Assim que a execução de um DataFrame atinge um ponto de conclusão (ou seja, conclui uma consulta em lote ou atinge uma época de streaming), um evento nomeado é emitido contendo as métricas para os dados processados desde o último ponto de conclusão.

É possível observar essas métricas anexando um ouvinte à sessão do Spark. O ouvinte depende do modo de execução:

  • Modo de lote: use QueryExecutionListener.

    QueryExecutionListener é chamado quando a consulta é concluída. Acesse as métricas usando o mapa QueryExecution.observedMetrics.

  • Streaming ou micro-lote: use StreamingQueryListener.

    StreamingQueryListener é chamado quando a consulta de streaming conclui uma época. Acesse as métricas usando o mapa StreamingQueryProgress.observedMetrics. O Azure Databricks não dá suporte ao streaming de execução contínua.

Por exemplo:

Scala

// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()

// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    event.progress.observedMetrics.get("my_event").foreach { row =>
      // Trigger if the number of errors exceeds 5 percent
      val num_rows = row.getAs[Long]("rc")
      val num_error_rows = row.getAs[Long]("erc")
      val ratio = num_error_rows.toDouble / num_rows
      if (ratio > 0.05) {
        // Trigger alert
      }
    }
  }
})

Python

# Observe metric
observed_df = df.observe("metric", count(lit(1)).as("cnt"), count(col("error")).as("malformed"))
observed_df.writeStream.format("...").start()

# Define my listener.
class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        print(f"'{event.name}' [{event.id}] got started!")
    def onQueryProgress(self, event):
        row = event.progress.observedMetrics.get("metric")
        if row is not None:
            if row.malformed / row.cnt > 0.5:
                print("ALERT! Ouch! there are too many malformed "
                      f"records {row.malformed} out of {row.cnt}!")
            else:
                print(f"{row.cnt} rows processed!")
    def onQueryTerminated(self, event):
        print(f"{event.id} got terminated!")

# Add my listener.
spark.streams.addListener(MyListener())

métricas do objeto StreamingQueryListener

Métrica Descrição
id ID de consulta exclusiva que persiste em todas as reinicializações. Consulte StreamingQuery.id().
runId ID de consulta exclusiva para cada início ou reinício. Consulte StreamingQuery.runId().
name Nome da consulta especificado pelo usuário. Nulo se não for especificado.
timestamp Carimbo de data/hora para a execução do microlote.
batchId ID exclusiva para o lote atual de dados que está sendo processado. Observe que, no caso de novas tentativas após uma falha, uma determinada ID de lote pode ser executada mais de uma vez. De modo semelhante, quando não existem dados a serem processados, a ID do lote não é incrementada.
numInputRows Número de agregações (em todas as fontes) de registros processados em um gatilho.
inputRowsPerSecond Taxa de agregação (em todas as fontes) de chegada de dados.
processedRowsPerSecond Taxa de agregação (em todas as fontes) na qual o Spark está processando dados.

objeto durationMs

Informações sobre o tempo necessário para concluir várias fases do processo de execução do microlote.

Métrica Descrição
durationMs.addBatch Tempo necessário para executar o microlote. Isso exclui o tempo que o Spark leva para planejar o microlote.
durationMs.getBatch Tempo que leva para recuperar os metadados sobre os deslocamentos da origem.
durationMs.latestOffset Último deslocamento consumido para o microlote. Esse objeto de progresso se refere ao tempo necessário para recuperar o deslocamento mais recente das fontes.
durationMs.queryPlanning Tempo necessário para gerar o plano de execução.
durationMs.triggerExecution Tempo necessário para planejar e executar o microlote.
durationMs.walCommit Tempo necessário para confirmar os novos deslocamentos disponíveis.

objeto eventTime

Informações sobre o valor do tempo do evento visto nos dados que estão sendo processados no microlote. Esses dados são utilizados pela marca d'água para descobrir como cortar o estado para processar agregações com estado definidas no trabalho de Fluxo Estruturado.

Métrica Descrição
eventTime.avg Tempo médio do evento visto no gatilho.
eventTime.max Tempo máximo do evento visto no gatilho.
eventTime.min Tempo mínimo do evento visto no gatilho.
eventTime.watermark Valor da marca d'água utilizada no gatilho.

objeto stateOperators

Informações sobre as operações com estado definidas no trabalho de Fluxo Estruturado e as agregações que são produzidas a partir delas.

Métrica Descrição
stateOperators.operatorName Nome do operador com estado ao qual as métricas estão relacionadas. Por exemplo, symmetricHashJoin, dedupe, stateStoreSave.
stateOperators.numRowsTotal Número de linhas no estado como resultado do operador ou agregação com estado.
stateOperators.numRowsUpdated Número de linhas atualizadas no estado como resultado do operador ou agregação com estado.
stateOperators.numRowsRemoved Número de linhas removidas do estado como resultado da agregação ou do operador com estado.
stateOperators.commitTimeMs Tempo necessário para confirmar todas as atualizações (colocações e remoções) e retornar uma nova versão.
stateOperators.memoryUsedBytes Memória utilizada pelo repositório de estado.
stateOperators.numRowsDroppedByWatermark Número de linhas que são consideradas muito atrasadas para serem incluídas na agregação com estado. Apenas agregações de streaming: número de linhas removidas após a agregação, e não as linhas de entrada brutas. O número não é preciso, mas pode indicar que os dados atrasados estão sendo removidos.
stateOperators.numShufflePartitions Número de partições aleatórias para esse operador com estado.
stateOperators.numStateStoreInstances Instância real do repositório de estado que o operador inicializou e manteve. Em muitos operadores com estado, esse é o mesmo número de partições, mas a junção de fluxo contínuo inicializa quatro instâncias de repositório de estado por partição.

Objeto stateOperators.customMetrics

As informações coletadas do RocksDB que capturam métricas sobre seu desempenho e operações com relação aos valores com estado que ele mantém para o trabalho de Fluxo Estruturado. Para mais informações, confira Configurar o repositório de estado do RocksDB no Azure Databricks.

Métrica Descrição
customMetrics.rocksdbBytesCopied Número de bytes copiados, conforme rastreado pelo gerenciador de arquivos do RocksDB.
customMetrics.rocksdbCommitCheckpointLatency Tempo, em milissegundos, para tirar um instantâneo do RocksDB nativo e gravá-lo em um diretório local.
customMetrics.rocksdbCompactLatency Tempo em milissegundos para compactação (opcional) durante a confirmação do ponto de verificação.
customMetrics.rocksdbCommitFileSyncLatencyMs Tempo em milissegundos para sincronizar os arquivos nativos relacionados ao instantâneo do RocksDB em um armazenamento externo (local do ponto de verificação).
customMetrics.rocksdbCommitFlushLatency Tempo em milissegundos para liberar as alterações na memória do RocksDB do seu disco local.
customMetrics.rocksdbCommitPauseLatency Tempo, em milissegundos, para interromper as threads de trabalho em segundo plano (por exemplo, para compactação) como parte do ponto de verificação confirmado.
customMetrics.rocksdbCommitWriteBatchLatency Tempo em milissegundos para aplicar as gravações em etapas na estrutura na memória (WriteBatch) ao RocksDB nativo.
customMetrics.rocksdbFilesCopied Número de arquivos copiados conforme rastreado pelo Gerenciador de arquivos do RocksDB.
customMetrics.rocksdbFilesReused Número de arquivos reutilizados conforme rastreado pelo Gerenciador de arquivos do RocksDB.
customMetrics.rocksdbGetCount Número de get chamadas ao banco de dados (isso não inclui gets de WriteBatch: lote na memória utilizado para processo de preparo de gravações).
customMetrics.rocksdbGetLatency Tempo médio em nanossegundos para a chamada nativa RocksDB::Get subjacente.
customMetrics.rocksdbReadBlockCacheHitCount Quanto do cache de blocos no RocksDB é útil ou não e como evitar leituras no disco local.
customMetrics.rocksdbReadBlockCacheMissCount Quanto do cache de blocos no RocksDB é útil ou não e como evitar leituras no disco local.
customMetrics.rocksdbSstFileSize Tamanho de todos os arquivos SST. SST significa Tabela Classificada Estática, que é a estrutura tabular que o RocksDB utiliza para armazenar dados.
customMetrics.rocksdbTotalBytesRead Número de bytes não compactados lidos por operações de get.
customMetrics.rocksdbTotalBytesReadByCompaction Número de bytes da leitura do processo de compactação do disco.
customMetrics.rocksdbTotalBytesReadThroughIterator Algumas das operações de estado (por exemplo, o processamento de tempo limite em FlatMapGroupsWithState e a marca d'água) exigem a leitura de dados no BD por meio de um iterador. Essa métrica representa o tamanho dos dados não compactados lidos usando o iterador.
customMetrics.rocksdbTotalBytesWritten Número de bytes não compactados gravados por operações put.
customMetrics.rocksdbTotalBytesWrittenByCompaction Número de bytes que o processo de compactação grava no disco.
customMetrics.rocksdbTotalCompactionLatencyMs Tempo em milissegundos para compactações do RocksDB, incluindo compactações em segundo plano e a compactação opcional iniciada durante a confirmação.
customMetrics.rocksdbTotalFlushLatencyMs Tempo de liberação, incluindo a liberação em segundo plano. As operações de liberação são processos pelos quais a MemTable é liberado no armazenamento uma vez que esteja cheio. As MemTables são o primeiro nível em que os dados são armazenados no RocksDB.
customMetrics.rocksdbZipFileBytesUncompressed O Gerenciador de Arquivos do RocksDB gerencia a utilização e a exclusão do espaço em disco do arquivo SST. Essa métrica representa os arquivos zip não compactados em bytes, conforme relatado pelo Gerenciador de Arquivos.

objeto de origem (Kafka)

Métrica Descrição
sources.description Nome da fonte da qual a consulta de streaming está lendo. Por exemplo, “KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]”.
Objeto sources.startOffset Iniciando o número de deslocamento no tópico do Kafka em que o trabalho de streaming foi iniciado.
Objeto sources.endOffset Último deslocamento processado pelo microlote. Isso pode ser igual a latestOffset para uma execução do microlote em andamento.
Objeto sources.latestOffset Último deslocamento calculado pelo microlote. Quando existe limitação, o processo de microlote pode não processar todos os deslocamentos, fazendo com que endOffset e latestOffset sejam diferentes.
sources.numInputRows Número de linhas de entrada processadas a partir dessa fonte.
sources.inputRowsPerSecond Taxa na qual os dados estão chegando para processamento desta fonte.
sources.processedRowsPerSecond Taxa na qual o Spark está processando os dados para essa fonte.

Objeto sources.metrics (Kafka)

Métrica Descrição
sources.metrics.avgOffsetsBehindLatest Número médio de deslocamentos em que a consulta de streaming está por trás do último deslocamento disponível entre todos os tópicos inscritos.
sources.metrics.estimatedTotalBytesBehindLatest Número estimado de bytes que o processo de consulta não consumiu dos tópicos assinados.
sources.metrics.maxOffsetsBehindLatest Número máximo de deslocamentos em que a consulta de streaming está por trás do último deslocamento disponível entre todos os tópicos inscritos.
sources.metrics.minOffsetsBehindLatest Número mínimo de deslocamentos em que a consulta de streaming está por trás do último deslocamento disponível entre todos os tópicos inscritos.

objeto coletor (Kafka)

Métrica Descrição
sink.description Nome do coletor no qual a consulta de streaming está gravando. Por exemplo, “org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100”.
sink.numOutputRows Número de linha gravadas no coletor ou na tabela de saída como parte do microlote. Em algumas situações, esse valor pode ser "-1" e, em geral, pode ser interpretado como "desconhecido".

objeto de origem (Delta Lake)

Métrica Descrição
sources.description Nome da fonte da qual a consulta de streaming está lendo. Por exemplo, “DeltaSource[table]”.
sources.[startOffset/endOffset].sourceVersion Versão da serialização com a qual esse deslocamento é codificado.
sources.[startOffset/endOffset].reservoirId ID da tabela da qual você está lendo. Isso é utilizado para detectar a configuração incorreta ao reiniciar uma consulta.
sources.[startOffset/endOffset].reservoirVersion Versão da tabela que está sendo processada no momento.
sources.[startOffset/endOffset].index Índice na sequência de AddFiles nesta versão. É utilizado para interromper grandes confirmações em vários lotes. Esse índice é criado pela classificação em modificationTimestamp e path.
sources.[startOffset/endOffset].isStartingVersion Se esse deslocamento denota uma consulta que está iniciando, ao invés de processar alterações. Ao iniciar uma nova consulta, todos os dados presentes na tabela no início são processados e, em seguida, os novos dados que chegaram.
sources.latestOffset Último deslocamento processado pela consulta do microlote.
sources.numInputRows Número de linhas de entrada processadas a partir dessa fonte.
sources.inputRowsPerSecond Taxa na qual os dados estão chegando para processamento desta fonte.
sources.processedRowsPerSecond Taxa na qual o Spark está processando os dados para essa fonte.
sources.metrics.numBytesOutstanding Tamanho dos arquivos pendentes (arquivos rastreados pelo RocksDB) combinados. Essa é a métrica de lista de pendências para a Delta e o Carregador Automático como fonte de streaming.
sources.metrics.numFilesOutstanding Número de arquivos pendentes que serão processados. Essa é a métrica de lista de pendências para a Delta e o Carregador Automático como fonte de streaming.

objeto de coletor (Delta Lake)

Métrica Descrição
sink.description Nome do coletor no qual a consulta de streaming grava. Por exemplo, “DeltaSink[table]”.
sink.numOutputRows O número de linhas nessa métrica é "-1" porque o Spark não pode inferir linhas de saída para coletores DSv1, que é a classificação para a Sincronização Delta Lake.

Exemplos

Exemplo de evento StreamingQueryListener do Kafka para Kafka

{
  "id" : "3574feba-646d-4735-83c4-66f657e52517",
  "runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
  "name" : "STREAMING_QUERY_NAME_UNIQUE",
  "timestamp" : "2022-10-31T20:09:30.455Z",
  "batchId" : 1377,
  "numInputRows" : 687,
  "inputRowsPerSecond" : 32.13433743393049,
  "processedRowsPerSecond" : 34.067241892293964,
  "durationMs" : {
    "addBatch" : 18352,
    "getBatch" : 0,
    "latestOffset" : 31,
    "queryPlanning" : 977,
    "triggerExecution" : 20165,
    "walCommit" : 342
  },
  "eventTime" : {
    "avg" : "2022-10-31T20:09:18.070Z",
    "max" : "2022-10-31T20:09:30.125Z",
    "min" : "2022-10-31T20:09:09.793Z",
    "watermark" : "2022-10-31T20:08:46.355Z"
  },
  "stateOperators" : [ {
    "operatorName" : "stateStoreSave",
    "numRowsTotal" : 208,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 434,
    "numRowsRemoved" : 76,
    "allRemovalsTimeMs" : 515,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 167069743,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 222,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 165,
      "rocksdbReadBlockCacheMissCount" : 41,
      "rocksdbSstFileSize" : 232729,
      "rocksdbTotalBytesRead" : 12844,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 161238,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "dedupe",
    "numRowsTotal" : 2454744,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 4155,
    "numRowsRemoved" : 0,
    "allRemovalsTimeMs" : 0,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 137765341,
    "numRowsDroppedByWatermark" : 34,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "numDroppedDuplicateRows" : 193,
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 146,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3,
      "rocksdbReadBlockCacheMissCount" : 3,
      "rocksdbSstFileSize" : 78959140,
      "rocksdbTotalBytesRead" : 0,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "symmetricHashJoin",
    "numRowsTotal" : 2583,
    "numRowsUpdated" : 682,
    "allUpdatesTimeMs" : 9645,
    "numRowsRemoved" : 508,
    "allRemovalsTimeMs" : 46,
    "commitTimeMs" : 21,
    "memoryUsedBytes" : 668544484,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 80,
    "customMetrics" : {
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 4218,
      "rocksdbGetLatency" : 3,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3425,
      "rocksdbReadBlockCacheMissCount" : 149,
      "rocksdbSstFileSize" : 742827,
      "rocksdbTotalBytesRead" : 866864,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  } ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
    "startOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706380
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "numInputRows" : 292,
    "inputRowsPerSecond" : 13.65826278123392,
    "processedRowsPerSecond" : 14.479817514628582,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "estimatedTotalBytesBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  }, {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
    "startOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100",
    "numOutputRows" : 76
  }
}

Exemplo de evento StreamingQueryListener do Delta Lake-to-Delta Lake

{
  "id" : "aeb6bc0f-3f7d-4928-a078-ba2b304e2eaf",
  "runId" : "35d751d9-2d7c-4338-b3de-6c6ae9ebcfc2",
  "name" : "silverTransformFromBronze",
  "timestamp" : "2022-11-01T18:21:29.500Z",
  "batchId" : 4,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "latestOffset" : 62,
    "triggerExecution" : 62
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "DeltaSource[dbfs:/FileStore/max.fisher@databricks.com/ctc/stateful-trade-analysis-demo/table]",
    "startOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "endOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "latestOffset" : null,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "numBytesOutstanding" : "0",
      "numFilesOutstanding" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_silver_delta_demo2]",
    "numOutputRows" : -1
  }
}

Fonte de taxa de exemplo para o evento StreamingQueryListener do Delta Lake

{
  "id" : "912ebdc1-edf2-48ec-b9fb-1a9b67dd2d9e",
  "runId" : "85de73a5-92cc-4b7f-9350-f8635b0cf66e",
  "name" : "dataGen",
  "timestamp" : "2022-11-01T18:28:20.332Z",
  "batchId" : 279,
  "numInputRows" : 300,
  "inputRowsPerSecond" : 114.15525114155251,
  "processedRowsPerSecond" : 158.9825119236884,
  "durationMs" : {
    "addBatch" : 1771,
    "commitOffsets" : 54,
    "getBatch" : 0,
    "latestOffset" : 0,
    "queryPlanning" : 4,
    "triggerExecution" : 1887,
    "walCommit" : 58
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "RateStreamV2[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=default",
    "startOffset" : 560,
    "endOffset" : 563,
    "latestOffset" : 563,
    "numInputRows" : 300,
    "inputRowsPerSecond" : 114.15525114155251,
    "processedRowsPerSecond" : 158.9825119236884
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_bronze_delta_demo]",
    "numOutputRows" : -1
  }
}