Utiliser le flux des changements de données Delta Lake sur Azure Databricks

Remarque

Le flux des changements de données permet à Azure Databricks de suivre les changements au niveau des lignes entre les versions d’une table Delta. Lorsqu’il est activé sur une table Delta, le runtime enregistre les événements de changement pour toutes les données écrites dans la table. Cela comprend les données des lignes ainsi que les métadonnées indiquant si la ligne spécifiée a été insérée, supprimée ou mise à jour.

Vous pouvez lire les événements de changement dans des requêtes par lots avec Spark SQL, Apache Spark DataFrames et Structured Streaming.

Important

Le flux des changements de données fonctionne en tandem avec l’historique des tables pour fournir des informations sur les modifications. Étant donné que le clonage d’une table Delta crée un historique distinct, le flux des changements de données sur les tables clonées ne correspond pas à celui de la table d’origine.

Cas d'utilisation

Le flux des changements de données n’est pas activé par défaut. Les cas d’usage suivants doivent motiver l’activation du flux des changements de données.

  • Tables Silver et Gold : améliorez les performances de Delta Lake en traitant uniquement les changements au niveau des lignes à la suite d’opérations MERGE, UPDATE ou DELETE initiales afin d’accélérer et de simplifier les opérations ETL et ELT.
  • Vues matérialisées : créez des vues agrégées et à jour des informations à utiliser dans BI et à des fins d’analytique sans avoir à retraiter les tables sous-jacentes complètes, au lieu de mettre à jour uniquement l’endroit où les changements ont été apportés.
  • Transmettre les changements : envoyez un flux des changements de données aux systèmes en aval, tels que Kafka ou SGBDR, qui peuvent l’utiliser pour un traitement incrémentiel lors des phases ultérieures des pipelines de données.
  • Table de piste d’audit : la capture du flux des changements de données sous forme de table Delta fournit un stockage perpétuel et une capacité de requête efficace pour voir toutes les changements au fil du temps, y compris le moment où les suppressions se produisent et quelles mises à jour sont effectuées.

Activer le flux des changements de données

Vous devez activer explicitement l’option de flux des changements de données en appliquant l’une des méthodes suivantes :

  • Nouvelle table : définissez la propriété de table delta.enableChangeDataFeed = true dans la commande CREATE TABLE.

    CREATE TABLE student (id INT, name STRING, age INT) TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • Table existante : définissez la propriété de table delta.enableChangeDataFeed = true dans la commande ALTER TABLE.

    ALTER TABLE myDeltaTable SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • Toutes les nouvelles tables :

    set spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;
    

Important

Seules les changements apportés après l’activation du flux des changements de données sont enregistrés ; les changements antérieurs apportés à une table ne sont pas capturés.

Stockage des changements de données

Azure Databricks enregistre les changements de données pour les opérations UPDATE, DELETE et MERGE dans le dossier _change_data sous le répertoire de la table. Certaines opérations, telles que les opérations d’insertion uniquement et les suppressions de partitions complètes, ne génèrent pas de données dans le répertoire _change_data, car Azure Databricks peut calculer efficacement le flux des changements de données directement à partir du journal des transactions.

Les fichiers figurant dans le dossier _change_data suivent la stratégie de rétention de la table. Par conséquent, si vous exécutez la commande VACUUM, les données du flux des changements de données sont également supprimées.

Lire les changements dans des requêtes par lots

Vous pouvez fournir une version ou un horodatage pour le début et la fin. Les versions et horodatages de début et de fin sont inclusifs dans les requêtes. Pour lire les changements entre une version de début particulière et la version la plus récente de la table, spécifiez uniquement la version ou l’horodatage de début.

Vous spécifiez une version sous la forme d’un entier et un horodatage sous la forme d’une chaîne au format yyyy-MM-dd[ HH:mm:ss[.SSS]].

Si vous fournissez une version inférieure ou un horodatage antérieur à une version ou un horodatage qui contient des événements de changements enregistrés (autrement dit, antérieur au moment où le flux des changements de données a été activé), une erreur est générée et indique que le flux des changements de données n’a pas été activé.

