Konfigurace odvození schématu a vývoje v automatickém zavaděči

Automatický zavaděč můžete nakonfigurovat tak, aby automaticky rozpoznal schéma načtených dat, což umožňuje inicializovat tabulky bez explicitního deklarování schématu dat a vyvíjet schéma tabulky při zavádění nových sloupců. To eliminuje potřebu ručního sledování a použití změn schématu v průběhu času.

Automatický zavaděč může také "zachránit" data, která byla neočekávaná (například u různých datových typů) ve sloupci objektů blob JSON, ke kterým můžete později přistupovat pomocí částečně strukturovaných rozhraní API pro přístup k datům.

Pro odvozování a vývoj schématu se podporují následující formáty:

File format Podporované verze
JSON Všechny verze
CSV Všechny verze
XML Databricks Runtime 14.3 LTS a vyšší
Avro Databricks Runtime 10.4 LTS a vyšší
Parquet Databricks Runtime 11.3 LTS a vyšší
ORC Nepodporované
Text Nepoužitelné (pevné schéma)
Binaryfile Nepoužitelné (pevné schéma)

Syntaxe pro odvozování a vývoj schématu

Určení cílového adresáře pro možnost cloudFiles.schemaLocation umožňuje odvozování a vývoj schématu. Můžete použít stejný adresář, který zadáte pro checkpointLocation. Pokud používáte rozdílové živé tabulky, Azure Databricks spravuje umístění schématu a další informace o kontrolních bodech automaticky.

Poznámka:

Pokud do cílové tabulky načítáte více než jedno zdrojové umístění dat, každá úloha příjmu automatického zavaděče vyžaduje samostatný kontrolní bod streamování.

Následující příklad používá parquet pro cloudFiles.format. Použijte csv, avronebo json pro jiné zdroje souborů. Všechna ostatní nastavení pro čtení a zápis zůstávají stejná pro výchozí chování jednotlivých formátů.

Python

(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "parquet")
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")
)

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "parquet")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Jak funguje odvození schématu automatického zavaděče?

Pro odvození schématu při prvním čtení dat auto loader vzorkuje prvních 50 GB nebo 1000 souborů, které zjistí, podle toho, který limit je překročen jako první. Auto Loader ukládá informace o schématu v adresáři _schemas nakonfigurované cloudFiles.schemaLocation tak, aby sledoval změny schématu vstupních dat v průběhu času.

Poznámka:

Pokud chcete změnit velikost použité ukázky, můžete nastavit konfigurace SQL:

spark.databricks.cloudFiles.schemaInference.sampleSize.numBytes

(bajtový řetězec, například 10gb)

a

spark.databricks.cloudFiles.schemaInference.sampleSize.numFiles

(celé číslo)

Ve výchozím nastavení se při odvozování schématu automatického zavaděče snaží vyhnout problémům s vývojem schématu kvůli neshodám typů. U formátů, které nekódují datové typy (JSON, CSV a XML), auto loader odvodí všechny sloupce jako řetězce (včetně vnořených polí v souborech JSON). U formátů se zadaným schématem (Parquet a Avro) auto loader vzorkuje podmnožinu souborů a slučuje schémata jednotlivých souborů. Toto chování je shrnuto v následující tabulce:

File format Výchozí odvozený datový typ
JSON String
CSV Řetězcové
XML Řetězcové
Avro Typy kódované ve schématu Avro
Parquet Typy kódované ve schématu Parquet

Apache Spark DataFrameReader používá pro odvozování schématu různé chování, výběr datových typů pro sloupce ve zdrojích JSON, CSV a XML na základě ukázkových dat. Chcete-li toto chování povolit pomocí automatického zavaděče, nastavte možnost cloudFiles.inferColumnTypes na truehodnotu .

Poznámka:

Při odvozování schématu dat sdíleného svazku clusteru automaticky zavaděč předpokládá, že soubory obsahují hlavičky. Pokud soubory CSV neobsahují záhlaví, zadejte možnost .option("header", "false"). Kromě toho automaticky zavaděč slučuje schémata všech souborů v ukázce, aby vznikla globální schéma. Auto Loader pak může číst každý soubor podle záhlaví a správně parsovat sdílený svazek clusteru.

Poznámka:

Pokud sloupec obsahuje různé datové typy ve dvou souborech Parquet, automatický zavaděč vybere nejširší typ. Tuto volbu můžete přepsat pomocí funkce schemaHints . Když zadáte rady schématu, automatický zavaděč nepřetypuje sloupec na zadaný typ, ale řekne čtenáři Parquet, aby sloupec přečetl jako zadaný typ. V případě neshody se sloupec zachrání ve sloupci zachovaných dat.

