Руководство по Delta Lake
В этом руководстве представлены общие операции Delta Lake в Azure Databricks, в том числе следующие:
- Создать таблицу.
- Upsert к таблице.
- Чтение из таблицы.
- Показать журнал таблицы.
- Запрос более ранней версии таблицы.
- Оптимизация таблицы.
- Добавление индекса Z-порядка.
- Очистка файлов без ссылок.
Примеры кода Python, R, Scala и SQL в этой статье можно запустить из записной книжки, подключенной к кластеру Azure Databricks. Вы также можете выполнить приведенный в этой статье код SQL из запроса, связанного с хранилищем SQL в Databricks SQL.
Примечание.
В некоторых из следующих примеров кода используется двухуровневая нотация пространства имен, состоящая из схемы (также называемой базой данных) и таблицы или представления (например, default.people10m
). Чтобы использовать эти примеры с каталогом Unity, замените двухуровневое пространство имен трехуровневой нотацией пространства имен каталога Unity, состоящей из каталога, схемы и таблицы или представления (например, main.default.people10m
).
Создание таблицы
Таблицы, созданные в Azure Databricks, по умолчанию используют Delta Lake.
Примечание.
Delta Lake — это значение по умолчанию для всех команд создания таблиц и операций чтения, записи и таблицы Azure Databricks.
Python
# Load the data from its source.
df = spark.read.load("/databricks-datasets/learning-spark-v2/people/people-10m.delta")
# Write the data to a table.
table_name = "people_10m"
df.write.saveAsTable(table_name)
R
library(SparkR)
sparkR.session()
# Load the data from its source.
df = read.df(path = "/databricks-datasets/learning-spark-v2/people/people-10m.delta")
# Write the data to a table.
table_name = "people_10m"
saveAsTable(
df = df,
tableName = table_name
)
Scala
// Load the data from its source.
val people = spark.read.load("/databricks-datasets/learning-spark-v2/people/people-10m.delta")
// Write the data to a table.
val table_name = "people_10m"
people.write.saveAsTable("people_10m")
SQL
DROP TABLE IF EXISTS people_10m;
CREATE TABLE IF NOT EXISTS people_10m
AS SELECT * FROM delta.`/databricks-datasets/learning-spark-v2/people/people-10m.delta`;
Предыдущие операции создают новую управляемую таблицу с использованием схемы, выведенной из данных. Сведения о доступных параметрах при создании разностной таблицы см. в статье CREATE TABLE.
Для управляемых таблиц Azure Databricks определяет расположение данных. Чтобы получить расположение, можно использовать инструкцию DESCRIBE DETAIL, например:
Python
display(spark.sql('DESCRIBE DETAIL people_10m'))
R
display(sql("DESCRIBE DETAIL people_10m"))
Scala
display(spark.sql("DESCRIBE DETAIL people_10m"))
SQL
DESCRIBE DETAIL people_10m;
Иногда может потребоваться создать таблицу, указав схему перед вставкой данных. Вы можете выполнить это с помощью следующих команд SQL:
CREATE TABLE IF NOT EXISTS people10m (
id INT,
firstName STRING,
middleName STRING,
lastName STRING,
gender STRING,
birthDate TIMESTAMP,
ssn STRING,
salary INT
)
CREATE OR REPLACE TABLE people10m (
id INT,
firstName STRING,
middleName STRING,
lastName STRING,
gender STRING,
birthDate TIMESTAMP,
ssn STRING,
salary INT
)
В Databricks Runtime 13.3 LTS и более поздних версиях можно создать CREATE TABLE LIKE
пустую таблицу Delta, которая дублирует свойства схемы и таблицы для исходной таблицы Delta. Это может быть особенно полезно при продвижении таблиц из среды разработки в рабочую среду, например в следующем примере кода:
CREATE TABLE prod.people10m LIKE dev.people10m
Вы также можете использовать DeltaTableBuilder
API в Delta Lake для создания таблиц. В отличие от API-интерфейсов DataFrameWriter этот API упрощает указание дополнительных сведений, таких как комментарии к столбцам, свойства таблиц и созданные столбцы.
Внимание
Эта функция предоставляется в режиме общедоступной предварительной версии.
Python
# Create table in the metastore
DeltaTable.createIfNotExists(spark) \
.tableName("default.people10m") \
.addColumn("id", "INT") \
.addColumn("firstName", "STRING") \
.addColumn("middleName", "STRING") \
.addColumn("lastName", "STRING", comment = "surname") \
.addColumn("gender", "STRING") \
.addColumn("birthDate", "TIMESTAMP") \
.addColumn("ssn", "STRING") \
.addColumn("salary", "INT") \
.execute()
# Create or replace table with path and add properties
DeltaTable.createOrReplace(spark) \
.addColumn("id", "INT") \
.addColumn("firstName", "STRING") \
.addColumn("middleName", "STRING") \
.addColumn("lastName", "STRING", comment = "surname") \
.addColumn("gender", "STRING") \
.addColumn("birthDate", "TIMESTAMP") \
.addColumn("ssn", "STRING") \
.addColumn("salary", "INT") \
.property("description", "table with people data") \
.location("/tmp/delta/people10m") \
.execute()
Scala
// Create table in the metastore
DeltaTable.createOrReplace(spark)
.tableName("default.people10m")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("middleName", "STRING")
.addColumn(
DeltaTable.columnBuilder("lastName")
.dataType("STRING")
.comment("surname")
.build())
.addColumn("lastName", "STRING", comment = "surname")
.addColumn("gender", "STRING")
.addColumn("birthDate", "TIMESTAMP")
.addColumn("ssn", "STRING")
.addColumn("salary", "INT")
.execute()
// Create or replace table with path and add properties
DeltaTable.createOrReplace(spark)
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("middleName", "STRING")
.addColumn(
DeltaTable.columnBuilder("lastName")
.dataType("STRING")
.comment("surname")
.build())
.addColumn("lastName", "STRING", comment = "surname")
.addColumn("gender", "STRING")
.addColumn("birthDate", "TIMESTAMP")
.addColumn("ssn", "STRING")
.addColumn("salary", "INT")
.property("description", "table with people data")
.location("/tmp/delta/people10m")
.execute()
Upsert в таблицу
Чтобы объединить набор операций обновления и вставки в существующей разностной таблице, используйте инструкцию MERGE INTO. Например, следующая инструкция принимает данные из исходной таблицы и объединяет их с целевой разностной таблицей. Если в обеих таблицах есть совпадающая строка, Delta Lake обновляет столбец данных с использованием заданного выражения. Если совпадающая строка отсутствует, Delta Lake добавляет новую строку. Эта операция называется upsert.
CREATE OR REPLACE TEMP VIEW people_updates (
id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
(9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
(9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
(10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
(20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
(20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
(20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900);
MERGE INTO people_10m
USING people_updates
ON people_10m.id = people_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
Если указать *
, будут обновлены или вставлены все столбцы в целевой таблице. При этом предполагается, что в исходной таблице есть те же столбцы, что и в целевой, иначе запрос выдаст ошибку анализа.
При выполнении операции INSERT
необходимо указать значение для каждого столбца в таблице (например, если в существующем наборе данных нет соответствующей строки). Однако обновлять все значения не требуется.
Чтобы просмотреть результаты, отправьте к таблице запрос.
SELECT * FROM people_10m WHERE id >= 9999998
Чтение из таблицы
Доступ к данным в разностных таблицах можно получить по имени таблицы или пути к таблице, как показано в следующих примерах:
Python
people_df = spark.read.table(table_name)
display(people_df)
## or
people_df = spark.read.load(table_path)
display(people_df)
R
people_df = tableToDF(table_name)
display(people_df)
Scala
val people_df = spark.read.table(table_name)
display(people_df)
\\ or
val people_df = spark.read.load(table_path)
display(people_df)
SQL
SELECT * FROM people_10m;
SELECT * FROM delta.`<path-to-table`;
Запись в таблицу
Delta Lake использует стандартный синтаксис для записи данных в таблицы.
Чтобы атомарно добавить новые данные в существующую таблицу Delta, используйте append
режим, как в следующих примерах:
SQL
INSERT INTO people10m SELECT * FROM more_people
Python
df.write.mode("append").saveAsTable("people10m")
Scala
df.write.mode("append").saveAsTable("people10m")
Чтобы атомарно заменить все данные в таблице, используйте overwrite
режим, как показано в следующих примерах:
SQL
INSERT OVERWRITE TABLE people10m SELECT * FROM more_people
Python
df.write.mode("overwrite").saveAsTable("people10m")
Scala
df.write.mode("overwrite").saveAsTable("people10m")
Обновление таблицы
Вы можете обновить данные, соответствующие предикату в таблице Delta. Например, в таблице с именем people10m
или путем /tmp/delta/people-10m
для изменения аббревиатуры в столбце gender
с M
или F
на Male
или Female
можно выполнить следующее:
SQL
UPDATE people10m SET gender = 'Female' WHERE gender = 'F';
UPDATE people10m SET gender = 'Male' WHERE gender = 'M';
UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Female' WHERE gender = 'F';
UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Male' WHERE gender = 'M';
Python
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')
# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
condition = "gender = 'F'",
set = { "gender": "'Female'" }
)
# Declare the predicate by using Spark SQL functions.
deltaTable.update(
condition = col('gender') == 'M',
set = { 'gender': lit('Male') }
)
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")
// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
"gender = 'F'",
Map("gender" -> "'Female'")
import org.apache.spark.sql.functions._
import spark.implicits._
// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
col("gender") === "M",
Map("gender" -> lit("Male")));
Удаление из таблицы
Вы можете удалить данные, соответствующие предикату из таблицы Delta. Например, в таблице с именем people10m
или путем /tmp/delta/people-10m
, чтобы удалить все строки, соответствующие людям со значением в столбце birthDate
, предшествующим 1955
:
SQL
DELETE FROM people10m WHERE birthDate < '1955-01-01'
DELETE FROM delta.`/tmp/delta/people-10m` WHERE birthDate < '1955-01-01'
Python
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')
# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")
# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")
// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")
import org.apache.spark.sql.functions._
import spark.implicits._
// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")
Внимание
delete
удаляет данные из последней версии таблицы Delta, но не удаляет их из физического хранилища до тех пор, пока старые версии не будут явным образом очищены. См. подробные сведения о команде vacuum.
Отображение журнала таблиц
Чтобы просмотреть журнал таблицы, используйте инструкцию DESCRIBE HISTORY, которая возвращает сведения о происхождении, включая версию таблицы, операцию, пользователя и т. д. для каждой операции записи в таблицу.
DESCRIBE HISTORY people_10m
Запрос более ранней версии таблицы (путешествия по времени)
Переход по временем в Delta Lake позволяет запрашивать более ранний моментальный снимок таблицы Delta.
Чтобы запросить более раннюю версию таблицы, укажите эту версию или метку времени в операторе SELECT
. Например, чтобы запросить версию 0 из журнала выше, используйте следующую инструкцию:
SELECT * FROM people_10m VERSION AS OF 0
or
SELECT * FROM people_10m TIMESTAMP AS OF '2019-01-29 00:37:58'
В качестве меток времени принимаются только строки даты или меток времени, например "2019-01-01"
и "2019-01-01'T'00:00:00.000Z"
.
Параметры DataFrameReader позволяют создать кадр данных из таблицы Delta, зафиксированной для определенной версии таблицы, например в Python:
df1 = spark.read.format('delta').option('timestampAsOf', '2019-01-01').table("people_10m")
display(df1)
или, кроме того, можно также:
df2 = spark.read.format('delta').option('versionAsOf', 0).table("people_10m")
display(df2)
Дополнительные сведения см. в разделе " Работа с журналом таблиц Delta Lake".
Оптимизация таблицы
После внесения в таблицу нескольких изменений может образоваться много небольших файлов. Чтобы повысить скорость обработки запросов чтения, можно с помощью OPTIMIZE
свернуть мелкие файлы в более крупные:
OPTIMIZE people_10m
Z-порядок по столбцам
Чтобы дополнительно повысить производительность операций чтения, можно размещать связанную информацию в одном наборе файлов с помощью Z-упорядочения. Такое совместное размещение автоматически применяется в алгоритмах пропуска данных в Delta Lake, чтобы значительно уменьшить объем данных, которые необходимо считывать. Для Z-упорядочения данных необходимо указать столбцы для упорядочения в предложении ZORDER BY
. Например, для совместного размещение по признаку gender
выполните следующую команду:
OPTIMIZE people_10m
ZORDER BY (gender)
Полный набор параметров, доступных при запуске OPTIMIZE
, см. в разделе "Компактные файлы данных" с оптимизацией в Delta Lake.
Очистка моментальных снимков с помощью VACUUM
Delta Lake обеспечивает изоляцию моментальных снимков для операций чтения, и это означает, что OPTIMIZE
можно безопасно выполнять даже тогда, когда другие пользователи или задания осуществляют запросы к таблице. Однако в конечном итоге старые моментальные снимки нужно удалять. Для этого выполните следующую команду VACUUM
:
VACUUM people_10m
Дополнительные сведения об использовании VACUUM
эффективно см. в разделе "Удаление неиспользуемых файлов данных с помощью вакуума".