SQL

-- version as ints or longs e.g. changes from version 0 to 10
SELECT * FROM table_changes('tableName', 0, 10)

-- timestamp as string formatted timestamps
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')

-- providing only the startingVersion/timestamp
SELECT * FROM table_changes('tableName', 0)

-- database/schema names inside the string for table name, with backticks for escaping dots and special characters
SELECT * FROM table_changes('dbName.`dotted.tableName`', '2021-04-21 06:45:46' , '2021-05-21 12:00:00')

-- path based tables
SELECT * FROM table_changes_by_path('\path', '2021-04-21 05:45:46')

Python

# version as ints or longs
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .option("endingVersion", 10) \
  .table("myDeltaTable")

# timestamps as formatted timestamp
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", '2021-04-21 05:45:46') \
  .option("endingTimestamp", '2021-05-21 12:00:00') \
  .table("myDeltaTable")

# providing only the startingVersion/timestamp
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .table("myDeltaTable")

# path based tables
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", '2021-04-21 05:45:46') \
  .load("pathToMyDeltaTable")

Scala

// version as ints or longs
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .option("endingVersion", 10)
  .table("myDeltaTable")

// timestamps as formatted timestamp
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingTimestamp", "2021-04-21 05:45:46")
  .option("endingTimestamp", "2021-05-21 12:00:00")
  .table("myDeltaTable")

// providing only the startingVersion/timestamp
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .table("myDeltaTable")

// path based tables
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingTimestamp", "2021-04-21 05:45:46")
  .load("pathToMyDeltaTable")

Lire les changements dans des requêtes de streaming

Python

# providing a starting version
spark.readStream.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .table("myDeltaTable")

# providing a starting timestamp
spark.readStream.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", "2021-04-21 05:35:43") \
  .load("/pathToMyDeltaTable")

# not providing a starting version/timestamp will result in the latest snapshot being fetched first
spark.readStream.format("delta") \
  .option("readChangeFeed", "true") \
  .table("myDeltaTable")

Scala

// providing a starting version
spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .table("myDeltaTable")

// providing a starting timestamp
spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", "2021-04-21 05:35:43")
  .load("/pathToMyDeltaTable")

// not providing a starting version/timestamp will result in the latest snapshot being fetched first
spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .table("myDeltaTable")

Pour récupérer les changements de données lors de la lecture de la table, affectez la valeur true à l’option readChangeFeed. startingVersion et startingTimestamp sont facultatifs et, s’ils ne sont pas fournis, le flux retourne l’instantané le plus récent de la table au moment du streaming en tant que INSERT et les changements ultérieurs en tant que changements de données. Des options telles que les limites de débit (maxFilesPerTrigger, maxBytesPerTrigger) et excludeRegex sont également prises en charge lors de la lecture des changements de données.

Notes

La limitation du débit peut être atomique pour les versions autres que la version de l’instantané de départ. Autrement dit, l’intégralité de la version de validation aura un débit limité ou la validation entière sera retournée.

Par défaut, si un utilisateur passe une version ou un horodatage dépassant la dernière validation sur une table, l’erreur timestampGreaterThanLatestCommit est levée. Dans Databricks Runtime 11.3 LTS et version ultérieure, le flux des changements de données peut gérer le cas de version hors plage si l’utilisateur définit la configuration suivante sur true :

set spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled = true;

Si vous fournissez une version de début supérieure à la dernière validation sur une table ou un horodatage de début plus récent que la dernière validation sur une table, lorsque la configuration précédente est activée, un résultat de lecture vide est retourné.

Si vous fournissez une version de fin supérieure à la dernière validation sur une table ou un horodatage de fin plus récent que la dernière validation sur une table, lorsque la configuration précédente est activée en mode lecture en bloc, tous les changements entre la version de début et la dernière validation doivent être retournés.

Quel est le schéma du flux des changements de données ?

