Share via


Azure Databricks에서 RocksDB 상태 저장소 구성

스트리밍 쿼리를 시작하기 전에 SparkSession에서 다음 구성을 설정하여 RocksDB 기반 상태 관리를 사용하도록 설정할 수 있습니다.

spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "com.databricks.sql.streaming.state.RocksDBStateStoreProvider")

Delta Live Tables 파이프라인에서 RocksDB를 사용하도록 설정할 수 있습니다. Delta Live Tables에 대해 RocksDB 상태 저장소 사용을 참조 하세요.

changelog 검사pointing 사용

Databricks Runtime 13.3 LTS 이상에서는 구조적 스트리밍 워크로드에 대한 검사포인트 기간 및 엔드 투 엔드 대기 시간을 줄이도록 변경 로그 검사포인트를 사용하도록 설정할 수 있습니다. Databricks는 모든 구조적 스트리밍 상태 저장 쿼리에 대해 changelog 검사포인트를 사용하도록 설정할 것을 권합니다.

일반적으로 RocksDB State Store는 검사 지정하는 동안 데이터 파일을 스냅샷 업로드합니다. 이 비용을 방지하려면 변경 로그 검사지난 검사지속 스토리지로"이후 변경된 레코드만 기록합니다.

Changelog 검사pointing은 기본적으로 사용하지 않도록 설정됩니다. 다음 구문을 사용하여 SparkSession 수준에서 changelog 검사pointing을 사용하도록 설정할 수 있습니다.

spark.conf.set(
  "spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")

기존 스트림에서 changelog 검사pointing을 사용하도록 설정하고 검사point에 저장된 상태 정보를 기본 수 있습니다.

Important

changelog 검사pointing을 사용하도록 설정한 쿼리는 Databricks Runtime 13.3 LTS 이상에서만 실행할 수 있습니다. 레거시 검사포인트링 동작으로 되돌리기 변경 로그 검사포인트링을 사용하지 않도록 설정할 수 있지만 Databricks Runtime 13.3 LTS 이상에서 이러한 쿼리를 계속 실행해야 합니다. 이러한 변경이 수행되려면 작업을 다시 시작해야 합니다.

RocksDB 상태 저장소 메트릭

각 상태 연산자는 해당 RocksDB 인스턴스에서 수행된 상태 관리 작업과 관련된 메트릭을 수집하여 상태 저장소를 관찰하고 작업 속도 저하 문제의 디버그를 지원할 수 있습니다. 상태 연산자가 실행되고 있는 모든 작업에서 작업 중인 상태 연산자별로 메트릭이 집계(합계)됩니다. 이러한 메트릭은 StreamingQueryProgressstateOperators 필드 내 customMetrics 맵의 일부입니다. 다음은 JSON 형식의 StreamingQueryProgress 예제입니다(StreamingQueryProgress.json()을 사용하여 가져옴).

{
  "id" : "6774075e-8869-454b-ad51-513be86cfd43",
  "runId" : "3d08104d-d1d4-4d1a-b21e-0b2e1fb871c5",
  "batchId" : 7,
  "stateOperators" : [ {
    "numRowsTotal" : 20000000,
    "numRowsUpdated" : 20000000,
    "memoryUsedBytes" : 31005397,
    "numRowsDroppedByWatermark" : 0,
    "customMetrics" : {
      "rocksdbBytesCopied" : 141037747,
      "rocksdbCommitCheckpointLatency" : 2,
      "rocksdbCommitCompactLatency" : 22061,
      "rocksdbCommitFileSyncLatencyMs" : 1710,
      "rocksdbCommitFlushLatency" : 19032,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 56155,
      "rocksdbFilesCopied" : 2,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 40000000,
      "rocksdbGetLatency" : 21834,
      "rocksdbPutCount" : 1,
      "rocksdbPutLatency" : 56155599000,
      "rocksdbReadBlockCacheHitCount" : 1988,
      "rocksdbReadBlockCacheMissCount" : 40341617,
      "rocksdbSstFileSize" : 141037747,
      "rocksdbTotalBytesReadByCompaction" : 336853375,
      "rocksdbTotalBytesReadByGet" : 680000000,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 141037747,
      "rocksdbTotalBytesWrittenByPut" : 740000012,
      "rocksdbTotalCompactionLatencyMs" : 21949695000,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 7038
    }
  } ],
  "sources" : [ {
  } ],
  "sink" : {
  }
}

