マージを使用した Delta Lake テーブルへの upsert

ソース テーブル、ビュー、または DataFrame のデータをターゲット Delta テーブルにアップサートするには、MERGE SQL 操作を使用します。 Delta Lake では、MERGE での挿入、更新、削除がサポートされ、高度なユース ケースを容易にするために、SQL 標準を超える拡張構文がサポートされています。

people10mupdates という名前のソース テーブル、または people10m という名前を持つターゲット テーブルの新しいデータを含む /tmp/delta/people-10m-updates のソース パス、または /tmp/delta/people-10m のターゲット パスがある場合を考えます。 これらの新しいレコードの一部は、ターゲット データに既に存在している可能性があります。 新しいデータをマージするには、ユーザーの id が既に存在する行を更新し、一致する id がない新しい行を挿入します。 次のクエリを実行できます。

SQL

MERGE INTO people10m
USING people10mupdates
ON people10m.id = people10mupdates.id
WHEN MATCHED THEN
  UPDATE SET
    id = people10mupdates.id,
    firstName = people10mupdates.firstName,
    middleName = people10mupdates.middleName,
    lastName = people10mupdates.lastName,
    gender = people10mupdates.gender,
    birthDate = people10mupdates.birthDate,
    ssn = people10mupdates.ssn,
    salary = people10mupdates.salary
WHEN NOT MATCHED
  THEN INSERT (
    id,
    firstName,
    middleName,
    lastName,
    gender,
    birthDate,
    ssn,
    salary
  )
  VALUES (
    people10mupdates.id,
    people10mupdates.firstName,
    people10mupdates.middleName,
    people10mupdates.lastName,
    people10mupdates.gender,
    people10mupdates.birthDate,
    people10mupdates.ssn,
    people10mupdates.salary
  )

Python

from delta.tables import *

deltaTablePeople = DeltaTable.forPath(spark, '/tmp/delta/people-10m')
deltaTablePeopleUpdates = DeltaTable.forPath(spark, '/tmp/delta/people-10m-updates')

dfUpdates = deltaTablePeopleUpdates.toDF()

deltaTablePeople.alias('people') \
  .merge(
    dfUpdates.alias('updates'),
    'people.id = updates.id'
  ) \
  .whenMatchedUpdate(set =
    {
      "id": "updates.id",
      "firstName": "updates.firstName",
      "middleName": "updates.middleName",
      "lastName": "updates.lastName",
      "gender": "updates.gender",
      "birthDate": "updates.birthDate",
      "ssn": "updates.ssn",
      "salary": "updates.salary"
    }
  ) \
  .whenNotMatchedInsert(values =
    {
      "id": "updates.id",
      "firstName": "updates.firstName",
      "middleName": "updates.middleName",
      "lastName": "updates.lastName",
      "gender": "updates.gender",
      "birthDate": "updates.birthDate",
      "ssn": "updates.ssn",
      "salary": "updates.salary"
    }
  ) \
  .execute()

Scala

import io.delta.tables._
import org.apache.spark.sql.functions._

val deltaTablePeople = DeltaTable.forPath(spark, "/tmp/delta/people-10m")
val deltaTablePeopleUpdates = DeltaTable.forPath(spark, "tmp/delta/people-10m-updates")
val dfUpdates = deltaTablePeopleUpdates.toDF()

deltaTablePeople
  .as("people")
  .merge(
    dfUpdates.as("updates"),
    "people.id = updates.id")
  .whenMatched
  .updateExpr(
    Map(
      "id" -> "updates.id",
      "firstName" -> "updates.firstName",
      "middleName" -> "updates.middleName",
      "lastName" -> "updates.lastName",
      "gender" -> "updates.gender",
      "birthDate" -> "updates.birthDate",
      "ssn" -> "updates.ssn",
      "salary" -> "updates.salary"
    ))
  .whenNotMatched
  .insertExpr(
    Map(
      "id" -> "updates.id",
      "firstName" -> "updates.firstName",
      "middleName" -> "updates.middleName",
      "lastName" -> "updates.lastName",
      "gender" -> "updates.gender",
      "birthDate" -> "updates.birthDate",
      "ssn" -> "updates.ssn",
      "salary" -> "updates.salary"
    ))
  .execute()

