Share via


Azure Databricks での分離レベルと書き込みの競合

テーブルの分離レベルによって、同時実行操作で行われた変更からのトランザクションの分離の度合いが定義されます。 Azure Databricks での書き込みの競合は、分離レベルによって異なります。

Delta Lake では、読み取りと書き込みの間で ACID トランザクションが保証されます。 これは、次のことを意味します。

  • 複数のクラスターにまたがる複数のライターは、テーブル パーティションを同時に変更できます。 ライターは、テーブルの一貫したスナップショット ビューを表示し、書き込みをシリアル順序で行います。
    • ジョブ中にテーブルが変更された場合でも、閲覧者には、Azure Databricks ジョブで作業を開始したテーブルの一貫性のあるスナップショット ビューが引き続き表示されます。

Azure Databricks での ACID 保証とは」を参照してください。

注意

Azure Databricks では、既定ですべてのテーブルに Delta Lake が使用されます。 この記事では、Delta Lake on Azure Databricks の動作について説明します。

重要

メタデータの変更により、すべての同時書き込み操作が失敗します。 これらの操作には、テーブル プロトコル、テーブル プロパティ、またはデータ スキーマの変更が含まれます。

ストリーミング読み取りは、テーブルメタデータを変更するコミットに遭遇すると失敗します。 ストリームを継続したい場合は、再起動する必要があります。 推奨される方法については、「Azure Databricks での構造化ストリーミング アプリケーションの運用に関する考慮事項」を参照してください。

メタデータを変更するクエリの例を次に示します。

-- Set a table property.
ALTER TABLE table-name SET TBLPROPERTIES ('delta.isolationLevel' = 'Serializable')

-- Enable a feature using a table property and update the table protocol.
ALTER TABLE table_name SET TBLPROPERTIES ('delta.enableDeletionVectors' = true);

-- Drop a table feature.
ALTER TABLE table_name DROP FEATURE deletionVectors;

-- Upgrade to UniForm.
REORG TABLE table_name APPLY (UPGRADE UNIFORM(ICEBERG_COMPAT_VERSION=2));

-- Update the table schema.
ALTER TABLE table_name ADD COLUMNS (col_name STRING);

行レベルのコンカレンシーでの書き込みの競合

行レベル コンカレンシーは、行レベルで変更を検出し、同時書き込みが同じデータ ファイル内の異なる行を更新または削除する際に発生する競合を自動的に解決することで、同時書き込み操作間の競合を軽減します。

行レベル コンカレンシーは、Databricks Runtime 14.2 以上で一般公開になりました。 次の条件では、行レベルのコンカレンシーが既定でサポートされます:

  • 削除ベクトルが有効になっていて、パーティション分割されていないテーブル。
  • 削除ベクトルを無効にした場合を除き、液体クラスタリングを含むテーブル。

パーティションを持つテーブルは行レベル コンカレンシーをサポートしていませんが、削除ベクトルが有効になっていれば、OPTIMIZE と他のすべての書き込み操作との間の競合を回避できます。 「行レベルのコンカレンシーの制限事項」を参照してください。

その他の Databricks Runtime バージョンについては、「行レベル コンカレンシー プレビューの動作 (レガシ)」を参照してください。

行レベルのコンカレンシーを有効にしている状態で、各分離レベルで競合する可能性がある書き込み操作のペアについて、次の表で説明します。

Note

ID 列を持つテーブルでは、同時実行トランザクションはサポートされていません。 「Delta Lake で ID 列を使用する」を参照してください。

INSERT (1) UPDATE、DELETE、MERGE INTO OPTIMIZE
INSERT 競合できない
UPDATE、DELETE、MERGE INTO WriteSerializable で競合が発生する可能性はありません。 同じ行を変更するときに、Serializable で競合が発生する可能性があります。「行レベル コンカレンシーの制限事項」を参照してください。 MERGE INTO では、行レベルの競合解決のために Photon が必要です。 同じ行を変更するときに競合が発生する可能性があります。「行レベル コンカレンシーの制限事項」を参照してください。
OPTIMIZE 競合できない 競合できない 競合できない

重要

