Learn how to create and deploy an ETL (extract, transform, and load) pipeline for data orchestration using DLT and Auto Loader. An ETL pipeline implements the steps to read data from source systems, transform that data based on requirements, such as data quality checks and record de-duplication, and write the data to a target system, such as a data warehouse or a data lake.
In this tutorial, you will use DLT and Auto Loader to ingest the raw song data into a streaming table, transform and validate the data in materialized views, and query the prepared data.
For more information about DLT and Auto Loader, see What is DLT? and What is Auto Loader?
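As a preview of the pattern used later in this tutorial, a DLT pipeline in Python is a set of decorated functions that each return a DataFrame; DLT materializes each one as a table and resolves the dependencies between them. The following is a minimal sketch only; the table names and sample path are illustrative and are not part of this tutorial's pipeline.

import dlt
from pyspark.sql.functions import col

# Streaming table: incrementally ingest raw files with Auto Loader.
@dlt.table(comment="Raw events ingested incrementally (illustrative example).")
def events_raw():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .load("/databricks-datasets/structured-streaming/events/")  # illustrative sample path
    )

# Materialized view: DLT infers that this table depends on events_raw.
@dlt.table(comment="Events filtered for downstream analysis (illustrative example).")
def events_cleaned():
    return spark.read.table("events_raw").where(col("action").isNotNull())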
To complete this tutorial, you must meet the following requirements:
ALL PRIVILEGES or USE CATALOG and CREATE SCHEMA privileges on the catalog you will use for the pipeline.
ALL PRIVILEGES or USE SCHEMA and CREATE VOLUME privileges on the schema you will use for the pipeline.
The dataset used in this example is a subset of the Million Song Dataset, a collection of features and metadata for contemporary music tracks. This dataset is available in the sample datasets included in your Azure Databricks workspace.
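The raw files are tab-separated text files under /databricks-datasets/songs/data-001/. If you want to take a quick look at them before building the pipeline, an optional check from any notebook might look like the following sketch (the exact file listing in your workspace may differ):

# Optional: list the raw Million Song Dataset sample files.
for f in dbutils.fs.ls("/databricks-datasets/songs/data-001/")[:5]:
    print(f.path, f.size)

# Optional: preview a few raw, tab-separated rows without applying a schema.
(
    spark.read.format("csv")
    .option("sep", "\t")
    .load("/databricks-datasets/songs/data-001/")
    .show(3, truncate=False)
)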
First, you will create an ETL pipeline in DLT. DLT creates pipelines by resolving dependencies defined in notebooks or files (called source code) using DLT syntax. Each source code file can contain only one language, but you can add multiple language-specific notebooks or files in the pipeline. To learn more, see What is DLT?
Important
Leave the Source code field blank to automatically create and configure a notebook for source code authoring.
This tutorial uses serverless compute and Unity Catalog. For all configuration options not specified, use the default settings. If serverless compute is not enabled or supported in your workspace, you can complete the tutorial as written using default compute settings. If you use default compute settings, you must manually select Unity Catalog under Storage options in the Destination section of the Create pipeline UI.
To create a new ETL pipeline in DLT, create a pipeline from the workspace UI, leaving the Source code field blank as described in the Important note above. After the pipeline is created, the pipeline UI appears for the new pipeline.
Important
Notebooks can only contain a single programming language. Do not mix Python and SQL code in pipeline source code notebooks.
In this step, you will use Databricks notebooks to interactively develop and validate source code for DLT pipelines.
The code uses Auto Loader for incremental data ingestion. Auto Loader automatically detects and processes new files as they arrive in cloud object storage. To learn more, see What is Auto Loader?
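Outside of a DLT pipeline, the same Auto Loader pattern can also be used directly with Structured Streaming. The following is a minimal sketch; the volume paths and target table name are placeholders, not objects created by this tutorial:

# Incrementally ingest new CSV files from cloud storage with Auto Loader.
# The paths and target table name below are placeholders.
source_path = "/Volumes/my_catalog/my_schema/my_volume/landing/"
checkpoint_path = "/Volumes/my_catalog/my_schema/my_volume/checkpoints/ingest/"

(
    spark.readStream.format("cloudFiles")                  # Auto Loader source
    .option("cloudFiles.format", "csv")                    # format of the incoming files
    .option("cloudFiles.schemaLocation", checkpoint_path)  # where the inferred schema is tracked
    .load(source_path)
    .writeStream
    .option("checkpointLocation", checkpoint_path)         # exactly-once progress tracking
    .trigger(availableNow=True)                            # process available files, then stop
    .toTable("my_catalog.my_schema.raw_ingest")
)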
A blank source code notebook is automatically created and configured for the pipeline. The notebook is created in a new directory in your user directory. The name of the new directory and file match the name of your pipeline. For example, /Users/someone@example.com/my_pipeline/my_pipeline.
When developing a DLT pipeline, you can choose either Python or SQL. Examples are included for both languages. Based on your language choice, make sure you select the default notebook language. To learn more about notebook support for DLT pipeline code development, see Develop and debug DLT pipelines in notebooks.
A link to access this notebook is under the Source code field in the Pipeline details panel. Click the link to open the notebook before proceeding to the next step.
Click Connect in the upper-right to open the compute configuration menu.
Hover over the name of the pipeline you created in Step 1.
Click Connect.
Next to your notebook's title at the top, select the notebook's default language (Python or SQL).
Copy and paste the following code into a cell in the notebook, using the Python or SQL example depending on your notebook's default language.
# Import modules
import dlt
from pyspark.sql.functions import *
from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField

# Define the path to source data
file_path = f"/databricks-datasets/songs/data-001/"

# Define a streaming table to ingest data from a volume
schema = StructType(
    [
        StructField("artist_id", StringType(), True),
        StructField("artist_lat", DoubleType(), True),
        StructField("artist_long", DoubleType(), True),
        StructField("artist_location", StringType(), True),
        StructField("artist_name", StringType(), True),
        StructField("duration", DoubleType(), True),
        StructField("end_of_fade_in", DoubleType(), True),
        StructField("key", IntegerType(), True),
        StructField("key_confidence", DoubleType(), True),
        StructField("loudness", DoubleType(), True),
        StructField("release", StringType(), True),
        StructField("song_hotnes", DoubleType(), True),
        StructField("song_id", StringType(), True),
        StructField("start_of_fade_out", DoubleType(), True),
        StructField("tempo", DoubleType(), True),
        StructField("time_signature", DoubleType(), True),
        StructField("time_signature_confidence", DoubleType(), True),
        StructField("title", StringType(), True),
        StructField("year", IntegerType(), True),
        StructField("partial_sequence", IntegerType(), True)
    ]
)

@dlt.table(
    comment="Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks."
)
def songs_raw():
    return (
        spark.readStream
        .format("cloudFiles")
        .schema(schema)
        .option("cloudFiles.format", "csv")
        .option("sep", "\t")
        .option("inferSchema", True)
        .load(file_path)
    )

# Define a materialized view that validates data and renames a column
@dlt.table(
    comment="Million Song Dataset with data cleaned and prepared for analysis."
)
@dlt.expect("valid_artist_name", "artist_name IS NOT NULL")
@dlt.expect("valid_title", "song_title IS NOT NULL")
@dlt.expect("valid_duration", "duration > 0")
def songs_prepared():
    return (
        spark.read.table("songs_raw")
        .withColumnRenamed("title", "song_title")
        .select("artist_id", "artist_name", "duration", "release", "tempo", "time_signature", "song_title", "year")
    )

# Define a materialized view that has a filtered, aggregated, and sorted view of the data
@dlt.table(
    comment="A table summarizing counts of songs released by the artists each year who released most songs."
)
def top_artists_by_year():
    return (
        spark.read.table("songs_prepared")
        .filter(expr("year > 0"))
        .groupBy("artist_name", "year")
        .count()
        .withColumnRenamed("count", "total_number_of_songs")
        .sort(desc("total_number_of_songs"), desc("year"))
    )
-- Define a streaming table to ingest data from a volume
CREATE OR REFRESH STREAMING TABLE songs_raw
(
artist_id STRING,
artist_lat DOUBLE,
artist_long DOUBLE,
artist_location STRING,
artist_name STRING,
duration DOUBLE,
end_of_fade_in DOUBLE,
key INT,
key_confidence DOUBLE,
loudness DOUBLE,
release STRING,
song_hotnes DOUBLE,
song_id STRING,
start_of_fade_out DOUBLE,
tempo DOUBLE,
time_signature INT,
time_signature_confidence DOUBLE,
title STRING,
year INT,
partial_sequence STRING,
value STRING
)
COMMENT "Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks."
AS SELECT *
FROM STREAM read_files(
'/databricks-datasets/songs/data-001/');
-- Define a materialized view that validates data and renames a column
CREATE OR REFRESH MATERIALIZED VIEW songs_prepared(
CONSTRAINT valid_artist_name EXPECT (artist_name IS NOT NULL),
CONSTRAINT valid_title EXPECT (song_title IS NOT NULL),
CONSTRAINT valid_duration EXPECT (duration > 0)
)
COMMENT "Million Song Dataset with data cleaned and prepared for analysis."
AS SELECT artist_id, artist_name, duration, release, tempo, time_signature, title AS song_title, year
FROM songs_raw;
-- Define a materialized view that has a filtered, aggregated, and sorted view of the data
CREATE OR REFRESH MATERIALIZED VIEW top_artists_by_year
COMMENT "A table summarizing counts of songs released by the artists each year who released most songs."
AS SELECT
artist_name,
year,
COUNT(*) AS total_number_of_songs
FROM songs_prepared
WHERE year > 0
GROUP BY artist_name, year
ORDER BY total_number_of_songs DESC, year DESC;
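The songs_prepared definitions above use expectations that only record violations in the pipeline's data quality metrics while keeping the rows. If you want DLT to drop failing rows or stop the update instead, the following sketch shows the other expectation modes on a hypothetical table that is not part of this tutorial's pipeline:

import dlt

# Hypothetical table illustrating the three DLT expectation modes.
@dlt.table(comment="Illustration of DLT expectation modes (not part of this tutorial).")
@dlt.expect("has_artist", "artist_name IS NOT NULL")        # record violations, keep the rows
@dlt.expect_or_drop("positive_duration", "duration > 0")    # drop rows that fail the check
@dlt.expect_or_fail("has_song_id", "song_id IS NOT NULL")   # fail the update if any row fails
def songs_checked():
    return spark.read.table("songs_raw")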
In this step, you will query the data processed in the ETL pipeline to analyze the song data. These queries use the prepared records created in the previous step.
First, run a query that finds the artists who have released the most songs each year starting with 1990.
In the sidebar, click SQL Editor.
Click the new tab icon and select Create new query from the menu.
Enter the following:
-- Which artists released the most songs each year in 1990 or later?
SELECT artist_name, total_number_of_songs, year
FROM <catalog>.<schema>.top_artists_by_year
WHERE year >= 1990
ORDER BY total_number_of_songs DESC, year DESC
Replace <catalog> and <schema> with the name of the catalog and schema the table is in. For example, data_pipelines.songs_data.top_artists_by_year.
Click Run selected.
Now, run another query that finds songs with a 4/4 beat and danceable tempo.
Click the new tab icon and select Create new query from the menu.
Enter the following code:
-- Find songs with a 4/4 beat and danceable tempo
SELECT artist_name, song_title, tempo
FROM <catalog>.<schema>.songs_prepared
WHERE time_signature = 4 AND tempo between 100 and 140;
Replace <catalog> and <schema> with the name of the catalog and schema the table is in. For example, data_pipelines.songs_data.songs_prepared.
Click Run selected.
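If you prefer to run these analysis queries from a notebook instead of the SQL editor, the sketch below uses spark.sql. The catalog and schema names are the same example values used above and should be replaced with your own:

# Run the analysis queries from a notebook; catalog and schema names are examples.
catalog = "data_pipelines"
schema = "songs_data"

top_artists = spark.sql(f"""
    SELECT artist_name, total_number_of_songs, year
    FROM {catalog}.{schema}.top_artists_by_year
    WHERE year >= 1990
    ORDER BY total_number_of_songs DESC, year DESC
""")
display(top_artists)

danceable = spark.sql(f"""
    SELECT artist_name, song_title, tempo
    FROM {catalog}.{schema}.songs_prepared
    WHERE time_signature = 4 AND tempo BETWEEN 100 AND 140
""")
display(danceable)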
Next, create a workflow to automate running the data ingestion, processing, and analysis steps using a Databricks job.
Name the job Songs workflow. For the pipeline task, use the task name ETL_songs_data and select the pipeline you created in Step 1. For more information about job runs, see Monitoring and observability for Databricks Jobs.
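As an illustrative alternative to creating the job in the UI, the Databricks SDK for Python can create an equivalent job with a single pipeline task. This is a sketch only: it assumes the databricks-sdk package is installed and authentication is configured, and the pipeline ID below is a placeholder you would copy from your pipeline's details panel.

# Sketch: create a job with one pipeline task using the Databricks SDK for Python.
from databricks.sdk import WorkspaceClient
from databricks.sdk.service import jobs

w = WorkspaceClient()  # assumes authentication is already configured

created = w.jobs.create(
    name="Songs workflow",
    tasks=[
        jobs.Task(
            task_key="ETL_songs_data",
            pipeline_task=jobs.PipelineTask(pipeline_id="<pipeline-id>"),  # placeholder
        )
    ],
)
print(f"Created job {created.job_id}")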
To run the ETL pipeline on a schedule, add a schedule to the job you created in the previous step.
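As an illustrative alternative to configuring the schedule in the Jobs UI, a daily cron schedule could be attached with the Databricks SDK for Python. In this sketch the job ID is a placeholder and the cron expression is only an example:

# Sketch: add a daily schedule to an existing job with the Databricks SDK for Python.
from databricks.sdk import WorkspaceClient
from databricks.sdk.service import jobs

w = WorkspaceClient()  # assumes authentication is already configured

w.jobs.update(
    job_id=123456789,  # placeholder: the job ID of the Songs workflow job
    new_settings=jobs.JobSettings(
        schedule=jobs.CronSchedule(
            quartz_cron_expression="0 0 0 * * ?",  # every day at 00:00
            timezone_id="UTC",
        )
    ),
)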