Configuration de l’inférence et de l’évolution de schéma dans Auto Loader

Vous pouvez configurer Auto Loader pour détecter automatiquement le schéma des données chargées, ce qui vous permet d’initialiser des tables sans déclarer explicitement le schéma de données et faire évoluer le schéma de table à mesure que de nouvelles colonnes sont introduites. Cela élimine la nécessité de suivre et d’appliquer manuellement les modifications de schéma au fil du temps.

Le chargeur automatique peut également « récupérer » les données inattendues (par exemple, de différents types de données) dans une colonne BLOB JSON, que vous pouvez choisir d’accéder ultérieurement à l’aide des API d’accès aux données semi-structurées.

Les formats suivants sont pris en charge pour l’inférence de schéma et l’évolution :

Format de fichier Versions prises en charge
JSON Toutes les versions
CSV Toutes les versions
XML Dans Databricks Runtime 14.3 LTS et versions ultérieures
Avro Dans Databricks Runtime 10.4 LTS et versions ultérieures
Parquet Dans Databricks Runtime 11.3 LTS et versions ultérieures
ORC Non pris en charge
Text Non applicable (schéma fixe)
Binaryfile Non applicable (schéma fixe)

Syntaxe pour l’inférence et l’évolution de schéma

La spécification d’un répertoire cible pour l’option cloudFiles.schemaLocation active l’inférence et l’évolution du schéma. Vous pouvez choisir d’utiliser le même répertoire que celui que vous spécifiez pour le checkpointLocation. Si vous utilisez Delta Live Tables, Azure Databricks gère automatiquement l’emplacement du schéma et d’autres informations de point de contrôle.

Notes

Si vous avez plusieurs emplacements de données sources chargés dans la table cible, chaque charge de travail d’ingestion Auto Loader nécessite un point de contrôle de streaming distinct.

L’exemple suivant utilise parquet pour le cloudFiles.format. Utilisez csv, avro ou json pour d’autres sources de fichiers. Tous les autres paramètres de lecture et d’écriture restent les mêmes pour les comportements par défaut pour chaque format.

Python

(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "parquet")
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")
)

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "parquet")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Comment fonctionne l’inférence de schéma Auto Loader ?

Pour déduire le schéma lors de la première lecture de données, Auto Loader échantillonne les premiers 50 Go ou 1 000 fichiers qu’il découvre, selon la limite atteinte en premier. Auto Loader stocke les informations de schéma dans un répertoire _schemas au cloudFiles.schemaLocation configuré pour suivre les modifications apportées aux données d’entrée au fil du temps.

Notes

Pour modifier la taille de l'échantillon utilisé, vous pouvez définir les configurations SQL :

spark.databricks.cloudFiles.schemaInference.sampleSize.numBytes

