병합을 사용하여 Delta Lake 테이블에 upsert

MERGE SQL 작업을 사용하여 원본 테이블, 뷰 또는 DataFrame에서 대상 델타 테이블에 데이터를 upsert할 수 있습니다. Delta Lake는 삽입, 업데이트 및 삭제를 MERGE지원하며 고급 사용 사례를 용이하게 하기 위해 SQL 표준을 초과하는 확장 구문을 지원합니다.

people10mupdates 테이블 또는 /tmp/delta/people-10m-updates 원본 경로에 대상 테이블 people10m 또는 대상 경로 /tmp/delta/people-10m에 대한 새 데이터가 포함되어 있다고 가정하겠습니다. 새 레코드 중 일부는 대상 데이터에 이미 있을 수 있습니다. 새 데이터를 병합하려면 이 사람의 id가 이미 있는 행은 업데이트하고, 일치하는 id가 없는 경우에는 새 행을 삽입해야 합니다. 다음 쿼리를 실행할 수 있습니다.

SQL

MERGE INTO people10m
USING people10mupdates
ON people10m.id = people10mupdates.id
WHEN MATCHED THEN
  UPDATE SET
    id = people10mupdates.id,
    firstName = people10mupdates.firstName,
    middleName = people10mupdates.middleName,
    lastName = people10mupdates.lastName,
    gender = people10mupdates.gender,
    birthDate = people10mupdates.birthDate,
    ssn = people10mupdates.ssn,
    salary = people10mupdates.salary
WHEN NOT MATCHED
  THEN INSERT (
    id,
    firstName,
    middleName,
    lastName,
    gender,
    birthDate,
    ssn,
    salary
  )
  VALUES (
    people10mupdates.id,
    people10mupdates.firstName,
    people10mupdates.middleName,
    people10mupdates.lastName,
    people10mupdates.gender,
    people10mupdates.birthDate,
    people10mupdates.ssn,
    people10mupdates.salary
  )

Python

from delta.tables import *

deltaTablePeople = DeltaTable.forPath(spark, '/tmp/delta/people-10m')
deltaTablePeopleUpdates = DeltaTable.forPath(spark, '/tmp/delta/people-10m-updates')

dfUpdates = deltaTablePeopleUpdates.toDF()

deltaTablePeople.alias('people') \
  .merge(
    dfUpdates.alias('updates'),
    'people.id = updates.id'
  ) \
  .whenMatchedUpdate(set =
    {
      "id": "updates.id",
      "firstName": "updates.firstName",
      "middleName": "updates.middleName",
      "lastName": "updates.lastName",
      "gender": "updates.gender",
      "birthDate": "updates.birthDate",
      "ssn": "updates.ssn",
      "salary": "updates.salary"
    }
  ) \
  .whenNotMatchedInsert(values =
    {
      "id": "updates.id",
      "firstName": "updates.firstName",
      "middleName": "updates.middleName",
      "lastName": "updates.lastName",
      "gender": "updates.gender",
      "birthDate": "updates.birthDate",
      "ssn": "updates.ssn",
      "salary": "updates.salary"
    }
  ) \
  .execute()

Scala

import io.delta.tables._
import org.apache.spark.sql.functions._

val deltaTablePeople = DeltaTable.forPath(spark, "/tmp/delta/people-10m")
val deltaTablePeopleUpdates = DeltaTable.forPath(spark, "tmp/delta/people-10m-updates")
val dfUpdates = deltaTablePeopleUpdates.toDF()

deltaTablePeople
  .as("people")
  .merge(
    dfUpdates.as("updates"),
    "people.id = updates.id")
  .whenMatched
  .updateExpr(
    Map(
      "id" -> "updates.id",
      "firstName" -> "updates.firstName",
      "middleName" -> "updates.middleName",
      "lastName" -> "updates.lastName",
      "gender" -> "updates.gender",
      "birthDate" -> "updates.birthDate",
      "ssn" -> "updates.ssn",
      "salary" -> "updates.salary"
    ))
  .whenNotMatched
  .insertExpr(
    Map(
      "id" -> "updates.id",
      "firstName" -> "updates.firstName",
      "middleName" -> "updates.middleName",
      "lastName" -> "updates.lastName",
      "gender" -> "updates.gender",
      "birthDate" -> "updates.birthDate",
      "ssn" -> "updates.ssn",
      "salary" -> "updates.salary"
    ))
  .execute()

Scala 및 Python의 구문 세부 사항은 Delta Lake API 문서를 참조하세요. Spark SQL 구문 세부 정보는 MERGE INTO를 참조하세요.

