Руководство по Delta Lake

В этом руководстве представлены общие операции Delta Lake в Azure Databricks, в том числе следующие:

Примеры кода Python, R, Scala и SQL в этой статье можно запустить из записной книжки, подключенной к кластеру Azure Databricks. Вы также можете выполнить приведенный в этой статье код SQL из запроса, связанного с хранилищем SQL в Databricks SQL.

Примечание.

В некоторых из следующих примеров кода используется двухуровневая нотация пространства имен, состоящая из схемы (также называемой базой данных) и таблицы или представления (например, default.people10m). Чтобы использовать эти примеры с каталогом Unity, замените двухуровневое пространство имен трехуровневой нотацией пространства имен каталога Unity, состоящей из каталога, схемы и таблицы или представления (например, main.default.people10m).

Создание таблицы

Таблицы, созданные в Azure Databricks, по умолчанию используют Delta Lake.

Примечание.

Delta Lake — это значение по умолчанию для всех команд создания таблиц и операций чтения, записи и таблицы Azure Databricks.

Python

# Load the data from its source.
df = spark.read.load("/databricks-datasets/learning-spark-v2/people/people-10m.delta")

# Write the data to a table.
table_name = "people_10m"
df.write.saveAsTable(table_name)

R

library(SparkR)
sparkR.session()

# Load the data from its source.
df = read.df(path = "/databricks-datasets/learning-spark-v2/people/people-10m.delta")

# Write the data to a table.
table_name = "people_10m"

saveAsTable(
  df = df,
  tableName = table_name
)

Scala

// Load the data from its source.
val people = spark.read.load("/databricks-datasets/learning-spark-v2/people/people-10m.delta")

// Write the data to a table.
val table_name = "people_10m"

people.write.saveAsTable("people_10m")

SQL

DROP TABLE IF EXISTS people_10m;

CREATE TABLE IF NOT EXISTS people_10m
AS SELECT * FROM delta.`/databricks-datasets/learning-spark-v2/people/people-10m.delta`;

Предыдущие операции создают новую управляемую таблицу с использованием схемы, выведенной из данных. Сведения о доступных параметрах при создании разностной таблицы см. в статье CREATE TABLE.

Для управляемых таблиц Azure Databricks определяет расположение данных. Чтобы получить расположение, можно использовать инструкцию DESCRIBE DETAIL, например:

Python

display(spark.sql('DESCRIBE DETAIL people_10m'))

R

display(sql("DESCRIBE DETAIL people_10m"))

Scala

display(spark.sql("DESCRIBE DETAIL people_10m"))

SQL

DESCRIBE DETAIL people_10m;

Иногда может потребоваться создать таблицу, указав схему перед вставкой данных. Вы можете выполнить это с помощью следующих команд SQL:

CREATE TABLE IF NOT EXISTS people10m (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  ssn STRING,
  salary INT
)

CREATE OR REPLACE TABLE people10m (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  ssn STRING,
  salary INT
)

В Databricks Runtime 13.3 LTS и более поздних версиях можно создать CREATE TABLE LIKE пустую таблицу Delta, которая дублирует свойства схемы и таблицы для исходной таблицы Delta. Это может быть особенно полезно при продвижении таблиц из среды разработки в рабочую среду, например в следующем примере кода:

CREATE TABLE prod.people10m LIKE dev.people10m

Вы также можете использовать DeltaTableBuilder API в Delta Lake для создания таблиц. В отличие от API-интерфейсов DataFrameWriter этот API упрощает указание дополнительных сведений, таких как комментарии к столбцам, свойства таблиц и созданные столбцы.

Внимание

Эта функция предоставляется в режиме общедоступной предварительной версии.

Python

# Create table in the metastore
DeltaTable.createIfNotExists(spark) \
  .tableName("default.people10m") \
  .addColumn("id", "INT") \
  .addColumn("firstName", "STRING") \
  .addColumn("middleName", "STRING") \
  .addColumn("lastName", "STRING", comment = "surname") \
  .addColumn("gender", "STRING") \
  .addColumn("birthDate", "TIMESTAMP") \
  .addColumn("ssn", "STRING") \
  .addColumn("salary", "INT") \
  .execute()