(1) 上記の表のすべての INSERT 操作は、コミット前に同じテーブルからデータを読み取らない追加操作を説明しています。 同じテーブルを読み取るサブクエリを含む INSERT 操作では、MERGE と同じコンカレンシーがサポートされます。

行レベルのコンカレンシーなしでの書き込みの競合

次の表では、各分離レベルで競合する可能性がある書き込み操作のペアについて説明します。

テーブルにパーティションが定義されているか、削除ベクトルが有効になっていない場合、テーブルは行レベル コンカレンシーをサポートしません。 行レベル コンカレンシーには、Databricks Runtime 14.2 以上が必要です。

Note

ID 列を持つテーブルでは、同時実行トランザクションはサポートされていません。 「Delta Lake で ID 列を使用する」を参照してください。

INSERT (1) UPDATE、DELETE、MERGE INTO OPTIMIZE
INSERT 競合できない
UPDATE、DELETE、MERGE INTO WriteSerializable で競合が発生する可能性はありません。 Serializable で競合が発生する可能性があります。「パーティションでの競合の回避」を参照してください。 Serializable と WriteSerializable で競合が発生する可能性があります。「パーティションでの競合の回避」を参照してください。
OPTIMIZE 競合できない 削除ベクトルが有効になっているテーブルでは、競合が発生する可能性はありません。 それ以外の場合は競合が発生する可能性があります。 削除ベクトルが有効になっているテーブルでは、競合が発生する可能性はありません。 それ以外の場合は競合が発生する可能性があります。

重要

(1) 上記の表のすべての INSERT 操作は、コミット前に同じテーブルからデータを読み取らない追加操作を説明しています。 同じテーブルを読み取るサブクエリを含む INSERT 操作では、MERGE と同じコンカレンシーがサポートされます。

行レベルのコンカレンシーの制限事項

行レベルのコンカレンシーには、いくつかの制限事項があてはまります。 次の操作では、競合の解決は、Azure Databricks での書き込み競合に対する通常のコンカレンシーに従います。 「行レベルのコンカレンシーなしでの書き込みの競合」を参照してください。

  • ZORDER BY を指定した OPTIMIZE コマンド。
  • 次のような複雑な条件句を持つコマンド:
    • 構造体、配列、マップなどの複合データ型の条件。
    • 非決定論的な式とサブクエリを使用する条件。
    • 相関サブクエリを含む条件。
  • MERGE コマンドの場合、ターゲット テーブルで明示的な述語を使用して、ソース テーブルに一致する行をフィルター処理する必要があります。 マージ解決の場合、このフィルターは、同時実行操作のフィルター条件に基づいて競合が発生する可能性がある行のみをスキャンするために使用されます。

Note

行レベルの競合検出を使用すると、総実行時間が長くなる可能性があります。 同時実行トランザクションが多い場合、書き込み側が競合解決よりも待機時間を優先し、競合が発生する可能性があります。

削除ベクトルのすべての制限事項もあてはまります。 「制限事項」を参照してください。

Delta Lake がテーブルを読み取らずにコミットするのはどのような場合ですか?

Delta Lake の INSERT または追加操作では、次の条件が満たされている場合、コミット前にテーブルの状態が読み取られません。

  1. ロジックが、INSERT SQL ロジックまたは追加モードを使用して表されている。
  2. ロジックに、書き込み操作の対象となるテーブルを参照するサブクエリや条件が含まれていない。

他のコミットと同様に、Delta Lake はトランザクション ログのメタデータを使用してコミット時にテーブルのバージョンを検証して解決しますが、実際にはテーブルのバージョンは読み取られません。

Note

多くの一般的なパターンで、MERGE 操作を使用して、テーブルの状態に基づいてデータが挿入されます。 INSERT ステートメントを使用してこのロジックを書き換えることは可能ですが、いずれかの条件式がターゲット テーブル内の列を参照する場合、これらのステートメントにも MERGE と同じコンカレンシーの制限事項があります。

WriteSerializable 分離レベルと Serializable 分離レベルを書き込む