병합을 사용하여 일치하지 않는 모든 행 수정

Databricks SQL 및 Databricks Runtime 12.2 LTS 이상에서는 원본 테이블에 해당 레코드가 없는 대상 테이블의 레코드나 DELETEUPDATE 을 사용할 WHEN NOT MATCHED BY SOURCE 수 있습니다. Databricks는 대상 테이블을 완전히 다시 작성하지 않도록 선택적 조건부 절을 추가하는 것이 좋습니다.

다음 코드 예제에서는 삭제에 이것을 사용하고, 대상 테이블을 원본 테이블의 내용으로 덮어쓰고, 대상 테이블에서 일치하지 않는 레코드를 삭제하는 기본 구문을 보여 줍니다. 원본 업데이트 및 삭제가 시간 제한인 테이블에 대한 확장성 있는 패턴은 델타 테이블을 원본과 증분 동기화를 참조하세요.

Python

(targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .whenNotMatchedBySourceDelete()
  .execute()
)

Scala

targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatched()
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .whenNotMatchedBySource()
  .delete()
  .execute()

SQL

MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *
WHEN NOT MATCHED BY SOURCE THEN
  DELETE

다음 예제에서는 절에 WHEN NOT MATCHED BY SOURCE 조건을 추가하고 일치하지 않는 대상 행에서 업데이트할 값을 지정합니다.

Python

(targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatchedUpdate(
    set = {"target.lastSeen": "source.timestamp"}
  )
  .whenNotMatchedInsert(
    values = {
      "target.key": "source.key",
      "target.lastSeen": "source.timestamp",
      "target.status": "'active'"
    }
  )
  .whenNotMatchedBySourceUpdate(
    condition="target.lastSeen >= (current_date() - INTERVAL '5' DAY)",
    set = {"target.status": "'inactive'"}
  )
  .execute()
)

Scala

targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatched()
  .updateExpr(Map("target.lastSeen" -> "source.timestamp"))
  .whenNotMatched()
  .insertExpr(Map(
    "target.key" -> "source.key",
    "target.lastSeen" -> "source.timestamp",
    "target.status" -> "'active'",
    )
  )
  .whenNotMatchedBySource("target.lastSeen >= (current_date() - INTERVAL '5' DAY)")
  .updateExpr(Map("target.status" -> "'inactive'"))
  .execute()

SQL

MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
  UPDATE SET target.lastSeen = source.timestamp
WHEN NOT MATCHED THEN
  INSERT (key, lastSeen, status) VALUES (source.key,  source.timestamp, 'active')
WHEN NOT MATCHED BY SOURCE AND target.lastSeen >= (current_date() - INTERVAL '5' DAY) THEN
  UPDATE SET target.status = 'inactive'

작업 의미 체계 병합

