Azure Databricks에서 구조적 스트리밍 쿼리 모니터링

Azure Databricks는 스트리밍 탭의 Spark UI를 통해 구조적 스트리밍 애플리케이션에 대한 기본 제공 모니터링을 제공합니다.

Spark UI에서 구조적 스트리밍 쿼리 구분

writeStream 코드에 .queryName(<query-name>)을 추가하여 Spark UI의 스트림에 속하는 메트릭을 쉽게 구분하여 스트림에 고유한 쿼리 이름을 제공합니다.

외부 서비스에 구조적 스트리밍 메트릭 푸시

Apache Spark의 스트리밍 쿼리 수신기 인터페이스를 사용하여 경고 또는 대시보드 사용 사례를 위해 스트리밍 메트릭을 외부 서비스로 푸시할 수 있습니다. Databricks Runtime 11.3 LTS 이상에서는 Python 및 Scala에서 스트리밍 쿼리 수신기를 사용할 수 있습니다.

Important

Unity 카탈로그에서 관리하는 자격 증명 및 개체는 논리에서 StreamingQueryListener 사용할 수 없습니다.

참고 항목

수신기와 연결된 처리 대기 시간은 쿼리 처리에 부정적인 영향을 줄 수 있습니다. Databricks는 이러한 수신기에서 처리 논리를 최소화하고 Kafka와 같은 짧은 대기 시간 싱크에 쓰는 것이 좋습니다.

다음 코드는 수신기를 구현하기 위한 구문의 기본 예제를 제공합니다.

Scala

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

val myListener = new StreamingQueryListener {

  /**
    * Called when a query is started.
    * @note This is called synchronously with
    *       [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]].
    *       `onQueryStart` calls on all listeners before
    *       `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
    *        Do not block this method, as it blocks your query.
    */
  def onQueryStarted(event: QueryStartedEvent): Unit = {}

  /**
    * Called when there is some status update (ingestion rate updated, etc.)
    *
    * @note This method is asynchronous. The status in [[StreamingQuery]] returns the
    *       latest status, regardless of when this method is called. The status of [[StreamingQuery]]
    *       may change before or when you process the event. For example, you may find [[StreamingQuery]]
    *       terminates when processing `QueryProgressEvent`.
    */
  def onQueryProgress(event: QueryProgressEvent): Unit = {}

  /**
    * Called when a query is stopped, with or without error.
    */
  def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}

Python

class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        """
        Called when a query is started.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This is called synchronously with
        meth:`pyspark.sql.streaming.DataStreamWriter.start`,
        that is, ``onQueryStart`` will be called on all listeners before
        ``DataStreamWriter.start()`` returns the corresponding
        :class:`pyspark.sql.streaming.StreamingQuery`.
        Do not block in this method as it will block your query.
        """
        pass

    def onQueryProgress(self, event):
        """
        Called when there is some status update (ingestion rate updated, etc.)

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This method is asynchronous. The status in
        :class:`pyspark.sql.streaming.StreamingQuery` returns the
        most recent status, regardless of when this method is called. The status
        of :class:`pyspark.sql.streaming.StreamingQuery`.
        may change before or when you process the event.
        For example, you may find :class:`StreamingQuery`
        terminates when processing `QueryProgressEvent`.
        """
        pass

    def onQueryTerminated(self, event):
        """
        Called when a query is stopped, with or without error.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
            The properties are available as the same as Scala API.
        """
        pass

my_listener = MyListener()

구조적 스트리밍에서 관찰 가능한 메트릭 정의

관찰 가능한 메트릭은 쿼리(DataFrame)에서 정의할 수 있는 임의의 집계 함수로 명명됩니다. DataFrame의 실행이 완료 지점에 도달하는 즉시(즉, 일괄 처리 쿼리를 완료하거나 스트리밍 Epoch에 도달) 마지막 완료 지점 이후 처리된 데이터에 대한 메트릭을 포함하는 명명된 이벤트가 내보내집니다.

