Tutorial: Build an ETL pipeline with DLT

Learn how to create and deploy an ETL (extract, transform, and load) pipeline for data orchestration using DLT and Auto Loader. An ETL pipeline implements the steps to read data from source systems, transform that data based on requirements, such as data quality checks and record de-duplication, and write the data to a target system, such as a data warehouse or a data lake.

In this tutorial, you will use DLT and Auto Loader to:

  • Ingest raw source data to a target table.
  • Transform the raw source data and write the transformed data to two target materialized views.
  • Query the transformed data.
  • Automate the ETL pipeline with a Databricks job.

For more information about DLT and Auto Loader, see What is DLT? and What is Auto Loader?

Requirements

To complete this tutorial, you need access to an Azure Databricks workspace with Unity Catalog enabled and permission to create pipelines, jobs, and a schema in an existing catalog. Serverless compute is used in this tutorial but is not required; see the note in Step 1 for completing the tutorial with default compute settings.

About the dataset

The dataset used in this example is a subset of the Million Song Dataset, a collection of features and metadata for contemporary music tracks. This dataset is available in the sample datasets included in your Azure Databricks workspace.
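
If you want to peek at the raw files before building the pipeline, you can list and preview them from any notebook. The following is an optional sketch; dbutils, spark, and display are available by default in Databricks notebooks.

    # List the sample dataset files that ship with the workspace.
    display(dbutils.fs.ls("/databricks-datasets/songs/data-001/"))

    # The records are tab-separated text with no header row; preview a few raw lines.
    display(spark.read.text("/databricks-datasets/songs/data-001/").limit(5))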

Step 1: Create a pipeline

First, you will create an ETL pipeline in DLT. DLT creates pipelines by resolving dependencies defined in notebooks or files (called source code) using DLT syntax. Each source code file can contain only one language, but you can add multiple language-specific notebooks or files in the pipeline. To learn more, see What is DLT?
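
As a quick illustration of the DLT syntax mentioned above, a dataset is simply a decorated function in Python (or a CREATE statement in SQL); the full pipeline code comes in Step 2. The snippet below is a minimal standalone sketch, not part of the tutorial pipeline, and runs only as pipeline source code.

    import dlt

    # A minimal DLT dataset: the function name becomes the table name, and DLT
    # resolves dependencies between datasets from the code itself.
    @dlt.table(comment="A trivial example table.")
    def example_table():
      return spark.range(5)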

Important

Leave the Source code field blank to automatically create and configure a notebook for source code authoring.

This tutorial uses serverless compute and Unity Catalog. For all configuration options not specified, use the default settings. If serverless compute is not enabled or supported in your workspace, you can complete the tutorial as written using default compute settings. If you use default compute settings, you must manually select Unity Catalog under Storage options in the Destination section of the Create pipeline UI.

To create a new ETL pipeline in DLT, follow these steps:

  1. In the sidebar, click Pipelines.
  2. Click Create pipeline, and then select ETL pipeline.
  3. In Pipeline name, type a unique pipeline name.
  4. Select the Serverless checkbox.
  5. In Destination, to configure the Unity Catalog location where tables are published, select an existing Catalog, and then enter a new name in Schema to create a new schema in that catalog.
  6. Click Create.
  7. Click on the source code notebook link under the Source code field in the Pipeline details panel.

The pipeline UI appears for the new pipeline.
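
If you prefer to script this setup, the same pipeline can also be created programmatically. The sketch below uses the Databricks SDK for Python and assumes you have already authored a source code notebook (the UI flow above creates one for you); the catalog, schema, and notebook path are placeholders taken from the examples in this tutorial.

    from databricks.sdk import WorkspaceClient
    from databricks.sdk.service import pipelines

    # Assumes authentication is configured for the SDK (for example, via a CLI profile).
    w = WorkspaceClient()

    created = w.pipelines.create(
        name="my_pipeline",
        catalog="data_pipelines",      # existing catalog (placeholder)
        target="songs_data",           # schema where tables are published (placeholder)
        serverless=True,
        libraries=[
            pipelines.PipelineLibrary(
                notebook=pipelines.NotebookLibrary(
                    path="/Users/someone@example.com/my_pipeline/my_pipeline"
                )
            )
        ],
    )
    print(created.pipeline_id)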

Step 2: Develop a DLT pipeline

Important

Notebooks can only contain a single programming language. Do not mix Python and SQL code in pipeline source code notebooks.

In this step, you will use Databricks notebooks to interactively develop and validate source code for DLT pipelines.

The code uses Auto Loader for incremental data ingestion. Auto Loader automatically detects and processes new files as they arrive in cloud object storage. To learn more, see What is Auto Loader?
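
Outside of DLT, the same incremental pattern is a regular Structured Streaming read that uses the cloudFiles source. The sketch below is optional and only illustrates how Auto Loader tracks which files it has already processed; all paths and table names are placeholders, and the tutorial pipeline itself uses the DLT form shown later in this step.

    # Auto Loader as a plain Structured Streaming source (illustration only).
    raw_stream = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("sep", "\t")
        # Where Auto Loader tracks the inferred schema between runs (placeholder path).
        .option("cloudFiles.schemaLocation", "/tmp/songs_example/_schema")
        .load("/databricks-datasets/songs/data-001/")
    )

    # The sink needs a checkpoint location so files that were already processed
    # are not ingested again on the next run.
    (
        raw_stream.writeStream
        .option("checkpointLocation", "/tmp/songs_example/_checkpoint")
        .trigger(availableNow=True)  # process the files available now, then stop
        .toTable("main.default.songs_raw_example")  # placeholder target table
    )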

A blank source code notebook is automatically created and configured for the pipeline. The notebook is created in a new directory in your user directory, and both the directory and the notebook are named after your pipeline. For example, /Users/someone@example.com/my_pipeline/my_pipeline.

When developing a DLT pipeline, you can choose either Python or SQL; examples are included below for both languages. Based on your choice, make sure you select the matching default notebook language. To learn more about notebook support for DLT pipeline code development, see Develop and debug DLT pipelines in notebooks.

  1. A link to access this notebook is under the Source code field in the Pipeline details panel. Click the link to open the notebook before proceeding to the next step.

  2. Click Connect in the upper-right to open the compute configuration menu.

  3. Hover over the name of the pipeline you created in Step 1.

  4. Click Connect.

  5. Next to your notebook's title at the top, select the notebook's default language (Python or SQL).

  6. Copy and paste the following code into a cell in the notebook.

    Python

    # Import modules
    import dlt
    from pyspark.sql.functions import *
    from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField
    
    # Define the path to source data
    file_path = "/databricks-datasets/songs/data-001/"
    
    # Define a streaming table to ingest data from the songs sample dataset
    schema = StructType(
      [
        StructField("artist_id", StringType(), True),
        StructField("artist_lat", DoubleType(), True),
        StructField("artist_long", DoubleType(), True),
        StructField("artist_location", StringType(), True),
        StructField("artist_name", StringType(), True),
        StructField("duration", DoubleType(), True),
        StructField("end_of_fade_in", DoubleType(), True),
        StructField("key", IntegerType(), True),
        StructField("key_confidence", DoubleType(), True),
        StructField("loudness", DoubleType(), True),
        StructField("release", StringType(), True),
        StructField("song_hotnes", DoubleType(), True),
        StructField("song_id", StringType(), True),
        StructField("start_of_fade_out", DoubleType(), True),
        StructField("tempo", DoubleType(), True),
        StructField("time_signature", DoubleType(), True),
        StructField("time_signature_confidence", DoubleType(), True),
        StructField("title", StringType(), True),
        StructField("year", IntegerType(), True),
        StructField("partial_sequence", IntegerType(), True)
      ]
    )
    
    @dlt.table(
      comment="Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks."
    )
    def songs_raw():
      return (spark.readStream
        .format("cloudFiles")
        .schema(schema)
        .option("cloudFiles.format", "csv")
        .option("sep","\t")
        .option("inferSchema", True)
        .load(file_path))
    
    # Define a materialized view that validates data and renames a column
    @dlt.table(
      comment="Million Song Dataset with data cleaned and prepared for analysis."
    )
    @dlt.expect("valid_artist_name", "artist_name IS NOT NULL")
    @dlt.expect("valid_title", "song_title IS NOT NULL")
    @dlt.expect("valid_duration", "duration > 0")
    def songs_prepared():
      return (
        spark.read.table("songs_raw")
          .withColumnRenamed("title", "song_title")
          .select("artist_id", "artist_name", "duration", "release", "tempo", "time_signature", "song_title", "year")
      )
    
    # Define a materialized view that has a filtered, aggregated, and sorted view of the data
    @dlt.table(
      comment="A table summarizing counts of songs released by the artists each year who released most songs."
    )
    def top_artists_by_year():
      return (
        spark.read.table("songs_prepared")
          .filter(expr("year > 0"))
          .groupBy("artist_name", "year")
          .count().withColumnRenamed("count", "total_number_of_songs")
          .sort(desc("total_number_of_songs"), desc("year"))
      )
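
    The @dlt.expect decorators above (and the equivalent CONSTRAINT ... EXPECT clauses in the SQL version below) record violations in the pipeline's data quality metrics but still let the offending rows through. If you want stricter handling, DLT also provides drop and fail variants; the following is a brief sketch that is not part of the tutorial pipeline.

    # Stricter alternatives to @dlt.expect (sketch only, not used in this tutorial):
    @dlt.table(comment="Example: drop rows that fail the expectation instead of keeping them.")
    @dlt.expect_or_drop("valid_duration", "duration > 0")
    def songs_valid_only():
      return spark.read.table("songs_prepared")

    # @dlt.expect_or_fail(...) would instead stop the update on the first violation.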
    

    SQL

    -- Define a streaming table to ingest data from the songs sample dataset
    CREATE OR REFRESH STREAMING TABLE songs_raw
    (
     artist_id STRING,
     artist_lat DOUBLE,
     artist_long DOUBLE,
     artist_location STRING,
     artist_name STRING,
     duration DOUBLE,
     end_of_fade_in DOUBLE,
     key INT,
     key_confidence DOUBLE,
     loudness DOUBLE,
     release STRING,
     song_hotnes DOUBLE,
     song_id STRING,
     start_of_fade_out DOUBLE,
     tempo DOUBLE,
     time_signature INT,
     time_signature_confidence DOUBLE,
     title STRING,
     year INT,
     partial_sequence STRING,
     value STRING
    )
    COMMENT "Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks."
    AS SELECT *
    FROM STREAM read_files(
    '/databricks-datasets/songs/data-001/');
    
    -- Define a materialized view that validates data and renames a column
    CREATE OR REFRESH MATERIALIZED VIEW songs_prepared(
    CONSTRAINT valid_artist_name EXPECT (artist_name IS NOT NULL),
    CONSTRAINT valid_title EXPECT (song_title IS NOT NULL),
    CONSTRAINT valid_duration EXPECT (duration > 0)
    )
    COMMENT "Million Song Dataset with data cleaned and prepared for analysis."
    AS SELECT artist_id, artist_name, duration, release, tempo, time_signature, title AS song_title, year
    FROM songs_raw;
    
    -- Define a materialized view that has a filtered, aggregated, and sorted view of the data
    CREATE OR REFRESH MATERIALIZED VIEW top_artists_by_year
    COMMENT "A table summarizing counts of songs released by the artists each year who released most songs."
    AS SELECT
     artist_name,
     year,
     COUNT(*) AS total_number_of_songs
    FROM songs_prepared
    WHERE year > 0
    GROUP BY artist_name, year
    ORDER BY total_number_of_songs DESC, year DESC
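
    Once the source code is in place, you can start the pipeline from the pipeline UI by clicking Start, or trigger an update programmatically. The snippet below is an optional sketch using the Databricks SDK for Python; the pipeline ID is a placeholder you can copy from the pipeline's settings.

    from databricks.sdk import WorkspaceClient

    # Assumes authentication is configured for the SDK.
    w = WorkspaceClient()

    # Trigger a pipeline update (pipeline ID is a placeholder).
    update = w.pipelines.start_update(pipeline_id="<your-pipeline-id>")
    print(update.update_id)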
    

Step 3: Query the transformed data

In this step, you will query the data processed in the ETL pipeline to analyze the song data. These queries use the prepared records created in the previous step.

First, run a query that finds the artists who have released the most songs each year starting with 1990.

  1. In the sidebar, click SQL Editor.

  2. Click the new tab (+) icon and select Create new query from the menu.

  3. Enter the following:

    -- Which artists released the most songs each year in 1990 or later?
    SELECT artist_name, total_number_of_songs, year
    FROM <catalog>.<schema>.top_artists_by_year
    WHERE year >= 1990
    ORDER BY total_number_of_songs DESC, year DESC
    

    Replace <catalog> and <schema> with the name of the catalog and schema the table is in. For example, data_pipelines.songs_data.top_artists_by_year.

  4. Click Run selected.

Now, run another query that finds songs with a 4/4 beat and danceable tempo.

  1. Click the new tab (+) icon and select Create new query from the menu.

  2. Enter the following code:

     -- Find songs with a 4/4 beat and danceable tempo
     SELECT artist_name, song_title, tempo
     FROM <catalog>.<schema>.songs_prepared
     WHERE time_signature = 4 AND tempo between 100 and 140;
    

    Replace <catalog> and <schema> with the name of the catalog and schema the table is in. For example, data_pipelines.songs_data.songs_prepared.

  3. Click Run selected.
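
Both queries can also be run from a notebook instead of the SQL editor. The following is a short sketch using spark.sql; the three-level table name uses the example catalog and schema from above, so replace it with your own.

    # Placeholder catalog and schema; replace with the ones your pipeline publishes to.
    top_artists = spark.sql("""
      SELECT artist_name, total_number_of_songs, year
      FROM data_pipelines.songs_data.top_artists_by_year
      WHERE year >= 1990
      ORDER BY total_number_of_songs DESC, year DESC
    """)
    display(top_artists)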

Step 4: Create a job to run the DLT pipeline

Next, create a workflow to automate running the data ingestion, processing, and analysis steps using a Databricks job.

  1. In your workspace, click Workflows in the sidebar, and then click Create job.
  2. Replace the default job name New Job <date and time> with your job name. For example, Songs workflow.
  3. In Task name, enter a name for the first task, for example, ETL_songs_data.
  4. In Type, select Pipeline.
  5. In Pipeline, select the DLT pipeline you created in step 1.
  6. Click Create.
  7. To run the workflow, click Run now. To view the details of the run, click the Runs tab. Click the task to view details for the task run.
  8. To view the results when the workflow is completed, click Go to the latest successful run or the Start time for the job run. The Output page appears and displays the query results.

For more information about job runs, see Monitoring and observability for Databricks Jobs.
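
If you prefer to define the job in code, the same single-task job can be created with the Databricks SDK for Python. The sketch below mirrors the names used above; the pipeline ID is a placeholder.

    from databricks.sdk import WorkspaceClient
    from databricks.sdk.service import jobs

    # Assumes authentication is configured for the SDK.
    w = WorkspaceClient()

    # Create a job with a single pipeline task (pipeline ID is a placeholder).
    job = w.jobs.create(
        name="Songs workflow",
        tasks=[
            jobs.Task(
                task_key="ETL_songs_data",
                pipeline_task=jobs.PipelineTask(pipeline_id="<your-pipeline-id>"),
            )
        ],
    )

    # Trigger a run, equivalent to clicking Run now in the UI.
    w.jobs.run_now(job_id=job.job_id)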

Step 5: Schedule the DLT pipeline job

To run the ETL pipeline on a schedule, follow these steps:

  1. Click Workflows in the sidebar.
  2. In the Name column, click the job name. The side panel displays the Job details.
  3. In the Schedules & Triggers panel, click Add trigger and select Scheduled as the Trigger type.
  4. Specify the period, starting time, and time zone.
  5. Click Save.
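
The schedule can also be set with the Databricks SDK for Python, as a sketch. It assumes the job ID from the previous step; the cron expression uses Quartz syntax, and the daily-at-midnight value below is only an example.

    from databricks.sdk import WorkspaceClient
    from databricks.sdk.service import jobs

    # Assumes authentication is configured for the SDK.
    w = WorkspaceClient()

    # Add a daily schedule to the existing job (job ID is a placeholder).
    w.jobs.update(
        job_id=123456789,  # replace with your job ID
        new_settings=jobs.JobSettings(
            schedule=jobs.CronSchedule(
                quartz_cron_expression="0 0 0 * * ?",  # daily at midnight
                timezone_id="UTC",
            )
        ),
    )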
