チュートリアル: Delta Lake
このチュートリアルでは、Azure Databricks に対する Delta Lake の次のような一般的な操作について説明します。
- テーブルを作成する。
- テーブルへのアップサート。
- テーブルからの読み取り。
- テーブル履歴の表示。
- 以前のバージョンのテーブルに対してクエリを実行する。
- テーブルの最適化。
- Z オーダー インデックスを追加する。
- 参照されていないファイルをバキュームする。
Azure Databricks クラスターにアタッチされているノートブック内から、Python、R、Scala、または SQL のサンプル コードを実行できます。 また、Databricks SQL の SQL ウェアハウスに関連付けられているクエリ内から SQL コードを実行することもできます。
注意
次のコード例の一部では、スキーマ (データベースとも呼ばれます) とテーブルまたはビュー (たとえば default.people10m
) で構成される 2 レベルの名前空間表記を使用しています。 これらの例を Unity Catalog で使用するには、2 レベルの名前空間を、カタログ、スキーマ、およびテーブルまたはビュー (たとえば main.default.people10m
) で構成される Unity Catalog の 3 レベルの名前空間表記に置き換えます。
テーブルの作成
Azure Databricks で作成されたすべてのテーブルでは、既定で Delta Lake が使用されます。
Note
Delta Lake は、Azure Databricks のすべての読み取り、書き込み、テーブル作成コマンドのデフォルトです。
Python
# Load the data from its source.
df = spark.read.load("/databricks-datasets/learning-spark-v2/people/people-10m.delta")
# Write the data to a table.
table_name = "people_10m"
df.write.saveAsTable(table_name)
R
library(SparkR)
sparkR.session()
# Load the data from its source.
df = read.df(path = "/databricks-datasets/learning-spark-v2/people/people-10m.delta")
# Write the data to a table.
table_name = "people_10m"
saveAsTable(
df = df,
tableName = table_name
)
Scala
// Load the data from its source.
val people = spark.read.load("/databricks-datasets/learning-spark-v2/people/people-10m.delta")
// Write the data to a table.
val table_name = "people_10m"
people.write.saveAsTable("people_10m")
SQL
DROP TABLE IF EXISTS people_10m;
CREATE TABLE IF NOT EXISTS people_10m
AS SELECT * FROM delta.`/databricks-datasets/learning-spark-v2/people/people-10m.delta`;
前の操作では、データから推論されたスキーマを使用して、新しいマネージ テーブルを作成します。 Delta テーブルを作成するときに使用できるオプションの詳細については、「テーブルの作成」を参照してください。
マネージド テーブルの場合、Azure Databricks はデータの場所を判断します。 場所を取得するには、DESCRIBE DETAIL ステートメントを使用できます。次に例を示します。
Python
display(spark.sql('DESCRIBE DETAIL people_10m'))
R
display(sql("DESCRIBE DETAIL people_10m"))
Scala
display(spark.sql("DESCRIBE DETAIL people_10m"))
SQL
DESCRIBE DETAIL people_10m;
データを挿入する前にスキーマを指定してテーブルを作成する必要がある場合があります。 これを実行するには、次の SQL コマンドを使用します。
CREATE TABLE IF NOT EXISTS people10m (
id INT,
firstName STRING,
middleName STRING,
lastName STRING,
gender STRING,
birthDate TIMESTAMP,
ssn STRING,
salary INT
)
CREATE OR REPLACE TABLE people10m (
id INT,
firstName STRING,
middleName STRING,
lastName STRING,
gender STRING,
birthDate TIMESTAMP,
ssn STRING,
salary INT
)
Databricks Runtime 13.3 LTS 以降では、CREATE TABLE LIKE
を使用して、ソース Delta テーブルのスキーマとテーブルのプロパティを複製する新しい空の Delta テーブルを作成できます。 これは、次のコード例のように、開発環境から運用環境にテーブルを昇格する場合に特に役立ちます。
CREATE TABLE prod.people10m LIKE dev.people10m
Delta Lake の DeltaTableBuilder
API を使用してテーブルを作成することもできます。 DataFrameWriter API と比較して、この API を使用すると、列のコメント、テーブルのプロパティ、 生成された列のような追加情報の指定が簡単になります。
重要
この機能はパブリック プレビュー段階にあります。
Python
# Create table in the metastore
DeltaTable.createIfNotExists(spark) \
.tableName("default.people10m") \
.addColumn("id", "INT") \
.addColumn("firstName", "STRING") \
.addColumn("middleName", "STRING") \
.addColumn("lastName", "STRING", comment = "surname") \
.addColumn("gender", "STRING") \
.addColumn("birthDate", "TIMESTAMP") \
.addColumn("ssn", "STRING") \
.addColumn("salary", "INT") \
.execute()
# Create or replace table with path and add properties
DeltaTable.createOrReplace(spark) \
.addColumn("id", "INT") \
.addColumn("firstName", "STRING") \
.addColumn("middleName", "STRING") \
.addColumn("lastName", "STRING", comment = "surname") \
.addColumn("gender", "STRING") \
.addColumn("birthDate", "TIMESTAMP") \
.addColumn("ssn", "STRING") \
.addColumn("salary", "INT") \
.property("description", "table with people data") \
.location("/tmp/delta/people10m") \
.execute()
Scala
// Create table in the metastore
DeltaTable.createOrReplace(spark)
.tableName("default.people10m")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("middleName", "STRING")
.addColumn(
DeltaTable.columnBuilder("lastName")
.dataType("STRING")
.comment("surname")
.build())
.addColumn("lastName", "STRING", comment = "surname")
.addColumn("gender", "STRING")
.addColumn("birthDate", "TIMESTAMP")
.addColumn("ssn", "STRING")
.addColumn("salary", "INT")
.execute()
// Create or replace table with path and add properties
DeltaTable.createOrReplace(spark)
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("middleName", "STRING")
.addColumn(
DeltaTable.columnBuilder("lastName")
.dataType("STRING")
.comment("surname")
.build())
.addColumn("lastName", "STRING", comment = "surname")
.addColumn("gender", "STRING")
.addColumn("birthDate", "TIMESTAMP")
.addColumn("ssn", "STRING")
.addColumn("salary", "INT")
.property("description", "table with people data")
.location("/tmp/delta/people10m")
.execute()
テーブルへのアップサート
一連の更新と挿入を既存の Delta テーブルにマージするには、MERGE INTO ステートメントを使用します。 たとえば、次のステートメントは、ソース テーブルからデータを取得し、ターゲット Delta テーブルにマージします。 両方のテーブルに一致する行がある場合、Delta Lake は指定された式を使用してデータ列を更新します。 一致する行がない場合、Delta Lake によって新しい行が追加されます。 この操作はアップサートと呼ばれます。
CREATE OR REPLACE TEMP VIEW people_updates (
id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
(9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
(9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
(10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
(20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
(20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
(20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900);
MERGE INTO people_10m
USING people_updates
ON people_10m.id = people_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
*
を指定すると、ターゲット テーブル内のすべての列が更新または挿入されます。 このアクションでは、ソース テーブルの列がターゲット テーブルと同じであることを前提とします。それ以外の場合、クエリでは分析エラーがスローされます。
INSERT
操作を実行する際には、テーブル内のすべての列に値を指定する必要があります (たとえば、既存のデータセットに一致する行がない場合など)。 ただし、すべての値を更新する必要はありません。
結果を表示するには、テーブルに対してクエリを実行します。
SELECT * FROM people_10m WHERE id >= 9999998
テーブルの読み取り
次の例に示すように、テーブル名またはテーブル パスによって Delta テーブルのデータにアクセスします。
Python
people_df = spark.read.table(table_name)
display(people_df)
## or
people_df = spark.read.load(table_path)
display(people_df)
R
people_df = tableToDF(table_name)
display(people_df)
Scala
val people_df = spark.read.table(table_name)
display(people_df)
\\ or
val people_df = spark.read.load(table_path)
display(people_df)
SQL
SELECT * FROM people_10m;
SELECT * FROM delta.`<path-to-table`;
テーブルへの書き込み
Delta Lake では、テーブルにデータを書き込むために標準構文が使用されます。
既存の Delta テーブルに新しいデータをアトミックに追加するには、次の例のように append
モードを使用します。
SQL
INSERT INTO people10m SELECT * FROM more_people
Python
df.write.mode("append").saveAsTable("people10m")
Scala
df.write.mode("append").saveAsTable("people10m")
テーブル内のすべてのデータをアトミックに置き換えるには、次の例のように overwrite
モードを使用 します。
SQL
INSERT OVERWRITE TABLE people10m SELECT * FROM more_people
Python
df.write.mode("overwrite").saveAsTable("people10m")
Scala
df.write.mode("overwrite").saveAsTable("people10m")
テーブルの更新
Delta テーブルの述語に一致するデータを更新できます。 たとえば、people10m
という名前のテーブルか /tmp/delta/people-10m
のパスで、gender
列の省略形を M
または F
から、Male
または Female
に変更するには、次を実行できます。
SQL
UPDATE people10m SET gender = 'Female' WHERE gender = 'F';
UPDATE people10m SET gender = 'Male' WHERE gender = 'M';
UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Female' WHERE gender = 'F';
UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Male' WHERE gender = 'M';
Python
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')
# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
condition = "gender = 'F'",
set = { "gender": "'Female'" }
)
# Declare the predicate by using Spark SQL functions.
deltaTable.update(
condition = col('gender') == 'M',
set = { 'gender': lit('Male') }
)
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")
// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
"gender = 'F'",
Map("gender" -> "'Female'")
import org.apache.spark.sql.functions._
import spark.implicits._
// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
col("gender") === "M",
Map("gender" -> lit("Male")));
テーブルからの削除
Delta テーブルから述語に一致するデータを削除できます。 たとえば、people10m
という名前のテーブルまたは /tmp/delta/people-10m
のパスで、birthDate
列の値を持つユーザーに対応する行すべてを 1955
の前から削除するには、次のコマンドを実行します。
SQL
DELETE FROM people10m WHERE birthDate < '1955-01-01'
DELETE FROM delta.`/tmp/delta/people-10m` WHERE birthDate < '1955-01-01'
Python
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')
# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")
# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")
// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")
import org.apache.spark.sql.functions._
import spark.implicits._
// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")
重要
delete
では、最新バージョンの Delta テーブルからデータが削除されますが、前のバージョンが明示的にバキュームされるまで、物理ストレージからデータは削除されません。 詳細についてはバキュームに関するページを参照してください。
テーブル履歴の表示
テーブルの履歴を表示するには、DESCRIBE HISTORY ステートメントを使用します。このステートメントは、テーブルへの書き込みごとに、テーブルのバージョン、操作、ユーザーなどの実証情報を提供します。
DESCRIBE HISTORY people_10m
以前のバージョンのテーブルに対してクエリを実行する (タイム トラベル)
Delta Lake タイム トラベル機能を使用すると、Delta テーブルのスナップショットを過去にさかのぼって照会することができます。
以前のバージョンのテーブルに対してクエリを実行するには、ステートメントで SELECT
バージョンまたはタイムスタンプを指定します。 たとえば、上記の履歴からバージョン 0 に対してクエリを実行するには、以下を使用します。
SELECT * FROM people_10m VERSION AS OF 0
or
SELECT * FROM people_10m TIMESTAMP AS OF '2019-01-29 00:37:58'
タイムスタンプの場合、"2019-01-01"
や "2019-01-01'T'00:00:00.000Z"
など、日付またはタイムスタンプ文字列のみを使用できます。
DataFrameReader オプションを使用すると、特定のバージョンのテーブル (Python など) に固定されている Delta テーブルから DataFrame を作成できます。
df1 = spark.read.format('delta').option('timestampAsOf', '2019-01-01').table("people_10m")
display(df1)
または、次のようにします。
df2 = spark.read.format('delta').option('versionAsOf', 0).table("people_10m")
display(df2)
詳細については、「Delta Lake テーブル履歴の処理」を参照してください。
テーブルの最適化
テーブルに対して複数の変更を実行すると、多数の小さなファイルができる可能性があります。 読み取りクエリの速度を向上させるために、OPTIMIZE
を使用して小さなファイルをより大きなファイルに折りたたむことができます。
OPTIMIZE people_10m
列による Z オーダー
読み取りパフォーマンスをさらに向上させるために、同じファイル セット内の関連情報を Z オーダー別に併置することができます。 読み取る必要があるデータの量を大幅に削減するため、Delta Lake のデータのスキップ アルゴリズムでこのコロケーションが自動的に使用されます。 データを Z オーダーで並べ替えるには、ZORDER BY
句の順に列を指定します。 たとえば、gender
で併置するには、次のように実行します。
OPTIMIZE people_10m
ZORDER BY (gender)
OPTIMIZE
を実行するときに使用できるオプションの完全なセットについては、「Delta Lake で最適化を使用してデータ ファイルを圧縮する」を参照してください。
VACUUM
を使用してスナップショットをクリーンアップする
デルタレイクでは読み取りのスナップショット分離が提供されるため、他のユーザーまたはジョブがテーブルに対してクエリを実行している間でも安全に OPTIMIZE
を実行できます。 ただし、最終的には、以前のスナップショットをクリーンアップする必要があります。 これを行うには、VACUUM
コマンドを実行します。
VACUUM people_10m
VACUUM
を効果的に使用する方法の詳細については、「VACUUM を使用して未使用のデータ ファイルを削除する」を参照してください。