Поделиться через


Настройка хранилища состояний RocksDB в Azure Databricks

Вы можете включить управление состоянием на основе RocksDB, задав следующую конфигурацию в SparkSession перед началом потокового запроса.

spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "com.databricks.sql.streaming.state.RocksDBStateStoreProvider")

Вы можете включить RocksDB в конвейерах Delta Live Tables. См. раздел "Включить хранилище состояний RocksDB" для разностных динамических таблиц.

Включение проверка назначения журнала изменений

В Databricks Runtime 13.3 LTS и более поздних версиях можно включить проверка для изменения, чтобы уменьшить длительность проверка точки и сквозную задержку для структурированных рабочих нагрузок потоковой передачи. Databricks рекомендует включать контрольные точки журнала изменений для всех запросов структурированных потоков с отслеживанием состояния.

Традиционно моментальные снимки хранилища состояний RocksDB и передают файлы данных во время проверка назначения. Чтобы избежать этой стоимости, журнал изменений проверка точечная запись записывает только записи, которые изменились с момента последнего проверка точки на устойчивое хранилище".

По умолчанию проверка назначение журнала изменений отключено. Вы можете включить проверка назначение журнала изменений на уровне SparkSession с помощью следующего синтаксиса:

spark.conf.set(
  "spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")

Вы можете включить проверка отслеживания изменений в существующем потоке и сохранить сведения о состоянии, хранящиеся в точке проверка.

Внимание

Запросы, которые включили ведение журнала изменений, проверка pointing, могут выполняться только в Databricks Runtime 13.3 LTS и более поздних версиях. Вы можете отключить проверка назначение журнала изменений для отменить изменения в устаревшее поведение проверка назначения, но эти запросы необходимо продолжать выполнять в Databricks Runtime 13.3 LTS или более поздней версии. Для выполнения этих изменений необходимо перезапустить задание.

Метрики хранилища состояний RocksDB

Каждый оператор состояния собирает метрики, связанные с операциями управления состоянием, выполненными на экземпляре RocksDB, для наблюдения за хранилищем состояний и, возможно, помощью в отладке медленной работы заданий. Эти метрики агрегируются (суммируются) для каждого оператора состояния в задании по всем задачам, в которых выполняется оператор состояния. Эти метрики являются частью сопоставлений customMetrics внутри полей stateOperators в StreamingQueryProgress. Ниже приведен пример StreamingQueryProgress в форме JSON (получен с помощью StreamingQueryProgress.json()).

{
  "id" : "6774075e-8869-454b-ad51-513be86cfd43",
  "runId" : "3d08104d-d1d4-4d1a-b21e-0b2e1fb871c5",
  "batchId" : 7,
  "stateOperators" : [ {
    "numRowsTotal" : 20000000,
    "numRowsUpdated" : 20000000,
    "memoryUsedBytes" : 31005397,
    "numRowsDroppedByWatermark" : 0,
    "customMetrics" : {
      "rocksdbBytesCopied" : 141037747,
      "rocksdbCommitCheckpointLatency" : 2,
      "rocksdbCommitCompactLatency" : 22061,
      "rocksdbCommitFileSyncLatencyMs" : 1710,
      "rocksdbCommitFlushLatency" : 19032,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 56155,
      "rocksdbFilesCopied" : 2,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 40000000,
      "rocksdbGetLatency" : 21834,
      "rocksdbPutCount" : 1,
      "rocksdbPutLatency" : 56155599000,
      "rocksdbReadBlockCacheHitCount" : 1988,
      "rocksdbReadBlockCacheMissCount" : 40341617,
      "rocksdbSstFileSize" : 141037747,
      "rocksdbTotalBytesReadByCompaction" : 336853375,
      "rocksdbTotalBytesReadByGet" : 680000000,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 141037747,
      "rocksdbTotalBytesWrittenByPut" : 740000012,
      "rocksdbTotalCompactionLatencyMs" : 21949695000,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 7038
    }
  } ],
  "sources" : [ {
  } ],
  "sink" : {
  }
}

Ниже приведены подробные описания метрик.

Имя метрики Description
rocksdbCommitWriteBatchLatency Время (в миллисекундах), которое понадобилось для применения промежуточных операций записи в структуре в памяти (WriteBatch) к собственному RocksDB.
rocksdbCommitFlushLatency Время (в миллисекундах), которое понадобилось для очистки изменений в памяти RocksDB на локальном диске.
rocksdbCommitCompactLatency Время (в миллисекундах), которое понадобилось для сжатия (необязательного) во время фиксации контрольной точки.
rocksdbCommitPauseLatency Время (в миллисекундах), которое понадобилось для остановки фоновых рабочих потоков (для сжатия и т. д.) в рамках фиксации контрольной точки.
rocksdbCommitCheckpointLatency Время (в миллисекундах), которое понадобилось для создания моментального снимка собственного RockDB и запись его в локальный каталог.
rocksdbCommitFileSyncLatencyMs Время (в миллисекундах), которое понадобилось для синхронизации файлов, связанных с моментальным снимком собственного RocksDB, со внешним хранилищем (расположением контрольной точки).
rocksdbGetLatency Среднее время (в наносекундах), затраченное на базовый вызов собственного RocksDB::Get.
rocksdbPutCount Среднее время (в наносекундах), затраченное на базовый вызов собственного RocksDB::Put.
rocksdbGetCount Число собственных вызовов RocksDB::Get (без учета Gets из WriteBatch — в пакете памяти, используемом для промежуточной записи).
rocksdbPutCount Число собственных вызовов RocksDB::Put (без учета Puts в WriteBatch — в пакете памяти, используемом для промежуточной записи).
rocksdbTotalBytesReadByGet Число несжатых байтов, считанных с помощью собственных вызовов RocksDB::Get.
rocksdbTotalBytesWrittenByPut Число несжатых байтов, записанных с помощью собственных вызовов RocksDB::Put.
rocksdbReadBlockCacheHitCount Количество использований собственного кэша блоков RocksDB, чтобы избежать считывания данных с локального диска.
rocksdbReadBlockCacheMissCount Количество промахов собственного кэша блоков RocksDB, которые требовались для считывания данных с локального диска.
rocksdbTotalBytesReadByCompaction Число байтов, считанных с локального диска с помощью собственного процесса сжатия RocksDB.
rocksdbTotalBytesWrittenByCompaction Число байтов, записанных на локальный диск с помощью собственного процесса сжатия RocksDB.
rocksdbTotalCompactionLatencyMs Время (в миллисекундах), потраченное на сжатие RocksDB (как фоновое, так и дополнительное сжатие, инициированное во время фиксации).
rocksdbWriterStallLatencyMs Время (в миллисекундах), которое модуль записи простаивал из-за фонового сжатия или сброса объектов memtable на диск.
rocksdbTotalBytesReadThroughIterator Некоторые из операций с отслеживанием состояния (например, обработка времени ожидания в flatMapGroupsWithState или добавление пределов в агрегатах с окнами) требуют считывания всех данных в базе данных с помощью итератора. Общий размер несжатых данных, считанных с помощью итератора.