# Create or replace table with path and add properties
DeltaTable.createOrReplace(spark) \
  .addColumn("id", "INT") \
  .addColumn("firstName", "STRING") \
  .addColumn("middleName", "STRING") \
  .addColumn("lastName", "STRING", comment = "surname") \
  .addColumn("gender", "STRING") \
  .addColumn("birthDate", "TIMESTAMP") \
  .addColumn("ssn", "STRING") \
  .addColumn("salary", "INT") \
  .property("description", "table with people data") \
  .location("/tmp/delta/people10m") \
  .execute()

Scala

// Create table in the metastore
DeltaTable.createOrReplace(spark)
  .tableName("default.people10m")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build())
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .execute()

// Create or replace table with path and add properties
DeltaTable.createOrReplace(spark)
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build())
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .property("description", "table with people data")
  .location("/tmp/delta/people10m")
  .execute()

Upsert в таблицу

Чтобы объединить набор операций обновления и вставки в существующей разностной таблице, используйте инструкцию MERGE INTO. Например, следующая инструкция принимает данные из исходной таблицы и объединяет их с целевой разностной таблицей. Если в обеих таблицах есть совпадающая строка, Delta Lake обновляет столбец данных с использованием заданного выражения. Если совпадающая строка отсутствует, Delta Lake добавляет новую строку. Эта операция называется upsert.