テーブルの分離レベルによって、同時実行トランザクションで行われた変更からトランザクションを分離する必要がある度合いが定義されます。 Delta Lake on Azure Databricks では、Serializable と WriteSerializable の 2 つの分離レベルがサポートされています。

  • Serializable: 最も強力な分離レベル。 これにより、コミットされた書き込み操作とすべての読み取りが直列化可能になります。 テーブルに示されているのと同じ結果を生成する、一度に 1 つずつ実行する一連のシーケンスが存在する限り、操作は許可されます。 書き込み操作の場合、シリアル シーケンスはテーブルの履歴に表示されるものとまったく同じです。

  • WriteSerializable (既定値):Serializable よりも弱い分離レベル。 これにより、書き込み操作 (つまり、読み取りではない) だけが直列化可能になります。 ただし、これはスナップショット分離よりもさらに強力です。 WriteSerializable は、ほとんどの一般的な操作でデータの一貫性と可用性の優れたバランスを提供するため、既定の分離レベルです。

    このモードでは、Delta テーブルの内容は、テーブル履歴に表示される一連の操作から予想される内容と異なる場合があります。 これは、このモードでは、Y が X の後にコミットされたことが履歴に示されていても、結果が X の前に Y が実行されたかのように (つまり、これらの間で直列化可能)、同時書き込みの特定のペア (操作 X と Y など) を続行できるためです。この並べ替えを禁止するには、テーブルの分離レベルが Serializable になるように設定して、これらのトランザクションを失敗させます。

読み取り操作では、常にスナップショット分離が使用されます。 書き込み分離レベルにより、履歴に従って"存在しない" テーブルのスナップショットを閲覧者が表示できるかどうかが決まります。

Serializable レベルの場合、閲覧者には常に履歴に準拠するテーブルだけが表示されます。 WriteSerializable レベルの場合、閲覧者は Delta ログに存在しないテーブルを表示できます。

例として、実行時間の長い削除である txn1 と、txn1 によって削除されたデータを挿入する txn2 について考えてみます。 txn2 と txn1 は完了し、履歴にこの順序で記録されます。 履歴によると、txn2 に挿入されたデータはテーブルに存在しないはずです。 Serializable レベルの場合、閲覧者には txn2 によって挿入されたデータは表示されません。 ただし、WriteSerializable レベルの場合、閲覧者は、ある時点で txn2 によって挿入されたデータを確認できます。

各分離レベルで競合する可能性のある操作の種類および発生する可能性のあるエラーの詳細については、「パーティション分割と不整合コマンド条件を使用して競合を回避する」を参照してください。

分離レベルを設定する

分離レベルは、ALTER TABLE コマンドを使用して設定します。

ALTER TABLE <table-name> SET TBLPROPERTIES ('delta.isolationLevel' = <level-name>)

ここで、<level-name>Serializable または WriteSerializable です。

たとえば、分離レベルを既定の WriteSerializable から Serializable に変更するには、次を実行します。

ALTER TABLE <table-name> SET TBLPROPERTIES ('delta.isolationLevel' = 'Serializable')

パーティション分割と不整合コマンド条件を使用して競合を回避する

"競合する可能性がある" とマークされているすべてのケースで、2 つの操作が競合するかどうかは、同じファイル セットに対して使用するかどうかによって異なります。 操作の条件で使用される列と同じ列でテーブルをパーティション分割すると、2 つのファイル セットが不整合になる可能性があります。 たとえば、テーブルが日付でパーティション分割されていない場合、2 つのコマンド UPDATE table WHERE date > '2010-01-01' ...DELETE table WHERE date < '2010-01-01' の両方で同じファイル セットを変更しようとする可能性があるため、これらのコマンドは競合します。 date によってテーブルをパーティション分割すると、競合が回避されます。 そのため、コマンドで一般的に使用される条件に従ってテーブルをパーティション分割すると、競合が大幅に軽減する可能性があります。 ただし、カーディナリティが高い列でテーブルをパーティション分割すると、サブディレクトリの数が多いことが原因で他のパフォーマンスの問題が発生する可能性があります。

競合の例外

トランザクションの競合が発生すると、次のいずれかの例外が発生します。

ConcurrentAppendException

この例外は、同時実行操作によって、操作で読み取られたのと同じパーティション (またはパーティション分割されていないテーブル内の任意の場所) にファイルが追加された場合に発生します。 INSERTDELETEUPDATE、または MERGE の各操作によってファイルが追加される可能性があります。

