Azure Databricks에서 Delta Lake 변경 데이터 피드 사용

참고 항목

  • 이 문서에서는 변경 데이터 피드 기능을 사용하여 Delta 테이블에 대한 행 수준 변경 정보를 기록하고 쿼리하는 방법을 설명합니다. 원본 데이터의 변경 내용을 기반으로 Delta Live Tables 파이프라인에서 테이블을 업데이트하는 방법을 알아보려면 APPLY CHANGES API: Delta Live Tables에서 변경 데이터 캡처 간소화를 참조 하세요.

변경 데이터 피드를 통해 Azure Databricks는 델타 테이블 버전 간의 행 수준 변경 내용을 추적할 수 있습니다. 델타 테이블에서 사용하도록 설정하면 런타임은 테이블에 기록된 모든 데이터에 대해 변경 이벤트를 기록합니다. 여기에는 지정된 행이 삽입, 삭제 또는 업데이트되었는지 여부를 나타내는 메타데이터와 함께 행 데이터가 포함됩니다.

Spark SQL, Apache Spark DataFrames 및 Structured Streaming을 사용하여 일괄 처리 쿼리에서 변경 이벤트를 읽을 수 있습니다.

Important

변경 데이터 피드는 테이블 기록과 함께 작동하여 변경 정보를 제공합니다. 델타 테이블을 복제하면 별도의 기록이 생성되므로 복제된 테이블의 변경 데이터 피드가 원래 테이블의 변경 데이터 피드와 일치하지 않습니다.

사용 사례

변경 데이터 피드는 기본적으로 사용하도록 설정되지 않습니다. 다음 사용 사례는 변경 데이터 피드를 사용하도록 설정할 때 구동되어야 합니다.

  • 실버 및 골드 테이블: ETL 및 ELT 작업을 가속화하고 간소화하기 위한 초기 MERGE, UPDATE, 또는 DELETE 작업에 따라 행 수준의 변경 사항만 처리하여 Delta Lake 성능을 향상시킵니다.
  • 구체화된 뷰: 전체 기본 테이블을 다시 처리하지 않고도 BI 및 분석에서 사용할 수 있도록 최신 집계 정보 보기를 만들고 변경 내용이 발생한 경우에만 업데이트합니다.
  • 변경 내용 전송: 변경 데이터 피드를 Kafka 또는 RDBMS와 같은 다운스트림 시스템에 전송합니다. 이 피드를 사용하여 데이터 파이프라인 이후 단계에서 증분 처리할 수 있습니다.
  • 감사 추적 테이블: 변경 데이터 피드를 캡처합니다. 델타 테이블에서 영구 스토리지 및 효율적인 쿼리 기능을 제공하여 삭제가 발생하는 시기와 업데이트한 내용을 포함한 시간에 따른 모든 변경 내용을 확인할 수 있습니다.

변경 데이터 피드 사용

다음 방법 중 하나를 사용하여 변경 데이터 피드 옵션을 명시적으로 사용하도록 설정해야 합니다.

  • 새 테이블: CREATE TABLE 명령에서 테이블 속성 delta.enableChangeDataFeed = true를 설정합니다.

    CREATE TABLE student (id INT, name STRING, age INT) TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • 기존 테이블: ALTER TABLE 명령에서 테이블 속성 delta.enableChangeDataFeed = true를 설정합니다.

    ALTER TABLE myDeltaTable SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • 모든 새 테이블:

    set spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;
    

Important

변경 데이터 피드를 사용하도록 설정한 후에 수행한 변경 내용만 기록됩니다. 테이블에 대한 이전 변경 내용은 캡처되지 않습니다.

데이터 스토리지 변경

Azure Databricks는 테이블 디렉터리 아래 _change_data 폴더의 UPDATE, DELETEMERGE 작업에 대한 변경 데이터를 기록합니다. Azure Databricks는 트랜잭션 로그에서 직접 변경 데이터 피드를 효율적으로 계산할 수 있기 때문에 삽입 전용 작업 및 전체 파티션 삭제와 같은 일부 작업은 _change_data 디렉터리에 데이터를 생성하지 않습니다.

_change_data 폴더의 파일은 테이블의 보존 정책을 따릅니다. 따라서 VACUUM 명령을 실행하는 경우 변경 데이터 피드 데이터도 삭제됩니다.

일괄 처리 쿼리의 변경 내용 읽기

시작 및 종료에 대한 버전 또는 타임스탬프를 제공할 수 있습니다. 시작 및 끝 버전 및 타임스탬프는 쿼리에 포함됩니다. 특정 시작 버전에서 테이블의 최신 버전으로 변경 내용을 읽으려면 시작 버전 또는 타임스탬프만 지정합니다.