수신기를 Spark 세션에 연결하여 이러한 메트릭을 관찰할 수 있습니다. 수신기는 실행 모드에 따라 달라집니다.

  • 일괄 처리 모드: QueryExecutionListener을(를) 사용합니다.

    QueryExecutionListener는 쿼리가 완료되면 호출됩니다. QueryExecution.observedMetrics 맵을 사용하여 메트릭에 액세스합니다.

  • 스트리밍 또는 마이크로 일괄 처리: StreamingQueryListener을(를) 사용합니다.

    StreamingQueryListener는 스트리밍 쿼리가 Epoch를 완료할 때 호출됩니다. StreamingQueryProgress.observedMetrics 맵을 사용하여 메트릭에 액세스합니다. Azure Databricks는 연속 실행 스트리밍을 지원하지 않습니다.

예시:

Scala

// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()

// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    event.progress.observedMetrics.get("my_event").foreach { row =>
      // Trigger if the number of errors exceeds 5 percent
      val num_rows = row.getAs[Long]("rc")
      val num_error_rows = row.getAs[Long]("erc")
      val ratio = num_error_rows.toDouble / num_rows
      if (ratio > 0.05) {
        // Trigger alert
      }
    }
  }
})

Python

# Observe metric
observed_df = df.observe("metric", count(lit(1)).as("cnt"), count(col("error")).as("malformed"))
observed_df.writeStream.format("...").start()

# Define my listener.
class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        print(f"'{event.name}' [{event.id}] got started!")
    def onQueryProgress(self, event):
        row = event.progress.observedMetrics.get("metric")
        if row is not None:
            if row.malformed / row.cnt > 0.5:
                print("ALERT! Ouch! there are too many malformed "
                      f"records {row.malformed} out of {row.cnt}!")
            else:
                print(f"{row.cnt} rows processed!")
    def onQueryTerminated(self, event):
        print(f"{event.id} got terminated!")

# Add my listener.
spark.streams.addListener(MyListener())

StreamingQueryListener 개체 메트릭

메트릭 설명
id 다시 시작할 때 지속되는 고유 쿼리 ID입니다. StreamingQuery.id()를 참조하세요.
runId 시작 또는 다시 시작할 때마다 고유한 쿼리 ID입니다. StreamingQuery.runId()를 참조하세요.
name 사용자가 지정한 쿼리 이름입니다. 지정하지 않으면 Null입니다.
timestamp 마이크로 일괄 처리를 실행하기 위한 타임스탬프입니다.
batchId 처리 중인 데이터의 현재 일괄 처리에 대한 고유 ID입니다. 오류 후 재시도의 경우 지정된 일괄 처리 ID를 두 번 이상 실행할 수 있습니다. 마찬가지로 처리할 데이터가 없으면 일괄 처리 ID가 증가하지 않습니다.
numInputRows 트리거에서 처리된 레코드의 집계(모든 원본에서) 수입니다.
inputRowsPerSecond 도착하는 데이터의 집계(모든 원본에서) 속도입니다.
processedRowsPerSecond Spark가 데이터를 처리하는 집계(모든 원본에서) 속도입니다.

durationMs 개체

마이크로 일괄 처리 실행 프로세스의 다양한 단계를 완료하는 데 걸리는 시간에 대한 정보입니다.

메트릭 설명
durationMs.addBatch 마이크로배치를 실행하는 데 걸린 시간입니다. 그러면 Spark가 마이크로배치를 계획하는 데 걸리는 시간이 제외됩니다.
durationMs.getBatch 원본에서 오프셋에 대한 메타데이터를 검색하는 데 걸리는 시간입니다.
durationMs.latestOffset 마이크로배치에 사용되는 최신 오프셋입니다. 이 진행률 개체는 원본에서 최신 오프셋을 검색하는 데 걸린 시간을 나타냅니다.
durationMs.queryPlanning 실행 계획을 생성하는 데 걸린 시간입니다.
durationMs.triggerExecution 마이크로배치를 계획하고 실행하는 데 걸린 시간입니다.
durationMs.walCommit 사용 가능한 새 오프셋을 커밋하는 데 걸린 시간입니다.

eventTime 개체

마이크로 일괄 처리에서 처리되는 데이터 내에서 표시되는 이벤트 시간 값에 대한 정보입니다. 이 데이터는 워터마크에서 구조적 스트리밍 작업에 정의된 상태 저장 집계를 처리하기 위해 상태를 트리밍하는 방법을 파악하는 데 사용됩니다.

