Állapotalapú strukturált streamelési lekérdezések optimalizálása

Az állapotalapú strukturált streamelési lekérdezések köztes állapotadatainak kezelése segíthet megelőzni a váratlan késést és az éles problémákat.

A Databricks a következőket javasolja:

  • Számítási szempontból optimalizált példányok használata feldolgozóként.
  • Állítsa az shuffle partíciók számát a fürt magjainak 1–2-szeresére.
  • Állítsa be a spark.sql.streaming.noDataMicroBatches.enabled konfigurációt false a SparkSessionban. Ez megakadályozza, hogy a streamelt mikrokötegmotor feldolgozzon olyan mikro kötegeket, amelyek nem tartalmaznak adatokat. Vegye figyelembe azt is, hogy a konfiguráció beállítása false olyan állapotalapú műveleteket eredményezhet, amelyek vízjeleket vagy időtúllépések feldolgozását használják fel, hogy ne kapják meg az adatkimenetet, amíg új adatok nem érkeznek meg azonnal.

A Databricks a RocksDB használatát javasolja változásnapló-ellenőrzőpontokkal az állapotalapú streamek állapotának kezeléséhez. Lásd: A RocksDB állapottároló konfigurálása az Azure Databricksben.

Feljegyzés

Az állapotkezelési séma nem módosítható a lekérdezések újraindítása között. Vagyis ha egy lekérdezés az alapértelmezett felügyelettel lett elindítva, akkor nem módosítható anélkül, hogy a lekérdezést teljesen új ellenőrzőpont-hellyel kezdené.

Több állapotalapú operátor használata a strukturált streamelésben

A Databricks Runtime 13.3 LTS-ben és újabb verziókban az Azure Databricks fejlett támogatást nyújt a strukturált streamelési számítási feladatok állapotalapú operátorai számára. Mostantól több állapotalapú operátort is összekapcsolhat, ami azt jelenti, hogy egy művelet kimenetét, például egy ablakos összesítést egy másik állapotalapú művelethez, például egy illesztéshez táplálhatja.

Az alábbi példák számos használható mintát mutatnak be.

Fontos

Több állapotalapú operátor használatakor a következő korlátozások érvényesek:

  • FlatMapGroupWithState nem támogatott.
  • Csak a hozzáfűző kimeneti mód támogatott.

Láncolt időablak-összesítés

Python

words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
    window(words.timestamp, "10 minutes", "5 minutes"),
    words.word
).count()

# Group the windowed data by another window and word and compute the count of each group
anotherWindowedCounts = windowedCounts.groupBy(
    window(window_time(windowedCounts.window), "1 hour"),
    windowedCounts.word
).count()

Scala

import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"),
  $"word"
).count()

// Group the windowed data by another window and word and compute the count of each group
val anotherWindowedCounts = windowedCounts.groupBy(
  window($"window", "1 hour"),
  $"word"
).count()

Időablak-összesítés két különböző streamben, majd stream-stream ablak illesztése

Python

clicksWindow = clicksWithWatermark.groupBy(
  clicksWithWatermark.clickAdId,
  window(clicksWithWatermark.clickTime, "1 hour")
).count()

impressionsWindow = impressionsWithWatermark.groupBy(
  impressionsWithWatermark.impressionAdId,
  window(impressionsWithWatermark.impressionTime, "1 hour")
).count()

clicksWindow.join(impressionsWindow, "window", "inner")

Scala

val clicksWindow = clicksWithWatermark
  .groupBy(window("clickTime", "1 hour"))
  .count()

val impressionsWindow = impressionsWithWatermark
  .groupBy(window("impressionTime", "1 hour"))
  .count()

clicksWindow.join(impressionsWindow, "window", "inner")

Stream-stream időintervallum-illesztés, majd időablak-összesítés

Python

joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """),
  "leftOuter"                 # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined.groupBy(
  joined.clickAdId,
  window(joined.clickTime, "1 hour")
).count()

Scala

val joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
  """),
  joinType = "leftOuter"      // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined
  .groupBy($"clickAdId", window($"clickTime", "1 hour"))
  .count()

Állapotkiegyenlítés strukturált streameléshez