버전을 정수로 지정하고 타임스탬프를 yyyy-MM-dd[ HH:mm:ss[.SSS]] 형식의 문자열로 지정합니다.

변경 이벤트를 기록한 버전보다 낮은 버전 또는 오래된 타임스탬프를 제공하는 경우, 즉, 변경 데이터 피드를 사용하도록 설정했을 때 변경 데이터 피드가 사용 설정되지 않았음을 나타내는 오류가 throw됩니다.

SQL

-- version as ints or longs e.g. changes from version 0 to 10
SELECT * FROM table_changes('tableName', 0, 10)

-- timestamp as string formatted timestamps
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')

-- providing only the startingVersion/timestamp
SELECT * FROM table_changes('tableName', 0)

-- database/schema names inside the string for table name, with backticks for escaping dots and special characters
SELECT * FROM table_changes('dbName.`dotted.tableName`', '2021-04-21 06:45:46' , '2021-05-21 12:00:00')

-- path based tables
SELECT * FROM table_changes_by_path('\path', '2021-04-21 05:45:46')

Python

# version as ints or longs
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .option("endingVersion", 10) \
  .table("myDeltaTable")

# timestamps as formatted timestamp
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", '2021-04-21 05:45:46') \
  .option("endingTimestamp", '2021-05-21 12:00:00') \
  .table("myDeltaTable")

# providing only the startingVersion/timestamp
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .table("myDeltaTable")

# path based tables
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", '2021-04-21 05:45:46') \
  .load("pathToMyDeltaTable")

Scala

// version as ints or longs
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .option("endingVersion", 10)
  .table("myDeltaTable")

// timestamps as formatted timestamp
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingTimestamp", "2021-04-21 05:45:46")
  .option("endingTimestamp", "2021-05-21 12:00:00")
  .table("myDeltaTable")

// providing only the startingVersion/timestamp
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .table("myDeltaTable")

// path based tables
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingTimestamp", "2021-04-21 05:45:46")
  .load("pathToMyDeltaTable")

스트리밍 쿼리의 변경 내용 읽기

Python

# providing a starting version
spark.readStream.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .table("myDeltaTable")

# providing a starting timestamp
spark.readStream.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", "2021-04-21 05:35:43") \
  .load("/pathToMyDeltaTable")

# not providing a starting version/timestamp will result in the latest snapshot being fetched first
spark.readStream.format("delta") \
  .option("readChangeFeed", "true") \
  .table("myDeltaTable")

Scala

// providing a starting version
spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .table("myDeltaTable")

// providing a starting timestamp
spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", "2021-04-21 05:35:43")
  .load("/pathToMyDeltaTable")

// not providing a starting version/timestamp will result in the latest snapshot being fetched first
spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .table("myDeltaTable")

테이블을 읽는 동안 변경 데이터를 얻으려면 옵션 readChangeFeedtrue로 설정합니다. startingVersion 또는 startingTimestamp는 선택 사항이며, 제공되지 않은 경우 스트림은 스트리밍 시 테이블의 최신 스냅샷을 변경 데이터로 변경으로 INSERT 반환합니다. 변경 데이터를 읽을 때 속도 제한(maxFilesPerTrigger, maxBytesPerTrigger)과 같은 옵션과 더불어 excludeRegex도 지원됩니다.

참고 항목

속도 제한은 시작 스냅샷 버전 이외의 버전에 대해 원자 단위일 수 있습니다. 즉, 전체 커밋 버전은 속도가 제한되거나 전체 커밋이 반환됩니다.

기본적으로 사용자가 테이블의 마지막 커밋을 초과하는 버전 또는 타임스탬프를 전달하는 경우 timestampGreaterThanLatestCommit 오류가 throw됩니다. Databricks Runtime 11.3 LTS 이상에서는 사용자가 다음 구성을 다음으로 설정하는 경우 변경 데이터 피드가 범위를 벗어난 버전 사례를 처리할 true수 있습니다.

set spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled = true;

테이블의 마지막 커밋보다 큰 시작 버전을 제공하거나 테이블의 마지막 커밋보다 최신의 시작 타임스탬프를 제공하는 경우 이전 구성을 사용하도록 설정하면 빈 읽기 결과가 반환됩니다.

테이블의 마지막 커밋보다 큰 최종 버전을 제공하거나 테이블의 마지막 커밋보다 최신의 종료 타임스탬프를 제공하는 경우 이전 구성을 일괄 읽기 모드로 사용하도록 설정하면 시작 버전과 마지막 커밋 간의 모든 변경 내용이 반환됩니다.

