Créer un pipeline de données de bout en bout dans Databricks

Cet article vous montre comment créer et déployer un pipeline de traitement des données de bout en bout, et notamment comment ingérer des données brutes, transformer les données et exécuter des analyses sur les données traitées.

Notes

Bien que cet article montre comment créer un pipeline de données complet en utilisant des notebooks Databricks et un travail Azure Databricks pour orchestrer un flux de travail, Databricks recommande d’utiliser Delta Live Tablesqui est une interface déclarative pour créer des pipelines de traitement de données fiables, gérables et testables.

Qu’est-ce qu’un pipeline de données ?

Un pipeline de données implémente les étapes nécessaires pour déplacer des données à partir de systèmes sources, transformer ces données en fonction des exigences et stocker les données dans un système cible. Un pipeline de données comprend tous les processus nécessaires pour transformer les données brutes en données préparées que les utilisateurs peuvent consommer. Par exemple, un pipeline de données peut préparer des données pour que les analystes de données et les scientifiques des données puissent en extraire des insights en utilisant l’analyse et la création de rapports.

Un workflow d’extraction, de transformation et de chargement (ETL) est un exemple courant de pipeline de données. Dans le traitement ETL, les données sont ingérées à partir des systèmes sources et écrites dans une zone intermédiaire, transformées en fonction des exigences (garantir la qualité des données, déduplication des enregistrements, etc.), puis écrites dans un système cible comme un entrepôt de données ou un lac de données.

Étapes de pipeline de données

Pour vous aider à bien démarrer avec la création de pipelines de données sur Azure Databricks, l’exemple présenté dans cet article décrit la création d’un workflow de traitement des données :

  • Utiliser les fonctionnalités Azure Databricks pour explorer un jeu de données brut.
  • Créer un notebook Databricks pour ingérer des données sources brutes et écrire les données brutes dans une table cible.
  • Créer un notebook Databricks pour transformer les données sources brutes et écrire les données transformées dans une table cible.
  • Créer un notebook Databricks pour interroger les données transformées.
  • Automatiser le pipeline de données avec un travail Azure Databricks.

Spécifications

Exemple : Jeu de données Million Song

Le jeu de données utilisé dans cet exemple est un sous-ensemble du jeu de données Million Song, une collection de caractéristiques et de métadonnées pour des morceaux de musique contemporains. Ce jeu de données est disponible dans les exemples de jeux de données compris dans votre espace de travail Azure Databricks.

Étape 1 : Créer un cluster

Afin d’effectuer le traitement et l’analyse des données dans cet exemple, créez un cluster pour fournir les ressources de calcul nécessaires à l’exécution des commandes.

Remarque

Étant donné que cet exemple utilise un exemple de jeu de données stocké dans DBFS et recommande la persistance des tables dans Unity Catalog, vous créez un cluster configuré en mode d’accès mono-utilisateur. Un mode d’accès mono-utilisateur permet un accès complet à DBFS tout en permettant également l’accès à Unity Catalog. Reportez-vous aux meilleures pratiques pour DBFS et Unity Catalog.

  1. Cliquez sur Calcul dans la barre latérale.
  2. Sur la page Calcul, cliquez sur Créer un cluster.
  3. Dans la page Nouveau cluster, entrez un nom unique pour le cluster.
  4. Dans Mode d'accès, sélectionnez Utilisateur unique.
  5. Dans Accès utilisateur unique ou principal de service, sélectionnez votre nom d’utilisateur.
  6. Laissez les valeurs restantes dans leur état par défaut, puis cliquez sur Créer un cluster.

Si vous souhaitez obtenir plus d’informations sur les clusters Databricks, consultez Compute.

Étape 2 : Explorer les données sources

Pour savoir comment utiliser l’interface Azure Databricks pour explorer les données sources brutes, consultez Explorer les données sources d’un pipeline de données. Si vous souhaitez passer directement à l’ingestion et à la préparation des données, passez à l’étape 3 : Ingérer les données brutes.

Étape 3 : Ingérer les données brutes

