Sdílet prostřednictvím


Zotavení z selhání dotazů strukturovaného streamování pomocí pracovních postupů

Strukturované streamování poskytuje odolnost proti chybám a konzistenci dat pro dotazy streamování; pomocí pracovních postupů Azure Databricks můžete snadno nakonfigurovat dotazy strukturovaného streamování tak, aby se automaticky restartoval při selhání. Povolením kontrolních bodů pro streamovací dotaz můžete po selhání dotaz restartovat. Restartovaný dotaz pokračuje v případě, že jeden neúspěšný dotaz skončil.

Povolení vytváření kontrolních bodů pro dotazy strukturovaného streamování

Databricks doporučuje, abyste před spuštěním dotazu vždy zadali checkpointLocation možnost cesty ke cloudovému úložišti. Příklad:

streamingDataFrame.writeStream
  .format("parquet")
  .option("path", "/path/to/table")
  .option("checkpointLocation", "/path/to/table/_checkpoint")
  .start()

Toto umístění kontrolního bodu zachovává všechny základní informace, které identifikují dotaz. Každý dotaz musí mít jiné umístění kontrolního bodu. Více dotazů by nikdy nemělo mít stejné umístění. Další informace najdete v průvodci programováním strukturovaného streamování.

Poznámka:

I když checkpointLocation je vyžadována pro většinu typů výstupních jímek, některé jímky, jako je například jímka paměti, mohou automaticky generovat dočasné umístění kontrolního bodu, když nezadáte checkpointLocation. Tato dočasná umístění kontrolních bodů nezajistí žádnou odolnost proti chybám nebo záruky konzistence dat a nemusí se správně vyčistit. Vyhněte se potenciálním nástrahám tím, že vždy zadáte checkpointLocation.

Konfigurace úloh strukturovaného streamování pro restartování dotazů streamování při selhání

Úlohu Azure Databricks můžete vytvořit s poznámkovým blokem nebo souborem JAR, který obsahuje dotazy streamování, a nakonfigurovat ho na:

  • Vždy používejte nový cluster.
  • Vždy opakujte pokus o selhání.

Automatické restartování při selhání úlohy je zvlášť důležité při konfiguraci úloh streamování s vývojem schématu. Vývoj schématu funguje v Azure Databricks tak, že při zjištění změny schématu vyvolá očekávanou chybu a při restartování úlohy pomocí nového schématu správně zpracovává data. Databricks doporučuje vždy konfigurovat úlohy streamování, které obsahují dotazy s vývojem schématu, aby se automaticky restartoval v pracovních postupech Databricks.

Úlohy mají úzkou integraci s rozhraními API strukturovaného streamování a můžou monitorovat všechny dotazy streamování, které jsou aktivní při spuštění. Tato konfigurace zajišťuje, že pokud jakákoli část dotazu selže, úlohy automaticky ukončí spuštění (spolu se všemi ostatními dotazy) a spustí nové spuštění v novém clusteru. Tím znovu spustíte poznámkový blok nebo kód JAR a znovu restartuje všechny dotazy. To je nejbezpečnější způsob, jak se vrátit do dobrého stavu.

Poznámka:

  • Selhání v některém z aktivních dotazů streamování způsobí selhání aktivního spuštění a ukončení všech ostatních streamovaných dotazů.
  • Nemusíte používat streamingQuery.awaitTermination() ani spark.streams.awaitAnyTermination() na konci poznámkového bloku. Úlohy automaticky brání dokončení spuštění, když je aktivní streamovací dotaz.
  • Databricks místo orchestrace dbutils.notebook.run() poznámkových bloků strukturovaného %run streamování doporučuje používat úlohy. Viz Spuštění poznámkového bloku Databricks z jiného poznámkového bloku.

Následuje příklad doporučené konfigurace úlohy.

  • Cluster: Nastavte ho vždy tak, aby používal nový cluster a používal nejnovější verzi Sparku (nebo alespoň verzi 2.1). Dotazy spuštěné ve Sparku 2.1 a novějších verzích se po upgradu dotazů a verzí Sparku dají obnovit.
  • Oznámení: Tuto možnost nastavte, pokud chcete e-mailové oznámení o selháních.
  • Plán: Nenastavujte plán.
  • Časový limit: Nenastavujte časový limit. Dotazy streamování běží po neomezenou dobu.
  • Maximální počet souběžných spuštění: Nastaveno na hodnotu 1. Současně musí být aktivní pouze jedna instance každého dotazu.
  • Opakování: Nastaveno na neomezenou hodnotu.

Informace o těchto konfiguracích najdete v tématu Vytvoření a spuštění úloh Azure Databricks.

Obnovení po změnách v dotazu strukturovaného streamování

Existují omezení toho, jaké změny v dotazu streamování jsou povolené mezi restartováními ze stejného umístění kontrolního bodu. Tady je několik změn, které buď nejsou povolené, nebo efekt změny není dobře definovaný. Pro všechny:

  • Povolený termín znamená, že zadanou změnu můžete provést, ale to, jestli je sémantika jejího účinku dobře definovaná, závisí na dotazu a změně.
  • Nepovolený termín znamená, že byste neměli provést zadanou změnu, protože restartovaný dotaz pravděpodobně selže s nepředvídatelnými chybami.
  • sdf představuje streamovaný datový rámec nebo datovou sadu vygenerovanou pomocí sparkSession.readStream.