Jak funguje vývoj schématu automatického zavaděče?

Auto Loader zjistí přidání nových sloupců při zpracování dat. Když Auto Loader zjistí nový sloupec, datový proud se zastaví pomocí .UnknownFieldException Než datový proud vyvolá tuto chybu, auto loader provede odvozování schématu v nejnovější mikrodávce dat a aktualizuje umístění schématu s nejnovějším schématem sloučením nových sloupců na konec schématu. Datové typy existujících sloupců zůstávají beze změny.

Databricks doporučuje nakonfigurovat streamy automatického zavaděče s pracovními postupy tak, aby se po takových změnách schématu automaticky restartoval.

Auto Loader podporuje pro vývoj schématu následující režimy, které jste nastavili v možnosti cloudFiles.schemaEvolutionMode:

Režim Chování při čtení nového sloupce
addNewColumns (výchozí) Stream selže. Do schématu se přidají nové sloupce. Existující sloupce se nevyvíjí z datových typů.
rescue Schéma se nikdy nevyvíjí a datový proud se nezdaří kvůli změnám schématu. Všechny nové sloupce jsou zaznamenány ve sloupci zachráněných dat.
failOnNewColumns Stream selže. Stream se nerestartuje, pokud není aktualizované zadané schéma nebo se neodebere datový soubor.
none Nevyvíjí schéma, nové sloupce se ignorují a data se nezachovají, pokud není nastavená rescuedDataColumn možnost. Stream selžou kvůli změnám schématu.

Jak fungují oddíly s automatickým zavaděčem?

Automatický zavaděč se pokusí odvodit sloupce oddílů ze základní adresářové struktury dat, pokud jsou data rozložená v dělení stylu Hive. Například cesta k base_path/event=click/date=2021-04-01/f0.json souboru vede k odvozování date sloupců oddílů a event jako sloupce oddílů. Pokud základní adresářová struktura obsahuje konfliktní oddíly Hive nebo neobsahuje dělení stylu Hive, sloupce oddílů se ignorují.

Binární soubor (binaryFile) a text formáty souborů mají pevná schémata dat, ale podporují odvozování sloupců oddílů. Databricks doporučuje nastavit cloudFiles.schemaLocation pro tyto formáty souborů. Tím se zabrání případným chybám nebo ztrátě informací a zabrání odvozování sloupců oddílů při každém spuštění automatického zavaděče.

Sloupce oddílů se nepovažují za vývoj schématu. Pokud jste měli počáteční adresářovou strukturu jako base_path/event=click/date=2021-04-01/f0.jsona pak začněte přijímat nové soubory jako base_path/event=click/date=2021-04-01/hour=01/f1.json, automatický zavaděč ignoruje sloupec hodin. Pokud chcete zaznamenat informace pro nové sloupce oddílů, nastavte cloudFiles.partitionColumns hodnotu event,date,hour.

Poznámka:

Tato možnost cloudFiles.partitionColumns přebírá čárkami oddělený seznam názvů sloupců. Parsují se jenom sloupce, které existují jako key=value páry ve vaší adresářové struktuře.

Jaký je sloupec zachráněných dat?

Když auto loader odvodí schéma, záchranný datový sloupec se automaticky přidá do schématu jako _rescued_data. Sloupec můžete přejmenovat nebo zahrnout v případech, kdy zadáte schéma nastavením možnosti rescuedDataColumn.

Sloupec zachráněných dat zajišťuje, aby se sloupce, které se neshodují se schématem, zachrání místo vyřazení. Sloupec zachráněných dat obsahuje všechna data, která nejsou analyzována z následujících důvodů:

  • Ve schématu chybí sloupec.
  • Neshody typů
  • Neshody velkých a malých písmen.

Sloupec zachráněných dat obsahuje JSON obsahující záchranné sloupce a cestu ke zdrojovému souboru záznamu.

Poznámka:

Analyzátory JSON a CSV podporují při analýze záznamů tři režimy: PERMISSIVE, DROPMALFORMEDa FAILFAST. Při použití společně s datovým rescuedDataColumntypem neshody nezpůsobí vyřazení záznamů v DROPMALFORMED režimu nebo vyvolání chyby v FAILFAST režimu. Zahození nebo vyvolání chyb, jako jsou neúplné nebo poškozené soubory JSON nebo CSV, se zahodí jenom poškozené záznamy. Pokud použijete badRecordsPath při analýze FORMÁTU JSON nebo CSV, neshody datových typů se při použití objektu rescuedDataColumn. nepovažují za chybné záznamy . Jsou uloženy badRecordsPathpouze neúplné a poškozené záznamy JSON nebo CSV .

