Linux Foundation Delta Lake overview

This article has been adapted for clarity from its original counterpart. It helps you quickly explore the main features of Delta Lake and provides code snippets that show how to read from and write to Delta Lake tables from interactive, batch, and streaming queries. The code snippets are also available as a set of notebooks for PySpark, Scala, and C#.

Here's what we will cover:

  • Create a table
  • Read data
  • Update table data
  • Save as catalog tables
  • Conditional update without overwrite
  • History
  • Read older versions of data using Time Travel
  • Write a stream of data to a table
  • Read a stream of changes from a table
  • Convert Parquet to Delta
  • SQL support

Configuration

Make sure you modify the values below as appropriate for your environment.

import random

session_id = random.randint(0,1000000)
delta_table_path = "/delta/delta-table-{0}".format(session_id)

delta_table_path
var sessionId = (new Random()).Next(10000000);
var deltaTablePath = $"/delta/delta-table-{sessionId}";

deltaTablePath
val sessionId = scala.util.Random.nextInt(1000000)
val deltaTablePath = s"/delta/delta-table-$sessionId"

Results in:

'/delta/delta-table-335323'

Create a table

To create a Delta Lake table, write a DataFrame out in the delta format. You can change the format from Parquet, CSV, JSON, and so on, to delta.

The code that follows shows you how to create a new Delta Lake table using the schema inferred from your DataFrame.

data = spark.range(0,5)
data.show()
data.write.format("delta").save(delta_table_path)
var data = spark.Range(0,5);
data.Show();
data.Write().Format("delta").Save(deltaTablePath);
val data = spark.range(0, 5)
data.show
data.write.format("delta").save(deltaTablePath)

Results in:

ID
0
1
2
3
4

Read data

You read data in your Delta Lake table by specifying the path to the files and the delta format.

df = spark.read.format("delta").load(delta_table_path)
df.show()
var df = spark.Read().Format("delta").Load(deltaTablePath);
df.Show();
val df = spark.read.format("delta").load(deltaTablePath)
df.show()

Results in:

ID
1
3
4
0
2

The order of the results differs from the output above because no order was explicitly specified before the results were displayed.
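If a stable order matters, for example when comparing output across runs, one option is to sort explicitly before showing the rows. The following is a minimal sketch, assuming an active `spark` session and the `delta_table_path` from the configuration step. The helper name `read_sorted` is made up for illustration and is not part of Delta Lake.

```python
def read_sorted(spark, path, column="id"):
    """Load the Delta table at `path` and return it ordered by `column`.

    `read_sorted` is a hypothetical helper, not a Delta Lake API.
    """
    df = spark.read.format("delta").load(path)
    # Without an explicit orderBy, rows come back in whatever order
    # the underlying partitions happen to be read.
    return df.orderBy(column)


# Usage in a notebook where `spark` and `delta_table_path` are defined:
# read_sorted(spark, delta_table_path).show()
```

Sorting adds a shuffle, so it is usually only worth doing for display or comparison purposes.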

Update table data

Delta Lake supports several operations to modify tables using standard DataFrame APIs. This is one of the big enhancements that the delta format adds. The following example runs a batch job to overwrite the data in the table.

data = spark.range(5,10)
data.write.format("delta").mode("overwrite").save(delta_table_path)
df.show()
var data = spark.Range(5,10);
data.Write().Format("delta").Mode("overwrite").Save(deltaTablePath);
df.Show();
val data = spark.range(5, 10)
data.write.format("delta").mode("overwrite").save(deltaTablePath)
df.show()

Results in:

ID
7
8
5
9
6

Here you can see that all five records have been updated to hold new values.

Save as catalog tables

Delta Lake can write to managed or external catalog tables.

data.write.format("delta").saveAsTable("ManagedDeltaTable")
spark.sql("CREATE TABLE ExternalDeltaTable USING DELTA LOCATION '{0}'".format(delta_table_path))
spark.sql("SHOW TABLES").show()
data.Write().Format("delta").SaveAsTable("ManagedDeltaTable");
spark.Sql($"CREATE TABLE ExternalDeltaTable USING DELTA LOCATION '{deltaTablePath}'");
spark.Sql("SHOW TABLES").Show();
data.write.format("delta").saveAsTable("ManagedDeltaTable")
spark.sql(s"CREATE TABLE ExternalDeltaTable USING DELTA LOCATION '$deltaTablePath'")
spark.sql("SHOW TABLES").show

Results in:

database tableName isTemporary
default externaldeltatable false
default manageddeltatable false

With this code, you created a new table in the catalog from an existing DataFrame, referred to as a managed table. Then you defined a new table in the catalog that uses an existing location, referred to as an external table. In the output, you can see that both tables are listed in the catalog, no matter how they were created.

Now you can look at the extended properties of both of these tables.

spark.sql("DESCRIBE EXTENDED ManagedDeltaTable").show(truncate=False)
spark.Sql("DESCRIBE EXTENDED ManagedDeltaTable").Show(truncate: 0);
spark.sql("DESCRIBE EXTENDED ManagedDeltaTable").show(truncate=false)

Results in:

col_name data_type comment
id bigint null
Detailed Table Information
Database default
Table manageddeltatable
Owner trusted-service-user
Created Time Sat Apr 25 00:35:34 UTC 2020
Last Access Thu Jan 01 00:00:00 UTC 1970
Created By Spark 2.4.4.2.6.99.201-11401300
Type MANAGED
Provider delta
Table Properties [transient_lastDdlTime=1587774934]
Statistics 2407 bytes
Location abfss://data@<data lake>.dfs.core.windows.net/synapse/workspaces/<workspace name>/warehouse/manageddeltatable
Serde Library org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat org.apache.hadoop.mapred.SequenceFileInputFormat
OutputFormat org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
Storage Properties [serialization.format=1]
spark.sql("DESCRIBE EXTENDED ExternalDeltaTable").show(truncate=False)
spark.Sql("DESCRIBE EXTENDED ExternalDeltaTable").Show(truncate: 0);
spark.sql("DESCRIBE EXTENDED ExternalDeltaTable").show(truncate=false)

Results in:

col_name data_type comment
id bigint null
Detailed Table Information
Database default
Table externaldeltatable
Owner trusted-service-user
Created Time Sat Apr 25 00:35:38 UTC 2020
Last Access Thu Jan 01 00:00:00 UTC 1970
Created By Spark 2.4.4.2.6.99.201-11401300
Type EXTERNAL
Provider DELTA
Table Properties [transient_lastDdlTime=1587774938]
Location abfss://data@<data lake>.dfs.core.windows.net/delta/delta-table-587152
Serde Library org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat org.apache.hadoop.mapred.SequenceFileInputFormat
OutputFormat org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
Storage Properties [serialization.format=1]

Conditional update without overwrite

Delta Lake provides programmatic APIs to conditionally update, delete, and merge data into tables (a merge is commonly referred to as an upsert).

from delta.tables import *
from pyspark.sql.functions import *

delta_table = DeltaTable.forPath(spark, delta_table_path)

delta_table.update(
  condition = expr("id % 2 == 0"),
  set = { "id": expr("id + 100") })
delta_table.toDF().show()
using Microsoft.Spark.Extensions.Delta;
using Microsoft.Spark.Extensions.Delta.Tables;
using Microsoft.Spark.Sql;
using static Microsoft.Spark.Sql.Functions;

var deltaTable = DeltaTable.ForPath(deltaTablePath);

deltaTable.Update(
  condition: Expr("id % 2 == 0"),
  set: new Dictionary<string, Column>(){{ "id", Expr("id + 100") }});
deltaTable.ToDF().Show();
import io.delta.tables._
import org.apache.spark.sql.functions._

val deltaTable = DeltaTable.forPath(deltaTablePath)

// Update every even value by adding 100 to it
deltaTable.update(
  condition = expr("id % 2 == 0"),
  set = Map("id" -> expr("id + 100")))
deltaTable.toDF.show

Results in:

ID
106
108
5
7
9

Here you just added 100 to every even ID.

delta_table.delete("id % 2 == 0")
delta_table.toDF().show()
deltaTable.Delete(condition: Expr("id % 2 == 0"));
deltaTable.ToDF().Show();
deltaTable.delete(condition = expr("id % 2 == 0"))
deltaTable.toDF.show

Results in:

ID
5
7
9

Notice that every even row has been deleted.
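To make the two results above easier to follow, the update and delete conditions can be traced on a plain Python list. This is only a model of the row-level logic, assuming the table holds the IDs 5 through 9 after the overwrite. It does not use Spark or the Delta Lake API.

```python
ids = list(range(5, 10))  # table contents after the overwrite: 5..9

# update: condition "id % 2 == 0", set id = id + 100
updated = [i + 100 if i % 2 == 0 else i for i in ids]
print(sorted(updated))    # [5, 7, 9, 106, 108]

# delete: condition "id % 2 == 0" now matches the updated rows 106 and 108
remaining = [i for i in updated if i % 2 != 0]
print(sorted(remaining))  # [5, 7, 9]
```

Note that the delete removes 106 and 108 because adding 100 to an even number leaves it even.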

new_data = spark.range(0,20).alias("newData")

delta_table.alias("oldData")\
    .merge(new_data.alias("newData"), "oldData.id = newData.id")\
    .whenMatchedUpdate(set = { "id": lit("-1")})\
    .whenNotMatchedInsert(values = { "id": col("newData.id") })\
    .execute()

delta_table.toDF().show(100)
var newData = spark.Range(20).As("newData");

deltaTable
    .As("oldData")
    .Merge(newData, "oldData.id = newData.id")
    .WhenMatched()
        .Update(new Dictionary<string, Column>() {{"id", Lit("-1")}})
    .WhenNotMatched()
        .Insert(new Dictionary<string, Column>() {{"id", Col("newData.id")}})
    .Execute();

deltaTable.ToDF().Show(100);
val newData = spark.range(0, 20).toDF

deltaTable.as("oldData").
  merge(
    newData.as("newData"),
    "oldData.id = newData.id").
  whenMatched.
  update(Map("id" -> lit(-1))).
  whenNotMatched.
  insert(Map("id" -> col("newData.id"))).
  execute()

deltaTable.toDF.show()

Results in:

ID
18
15
19
2
1
6
8
3
-1
10
13
0
16
4
-1
12
11
14
-1
17

Here you have a combination of the existing and new data. The existing rows that matched were assigned the value -1 in the update (WhenMatched) code path. The rows from the new data, created at the top of the snippet, that had no match in the table were added via the insert (WhenNotMatched) code path.
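The merge outcome can be checked with a small model in plain Python. This sketch assumes the table contains the IDs 5, 7, and 9 before the merge, as in the delete result above. It mirrors the matched and not-matched branches only, and is not how Delta Lake executes a merge internally.

```python
old_ids = [5, 7, 9]           # rows left in the table after the delete step
new_ids = list(range(0, 20))  # equivalent of spark.range(0, 20)

old_set = set(old_ids)
new_set = set(new_ids)

# whenMatched: an existing row whose id appears in the new data is set to -1
merged = [-1 if i in new_set else i for i in old_ids]
# whenNotMatched: a new row with no matching existing id is inserted
merged += [i for i in new_ids if i not in old_set]

print(len(merged))       # 20
print(merged.count(-1))  # 3
```

The 20 rows are the three matched rows (now -1) plus the 17 new IDs that were not already in the table, which agrees with the output shown above.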

History

Delta Lake lets you look into the history of a table, that is, the changes that were made to the underlying Delta table. The cell below shows how simple it is to inspect the history.

delta_table.history().show(20, 1000, False)
deltaTable.History().Show(20, 1000, false);
deltaTable.history.show(false)

Results in:

version timestamp userId userName operation operationParameters job notebook clusterId readVersion isolationLevel isBlindAppend
4 2020-04-25 00:36:27 null null MERGE [predicate -> (oldData.ID = newData.ID)] null null null 3 null false
3 2020-04-25 00:36:08 null null DELETE [predicate -> ["((ID % CAST(2 AS BIGINT)) = CAST(0 AS BIGINT))"]] null null null 2 null false
2 2020-04-25 00:35:51 null null UPDATE [predicate -> ((ID#744L % cast(2 as bigint)) = cast(0 as bigint))] null null null 1 null false
1 2020-04-25 00:35:05 null null WRITE [mode -> Overwrite, partitionBy -> []] null null null 0 null false
0 2020-04-25 00:34:34 null null WRITE [mode -> ErrorIfExists, partitionBy -> []] null null null null null true

Here you can see all of the modifications made by the preceding code snippets.

Read older versions of data using Time Travel

It's possible to query previous snapshots of your Delta Lake table by using a feature called Time Travel. If you want to access the data that you overwrote, you can query a snapshot of the table before you overwrote the first set of data using the versionAsOf option.

Once you run the cell below, you should see the first set of data from before you overwrote it. Time Travel uses the Delta Lake transaction log to access data that is no longer in the current version of the table. Removing the versionAsOf option (or specifying version 1) would let you see the newer data again. For more information, see Query an older snapshot of a table.

df = spark.read.format("delta").option("versionAsOf", 0).load(delta_table_path)
df.show()
var df = spark.Read().Format("delta").Option("versionAsOf", 0).Load(deltaTablePath);
df.Show();
val df = spark.read.format("delta").option("versionAsOf", 0).load(deltaTablePath)
df.show()

Results in:

ID
0
1
4
3
2

Here you can see you have gone back to the earliest version of the data.
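The idea behind Time Travel can be sketched as replaying a log of commits up to a chosen version. The model below is a deliberate simplification: the `Commit` class, the `snapshot` function, and the file names are made up for illustration, and the real transaction log records more information per commit.

```python
from dataclasses import dataclass, field


@dataclass
class Commit:
    """One entry of a simplified transaction log (hypothetical model)."""
    operation: str
    add: set = field(default_factory=set)     # data files added by this commit
    remove: set = field(default_factory=set)  # data files logically removed


def snapshot(log, version_as_of=None):
    """Replay commits 0..version_as_of and return the set of live files."""
    last = len(log) - 1 if version_as_of is None else version_as_of
    if not 0 <= last < len(log):
        raise ValueError(f"version {version_as_of} is not in the log")
    files = set()
    for commit in log[: last + 1]:
        files -= commit.remove
        files |= commit.add
    return files


log = [
    # version 0: initial write
    Commit("WRITE", add={"part-a.parquet"}),
    # version 1: overwrite replaces the old file with a new one
    Commit("WRITE", add={"part-b.parquet"}, remove={"part-a.parquet"}),
]

print(snapshot(log, version_as_of=0))  # {'part-a.parquet'}
print(snapshot(log))                   # {'part-b.parquet'}
```

In this model an overwrite does not erase the old file. It only marks it as removed in a later commit, which is why an earlier version can still be reconstructed.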

Write a stream of data to a table

You can also write to a Delta Lake table using Spark's Structured Streaming. The Delta Lake transaction log guarantees exactly-once processing, even when there are other streams or batch queries running concurrently against the table. By default, streams run in append mode, which adds new records to the table.

For more information about Delta Lake integration with Structured Streaming, see Table Streaming Reads and Writes.

The cells below do the following:

  • Show the newly appended data
  • Inspect history
  • Stop the structured streaming job
  • Inspect history again; you'll notice that appends have stopped

First, set up a simple Structured Streaming job that generates a sequence and writes it to your Delta table.

streaming_df = spark.readStream.format("rate").load()
stream = streaming_df\
    .selectExpr("value as id")\
    .writeStream\
    .format("delta")\
    .option("checkpointLocation", "/tmp/checkpoint-{0}".format(session_id))\
    .start(delta_table_path)
var streamingDf = spark.ReadStream().Format("rate").Load();
var stream = streamingDf.SelectExpr("value as id").WriteStream().Format("delta").Option("checkpointLocation", $"/tmp/checkpoint-{sessionId}").Start(deltaTablePath);
val streamingDf = spark.readStream.format("rate").load()
val stream = streamingDf.select($"value" as "id").writeStream.format("delta").option("checkpointLocation", s"/tmp/checkpoint-$sessionId").start(deltaTablePath)

Read a stream of changes from a table

While the stream is writing to the Delta Lake table, you can also read from that table as a streaming source. For example, you can start another streaming query that prints all the changes made to the Delta Lake table.
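A streaming read of the table might look like the following sketch. It assumes an active `spark` session and the `delta_table_path` used throughout this article. The helper name `start_change_stream` is hypothetical, and where the console output appears depends on your environment.

```python
def start_change_stream(spark, path):
    """Start a streaming query that prints rows appended to the Delta table.

    `start_change_stream` is a hypothetical helper, not a Delta Lake API.
    """
    changes = spark.readStream.format("delta").load(path)
    # The console sink prints each micro-batch to standard output.
    return changes.writeStream.format("console").start()


# Usage in a notebook where `spark` and `delta_table_path` are defined:
# query = start_change_stream(spark, delta_table_path)
# ...
# query.stop()
```

The returned query keeps running until it is stopped, so remember to call `stop()` on it once you are done.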

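To check what is currently in the table, the following cell reads it as a batch query and sorts the rows by ID in descending order.

delta_table.toDF().sort(col("id").desc()).show(100)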
deltaTable.ToDF().Sort(Col("id").Desc()).Show(100);
deltaTable.toDF.sort($"id".desc).show

Results in:

ID
19
18
17
16
15
14
13
12
11
10
8
6
4
3
2
1
0
-1
-1
-1
delta_table.history().drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").show(20, 1000, False)
deltaTable.History().Drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").Show(20, 1000, false);
deltaTable.history.show

Results in:

version timestamp operation operationParameters readVersion
5 2020-04-25 00:37:09 STREAMING UPDATE [outputMode -> Append, queryId -> d26b4f8a-7e5a-44f2-a5fb-23a7bd02aef7, epochId -> 0] 4
4 2020-04-25 00:36:27 MERGE [predicate -> (oldData.id = newData.id)] 3
3 2020-04-25 00:36:08 DELETE [predicate -> ["((id % CAST(2 AS BIGINT)) = CAST(0 AS BIGINT))"]] 2
2 2020-04-25 00:35:51 UPDATE [predicate -> ((id#744L % cast(2 as bigint)) = cast(0 as bigint))] 1
1 2020-04-25 00:35:05 WRITE [mode -> Overwrite, partitionBy -> []] 0
0 2020-04-25 00:34:34 WRITE [mode -> ErrorIfExists, partitionBy -> []] null

Here you are dropping some of the less interesting columns to make the history easier to read.

stream.stop()
delta_table.history().drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").show(100, 1000, False)
stream.Stop();
deltaTable.History().Drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").Show(100, 1000, false);
stream.stop
deltaTable.history.show

Results in:

version timestamp operation operationParameters readVersion
5 2020-04-25 00:37:09 STREAMING UPDATE [outputMode -> Append, queryId -> d26b4f8a-7e5a-44f2-a5fb-23a7bd02aef7, epochId -> 0] 4
4 2020-04-25 00:36:27 MERGE [predicate -> (oldData.id = newData.id)] 3
3 2020-04-25 00:36:08 DELETE [predicate -> ["((id % CAST(2 AS BIGINT)) = CAST(0 AS BIGINT))"]] 2
2 2020-04-25 00:35:51 UPDATE [predicate -> ((id#744L % cast(2 as bigint)) = cast(0 as bigint))] 1
1 2020-04-25 00:35:05 WRITE [mode -> Overwrite, partitionBy -> []] 0
0 2020-04-25 00:34:34 WRITE [mode -> ErrorIfExists, partitionBy -> []] null

Convert Parquet to Delta

You can do an in-place conversion from the Parquet format to Delta.

Here you are going to test whether the existing table is in delta format.

parquet_path = "/parquet/parquet-table-{0}".format(session_id)
data = spark.range(0,5)
data.write.parquet(parquet_path)
DeltaTable.isDeltaTable(spark, parquet_path)
var parquetPath = $"/parquet/parquet-table-{sessionId}";
var data = spark.Range(0,5);
data.Write().Parquet(parquetPath);
DeltaTable.IsDeltaTable(parquetPath)
val parquetPath = s"/parquet/parquet-table-$sessionId"
val data = spark.range(0,5)
data.write.parquet(parquetPath)
DeltaTable.isDeltaTable(parquetPath)

Results in:

False

Now you are going to convert the data to delta format and verify it worked.

DeltaTable.convertToDelta(spark, "parquet.`{0}`".format(parquet_path))
DeltaTable.isDeltaTable(spark, parquet_path)
DeltaTable.ConvertToDelta(spark, $"parquet.`{parquetPath}`");
DeltaTable.IsDeltaTable(parquetPath)
DeltaTable.convertToDelta(spark, s"parquet.`$parquetPath`")
DeltaTable.isDeltaTable(parquetPath)

Results in:

True

SQL support

Delta supports table utility commands through SQL. You can use SQL to:

  • Get a DeltaTable's history
  • Vacuum a DeltaTable
  • Convert a Parquet file to Delta
spark.sql("DESCRIBE HISTORY delta.`{0}`".format(delta_table_path)).show()
spark.Sql($"DESCRIBE HISTORY delta.`{deltaTablePath}`").Show();
spark.sql(s"DESCRIBE HISTORY delta.`$deltaTablePath`").show()

Results in:

version timestamp userId userName operation operationParameters job notebook clusterId readVersion isolationLevel isBlindAppend
5 2020-04-25 00:37:09 null null STREAMING UPDATE [outputMode -> Ap... null null null 4 null true
4 2020-04-25 00:36:27 null null MERGE [predicate -> (ol... null null null 3 null false
3 2020-04-25 00:36:08 null null DELETE [predicate -> ["(... null null null 2 null false
2 2020-04-25 00:35:51 null null UPDATE [predicate -> ((i... null null null 1 null false
1 2020-04-25 00:35:05 null null WRITE [mode -> Overwrit... null null null 0 null false
0 2020-04-25 00:34:34 null null WRITE [mode -> ErrorIfE... null null null null null true
spark.sql("VACUUM delta.`{0}`".format(delta_table_path)).show()
spark.Sql($"VACUUM delta.`{deltaTablePath}`").Show();
spark.sql(s"VACUUM delta.`$deltaTablePath`").show()

Results in:

path
abfss://data@arca...
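VACUUM removes data files that the table no longer references and that are older than the retention threshold, which is 7 days by default. Time Travel to versions that depend on removed files may therefore stop working. The statement also accepts optional RETAIN and DRY RUN clauses. The sketch below only builds the SQL text; `vacuum_sql` is a hypothetical helper, and the resulting string would still need to be passed to `spark.sql`.

```python
def vacuum_sql(path, retain_hours=None, dry_run=False):
    """Build a VACUUM statement for the Delta table stored at `path`.

    `vacuum_sql` is a hypothetical helper, not a Delta Lake API.
    """
    parts = [f"VACUUM delta.`{path}`"]
    if retain_hours is not None:
        # Keep unreferenced files younger than this many hours.
        parts.append(f"RETAIN {retain_hours} HOURS")
    if dry_run:
        # List the files that would be deleted without deleting them.
        parts.append("DRY RUN")
    return " ".join(parts)


print(vacuum_sql("/delta/delta-table-335323", retain_hours=168, dry_run=True))
# VACUUM delta.`/delta/delta-table-335323` RETAIN 168 HOURS DRY RUN
```

Running a dry run first is a cautious way to see which files a real VACUUM would delete.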

Now you are going to verify that a table is not a delta format table, then convert it to delta format using Spark SQL and confirm it was converted correctly.

parquet_id = random.randint(0,1000)
parquet_path = "/parquet/parquet-table-{0}-{1}".format(session_id, parquet_id)
data = spark.range(0,5)
data.write.parquet(parquet_path)
DeltaTable.isDeltaTable(spark, parquet_path)
spark.sql("CONVERT TO DELTA parquet.`{0}`".format(parquet_path))
DeltaTable.isDeltaTable(spark, parquet_path)
var parquetId =  (new Random()).Next(10000000);
var parquetPath = $"/parquet/parquet-table-{sessionId}-{parquetId}";
var data = spark.Range(0,5);
data.Write().Parquet(parquetPath);
DeltaTable.IsDeltaTable(parquetPath);
spark.Sql($"CONVERT TO DELTA parquet.`{parquetPath}`");
DeltaTable.IsDeltaTable(parquetPath);
val parquetId = scala.util.Random.nextInt(1000)
val parquetPath = s"/parquet/parquet-table-$sessionId-$parquetId"
val data = spark.range(0,5)
data.write.parquet(parquetPath)
DeltaTable.isDeltaTable(parquetPath)
spark.sql(s"CONVERT TO DELTA parquet.`$parquetPath`")
DeltaTable.isDeltaTable(parquetPath)

Results in:

True

For full documentation, see the Delta Lake Documentation Page.

For more information, see Delta Lake Project.

Next steps