Monitoramento de consultas de Streaming Estruturado no Azure Databricks
O Azure Databricks fornece monitoramento interno para aplicativos de Fluxo Estruturado por meio da interface do usuário do Spark na guia Fluxo.
Distinguir consultas de Streaming Estruturado na interface do usuário do Spark
Forneça aos fluxos um nome de consulta exclusivo adicionando .queryName(<query-name>)
ao código writeStream
para distinguir facilmente quais métricas pertencem a qual fluxo na interface do usuário do Spark.
Enviar métricas de Streaming Estruturado por push para serviços externos
As métricas de streaming podem ser enviadas por push para serviços externos a fim de gerar alertas ou criar painéis de casos de uso com a interface do Ouvinte de Consulta de Streaming do Apache Spark. No Databricks Runtime 11.3 LTS e superior, o Streaming Query Listener está disponível em Python e Scala.
Importante
Credenciais e objetos gerenciados pelo Catálogo do Unity não podem ser usados na lógica StreamingQueryListener
.
Observação
A latência de processamento associada aos ouvintes pode afetar negativamente o processamento de consulta. O Databricks recomenda minimizar a lógica de processamento nesses ouvintes e gravar em coletores de baixa latência, como Kafka.
O código a seguir fornece exemplos básicos da sintaxe para implementar um ouvinte:
Scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._
val myListener = new StreamingQueryListener {
/**
* Called when a query is started.
* @note This is called synchronously with
* [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]].
* `onQueryStart` calls on all listeners before
* `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
* Do not block this method, as it blocks your query.
*/
def onQueryStarted(event: QueryStartedEvent): Unit = {}
/**
* Called when there is some status update (ingestion rate updated, etc.)
*
* @note This method is asynchronous. The status in [[StreamingQuery]] returns the
* latest status, regardless of when this method is called. The status of [[StreamingQuery]]
* may change before or when you process the event. For example, you may find [[StreamingQuery]]
* terminates when processing `QueryProgressEvent`.
*/
def onQueryProgress(event: QueryProgressEvent): Unit = {}
/**
* Called when the query is idle and waiting for new data to process.
*/
def onQueryIdle(event: QueryProgressEvent): Unit = {}
/**
* Called when a query is stopped, with or without error.
*/
def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}
Python
class MyListener(StreamingQueryListener):
def onQueryStarted(self, event):
"""
Called when a query is started.
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
The properties are available as the same as Scala API.
Notes
-----
This is called synchronously with
meth:`pyspark.sql.streaming.DataStreamWriter.start`,
that is, ``onQueryStart`` will be called on all listeners before
``DataStreamWriter.start()`` returns the corresponding
:class:`pyspark.sql.streaming.StreamingQuery`.
Do not block in this method as it will block your query.
"""
pass
def onQueryProgress(self, event):
"""
Called when there is some status update (ingestion rate updated, etc.)
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
The properties are available as the same as Scala API.
Notes
-----
This method is asynchronous. The status in
:class:`pyspark.sql.streaming.StreamingQuery` returns the
most recent status, regardless of when this method is called. The status
of :class:`pyspark.sql.streaming.StreamingQuery`.
may change before or when you process the event.
For example, you may find :class:`StreamingQuery`
terminates when processing `QueryProgressEvent`.
"""
pass
def onQueryIdle(self, event):
"""
Called when the query is idle and waiting for new data to process.
"""
pass
def onQueryTerminated(self, event):
"""
Called when a query is stopped, with or without error.
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
The properties are available as the same as Scala API.
"""
pass
my_listener = MyListener()
Definindo métricas observáveis no Streaming Estruturado
As métricas observáveis são chamadas de funções de agregação arbitrárias e podem ser definidas em uma consulta (DataFrame). Assim que a execução de um DataFrame atinge um ponto de conclusão (ou seja, conclui uma consulta em lote ou atinge uma época de streaming), um evento nomeado é emitido contendo as métricas para os dados processados desde o último ponto de conclusão.
É possível observar essas métricas anexando um ouvinte à sessão do Spark. O ouvinte depende do modo de execução:
Modo de lote: use
QueryExecutionListener
.QueryExecutionListener
é chamado quando a consulta é concluída. Acesse as métricas usando o mapaQueryExecution.observedMetrics
.Streaming ou micro-lote: use
StreamingQueryListener
.StreamingQueryListener
é chamado quando a consulta de streaming conclui uma época. Acesse as métricas usando o mapaStreamingQueryProgress.observedMetrics
. O Azure Databricks não dá suporte ao streaming de execução contínua.
Por exemplo:
Scala
// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()
// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
override def onQueryProgress(event: QueryProgressEvent): Unit = {
event.progress.observedMetrics.get("my_event").foreach { row =>
// Trigger if the number of errors exceeds 5 percent
val num_rows = row.getAs[Long]("rc")
val num_error_rows = row.getAs[Long]("erc")
val ratio = num_error_rows.toDouble / num_rows
if (ratio > 0.05) {
// Trigger alert
}
}
}
})
Python
# Observe metric
observed_df = df.observe("metric", count(lit(1)).as("cnt"), count(col("error")).as("malformed"))
observed_df.writeStream.format("...").start()
# Define my listener.
class MyListener(StreamingQueryListener):
def onQueryStarted(self, event):
print(f"'{event.name}' [{event.id}] got started!")
def onQueryProgress(self, event):
row = event.progress.observedMetrics.get("metric")
if row is not None:
if row.malformed / row.cnt > 0.5:
print("ALERT! Ouch! there are too many malformed "
f"records {row.malformed} out of {row.cnt}!")
else:
print(f"{row.cnt} rows processed!")
def onQueryTerminated(self, event):
print(f"{event.id} got terminated!")
# Add my listener.
spark.streams.addListener(MyListener())
métricas do objeto StreamingQueryListener
Métrica | Descrição |
---|---|
id |
ID de consulta exclusiva que persiste em todas as reinicializações. Consulte StreamingQuery.id(). |
runId |
ID de consulta exclusiva para cada início ou reinício. Consulte StreamingQuery.runId(). |
name |
Nome da consulta especificado pelo usuário. Nulo se não for especificado. |
timestamp |
Carimbo de data/hora para a execução do microlote. |
batchId |
ID exclusiva para o lote atual de dados que está sendo processado. Observe que, no caso de novas tentativas após uma falha, uma determinada ID de lote pode ser executada mais de uma vez. De modo semelhante, quando não existem dados a serem processados, a ID do lote não é incrementada. |
numInputRows |
Número de agregações (em todas as fontes) de registros processados em um gatilho. |
inputRowsPerSecond |
Taxa de agregação (em todas as fontes) de chegada de dados. |
processedRowsPerSecond |
Taxa de agregação (em todas as fontes) na qual o Spark está processando dados. |
objeto durationMs
Informações sobre o tempo necessário para concluir várias fases do processo de execução do microlote.
Métrica | Descrição |
---|---|
durationMs.addBatch |
Tempo necessário para executar o microlote. Isso exclui o tempo que o Spark leva para planejar o microlote. |
durationMs.getBatch |
Tempo que leva para recuperar os metadados sobre os deslocamentos da origem. |
durationMs.latestOffset |
Último deslocamento consumido para o microlote. Esse objeto de progresso se refere ao tempo necessário para recuperar o deslocamento mais recente das fontes. |
durationMs.queryPlanning |
Tempo necessário para gerar o plano de execução. |
durationMs.triggerExecution |
Tempo necessário para planejar e executar o microlote. |
durationMs.walCommit |
Tempo necessário para confirmar os novos deslocamentos disponíveis. |
objeto eventTime
Informações sobre o valor do tempo do evento visto nos dados que estão sendo processados no microlote. Esses dados são utilizados pela marca d'água para descobrir como cortar o estado para processar agregações com estado definidas no trabalho de Fluxo Estruturado.
Métrica | Descrição |
---|---|
eventTime.avg |
Tempo médio do evento visto no gatilho. |
eventTime.max |
Tempo máximo do evento visto no gatilho. |
eventTime.min |
Tempo mínimo do evento visto no gatilho. |
eventTime.watermark |
Valor da marca d'água utilizada no gatilho. |
objeto stateOperators
Informações sobre as operações com estado definidas no trabalho de Fluxo Estruturado e as agregações que são produzidas a partir delas.
Métrica | Descrição |
---|---|
stateOperators.operatorName |
Nome do operador com estado ao qual as métricas estão relacionadas. Por exemplo, symmetricHashJoin , dedupe , stateStoreSave . |
stateOperators.numRowsTotal |
Número de linhas no estado como resultado do operador ou agregação com estado. |
stateOperators.numRowsUpdated |
Número de linhas atualizadas no estado como resultado do operador ou agregação com estado. |
stateOperators.numRowsRemoved |
Número de linhas removidas do estado como resultado da agregação ou do operador com estado. |
stateOperators.commitTimeMs |
Tempo necessário para confirmar todas as atualizações (colocações e remoções) e retornar uma nova versão. |
stateOperators.memoryUsedBytes |
Memória utilizada pelo repositório de estado. |
stateOperators.numRowsDroppedByWatermark |
Número de linhas que são consideradas muito atrasadas para serem incluídas na agregação com estado. Apenas agregações de streaming: número de linhas removidas após a agregação, e não as linhas de entrada brutas. O número não é preciso, mas pode indicar que os dados atrasados estão sendo removidos. |
stateOperators.numShufflePartitions |
Número de partições aleatórias para esse operador com estado. |
stateOperators.numStateStoreInstances |
Instância real do repositório de estado que o operador inicializou e manteve. Em muitos operadores com estado, esse é o mesmo número de partições, mas a junção de fluxo contínuo inicializa quatro instâncias de repositório de estado por partição. |
Objeto stateOperators.customMetrics
As informações coletadas do RocksDB que capturam métricas sobre seu desempenho e operações com relação aos valores com estado que ele mantém para o trabalho de Fluxo Estruturado. Para mais informações, confira Configurar o repositório de estado do RocksDB no Azure Databricks.
Métrica | Descrição |
---|---|
customMetrics.rocksdbBytesCopied |
Número de bytes copiados, conforme rastreado pelo gerenciador de arquivos do RocksDB. |
customMetrics.rocksdbCommitCheckpointLatency |
Tempo, em milissegundos, para tirar um instantâneo do RocksDB nativo e gravá-lo em um diretório local. |
customMetrics.rocksdbCompactLatency |
Tempo em milissegundos para compactação (opcional) durante a confirmação do ponto de verificação. |
customMetrics.rocksdbCommitFileSyncLatencyMs |
Tempo em milissegundos para sincronizar os arquivos nativos relacionados ao instantâneo do RocksDB em um armazenamento externo (local do ponto de verificação). |
customMetrics.rocksdbCommitFlushLatency |
Tempo em milissegundos para liberar as alterações na memória do RocksDB do seu disco local. |
customMetrics.rocksdbCommitPauseLatency |
Tempo, em milissegundos, para interromper as threads de trabalho em segundo plano (por exemplo, para compactação) como parte do ponto de verificação confirmado. |
customMetrics.rocksdbCommitWriteBatchLatency |
Tempo em milissegundos para aplicar as gravações em etapas na estrutura na memória (WriteBatch ) ao RocksDB nativo. |
customMetrics.rocksdbFilesCopied |
Número de arquivos copiados conforme rastreado pelo Gerenciador de arquivos do RocksDB. |
customMetrics.rocksdbFilesReused |
Número de arquivos reutilizados conforme rastreado pelo Gerenciador de arquivos do RocksDB. |
customMetrics.rocksdbGetCount |
Número de get chamadas ao banco de dados (isso não inclui gets de WriteBatch : lote na memória utilizado para processo de preparo de gravações). |
customMetrics.rocksdbGetLatency |
Tempo médio em nanossegundos para a chamada nativa RocksDB::Get subjacente. |
customMetrics.rocksdbReadBlockCacheHitCount |
Quanto do cache de blocos no RocksDB é útil ou não e como evitar leituras no disco local. |
customMetrics.rocksdbReadBlockCacheMissCount |
Quanto do cache de blocos no RocksDB é útil ou não e como evitar leituras no disco local. |
customMetrics.rocksdbSstFileSize |
Tamanho de todos os arquivos SST. SST significa Tabela Classificada Estática, que é a estrutura tabular que o RocksDB utiliza para armazenar dados. |
customMetrics.rocksdbTotalBytesRead |
Número de bytes não compactados lidos por operações de get . |
customMetrics.rocksdbTotalBytesReadByCompaction |
Número de bytes da leitura do processo de compactação do disco. |
customMetrics.rocksdbTotalBytesReadThroughIterator |
Algumas das operações de estado (por exemplo, o processamento de tempo limite em FlatMapGroupsWithState e a marca d'água) exigem a leitura de dados no BD por meio de um iterador. Essa métrica representa o tamanho dos dados não compactados lidos usando o iterador. |
customMetrics.rocksdbTotalBytesWritten |
Número de bytes não compactados gravados por operações put . |
customMetrics.rocksdbTotalBytesWrittenByCompaction |
Número de bytes que o processo de compactação grava no disco. |
customMetrics.rocksdbTotalCompactionLatencyMs |
Tempo em milissegundos para compactações do RocksDB, incluindo compactações em segundo plano e a compactação opcional iniciada durante a confirmação. |
customMetrics.rocksdbTotalFlushLatencyMs |
Tempo de liberação, incluindo a liberação em segundo plano. As operações de liberação são processos pelos quais a MemTable é liberado no armazenamento uma vez que esteja cheio. As MemTables são o primeiro nível em que os dados são armazenados no RocksDB. |
customMetrics.rocksdbZipFileBytesUncompressed |
O Gerenciador de Arquivos do RocksDB gerencia a utilização e a exclusão do espaço em disco do arquivo SST. Essa métrica representa os arquivos zip não compactados em bytes, conforme relatado pelo Gerenciador de Arquivos. |
objeto de origem (Kafka)
Métrica | Descrição |
---|---|
sources.description |
Nome da fonte da qual a consulta de streaming está lendo. Por exemplo, “KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]” . |
Objeto sources.startOffset |
Iniciando o número de deslocamento no tópico do Kafka em que o trabalho de streaming foi iniciado. |
Objeto sources.endOffset |
Último deslocamento processado pelo microlote. Isso pode ser igual a latestOffset para uma execução do microlote em andamento. |
Objeto sources.latestOffset |
Último deslocamento calculado pelo microlote. Quando existe limitação, o processo de microlote pode não processar todos os deslocamentos, fazendo com que endOffset e latestOffset sejam diferentes. |
sources.numInputRows |
Número de linhas de entrada processadas a partir dessa fonte. |
sources.inputRowsPerSecond |
Taxa na qual os dados estão chegando para processamento desta fonte. |
sources.processedRowsPerSecond |
Taxa na qual o Spark está processando os dados para essa fonte. |
Objeto sources.metrics (Kafka)
Métrica | Descrição |
---|---|
sources.metrics.avgOffsetsBehindLatest |
Número médio de deslocamentos em que a consulta de streaming está por trás do último deslocamento disponível entre todos os tópicos inscritos. |
sources.metrics.estimatedTotalBytesBehindLatest |
Número estimado de bytes que o processo de consulta não consumiu dos tópicos assinados. |
sources.metrics.maxOffsetsBehindLatest |
Número máximo de deslocamentos em que a consulta de streaming está por trás do último deslocamento disponível entre todos os tópicos inscritos. |
sources.metrics.minOffsetsBehindLatest |
Número mínimo de deslocamentos em que a consulta de streaming está por trás do último deslocamento disponível entre todos os tópicos inscritos. |
objeto coletor (Kafka)
Métrica | Descrição |
---|---|
sink.description |
Nome do coletor no qual a consulta de streaming está gravando. Por exemplo, “org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100” . |
sink.numOutputRows |
Número de linha gravadas no coletor ou na tabela de saída como parte do microlote. Em algumas situações, esse valor pode ser "-1" e, em geral, pode ser interpretado como "desconhecido". |
objeto de origem (Delta Lake)
Métrica | Descrição |
---|---|
sources.description |
Nome da fonte da qual a consulta de streaming está lendo. Por exemplo, “DeltaSource[table]” . |
sources.[startOffset/endOffset].sourceVersion |
Versão da serialização com a qual esse deslocamento é codificado. |
sources.[startOffset/endOffset].reservoirId |
ID da tabela da qual você está lendo. Isso é utilizado para detectar a configuração incorreta ao reiniciar uma consulta. |
sources.[startOffset/endOffset].reservoirVersion |
Versão da tabela que está sendo processada no momento. |
sources.[startOffset/endOffset].index |
Índice na sequência de AddFiles nesta versão. É utilizado para interromper grandes confirmações em vários lotes. Esse índice é criado pela classificação em modificationTimestamp e path . |
sources.[startOffset/endOffset].isStartingVersion |
Se esse deslocamento denota uma consulta que está iniciando, ao invés de processar alterações. Ao iniciar uma nova consulta, todos os dados presentes na tabela no início são processados e, em seguida, os novos dados que chegaram. |
sources.latestOffset |
Último deslocamento processado pela consulta do microlote. |
sources.numInputRows |
Número de linhas de entrada processadas a partir dessa fonte. |
sources.inputRowsPerSecond |
Taxa na qual os dados estão chegando para processamento desta fonte. |
sources.processedRowsPerSecond |
Taxa na qual o Spark está processando os dados para essa fonte. |
sources.metrics.numBytesOutstanding |
Tamanho dos arquivos pendentes (arquivos rastreados pelo RocksDB) combinados. Essa é a métrica de lista de pendências para a Delta e o Carregador Automático como fonte de streaming. |
sources.metrics.numFilesOutstanding |
Número de arquivos pendentes que serão processados. Essa é a métrica de lista de pendências para a Delta e o Carregador Automático como fonte de streaming. |
objeto de coletor (Delta Lake)
Métrica | Descrição |
---|---|
sink.description |
Nome do coletor no qual a consulta de streaming grava. Por exemplo, “DeltaSink[table]” . |
sink.numOutputRows |
O número de linhas nessa métrica é "-1" porque o Spark não pode inferir linhas de saída para coletores DSv1, que é a classificação para a Sincronização Delta Lake. |
Exemplos
Exemplo de evento StreamingQueryListener do Kafka para Kafka
{
"id" : "3574feba-646d-4735-83c4-66f657e52517",
"runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
"name" : "STREAMING_QUERY_NAME_UNIQUE",
"timestamp" : "2022-10-31T20:09:30.455Z",
"batchId" : 1377,
"numInputRows" : 687,
"inputRowsPerSecond" : 32.13433743393049,
"processedRowsPerSecond" : 34.067241892293964,
"durationMs" : {
"addBatch" : 18352,
"getBatch" : 0,
"latestOffset" : 31,
"queryPlanning" : 977,
"triggerExecution" : 20165,
"walCommit" : 342
},
"eventTime" : {
"avg" : "2022-10-31T20:09:18.070Z",
"max" : "2022-10-31T20:09:30.125Z",
"min" : "2022-10-31T20:09:09.793Z",
"watermark" : "2022-10-31T20:08:46.355Z"
},
"stateOperators" : [ {
"operatorName" : "stateStoreSave",
"numRowsTotal" : 208,
"numRowsUpdated" : 73,
"allUpdatesTimeMs" : 434,
"numRowsRemoved" : 76,
"allRemovalsTimeMs" : 515,
"commitTimeMs" : 0,
"memoryUsedBytes" : 167069743,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 20,
"customMetrics" : {
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 222,
"rocksdbGetLatency" : 0,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 165,
"rocksdbReadBlockCacheMissCount" : 41,
"rocksdbSstFileSize" : 232729,
"rocksdbTotalBytesRead" : 12844,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 161238,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
}, {
"operatorName" : "dedupe",
"numRowsTotal" : 2454744,
"numRowsUpdated" : 73,
"allUpdatesTimeMs" : 4155,
"numRowsRemoved" : 0,
"allRemovalsTimeMs" : 0,
"commitTimeMs" : 0,
"memoryUsedBytes" : 137765341,
"numRowsDroppedByWatermark" : 34,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 20,
"customMetrics" : {
"numDroppedDuplicateRows" : 193,
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 146,
"rocksdbGetLatency" : 0,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 3,
"rocksdbReadBlockCacheMissCount" : 3,
"rocksdbSstFileSize" : 78959140,
"rocksdbTotalBytesRead" : 0,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
}, {
"operatorName" : "symmetricHashJoin",
"numRowsTotal" : 2583,
"numRowsUpdated" : 682,
"allUpdatesTimeMs" : 9645,
"numRowsRemoved" : 508,
"allRemovalsTimeMs" : 46,
"commitTimeMs" : 21,
"memoryUsedBytes" : 668544484,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 80,
"customMetrics" : {
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 4218,
"rocksdbGetLatency" : 3,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 3425,
"rocksdbReadBlockCacheMissCount" : 149,
"rocksdbSstFileSize" : 742827,
"rocksdbTotalBytesRead" : 866864,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
} ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
"startOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706380
}
},
"endOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706672
}
},
"latestOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706672
}
},
"numInputRows" : 292,
"inputRowsPerSecond" : 13.65826278123392,
"processedRowsPerSecond" : 14.479817514628582,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"estimatedTotalBytesBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
}, {
"description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
"startOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"endOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"latestOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
} ],
"sink" : {
"description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100",
"numOutputRows" : 76
}
}
Exemplo de evento StreamingQueryListener do Delta Lake-to-Delta Lake
{
"id" : "aeb6bc0f-3f7d-4928-a078-ba2b304e2eaf",
"runId" : "35d751d9-2d7c-4338-b3de-6c6ae9ebcfc2",
"name" : "silverTransformFromBronze",
"timestamp" : "2022-11-01T18:21:29.500Z",
"batchId" : 4,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"durationMs" : {
"latestOffset" : 62,
"triggerExecution" : 62
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "DeltaSource[dbfs:/FileStore/max.fisher@databricks.com/ctc/stateful-trade-analysis-demo/table]",
"startOffset" : {
"sourceVersion" : 1,
"reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
"reservoirVersion" : 3216,
"index" : 3214,
"isStartingVersion" : true
},
"endOffset" : {
"sourceVersion" : 1,
"reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
"reservoirVersion" : 3216,
"index" : 3214,
"isStartingVersion" : true
},
"latestOffset" : null,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"metrics" : {
"numBytesOutstanding" : "0",
"numFilesOutstanding" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_silver_delta_demo2]",
"numOutputRows" : -1
}
}
Fonte de taxa de exemplo para o evento StreamingQueryListener do Delta Lake
{
"id" : "912ebdc1-edf2-48ec-b9fb-1a9b67dd2d9e",
"runId" : "85de73a5-92cc-4b7f-9350-f8635b0cf66e",
"name" : "dataGen",
"timestamp" : "2022-11-01T18:28:20.332Z",
"batchId" : 279,
"numInputRows" : 300,
"inputRowsPerSecond" : 114.15525114155251,
"processedRowsPerSecond" : 158.9825119236884,
"durationMs" : {
"addBatch" : 1771,
"commitOffsets" : 54,
"getBatch" : 0,
"latestOffset" : 0,
"queryPlanning" : 4,
"triggerExecution" : 1887,
"walCommit" : 58
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "RateStreamV2[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=default",
"startOffset" : 560,
"endOffset" : 563,
"latestOffset" : 563,
"numInputRows" : 300,
"inputRowsPerSecond" : 114.15525114155251,
"processedRowsPerSecond" : 158.9825119236884
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_bronze_delta_demo]",
"numOutputRows" : -1
}
}
Comentários
https://aka.ms/ContentUserFeedback.
Em breve: Ao longo de 2024, eliminaremos os problemas do GitHub como o mecanismo de comentários para conteúdo e o substituiremos por um novo sistema de comentários. Para obter mais informações, consulteEnviar e exibir comentários de