Upsert в таблицу Delta Lake с помощью слияния

Данные из исходной таблицы, представления или DataFrame можно передать с помощью операции upsert в целевую таблицу Delta с помощью операции SQL MERGE. Delta Lake поддерживает вставки, обновления и удаления, MERGEа также поддерживает расширенный синтаксис за пределами стандартов SQL для упрощения расширенных вариантов использования.

Предположим, у вас есть исходная таблица с именем people10mupdates или исходный путь /tmp/delta/people-10m-updates, содержащий новые данные для целевой таблицы с именем people10m или целевым путем /tmp/delta/people-10m. Некоторые из этих новых записей уже могут присутствовать в целевых данных. Чтобы объединить новые данные, необходимо обновить строки, в которых уже имеется пользователь id, и вставить новые строки, в которых нет соответствующих id. Вы можете выполнить следующий запрос:

SQL

MERGE INTO people10m
USING people10mupdates
ON people10m.id = people10mupdates.id
WHEN MATCHED THEN
  UPDATE SET
    id = people10mupdates.id,
    firstName = people10mupdates.firstName,
    middleName = people10mupdates.middleName,
    lastName = people10mupdates.lastName,
    gender = people10mupdates.gender,
    birthDate = people10mupdates.birthDate,
    ssn = people10mupdates.ssn,
    salary = people10mupdates.salary
WHEN NOT MATCHED
  THEN INSERT (
    id,
    firstName,
    middleName,
    lastName,
    gender,
    birthDate,
    ssn,
    salary
  )
  VALUES (
    people10mupdates.id,
    people10mupdates.firstName,
    people10mupdates.middleName,
    people10mupdates.lastName,
    people10mupdates.gender,
    people10mupdates.birthDate,
    people10mupdates.ssn,
    people10mupdates.salary
  )

Python

from delta.tables import *

deltaTablePeople = DeltaTable.forPath(spark, '/tmp/delta/people-10m')
deltaTablePeopleUpdates = DeltaTable.forPath(spark, '/tmp/delta/people-10m-updates')

dfUpdates = deltaTablePeopleUpdates.toDF()

deltaTablePeople.alias('people') \
  .merge(
    dfUpdates.alias('updates'),
    'people.id = updates.id'
  ) \
  .whenMatchedUpdate(set =
    {
      "id": "updates.id",
      "firstName": "updates.firstName",
      "middleName": "updates.middleName",
      "lastName": "updates.lastName",
      "gender": "updates.gender",
      "birthDate": "updates.birthDate",
      "ssn": "updates.ssn",
      "salary": "updates.salary"
    }
  ) \
  .whenNotMatchedInsert(values =
    {
      "id": "updates.id",
      "firstName": "updates.firstName",
      "middleName": "updates.middleName",
      "lastName": "updates.lastName",
      "gender": "updates.gender",
      "birthDate": "updates.birthDate",
      "ssn": "updates.ssn",
      "salary": "updates.salary"
    }
  ) \
  .execute()

Scala

import io.delta.tables._
import org.apache.spark.sql.functions._

val deltaTablePeople = DeltaTable.forPath(spark, "/tmp/delta/people-10m")
val deltaTablePeopleUpdates = DeltaTable.forPath(spark, "tmp/delta/people-10m-updates")
val dfUpdates = deltaTablePeopleUpdates.toDF()

deltaTablePeople
  .as("people")
  .merge(
    dfUpdates.as("updates"),
    "people.id = updates.id")
  .whenMatched
  .updateExpr(
    Map(
      "id" -> "updates.id",
      "firstName" -> "updates.firstName",
      "middleName" -> "updates.middleName",
      "lastName" -> "updates.lastName",
      "gender" -> "updates.gender",
      "birthDate" -> "updates.birthDate",
      "ssn" -> "updates.ssn",
      "salary" -> "updates.salary"
    ))
  .whenNotMatched
  .insertExpr(
    Map(
      "id" -> "updates.id",
      "firstName" -> "updates.firstName",
      "middleName" -> "updates.middleName",
      "lastName" -> "updates.lastName",
      "gender" -> "updates.gender",
      "birthDate" -> "updates.birthDate",
      "ssn" -> "updates.ssn",
      "salary" -> "updates.salary"
    ))
  .execute()

