Share via


Tutoriel : exécuter votre premier pipeline Delta Live Tables

Important

Les pipelines DLT serverless sont en préversion publique. Pour en savoir plus sur l’activation des pipelines DLT serverless, contactez votre équipe de compte Azure Databricks.

Ce tutoriel vous montre comment configurer un pipeline Delta Live Tables à partir du code dans un notebook Databricks et exécuter le pipeline en déclenchant une mise à jour de pipeline. Ce tutoriel inclut un exemple de pipeline pour ingérer et traiter un exemple de jeu de données avec un exemple de code utilisant les interfaces Python et SQL. Vous pouvez également utiliser les instructions de ce tutoriel pour créer un pipeline avec n’importe quel notebook avec une syntaxe Delta Live Tables correctement définie.

Vous pouvez configurer des pipelines Delta Live Tables et déclencher des mises à jour en tirant parti de l’interface utilisateur d’espace de travail Azure Databricks ou des options d’outil automatisé, telles que l’API ou l’interface CLI, les packs de ressources Databricks ou comme tâche dans un flux de travail Databricks. Pour vous familiariser avec les fonctionnalités de Delta Live Tables, Databricks recommande d’abord d’utiliser l’interface utilisateur pour créer et exécuter des pipelines. En outre, lorsque vous configurez un pipeline dans l’interface utilisateur, Delta Live Tables génère une configuration JSON pour votre pipeline qui peut être utilisée pour mettre en œuvre vos flux de travail programmatiques.

Pour illustrer la fonctionnalité Delta Live Tables, les exemples de ce tutoriel téléchargent un jeu de données disponible publiquement. Toutefois, Databricks dispose de plusieurs façons de se connecter à des sources de données et d’ingérer des données qui seront utilisées par les pipelines qui mettent en œuvre des cas d’usage réels. Consultez Ingérer des données avec Delta Live Tables.

Spécifications

  • Pour démarrer un pipeline non serverless, vous devez disposer de l’autorisation de création de cluster ou d’un accès à une stratégie de cluster définissant un cluster Delta Live Tables. Le runtime Delta Live Tables crée un cluster avant d’exécuter votre pipeline et échoue si vous n’avez pas l’autorisation appropriée.

  • Pour utiliser les exemples de ce tutoriel, Unity Catalog doit être activé dans votre espace de travail.

  • Vous devez disposer des autorisations suivantes dans Unity Catalog :

    • READ VOLUME et WRITE VOLUME, ou ALL PRIVILEGES, pour le volume my-volume.
    • USE SCHEMA ou ALL PRIVILEGES pour le schéma default.
    • USE CATALOG ou ALL PRIVILEGES pour le catalogue main.

    Pour définir ces autorisations, contactez votre administrateur Databricks ou consultez Privilèges et objets sécurisables dans Unity Catalog.

  • Les exemples de ce tutoriel utilisent un volume Unity Catalog pour stocker des exemples de données. Pour utiliser ces exemples, créez un volume et utilisez les noms de catalogue, de schéma et de volume associés à ce volume afin de définir le chemin d’accès au volume utilisé par les exemples.

Remarque

Si votre espace de travail n’est pas activé pour Unity Catalog, les notebooks avec des exemples qui ne nécessitent pas Unity Catalog sont attachés à cet article. Pour utiliser ces exemples, sélectionnez Hive metastore comme option de stockage lorsque vous créez le pipeline.

Où exécutez-vous des requêtes Delta Live Tables ?

Les requêtes Delta Live Tables sont principalement implémentées dans les notebooks Databricks, mais Delta Live Tables n’est pas conçu pour être exécuté de manière interactive dans les cellules du notebook. L’exécution d’une cellule qui contient la syntaxe Delta Live Tables dans un notebook Databricks génère un message d’erreur. Pour exécuter vos requêtes, vous devez configurer vos notebooks dans le cadre d’un pipeline.

