Stateful structured streaming-query's optimaliseren

Het beheren van de informatie over de tussenliggende status van stateful structured streaming-query's kan helpen bij het voorkomen van onverwachte latentie en productieproblemen.

Databricks raadt het volgende aan:

  • Gebruik voor rekenkracht geoptimaliseerde exemplaren als werkrollen.
  • Stel het aantal willekeurige partities in op 1-2 keer aantal kernen in het cluster.
  • Stel de spark.sql.streaming.noDataMicroBatches.enabled configuratie false in op in SparkSession. Hiermee voorkomt u dat de streaming-microbatch-engine microbatches verwerkt die geen gegevens bevatten. Houd er ook rekening mee dat het instellen van deze configuratie false kan leiden tot stateful bewerkingen die gebruikmaken van time-outs voor watermerken of verwerkingstime-outs om geen gegevensuitvoer te krijgen totdat nieuwe gegevens binnenkomen in plaats van onmiddellijk.

Databricks raadt het gebruik van RocksDB aan met controlepunten voor wijzigingenlogboeken om de status voor stateful stromen te beheren. Zie Opslag van RocksDB-status in Azure Databricks configureren.

Notitie

Het statusbeheerschema kan niet worden gewijzigd tussen het opnieuw opstarten van de query. Als een query is gestart met het standaardbeheer, kan deze dus niet worden gewijzigd zonder de query helemaal opnieuw te starten met een nieuwe controlepuntlocatie.

Werken met meerdere stateful operators in Structured Streaming

In Databricks Runtime 13.3 LTS en hoger biedt Azure Databricks geavanceerde ondersteuning voor stateful operators in structured streaming-workloads. U kunt nu meerdere stateful operators samenvoegen, wat betekent dat u de uitvoer van een bewerking, zoals een gevensterde aggregatie, kunt doorvoeren naar een andere stateful bewerking, zoals een join.

In de volgende voorbeelden ziet u verschillende patronen die u kunt gebruiken.

Belangrijk

De volgende beperkingen gelden voor het werken met meerdere stateful operators:

  • FlatMapGroupWithState wordt niet ondersteund.
  • Alleen de uitvoermodus voor toevoegen wordt ondersteund.

Aggregatie van geketende tijdvensters

Python

words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
    window(words.timestamp, "10 minutes", "5 minutes"),
    words.word
).count()

# Group the windowed data by another window and word and compute the count of each group
anotherWindowedCounts = windowedCounts.groupBy(
    window(window_time(windowedCounts.window), "1 hour"),
    windowedCounts.word
).count()

Scala

import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"),
  $"word"
).count()

// Group the windowed data by another window and word and compute the count of each group
val anotherWindowedCounts = windowedCounts.groupBy(
  window($"window", "1 hour"),
  $"word"
).count()

Tijdvensteraggregatie in twee verschillende streams gevolgd door stream-stream window join

Python

clicksWindow = clicksWithWatermark.groupBy(
  clicksWithWatermark.clickAdId,
  window(clicksWithWatermark.clickTime, "1 hour")
).count()

impressionsWindow = impressionsWithWatermark.groupBy(
  impressionsWithWatermark.impressionAdId,
  window(impressionsWithWatermark.impressionTime, "1 hour")
).count()

clicksWindow.join(impressionsWindow, "window", "inner")

Scala

val clicksWindow = clicksWithWatermark
  .groupBy(window("clickTime", "1 hour"))
  .count()

val impressionsWindow = impressionsWithWatermark
  .groupBy(window("impressionTime", "1 hour"))
  .count()

clicksWindow.join(impressionsWindow, "window", "inner")

Tijdsinterval voor streamstreaming gevolgd door tijdvensteraggregatie

Python

joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """),
  "leftOuter"                 # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined.groupBy(
  joined.clickAdId,
  window(joined.clickTime, "1 hour")
).count()

Scala

val joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
  """),
  joinType = "leftOuter"      // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined
  .groupBy($"clickAdId", window($"clickTime", "1 hour"))
  .count()

Statusherverdeling voor gestructureerd streamen