메트릭 설명
eventTime.avg 트리거에 표시되는 평균 이벤트 시간입니다.
eventTime.max 트리거에 표시되는 최대 이벤트 시간입니다.
eventTime.min 트리거에 표시되는 최소 이벤트 시간입니다.
eventTime.watermark 트리거에 사용되는 워터마크의 값입니다.

stateOperators 개체

구조적 스트리밍 작업에 정의된 상태 저장 작업 및 해당 작업에서 생성된 집계에 대한 정보입니다.

메트릭 설명
stateOperators.operatorName 메트릭과 관련된 상태 저장 연산자의 이름입니다. 예: symmetricHashJoin, dedupe, stateStoreSave.
stateOperators.numRowsTotal 상태 저장 연산자 또는 집계의 결과로 상태의 행 수입니다.
stateOperators.numRowsUpdated 상태 저장 연산자 또는 집계의 결과로 상태에서 업데이트된 행 수입니다.
stateOperators.numRowsRemoved 상태 저장 연산자 또는 집계의 결과로 상태에서 제거된 행 수입니다.
stateOperators.commitTimeMs 모든 업데이트를 커밋하고(배치 및 제거) 새 버전을 반환하는 데 걸린 시간입니다.
stateOperators.memoryUsedBytes 상태 저장소에서 사용하는 메모리입니다.
stateOperators.numRowsDroppedByWatermark 상태 저장 집계에 포함하기에 너무 늦은 것으로 간주되는 행 수입니다. 스트리밍 집계만 해당: 집계 후 삭제된 행 수이며 원시 입력 행은 삭제되지 않습니다. 숫자는 정확하지 않지만 늦은 데이터가 삭제되고 있음을 나타낼 수 있습니다.
stateOperators.numShufflePartitions 이 상태 저장 연산자의 순서 섞기 파티션 수입니다.
stateOperators.numStateStoreInstances 연산자가 초기화하고 기본 실제 상태 저장소 인스턴스입니다. 많은 상태 저장 연산자에서 파티션 수와 동일하지만 스트림 스트림 조인은 파티션당 4개의 상태 저장소 인스턴스를 초기화합니다.

stateOperators.customMetrics 개체

구조적 스트리밍 작업에 대해 기본 상태 저장 값과 관련하여 성능 및 작업에 대한 메트릭을 캡처하는 RocksDB에서 수집된 정보입니다. 자세한 내용은 Azure Databricks에서 RocksDB 상태 저장소 구성을 참조하세요.