Dans cette étape, vous chargez les données brutes dans une table pour les rendre disponibles pour un traitement ultérieur. Pour gérer les actifs de données sur la plateforme Databricks, tels que les tables, Databricks recommande l'utilisation d'Unity Catalog. Toutefois, si vous ne disposez pas des autorisations nécessaires pour créer le catalogue et le schéma nécessaires pour publier des tables dans Unity Catalog, vous pouvez toujours effectuer les étapes suivantes en publiant des tables dans le metastore Hive.

Databricks recommande d’utiliser Auto Loader pour l’ingestion des données. Auto Loader détecte et traite automatiquement les nouveaux fichiers à mesure qu’ils arrivent dans le stockage d’objets cloud.

Vous pouvez configurer Auto Loader pour détecter automatiquement le schéma des données chargées, ce qui vous permet d’initialiser des tables sans déclarer explicitement le schéma de données et faire évoluer le schéma de table à mesure que de nouvelles colonnes sont introduites. Cela élimine la nécessité de suivre et d’appliquer manuellement les modifications de schéma au fil du temps. Databricks recommande l’inférence de schéma quand vous utilisez Auto Loader. Toutefois, comme indiqué à l’étape d’exploration des données, les données des chansons ne contiennent pas d’informations d’en-tête. Comme l’en-tête n’est pas stocké avec les données, vous devez définir explicitement le schéma, comme illustré dans l’exemple suivant.

  1. Cliquez sur New IconNew (Nouveau) dans la barre latérale, puis sélectionnez Notebook dans le menu. La boîte de dialogue Create Notebook s’affiche.

  2. Entrez un nom pour le notebook, par exemple, Ingest songs data. Par défaut :

    • Python est le langage sélectionné.
    • Le notebook est attaché au dernier cluster que vous avez utilisé. Dans ce cas, le cluster que vous avez créé à l’étape 1 : Créer un cluster.
  3. Entrez ce qui suit dans la première cellule du notebook :

    from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField
    
    # Define variables used in the code below
    file_path = "/databricks-datasets/songs/data-001/"
    table_name = "<table-name>"
    checkpoint_path = "/tmp/pipeline_get_started/_checkpoint/song_data"
    
    schema = StructType(
      [
        StructField("artist_id", StringType(), True),
        StructField("artist_lat", DoubleType(), True),
        StructField("artist_long", DoubleType(), True),
        StructField("artist_location", StringType(), True),
        StructField("artist_name", StringType(), True),
        StructField("duration", DoubleType(), True),
        StructField("end_of_fade_in", DoubleType(), True),
        StructField("key", IntegerType(), True),
        StructField("key_confidence", DoubleType(), True),
        StructField("loudness", DoubleType(), True),
        StructField("release", StringType(), True),
        StructField("song_hotnes", DoubleType(), True),
        StructField("song_id", StringType(), True),
        StructField("start_of_fade_out", DoubleType(), True),
        StructField("tempo", DoubleType(), True),
        StructField("time_signature", DoubleType(), True),
        StructField("time_signature_confidence", DoubleType(), True),
        StructField("title", StringType(), True),
        StructField("year", IntegerType(), True),
        StructField("partial_sequence", IntegerType(), True)
      ]
    )
    
    (spark.readStream
      .format("cloudFiles")
      .schema(schema)
      .option("cloudFiles.format", "csv")
      .option("sep","\t")
      .load(file_path)
      .writeStream
      .option("checkpointLocation", checkpoint_path)
      .trigger(availableNow=True)
      .toTable(table_name)
    )
    

    Si vous utilisez Unity Catalog, remplacez <table-name> par un catalogue, un schéma et un nom de table pour contenir les enregistrements ingérés (par exemple, data_pipelines.songs_data.raw_song_data). Sinon, remplacez <table-name> avec le nom d’une table qui doit contenir les enregistrements ingérés, par exemple, raw_song_data.

    Remplacez <checkpoint-path> avec un chemin vers un répertoire dans DBFS pour gérer les fichiers de point de contrôle, par exemple, /tmp/pipeline_get_started/_checkpoint/song_data.

  4. Cliquez sur Run Menu, puis sélectionnez Run Cell (Exécuter la cellule). Cet exemple définit le schéma de données à partir des informations du README, ingère les données de chansons de tous les fichiers contenus dans file_path et écrit les données dans la table Delta spécifiée par table_name.

