Partage via


Utiliser le flux des changements de données Delta Lake sur Azure Databricks

Le flux des changements de données permet à Azure Databricks de suivre les changements au niveau des lignes entre les versions d’une table Delta. Lorsqu’il est activé sur une table Delta, le runtime enregistre les événements de changement pour toutes les données écrites dans la table. Cela comprend les données des lignes ainsi que les métadonnées indiquant si la ligne spécifiée a été insérée, supprimée ou mise à jour.

Important

Le flux des changements de données fonctionne en tandem avec l’historique des tables pour fournir des informations sur les modifications. Étant donné que le clonage d’une table Delta crée un historique distinct, le flux des changements de données sur les tables clonées ne correspond pas à celui de la table d’origine.

Traiter les changements de données de façon incrémentielle

Databricks recommande d’utiliser le flux de changements de données en combinaison avec Flux structuré pour traiter de manière incrémentielle les modifications de tables Delta. Vous devez utiliser Flux structuré pour qu’Azure Databricks suive automatiquement les versions du flux de changements de données de votre table.

Remarque

Delta Live Tables fournit des fonctionnalités permettant de propager facilement les changements de données et de stocker les résultats sous forme de tables SCD (dimension à variation lente) de type 1 ou de type 2. Consultez API APPLY CHANGES : simplifier la capture des changements de données dans Delta Live Tables.

Pour lire le flux de changements de données d’une table, vous devez activer le flux de changements de données sur cette table. Consultez Activer le flux des changements de données.

Définissez l’option readChangeFeed sur true lors de la configuration d’un flux sur une table pour lire le flux de changements de données, comme illustré dans l’exemple de syntaxe suivant :

Python

(spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .table("myDeltaTable")
)

Scala

spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .table("myDeltaTable")

Par défaut, le flux retourne la dernière capture instantanée de la table lorsque le flux commence en tant que INSERT et les modifications futures en tant que changements de données.

Les validations de changements de données dans le cadre de la transaction Delta Lake sont disponibles en même temps que les nouvelles validations de données dans la table.

Vous pouvez éventuellement spécifier une version de départ. Consultez Dois-je spécifier une version de départ ?.

Le flux de changements de données prend également en charge l’exécution par lots, ce qui nécessite la spécification d’une version de départ. Consultez Lire les changements dans des requêtes par lots.

Des options telles que les limites de débit (maxFilesPerTrigger, maxBytesPerTrigger) et excludeRegex sont également prises en charge lors de la lecture des changements de données.

La limitation du débit peut être atomique pour les versions autres que la version de l’instantané de départ. Autrement dit, l’intégralité de la version de validation aura un débit limité ou la validation entière sera retournée.

Dois-je spécifier une version de départ ?

Vous pouvez éventuellement spécifier une version de départ si vous souhaitez ignorer les modifications qui se sont produites avant une certaine version. Vous pouvez spécifier une version à l’aide d’un horodatage ou du numéro d’ID de version enregistré dans le journal des transactions Delta.

Remarque

Une version de départ est requise pour les lectures par lots, et de nombreux modèles par lots peuvent tirer parti de la définition de version de fin facultative.

Lorsque vous configurez des charges de travail Flux structuré impliquant un flux de changements de données, il est important de comprendre pourquoi la spécification d’une version de départ a un impact sur le traitement.

De nombreuses charges de travail de flux, en particulier les nouveaux pipelines de traitement des données, bénéficient du comportement par défaut. Avec le comportement par défaut, le premier lot est traité lorsque le flux enregistre pour la première fois tous les enregistrements existants dans la table en tant qu’opérations INSERT dans le flux de changements de données.

Si votre table cible contient déjà tous les enregistrements avec des modifications appropriées jusqu’à un certain point, indiquez une version de départ pour éviter de traiter l’état de la table source en tant qu’événements INSERT.

L’exemple de syntaxe suivant récupère à partir d’un échec de flux dans lequel le point de contrôle a été endommagé. Dans cet exemple, supposons les conditions suivantes :

  1. Le flux de changements de données a été activé sur la table source lors de la création de la table.
  2. La table en aval cible a traité toutes les modifications jusqu’à la version 75.
  3. L’historique des versions de la table source est disponible pour les versions 70 et ultérieures.

Python

(spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 76)
  .table("source_table")
)

Scala

spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 76)
  .table("source_table")

Dans cet exemple, vous devez également spécifier un nouvel emplacement de point de contrôle.

Important

Si vous spécifiez une version de départ, le flux ne parvient pas à démarrer à partir d’un nouveau point de contrôle si la version de départ n’est plus présente dans l’historique des tables. Delta Lake nettoie automatiquement les versions historiques, ce qui signifie que toutes les versions de départ spécifiées sont finalement supprimées.

Consultez Puis-je utiliser le flux de changements de données pour relire l’historique complet d’une table ?.