Important

  • Vous ne pouvez pas vous appuyer sur l’ordre d’exécution de cellule par cellule des notebooks lors de l’écriture de requêtes pour Delta Live Tables. Delta Live Tables évalue et exécute tout le code défini dans les notebooks, mais a un modèle d’exécution différent de celui d’une commande de notebook Exécuter tout.
  • Vous ne pouvez pas combiner de langages dans un seul fichier de code source Delta Live Tables. Par exemple, un notebook peut contenir uniquement des requêtes Python ou des requêtes SQL. Si vous devez utiliser plusieurs langues dans un pipeline, utilisez plusieurs notebooks ou fichiers spécifiques à la langue dans le pipeline.

Vous pouvez également utiliser du code Python stocké dans des fichiers. Par exemple, vous pouvez créer un module Python qui peut être importé dans vos pipelines Python ou définir des fonctions définies par l’utilisateur Python à utiliser dans les requêtes SQL. Pour en savoir plus sur l’importation de modules Python, consultez Importer des modules Python à partir de dossiers Git ou de fichiers d’espace de travail. Pour en savoir plus sur les fonctions définies par l’utilisateur en langage Python, veuillez consulter la rubrique Fonctions scalaires définies par l’utilisateur - Python.

Exemple : ingérer et traiter les données sur les prénoms des bébés à New York

L’exemple de cet article utilise un jeu de données disponible publiquement qui contient des enregistrements de prénoms de bébés dans l’État de New York. Ces exemples illustrent l’utilisation d’un pipeline Delta Live Tables pour :

  • Lire les données CSV brutes d’un jeu de données publiquement disponible dans une table.
  • Lire les enregistrements de la table de données brutes et utiliser les attentes de Delta Live Tables pour créer une table contenant des données nettoyées.
  • Utiliser les enregistrements nettoyés comme entrées pour les requêtes Delta Live Tables qui créent des jeux de données dérivés.

Ce code illustre un exemple simplifié de l’architecture de médaillon. Consultez Qu’est-ce que l’architecture de médaillon dans un lakehouse ?.

Les mises en œuvre de cet exemple sont fournies pour les interfaces Python et SQL . Vous pouvez suivre les étapes de création de notebooks qui contiennent l’exemple de code, ou vous pouvez passer directement à Créer un pipeline et utiliser l’un des notebooks fournis sur cette page.

Implémenter un pipeline Delta Live Tables avec Python

Le code Python qui crée des jeux de données Delta Live Tables doit retourner des DataFrames, ce qui est familier aux utilisateurs ayant de l'expérience avec PySpark ou Pandas pour Spark. Pour les utilisateurs qui ne connaissent pas les DataFrames, Databricks recommande d’utiliser l’interface SQL. Consultez Implémenter un pipeline Delta Live Tables avec SQL.

Toutes les API Python Delta Live Tables sont implémentées dans le module dlt. Votre code de pipeline Delta Live Tables implémenté avec Python doit importer explicitement le module dlt en haut des notebooks et fichiers Python. Delta Live Tables diffère de nombreux scripts Python d’une façon clé : vous n’appelez pas les fonctions qui effectuent l’ingestion et la transformation des données pour créer des jeux de données Delta Live Tables. Au lieu de cela, Delta Live Tables interprète les fonctions de décorateur du module dlt dans tous les fichiers chargés dans un pipeline et génère un graphe de flux de données.

Pour implémenter l’exemple dans ce tutoriel, copiez et collez le code Python suivant dans un nouveau notebook Python. Vous devez ajouter chaque exemple d’extrait de code à sa propre cellule dans le notebook, dans l’ordre décrit. Pour consulter les options de création de notebooks, consultez Créer un notebook.

Remarque

