Azure Databricks で Delta Lake 変更データ フィードを使用する
Note
- この記事では、変更データ フィード機能を使用し、デルタ テーブルの行レベルの変更情報を記録およびクエリする方法について説明します。 ソース データの変更に基づいて Delta Live Tables パイプライン内のテーブルを更新する方法については、「Delta Live Tables での APPLY CHANGES API を使用した変更データ キャプチャの簡略化」を参照してください。
Azure Databricks で変更データ フィードを使うと、Delta テーブルのバージョン間で行レベルの変更を追跡できます。 Delta テーブルで有効になっていると、ランタイムによって、テーブルに書き込まれたすべてのデータの "変更イベント" が記録されます。 これには、行データと、指定された行が挿入、削除、または更新されたかどうかを示すメタデータが含まれます。
Spark SQL、Apache Spark DataFrames、および構造化ストリーミングを使って、バッチ クエリで変更イベントを読み取ることができます。
重要
変更データ フィードはテーブル履歴と連携して動作し、変更情報を提供します。 Delta テーブルを複製すると別の履歴が作成されるため、複製されたテーブルの変更データ フィードは元のテーブルの変更データ フィードと一致しません。
ユース ケース
変更データ フィードは、既定では有効になっていません。 変更データ フィードを有効にすると、次のユース ケースが利用できるようになります。
- シルバーおよびゴールド テーブル: 最初の
MERGE
、UPDATE
、またはDELETE
操作の後で行レベルの変更のみを処理し、ETL および ELT 操作を高速化および簡素化することにより、Delta Lake のパフォーマンスを向上させます。 - 具体化されたビュー: 基になるテーブルをすべて再処理することなく、BI および分析で使用する情報の最新の集計されたビューを作成し、変更箇所のみを更新します。
- 変更の転送: データ パイプラインを後で段階的に処理するために使用できる、Kafka や RDBMS などの下流システムに変更データ フィードを送信します。
- 監査証跡テーブル: デルタ テーブルとして変更データ フィードをキャプチャすると、永続的なストレージと効率的なクエリ機能が提供されます。これにより、削除が発生したときや、更新が行われたタイミングなど、時間の経過と共にすべての変更を確認できます。
変更データ フィードを有効にする
次のいずれかの方法を使用して、データ フィードの変更オプションを明示的に有効にする必要があります。
新しいテーブル: テーブルのプロパティ
delta.enableChangeDataFeed = true
を、CREATE TABLE
コマンドで設定します。CREATE TABLE student (id INT, name STRING, age INT) TBLPROPERTIES (delta.enableChangeDataFeed = true)
既存のテーブル: テーブルのプロパティ
delta.enableChangeDataFeed = true
を、ALTER TABLE
コマンドで設定します。ALTER TABLE myDeltaTable SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
すべての新しいテーブル:
set spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;
重要
データ フィードの変更を有効にした後に加えられた変更のみが記録されます。テーブルに対する以前の変更はキャプチャされません。
データ ストレージを変更する
Azure Databricks によって、UPDATE
、DELETE
、MERGE
操作に対する変更データが、テーブル ディレクトリの下の _change_data
フォルダーに記録されます。 挿入のみの操作やパーティションの完全な削除などの一部の操作では、Azure Databricks がトランザクション ログから直接変更データ フィードを効率的に計算できるため、_change_data
ディレクトリにデータは生成されません。
_change_data
フォルダー内のファイルは、テーブルの保持ポリシーに従います。 そのため、VACUUM コマンドを実行すると、データ フィードの変更データも削除されます。
バッチ クエリの変更を読み取る
開始と終了に、バージョンまたはタイムスタンプを指定できます。 クエリには、開始バージョンと終了バージョンおよびタイムスタンプが含まれます。 特定の開始バージョンから最新バージョンのテーブルへの変更を読み取るには、開始バージョンまたはタイムスタンプのみを指定します。
バージョンを整数として指定し、タイムスタンプを形式 yyyy-MM-dd[ HH:mm:ss[.SSS]]
の文字列として指定します。
変更イベントが記録されているバージョンより古いバージョンまたはタイムスタンプを指定した場合、つまり変更データ フィードが有効になっている場合は、変更データ フィードが有効になっていないことを示すエラーがスローされます。
SQL
-- version as ints or longs e.g. changes from version 0 to 10
SELECT * FROM table_changes('tableName', 0, 10)
-- timestamp as string formatted timestamps
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')
-- providing only the startingVersion/timestamp
SELECT * FROM table_changes('tableName', 0)
-- database/schema names inside the string for table name, with backticks for escaping dots and special characters
SELECT * FROM table_changes('dbName.`dotted.tableName`', '2021-04-21 06:45:46' , '2021-05-21 12:00:00')
-- path based tables
SELECT * FROM table_changes_by_path('\path', '2021-04-21 05:45:46')
Python
# version as ints or longs
spark.read.format("delta") \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.option("endingVersion", 10) \
.table("myDeltaTable")
# timestamps as formatted timestamp
spark.read.format("delta") \
.option("readChangeFeed", "true") \
.option("startingTimestamp", '2021-04-21 05:45:46') \
.option("endingTimestamp", '2021-05-21 12:00:00') \
.table("myDeltaTable")
# providing only the startingVersion/timestamp
spark.read.format("delta") \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.table("myDeltaTable")
# path based tables
spark.read.format("delta") \
.option("readChangeFeed", "true") \
.option("startingTimestamp", '2021-04-21 05:45:46') \
.load("pathToMyDeltaTable")
Scala
// version as ints or longs
spark.read.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.option("endingVersion", 10)
.table("myDeltaTable")
// timestamps as formatted timestamp
spark.read.format("delta")
.option("readChangeFeed", "true")
.option("startingTimestamp", "2021-04-21 05:45:46")
.option("endingTimestamp", "2021-05-21 12:00:00")
.table("myDeltaTable")
// providing only the startingVersion/timestamp
spark.read.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.table("myDeltaTable")
// path based tables
spark.read.format("delta")
.option("readChangeFeed", "true")
.option("startingTimestamp", "2021-04-21 05:45:46")
.load("pathToMyDeltaTable")
ストリーミング クエリの変更を読み取る
Python
# providing a starting version
spark.readStream.format("delta") \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.table("myDeltaTable")
# providing a starting timestamp
spark.readStream.format("delta") \
.option("readChangeFeed", "true") \
.option("startingTimestamp", "2021-04-21 05:35:43") \
.load("/pathToMyDeltaTable")
# not providing a starting version/timestamp will result in the latest snapshot being fetched first
spark.readStream.format("delta") \
.option("readChangeFeed", "true") \
.table("myDeltaTable")
Scala
// providing a starting version
spark.readStream.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.table("myDeltaTable")
// providing a starting timestamp
spark.readStream.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", "2021-04-21 05:35:43")
.load("/pathToMyDeltaTable")
// not providing a starting version/timestamp will result in the latest snapshot being fetched first
spark.readStream.format("delta")
.option("readChangeFeed", "true")
.table("myDeltaTable")
テーブルの読み取り中に変更データを取得するには、オプション readChangeFeed
を true
に設定します。
startingVersion
またはstartingTimestamp
は省略可能です。指定されていない場合、ストリームはストリーミング時にテーブルの最新のスナップショットを INSERT
として返し、その後の変更は変更データとして返します。
転送率の制限 (maxFilesPerTrigger
、maxBytesPerTrigger
) や excludeRegex
などのオプションは、変更データの読み取り時にもサポートされます。
注意
レート制限は、スナップショットの開始バージョン以外のバージョンではアトミックにすることができます。 つまり、コミット バージョン全体に対してレート制限が適用されるか、コミット全体が返されます。
既定では、ユーザーがテーブルの最後のコミットを超えるバージョンまたはタイムスタンプを渡すと、エラー timestampGreaterThanLatestCommit
がスローされます。 Databricks Runtime 11.3 LTS 以降では、ユーザーが次の構成を true
に設定している場合、変更データ フィードは範囲外のバージョンのケースを処理できます。
set spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled = true;
テーブルの最後のコミットより新しい開始バージョン、またはテーブルの最後のコミットより新しい開始タイムスタンプを指定した場合、上記の構成が有効になっていると、空の読み取り結果が返されます。
テーブルの最後のコミットより新しい開始バージョン、またはテーブルの最後のコミットより新しい開始タイムスタンプを指定した場合、前述の構成がバッチ読み取りモードで有効になっている時、開始バージョンと最後のコミットの間のすべての変更が 返されます。
変更データ フィードのスキーマとは
テーブルの変更データ フィードから読み取る際には、最新のテーブル バージョンのスキーマが使用されます。
注意
スキーマの変更と進化のほとんどの操作は、完全にサポートされています。 列マッピングが有効になっているテーブルでは、一部のユース ケースがサポートされておらず、異なる動作が示されます。 「列マッピングが有効になっているテーブルの変更データ フィードの制限事項」を参照してください。
変更データ フィードには、Delta テーブルのスキーマのデータ列に加えて、変更イベントの種類を識別するメタデータ列が含まれています。
列名 | Type | 値 |
---|---|---|
_change_type |
String | insert 、update_preimage 、update_postimage 、delete (1) |
_commit_version |
long | 変更を含むデルタ ログまたはテーブルのバージョン。 |
_commit_timestamp |
Timestamp | コミットが作成されたときに関連付けられたタイムスタンプ。 |
(1)preimage
は更新前の値です。postimage
は更新後の値です。
注意
追加された列と同じ名前の列がスキーマに含まれている場合、テーブルで変更データ フィードを有効にすることはできません。 変更データ フィードを有効にする前に、テーブル内の列の名前を変更してこの競合を解決してください。
列マッピングが有効になっているテーブルの変更データ フィードの制限事項
Delta テーブルで列マッピングが有効になっていると、既存データのデータ ファイルを書き換えることなく、テーブル内の列を削除または名前変更できます。 列マッピングが有効な場合、列の名前変更や削除、データ型の変更、NULL 値の許容の変更といった追加処理のないスキーマ変更を行うと、変更データ フィードに制限事項が生じます。
重要
- バッチセマンティクスを使用して追加処理のないスキーマ変更が発生したトランザクションまたは範囲の変更データ フィードを読み取ることはできません。
- Databricks Runtime 12.2 LTS 以前では、列マッピングが有効になっているテーブルで追加処理のないスキーマ変更が発生した場合、変更データ フィードに対するストリーム読み取りはサポートされません。 「列マッピングとスキーマの変更が伴うストリーミング」を参照してください。
- Databricks Runtime 11.3 LTS 以前では、列マッピングが有効になっているテーブルで列の名前変更または削除が発生した場合、そのテーブルの変更データ フィードを読み取ることはできません。
Databricks Runtime 12.2 LTS 以降では、列マッピングが有効になっているテーブルで追加処理のないスキーマ変更が発生した場合、そのテーブルの変更データ フィードに対してバッチ読み取りを実行できます。 読み取り操作では、テーブルの最新バージョンのスキーマを使用する代わりに、クエリで指定されたテーブルの最終バージョンのスキーマが使用されます。 指定されたバージョン範囲に追加処理のないスキーマ変更が含まれる場合、クエリは失敗します。
よく寄せられる質問 (FAQ)
変更データ フィードを有効にするオーバーヘッドはありますか。
大きな影響はありません。 変更データ レコードは、クエリ実行プロセス中に行単位で生成され、通常、書き換えられたファイルの合計サイズよりも大幅に小さくなります。
変更レコードの保持ポリシーとは何ですか。
変更レコードは、古いバージョンのテーブルと同じ保持ポリシーに従います。指定した保有期間を超えると、VACUUM によってクリーンアップされます。
変更データ フィードで新しいレコードが使用可能になるのはいつですか。
変更データは、Delta Lake トランザクションと共にコミットされ、テーブルで新しいデータを使用できるようになったときに使用できるようになります。
ノートブックの例: 差分変更データ フィードを使用して変更を反映する
このノートブックでは、ワクチン接種率の絶対数のシルバー テーブルに行われた変更を、ワクチン接種率のゴールド テーブルに反映する方法を示します。