Optimiser des requêtes Structured Streaming avec état

La gestion des informations d’état intermédiaire des requêtes de Structured Streaming avec état peut aider à éviter des problèmes inattendus de latence et de production.

Les recommandations de Databricks sont les suivantes :

  • Utilisez des instances optimisées pour le calcul en tant que workers.
  • Définissez le nombre de partitions aléatoires sur 1 à 2 fois le nombre de cœurs dans le cluster.
  • Définissez la configuration spark.sql.streaming.noDataMicroBatches.enabled sur false dans la SparkSession. Cela empêche le moteur de traitement par micro-lots de diffusion en continu de traiter des micro-lots qui ne contiennent pas de données. Notez également que la définition de cette configuration sur false peut entraîner des opérations avec état qui tirent parti des limites ou des délais de traitement pour ne pas obtenir de sortie de données tant que de nouvelles données n’arrivent, plutôt que des les obtenir immédiatement.

Databricks recommande l’utilisation de RocksDB avec les points de contrôle du journal des modifications pour gérer l’état des flux avec état. Consultez Configurer un stockage d’état RocksDB sur Azure Databricks.

Remarque

Le schéma de gestion d’état ne peut pas être modifié entre les redémarrages de la requête. Autrement dit, si une requête a été démarrée avec la gestion par défaut, il n’est possible de la modifier qu’en la démarrant à partir de zéro avec un nouvel emplacement de point de contrôle.

Utiliser plusieurs opérateurs avec état dans Structured Streaming

Dans Databricks Runtime 13.3 LTS et versions ultérieures, Azure Databricks offre une prise en charge avancée des opérateurs avec état dans les charges de travail Flux structuré. Vous pouvez désormais chaîner plusieurs opérateurs avec état, ce qui signifie que vous pouvez alimenter la sortie d’une opération telle qu’une agrégation fenêtrée vers une autre opération avec état telle qu’une jointure.

Les exemples suivants illustrent plusieurs modèles que vous pouvez utiliser.

Important

Les limitations suivantes existent lors de l’utilisation de plusieurs opérateurs avec état :

  • La fonction FlatMapGroupWithState n'est pas prise en charge.
  • Seul le mode de sortie d’ajout est pris en charge.

Agrégation de fenêtres de temps chaînées

Python

words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
    window(words.timestamp, "10 minutes", "5 minutes"),
    words.word
).count()

# Group the windowed data by another window and word and compute the count of each group
anotherWindowedCounts = windowedCounts.groupBy(
    window(window_time(windowedCounts.window), "1 hour"),
    windowedCounts.word
).count()

Scala

import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"),
  $"word"
).count()

// Group the windowed data by another window and word and compute the count of each group
val anotherWindowedCounts = windowedCounts.groupBy(
  window($"window", "1 hour"),
  $"word"
).count()

Agrégation de fenêtres de temps dans deux flux différents suivis d’une jointure de fenêtre flux-flux

Python

clicksWindow = clicksWithWatermark.groupBy(
  clicksWithWatermark.clickAdId,
  window(clicksWithWatermark.clickTime, "1 hour")
).count()

impressionsWindow = impressionsWithWatermark.groupBy(
  impressionsWithWatermark.impressionAdId,
  window(impressionsWithWatermark.impressionTime, "1 hour")
).count()

clicksWindow.join(impressionsWindow, "window", "inner")

Scala

val clicksWindow = clicksWithWatermark
  .groupBy(window("clickTime", "1 hour"))
  .count()

val impressionsWindow = impressionsWithWatermark
  .groupBy(window("impressionTime", "1 hour"))
  .count()

clicksWindow.join(impressionsWindow, "window", "inner")

Jointure de l’intervalle de temps flux-flux suivie de l’agrégation de fenêtre de temps

Python

joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """),
  "leftOuter"                 # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined.groupBy(
  joined.clickAdId,
  window(joined.clickTime, "1 hour")
).count()

Scala

val joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
  """),
  joinType = "leftOuter"      // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined
  .groupBy($"clickAdId", window($"clickTime", "1 hour"))
  .count()

Rééquilibrage d’état pour le flux structuré