Lire les changements dans des requêtes par lots

Vous pouvez utiliser la syntaxe de requête par lots pour lire toutes les modifications à partir d’une version particulière ou pour lire les modifications dans une plage de versions spécifiée.

Vous spécifiez une version sous la forme d’un entier et un horodatage sous la forme d’une chaîne au format yyyy-MM-dd[ HH:mm:ss[.SSS]].

Les versions de début et de fin sont inclusives dans les requêtes. Pour lire les changements entre une version de début particulière et la version la plus récente de la table, spécifiez uniquement la version de début.

Si vous fournissez une version inférieure ou un horodatage antérieur à une version ou un horodatage qui contient des événements de changements enregistrés (autrement dit, antérieur au moment où le flux des changements de données a été activé), une erreur est générée et indique que le flux des changements de données n’a pas été activé.

Les exemples de syntaxe suivants illustrent l’utilisation des options de version de début et de fin avec des lectures par lots :

SQL

-- version as ints or longs e.g. changes from version 0 to 10
SELECT * FROM table_changes('tableName', 0, 10)

-- timestamp as string formatted timestamps
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')

-- providing only the startingVersion/timestamp
SELECT * FROM table_changes('tableName', 0)

-- database/schema names inside the string for table name, with backticks for escaping dots and special characters
SELECT * FROM table_changes('dbName.`dotted.tableName`', '2021-04-21 06:45:46' , '2021-05-21 12:00:00')

Python

# version as ints or longs
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .option("endingVersion", 10) \
  .table("myDeltaTable")

# timestamps as formatted timestamp
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", '2021-04-21 05:45:46') \
  .option("endingTimestamp", '2021-05-21 12:00:00') \
  .table("myDeltaTable")

# providing only the startingVersion/timestamp
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .table("myDeltaTable")

Scala

// version as ints or longs
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .option("endingVersion", 10)
  .table("myDeltaTable")

// timestamps as formatted timestamp
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingTimestamp", "2021-04-21 05:45:46")
  .option("endingTimestamp", "2021-05-21 12:00:00")
  .table("myDeltaTable")

// providing only the startingVersion/timestamp
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .table("myDeltaTable")

Remarque

Par défaut, si un utilisateur passe une version ou un horodatage dépassant la dernière validation sur une table, l’erreur timestampGreaterThanLatestCommit est levée. Dans Databricks Runtime 11.3 LTS et version ultérieure, le flux des changements de données peut gérer le cas de version hors plage si l’utilisateur définit la configuration suivante sur true :

set spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled = true;

Si vous fournissez une version de début supérieure à la dernière validation sur une table ou un horodatage de début plus récent que la dernière validation sur une table, lorsque la configuration précédente est activée, un résultat de lecture vide est retourné.

Si vous fournissez une version de fin supérieure à la dernière validation sur une table ou un horodatage de fin plus récent que la dernière validation sur une table, lorsque la configuration précédente est activée en mode lecture en bloc, tous les changements entre la version de début et la dernière validation doivent être retournés.

Quel est le schéma du flux des changements de données ?

Lorsque vous lisez à partir du flux des changements de données d’une table, le schéma de la dernière version de table est utilisé.

Notes

La plupart des opérations de modification et d’évolution du schéma sont entièrement prises en charge. La table avec le mappage de colonnes activé ne prend pas en charge tous les cas d’usage et fait preuve d’un comportement différent. Consultez Limitations du flux des changements de données pour des tables sur lesquelles le mappage de colonnes est activé.

Outre les colonnes de données du schéma de la table Delta, le flux des changements de données contient des colonnes de métadonnées qui identifient le type d’événement de modification :

Nom de la colonne Type Valeurs
_change_type String insert, update_preimage , update_postimage, delete(1)
_commit_version Long Journal Delta ou version de la table contenant le changement.
_commit_timestamp Timestamp Horodatage associé à la création de la validation.

(1)preimage est la valeur avant la mise à jour, postimage est la valeur après la mise à jour.

Notes

Vous ne pouvez pas activer le flux des changements de données sur une table si le schéma contient des colonnes portant les mêmes noms que ces colonnes ajoutées. Renommez les colonnes de la table pour résoudre ce conflit avant d’essayer d’activer le flux des changements de données.

Activer le flux des changements de données

Vous pouvez uniquement lire le flux de changements de données pour les tables activées. Vous devez activer explicitement l’option de flux des changements de données en appliquant l’une des méthodes suivantes :

  • Nouvelle table : définissez la propriété de table delta.enableChangeDataFeed = true dans la commande CREATE TABLE.

    CREATE TABLE student (id INT, name STRING, age INT) TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • Table existante : définissez la propriété de table delta.enableChangeDataFeed = true dans la commande ALTER TABLE.

    ALTER TABLE myDeltaTable SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • Toutes les nouvelles tables :

    set spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;
    