메트릭에 대한 자세한 설명은 다음과 같습니다.

메트릭 이름 설명
rocksdbCommitWriteBatchLatency 메모리 내 구조체(WriteBatch)에 스테이징된 쓰기를 네이티브 RocksDB에 적용하는 데 걸린 시간(밀리초)입니다.
rocksdbCommitFlushLatency RocksDB 메모리 내 변경 내용을 로컬 디스크로 플러시하는 데 걸린 시간(밀리초)입니다.
rocksdbCommitCompactLatency 검사점 커밋 중 압축(선택 사항)에 걸린 시간(밀리초)입니다.
rocksdbCommitPauseLatency 검사점 커밋의 일부로 압축 등을 위해 백그라운드 작업자 스레드를 중지하는 데 걸린 시간(밀리초)입니다.
rocksdbCommitCheckpointLatency 네이티브 RocksDB 스냅샷을 만들고 로컬 디렉터리에 쓰는 데 걸린 시간(밀리초)입니다.
rocksdbCommitFileSyncLatencyMs 네이티브 RocksDB 스냅샷 관련 파일을 외부 스토리지(검사점 위치)에 동기화하는 데 걸린 시간(밀리초)입니다.
rocksdbGetLatency 기본 네이티브 RocksDB::Get 호출당 평균 소요 시간(나노초)입니다.
rocksdbPutCount 기본 네이티브 RocksDB::Put 호출당 평균 소요 시간(나노초)입니다.
rocksdbGetCount 네이티브 RocksDB::Get 호출 수입니다(쓰기 스테이징에 사용되는 메모리 내 일괄 처리인 WriteBatch에서의 Gets은 포함되지 않음).
rocksdbPutCount 네이티브 RocksDB::Put 호출 수입니다(쓰기 스테이징에 사용되는 메모리 내 일괄 처리인 WriteBatch에 대한 Puts은 포함되지 않음).
rocksdbTotalBytesReadByGet 네이티브 RocksDB::Get 호출을 통해 읽은, 압축되지 않은 바이트 수입니다.
rocksdbTotalBytesWrittenByPut 네이티브 RocksDB::Put 호출을 통해 쓴, 압축되지 않은 바이트 수입니다.
rocksdbReadBlockCacheHitCount 로컬 디스크에서 데이터를 읽지 않도록 네이티브 RocksDB 블록 캐시가 사용된 횟수입니다.
rocksdbReadBlockCacheMissCount 네이티브 RocksDB 블록 캐시가 누락되어 로컬 디스크에서 데이터를 읽어야 했던 횟수입니다.
rocksdbTotalBytesReadByCompaction 네이티브 RocksDB 압축 프로세스가 로컬 디스크에서 읽은 바이트 수입니다.
rocksdbTotalBytesWrittenByCompaction 네이티브 RocksDB 압축 프로세스가 로컬 디스크에 쓴 바이트 수입니다.
rocksdbTotalCompactionLatencyMs RocksDB 압축(커밋 중에 시작된 선택적 압축과 백그라운드 압축 포함)에 걸린 시간(밀리초)입니다.
rocksdbWriterStallLatencyMs 백그라운드 압축 또는 memtable을 디스크로 플러시하는 작업으로 인해 기록기가 중단된 시간(밀리초)입니다.
rocksdbTotalBytesReadThroughIterator 일부 상태 저장 작업(예: flatMapGroupsWithState의 시간 제한 처리 또는 기간 이동 집계의 워터마크 처리)은 반복기를 통해 DB의 전체 데이터를 읽어야 합니다. 반복기를 사용하여 읽은, 압축되지 않은 데이터의 총 크기입니다.