Lorsque vous lisez à partir du flux des changements de données d’une table, le schéma de la dernière version de table est utilisé.

Notes

La plupart des opérations de modification et d’évolution du schéma sont entièrement prises en charge. La table avec le mappage de colonnes activé ne prend pas en charge tous les cas d’usage et fait preuve d’un comportement différent. Consultez Limitations du flux des changements de données pour des tables sur lesquelles le mappage de colonnes est activé.

Outre les colonnes de données du schéma de la table Delta, le flux des changements de données contient des colonnes de métadonnées qui identifient le type d’événement de modification :

Nom de la colonne Type Valeurs
_change_type String insert, update_preimage , update_postimage, delete(1)
_commit_version Long Journal Delta ou version de la table contenant le changement.
_commit_timestamp Timestamp Horodatage associé à la création de la validation.

(1)preimage est la valeur avant la mise à jour, postimage est la valeur après la mise à jour.

Notes

Vous ne pouvez pas activer le flux des changements de données sur une table si le schéma contient des colonnes portant les mêmes noms que ces colonnes ajoutées. Renommez les colonnes de la table pour résoudre ce conflit avant d’essayer d’activer le flux des changements de données.

Limitations du flux des changements de données pour des tables sur lesquelles le mappage de colonnes est activé

Une fois le mappage de colonnes activé sur une table Delta, vous pouvez supprimer ou renommer des colonnes dans la table sans réécrire des fichiers de données pour des données existantes. Avec le mappage de colonnes activé, le flux des changements de données présente des limitations après avoir effectué des modifications de schéma non additives, telles que le renommage ou la suppression d’une colonne, la modification du type de données ou des modifications de possibilité de valeurs nulles.

Important

  • Vous ne pouvez pas lire le flux des changements de données pour une transaction ou une plage dans laquelle une modification de schéma non additive se produit à l’aide de la sémantique incrémentielle.
  • Dans Databricks Runtime 13.0 et versions antérieures, les tables sur lesquelles le mappage de colonnes est activé et qui ont fait l’objet de modifications de schéma non additives ne prennent pas en charge les lectures de diffusion en continu sur le flux des changements de données. Consultez Diffusion en continu avec un mappage de colonnes et des modifications de schéma.
  • Dans Databricks Runtime 12.0 et versions antérieures, vous ne pouvez pas lire le flux des changements de données pour des tables pour lesquelles le mappage de colonnes est activé et qui ont fait l’objet d’un changement de nom ou d’une suppression de colonne.

Dans Databricks Runtime 12.1 et versions ultérieures, vous pouvez effectuer des lectures par lot sur le flux des changements de données pour des tables sur lesquelles le mappage de colonnes est activé et qui ont fait l’objet de modifications de schéma non additives. Au lieu d’utiliser le schéma de la dernière version de la table, les opérations de lecture utilisent le schéma de la version de fin de la table spécifiée dans la requête. Les requêtes échouent toujours si la plage de versions spécifiée s’étend sur une modification de schéma non additive.

Forum Aux Questions (FAQ)

Quelle est la surcharge liée à l’activation du flux des changements de données ?

Il n’y a pas d’impact significatif. Les enregistrements de changement de données sont générés inline pendant le processus d’exécution de la requête, et sont généralement bien plus petits que la taille totale des fichiers réécrits.

Quelle est la stratégie de rétention pour les enregistrements de changement ?

Les enregistrements de changement suivent la même stratégie de rétention que les versions de table obsolètes, et sont nettoyés par le biais de VACUUM s’ils sont en dehors de la période de rétention spécifiée.

Quand les nouveaux enregistrements sont-ils disponibles dans le flux des changements de données ?

Les changements de données sont validés avec la transaction Delta Lake, et sont disponibles en même temps que les nouvelles données sont disponibles dans la table.

Exemple de Notebook : Propager les changements avec le flux de données de changement Delta

Ce notebook montre comment propager les changements apportés à une table Silver du nombre absolu de vaccinations vers une table Gold des taux de vaccination.

Notebook de flux des changements de données

Obtenir le notebook