Delta Lake 테이블 기록 작업

Delta Lake 테이블을 수정하는 각 작업에 따라 새 테이블 버전이 만들어집니다. 기록 정보를 사용하여 작업 감사, 테이블 롤백 또는 시간 여행을 사용하여 특정 시점에 테이블을 쿼리할 수 있습니다.

참고 항목

Databricks는 데이터 보관을 위한 장기 백업 솔루션으로 Delta Lake 테이블 기록을 사용하는 것을 권장하지 않습니다. Databricks는 데이터 및 로그 보존 구성을 모두 더 큰 값으로 설정하지 않는 한 시간 이동 작업에 지난 7일만 사용할 것을 권장합니다.

델타 테이블 기록 검색

명령을 실행 history 하여 Delta 테이블에 대한 각 쓰기에 대한 작업, 사용자 및 타임스탬프를 포함한 정보를 검색할 수 있습니다. 연산은 역순으로 반환됩니다.

테이블 기록 보존은 기본적으로 30일인 테이블 설정 delta.logRetentionDuration에 의해 결정됩니다.

참고 항목

시간 이동 및 테이블 기록은 서로 다른 보존 임계값에 의해 제어됩니다. Delta Lake 시간 이동이란?을 참조하세요.

DESCRIBE HISTORY '/data/events/'          -- get the full history of the table

DESCRIBE HISTORY delta.`/data/events/`

DESCRIBE HISTORY '/data/events/' LIMIT 1  -- get the last operation only

DESCRIBE HISTORY eventsTable

Spark SQL 구문 세부 정보는 DESCRIBE HISTORY를 참조하세요.

Scala/Java/Python 구문에 대한 자세한 내용은 Delta Lake API 설명서를 참조하세요.

카탈로그 탐색기는 델타 테이블에 대한 이 자세한 테이블 정보 및 기록을 시각적으로 볼 수 있습니다. 테이블 스키마 및 샘플 데이터 외에도 기록 탭을 클릭하여 DESCRIBE HISTORY를 통해 표시하는 테이블 기록을 볼 수 있습니다.

기록 스키마

history 작업의 출력은 다음과 같은 열을 갖습니다.