Étape 4 : préparation des données brutes

Pour préparer les données brutes en vue de l’analyse, les étapes suivantes transformeront les données de chansons brutes en masquant les colonnes inutiles et en ajoutant un nouveau champ contenant un horodatage pour la création du nouvel enregistrement.

  1. Cliquez sur New IconNew (Nouveau) dans la barre latérale, puis sélectionnez Notebook dans le menu. La boîte de dialogue Create Notebook s’affiche.

  2. Entrez un nom au notebook. Par exemple : Prepare songs data. Remplacez la langue par défaut par SQL.

  3. Entrez ce qui suit dans la première cellule du notebook :

    CREATE OR REPLACE TABLE
      <table-name> (
        artist_id STRING,
        artist_name STRING,
        duration DOUBLE,
        release STRING,
        tempo DOUBLE,
        time_signature DOUBLE,
        title STRING,
        year DOUBLE,
        processed_time TIMESTAMP
      );
    
    INSERT INTO
      <table-name>
    SELECT
      artist_id,
      artist_name,
      duration,
      release,
      tempo,
      time_signature,
      title,
      year,
      current_timestamp()
    FROM
      <raw-songs-table-name>
    

    Si vous utilisez Unity Catalog, remplacez <table-name> par un catalogue, un schéma et un nom de table pour contenir les enregistrements ingérés (par exemple, data_pipelines.songs_data.prepared_song_data). Sinon, remplacez <table-name> avec le nom de la table qui doit contenir les enregistrements filtrés et transformés (par exemple, prepared_song_data).

    Remplacez <raw-songs-table-name> avec le nom de la table contenant les enregistrements de chansons brutes ingérés à l’étape précédente.

  4. Cliquez sur Run Menu, puis sélectionnez Run Cell (Exécuter la cellule).

Étape 5 : Interroger les données transformées

Dans cette étape, vous étendez le pipeline de traitement en ajoutant des requêtes pour analyser les données des chansons. Ces requêtes utilisent les enregistrements préparés créés à l’étape précédente.

  1. Cliquez sur New IconNew (Nouveau) dans la barre latérale, puis sélectionnez Notebook dans le menu. La boîte de dialogue Create Notebook s’affiche.

  2. Entrez un nom au notebook. Par exemple : Analyze songs data. Remplacez la langue par défaut par SQL.

  3. Entrez ce qui suit dans la première cellule du notebook :

    -- Which artists released the most songs each year?
    SELECT
      artist_name,
      count(artist_name)
    AS
      num_songs,
      year
    FROM
      <prepared-songs-table-name>
    WHERE
      year > 0
    GROUP BY
      artist_name,
      year
    ORDER BY
      num_songs DESC,
      year DESC
    

    Remplacez <prepared-songs-table-name> avec le nom de la table contenant les données préparées. Par exemple : data_pipelines.songs_data.prepared_song_data.

  4. Cliquez sur Down Caret dans le menu d’actions de cellule, sélectionnez Add Cell Below (Ajouter une cellule en dessous), puis entrez ce qui suit dans la nouvelle cellule :

     -- Find songs for your DJ list
     SELECT
       artist_name,
       title,
       tempo
     FROM
       <prepared-songs-table-name>
     WHERE
       time_signature = 4
       AND
       tempo between 100 and 140;
    

    Remplacez <prepared-songs-table-name> avec le nom de la table préparée créée à l’étape précédente. Par exemple : data_pipelines.songs_data.prepared_song_data.

  5. Pour exécuter les requêtes et voir la sortie, cliquez sur Tout exécuter.

Étape 6 : Créer un travail Azure Databricks pour exécuter le pipeline