Important

Seules les modifications apportées après avoir activé le flux de changements de données sont enregistrées. Les modifications apportées à une table dans le passé ne sont pas capturées.

Stockage des changements de données

L’activation du flux de changement de données entraîne une petite augmentation des coûts de stockage d’une table. Les enregistrements de changements de données sont générés à mesure que la requête s’exécute et sont généralement bien plus petits que la taille totale des fichiers réécrits.

Azure Databricks enregistre les changements de données pour les opérations UPDATE, DELETE et MERGE dans le dossier _change_data sous le répertoire de la table. Certaines opérations, telles que les opérations d’insertion uniquement et les suppressions de partitions complètes, ne génèrent pas de données dans le répertoire _change_data, car Azure Databricks peut calculer efficacement le flux des changements de données directement à partir du journal des transactions.

Toutes les lectures sur les fichiers de données dans le dossier _change_data doivent passer par les API Delta Lake prises en charge.

Les fichiers figurant dans le dossier _change_data suivent la stratégie de rétention de la table. Les données de flux de changements de données sont supprimées lorsque la commande VACUUM s’exécute.

Puis-je utiliser le flux de changements de données pour relire l’historique complet d’une table ?

Le flux de changements de données n’est pas destiné à servir d’enregistrement permanent de toutes les modifications apportées à une table. Le flux de changements de données enregistre uniquement les modifications qui se produisent après son activation.

Le flux de changements de données et Delta Lake vous permettent de toujours reconstruire un instantané complet d’une table source, ce qui signifie que vous pouvez démarrer une nouvelle lecture en flux sur une table avec un flux de changements de données activé et capturer la version actuelle de cette table et toutes les modifications qui se produisent après.

Vous devez traiter les enregistrements dans le flux de changements de données comme temporaires et uniquement accessibles pour une fenêtre de rétention spécifiée. Le journal des transactions Delta supprime les versions de table et leurs versions de flux de changements de données correspondantes à intervalles réguliers. Lorsqu’une version est supprimée du journal des transactions, vous ne pouvez plus lire le flux de changements de données de cette version.

Si votre cas nécessite de conserver un historique permanent de toutes les modifications apportées à une table, vous devez utiliser la logique incrémentielle pour écrire des enregistrements du flux de changements de données vers une nouvelle table. L’exemple de code suivant illustre l’utilisation de trigger.AvailableNow, qui tire parti du traitement incrémentiel de Flux structuré, mais traite les données disponibles en tant que charge de travail par lots. Vous pouvez planifier cette charge de travail de manière asynchrone avec vos principaux pipelines de traitement pour créer une sauvegarde du flux de changements de données à des fins d’audit ou pour une relecture complète.

Python

(spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .table("source_table")
  .writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(availableNow=True)
  .toTable("target_table")
)

Scala

spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .table("source_table")
  .writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(Trigger.AvailableNow)
  .toTable("target_table")

Limitations du flux des changements de données pour des tables sur lesquelles le mappage de colonnes est activé

Une fois le mappage de colonnes activé sur une table Delta, vous pouvez supprimer ou renommer des colonnes dans la table sans réécrire des fichiers de données pour des données existantes. Avec le mappage de colonnes activé, le flux des changements de données présente des limitations après avoir effectué des modifications de schéma non additives, telles que le renommage ou la suppression d’une colonne, la modification du type de données ou des modifications de possibilité de valeurs nulles.

Important

  • Vous ne pouvez pas lire le flux des changements de données pour une transaction ou une plage dans laquelle une modification de schéma non additive se produit à l’aide de la sémantique incrémentielle.
  • Dans Databricks Runtime 12.2 LTS et versions antérieures, les tables sur lesquelles le mappage de colonnes est activé et qui ont fait l’objet de modifications de schéma non additives ne prennent pas en charge les lectures de diffusion en continu sur le flux des changements de données. Consultez Diffusion en continu avec un mappage de colonnes et des modifications de schéma.
  • Dans Databricks Runtime 11.3 LTS et versions antérieures, vous ne pouvez pas lire le flux des changements de données pour des tables pour lesquelles le mappage de colonnes est activé et qui ont fait l’objet d’un changement de nom ou d’une suppression de colonne.

Dans Databricks Runtime 12.2 LTS et versions ultérieures, vous pouvez effectuer des lectures par lot sur le flux des changements de données pour des tables sur lesquelles le mappage de colonnes est activé et qui ont fait l’objet de modifications de schéma non additives. Au lieu d’utiliser le schéma de la dernière version de la table, les opérations de lecture utilisent le schéma de la version de fin de la table spécifiée dans la requête. Les requêtes échouent toujours si la plage de versions spécifiée s’étend sur une modification de schéma non additive.