CREATE OR REPLACE TEMP VIEW people_updates (
  id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
  (9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
  (9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
  (10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
  (20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
  (20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
  (20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900);

MERGE INTO people_10m
USING people_updates
ON people_10m.id = people_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;

Если указать *, будут обновлены или вставлены все столбцы в целевой таблице. При этом предполагается, что в исходной таблице есть те же столбцы, что и в целевой, иначе запрос выдаст ошибку анализа.

При выполнении операции INSERT необходимо указать значение для каждого столбца в таблице (например, если в существующем наборе данных нет соответствующей строки). Однако обновлять все значения не требуется.

Чтобы просмотреть результаты, отправьте к таблице запрос.

SELECT * FROM people_10m WHERE id >= 9999998

Чтение из таблицы

Доступ к данным в разностных таблицах можно получить по имени таблицы или пути к таблице, как показано в следующих примерах:

Python

people_df = spark.read.table(table_name)

display(people_df)

## or

people_df = spark.read.load(table_path)

display(people_df)

R

people_df = tableToDF(table_name)

display(people_df)

Scala

val people_df = spark.read.table(table_name)

display(people_df)

\\ or

val people_df = spark.read.load(table_path)

display(people_df)

SQL

SELECT * FROM people_10m;

SELECT * FROM delta.`<path-to-table`;

Запись в таблицу

Delta Lake использует стандартный синтаксис для записи данных в таблицы.

Чтобы атомарно добавить новые данные в существующую таблицу Delta, используйте append режим, как в следующих примерах:

SQL

INSERT INTO people10m SELECT * FROM more_people

Python

df.write.mode("append").saveAsTable("people10m")

Scala

df.write.mode("append").saveAsTable("people10m")

Чтобы атомарно заменить все данные в таблице, используйте overwrite режим, как показано в следующих примерах:

SQL

INSERT OVERWRITE TABLE people10m SELECT * FROM more_people

Python

df.write.mode("overwrite").saveAsTable("people10m")

Scala

df.write.mode("overwrite").saveAsTable("people10m")

Обновление таблицы

Вы можете обновить данные, соответствующие предикату в таблице Delta. Например, в таблице с именем people10m или путем /tmp/delta/people-10m для изменения аббревиатуры в столбце gender с M или F на Male или Female можно выполнить следующее:

SQL

UPDATE people10m SET gender = 'Female' WHERE gender = 'F';
UPDATE people10m SET gender = 'Male' WHERE gender = 'M';

UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Female' WHERE gender = 'F';
UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Male' WHERE gender = 'M';

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')

# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
  condition = "gender = 'F'",
  set = { "gender": "'Female'" }
)

# Declare the predicate by using Spark SQL functions.
deltaTable.update(
  condition = col('gender') == 'M',
  set = { 'gender': lit('Male') }
)

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
  "gender = 'F'",
  Map("gender" -> "'Female'")

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
  col("gender") === "M",
  Map("gender" -> lit("Male")));

Удаление из таблицы

Вы можете удалить данные, соответствующие предикату из таблицы Delta. Например, в таблице с именем people10m или путем /tmp/delta/people-10m, чтобы удалить все строки, соответствующие людям со значением в столбце birthDate, предшествующим 1955:

SQL

DELETE FROM people10m WHERE birthDate < '1955-01-01'

DELETE FROM delta.`/tmp/delta/people-10m` WHERE birthDate < '1955-01-01'

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')

# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")

Внимание

delete удаляет данные из последней версии таблицы Delta, но не удаляет их из физического хранилища до тех пор, пока старые версии не будут явным образом очищены. См. подробные сведения о команде vacuum.

Отображение журнала таблиц

Чтобы просмотреть журнал таблицы, используйте инструкцию DESCRIBE HISTORY, которая возвращает сведения о происхождении, включая версию таблицы, операцию, пользователя и т. д. для каждой операции записи в таблицу.

DESCRIBE HISTORY people_10m

Запрос более ранней версии таблицы (путешествия по времени)

Переход по временем в Delta Lake позволяет запрашивать более ранний моментальный снимок таблицы Delta.

Чтобы запросить более раннюю версию таблицы, укажите эту версию или метку времени в операторе SELECT. Например, чтобы запросить версию 0 из журнала выше, используйте следующую инструкцию:

SELECT * FROM people_10m VERSION AS OF 0

or

SELECT * FROM people_10m TIMESTAMP AS OF '2019-01-29 00:37:58'

В качестве меток времени принимаются только строки даты или меток времени, например "2019-01-01" и "2019-01-01'T'00:00:00.000Z".

Параметры DataFrameReader позволяют создать кадр данных из таблицы Delta, зафиксированной для определенной версии таблицы, например в Python:

df1 = spark.read.format('delta').option('timestampAsOf', '2019-01-01').table("people_10m")

display(df1)

или, кроме того, можно также:

df2 = spark.read.format('delta').option('versionAsOf', 0).table("people_10m")

display(df2)

Дополнительные сведения см. в разделе " Работа с журналом таблиц Delta Lake".

Оптимизация таблицы

После внесения в таблицу нескольких изменений может образоваться много небольших файлов. Чтобы повысить скорость обработки запросов чтения, можно с помощью OPTIMIZE свернуть мелкие файлы в более крупные:

OPTIMIZE people_10m

Z-порядок по столбцам

Чтобы дополнительно повысить производительность операций чтения, можно размещать связанную информацию в одном наборе файлов с помощью Z-упорядочения. Такое совместное размещение автоматически применяется в алгоритмах пропуска данных в Delta Lake, чтобы значительно уменьшить объем данных, которые необходимо считывать. Для Z-упорядочения данных необходимо указать столбцы для упорядочения в предложении ZORDER BY. Например, для совместного размещение по признаку gender выполните следующую команду:

OPTIMIZE people_10m
ZORDER BY (gender)

Полный набор параметров, доступных при запуске OPTIMIZE, см. в разделе "Компактные файлы данных" с оптимизацией в Delta Lake.

Очистка моментальных снимков с помощью VACUUM

Delta Lake обеспечивает изоляцию моментальных снимков для операций чтения, и это означает, что OPTIMIZE можно безопасно выполнять даже тогда, когда другие пользователи или задания осуществляют запросы к таблице. Однако в конечном итоге старые моментальные снимки нужно удалять. Для этого выполните следующую команду VACUUM:

VACUUM people_10m

Дополнительные сведения об использовании VACUUM эффективно см. в разделе "Удаление неиспользуемых файлов данных с помощью вакуума".