Scala、Python 構文の詳細については、Delta Lake API ドキュメントを参照してください。 SQL 構文の詳細については、「MERGE INTO」を参照してください

マージを使用して一致しないすべての行を変更する

Databricks SQL および Databricks Runtime 12.2 LTS 以降では、WHEN NOT MATCHED BY SOURCE 句を、ソース テーブルに対応するレコードがないターゲット テーブル内の UPDATE または DELETE レコードに使用することができます。 Databricks では、ターゲット テーブルが完全に書き直されないように、オプションの条件句を追加することを推奨しています。

次のコード例は、これを削除に使用する基本的な構文を示しており、ターゲット テーブルをソース テーブルの内容で上書きし、ターゲット テーブルの一致しないレコードを削除するものです。 ソースの更新や削除に期限があるテーブルのよりスケーラブルなパターンについては、「Delta テーブルをソースと増分同期する」を参照してください。

Python

(targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .whenNotMatchedBySourceDelete()
  .execute()
)

Scala

targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatched()
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .whenNotMatchedBySource()
  .delete()
  .execute()

SQL

MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *
WHEN NOT MATCHED BY SOURCE THEN
  DELETE

次の例では、WHEN NOT MATCHED BY SOURCE 句に条件を追加し、一致しないターゲット行で更新する値を指定しています。

Python

(targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatchedUpdate(
    set = {"target.lastSeen": "source.timestamp"}
  )
  .whenNotMatchedInsert(
    values = {
      "target.key": "source.key",
      "target.lastSeen": "source.timestamp",
      "target.status": "'active'"
    }
  )
  .whenNotMatchedBySourceUpdate(
    condition="target.lastSeen >= (current_date() - INTERVAL '5' DAY)",
    set = {"target.status": "'inactive'"}
  )
  .execute()
)

Scala

targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatched()
  .updateExpr(Map("target.lastSeen" -> "source.timestamp"))
  .whenNotMatched()
  .insertExpr(Map(
    "target.key" -> "source.key",
    "target.lastSeen" -> "source.timestamp",
    "target.status" -> "'active'",
    )
  )
  .whenNotMatchedBySource("target.lastSeen >= (current_date() - INTERVAL '5' DAY)")
  .updateExpr(Map("target.status" -> "'inactive'"))
  .execute()

SQL

MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
  UPDATE SET target.lastSeen = source.timestamp
WHEN NOT MATCHED THEN
  INSERT (key, lastSeen, status) VALUES (source.key,  source.timestamp, 'active')
WHEN NOT MATCHED BY SOURCE AND target.lastSeen >= (current_date() - INTERVAL '5' DAY) THEN
  UPDATE SET target.status = 'inactive'

マージ操作のセマンティクス