Statusherverdeling is standaard ingeschakeld voor alle streamingworkloads in Delta Live Tables. In Databricks Runtime 11.3 LTS en hoger kunt u de volgende configuratieoptie instellen in de Configuratie van het Spark-cluster om statusherverdeling mogelijk te maken:

spark.sql.streaming.statefulOperator.stateRebalancing.enabled true

Statusherverdeling biedt stateful Structured Streaming-pijplijnen die het formaat van het cluster wijzigen. Staatloze streamingbewerkingen profiteren niet, ongeacht het wijzigen van clustergrootten.

Notitie

Automatisch schalen van berekeningen heeft beperkingen bij het omlaag schalen van clustergrootte voor structured streaming-workloads. Databricks raadt het gebruik van Delta Live Tables aan met verbeterde automatische schaalaanpassing voor streamingworkloads. Zie Het clustergebruik van Delta Live Tables-pijplijnen optimaliseren met verbeterde automatische schaalaanpassing.

Het wijzigen van het formaat van clusters zorgt ervoor dat statusherverdeling wordt geactiveerd. Tijdens het opnieuw verdelen van gebeurtenissen kunnen microbatches een hogere latentie hebben wanneer de status van cloudopslag naar de nieuwe uitvoerders wordt geladen.

Geef de initiële status op voor mapGroupsWithState

U kunt een door de gebruiker gedefinieerde initiële status opgeven voor structured streaming stateful verwerking met behulp van flatMapGroupsWithStateof mapGroupsWithState. Hierdoor kunt u voorkomen dat gegevens opnieuw worden verwerkt bij het starten van een stateful stream zonder een geldig controlepunt.

def mapGroupsWithState[S: Encoder, U: Encoder](
    timeoutConf: GroupStateTimeout,
    initialState: KeyValueGroupedDataset[K, S])(
    func: (K, Iterator[V], GroupState[S]) => U): Dataset[U]

def flatMapGroupsWithState[S: Encoder, U: Encoder](
    outputMode: OutputMode,
    timeoutConf: GroupStateTimeout,
    initialState: KeyValueGroupedDataset[K, S])(
    func: (K, Iterator[V], GroupState[S]) => Iterator[U])

Voorbeeld van use case die een initiële status aangeeft aan de flatMapGroupsWithState operator:

val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
  val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
  state.update(new RunningCount(count))
  Iterator((key, count.toString))
}

val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
  ("apple", new RunningCount(1)),
  ("orange", new RunningCount(2)),
  ("mango", new RunningCount(5)),
).toDS()

val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)

fruitStream
  .groupByKey(x => x)
  .flatMapGroupsWithState(Update, GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)

Voorbeeld van use case die een initiële status aangeeft aan de mapGroupsWithState operator:

val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
  val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
  state.update(new RunningCount(count))
  (key, count.toString)
}

val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
  ("apple", new RunningCount(1)),
  ("orange", new RunningCount(2)),
  ("mango", new RunningCount(5)),
).toDS()

val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)

fruitStream
  .groupByKey(x => x)
  .mapGroupsWithState(GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)

mapGroupsWithState De updatefunctie testen

Met TestGroupState de API kunt u de functie voor statusupdates testen die wordt gebruikt voor Dataset.groupByKey(...).mapGroupsWithState(...) en Dataset.groupByKey(...).flatMapGroupsWithState(...).

De functie statusupdate heeft de vorige status als invoer met behulp van een object van het type GroupState. Raadpleeg de referentiedocumentatie voor Apache Spark GroupState. Voorbeeld:

import org.apache.spark.sql.streaming._
import org.apache.spark.api.java.Optional

test("flatMapGroupsWithState's state update function") {
  var prevState = TestGroupState.create[UserStatus](
    optionalState = Optional.empty[UserStatus],
    timeoutConf = GroupStateTimeout.EventTimeTimeout,
    batchProcessingTimeMs = 1L,
    eventTimeWatermarkMs = Optional.of(1L),
    hasTimedOut = false)

  val userId: String = ...
  val actions: Iterator[UserAction] = ...

  assert(!prevState.hasUpdated)

  updateState(userId, actions, prevState)

  assert(prevState.hasUpdated)
}