메트릭 설명
customMetrics.rocksdbBytesCopied RocksDB 파일 관리자가 추적한 대로 복사한 바이트 수입니다.
customMetrics.rocksdbCommitCheckpointLatency 네이티브 RocksDB의 스냅샷 가져와서 로컬 디렉터리에 쓰는 데 걸리는 시간(밀리초)입니다.
customMetrics.rocksdbCompactLatency 검사포인트 커밋 중 압축(선택 사항)을 위한 시간(밀리초)입니다.
customMetrics.rocksdbCommitFileSyncLatencyMs 네이티브 RocksDB 스냅샷 관련 파일을 외부 스토리지(검사point 위치)에 동기화하는 데 걸리는 시간(밀리초)입니다.
customMetrics.rocksdbCommitFlushLatency RocksDB 메모리 내 변경 내용을 로컬 디스크에 플러시하는 데 걸리는 시간(밀리초)입니다.
customMetrics.rocksdbCommitPauseLatency 검사point 커밋의 일부로 백그라운드 작업자 스레드(예: 압축)를 중지하는 데 걸리는 시간(밀리초)입니다.
customMetrics.rocksdbCommitWriteBatchLatency 스테이징된 쓰기를 메모리 내 구조()에서 네이티브 RocksDB에 적용하는 데 걸리는 시간(WriteBatch밀리초)입니다.
customMetrics.rocksdbFilesCopied RocksDB 파일 관리자가 추적한 대로 복사한 파일 수입니다.
customMetrics.rocksdbFilesReused RocksDB 파일 관리자가 추적한 대로 다시 사용하는 파일 수입니다.
customMetrics.rocksdbGetCount DB에 대한 호출 수get(다음에서 WriteBatch포함되지 gets 않음: 스테이징 쓰기에 사용되는 메모리 내 일괄 처리).
customMetrics.rocksdbGetLatency 기본 네이티브 RocksDB::Get 호출의 평균 시간(나노초)입니다.
customMetrics.rocksdbReadBlockCacheHitCount RocksDB의 블록 캐시의 양이 유용하거나 유용하지 않으며 로컬 디스크 읽기를 방지합니다.
customMetrics.rocksdbReadBlockCacheMissCount RocksDB의 블록 캐시의 양이 유용하거나 유용하지 않으며 로컬 디스크 읽기를 방지합니다.
customMetrics.rocksdbSstFileSize 모든 SST 파일의 크기입니다. SST는 RocksDB가 데이터를 저장하는 데 사용하는 테이블 형식 구조인 정적 정렬 테이블을 의미합니다.
customMetrics.rocksdbTotalBytesRead 작업에서 읽 get 은 압축되지 않은 바이트 수입니다.
customMetrics.rocksdbTotalBytesReadByCompaction 압축 프로세스가 디스크에서 읽는 바이트 수입니다.
customMetrics.rocksdbTotalBytesReadThroughIterator 상태 저장 작업(예: 시간 제한 처리 FlatMapGroupsWithState 및 워터마크)에는 반복기를 통해 DB에서 데이터를 읽는 작업이 필요합니다. 이 메트릭은 반복기를 사용하여 읽은 압축되지 않은 데이터의 크기를 나타냅니다.
customMetrics.rocksdbTotalBytesWritten 작업에서 작성한 put 압축되지 않은 바이트 수입니다.
customMetrics.rocksdbTotalBytesWrittenByCompaction 압축 프로세스가 디스크에 쓰는 바이트 수입니다.
customMetrics.rocksdbTotalCompactionLatencyMs 백그라운드 압축 및 커밋 중에 시작된 선택적 압축을 포함하여 RocksDB 압축의 시간(밀리초)입니다.
customMetrics.rocksdbTotalFlushLatencyMs 백그라운드 플러시를 포함한 플러시 시간입니다. 플러시 작업은 MemTable이 가득 차면 스토리지로 플러시되는 프로세스입니다. MemTable은 데이터가 RocksDB에 저장되는 첫 번째 수준입니다.
customMetrics.rocksdbZipFileBytesUncompressed RocksDB 파일 관리자는 물리적 SST 파일 디스크 공간 사용률 및 삭제를 관리합니다. 이 메트릭은 파일 관리자가 보고한 압축되지 않은 zip 파일을 바이트 단위로 나타냅니다.

sources 개체(Kafka)

메트릭 설명
sources.description 스트리밍 쿼리에서 읽는 원본의 이름입니다. 예들 들어 “KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]”입니다.
sources.startOffset 개체 스트리밍 작업이 시작된 Kafka 토픽 내의 시작 오프셋 번호입니다.
sources.endOffset 개체 마이크로배치에서 처리된 최신 오프셋입니다. 이는 진행 중인 마이크로배치 실행의 경우와 같을 latestOffset 수 있습니다.
sources.latestOffset 개체 마이크로배치에 의해 파악된 최신 오프셋입니다. 제한이 있는 경우 마이크로 일괄 처리 프로세스가 모든 오프셋을 처리하지 않아 원인이 endOffset 되고 latestOffset 다를 수 있습니다.
sources.numInputRows 이 원본에서 처리된 입력 행의 수입니다.
sources.inputRowsPerSecond 이 원본에 대한 처리를 위해 데이터가 도착하는 속도입니다.
sources.processedRowsPerSecond Spark가 이 원본에 대한 데이터를 처리하는 속도입니다.

sources.metrics 개체(Kafka)