Typy změn v dotazech strukturovaného streamování

  • Změny čísla nebo typu (tj. jiného zdroje) vstupních zdrojů: To není povoleno.
  • Změny parametrů vstupních zdrojů: Jestli je to povoleno a zda jsou sémantika změny dobře definovaná, závisí na zdroji a dotazu. Tady je pár příkladů.
    • Přidání, odstranění a úprava omezení četnosti je povolené:

      spark.readStream.format("kafka").option("subscribe", "article")
      

      na

      spark.readStream.format("kafka").option("subscribe", "article").option("maxOffsetsPerTrigger", ...)
      
    • Změny odebíraných článků a souborů se obecně nepovolují, protože výsledky jsou nepředvídatelné: spark.readStream.format("kafka").option("subscribe", "article")spark.readStream.format("kafka").option("subscribe", "newarticle")

  • Změny v intervalu aktivační události: Mezi přírůstkovými dávkami a časovými intervaly můžete změnit triggery. Viz Změna intervalů aktivačních událostí mezi spuštěními.
  • Změny typu výstupní jímky: Změny mezi několika konkrétními kombinacemi jímek jsou povolené. To je potřeba ověřit na základě případu. Tady je pár příkladů.
    • Je povolená jímka souborů do jímky Kafka. Kafka uvidí jenom nová data.
    • Jímka Kafka do jímky souborů není povolená.
    • Je povolená jímka Kafka na foreach nebo naopak.
  • Změny parametrů výstupní jímky: Jestli je tato možnost povolená a jestli jsou sémantika změny dobře definovaná, závisí na jímce a dotazu. Tady je pár příkladů.
    • Změny výstupního adresáře jímky souborů nejsou povoleny: sdf.writeStream.format("parquet").option("path", "/somePath")sdf.writeStream.format("parquet").option("path", "/anotherPath")
    • Změny v tématu výstupu jsou povolené: sdf.writeStream.format("kafka").option("topic", "topic1")sdf.writeStream.format("kafka").option("topic", "topic2")
    • Změny uživatelem definované jímky foreach (tj ForeachWriter . kódu) jsou povolené, ale sémantika změny závisí na kódu.
  • Změny v projekci / filtru / operacích podobných mapování: Některé případy jsou povolené. Příklad:
    • Přidání nebo odstranění filtrů je povoleno: sdf.selectExpr("a") do sdf.where(...).selectExpr("a").filter(...).
    • Změny v projekcích se stejným schématem výstupu jsou povoleny: sdf.selectExpr("stringColumn AS json").writeStream do sdf.select(to_json(...).as("json")).writeStream.
    • Změny v projekcích s různým výstupním schématem jsou podmíněně povoleny: sdf.selectExpr("a").writeStream je sdf.selectExpr("b").writeStream povoleno pouze v případě, že výstupní jímka umožňuje změnu schématu z "a" na "b".
  • Změny ve stavových operacích: Některé operace v dotazech streamování musí udržovat stavová data, aby bylo možné průběžně aktualizovat výsledek. Strukturované streamování automaticky kontroluje stavová data do úložiště odolného proti chybám (například DBFS, Azure Blob Storage) a po restartování je obnoví. Předpokládá se však, že schéma stavových dat zůstává v restartech stejné. To znamená, že mezi restartováními nejsou povolené všechny změny (tj. přidání, odstranění nebo úpravy schématu) stavových operací streamovacího dotazu. Tady je seznam stavových operací, jejichž schéma by se nemělo mezi restartováními měnit, aby se zajistilo obnovení stavu:
    • Agregace streamování: Například sdf.groupBy("a").agg(...). Jakákoli změna počtu nebo typů klíčů nebo agregací není povolena.
    • Odstranění duplicitních dat streamování: Například sdf.dropDuplicates("a"). Jakákoli změna počtu nebo typů klíčů nebo agregací není povolena.
    • Spojení datových proudů: Například sdf1.join(sdf2, ...) (tj. oba vstupy se generují pomocí sparkSession.readStream). Změny ve schématu nebo sloupcích spojování koňovitých nejsou povoleny. Změny typu spojení (vnější nebo vnitřní) nejsou povoleny. Jiné změny v podmínce spojení jsou špatně definované.
    • Libovolná stavová operace: Například sdf.groupByKey(...).mapGroupsWithState(...) nebo sdf.groupByKey(...).flatMapGroupsWithState(...). Jakákoli změna schématu uživatelem definovaného stavu a typ časového limitu není povolený. Všechny změny v rámci uživatelem definované funkce mapování stavu jsou povolené, ale sémantický účinek změny závisí na uživatelsky definované logice. Pokud opravdu chcete podporovat změny schématu stavu, můžete explicitně zakódovat nebo dekódovat složité stavové datové struktury do bajtů pomocí schématu kódování/dekódování, které podporuje migraci schématu. Pokud například uložíte stav jako bajty s kódováním Avro, můžete změnit schéma stavu Avro mezi restartováním dotazu, protože tím se obnoví binární stav.