Дополнительные сведения о синтаксисе Scala и Python см. в документации по API Delta Lake. Сведения о синтаксисе SQL см. в разделе MERGE INTO

Изменение всех несовпаденных строк с помощью слияния

В Databricks SQL и Databricks Runtime 12.2 LTS и более поздних версиях можно использовать WHEN NOT MATCHED BY SOURCE предложение UPDATE для или DELETE записи в целевой таблице, которые не имеют соответствующих записей в исходной таблице. Databricks рекомендует добавить необязательное условное предложение, чтобы избежать полной перезаписи целевой таблицы.

В следующем примере кода показан базовый синтаксис использования для удаления, перезапись целевой таблицы с содержимым исходной таблицы и удаление несовпаденных записей в целевой таблице. Дополнительные масштабируемые шаблоны для таблиц, в которых исходные обновления и удаления привязаны к времени, см. в разделе Добавочная синхронизация таблицы Delta с источником.

Python

(targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .whenNotMatchedBySourceDelete()
  .execute()
)

Scala

targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatched()
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .whenNotMatchedBySource()
  .delete()
  .execute()

SQL

MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *
WHEN NOT MATCHED BY SOURCE THEN
  DELETE

В следующем примере добавляются условия в WHEN NOT MATCHED BY SOURCE предложение и указываются значения для обновления в несовпаденных целевых строках.

Python

(targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatchedUpdate(
    set = {"target.lastSeen": "source.timestamp"}
  )
  .whenNotMatchedInsert(
    values = {
      "target.key": "source.key",
      "target.lastSeen": "source.timestamp",
      "target.status": "'active'"
    }
  )
  .whenNotMatchedBySourceUpdate(
    condition="target.lastSeen >= (current_date() - INTERVAL '5' DAY)",
    set = {"target.status": "'inactive'"}
  )
  .execute()
)

Scala

targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatched()
  .updateExpr(Map("target.lastSeen" -> "source.timestamp"))
  .whenNotMatched()
  .insertExpr(Map(
    "target.key" -> "source.key",
    "target.lastSeen" -> "source.timestamp",
    "target.status" -> "'active'",
    )
  )
  .whenNotMatchedBySource("target.lastSeen >= (current_date() - INTERVAL '5' DAY)")
  .updateExpr(Map("target.status" -> "'inactive'"))
  .execute()

SQL

MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
  UPDATE SET target.lastSeen = source.timestamp
WHEN NOT MATCHED THEN
  INSERT (key, lastSeen, status) VALUES (source.key,  source.timestamp, 'active')
WHEN NOT MATCHED BY SOURCE AND target.lastSeen >= (current_date() - INTERVAL '5' DAY) THEN
  UPDATE SET target.status = 'inactive'

Семантика операции слияния