Az állapot-újraegyensúlyozás alapértelmezés szerint engedélyezve van a Delta Live Tables összes streamelési számítási feladatához. A Databricks Runtime 11.3 LTS-ben és újabb verziókban az alábbi konfigurációs beállítást állíthatja be a Spark-fürtkonfigurációban az állapot-újraegyensúlyozás engedélyezéséhez:

spark.sql.streaming.statefulOperator.stateRebalancing.enabled true

Az állapot-újraegyensúlyozás az állapotalapú strukturált streamelési folyamatokkal jár, amelyek fürt átméretezési eseményeken mennek keresztül. Az állapot nélküli streamelési műveletek a fürt méretének megváltoztatásától függetlenül nem előnyösek.

Feljegyzés

A számítási automatikus skálázás korlátozásokkal rendelkezik a strukturált streamelési számítási feladatok fürtméretének leskálázásával. A Databricks a Delta Live Tables with Enhanced Autoscaling használatát ajánlja a streaming munkaterhelésekhez. Lásd: A Delta Live Tables-folyamatok fürtkihasználtságának optimalizálása továbbfejlesztett automatikus skálázással.

A fürt átméretezési eseményei az állapotok újraegyensúlyozását váltják ki. Az események újraegyensúlyozása során a mikro kötegek nagyobb késéssel járhatnak, mivel az állapot betöltődik a felhőbeli tárolóból az új végrehajtókba.

Adja meg a kezdeti állapotot a következőhöz: mapGroupsWithState

Megadhatja a felhasználó által meghatározott kezdeti állapotot a strukturált streamelési állapotalapú feldolgozáshoz a mapGroupsWithStatevagy a flatMapGroupsWithStatehasználatával. Így elkerülheti az adatok újrafeldolgozását, ha egy állapotalapú adatfolyamot érvényes ellenőrzőpont nélkül indít el.

def mapGroupsWithState[S: Encoder, U: Encoder](
    timeoutConf: GroupStateTimeout,
    initialState: KeyValueGroupedDataset[K, S])(
    func: (K, Iterator[V], GroupState[S]) => U): Dataset[U]

def flatMapGroupsWithState[S: Encoder, U: Encoder](
    outputMode: OutputMode,
    timeoutConf: GroupStateTimeout,
    initialState: KeyValueGroupedDataset[K, S])(
    func: (K, Iterator[V], GroupState[S]) => Iterator[U])

Példa az operátor kezdeti állapotát meghatározó használati esetre flatMapGroupsWithState :

val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
  val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
  state.update(new RunningCount(count))
  Iterator((key, count.toString))
}

val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
  ("apple", new RunningCount(1)),
  ("orange", new RunningCount(2)),
  ("mango", new RunningCount(5)),
).toDS()

val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)

fruitStream
  .groupByKey(x => x)
  .flatMapGroupsWithState(Update, GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)

Példa az operátor kezdeti állapotát meghatározó használati esetre mapGroupsWithState :

val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
  val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
  state.update(new RunningCount(count))
  (key, count.toString)
}

val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
  ("apple", new RunningCount(1)),
  ("orange", new RunningCount(2)),
  ("mango", new RunningCount(5)),
).toDS()

val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)

fruitStream
  .groupByKey(x => x)
  .mapGroupsWithState(GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)

A frissítési függvény tesztelése mapGroupsWithState

Az TestGroupState API lehetővé teszi a használt állapotfrissítési függvény tesztelésétDataset.groupByKey(...).mapGroupsWithState(...).Dataset.groupByKey(...).flatMapGroupsWithState(...)

Az állapotfrissítési függvény az előző állapotot veszi bemenetként egy típusú GroupStateobjektum használatával. Tekintse meg az Apache Spark GroupState referenciadokumentációját. Példa:

import org.apache.spark.sql.streaming._
import org.apache.spark.api.java.Optional

test("flatMapGroupsWithState's state update function") {
  var prevState = TestGroupState.create[UserStatus](
    optionalState = Optional.empty[UserStatus],
    timeoutConf = GroupStateTimeout.EventTimeTimeout,
    batchProcessingTimeMs = 1L,
    eventTimeWatermarkMs = Optional.of(1L),
    hasTimedOut = false)

  val userId: String = ...
  val actions: Iterator[UserAction] = ...

  assert(!prevState.hasUpdated)

  updateState(userId, actions, prevState)

  assert(prevState.hasUpdated)
}