메트릭 설명
sources.metrics.avgOffsetsBehindLatest 스트리밍 쿼리가 구독된 모든 항목 중에서 사용 가능한 최신 오프셋 뒤에 있는 평균 오프셋 수입니다.
sources.metrics.estimatedTotalBytesBehindLatest 구독된 토픽에서 쿼리 프로세스가 사용하지 않은 예상 바이트 수입니다.
sources.metrics.maxOffsetsBehindLatest 스트리밍 쿼리가 구독된 모든 항목 중에서 사용 가능한 최신 오프셋 뒤에 있는 최대 오프셋 수입니다.
sources.metrics.minOffsetsBehindLatest 스트리밍 쿼리가 구독된 모든 항목 중에서 사용 가능한 최신 오프셋 뒤에 있는 최소 오프셋 수입니다.

sink 개체(Kafka)

메트릭 설명
sink.description 스트리밍 쿼리가 쓰는 싱크의 이름입니다. 예들 들어 “org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100”입니다.
sink.numOutputRows 마이크로배치의 일부로 출력 테이블 또는 싱크에 기록된 행 수입니다. 경우에 따라 이 값은 "-1"일 수 있으며 일반적으로 "알 수 없음"으로 해석될 수 있습니다.

sources 개체(Delta Lake)

메트릭 설명
sources.description 스트리밍 쿼리에서 읽는 원본의 이름입니다. 예들 들어 “DeltaSource[table]”입니다.
sources.[startOffset/endOffset].sourceVersion 이 오프셋이 인코딩되는 serialization의 버전입니다.
sources.[startOffset/endOffset].reservoirId 읽는 테이블의 ID입니다. 쿼리를 다시 시작할 때 잘못된 구성을 검색하는 데 사용됩니다.
sources.[startOffset/endOffset].reservoirVersion 현재 처리 중인 테이블의 버전입니다.
sources.[startOffset/endOffset].index 이 버전 시퀀스의 AddFiles 인덱스입니다. 이는 큰 커밋을 여러 일괄 처리로 분리하는 데 사용됩니다. 이 인덱스는 정렬하여 modificationTimestamppath만들어집니다.
sources.[startOffset/endOffset].isStartingVersion 이 오프셋이 변경 내용을 처리하는 대신 시작하는 쿼리를 나타내는지 여부입니다. 새 쿼리를 시작할 때 시작 시 테이블에 있는 모든 데이터가 처리된 다음, 도착한 새 데이터가 처리됩니다.
sources.latestOffset 마이크로배치 쿼리에서 처리된 최신 오프셋입니다.
sources.numInputRows 이 원본에서 처리된 입력 행의 수입니다.
sources.inputRowsPerSecond 이 원본에 대한 처리를 위해 데이터가 도착하는 속도입니다.
sources.processedRowsPerSecond Spark가 이 원본에 대한 데이터를 처리하는 속도입니다.
sources.metrics.numBytesOutstanding 결합된 미해결 파일(RocksDB에서 추적한 파일)의 크기입니다. 이는 스트리밍 원본으로 델타 및 자동 로더에 대한 백로그 메트릭입니다.
sources.metrics.numFilesOutstanding 처리할 미해결 파일 수입니다. 이는 스트리밍 원본으로 델타 및 자동 로더에 대한 백로그 메트릭입니다.

sink 개체(Delta Lake)

메트릭 설명
sink.description 스트리밍 쿼리가 쓰는 싱크의 이름입니다. 예들 들어 “DeltaSink[table]”입니다.
sink.numOutputRows Spark는 Delta Lake 싱크에 대한 분류인 DSv1 싱크의 출력 행을 유추할 수 없으므로 이 메트릭의 행 수는 "-1"입니다.

예제

Kafka-to-Kafka StreamingQueryListener 이벤트 예제