다음은 프로그래밍 방식 작업 의미 체계에 merge 대한 자세한 설명입니다.

  • whenMatchedwhenNotMatched 절은 원하는 개수만큼 가질 수 있습니다.

  • whenMatched 절은 일치 조건에 따라 원본 행이 대상 테이블 행과 일치할 때 실행됩니다. 해당 절에는 다음과 같은 의미 체계가 있습니다.

    • whenMatched 절에는 최대 1개의 update 작업과 1개의 delete 작업이 포함될 수 있습니다. mergeupdate 동작은 (update작업과 마찬가지로) 일치된 대상 행의 지정된 열만 업데이트합니다. delete 동작은 일치된 행을 삭제합니다.

    • whenMatched 절에 선택적 조건이 포함될 수 있습니다. 이 절 조건이 존재하는 경우, 절 조건이 true인 경우에만 일치하는 원본-대상 행 쌍에 대해 update 또는 delete 동작이 실행됩니다.

    • 여러 whenMatched 절이 있는 경우 지정된 순서대로 평가됩니다. 마지막 절을 제외한 모든 whenMatched 절에 조건이 있어야 합니다.

    • whenMatched 조건 중 병합 조건과 일치하는 원본-대상 행 쌍에 대해 true인 조건이 없는 경우, 대상 행이 변경되지 않습니다.

    • 대상 델타 테이블의 모든 열을 원본 데이터 세트의 해당 열로 업데이트하려면 whenMatched(...).updateAll()를 사용합니다. 다음 코드와 동일합니다.

      whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
      

      대상 델타 데이블의 모든 열에 대해 위 코드와 동일합니다. 따라서 이 동작은 원본 테이블이 대상 테이블과 동일한 열을 갖는다고 가정하며, 동일한 열을 갖지 않는 경우 쿼리가 분석 오류를 throw합니다.

      참고 항목

      이 동작은 자동 스키마 마이그레이션을 사용하도록 설정하면 변경됩니다. 자세한 내용은 자동 스키마 개선을 참조하세요.

  • whenNotMatched 절은 일치 조건에 따라 원본 행이 대상 행과 일치하지 않을 때 실행됩니다. 해당 절에는 다음과 같은 의미 체계가 있습니다.

    • whenNotMatched 절은 하나의 insert 동작만 가질 수 있습니다. 지정한 열과 해당 식에 따라 새 행이 생성됩니다. 대상 테이블의 모든 열을 지정할 필요는 없습니다. 지정하지 않은 대상 열의 경우 NULL이 삽입됩니다.

    • whenNotMatched 절에 선택적 조건이 포함될 수 있습니다. 절 조건이 있는 경우 해당 행에 대한 조건이 true일 때만 원본 행이 삽입됩니다. 그렇지 않으면 원본 열이 무시됩니다.

    • 여러 whenNotMatched 절이 있는 경우 지정된 순서대로 평가됩니다. 마지막 절을 제외한 모든 whenNotMatched 절에 조건이 있어야 합니다.

    • 대상 델타 테이블의 모든 열을 원본 데이터 세트의 해당 열과 함께 삽입하려면 whenNotMatched(...).insertAll()를 사용합니다. 다음 코드와 동일합니다.

      whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
      

      대상 델타 데이블의 모든 열에 대해 위 코드와 동일합니다. 따라서 이 동작은 원본 테이블이 대상 테이블과 동일한 열을 갖는다고 가정하며, 동일한 열을 갖지 않는 경우 쿼리가 분석 오류를 throw합니다.

      참고 항목

      이 동작은 자동 스키마 마이그레이션을 사용하도록 설정하면 변경됩니다. 자세한 내용은 자동 스키마 개선을 참조하세요.

  • whenNotMatchedBySource 절은 대상 행이 병합 조건에 따라 원본 행과 일치하지 않을 때 실행됩니다. 해당 절에는 다음과 같은 의미 체계가 있습니다.

    • whenNotMatchedBySource 절은 지정 delete 하고 update 작업을 수행할 수 있습니다.
    • whenNotMatchedBySource 절에 선택적 조건이 포함될 수 있습니다. 절 조건이 있는 경우 해당 행에 대해 해당 조건이 true인 경우에만 대상 행이 수정됩니다. 그렇지 않으면 대상 행이 변경되지 않은 상태로 유지됩니다.
    • 여러 whenNotMatchedBySource 절이 있는 경우 지정된 순서대로 평가됩니다. 마지막 절을 제외한 모든 whenNotMatchedBySource 절에 조건이 있어야 합니다.
    • 정의 whenNotMatchedBySource 에 따라 절에는 열 값을 끌어올 원본 행이 없으므로 원본 열을 참조할 수 없습니다. 수정할 각 열에 대해 리터럴을 지정하거나 대상 열에 대해 작업을 수행할 수 있습니다(예: SET target.deleted_count = target.deleted_count + 1.).

Important

  • 원본 데이터 세트의 여러 행이 일치하고 병합이 대상 델타 테이블의 동일한 행을 업데이트하려고 시도하면 merge 작업이 실패할 수 있습니다. 병합의 SQL 의미 체계에 따르면 일치하는 대상 행을 업데이트하는 데 사용해야 하는 원본 행이 명확하지 않으므로 이러한 업데이트 작업이 모호합니다. 원본 테이블을 전처리하여 여러 일치 항목이 발생할 가능성을 제거할 수 있습니다.
  • 뷰가 CREATE VIEW viewName AS SELECT * FROM deltaTable로 정의된 경우에만 SQL VIEW에 SQL MERGE 작업을 적용할 수 있습니다.

델타 테이블에 쓸 때 데이터 중복 제거

일반적인 ETL 사용 사례는 로그를 테이블에 추가하여 델타 테이블로 수집하는 것입니다. 그러나 원본이 중복된 로그 레코드를 생성하기 때문에 이를 처리하기 위해 다운스트림 중복 제거 단계가 필요하게 되는 경우가 많습니다. merge를 사용할 때는 중복된 레코드가 삽입되는 것을 방지할 수 있습니다.

SQL

MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId
WHEN NOT MATCHED
  THEN INSERT *

Python