Ниже приведено подробное описание семантики программной merge операции.

  • Может существовать любое количество предложений whenMatched и whenNotMatched.

  • Предложения whenMatched выполняются, когда исходная строка соответствует строке целевой таблицы на основе условия соответствия. Эти предложения имеют следующую семантику.

    • Предложения whenMatched могут иметь не более одного действия update и одного действия delete. Действие update в merge обновляет указанные столбцы (аналогично операцииupdate) соответствующей целевой строки. Действие delete удаляет сопоставленную строку.

    • Каждое предложение whenMatched может иметь необязательное условие. Если это условие предложения существует, действие update или delete выполняется для любой соответствующей пары исходной и целевой строк, только если условие предложения истинно.

    • Если существует несколько предложений whenMatched, они вычисляются в том порядке, в котором указаны. Все предложения whenMatched, за исключением последнего, должны иметь условия.

    • Если ни одно из условий whenMatched не вычисляется как истинное для исходной и целевой пары строк, которая соответствует условию слияния, целевая строка остается неизменной.

    • Чтобы заменить все столбцы целевой таблицы Delta соответствующими столбцами исходного набора данных, используйте whenMatched(...).updateAll(). Это соответствует следующей записи:

      whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
      

      для всех столбцов целевой таблицы Delta. Следовательно, это действие предполагает, что в исходной таблице есть те же столбцы, что и в целевой таблице, иначе запрос выдаст ошибку анализа.

      Примечание.

      Эти изменения в поведении при включении автоматической миграции схемы. Дополнительные сведения см. в статье об автоматической эволюции схемы.

  • Предложения whenNotMatched выполняются, когда исходная строка не соответствует какой-либо целевой строке на основе условия соответствия. Эти предложения имеют следующую семантику.

    • Предложения whenNotMatched могут иметь только действие insert. Новая строка создается на основе указанного столбца и соответствующих выражений. Нет необходимости указывать все столбцы в целевой таблице. Для неуказанных целевых столбцов вставляется значение NULL.

    • Каждое предложение whenNotMatched может иметь необязательное условие. Если условие предложения присутствует, исходная строка вставляется, только если это условие истинно для этой строки. В противном случае исходный столбец игнорируется.

    • Если существует несколько предложений whenNotMatched, они вычисляются в том порядке, в котором указаны. Все предложения whenNotMatched, за исключением последнего, должны иметь условия.

    • Чтобы заменить все столбцы целевой таблицы Delta соответствующими столбцами исходного набора данных, используйте whenNotMatched(...).insertAll(). Это соответствует следующей записи:

      whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
      

      для всех столбцов целевой таблицы Delta. Следовательно, это действие предполагает, что в исходной таблице есть те же столбцы, что и в целевой таблице, иначе запрос выдаст ошибку анализа.

      Примечание.

      Эти изменения в поведении при включении автоматической миграции схемы. Дополнительные сведения см. в статье об автоматической эволюции схемы.

  • whenNotMatchedBySource предложения выполняются, если целевая строка не соответствует исходной строке в зависимости от условия слияния. Эти предложения имеют следующую семантику.

    • whenNotMatchedBySource предложения могут указывать delete и update выполнять действия.
    • Каждое предложение whenNotMatchedBySource может иметь необязательное условие. Если условие предложения присутствует, целевая строка изменяется только в том случае, если это условие имеет значение true для этой строки. В противном случае целевая строка остается без изменений.
    • Если существует несколько предложений whenNotMatchedBySource, они вычисляются в том порядке, в котором указаны. Все предложения whenNotMatchedBySource, за исключением последнего, должны иметь условия.
    • По определению whenNotMatchedBySource предложения не имеют исходной строки для извлечения значений столбцов, поэтому на исходные столбцы нельзя ссылаться. Для каждого столбца, который необходимо изменить, можно указать литерал или выполнить действие в целевом столбце, например SET target.deleted_count = target.deleted_count + 1.

Внимание

  • Операция merge может завершиться ошибкой, если несколько строк исходного набора данных совпадают и операция слияния пытается обновить одни и те же строки целевой таблицы Delta. В соответствии с семантикой слияния SQL такая операция обновления неоднозначна, так как неясно, какую исходную строку следует использовать для обновления соответствующей целевой строки. Вы можете предварительно обработать исходную таблицу, чтобы исключить возможность множественных совпадений.
  • Вы можете применить операцию SQL MERGE к представлению SQL, только если представление определено как CREATE VIEW viewName AS SELECT * FROM deltaTable.

Дедупликация данных при записи в таблицы Delta

Распространенным вариантом использования извлечения. преобразования и загрузки является сбору журналов в таблице Delta путем добавления их в таблицу. Однако часто источники могут создавать дублирующиеся записи журнала, и тогда требуются нижестоящие шаги по дедупликации. С помощью merge можно избежать вставки дублирующихся записей.

SQL

MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId
WHEN NOT MATCHED
  THEN INSERT *

Python

deltaTable.alias("logs").merge(
    newDedupedLogs.alias("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId") \
  .whenNotMatchedInsertAll() \
  .execute()

Scala

deltaTable
  .as("logs")
  .merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId")
  .whenNotMatched()
  .insertAll()
  .execute()

Java

deltaTable
  .as("logs")
  .merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId")
  .whenNotMatched()
  .insertAll()
  .execute();

Примечание.

Набор данных, содержащий новые журналы, нужно дедуплицировать внутри самого себя. По семантике SQL слияния оно сопоставляет новые данные с существующими данными в таблице и выполняет дедупликацию, но если дублирующиеся данные есть в новом наборе данных, они вставляются. Таким образом, перед слиянием в таблицу новых данных следует провести их дедупликацию.

Если вы знаете, что вы можете получить повторяющиеся записи только в течение нескольких дней, вы можете оптимизировать запрос дальше, секционируя таблицу по дате, а затем указывая диапазон дат целевой таблицы для сопоставления.

SQL

MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS
  THEN INSERT *

Python

deltaTable.alias("logs").merge(
    newDedupedLogs.alias("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS") \
  .whenNotMatchedInsertAll("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS") \
  .execute()

Scala

deltaTable.as("logs").merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
  .whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
  .insertAll()
  .execute()

Java

deltaTable.as("logs").merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
  .whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
  .insertAll()
  .execute();

Это более эффективно, чем предыдущая команда, так как она ищет дубликаты только за последние семь дней журналов, а не во всей таблице. Кроме того, вы можете использовать слияние только со вставкой со структурированной потоковой передачей для выполнения непрерывной дедупликации журналов.

  • В запросах потоковой передачи можно использовать операцию слияния в foreachBatch для непрерывной записи потоковых данных в таблицу Delta с дедупликацией. Дополнительные сведения о foreachBatch см. в следующем примере потоковой передачи.
  • В другом запросе потоковой передачи вы можете постоянно считывать дедуплицированные данные из этой таблицы Delta. Это возможно, поскольку слияние только со вставкой добавляет новые данные в таблицу Delta.

Медленно меняющиеся данные (SCD) и запись измененных данных (CDC) с Delta Lake

Delta Live Tables имеет встроенную поддержку отслеживания и применения SCD Type 1 и Type 2. Используйте APPLY CHANGES INTO с разностными динамическими таблицами, чтобы обеспечить правильную обработку записей вне порядка при обработке веб-каналов CDC. См . РАЗДЕЛ API APPLY CHANGES: Упрощение записи измененных данных в разностных динамических таблицах.

Добавочная синхронизация таблицы Delta с источником

В Databricks SQL и Databricks Runtime 12.2 LTS и более поздних версиях можно использовать WHEN NOT MATCHED BY SOURCE для создания произвольных условий для атомарного удаления и замены части таблицы. Это может быть особенно полезно, если у вас есть исходная таблица, в которой записи могут изменяться или удаляться в течение нескольких дней после начальной записи данных, но в конечном итоге урегулируются до окончательного состояния.

Следующий запрос показывает использование этого шаблона для выбора 5 дней записей из источника, обновления соответствующих записей в целевом объекте, вставки новых записей из источника в целевой объект и удаления всех несовпаденных записей за последние 5 дней в целевом объекте.

MERGE INTO target AS t
USING (SELECT * FROM source WHERE created_at >= (current_date() - INTERVAL '5' DAY)) AS s
ON t.key = s.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
WHEN NOT MATCHED BY SOURCE AND created_at >= (current_date() - INTERVAL '5' DAY) THEN DELETE

Предоставляя тот же логический фильтр в исходных и целевых таблицах, вы можете динамически распространять изменения из источника в целевые таблицы, включая удаления.

Примечание.

Хотя этот шаблон можно использовать без каких-либо условных предложений, это приведет к полной перезаписи целевой таблицы, которая может быть дорогой.