merge のプログラム操作のセマンティクスに関する詳しい説明を次に示します。

  • whenMatched 句と whenNotMatched 句は任意の数にできます。

  • whenMatched 句は、一致条件に基づいてソース行がターゲット テーブル行と一致する場合に実行されます。 これらの句のセマンティクスは次のとおりです。

    • whenMatched 句には、最大で 1 つの updatedelete アクションを含めることができます。 mergeupdate アクションは、一致するターゲット行の指定された列 (update操作に類似) のみを更新します。 delete アクションによって、一致した行が削除されます。

    • whenMatched 句には、省略可能な条件を指定できます。 この句条件が存在する場合、update または delete アクションは、句条件が true の場合にのみ、一致するソースとターゲットの行ペアに対して実行されます。

    • whenMatched 句が複数ある場合は、指定された順序で評価されます。 最後のものを除くすべての whenMatched 句には条件が必要です。

    • マージ条件に一致するソースとターゲットの行のペアに対して、どの whenMatched 条件も true と評価されない場合、ターゲット行はそのままです。

    • ソース データセットの対応する列を使用して、ターゲット Delta テーブルのすべての列を更新するには、whenMatched(...).updateAll() を使用します。 これは、次の内容と同じです。

      whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
      

      ターゲット Delta テーブルのすべての列に対して該当します。 したがって、このアクションでは、ソース テーブルの列がターゲット テーブルの列と同じことを前提とします。それ以外の場合、クエリは分析エラーをスローします。

      注意

      この動作は、スキーマの自動移行が有効になっている場合に変わります。 詳細については、スキーマの自動展開に関する内容を参照してください。

  • whenNotMatched clauses are executed when a source row does not match any target row based on the match condition. これらの句のセマンティクスは次のとおりです。

    • whenNotMatched 句には insert アクションのみを含められます。 新しい行は、指定された列とそれに対応する式に基づいて生成されます。 ターゲット テーブル内のすべての列を指定する必要はありません。 指定されていないターゲット列の場合は、NULL が挿入されます。

    • whenNotMatched 句には、省略可能な条件を指定できます。 句の条件が存在する場合、その条件が該当する行に対して true である場合にのみ、ソース行が挿入されます。 それ以外の場合、ソース列は無視されます。

    • whenNotMatched 句が複数ある場合は、指定された順序で評価されます。 最後のものを除くすべての whenNotMatched 句には条件が必要です。

    • ソース データセットの対応する列を使用して、ターゲット Delta テーブルのすべての列を挿入するには、whenNotMatched(...).insertAll() を使用します。 これは、次の内容と同じです。

      whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
      

      ターゲット Delta テーブルのすべての列に対して該当します。 したがって、このアクションでは、ソース テーブルの列がターゲット テーブルの列と同じことを前提とします。それ以外の場合、クエリは分析エラーをスローします。

      注意

      この動作は、スキーマの自動移行が有効になっている場合に変わります。 詳細については、スキーマの自動展開に関する内容を参照してください。

  • whenNotMatchedBySource 句は、一致条件に基づいてターゲット行がどのソース行とも一致しない場合に実行されます。 これらの句のセマンティクスは次のとおりです。

    • whenNotMatchedBySource 句では、delete および update アクションを指定できます。
    • whenNotMatchedBySource 句には、省略可能な条件を指定できます。 句の条件が存在する場合、その条件が該当する行に対して true である場合にのみ、ターゲット行が変更されます。 それ以外の場合、ターゲット行は変更されません。
    • whenNotMatchedBySource 句が複数ある場合は、指定された順序で評価されます。 最後のものを除くすべての whenNotMatchedBySource 句には条件が必要です。
    • 定義上、whenNotMatchedBySource 句には列の値をプルするソース行がないため、ソース列を参照することはできません。 変更する各列について、リテラルを指定するか、SET target.deleted_count = target.deleted_count + 1 など、ターゲット列に対してアクションを実行できます。

重要

  • ソース データセットの複数の行が一致し、マージがターゲット Delta テーブルの同じ行を更新しようとすると、merge 操作が失敗する可能性があります。 マージの SQL セマンティクスによれば、一致するターゲット行を更新するために使用するソース行が明確でないため、このような更新操作はあいまいです。 ソース テーブルを前処理して、複数の一致が発生する可能性をなくすことができます。
  • ビューが CREATE VIEW viewName AS SELECT * FROM deltaTable として定義されている場合にのみ、SQL VIEW に対してSQL MERGE 操作を適用できます。

Delta テーブルに書き込む場合のデータ重複排除

一般的な ETL の使用例では、ログをテーブルに追加して Delta テーブルに収集します。 しかし、多くの場合、ソースは重複するログ レコードを生成する場合があり、それらを処理するためにダウンストリームの重複排除手順が必要になります。 merge を使用すると、重複するレコードの挿入を回避できます。