deltaTable.alias("logs").merge(
    newDedupedLogs.alias("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId") \
  .whenNotMatchedInsertAll() \
  .execute()

Scala

deltaTable
  .as("logs")
  .merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId")
  .whenNotMatched()
  .insertAll()
  .execute()

Java

deltaTable
  .as("logs")
  .merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId")
  .whenNotMatched()
  .insertAll()
  .execute();

참고 항목

새 로그를 포함하는 데이터 세트는 자체적으로 중복 제거되어야 합니다. 새 로그를 포함하는 데이터 세트는 병합의 SQL 의미 체계에 따라 새 데이터를 테이블의 기존 데이터와 매칭하고 중복 제거하지만, 새 데이터 세트 내에 중복 데이터가 있는 경우에는 삽입됩니다. 따라서 테이블에 병합하기 전에 새 데이터를 중복 제거해야 합니다.

며칠 동안만 중복 레코드를 가져올 수 있다는 것을 알고 있는 경우 테이블을 날짜별로 분할한 다음 일치시킬 대상 테이블의 날짜 범위를 지정하여 쿼리를 추가로 최적화할 수 있습니다.

SQL

MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS
  THEN INSERT *

Python

deltaTable.alias("logs").merge(
    newDedupedLogs.alias("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS") \
  .whenNotMatchedInsertAll("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS") \
  .execute()

Scala

deltaTable.as("logs").merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
  .whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
  .insertAll()
  .execute()

Java

deltaTable.as("logs").merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
  .whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
  .insertAll()
  .execute();

이 명령은 테이블 전체가 아니라 마지막 7일 분량의 로그에서 중복 항목을 찾기 때문에 앞의 명령보다 효율적입니다. 또한 구조적 스트리밍으로 이 삽입 전용 병합을 사용하면 로그의 연속 중복 제거를 수행할 수 있습니다.

  • 스트리밍 쿼리에서는 foreachBatch에서 병합 작업을 사용하여 스트리밍 데이터를 중복 제거를 사용한 상태로 델타 테이블에 연속 쓰기할 수 있습니다. foreachBatch에 대한 자세한 내용은 아래의 스트리밍 예제를 참조하세요.
  • 다른 스트리밍 쿼리에서는 이 델타 테이블에서 중복 제거된 데이터를 연속으로 읽을 수 있습니다. 이것은 삽입 전용 병합은 델타 테이블에 새 데이터만 추가하기 때문에 가능합니다.

Delta Lake를 사용하여 SCD(느린 변경 데이터) 및 CDC(변경 데이터 캡처)

Delta Live Tables는 SCD Type 1 및 Type 2를 추적하고 적용하기 위한 기본 지원을 제공합니다. 델타 라이브 테이블과 함께 사용하면 APPLY CHANGES INTO CDC 피드를 처리할 때 잘못된 레코드가 올바르게 처리되는지 확인합니다. APPLY CHANGES API: Delta Live Tables에서 변경 데이터 캡처 간소화를 참조하세요.

델타 테이블을 원본과 증분 방식으로 동기화

Databricks SQL 및 Databricks Runtime 12.2 LTS 이상에서는 임의의 조건을 만들어 테이블의 일부를 원자성으로 삭제하고 바꿀 수 있습니다 WHEN NOT MATCHED BY SOURCE . 이는 초기 데이터 입력 후 며칠 동안 레코드가 변경되거나 삭제될 수 있지만 최종 상태로 정착되는 원본 테이블이 있는 경우에 특히 유용할 수 있습니다.

다음 쿼리에서는 이 패턴을 사용하여 원본에서 5일간의 레코드를 선택하고, 대상에서 일치하는 레코드를 업데이트하고, 원본에서 대상으로 새 레코드를 삽입하고, 대상에서 지난 5일 동안 일치하지 않는 모든 레코드를 삭제하는 방법을 보여 줍니다.

MERGE INTO target AS t
USING (SELECT * FROM source WHERE created_at >= (current_date() - INTERVAL '5' DAY)) AS s
ON t.key = s.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
WHEN NOT MATCHED BY SOURCE AND created_at >= (current_date() - INTERVAL '5' DAY) THEN DELETE

원본 및 대상 테이블에 동일한 부울 필터를 제공하면 삭제를 포함하여 원본에서 대상 테이블로 변경 내용을 동적으로 전파할 수 있습니다.

참고 항목

이 패턴은 조건부 절 없이 사용할 수 있지만 이로 인해 비용이 많이 들 수 있는 대상 테이블을 완전히 다시 작성하게 됩니다.