Column Type 설명
version long 작업에 의해 생성된 테이블 버전.
timestamp timestamp 이 버전이 커밋된 시점.
userId string 작업을 실행한 사용자의 ID.
userName string 작업을 실행한 사용자의 이름.
operation string 작업의 이름입니다.
operationParameters map 작업의 매개 변수(예: 조건자).
작업(job) struct 작업을 실행한 작업의 세부 정보.
Notebook struct 작업이 실행된 Notebook의 세부 정보.
clusterId string 작업이 실행된 클러스터의 ID.
readVersion long 쓰기 작업을 수행하기 위해 읽은 테이블의 버전.
isolationLevel string 이 작업에 사용된 격리 수준.
isBlindAppend 부울 값 이 작업에 데이터가 추가되었는지 여부.
operationMetrics map 작업의 메트릭(예: 수정된 행 및 파일의 수).
userMetadata string 지정된 경우, 사용자 정의 커밋 메타데이터
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|version|          timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|   isolationLevel|isBlindAppend|    operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|      5|2019-07-29 14:07:47|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          4|WriteSerializable|        false|[numTotalRows -> ...|
|      4|2019-07-29 14:07:41|   ###|     ###|   UPDATE|[predicate -> (id...|null|     ###|      ###|          3|WriteSerializable|        false|[numTotalRows -> ...|
|      3|2019-07-29 14:07:29|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          2|WriteSerializable|        false|[numTotalRows -> ...|
|      2|2019-07-29 14:06:56|   ###|     ###|   UPDATE|[predicate -> (id...|null|     ###|      ###|          1|WriteSerializable|        false|[numTotalRows -> ...|
|      1|2019-07-29 14:04:31|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          0|WriteSerializable|        false|[numTotalRows -> ...|
|      0|2019-07-29 14:01:40|   ###|     ###|    WRITE|[mode -> ErrorIfE...|null|     ###|      ###|       null|WriteSerializable|         true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+

참고 항목

작업 메트릭 키

history 작업은 operationMetrics 열 맵으로 작업 메트릭 모음을 반환합니다.

다음 표에는 작업별 맵 키 정의가 나와 있습니다.

연산 메트릭 이름 설명
WRITE, CREATE TABLE AS SELECT, REPLACE TABLE AS SELECT, COPY INTO
numFiles 쓰기된 파일의 수.
numOutputBytes 쓰기된 콘텐츠의 크기(바이트).
numOutputRows 쓰기된 행의 수.
STREAMING UPDATE
numAddedFiles 추가된 파일의 수.
numRemovedFiles 제거된 파일의 수.
numOutputRows 쓰기된 행의 수.
numOutputBytes 쓰기의 크기(바이트).
DELETE
numAddedFiles 추가된 파일의 수. 테이블의 파티션이 삭제된 경우에는 제공되지 않음.
numRemovedFiles 제거된 파일의 수.
numDeletedRows 제거된 행의 수. 테이블의 파티션이 삭제된 경우에는 제공되지 않음.
numCopiedRows 파일을 삭제하는 과정에서 복사된 행의 수.
executionTimeMs 작업 전체를 실행하는 데 걸린 시간.
scanTimeMs 파일에서 일치하는 항목을 검색하는 데 걸린 시간.
rewriteTimeMs 일치하는 파일을 다시 쓰는 데 걸린 시간.
TRUNCATE
numRemovedFiles 제거된 파일의 수.
executionTimeMs 작업 전체를 실행하는 데 걸린 시간.
MERGE
numSourceRows 원본 DataFrame의 행 수.
numTargetRowsInserted 대상 테이블에 삽입된 행 수.
numTargetRowsUpdated 대상 테이블에서 업데이트된 행 수.
numTargetRowsDeleted 대상 테이블에서 삭제된 행 수.
numTargetRowsCopied 복사된 대상 행의 수.
numOutputRows 출력으로 쓰기된 행의 총 수.
numTargetFilesAdded 싱크(대상)에 추가된 파일의 수.
numTargetFilesRemoved 싱크(대상)에서 제거된 파일의 수.
executionTimeMs 작업 전체를 실행하는 데 걸린 시간.
scanTimeMs 파일에서 일치하는 항목을 검색하는 데 걸린 시간.
rewriteTimeMs 일치하는 파일을 다시 쓰는 데 걸린 시간.
UPDATE
numAddedFiles 추가된 파일의 수.
numRemovedFiles 제거된 파일의 수.
numUpdatedRows 업데이트된 행의 수.
numCopiedRows 파일을 업데이트하는 과정에서 방금 복사된 행의 수.
executionTimeMs 작업 전체를 실행하는 데 걸린 시간.
scanTimeMs 파일에서 일치하는 항목을 검색하는 데 걸린 시간.
rewriteTimeMs 일치하는 파일을 다시 쓰는 데 걸린 시간.
FSCK numRemovedFiles 제거된 파일의 수.
CONVERT numConvertedFiles 변환된 Parquet 파일의 수.
OPTIMIZE
numAddedFiles 추가된 파일의 수.
numRemovedFiles 최적화된 파일의 수.
numAddedBytes 테이블이 최적화된 후 추가된 바이트 수.
numRemovedBytes 제거된 바이트 수.
minFileSize 테이블이 최적화된 후 가장 작은 파일의 크기.
p25FileSize 테이블이 최적화된 후 25번째 백분위수 파일의 크기.
p50FileSize 테이블이 최적화된 후의 파일 크기의 중앙값.
p75FileSize 테이블이 최적화된 후 75번째 백분위수 파일의 크기.
maxFileSize 테이블이 최적화된 후 가장 큰 파일의 크기.
복제
sourceTableSize 복제된 버전에서 원본 테이블의 크기(바이트).
sourceNumOfFiles 복제된 버전에서 원본 테이블의 파일 수.
numRemovedFiles 이전 델타 테이블을 바꾼 경우 대상 테이블에서 제거된 파일의 수.
removedFilesSize 이전 델타 테이블을 바꾼 경우 대상 테이블에서 제거된 파일의 총 크기(바이트).
numCopiedFiles 새 위치로 복사된 파일의 수. 얕은 복제의 경우 0.
copiedFilesSize 새 위치로 복사된 파일의 총 크기(바이트). 얕은 복제의 경우 0.
RESTORE
tableSizeAfterRestore 복원 후 테이블 크기(바이트).
numOfFilesAfterRestore 복원 후 테이블의 파일 수.
numRemovedFiles 복원 작업으로 제거된 파일의 수.
numRestoredFiles 복원의 결과로 추가된 파일의 수.
removedFilesSize 복원에 의해 제거된 파일의 크기(바이트).
restoredFilesSize 복원에 의해 추가된 파일의 크기(바이트).
VACUUM
numDeletedFiles 삭제된 파일의 수.
numVacuumedDirectories vacuum된 디렉터리의 수.
numFilesToDelete 삭제할 파일의 수.

Delta Lake 시간 이동이란?

Delta Lake 시간 이동은 타임스탬프 또는 테이블 버전(트랜잭션 로그에 기록된 대로)에 따라 이전 테이블 버전 쿼리를 지원합니다. 다음과 같은 애플리케이션에 시간 이동 기능을 사용할 수 있습니다.

  • 분석, 보고서 또는 출력(예: 기계 학습 모델의 출력)을 다시 생성합니다. 특히 규제 산업에서 디버깅 또는 감사에 유용할 수 있습니다.
  • 복잡한 임시 쿼리를 작성합니다.
  • 데이터의 실수를 수정합니다.
  • 빠르게 변화하는 테이블을 위한 일련의 쿼리에 스냅샷 격리를 제공합니다.

Important

시간 이동으로 액세스할 수 있는 테이블 버전은 트랜잭션 로그 파일의 보존 임계값과 작업의 빈도 및 지정된 보존의 조합에 VACUUM 따라 결정됩니다. 기본값을 사용하여 매일 실행하는 VACUUM 경우 시간 이동에 7일의 데이터를 사용할 수 있습니다.

Delta 시간 이동 구문

테이블 이름 사양 다음에 절을 추가하여 시간 이동이 있는 델타 테이블을 쿼리합니다.

  • timestamp_expression 는 다음 중 하나일 수 있습니다.
    • '2018-10-18T22:15:12.013Z', 즉, 타임스탬프로 캐스팅할 수 있는 문자열
    • cast('2018-10-18 13:36:32 CEST' as timestamp)
    • '2018-10-18', 즉, 날짜 문자열
    • current_timestamp() - interval 12 hours
    • date_sub(current_date(), 1)
    • 타임스탬프이거나 타임스탬프로 캐스팅될 수 있는 기타 식
  • versionDESCRIBE HISTORY table_spec 출력에서 가져올 수 있는 긴 값입니다.

timestamp_expressionversion는 모두 하위 쿼리가 될 수 없습니다.

날짜 또는 타임스탬프 문자열만 허용됩니다. 예를 들어 "2019-01-01""2019-01-01T00:00:00.000Z"를 지정합니다. 다음 코드에서 예제 구문 참조:

SQL

SELECT * FROM people10m TIMESTAMP AS OF '2018-10-18T22:15:12.013Z'
SELECT * FROM delta.`/tmp/delta/people10m` VERSION AS OF 123

Python

df1 = spark.read.option("timestampAsOf", "2019-01-01").table("people10m")
df2 = spark.read.option("versionAsOf", 123).load("/tmp/delta/people10m")

@ 구문을 사용하여 테이블 이름의 일부로 타임스탬프 또는 버전을 지정할 수도 있습니다. 타임스탬프는 yyyyMMddHHmmssSSS 형식이어야 합니다. 버전 앞에 v를 추가하여 @ 뒤에 버전을 지정할 수 있습니다. 다음 코드에서 예제 구문 참조:

SQL

SELECT * FROM people10m@20190101000000000
SELECT * FROM people10m@v123

Python

spark.read.table("people10m@20190101000000000")
spark.read.table("people10m@v123")

spark.read.load("/tmp/delta/people10m@20190101000000000")
spark.read.load("/tmp/delta/people10m@v123")

트랜잭션 로그 검사점이란?

Delta Lake는 테이블 데이터와 함께 저장되는 _delta_log 디렉터리 내에서 테이블 버전을 JSON 파일로 기록합니다. 검사점 쿼리를 최적화하기 위해 Delta Lake는 테이블 버전을 Parquet 검사점 파일에 집계하므로 테이블 기록의 모든 JSON 버전을 읽을 필요가 없습니다. Azure Databricks는 데이터 크기 및 워크로드에 대한 검사점 빈도를 최적화합니다. 사용자는 검사점과 직접 상호 작용할 필요가 없습니다. 검사점 빈도는 예고 없이 변경될 수 있습니다.

시간 이동 쿼리에 대한 데이터 보존 구성

이전 테이블 버전을 쿼리하려면 해당 버전의 로그와 데이터 파일을 모두 유지해야 합니다.

테이블에 대해 실행하면 VACUUM 데이터 파일이 삭제됩니다. Delta Lake는 테이블 버전을 검사 후 로그 파일 제거를 자동으로 관리합니다.

대부분의 델타 테이블은 정기적으로 실행되므로 VACUUM 지정 시간 쿼리는 기본적으로 7일인 보존 임계값을 VACUUM준수해야 합니다.

델타 테이블에 대한 데이터 보존 임계값을 늘리려면 다음 테이블 속성을 구성해야 합니다.

  • delta.logRetentionDuration = "interval <interval>": 테이블에 대한 기록이 유지되는 기간을 제어합니다. 기본값은 interval 30 days입니다.
  • delta.deletedFileRetentionDuration = "interval <interval>": 현재 테이블 버전에서 더 이상 참조되지 않는 데이터 파일을 제거하는 데 사용되는 임계값 VACUUM 을 결정합니다. 기본값은 interval 7 days입니다.

테이블을 만드는 동안 델타 속성을 지정하거나 문을 사용하여 ALTER TABLE 설정할 수 있습니다. Delta 테이블 속성 참조를 참조하세요.

참고 항목

자주 VACUUM 작업하는 테이블에 대해 테이블 기록이 더 긴 기간 동안 유지되도록 두 속성을 모두 설정해야 합니다. 예를 들어 30일의 기록 데이터에 액세스하려면 기본 설정delta.logRetentionDuration과 일치하도록 설정합니다 delta.deletedFileRetentionDuration = "interval 30 days" .

데이터 보존 임계값을 늘리면 더 많은 데이터 파일이 기본 있으므로 스토리지 비용이 증가할 수 있습니다.

델타 테이블을 이전 상태로 복원

RESTORE 명령을 사용하여 델타 테이블을 이전 상태로 복원할 수 있습니다. 델타 테이블은 내부적으로 테이블의 이전 버전을 유지 관리하므로 이전 상태로 복원할 수 있습니다. 이전 상태에 해당하는 버전 또는 이전 상태가 만들어진 시점의 타임스탬프가 RESTORE 명령에 의해 옵션으로 지원됩니다.

Important

  • 이미 복원된 테이블을 복원할 수 있습니다.
  • 복제된 테이블을 복원할 수 있습니다.
  • 복원할 테이블에 대한 MODIFY 권한이 필요합니다.
  • 데이터 파일이 수동으로 또는 에 의해 vacuum삭제된 이전 버전으로 테이블을 복원할 수 없습니다. spark.sql.files.ignoreMissingFilestrue로 설정된 경우에는 이 버전으로 부분적으로 복원하는 것이 가능합니다.
  • 이전 상태로 복원하기 위한 타임스탬프 형식은 yyyy-MM-dd HH:mm:ss입니다. 날짜(yyyy-MM-dd) 문자열만 제공하는 것도 지원됩니다.
RESTORE TABLE db.target_table TO VERSION AS OF <version>
RESTORE TABLE delta.`/data/target/` TO TIMESTAMP AS OF <timestamp>

자세한 내용은 RESTORE를 참조하세요.

Important

복원은 데이터 변경 작업으로 간주됩니다. RESTORE 명령에 의해 추가된 Delta Lake 로그 항목에는 true로 설정된 dataChange가 포함됩니다. Delta Lake 테이블에 대한 업데이트를 처리하는 구조적 스트리밍 작업과 같은 다운스트림 애플리케이션이 있는 경우 복원 작업에서 추가한 데이터 변경 로그 항목은 새 데이터 업데이트로 간주되며 이를 처리하면 데이터가 중복될 수 있습니다.

예시:

테이블 버전 연산 델타 로그 업데이트 데이터 변경 로그 업데이트의 레코드
0 INSERT AddFile(/path/to/file-1, dataChange = true) (name = Viktor, age = 29, (name = George, age = 55)
1 INSERT AddFile(/path/to/file-2, dataChange = true) (name = George, age = 39)
2 OPTIMIZE AddFile(/path/to/file-3, dataChange = false), RemoveFile(/path/to/file-1), RemoveFile(/path/to/file-2) (최적화 압축으로 레코드가 없는 경우 테이블의 데이터가 변경되지 않음)
3 RESTORE(version=1) RemoveFile(/path/to/file-3), AddFile(/path/to/file-1, dataChange = true), AddFile(/path/to/file-2, dataChange = true) (name = Viktor, age = 29), (name = George, age = 55), (name = George, age = 39)

앞의 예제에서 RESTORE 명령은 델타 테이블 버전 0 및 1을 읽을 때 이미 표시된 업데이트를 생성합니다. 스트리밍 쿼리가 이 테이블을 읽는 경우 이러한 파일은 새로 추가된 데이터로 간주되어 다시 처리됩니다.

메트릭 복원

RESTORE는 작업이 완료되면 다음과 같은 메트릭을 단일 행 DataFrame으로 보고합니다.

  • table_size_after_restore: 복원 후 테이블의 크기.

  • num_of_files_after_restore: 복원 후 테이블의 파일 수.

  • num_removed_files: 테이블에서 제거된(논리적으로 삭제된) 파일의 수.

  • num_restored_files: 롤백으로 인해 복원된 파일의 수.

  • removed_files_size: 테이블에서 제거된 파일의 총 크기(바이트).

  • restored_files_size: 복원된 파일의 총 크기(바이트).

    메트릭 복원 예제

Delta Lake 시간 이동 사용 예제

  • 실수로 인한 사용자(111)에 대한 테이블 삭제를 수정합니다.

    INSERT INTO my_table
      SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1)
      WHERE userId = 111
    
  • 실수로 인한 테이블에 대한 잘못된 업데이트를 수정합니다.

    MERGE INTO my_table target
      USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source
      ON source.userId = target.userId
      WHEN MATCHED THEN UPDATE SET *
    
  • 지난 주에 추가된 신규 고객 수를 쿼리합니다.

    SELECT count(distinct userId)
    FROM my_table  - (
      SELECT count(distinct userId)
      FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7))
    

Spark 세션에서 마지막 커밋의 버전을 어떻게 찾습니까?

모든 스레드와 모든 테이블에서 현재 SparkSession에 의해 써진 마지막 커밋의 버전 번호를 가져오려면 SQL 구성 spark.databricks.delta.lastCommitVersionInSession을 쿼리합니다.

SQL

SET spark.databricks.delta.lastCommitVersionInSession

Python

spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")

Scala

spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")

SparkSession에서 만들어진 커밋이 없는 경우 키를 쿼리하면 빈 값이 반환됩니다.

참고 항목

여러 스레드에서 동일한 SparkSession을 공유하는 것은 여러 스레드에서 하나의 변수를 공유하는 것과 비슷합니다. 즉, 구성 값이 동시에 업데이트될 때 경합 조건이 발생합니다.