Dbio を使用したクラウドストレージへのトランザクション書き込み Transactional Writes to Cloud Storage with DBIO

Databricks DBIO パッケージは、Apache Spark ジョブのクラウドストレージへのトランザクション書き込みを提供します。The Databricks DBIO package provides transactional writes to cloud storage for Apache Spark jobs. これにより、Spark がクラウドネイティブの設定 (たとえば、Azure Blob storage などのストレージサービスに直接書き込む) で使用されている場合に発生するパフォーマンスと正確性に関する多くの問題が解決されます。This solves a number of performance and correctness issues that come when Spark is used in a cloud-native setting (for example, writing directly to storage services like Azure Blob storage).

注意

DBIO トランザクションコミットが有効になっている場合、_started_<id>_committed_<id> で始まるメタデータファイルには、Spark ジョブによって作成されたデータファイルが付随します。When DBIO transactional commit is enabled, metadata files starting with _started_<id> and _committed_<id> will accompany data files created by Spark jobs. 一般に、これらのファイルを直接変更することはできません。Generally you shouldn’t alter these files directly. 代わりに、VACUUM コマンドを使用します。Rather, use the VACUUM command.

コミットされていないファイルをクリーンアップする Clean up uncommitted files

Spark ジョブから残されたコミットされていないファイルをクリーンアップするには、VACUUM コマンドを使用して削除します。To clean up uncommitted files left over from Spark jobs, use the VACUUM command to remove them. 通常 VACUUM、Spark ジョブの完了後に自動的に発生しますが、ジョブが中止された場合には手動で実行することもできます。Normally VACUUM happens automatically after Spark jobs complete, but you can also run it manually if a job is aborted.

たとえば、VACUUM ... RETAIN 1 HOUR は1時間経過したコミットされていないファイルを削除します。For example, VACUUM ... RETAIN 1 HOUR removes uncommitted files older than one hour.

重要

  • 1時間未満の期間の削除は避けてください。Avoid vacuuming with a horizon of less than one hour. データの不整合が発生する可能性があります。It can cause data inconsistency.
  • クラウドストレージで VACUUM を直接使用することはできません。You cannot use VACUUM directly on cloud storage. Vacuum ストレージには、 DBFS にマウントし、マウントされたディレクトリで VACUUM を実行する必要があります。To vacuum storage, you must mount it to DBFS and run VACUUM on the mounted directory.

Vacuum」も参照してください。Also see Vacuum.

SQLSQL

-- recursively vacuum an output path
%sql VACUUM '/path/to/output/directory' [RETAIN <N> HOURS]

-- vacuum all partitions of a catalog table
%sql VACUUM tableName [RETAIN <N> HOURS]

ScalaScala

// recursively vacuum an output path
spark.sql("VACUUM '/path/to/output/directory' [RETAIN <N> HOURS]")

// vacuum all partitions of a catalog table
spark.sql("VACUUM tableName [RETAIN <N> HOURS]")