(chaîne d'octets, par exemple 10gb)

and

spark.databricks.cloudFiles.schemaInference.sampleSize.numFiles

(entier)

Par défaut, l’inférence de schéma Auto Loader cherche à éviter les problèmes d’évolution du schéma en raison d’incompatibilités de type. Pour les formats qui n’encodent pas les types de données (JSON, CSV et XML), le chargeur automatique déduit toutes les colonnes sous forme de chaînes (y compris les champs imbriqués dans les fichiers JSON). Pour les formats avec schéma typé (Parquet et Avro), Auto Loader échantillonne un sous-ensemble de fichiers et fusionne les schémas de fichiers individuels. Ce comportement est résumé dans le tableau suivant :

Format de fichier Type de données déduites par défaut
JSON String
CSV String
XML String
Avro Types encodés dans le schéma Avro
Parquet Types encodés dans le schéma Parquet

Le DataFrameReader Apache Spark utilise un comportement différent pour l’inférence de schéma, en sélectionnant des types de données pour les colonnes dans des sources JSON, CSV et XML en fonction des exemples de données. Pour activer ce comportement avec Auto Loader, définissez l’option cloudFiles.inferColumnTypes sur true.

Remarque

Lors de l’inférence du schéma des données CSV, Auto Loader part du principe que les fichiers contiennent des en-têtes. Si vos fichiers CSV ne contiennent pas d’en-têtes, fournissez l’option .option("header", "false"). De plus, Auto Loader fusionne les schémas de tous les fichiers de l’exemple pour produire un schéma global. Auto Loader peut ensuite lire chaque fichier en fonction de son en-tête, et analyser le CSV correctement.

Remarque

Lorsqu’une colonne a différents types de données dans deux fichiers Parquet, Auto Loader choisit le type le plus large. Vous pouvez utiliser schemaHints pour remplacer ce choix. Lorsque vous spécifiez des conseils de schéma, Auto Loader ne convertit pas la colonne vers le type spécifié, mais indique au lecteur Parquet de lire la colonne comme type spécifié. Dans le cas d’une incompatibilité, la colonne est sauvée dans la colonne de données sauvée.

Comment fonctionne l’évolution du schéma Auto Loader ?

Auto Loader détecte l’ajout de nouvelles colonnes lors du traitement de vos données. Lorsque le chargeur automatique détecte une nouvelle colonne, le flux s’arrête avec un UnknownFieldException. Avant que votre stream ne génère cette erreur, Auto Loader effectue l’inférence de schéma sur le dernier micro-batch de données et met à jour l’emplacement du schéma avec le schéma le plus récent en fusionnant les nouvelles colonnes à la fin du schéma. Les types de données des colonnes existantes restent inchangés.

Databricks recommande de configurer des flux de chargement automatique avec des workflows pour redémarrer automatiquement après ces modifications de schéma.

Auto Loader prend en charge les modes suivants pour l'évolution des schémas, que vous définissez dans l'option cloudFiles.schemaEvolutionMode :

Mode Comportement lors de la lecture de la nouvelle colonne
addNewColumns (valeur par défaut) Échec du flux. De nouvelles colonnes sont ajoutées au schéma. Les colonnes existantes ne font pas évoluer les types de données.
rescue Le schéma n’a jamais évolué et le flux n’échoue pas en raison de modifications de schéma. Toutes les nouvelles colonnes sont enregistrées dans la colonne de données sauvée.
failOnNewColumns Échec du flux. Le flux ne redémarre pas, sauf si le schéma fourni est mis à jour ou si le fichier de données incriminé est supprimé.
none Ne fait pas évoluer le schéma, les nouvelles colonnes sont ignorées et les données ne sont pas récupérés, sauf si l’option rescuedDataColumn est définie. Le flux n’échoue pas en raison des modifications de schéma.

Comment fonctionnent les partitions avec Auto Loader ?

Auto Loader tente de déduire les colonnes de partition à partir de la structure de répertoire sous-jacente des données si les données sont présentées dans un partitionnement de style Hive. Par exemple, le chemin de fichier base_path/event=click/date=2021-04-01/f0.json entraîne l'inférence de date et event comme colonnes de partition. Si la structure de répertoire sous-jacente contient des partitions Hive conflictuelles ou ne contient pas de partition de style Hive, les colonnes de partition sont ignorées.

Les formats de fichiers binaires (binaryFile) et text ont des schémas de données fixes, mais prennent en charge l'inférence de colonnes de partition. Databricks recommande de définir cloudFiles.schemaLocation pour ces formats de fichier. Cela évite toute erreur potentielle ou perte d’informations et empêche l’inférence des colonnes de partitions chaque fois qu’Auto Loader commence.

Les colonnes de partition ne sont pas prises en compte pour l’évolution du schéma. Si vous aviez une structure de répertoire initiale comme base_path/event=click/date=2021-04-01/f0.json, et que vous commencez ensuite à recevoir de nouveaux fichiers comme base_path/event=click/date=2021-04-01/hour=01/f1.json, Auto Loader ignore la colonne de l’heure. Pour capturer les informations de nouvelles colonnes de partition, définissez cloudFiles.partitionColumns sur event,date,hour.

Notes

L’option cloudFiles.partitionColumns effectue une liste de noms de colonnes séparée par des virgules Seules les colonnes qui existent en tant que paires key=value dans votre structure de répertoires sont analysées.

Qu’est-ce que la colonne de données récupérées ?

Quand Auto Loader déduit le schéma, une colonne de données récupérées est automatiquement ajoutée à votre schéma en tant que _rescued_data. Vous pouvez renommer la colonne ou l’inclure dans les cas où vous fournissez un schéma en définissant l’option rescuedDataColumn.

La colonne de données récupérées garantit que les colonnes qui ne correspondent pas au schéma sont sauvées au lieu d’être supprimées. La colonne de données récupérées contient toutes les données qui ne sont pas analysées pour les raisons suivantes :

  • La colonne est manquante dans le schéma.
  • Incompatibilités de type.
  • Incompatibilités de casse.

La colonne des données récupérées contient un fichier JSON contenant les colonnes sauvées et le chemin du fichier source de l'enregistrement.

Notes

Les analyseurs JSON et CSV prennent en charge trois modes d'analyse des enregistrements : PERMISSIVE, DROPMALFORMED, et FAILFAST. En cas d’utilisation avec rescuedDataColumn, les discordances de type de données n’entraînent pas de suppression d’enregistrements en mode DROPMALFORMED, ou de génération d’erreur en mode FAILFAST. Seuls les enregistrements corrompus, tels que des enregistrements JSON ou CSV incomplets ou mal formés, sont annulés ou génèrent des erreurs. Si vous utilisez badRecordsPath lors de l’analyse des enregistrements JSON ou CSV, les discordances de types de données ne sont pas considérées comme des enregistrements incorrects lors de l’utilisation de rescuedDataColumn. Seuls les enregistrements JSON ou CSV incomplets et mal formés sont stockés dans badRecordsPath.

Modifier le comportement sensible à la casse

À moins que la sensibilité à la casse ne soit activée, les colonnes abc, Abc et ABC sont considérées comme la même colonne aux fins d’inférence de schéma. Le choix de la casse qui sera retenue est arbitraire et dépend des données échantillonnées. Vous pouvez utiliser des indicateurs de schéma pour imposer la casse à utiliser. Une fois qu’une sélection a été effectuée et que le schéma est déduit, le chargeur automatique ne considère pas les variantes de casse qui n’ont pas été sélectionnées conformes au schéma.

Lorsque la colonne de données sauvée est activée, les champs nommés dans une casse autre que celle du schéma sont chargés dans la colonne _rescued_data. Modifiez ce comportement en définissant l’option readerCaseSensitive sur false, auquel cas Auto Loader lit les données de manière sans respect de la casse.

Remplacer l’inférence de schéma avec les indicateurs de schéma

Vous pouvez utiliser des indicateurs de schéma pour appliquer les informations de schéma que vous connaissez et attendez-vous sur un schéma déduit. Lorsque vous savez qu'une colonne est d'un type de données spécifique, ou si vous voulez choisir un type de données plus général (par exemple, un double au lieu d’un integer), vous pouvez fournir un nombre arbitraire d'indices pour les types de données de colonne sous forme de chaîne en utilisant la syntaxe de spécification de schéma SQL, comme suit :

.option("cloudFiles.schemaHints", "tags map<string,string>, version int")

Consultez la documentation sur les types de données pour obtenir la liste des types de données pris en charge.

Si une colonne n’est pas présente au début du stream, vous pouvez également utiliser des indicateurs de schéma pour ajouter cette colonne au schéma déduit.

Voici un exemple de schéma déduit pour voir le comportement avec des indicateurs de schéma.

Schéma déduit:

|-- date: string
|-- quantity: int
|-- user_info: struct
|    |-- id: string
|    |-- name: string
|    |-- dob: string
|-- purchase_options: struct
|    |-- delivery_address: string

En spécifiant les indicateurs de schéma suivants :

.option("cloudFiles.schemaHints", "date DATE, user_info.dob DATE, purchase_options MAP<STRING,STRING>, time TIMESTAMP")

Vous obtenez :

|-- date: string -> date
|-- quantity: int
|-- user_info: struct
|    |-- id: string
|    |-- name: string
|    |-- dob: string -> date
|-- purchase_options: struct -> map<string,string>
|-- time: timestamp

Notes

La prise en charge des indicateurs de schémas Array et Map est disponible dans Databricks Runtime 9.1 LTS et versions ultérieures.

Voici un exemple de schéma déduit avec des types de données complexes pour voir le comportement avec des indicateurs de schéma.

Schéma déduit:

|-- products: array<string>
|-- locations: array<string>
|-- users: array<struct>
|    |-- users.element: struct
|    |    |-- id: string
|    |    |-- name: string
|    |    |-- dob: string
|-- ids: map<string,string>
|-- names: map<string,string>
|-- prices: map<string,string>
|-- discounts: map<struct,string>
|    |-- discounts.key: struct
|    |    |-- id: string
|    |-- discounts.value: string
|-- descriptions: map<string,struct>
|    |-- descriptions.key: string
|    |-- descriptions.value: struct
|    |    |-- content: int

En spécifiant les indicateurs de schéma suivants :

.option("cloudFiles.schemaHints", "products ARRAY<INT>, locations.element STRING, users.element.id INT, ids MAP<STRING,INT>, names.key INT, prices.value INT, discounts.key.id INT, descriptions.value.content STRING")

Vous obtenez :

|-- products: array<string> -> array<int>
|-- locations: array<int> -> array<string>
|-- users: array<struct>
|    |-- users.element: struct
|    |    |-- id: string -> int
|    |    |-- name: string
|    |    |-- dob: string
|-- ids: map<string,string> -> map<string,int>
|-- names: map<string,string> -> map<int,string>
|-- prices: map<string,string> -> map<string,int>
|-- discounts: map<struct,string>
|    |-- discounts.key: struct
|    |    |-- id: string -> int
|    |-- discounts.value: string
|-- descriptions: map<string,struct>
|    |-- descriptions.key: string
|    |-- descriptions.value: struct
|    |    |-- content: int -> string

Notes

Les indicateurs de schéma sont utilisés uniquement si vous ne fournissez pas de schéma à Auto Loader. Vous pouvez utiliser les indicateurs de schéma que cloudFiles.inferColumnTypes soit activé ou désactivé.