Lorsque vous créez un pipeline avec l’interface Python, par défaut, les noms des tables sont définis par les noms des fonctions. Par exemple, l’exemple Python suivant crée trois tables nommées baby_names_raw, baby_names_prepared et top_baby_names_2021. Vous pouvez remplacer le nom de la table à l’aide du paramètre name. Consultez Créer une vue matérialisée ou une table de diffusion en continu Delta Live Tables.

Importer le module Delta Live Tables

Toutes les API Python Delta Live Tables sont implémentées dans le module dlt. Importez explicitement le module dlt en haut des notebooks et fichiers Python.

L’exemple suivant montre cette importation, ainsi que les instructions d’importation pour pyspark.sql.functions.

import dlt
from pyspark.sql.functions import *

Télécharger les données

Pour obtenir les données de cet exemple, vous téléchargez un fichier CSV et le stockez dans le volume, comme suit :

import os

os.environ["UNITY_CATALOG_VOLUME_PATH"] = "/Volumes/<catalog-name>/<schema-name>/<volume-name>/"
os.environ["DATASET_DOWNLOAD_URL"] = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
os.environ["DATASET_DOWNLOAD_FILENAME"] = "rows.csv"

dbutils.fs.cp(f"{os.environ.get('DATASET_DOWNLOAD_URL')}", f"{os.environ.get('UNITY_CATALOG_VOLUME_PATH')}{os.environ.get('DATASET_DOWNLOAD_FILENAME')}")

Remplacez <catalog-name>, <schema-name> et <volume-name> par les noms de catalogue, de schéma et de volume d’un volume Unity Catalog.

Créer une table à partir de fichiers dans le stockage d’objets

Delta Live Tables prend en charge le chargement de données à partir de tous les formats pris en charge par Azure Databricks. Consultez Options de format de données.

L’élément décoratif @dlt.table indique à Delta Live Tables de créer une table qui contient le résultat d’un DataFrame retourné par une fonction. Ajoutez l’élément décoratif @dlt.table avant toute définition de fonction Python qui retourne un DataFrame Spark pour inscrire une nouvelle table dans Delta Live Tables. L’exemple suivant illustre l’utilisation du nom de la fonction comme nom de table et l’ajout d’un commentaire descriptif à la table :

@dlt.table(
  comment="Popular baby first names in New York. This data was ingested from the New York State Department of Health."
)
def baby_names_raw():
  df = spark.read.csv(f"{os.environ.get('UNITY_CATALOG_VOLUME_PATH')}{os.environ.get('DATASET_DOWNLOAD_FILENAME')}", header=True, inferSchema=True)
  df_renamed_column = df.withColumnRenamed("First Name", "First_Name")
  return df_renamed_column

Ajouter une table à partir d’un jeu de données en amont dans le pipeline

Vous pouvez utiliser dlt.read() pour lire des données à partir d’autres jeux de données déclarés dans votre pipeline Delta Live Tables actuel. La déclaration de nouvelles tables de cette façon crée une dépendance que Delta Live Tables résout automatiquement avant l’exécution des mises à jour. Le code suivant inclut également des exemples de surveillance et d’application de la qualité des données avec des attentes. Consultez Gérer la qualité des données avec Delta Live Tables.

@dlt.table(
  comment="New York popular baby first name data cleaned and prepared for analysis."
)
@dlt.expect("valid_first_name", "First_Name IS NOT NULL")
@dlt.expect_or_fail("valid_count", "Count > 0")
def baby_names_prepared():
  return (
    dlt.read("baby_names_raw")
      .withColumnRenamed("Year", "Year_Of_Birth")
      .select("Year_Of_Birth", "First_Name", "Count")
  )

Créer une table avec des vues de données enrichies

Étant donné que Delta Live Tables traite les mises à jour des pipelines sous la forme d’une série de graphes de dépendances, vous pouvez déclarer des vues hautement enrichies qui alimentent les tableaux de bord, la BI et l’analytique en déclarant des tables avec une logique métier spécifique.