SQL

MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId
WHEN NOT MATCHED
  THEN INSERT *

Python

deltaTable.alias("logs").merge(
    newDedupedLogs.alias("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId") \
  .whenNotMatchedInsertAll() \
  .execute()

Scala

deltaTable
  .as("logs")
  .merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId")
  .whenNotMatched()
  .insertAll()
  .execute()

Java

deltaTable
  .as("logs")
  .merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId")
  .whenNotMatched()
  .insertAll()
  .execute();

注意

新しいログを含むデータセットは、それ自体の中で重複を排除する必要があります。 マージの SQL セマンティクスによって、新しいデータをテーブル内の既存のデータと一致させて重複排除が行われますが、新しいデータセット内に重複するデータがある場合はそれが挿入されます。 そのため、テーブルにマージする前に、新しいデータを重複排除します。

数日間だけ重複レコードが取得される可能性がある場合は、日付でテーブルをパーティション分割し、一致させる対象テーブルの日付範囲を指定すると、クエリをさらに最適化できます。

SQL

MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS
  THEN INSERT *

Python

deltaTable.alias("logs").merge(
    newDedupedLogs.alias("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS") \
  .whenNotMatchedInsertAll("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS") \
  .execute()

Scala

deltaTable.as("logs").merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
  .whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
  .insertAll()
  .execute()

Java

deltaTable.as("logs").merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
  .whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
  .insertAll()
  .execute();

これは、テーブル全体ではなく、過去 7 日間のログでのみ重複を検索するため、前のコマンドよりも効率的です。 さらに、この挿入専用マージを構造化ストリーミングと共に使用して、ログの継続的重複排除を実行できます。

  • ストリーミング クエリでは、foreachBatch のマージ操作を使用して、重複排除を設定して Delta テーブルにストリーミング データを継続的に書き込めます。 foreachBatch の詳細については、次のストリーミングの例を参照してください。
  • 別のストリーミング クエリでは、この Delta テーブルから重複排除されたデータを継続的に読み取りできます。 これは、挿入専用のマージでは、Delta テーブルに新しいデータのみが追加されるため可能になります。

Delta Lake を使用して緩やかに変化するデータ (SCD) と変更データ キャプチャ (CDC)

Delta Live Tables では、SCD タイプ 1 およびタイプ 2 の追跡と適用をネイティブにサポートしています。 Delta Live Tables と共に APPLY CHANGES INTO を使用して、CDC フィードの処理中に順序が正しく処理されるようにします。 「Delta Live Tables での APPLY CHANGES API を使用した変更データ キャプチャの簡略化」をご覧ください。

Delta テーブルをソースと増分同期する

Databricks SQL および Databricks Runtime 12.2 LTS 以降では、WHEN NOT MATCHED BY SOURCE を使用して、テーブルの一部をアトミックに削除したり置き換えたりする任意の条件を作成できます。 これは、最初のデータ入力後に数日間レコードが変更または削除される可能性があるが、最終的に最終状態に落ち着くようなソース テーブルがある場合に特に便利です。

次のクエリは、このパターンを使用して、ソースから 5 日間のレコードを選択し、ターゲットの一致するレコードを更新し、ソースからターゲットに新しいレコードを挿入し、ターゲットの過去 5 日間の一致しないレコードをすべて削除する方法を示しています。

MERGE INTO target AS t
USING (SELECT * FROM source WHERE created_at >= (current_date() - INTERVAL '5' DAY)) AS s
ON t.key = s.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
WHEN NOT MATCHED BY SOURCE AND created_at >= (current_date() - INTERVAL '5' DAY) THEN DELETE

ソース テーブルとターゲット テーブルに同じブール値フィルターを指定することで、ソースからターゲット テーブルへ、削除を含む変更を動的に反映できます。

注意

このパターンは条件句なしで使用することもできますが、その場合、ターゲット テーブルを完全に書き換えることになり、コストがかかる可能性があります。