Configurer un magasin d’état RocksDB sur Azure Databricks

Vous pouvez activer la gestion d'état basée sur RocksDB en définissant la configuration suivante dans la SparkSession avant de démarrer la requête de streaming.

spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "com.databricks.sql.streaming.state.RocksDBStateStoreProvider")

Vous pouvez activer RocksDB sur les pipelines Delta Live Tables. Consultez Activer le stockage d’état RocksDB pour Delta Live Tables.

Activer le point de contrôle du journal des modifications

Dans Databricks Runtime 13.3 LTS et versions ultérieures, vous pouvez activer le point de contrôle du journal des modifications pour réduire la durée des points de contrôle et la latence de bout en bout pour des charges de travail de flux structuré. Databricks recommande d’activer le point de contrôle du journal des modifications pour toutes les requêtes avec état Flux structuré.

Traditionnellement, le magasin d’état RocksDB capture des instantanés et charge les fichiers de données pendant les points de contrôle. Pour éviter ce coût, le point de contrôle du journal des modifications écrit uniquement les enregistrements qui ont été modifiés depuis le dernier point de contrôle dans le stockage durable. »

Le point de contrôle du journal des modifications est désactivé par défaut. Vous pouvez activer le point de contrôle du journal des modifications au niveau SparkSession à l’aide de la syntaxe suivante :

spark.conf.set(
  "spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")

Vous pouvez activer le point de contrôle du journal des modifications sur un flux existant et conserver les informations d’état stockées dans le point de contrôle.

Important

Les requêtes qui ont activé le point de contrôle du journal des modifications ne peuvent être exécutées que sur Databricks Runtime 13.3 LTS et versions ultérieures. Vous pouvez désactiver les points de contrôle du journal des modifications pour revenir au comportement de point de contrôle hérité, mais vous devez continuer à exécuter ces requêtes sur Databricks Runtime 13.3 LTS ou version ultérieure. Vous devez redémarrer le travail pour que ces modifications se produisent.

Métriques du magasin d’état RocksDB

Chaque opérateur d’état collecte des métriques liées aux opérations de gestion d’état effectuées sur son instance RocksDB pour observer le magasin d’état, voire aider à déboguer la lenteur d’un travail. Ces métriques sont agrégées (sum) par un opérateur d’état dans le travail sur toutes les tâches où l’opérateur d’état est actif. Ces métriques font partie du mappage customMetrics dans les champs stateOperators de StreamingQueryProgress. Voici un exemple de StreamingQueryProgress sous forme de JSON (obtenu à l’aide de StreamingQueryProgress.json()).

{
  "id" : "6774075e-8869-454b-ad51-513be86cfd43",
  "runId" : "3d08104d-d1d4-4d1a-b21e-0b2e1fb871c5",
  "batchId" : 7,
  "stateOperators" : [ {
    "numRowsTotal" : 20000000,
    "numRowsUpdated" : 20000000,
    "memoryUsedBytes" : 31005397,
    "numRowsDroppedByWatermark" : 0,
    "customMetrics" : {
      "rocksdbBytesCopied" : 141037747,
      "rocksdbCommitCheckpointLatency" : 2,
      "rocksdbCommitCompactLatency" : 22061,
      "rocksdbCommitFileSyncLatencyMs" : 1710,
      "rocksdbCommitFlushLatency" : 19032,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 56155,
      "rocksdbFilesCopied" : 2,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 40000000,
      "rocksdbGetLatency" : 21834,
      "rocksdbPutCount" : 1,
      "rocksdbPutLatency" : 56155599000,
      "rocksdbReadBlockCacheHitCount" : 1988,
      "rocksdbReadBlockCacheMissCount" : 40341617,
      "rocksdbSstFileSize" : 141037747,
      "rocksdbTotalBytesReadByCompaction" : 336853375,
      "rocksdbTotalBytesReadByGet" : 680000000,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 141037747,
      "rocksdbTotalBytesWrittenByPut" : 740000012,
      "rocksdbTotalCompactionLatencyMs" : 21949695000,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 7038
    }
  } ],
  "sources" : [ {
  } ],
  "sink" : {
  }
}

Les descriptions détaillées des métriques sont les suivantes :

Nom de métrique Description
rocksdbCommitWriteBatchLatency Temps (en millisecondes) pris pour appliquer les écritures intermédiaires dans la structure en mémoire (WriteBatch) au RocksDB natif.
rocksdbCommitFlushLatency Temps (en millisecondes) pris pour vider les modifications en mémoire de RocksDB sur le disque local.
rocksdbCommitCompactLatency Temps (en millisecondes) pris pour le compactage (facultatif) au cours de la validation du point de contrôle.
rocksdbCommitPauseLatency Temps (en millisecondes) pris pour l’arrêt des threads de travail en arrière-plan (pour le compactage, etc.) dans le cadre de la validation du point de contrôle.
rocksdbCommitCheckpointLatency Temps (en millisecondes) pris pour prendre un instantané de RocksDB natif et l’écrire dans un répertoire local.
rocksdbCommitFileSyncLatencyMs Temps (en millisecondes) pris pour synchroniser les fichiers associés à l’instantané RocksDB natif sur un stockage externe (emplacement du point de contrôle).
rocksdbGetLatency Temps moyen (en nanosecondes) pris par l’appel RocksDB::Get natif sous-jacent.
rocksdbPutCount Temps moyen (en nanosecondes) pris par l’appel RocksDB::Put natif sous-jacent.
rocksdbGetCount Nombre d’appels natifs RocksDB::Get (n’inclut les Gets de WriteBatch - lot en mémoire utilisé pour les écritures intermédiaires).
rocksdbPutCount Nombre d’appels natifs RocksDB::Put (n’inclut les Puts vers WriteBatch - lot en mémoire utilisé pour les écritures intermédiaires).
rocksdbTotalBytesReadByGet Nombre d’octets non compressés lus via des appels RocksDB::Get natifs.
rocksdbTotalBytesWrittenByPut Nombre d’octets non compressés écrits via des appels RocksDB::Put natifs.
rocksdbReadBlockCacheHitCount Nombre de fois que le cache de bloc RocksDB natif est utilisé pour éviter une lecture des données à partir du disque local.
rocksdbReadBlockCacheMissCount Nombre de fois que le cache de bloc RocksDB natif a manqué et requis une lecture de données à partir du disque local.
rocksdbTotalBytesReadByCompaction Nombre d’octets lus à partir du disque local par le processus de compactage RocksDB natif.
rocksdbTotalBytesWrittenByCompaction Nombre d’octets écrits sur le disque local par le processus de compactage RocksDB natif.
rocksdbTotalCompactionLatencyMs Temps (en millisecondes) pris pour les compactages RocksDB (compactage en arrière-plan et compactage facultatif initié lors de la validation).
rocksdbWriterStallLatencyMs Temps (en millisecondes) pendant lequel l’enregistreur est resté bloqué en raison du compactage en arrière-plan ou du vidage des memtables sur le disque.
rocksdbTotalBytesReadThroughIterator Certaines opérations avec état (telles que le traitement du délai d’expiration dans flatMapGroupsWithState ou la limitation dans les agrégations fenêtrées) requièrent la lecture de données entières dans DB via un itérateur. Taille totale des données décompressées lues à l’aide de l’itérateur.