Azure Databricks에서 RocksDB 상태 저장소 구성
스트리밍 쿼리를 시작하기 전에 SparkSession에서 다음 구성을 설정하여 RocksDB 기반 상태 관리를 사용하도록 설정할 수 있습니다.
spark.conf.set(
"spark.sql.streaming.stateStore.providerClass",
"com.databricks.sql.streaming.state.RocksDBStateStoreProvider")
Delta Live Tables 파이프라인에서 RocksDB를 사용하도록 설정할 수 있습니다. Delta Live Tables에 대해 RocksDB 상태 저장소 사용을 참조 하세요.
changelog 검사pointing 사용
Databricks Runtime 13.3 LTS 이상에서는 구조적 스트리밍 워크로드에 대한 검사포인트 기간 및 엔드 투 엔드 대기 시간을 줄이도록 변경 로그 검사포인트를 사용하도록 설정할 수 있습니다. Databricks는 모든 구조적 스트리밍 상태 저장 쿼리에 대해 changelog 검사포인트를 사용하도록 설정할 것을 권합니다.
일반적으로 RocksDB State Store는 검사 지정하는 동안 데이터 파일을 스냅샷 업로드합니다. 이 비용을 방지하려면 변경 로그 검사지난 검사지속 스토리지로"이후 변경된 레코드만 기록합니다.
Changelog 검사pointing은 기본적으로 사용하지 않도록 설정됩니다. 다음 구문을 사용하여 SparkSession 수준에서 changelog 검사pointing을 사용하도록 설정할 수 있습니다.
spark.conf.set(
"spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")
기존 스트림에서 changelog 검사pointing을 사용하도록 설정하고 검사point에 저장된 상태 정보를 기본 수 있습니다.
Important
changelog 검사pointing을 사용하도록 설정한 쿼리는 Databricks Runtime 13.3 LTS 이상에서만 실행할 수 있습니다. 레거시 검사포인트링 동작으로 되돌리기 변경 로그 검사포인트링을 사용하지 않도록 설정할 수 있지만 Databricks Runtime 13.3 LTS 이상에서 이러한 쿼리를 계속 실행해야 합니다. 이러한 변경이 수행되려면 작업을 다시 시작해야 합니다.
RocksDB 상태 저장소 메트릭
각 상태 연산자는 해당 RocksDB 인스턴스에서 수행된 상태 관리 작업과 관련된 메트릭을 수집하여 상태 저장소를 관찰하고 작업 속도 저하 문제의 디버그를 지원할 수 있습니다. 상태 연산자가 실행되고 있는 모든 작업에서 작업 중인 상태 연산자별로 메트릭이 집계(합계)됩니다. 이러한 메트릭은 StreamingQueryProgress
의 stateOperators
필드 내 customMetrics
맵의 일부입니다. 다음은 JSON 형식의 StreamingQueryProgress
예제입니다(StreamingQueryProgress.json()
을 사용하여 가져옴).
{
"id" : "6774075e-8869-454b-ad51-513be86cfd43",
"runId" : "3d08104d-d1d4-4d1a-b21e-0b2e1fb871c5",
"batchId" : 7,
"stateOperators" : [ {
"numRowsTotal" : 20000000,
"numRowsUpdated" : 20000000,
"memoryUsedBytes" : 31005397,
"numRowsDroppedByWatermark" : 0,
"customMetrics" : {
"rocksdbBytesCopied" : 141037747,
"rocksdbCommitCheckpointLatency" : 2,
"rocksdbCommitCompactLatency" : 22061,
"rocksdbCommitFileSyncLatencyMs" : 1710,
"rocksdbCommitFlushLatency" : 19032,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 56155,
"rocksdbFilesCopied" : 2,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 40000000,
"rocksdbGetLatency" : 21834,
"rocksdbPutCount" : 1,
"rocksdbPutLatency" : 56155599000,
"rocksdbReadBlockCacheHitCount" : 1988,
"rocksdbReadBlockCacheMissCount" : 40341617,
"rocksdbSstFileSize" : 141037747,
"rocksdbTotalBytesReadByCompaction" : 336853375,
"rocksdbTotalBytesReadByGet" : 680000000,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 141037747,
"rocksdbTotalBytesWrittenByPut" : 740000012,
"rocksdbTotalCompactionLatencyMs" : 21949695000,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 7038
}
} ],
"sources" : [ {
} ],
"sink" : {
}
}
메트릭에 대한 자세한 설명은 다음과 같습니다.
메트릭 이름 | 설명 |
---|---|
rocksdbCommitWriteBatchLatency | 메모리 내 구조체(WriteBatch)에 스테이징된 쓰기를 네이티브 RocksDB에 적용하는 데 걸린 시간(밀리초)입니다. |
rocksdbCommitFlushLatency | RocksDB 메모리 내 변경 내용을 로컬 디스크로 플러시하는 데 걸린 시간(밀리초)입니다. |
rocksdbCommitCompactLatency | 검사점 커밋 중 압축(선택 사항)에 걸린 시간(밀리초)입니다. |
rocksdbCommitPauseLatency | 검사점 커밋의 일부로 압축 등을 위해 백그라운드 작업자 스레드를 중지하는 데 걸린 시간(밀리초)입니다. |
rocksdbCommitCheckpointLatency | 네이티브 RocksDB 스냅샷을 만들고 로컬 디렉터리에 쓰는 데 걸린 시간(밀리초)입니다. |
rocksdbCommitFileSyncLatencyMs | 네이티브 RocksDB 스냅샷 관련 파일을 외부 스토리지(검사점 위치)에 동기화하는 데 걸린 시간(밀리초)입니다. |
rocksdbGetLatency | 기본 네이티브 RocksDB::Get 호출당 평균 소요 시간(나노초)입니다. |
rocksdbPutCount | 기본 네이티브 RocksDB::Put 호출당 평균 소요 시간(나노초)입니다. |
rocksdbGetCount | 네이티브 RocksDB::Get 호출 수입니다(쓰기 스테이징에 사용되는 메모리 내 일괄 처리인 WriteBatch에서의 Gets 은 포함되지 않음). |
rocksdbPutCount | 네이티브 RocksDB::Put 호출 수입니다(쓰기 스테이징에 사용되는 메모리 내 일괄 처리인 WriteBatch에 대한 Puts 은 포함되지 않음). |
rocksdbTotalBytesReadByGet | 네이티브 RocksDB::Get 호출을 통해 읽은, 압축되지 않은 바이트 수입니다. |
rocksdbTotalBytesWrittenByPut | 네이티브 RocksDB::Put 호출을 통해 쓴, 압축되지 않은 바이트 수입니다. |
rocksdbReadBlockCacheHitCount | 로컬 디스크에서 데이터를 읽지 않도록 네이티브 RocksDB 블록 캐시가 사용된 횟수입니다. |
rocksdbReadBlockCacheMissCount | 네이티브 RocksDB 블록 캐시가 누락되어 로컬 디스크에서 데이터를 읽어야 했던 횟수입니다. |
rocksdbTotalBytesReadByCompaction | 네이티브 RocksDB 압축 프로세스가 로컬 디스크에서 읽은 바이트 수입니다. |
rocksdbTotalBytesWrittenByCompaction | 네이티브 RocksDB 압축 프로세스가 로컬 디스크에 쓴 바이트 수입니다. |
rocksdbTotalCompactionLatencyMs | RocksDB 압축(커밋 중에 시작된 선택적 압축과 백그라운드 압축 포함)에 걸린 시간(밀리초)입니다. |
rocksdbWriterStallLatencyMs | 백그라운드 압축 또는 memtable을 디스크로 플러시하는 작업으로 인해 기록기가 중단된 시간(밀리초)입니다. |
rocksdbTotalBytesReadThroughIterator | 일부 상태 저장 작업(예: flatMapGroupsWithState 의 시간 제한 처리 또는 기간 이동 집계의 워터마크 처리)은 반복기를 통해 DB의 전체 데이터를 읽어야 합니다. 반복기를 사용하여 읽은, 압축되지 않은 데이터의 총 크기입니다. |