{
  "id" : "3574feba-646d-4735-83c4-66f657e52517",
  "runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
  "name" : "STREAMING_QUERY_NAME_UNIQUE",
  "timestamp" : "2022-10-31T20:09:30.455Z",
  "batchId" : 1377,
  "numInputRows" : 687,
  "inputRowsPerSecond" : 32.13433743393049,
  "processedRowsPerSecond" : 34.067241892293964,
  "durationMs" : {
    "addBatch" : 18352,
    "getBatch" : 0,
    "latestOffset" : 31,
    "queryPlanning" : 977,
    "triggerExecution" : 20165,
    "walCommit" : 342
  },
  "eventTime" : {
    "avg" : "2022-10-31T20:09:18.070Z",
    "max" : "2022-10-31T20:09:30.125Z",
    "min" : "2022-10-31T20:09:09.793Z",
    "watermark" : "2022-10-31T20:08:46.355Z"
  },
  "stateOperators" : [ {
    "operatorName" : "stateStoreSave",
    "numRowsTotal" : 208,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 434,
    "numRowsRemoved" : 76,
    "allRemovalsTimeMs" : 515,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 167069743,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 222,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 165,
      "rocksdbReadBlockCacheMissCount" : 41,
      "rocksdbSstFileSize" : 232729,
      "rocksdbTotalBytesRead" : 12844,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 161238,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "dedupe",
    "numRowsTotal" : 2454744,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 4155,
    "numRowsRemoved" : 0,
    "allRemovalsTimeMs" : 0,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 137765341,
    "numRowsDroppedByWatermark" : 34,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "numDroppedDuplicateRows" : 193,
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 146,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3,
      "rocksdbReadBlockCacheMissCount" : 3,
      "rocksdbSstFileSize" : 78959140,
      "rocksdbTotalBytesRead" : 0,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "symmetricHashJoin",
    "numRowsTotal" : 2583,
    "numRowsUpdated" : 682,
    "allUpdatesTimeMs" : 9645,
    "numRowsRemoved" : 508,
    "allRemovalsTimeMs" : 46,
    "commitTimeMs" : 21,
    "memoryUsedBytes" : 668544484,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 80,
    "customMetrics" : {
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 4218,
      "rocksdbGetLatency" : 3,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3425,
      "rocksdbReadBlockCacheMissCount" : 149,
      "rocksdbSstFileSize" : 742827,
      "rocksdbTotalBytesRead" : 866864,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  } ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
    "startOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706380
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "numInputRows" : 292,
    "inputRowsPerSecond" : 13.65826278123392,
    "processedRowsPerSecond" : 14.479817514628582,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "estimatedTotalBytesBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  }, {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
    "startOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100",
    "numOutputRows" : 76
  }
}

예제 Delta Lake-To-Delta Lake StreamingQueryListener 이벤트

{
  "id" : "aeb6bc0f-3f7d-4928-a078-ba2b304e2eaf",
  "runId" : "35d751d9-2d7c-4338-b3de-6c6ae9ebcfc2",
  "name" : "silverTransformFromBronze",
  "timestamp" : "2022-11-01T18:21:29.500Z",
  "batchId" : 4,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "latestOffset" : 62,
    "triggerExecution" : 62
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "DeltaSource[dbfs:/FileStore/max.fisher@databricks.com/ctc/stateful-trade-analysis-demo/table]",
    "startOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "endOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "latestOffset" : null,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "numBytesOutstanding" : "0",
      "numFilesOutstanding" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_silver_delta_demo2]",
    "numOutputRows" : -1
  }
}

Delta Lake StreamingQueryListener 이벤트에 대한 예제 속도 원본

{
  "id" : "912ebdc1-edf2-48ec-b9fb-1a9b67dd2d9e",
  "runId" : "85de73a5-92cc-4b7f-9350-f8635b0cf66e",
  "name" : "dataGen",
  "timestamp" : "2022-11-01T18:28:20.332Z",
  "batchId" : 279,
  "numInputRows" : 300,
  "inputRowsPerSecond" : 114.15525114155251,
  "processedRowsPerSecond" : 158.9825119236884,
  "durationMs" : {
    "addBatch" : 1771,
    "commitOffsets" : 54,
    "getBatch" : 0,
    "latestOffset" : 0,
    "queryPlanning" : 4,
    "triggerExecution" : 1887,
    "walCommit" : 58
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "RateStreamV2[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=default",
    "startOffset" : 560,
    "endOffset" : 563,
    "latestOffset" : 563,
    "numInputRows" : 300,
    "inputRowsPerSecond" : 114.15525114155251,
    "processedRowsPerSecond" : 158.9825119236884
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_bronze_delta_demo]",
    "numOutputRows" : -1
  }
}