既定の分離レベルである WriteSerializable では、"ブラインド" INSERT 操作 (つまり、データを読み取らずにデータを無条件に追加する操作) によって追加されたファイルは、同じパーティション (またはパーティション分割されていないテーブル内の任意の場所) にアクセスする場合でも、どの操作とも競合しません。 分離レベルが Serializable に設定されている場合、ブラインド追加によって競合が発生する可能性があります。

この例外は多くの場合、DELETEUPDATE、または MERGE の同時実行操作中にスローされます。 同時実行操作では、異なるパーティション ディレクトリが物理的に更新されているときに、いずれかの操作によって、他の操作で同時に更新されるものと同じパーティションが読み取られるため、競合が発生する可能性があります。 これは、操作条件で分離を明示的にすることで回避できます。 次の例を考えてみます。

// Target 'deltaTable' is partitioned by date and country
deltaTable.as("t").merge(
    source.as("s"),
    "s.user_id = t.user_id AND s.date = t.date AND s.country = t.country")
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute()

上記のコードを、異なる日付または国に対して同時に実行するとします。 各ジョブはターゲット Delta テーブル上の独立したパーティションで動作しているので、競合が発生するとは考えられません。 ただし、この条件は十分に明示的ではなく、テーブル全体がスキャンされることがあり、他のパーティションを更新する同時実行操作と競合する可能性があります。 代わりに、次の例に示すように、ステートメントを書き換えて、特定の日付と国をマージ条件に追加できます。

// Target 'deltaTable' is partitioned by date and country
deltaTable.as("t").merge(
    source.as("s"),
    "s.user_id = t.user_id AND s.date = t.date AND s.country = t.country AND t.date = '" + <date> + "' AND t.country = '" + <country> + "'")
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute()

これで、この操作は、異なる日付と国で同時に実行しても安全です。

ConcurrentDeleteReadException

この例外は、同時実行操作によって、ユーザーの操作で読み取られたファイルが削除された場合に発生します。 一般的な原因は、ファイルを書き換える DELETEUPDATE、または MERGE の操作です。

ConcurrentDeleteDeleteException

この例外は、同時実行操作によって、ユーザーの操作でも削除されたファイルがさらに削除された場合に発生します。 これは、同じファイルを書き換える 2 つの同時実行圧縮操作によって発生する可能性があります。

MetadataChangedException

この例外は、同時実行トランザクションによって Delta テーブルのメタデータが更新されると発生します。 一般的な原因は、テーブルのスキーマを更新する Delta テーブルに対する ALTER TABLE 操作または書き込みです。

ConcurrentTransactionException

同じチェックポイントの場所を使用するストリーミング クエリが同時に複数回開始され、同時に Delta テーブルに書き込もうとした場合。 同じチェックポイントの場所を使用する 2 つのストリーミング クエリを同時に実行しないでください。

ProtocolChangedException

この例外は、次の場合に発生する可能性があります。

  • Delta テーブルが新しいプロトコル バージョンにアップグレードされた場合。 今後の操作を成功させるには、Databricks Runtime のアップグレードが必要な場合があります。
  • 複数の編集者がテーブルの作成または置き換えを同時に行っている場合。
  • 複数の編集者が空のパスに同時に書き込んでいる場合。

詳細については、「Azure Databricks で Delta Lake 機能の互換性を管理する方法は?」を参照してください。

行レベル コンカレンシー プレビューの動作 (レガシ)

このセクションでは、Databricks Runtime 14.1 以下での行レベル コンカレンシーのプレビュー動作について説明します。 行レベル コンカレンシーには、常に削除ベクトルが必要です。

Databricks Runtime 13.3 LTS 以上では、リキッド クラスタリングが有効になっているテーブルでは、行レベル コンカレンシーが自動的に有効になります。

Databricks Runtime 14.0 および 14.1 では、クラスターまたは SparkSession に次の構成を設定することで、削除ベクトルを含むテーブルに対して行レベル コンカレンシーを有効にできます。

spark.databricks.delta.rowLevelConcurrencyPreview = true

Databricks Runtime 14.1 以下において、Photon 以外のコンピューティングでは、DELETE 操作の行レベル コンカレンシーのみがサポートされています。