변경 데이터 피드의 스키마는 무엇인가요?

테이블에 대한 변경 데이터 피드에서 읽을 때 최신 테이블 버전의 스키마가 사용됩니다.

참고 항목

대부분의 스키마 변경 및 진화 작업은 완전히 지원됩니다. 열 매핑을 사용하도록 설정된 테이블은 모든 사용 사례를 지원하지 않으며 다른 동작을 보여 줍니다. 열 매핑이 사용하도록 설정된 테이블에 대한 변경 데이터 피드 제한을 참조하세요.

델타 테이블의 스키마에 있는 데이터 열 외에도 변경 데이터 피드에는 변경 이벤트의 유형을 식별하는 메타데이터 열이 포함되어 있습니다.

열 이름 Type
_change_type 문자열 insert, update_preimage , update_postimage, delete(1)
_commit_version Long 변경 내용이 포함된 델타 로그 또는 테이블 버전입니다.
_commit_timestamp 타임스탬프 커밋을 만들 때 연결된 타임스탬프입니다.

(1)preimage는 업데이트 전 값이며 postimage는 업데이트 후의 값입니다.

참고 항목

스키마에 추가된 열과 이름이 같은 열이 포함된 경우 테이블에서 변경 데이터 피드를 사용하도록 설정할 수 없습니다. 변경 데이터 피드를 사용하도록 설정하기 전에 이 충돌을 해결하기 위해 테이블의 열 이름을 바꿉니다.

열 매핑을 사용하도록 설정된 테이블의 데이터 피드 제한 사항 변경

델타 테이블에서 열 매핑을 사용하도록 설정하면 기존 데이터에 대한 데이터 파일을 다시 작성하지 않고도 테이블의 열을 삭제하거나 이름을 바꿀 수 있습니다. 열 매핑을 사용하도록 설정하면 열 이름 바꾸기 또는 삭제, 데이터 형식 변경 또는 Null 허용 여부 변경과 같은 비가치 스키마 변경을 수행한 후 변경 데이터 피드에 제한이 있습니다.

Important

  • 일괄 처리 의미 체계를 사용하여 비가산적 스키마 변경이 발생하는 트랜잭션 또는 범위에 대한 변경 데이터 피드를 읽을 수 없습니다.
  • Databricks Runtime 12.2 LTS 이하에서는 비가산적 스키마 변경이 발생한 열 매핑이 설정된 테이블은 변경 데이터 피드의 스트리밍 읽기를 지원하지 않습니다. 열 매핑 및 스키마 변경 내용이 포함된 스트리밍을 참조하세요.
  • Databricks Runtime 11.3 LTS 이하에서는 열 이름 바꾸기 또는 삭제가 발생한 열 매핑이 활성화된 테이블에 대한 변경 데이터 피드를 읽을 수 없습니다.

Databricks Runtime 12.2 LTS 이상에서는 비가치 스키마 변경이 발생한 열 매핑이 활성화된 테이블에 대한 변경 데이터 피드에서 일괄 읽기를 수행할 수 있습니다. 읽기 작업은 최신 버전의 테이블 스키마를 사용하는 대신 쿼리에 지정된 테이블의 최종 버전 스키마를 사용합니다. 지정된 버전 범위가 비가산적 스키마 변경에 걸쳐 있으면 쿼리가 실패합니다.

질문과 대답(FAQ)

변경 데이터 피드를 사용하도록 설정하는 오버헤드는 무엇인가요?

큰 영향은 없습니다. 변경 데이터 레코드는 쿼리 실행 프로세스 중에 한 줄로 생성되며 일반적으로 다시 작성된 파일의 전체 크기보다 훨씬 작습니다.

변경 레코드에 대한 보존 정책은 무엇인가요?

변경 레코드는 오래된 테이블 버전과 동일한 보존 정책을 따르며 지정된 보존 기간이 지난 경우 VACUUM을 통해 정리됩니다.

변경 데이터 피드에서 새 레코드를 사용할 수 있는 시기는 언제인가요?

변경 데이터는 Delta Lake 트랜잭션과 함께 커밋되며 테이블에서 새 데이터를 사용할 수 있는 동시에 사용할 수 있게 됩니다.

Notebook 예제: 델타 변경 데이터 피드를 사용하여 변경 내용 전파

이 Notebook에서는 실버 테이블(백신 접종 절대 수)의 변경 내용을 골드 테이블(백신 접종 비율)로 전파하는 방법을 보여 줍니다.

데이터 피드 변경 Notebook

전자 필기장 가져오기