Vous pouvez créer un workflow pour automatiser l’exécution des étapes d’ingestion, de traitement et d’analyse des données en utilisant un travail Azure Databricks.

  1. Dans votre espace de travail Ingénierie et Science des données, effectuez l’une des opérations suivantes :
    • Cliquez sur Jobs IconWorkflows dans la barre latérale et cliquez sur Create Job Button.
    • Dans la barre latérale, cliquez sur New IconNouveau, puis sélectionnez Travail.
  2. Dans la boîte de dialogue tâche qui s’affiche sous l’onglet Tâches, remplacez Ajouter un nom à votre travail... par votre nom de travail. Par exemple, « Workflow Songs ».
  3. Dans Nom de la tâche, entrez un nom pour la première tâche, par exemple, Ingest_songs_data.
  4. Dans Type, sélectionnez le type de tâche Notebook.
  5. Dans Source, sélectionnez Espace de travail.
  6. Utilisez l’Explorateur de fichiers pour rechercher le notebook d’ingestion des données, cliquez sur le nom du notebook, puis cliquez sur Confirmer.
  7. Dans Cluster, sélectionnez Shared_job_cluster ou le cluster que vous avez créé à l’étape Create a cluster.
  8. Cliquez sur Créer.
  9. Cliquez sur Add Task Button sous la tâche que vous venez de créer et sélectionnez Notebook.
  10. Pour Nom de la tâche, entrez un nom pour la tâche, par exemple Prepare_songs_data.
  11. Dans Type, sélectionnez le type de tâche Notebook.
  12. Dans Source, sélectionnez Espace de travail.
  13. Utilisez l’Explorateur de fichiers pour rechercher le notebook de préparation des données, cliquez sur le nom du notebook, puis cliquez sur Confirmer.
  14. Dans Cluster, sélectionnez Shared_job_cluster ou le cluster que vous avez créé à l’étape Create a cluster.
  15. Cliquez sur Créer.
  16. Cliquez sur Add Task Button sous la tâche que vous venez de créer et sélectionnez Notebook.
  17. Pour Nom de la tâche, entrez un nom pour la tâche, par exemple Analyze_songs_data.
  18. Dans Type, sélectionnez le type de tâche Notebook.
  19. Dans Source, sélectionnez Espace de travail.
  20. Utilisez l’Explorateur de fichiers pour rechercher le notebook d’analyse des données, cliquez sur le nom du notebook, puis cliquez sur Confirmer.
  21. Dans Cluster, sélectionnez Shared_job_cluster ou le cluster que vous avez créé à l’étape Create a cluster.
  22. Cliquez sur Créer.
  23. Pour exécuter le workflow, cliquez sur Run Now Button. Pour voir les détails de l’exécution, cliquez sur le lien dans la colonne Heure de début de l’exécution dans la vue Exécutions du travail. Cliquez sur chaque tâche pour voir les détails de leur exécution.
  24. Pour voir les résultats une fois le workflow terminé, cliquez sur la tâche d’analyse de données finale. La page Sortie s’affiche et montre les résultats de la requête.

Étape 7 : Planifier le travail du pipeline de données

Notes

Pour illustrer l’utilisation d’un travail Azure Databricks pour orchestrer un flux de travail planifié, cet exemple de prise en main sépare les étapes d’ingestion, de préparation et d’analyse en notebooks distincts, et chaque notebook est ensuite utilisé pour créer une tâche dans le travail. Si tout le traitement est contenu dans un seul notebook, vous pouvez facilement planifier le notebook directement à partir de l’interface utilisateur du notebook Azure Databricks. Consultez Créer et gérer des tâches de notebooks planifiées.

Une exigence courante est d’exécuter un pipeline de données de manière planifiée. Afin de définir une planification pour le travail qui exécute le pipeline :

  1. Cliquez sur Jobs IconWorkflows dans la barre latérale.
  2. Dans la colonne Nom, cliquez sur le nom d’un travail. Le volet latéral affiche les Détails du travail.
  3. Cliquez sur Add trigger (Ajouter un déclencheur) dans le volet Job details (Détails du travail) et sélectionnez Scheduled (Planifié) dans Trigger type (Type de déclencheur).
  4. Indiquez la période, l’heure de début et le fuseau horaire. Si vous le souhaitez, activez la case à cocher Show Cron Syntax (Afficher la syntaxe Cron) pour afficher et modifier la planification dans la syntaxe Quartz Cron.
  5. Cliquez sur Enregistrer.

En savoir plus