Změna chování citlivého na malá a velká písmena

Pokud není povolena citlivost písmen, sloupce abcAbca ABC jsou považovány za stejný sloupec pro účely odvození schématu. Zvolený případ je libovolný a závisí na vzorkovaných datech. Pomocí nápovědy schématu můžete vynutit použití případu. Po provedení výběru a odvození schématu automatický zavaděč nebere v úvahu varianty velikostí písmen, které nebyly vybrány v souladu se schématem.

Pokud je povolený datový sloupec s záchranou, načtou se do sloupce pole pojmenovaná v jiném případě než v případě schématu _rescued_data . Toto chování změňte nastavením možnosti readerCaseSensitive na false, v takovém případě automatický zavaděč čte data bez rozlišování velkých a malých písmen.

Přepsání odvození schématu pomocí tipů schématu

Pomocí nápovědy schématu můžete vynutit informace o schématu, které znáte a očekáváte u odvozeného schématu. Pokud víte, že sloupec je konkrétní datový typ, nebo pokud chcete zvolit obecnější datový typ (například double místo integer), můžete zadat libovolný počet tipů pro datové typy sloupců jako řetězec pomocí syntaxe specifikace schématu SQL, například:

.option("cloudFiles.schemaHints", "tags map<string,string>, version int")

Seznam podporovaných datových typů najdete v dokumentaci k datovým typům .

Pokud sloupec není na začátku datového proudu, můžete tento sloupec přidat do odvozeného schématu pomocí tipů schématu.

Tady je příklad odvozeného schématu pro zobrazení chování s nápovědou schématu.

Odvozené schéma:

|-- date: string
|-- quantity: int
|-- user_info: struct
|    |-- id: string
|    |-- name: string
|    |-- dob: string
|-- purchase_options: struct
|    |-- delivery_address: string

Zadáním následujících tipů schématu:

.option("cloudFiles.schemaHints", "date DATE, user_info.dob DATE, purchase_options MAP<STRING,STRING>, time TIMESTAMP")

získáte:

|-- date: string -> date
|-- quantity: int
|-- user_info: struct
|    |-- id: string
|    |-- name: string
|    |-- dob: string -> date
|-- purchase_options: struct -> map<string,string>
|-- time: timestamp

Poznámka:

Podpora nápovědy pro schéma polí a map je dostupná v Databricks Runtime 9.1 LTS a novějších.

Tady je příklad odvozeného schématu se složitými datovými typy, abyste viděli chování pomocí tipů schématu.

Odvozené schéma:

|-- products: array<string>
|-- locations: array<string>
|-- users: array<struct>
|    |-- users.element: struct
|    |    |-- id: string
|    |    |-- name: string
|    |    |-- dob: string
|-- ids: map<string,string>
|-- names: map<string,string>
|-- prices: map<string,string>
|-- discounts: map<struct,string>
|    |-- discounts.key: struct
|    |    |-- id: string
|    |-- discounts.value: string
|-- descriptions: map<string,struct>
|    |-- descriptions.key: string
|    |-- descriptions.value: struct
|    |    |-- content: int

Zadáním následujících tipů schématu:

.option("cloudFiles.schemaHints", "products ARRAY<INT>, locations.element STRING, users.element.id INT, ids MAP<STRING,INT>, names.key INT, prices.value INT, discounts.key.id INT, descriptions.value.content STRING")

získáte:

|-- products: array<string> -> array<int>
|-- locations: array<int> -> array<string>
|-- users: array<struct>
|    |-- users.element: struct
|    |    |-- id: string -> int
|    |    |-- name: string
|    |    |-- dob: string
|-- ids: map<string,string> -> map<string,int>
|-- names: map<string,string> -> map<int,string>
|-- prices: map<string,string> -> map<string,int>
|-- discounts: map<struct,string>
|    |-- discounts.key: struct
|    |    |-- id: string -> int
|    |-- discounts.value: string
|-- descriptions: map<string,struct>
|    |-- descriptions.key: string
|    |-- descriptions.value: struct
|    |    |-- content: int -> string

Poznámka:

Nápovědy schématu se používají pouze v případě, že nezadáte schéma automatickému zavaděče. Můžete použít nápovědu schématu, jestli cloudFiles.inferColumnTypes je povolená nebo zakázaná.