Delta Lake テーブル履歴の処理

Delta テーブルを変更する操作を行うたびに、テーブルの新しいバージョンが作成されます。 履歴情報を使用し、タイム トラベルを使用することで、特定の時点での操作を監査したり、テーブルをロールバックしたり、テーブルに対してクエリを実行したりできます。

注意

Databricks では、データ アーカイブの長期的なバックアップ ソリューションとして Delta Lake テーブル履歴を使用することはお勧めしていません。 データとログの保持の両方の構成を大きな値に設定していない限り、Databricks では、タイム トラベル操作には過去 7 日間のみを使用することをお勧めしています。

Delta テーブルの履歴を取得する

history コマンドを実行すると、Delta テーブルへの書き込みごとに操作、ユーザー、タイムスタンプなどに関する情報を取得できます。 操作は、逆の時系列順で返されます。

テーブル履歴の保持期間は、テーブル設定 delta.logRetentionDuration によって決まり、既定では 30 日間です。

注意

タイム トラベルとテーブル履歴は、さまざまな保持しきい値によって制御されます。 「Delta Lake のタイム トラベルとは」を参照してください。

DESCRIBE HISTORY '/data/events/'          -- get the full history of the table

DESCRIBE HISTORY delta.`/data/events/`

DESCRIBE HISTORY '/data/events/' LIMIT 1  -- get the last operation only

DESCRIBE HISTORY eventsTable

Spark SQL 構文の詳細については、「DESCRIBE HISTORY」を参照してください。

Scala、Java、Python 構文の詳細については、Delta Lake API ドキュメントを参照してください。

Catalog Explorer には、Delta テーブルのこの詳細なテーブル情報と履歴が表示されます。 テーブル スキーマとサンプル データに加えて、[履歴] タブをクリックして DESCRIBE HISTORY で表示されるテーブル履歴を確認することもできます。

履歴スキーマ

history 操作の出力には、次の列があります。