Le rééquilibrage d’état est activé par défaut pour toutes les charges de travail de streaming dans les tables Delta Live. Dans Databricks Runtime 11.3 LTS et versions ultérieures, vous pouvez définir l’option de configuration suivante dans la configuration du cluster Spark pour activer le rééquilibrage de l’état :

spark.sql.streaming.statefulOperator.stateRebalancing.enabled true

Le rééquilibrage d’état bénéficie aux pipelines Structured Streaming avec état qui connaissent des événements de redimensionnement de cluster. Les opérations de streaming sans état n’en bénéficient pas, même si la taille de cluster varie.

Remarque

La mise à l’échelle automatique du calcul présente des limitations pour la réduction de la taille du cluster pour les charges de travail Structured Streaming. Databricks recommande d’utiliser Delta Live Tables avec mise à l’échelle automatique améliorée pour les charges de travail de diffusion en continu. Consultez Optimiser l’utilisation des clusters des pipelines Delta Live Tables avec la mise à l’échelle automatique améliorée.

Un événement de redimensionnement de cluster entraîne le déclenchement d’un rééquilibrage d’état. Lors d’événements de rééquilibrage, vous pouvez noter que la latence d’un micro-lot peut être plus élevée lors du chargement de l’état du stockage cloud vers le nouvel exécuteur.

Spécifier l’état initial pour mapGroupsWithState

Vous pouvez spécifier un état initial défini par l’utilisateur pour le traitement avec état de Structured Streaming à l’aide de flatMapGroupsWithState ou mapGroupsWithState. Cela vous permet d’éviter de re-traiter des données au démarrage d’un flux avec état sans point de contrôle valide.

def mapGroupsWithState[S: Encoder, U: Encoder](
    timeoutConf: GroupStateTimeout,
    initialState: KeyValueGroupedDataset[K, S])(
    func: (K, Iterator[V], GroupState[S]) => U): Dataset[U]

def flatMapGroupsWithState[S: Encoder, U: Encoder](
    outputMode: OutputMode,
    timeoutConf: GroupStateTimeout,
    initialState: KeyValueGroupedDataset[K, S])(
    func: (K, Iterator[V], GroupState[S]) => Iterator[U])

Exemple de cas d’usage spécifiant un état initial pour l’opérateur flatMapGroupsWithState :

val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
  val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
  state.update(new RunningCount(count))
  Iterator((key, count.toString))
}

val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
  ("apple", new RunningCount(1)),
  ("orange", new RunningCount(2)),
  ("mango", new RunningCount(5)),
).toDS()

val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)

fruitStream
  .groupByKey(x => x)
  .flatMapGroupsWithState(Update, GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)

Exemple de cas d’usage spécifiant un état initial pour l’opérateur mapGroupsWithState :

val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
  val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
  state.update(new RunningCount(count))
  (key, count.toString)
}

val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
  ("apple", new RunningCount(1)),
  ("orange", new RunningCount(2)),
  ("mango", new RunningCount(5)),
).toDS()

val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)

fruitStream
  .groupByKey(x => x)
  .mapGroupsWithState(GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)

Tester la fonction de mise à jour mapGroupsWithState

LAPI TestGroupState vous permet de tester la fonction de mise à jour d’état utilisée pour Dataset.groupByKey(...).mapGroupsWithState(...) et Dataset.groupByKey(...).flatMapGroupsWithState(...).

La fonction de mise à jour d’état prend l’état précédent comme entrée en utilisant un objet de type GroupState. Consultez la documentation de référence sur GroupState d’Apache Spark. Par exemple :

import org.apache.spark.sql.streaming._
import org.apache.spark.api.java.Optional

test("flatMapGroupsWithState's state update function") {
  var prevState = TestGroupState.create[UserStatus](
    optionalState = Optional.empty[UserStatus],
    timeoutConf = GroupStateTimeout.EventTimeTimeout,
    batchProcessingTimeMs = 1L,
    eventTimeWatermarkMs = Optional.of(1L),
    hasTimedOut = false)

  val userId: String = ...
  val actions: Iterator[UserAction] = ...

  assert(!prevState.hasUpdated)

  updateState(userId, actions, prevState)

  assert(prevState.hasUpdated)
}