Des tables dans Delta Live Tables sont, du point de vue conceptuel, similaires aux vues matérialisées. Alors que les vues traditionnelles sur Spark exécutent la logique chaque fois que la vue est interrogée, une table Delta Live Tables stocke la version la plus récente des résultats de requête dans les fichiers de données. Étant donné que Delta Live Tables gère les mises à jour de tous les jeux de données d’un pipeline, vous pouvez planifier des mises à jour de pipeline pour répondre aux exigences de latence pour les vues matérialisées et savoir que les requêtes sur ces tables contiennent la version la plus récente des données disponibles.

La table définie par le code suivant illustre la similarité conceptuelle à une vue matérialisée dérivée des données en amont dans votre pipeline :

@dlt.table(
  comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
  return (
    dlt.read("baby_names_prepared")
      .filter(expr("Year_Of_Birth == 2021"))
      .groupBy("First_Name")
      .agg(sum("Count").alias("Total_Count"))
      .sort(desc("Total_Count"))
      .limit(10)
  )

Pour configurer un pipeline qui utilise le notebook, consultez Créer un pipeline.

Implémenter un pipeline Delta Live Tables avec SQL

Databricks recommande Delta Live Tables avec SQL comme moyen préféré pour les utilisateurs SQL de créer de nouveaux pipelines ETL, d’ingestion et de transformation sur Azure Databricks. L’interface SQL pour Delta Live Tables étend le SQL Spark standard avec de nombreux nouveaux mots clés, constructions et fonctions table. Ces ajouts à SQL standard permettent aux utilisateurs de déclarer des dépendances entre des jeux de données et de déployer une infrastructure de niveau production sans apprendre de nouveaux outils ou des concepts supplémentaires.

Pour les utilisateurs familiarisés avec les Spark DataFrames et qui ont besoin de prendre en charge des tests et des opérations plus étendus qui sont difficiles à implémenter avec SQL, comme les opérations de métaprogrammation, Databricks recommande d’utiliser l’interface Python. Consultez Exemple : ingérer et traiter les données sur les prénoms des bébés à New York.

Télécharger les données

Pour obtenir les données de cet exemple, copiez le code suivant, collez-le dans un nouveau notebook, puis exécutez le notebook. Pour consulter les options de création de notebooks, consultez Créer un notebook.

%sh
wget -O "/Volumes/<catalog-name>/<schema-name>/<volume-name>/babynames.csv" "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"

Remplacez <catalog-name>, <schema-name> et <volume-name> par les noms de catalogue, de schéma et de volume d’un volume Unity Catalog.

Créer une table à partir de fichiers dans Unity Catalog

Pour le reste de cet exemple, copiez les extraits de code SQL suivants et collez-les dans un nouveau notebook SQL, distinct du bloc-notes de la section précédente. Vous devez ajouter chaque exemple d’extrait de code SQL à sa propre cellule dans le notebook, dans l’ordre décrit.

Delta Live Tables prend en charge le chargement de données à partir de tous les formats pris en charge par Azure Databricks. Consultez Options de format de données.

Toutes les instructions SQL Delta Live Tables utilisent la syntaxe et la sémantique CREATE OR REFRESH. Lorsque vous mettez à jour un pipeline, Delta Live Tables détermine si le résultat logiquement correct de la table peut être réalisé via un traitement incrémentiel ou si un nouveau calcul complet est requis.

L’exemple suivant crée une table en chargeant des données à partir du fichier CSV stocké dans le volume Unity Catalog :

CREATE OR REFRESH LIVE TABLE baby_names_sql_raw
COMMENT "Popular baby first names in New York. This data was ingested from the New York State Department of Health."
AS SELECT Year, `First Name` AS First_Name, County, Sex, Count FROM read_files(
  '/Volumes/<catalog-name>/<schema-name>/<volume-name>/babynames.csv',
  format => 'csv',
  header => true,
  mode => 'FAILFAST')

Remplacez <catalog-name>, <schema-name> et <volume-name> par les noms de catalogue, de schéma et de volume d’un volume Unity Catalog.

Ajouter une table à partir d’un jeu de données en amont dans le pipeline

Vous pouvez utiliser le schéma virtuel live pour interroger des données à partir d’autres jeux de données déclarés dans votre pipeline Delta Live Tables actuel. La déclaration de nouvelles tables de cette façon crée une dépendance que Delta Live Tables résout automatiquement avant l’exécution des mises à jour. Le schéma live est un mot clé personnalisé implémenté dans Delta Live Tables qui peut être remplacé par un schéma cible si vous voulez publier vos jeux de données. Consultez Utiliser Unity Catalog avec vos pipelines Delta Live Tables et Publier des données à partir de Delta Live Tables sur le metastore Hive.

Le code suivant inclut également des exemples de surveillance et d’application de la qualité des données avec des attentes. Consultez Gérer la qualité des données avec Delta Live Tables.

CREATE OR REFRESH LIVE TABLE baby_names_sql_prepared(
  CONSTRAINT valid_first_name EXPECT (First_Name IS NOT NULL),
  CONSTRAINT valid_count EXPECT (Count > 0) ON VIOLATION FAIL UPDATE
)
COMMENT "New York popular baby first name data cleaned and prepared for analysis."
AS SELECT
  Year AS Year_Of_Birth,
  First_Name,
  Count
FROM live.baby_names_sql_raw;

Créer une vue de données enrichie

Étant donné que Delta Live Tables traite les mises à jour des pipelines sous la forme d’une série de graphes de dépendances, vous pouvez déclarer des vues hautement enrichies qui alimentent les tableaux de bord, la BI et l’analytique en déclarant des tables avec une logique métier spécifique.

Les tables dynamiques sont, du point de vue conceptuel, similaires aux vues matérialisées. Alors que les vues traditionnelles sur Spark exécutent la logique chaque fois que la vue est interrogée, les tables dynamiques stockent la version la plus récente des résultats de requête dans les fichiers de données. Étant donné que Delta Live Tables gère les mises à jour de tous les jeux de données d’un pipeline, vous pouvez planifier des mises à jour de pipeline pour répondre aux exigences de latence pour les vues matérialisées et savoir que les requêtes sur ces tables contiennent la version la plus récente des données disponibles.

Le code suivant crée une vue matérialisée enrichie des données en amont :

CREATE OR REFRESH LIVE TABLE top_baby_names_sql_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
  First_Name,
  SUM(Count) AS Total_Count
FROM live.baby_names_sql_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC
LIMIT 10;

Pour configurer un pipeline qui utilise le notebook, continuez à Créer un pipeline.

Créer un pipeline

Delta Live Tables crée des pipelines en résolvant des dépendances définies dans des notebooks ou des fichiers (appelés code source ou bibliothèques) à l’aide d’une syntaxe Delta Live Tables. Chaque fichier de code source ne peut contenir qu’un seul langage, mais vous pouvez mélanger des bibliothèques de langage différent dans votre pipeline.

  1. Cliquez sur Delta Live Tables dans la barre latérale, puis cliquez sur Créer un pipeline.
  2. Donnez un nom au pipeline.
  3. (Facultatif) Cochez la case Serverless pour utiliser le calcul entièrement managé pour ce pipeline. Lorsque vous sélectionnez Serverless, les paramètres de calcul sont supprimés de l’interface utilisateur.
  4. (Facultatif) Sélectionnez une édition de produit.
  5. Sélectionnez Déclenché pour Mode de pipeline.
  6. Configurez un ou plusieurs notebooks contenant le code source du pipeline. Dans la zone de texte Chemins d’accès, entrez le chemin d’accès à un bloc-notes ou cliquez sur icône Sélecteur de fichiers pour sélectionner un bloc-notes.
  7. Sélectionnez une destination pour les jeux de données publiés par le pipeline, le metastore Hive ou le catalogue Unity. Consultez Publier des jeux de données.
    • Metastore Hive :
      • (Facultatif) Entrez un Emplacement de stockage pour les données de sortie du pipeline. Le système utilise un emplacement par défaut si vous laissez le paramètre Emplacement de stockage vide.
      • (Facultatif) Spécifiez un schéma cible pour publier votre jeu de données dans le metastore Hive.
    • Catalogue Unity : spécifiez un catalogue et un schéma cible pour publier votre jeu de données dans Unity Catalog.
  8. (Facultatif) Si vous n’avez pas sélectionné Serverless, vous pouvez configurer les paramètres de calcul du pipeline. Pour en savoir plus sur les options des paramètres de calcul, consultez Configurer les paramètres de pipeline pour Delta Live Tables.
  9. (Facultatif) Cliquez sur Ajouter une notification pour configurer un ou plusieurs adresses e-mail afin de recevoir des notifications pour des événements de pipeline. Consultez Ajouter des notifications par e-mail pour les événements de pipeline.
  10. (Facultatif) Configurez les paramètres avancés pour le pipeline. Pour en savoir plus sur les options des paramètres avancés, consultez Configurer les paramètres de pipeline pour Delta Live Tables.
  11. Cliquez sur Créer.

Lorsque vous cliquez sur Créer, le système affiche la page Détails du pipeline. Vous pouvez également accéder à votre pipeline en cliquant sur le nom du pipeline sous l’onglet Delta Live Tables.

Démarrer une mise à jour de pipeline

Pour démarrer une mise à jour de pipeline, cliquez sur le bouton Icône de démarrage Delta Live Tables dans le panneau supérieur. Le système renvoie un message confirmant le démarrage de votre pipeline.

Après le démarrage de la mise à jour, le système Delta Live Tables :

  1. démarre un cluster à l’aide d’une configuration de cluster créée par le système Delta Live Tables. Vous pouvez également spécifier une configuration de cluster personnalisée.
  2. crée toutes les tables qui n’existent pas et s’assure que le schéma est correct pour toutes les tables existantes.
  3. met à jour les tables avec les dernières données disponibles.
  4. arrête le cluster lorsque la mise à jour est terminée.

Remarque

Le mode d’exécution est défini sur Production par défaut, ce qui déploie des ressources de calcul éphémères pour chaque mise à jour. Vous pouvez utiliser le mode Développement pour changer ce comportement, ce qui permet l’utilisation des mêmes ressources de calcul pour plusieurs mises à jour de pipeline pendant le développement et les tests. Consultez Modes de développement et de production.

Publier des jeux de données

Vous pouvez mettre les jeux de données Delta Live Tables à disposition pour interroger le metastore Hive ou Unity Catalog par table de publication. Si vous ne spécifiez pas une cible pour la publication de données, les tables créées dans des pipelines Delta Live Tables peuvent uniquement être accessibles par d’autres opérations dans ce même pipeline. Consultez Publier des données à partir de Delta Live Tables sur le metastore Hive ou Utiliser Unity Catalog avec vos pipelines Delta Live Tables.

Exemple de notebooks de code source

Vous pouvez importer ces notebooks dans un espace de travail Azure Databricks et les utiliser pour déployer un pipeline Delta Live Tables. Voir Créer un pipeline.

Notebook Python de prise en main de Delta Live Tables

Obtenir le notebook

Notebook SQL de prise en main de Delta Live Tables

Obtenir le notebook

Exemple de notebooks de code source pour les espaces de travail sans Unity Catalog

Vous pouvez importer ces notebooks dans un espace de travail Azure Databricks sans Unity Catalog activé et les utiliser pour déployer un pipeline Delta Live Tables. Voir Créer un pipeline.

Notebook Python de prise en main de Delta Live Tables

Obtenir le notebook

Notebook SQL de prise en main de Delta Live Tables

Obtenir le notebook