タイプ 説明
version long 操作によって生成されたテーブルのバージョン。
timestamp timestamp このバージョンがコミットされた時点。
userId string 操作を実行したユーザーの ID。
userName string 操作を実行したユーザーの名前。
operation string 操作の名前。
operationParameters map 操作のパラメーター (述語など)。
ジョブ (job) struct 操作を実行したジョブの詳細。
ノートブック struct 操作が実行されたノートブックの詳細。
clusterId string 操作が実行されたクラスターの ID。
readVersion long 書き込み操作を実行するために読み取ったテーブルのバージョン。
isolationLevel string この操作に使用された分離レベル。
isBlindAppend Boolean この操作によってデータが追加されたかどうか。
operationMetrics map 操作のメトリック (変更された行数やファイル数など)。
userMetadata string ユーザー定義のコミット メタデータ (指定されている場合)
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|version|          timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|   isolationLevel|isBlindAppend|    operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|      5|2019-07-29 14:07:47|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          4|WriteSerializable|        false|[numTotalRows -> ...|
|      4|2019-07-29 14:07:41|   ###|     ###|   UPDATE|[predicate -> (id...|null|     ###|      ###|          3|WriteSerializable|        false|[numTotalRows -> ...|
|      3|2019-07-29 14:07:29|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          2|WriteSerializable|        false|[numTotalRows -> ...|
|      2|2019-07-29 14:06:56|   ###|     ###|   UPDATE|[predicate -> (id...|null|     ###|      ###|          1|WriteSerializable|        false|[numTotalRows -> ...|
|      1|2019-07-29 14:04:31|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          0|WriteSerializable|        false|[numTotalRows -> ...|
|      0|2019-07-29 14:01:40|   ###|     ###|    WRITE|[mode -> ErrorIfE...|null|     ###|      ###|       null|WriteSerializable|         true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+

Note

操作メトリックのキー

history操作は、operationMetrics 列マップ内の操作メトリックのコレクションを返します。

次の表は、マップのキー定義を操作別に示しています。

操作 メトリックの名前 説明
WRITE、CREATE TABLE AS SELECT、REPLACE TABLE AS SELECT、COPY INTO
numFiles 書き込まれたファイルの数。
numOutputBytes 書き込まれたコンテンツのサイズ (バイト単位)。
numOutputRows 書き込まれた行の数。
STREAMING UPDATE
numAddedFiles 追加されたファイルの数。
numRemovedFiles 削除されたファイルの数。
numOutputRows 書き込まれた行の数。
numOutputBytes 書き込みのサイズ (バイト単位)。
DELETE
numAddedFiles 追加されたファイルの数。 テーブルのパーティションが削除された場合には提供されません。
numRemovedFiles 削除されたファイルの数。
numDeletedRows 削除された行の数。 テーブルのパーティションが削除された場合には提供されません。
numCopiedRows ファイルの削除処理中にコピーされた行の数。
executionTimeMs 操作全体の実行にかかった時間。
scanTimeMs ファイルの一致をスキャンするのに要した時間。
rewriteTimeMs 一致したファイルの書き換えに要した時間。
TRUNCATE
numRemovedFiles 削除されたファイルの数。
executionTimeMs 操作全体の実行にかかった時間。
MERGE
numSourceRows ソース データフレーム内の行の数。
numTargetRowsInserted ターゲット テーブルに挿入された行の数。
numTargetRowsUpdated ターゲット テーブル内で更新された行の数。
numTargetRowsDeleted ターゲット テーブル内で削除された行の数。
numTargetRowsCopied コピーされたターゲット行の数。
numOutputRows 書き出された行数の合計。
numTargetFilesAdded シンク (ターゲット) に追加されたファイルの数。
numTargetFilesRemoved シンク (ターゲット) から削除されたファイルの数。
executionTimeMs 操作全体の実行にかかった時間。
scanTimeMs ファイルの一致をスキャンするのに要した時間。
rewriteTimeMs 一致したファイルの書き換えに要した時間。
UPDATE
numAddedFiles 追加されたファイルの数。
numRemovedFiles 削除されたファイルの数。
numUpdatedRows 更新された行の数。
numCopiedRows ファイルの更新処理中にコピーされた行の数。
executionTimeMs 操作全体の実行にかかった時間。
scanTimeMs ファイルの一致をスキャンするのに要した時間。
rewriteTimeMs 一致したファイルの書き換えに要した時間。
FSCK numRemovedFiles 削除されたファイルの数。
CONVERT numConvertedFiles 変換された Parquet ファイルの数。
OPTIMIZE
numAddedFiles 追加されたファイルの数。
numRemovedFiles 最適化されたファイルの数。
numAddedBytes テーブルが最適化された後に追加されたバイト数。
numRemovedBytes 削除されたバイト数。
minFileSize テーブルが最適化された後の最小ファイル サイズ。
p25FileSize テーブルが最適化された後の 25 パーセンタイル ファイルのサイズ。
p50FileSize テーブルが最適化された後のファイル サイズの中央値。
p75FileSize テーブルが最適化された後の 75 パーセンタイル ファイルのサイズ。
maxFileSize テーブルが最適化された後の最大ファイル サイズ。
CLONE
sourceTableSize 複製されたバージョンのソース テーブルのサイズ (バイト単位)。
sourceNumOfFiles 複製されたバージョンのソース テーブル内のファイルの数。
numRemovedFiles 前の Delta テーブルが置き換えられた場合に、ターゲット テーブルから削除されたファイルの数。
removedFilesSize 前の Delta テーブルが置き換えられた場合に、ターゲット テーブルから削除されたファイルの合計サイズ (バイト単位)。
numCopiedFiles 新しい場所にコピーされたファイルの数。 シャロー複製の場合は 0。
copiedFilesSize 新しい場所にコピーされたファイルの合計サイズ (バイト単位)。 シャロー複製の場合は 0。
RESTORE
tableSizeAfterRestore 復元後のテーブル サイズ (バイト単位)。
numOfFilesAfterRestore 復元後のテーブル内のファイルの数。
numRemovedFiles 復元操作によって削除されたファイルの数。
numRestoredFiles 復元の結果として追加されたファイルの数。
removedFilesSize 復元によって削除されたファイルのサイズ (バイト単位)。
restoredFilesSize 復元によって追加されたファイルのサイズ (バイト単位)。
VACUUM
numDeletedFiles 削除されたファイルの数。
numVacuumedDirectories バキューム処理されたディレクトリの数。
numFilesToDelete 削除するファイルの数。

Delta Lake のタイム トラベルとは

Delta Lake のタイム トラベルは、タイムスタンプまたはテーブルのバージョン (トランザクション ログに記録されているもの) に基づいて、テーブルの以前のバージョンに対してクエリを実行することをサポートしています。 タイム トラベルは、次のようなアプリケーションに使用できます。

  • 分析、レポート、または出力 (機械学習モデルの出力など) を再作成する。 これは、特に規制対象の業界でデバッグや監査を行う際に役立ちます。
  • 複雑なテンポラル クエリを記述する。
  • データの間違いを修正する。
  • 急速に変化するテーブルに対応する一連のクエリに対してスナップショット分離を提供する。

重要

タイム トラベルでアクセスできるテーブル バージョンは、トランザクション ログ ファイルの保持しきい値と、VACUUM 操作の頻度と指定された保持期間の組み合わせによって決まります。 既定値を使用して VACUUM を毎日実行する場合は、7 日間のデータをタイム トラベルで使用できます。

Delta のタイム トラベル構文

タイム トラベルを使用して Delta テーブルにクエリを実行するには、テーブル名の指定の後に句を追加します。

  • timestamp_expression には次のいずれかを指定できます。
    • '2018-10-18T22:15:12.013Z'、つまり、タイムスタンプにキャストできる文字列です
    • cast('2018-10-18 13:36:32 CEST' as timestamp)
    • '2018-10-18'、つまり、日付文字列です
    • current_timestamp() - interval 12 hours
    • date_sub(current_date(), 1)
    • タイムスタンプにキャストされる (できる) その他の式
  • version は、DESCRIBE HISTORY table_spec の出力から取得できる long 型の値です。

timestamp_expressionversion もサブクエリにすることはできません。

日付またはタイムスタンプ文字列のみが使用できます。 たとえば、"2019-01-01""2019-01-01T00:00:00.000Z"す。 構文の例については、次のコードを参照してください。

SQL

SELECT * FROM people10m TIMESTAMP AS OF '2018-10-18T22:15:12.013Z'
SELECT * FROM delta.`/tmp/delta/people10m` VERSION AS OF 123

Python

df1 = spark.read.option("timestampAsOf", "2019-01-01").table("people10m")
df2 = spark.read.option("versionAsOf", 123).load("/tmp/delta/people10m")

また、@ 構文を使用して、タイムスタンプまたはバージョンをテーブル名の一部として指定することもできます。 タイムスタンプは yyyyMMddHHmmssSSS 形式である必要があります。 バージョンの前に v を付加することで、@ の後にバージョンを指定できます。 構文の例については、次のコードを参照してください。

SQL

SELECT * FROM people10m@20190101000000000
SELECT * FROM people10m@v123

Python

spark.read.table("people10m@20190101000000000")
spark.read.table("people10m@v123")

spark.read.load("/tmp/delta/people10m@20190101000000000")
spark.read.load("/tmp/delta/people10m@v123")

トランザクション ログ チェックポイントとは

Delta Lake を使用すると、テーブルのバージョンが _delta_log ディレクトリ内の JSON ファイルとして記録され、テーブル データと共に格納されます。 チェックポイント クエリを最適化するために、Delta Lake によってテーブルのバージョンが Parquet チェックポイント ファイルに集計されるため、テーブル履歴の JSON バージョンをすべて読み取る必要がなくなります。 Azure Databricks を使用すると、データ サイズとワークロードのチェックポイント処理の頻度が最適化されます。 ユーザーはチェックポイントを直接操作する必要はありません。 チェックポイント処理の頻度は、予告なく変更される場合があります。

タイム トラベル クエリのデータ保持を構成する

以前のテーブル バージョンに対してクエリを実行するには、そのバージョンのログとデータのファイルの "両方" を保持している必要があります。

データ ファイルは、テーブルに対して VACUUM を実行すると削除されます。 Delta Lake では、テーブルのバージョンをチェックポイント処理した後、ログ ファイルの削除が自動的に管理されます。

ほとんどの Delta テーブルに対して VACUUM が定期的に実行されるため、ポイントインタイム クエリでは VACUUM の保持しきい値 (既定では 7 日) を考慮する必要があります。

Delta テーブルのデータ保持しきい値を大きくするには、次のテーブル プロパティを構成する必要があります。

  • delta.logRetentionDuration = "interval <interval>": テーブルの履歴を保持する期間を制御します。 既定では、 interval 30 daysです。
  • delta.deletedFileRetentionDuration = "interval <interval>": 現在のテーブル バージョンで参照されなくなったデータ ファイルを削除するために VACUUM で使用するしきい値を決定します。 既定では、 interval 7 daysです。

テーブルの作成時に "Delta" プロパティを指定する、または ALTER TABLE ステートメントを使用して設定することもできます。 「Delta テーブル プロパティのリファレンス」を参照してください。

注意

これらの両方のプロパティを設定して、頻繁な VACUUM 操作が行われるテーブルのテーブル履歴が、長期間保持されるようにする必要があります。 たとえば、30 日間の履歴データにアクセスするには、delta.deletedFileRetentionDuration = "interval 30 days" を設定します (これは delta.logRetentionDuration の既定の設定と一致します)。

データ保持のしきい値を大きくすると、より多くのデータ ファイルが維持されるため、ストレージ コストが増加する可能性があります。

Delta テーブルを以前の状態に復元する

RESTORE コマンドを使用して、Delta テーブルを以前の状態に復元できます。 Delta テーブルは、以前の状態に復元できるようにするために、テーブルの履歴バージョンを内部的に保持しています。 RESTORE コマンドのオプションとして、以前の状態に対応するバージョンまたは以前の状態が作成されたときのタイムスタンプがサポートされています。

重要

  • 既に復元されたテーブルを復元できます。
  • 複製されたテーブルを復元できます。
  • 復元するテーブルに対する MODIFY アクセス許可が必要です。
  • 手動または vacuum によってデータ ファイルが削除されている場合、以前のバージョンにテーブルを復元することはできません。 それでも、spark.sql.files.ignoreMissingFilestrue に設定されている場合は、このバージョンに部分的に復元できます。
  • 以前の状態に復元するためのタイムスタンプ形式は yyyy-MM-dd HH:mm:ss です。 日付 (yyyy-MM-dd) 文字列のみを指定することもできます。
RESTORE TABLE db.target_table TO VERSION AS OF <version>
RESTORE TABLE delta.`/data/target/` TO TIMESTAMP AS OF <timestamp>

構文の詳細については、「RESTORE」を参照してください。

重要

復元は、データ変更操作と認識されます。 RESTORE コマンドによって追加された Delta Lake ログ エントリには、true に設定された dataChange が含まれます。 Delta Lake テーブルの更新を処理する 構造化ストリーミング ジョブなどのダウンストリーム アプリケーションがある場合、復元操作によって追加されたデータ変更ログ エントリは、新しいデータ更新と見なされ、それらの処理によってデータが重複する可能性があります。

次に例を示します。

テーブルのバージョン 操作 差分ログの更新 データ変更ログの更新のレコード
0 INSERT AddFile(/path/to/file-1, dataChange = true) (name = Viktor, age = 29, (name = George, age = 55)
1 INSERT AddFile(/path/to/file-2, dataChange = true) (name = George, age = 39)
2 OPTIMIZE AddFile(/path/to/file-3, dataChange = false), RemoveFile(/path/to/file-1), RemoveFile(/path/to/file-2) (最適化圧縮はテーブル内のデータを変更しないため、レコードはありません)
3 RESTORE(version=1) RemoveFile(/path/to/file-3), AddFile(/path/to/file-1, dataChange = true), AddFile(/path/to/file-2, dataChange = true) (name = Viktor, age = 29), (name = George, age = 55), (name = George, age = 39)

前の例では、RESTORE コマンドの結果、Delta テーブル バージョン 0 と 1 を読み取るときに既に表示されていた更新プログラムが表示されます。 ストリーミング クエリでこのテーブルが読み取られていた場合、これらのファイルは新しく追加されたデータと見なされ、再び処理されます。

復元のメトリック

RESTORE 操作が完了すると、次のメトリックが単一行データフレームとして報告されます。

  • table_size_after_restore: 復元後のテーブルのサイズ。

  • num_of_files_after_restore: 復元後のテーブル内のファイルの数。

  • num_removed_files: テーブルから削除された (論理的に削除された) ファイルの数。

  • num_restored_files: ロールバックによって復元されたファイルの数。

  • removed_files_size: テーブルから削除されたファイルの合計サイズ (バイト単位)。

  • restored_files_size: 復元されたファイルの合計サイズ (バイト単位)。

    復元のメトリックのサンプル

Delta Lake のタイム トラベルの使用例

  • ユーザー 111 のテーブルの誤った削除を修正する:

    INSERT INTO my_table
      SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1)
      WHERE userId = 111
    
  • テーブルの誤った更新を修正する:

    MERGE INTO my_table target
      USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source
      ON source.userId = target.userId
      WHEN MATCHED THEN UPDATE SET *
    
  • 先週に追加された新しい顧客の数を照会する:

    SELECT count(distinct userId)
    FROM my_table  - (
      SELECT count(distinct userId)
      FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7))
    

Spark セッションで最後のコミットのバージョンを見つけるには、どうすればよいですか?

すべてのスレッドとすべてのテーブルの全体で、現在の SparkSession によって書き込まれた最後のコミットのバージョン番号を取得するには、SQL 構成 spark.databricks.delta.lastCommitVersionInSession のクエリを実行します。

SQL

SET spark.databricks.delta.lastCommitVersionInSession

Python

spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")

Scala

spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")

SparkSession によるコミットが行われていない場合は、キーのクエリを実行すると空の値が返されます。

注意

複数のスレッド間で同じ SparkSession を共有する場合、それは複数のスレッド間で変数を共有するのと似ています。構成値が同時に更新されると、競合状態が発生する可能性があります。