Delta Live Tables의 API를 사용하여 APPLY CHANGES 간소화된 변경 데이터 캡처

Delta Live Tables는 API를 사용하여 CDC(변경 데이터 캡처)를 APPLY CHANGES 간소화합니다. 이전에는 이 MERGE INTO 문이 Azure Databricks에서 CDC 레코드를 처리하는 데 일반적으로 사용되었습니다. 그러나 MERGE INTO 순서가 잘못된 레코드로 인해 잘못된 결과를 생성하거나 레코드의 순서를 다시 지정하기 위해 복잡한 논리가 필요할 수 있습니다.

Delta Live Tables의 API는 순서가 잘못된 레코드 APPLY CHANGES 를 자동으로 처리하여 CDC 레코드의 올바른 처리를 보장하고 시퀀스 외부 레코드를 처리하기 위한 복잡한 논리를 개발할 필요가 없습니다.

APPLY CHANGES API는 SCD 유형 1 및 형식 2를 사용하여 테이블을 업데이트하는 지원을 포함하여 Delta Live Tables SQL 및 Python 인터페이스에서 지원됩니다.

  • SCD 유형 1을 사용하여 레코드를 직접 업데이트합니다. 업데이트된 레코드의 기록은 보존되지 않습니다.
  • SCD 형식 2를 사용하여 모든 업데이트 또는 지정된 열 집합에 대한 업데이트에서 레코드 기록을 유지합니다.

구문 및 기타 참조는 다음을 참조하세요.

참고 항목

이 문서에서는 원본 데이터의 변경 내용을 기반으로 Delta Live Tables 파이프라인의 테이블을 업데이트하는 방법을 설명합니다. Delta 테이블에 대한 행 수준 변경 정보를 기록하고 쿼리하는 방법을 알아보려면 Azure Databricks에서 Delta Lake 변경 데이터 피드 사용을 참조하세요.

델타 라이브 테이블을 사용하여 CDC를 구현하는 방법은 무엇인가요?

원본 데이터에서 레코드를 시퀀싱할 열을 지정해야 합니다. 이 열은 Delta Live Tables가 원본 데이터의 적절한 순서를 단조적으로 증가시키는 표현으로 해석합니다. Delta Live Tables는 순서가 잘못 도착하는 데이터를 자동으로 처리합니다. SCD 형식 2 변경의 경우 Delta Live Tables는 적절한 시퀀싱 값을 대상 테이블의 열 및 __END_AT 열에 __START_AT 전파합니다. 각 시퀀싱 값에는 키당 하나의 고유 업데이트가 있어야 하며 NULL 시퀀싱 값은 지원되지 않습니다.

Delta Live Tables를 사용하여 CDC 처리를 수행하려면 먼저 스트리밍 테이블을 만든 다음 문을 사용하여 APPLY CHANGES INTO 변경 피드에 대한 원본, 키 및 시퀀싱을 지정합니다. 대상 스트리밍 테이블을 만들려면 SQL의 CREATE OR REFRESH STREAMING TABLE 문 또는 Python의 함수를 create_streaming_table() 사용합니다. CDC 처리를 정의하는 문을 만들려면 SQL의 APPLY CHANGES 문 또는 Python의 함수를 apply_changes() 사용합니다. 구문 세부 정보는 Delta Live Tables에서 SQL을 사용하여 데이터 캡처 변경 또는 Delta Live Tables에서 Python을 사용하여 데이터 캡처 변경을 참조하세요.

Delta Live Tables CDC 처리에 사용되는 데이터 개체는 무엇인가요?

Hive 메타스토어에서 대상 테이블을 선언하면 두 개의 데이터 구조가 만들어집니다.

  • 대상 테이블에 할당된 이름을 사용하는 뷰입니다.
  • CDC 처리를 관리하기 위해 Delta Live Tables에서 사용하는 내부 지원 테이블입니다. 이 테이블의 이름은 대상 테이블 이름 앞에 추가하여 __apply_changes_storage_ 지정됩니다.

예를 들어 명명 dlt_cdc_target된 대상 테이블을 선언하면 메타스토어에 명명된 dlt_cdc_target 뷰와 테이블이 __apply_changes_storage_dlt_cdc_target 표시됩니다. 뷰를 만들면 Delta Live Tables에서 순서가 다른 데이터를 처리하는 데 필요한 추가 정보(예: 삭제 표시 및 버전)를 필터링할 수 있습니다. 처리된 데이터를 보려면 대상 뷰를 쿼리합니다. 테이블의 스키마가 __apply_changes_storage_ 향후 기능 또는 향상된 기능을 지원하도록 변경될 수 있으므로 프로덕션 사용을 위해 테이블을 쿼리하면 안 됩니다. 테이블에 데이터를 수동으로 추가하는 경우 버전 열이 누락되어 레코드가 다른 변경 전에 오는 것으로 간주됩니다.

파이프라인이 Unity 카탈로그에 게시되는 경우 내부 지원 테이블은 사용자가 액세스할 수 없습니다.

Delta Live Tables CDC 쿼리에서 처리된 레코드에 대한 데이터 가져오기

다음 메트릭은 쿼리에 의해 apply changes 캡처됩니다.

  • num_upserted_rows: 업데이트 중에 데이터 세트에 삽입된 출력 행의 수입니다.
  • num_deleted_rows: 업데이트 중에 데이터 세트에서 삭제된 기존 출력 행의 수입니다.

num_output_rows CDC가 아닌 흐름에 대한 출력인 메트릭은 쿼리에 대해 apply changes 캡처되지 않습니다.

제한 사항

쿼리 또는 apply_changes 함수의 APPLY CHANGES INTO 대상은 스트리밍 테이블의 원본으로 사용할 수 없습니다. 쿼리 또는 apply_changes 함수의 대상에서 읽는 테이블은 APPLY CHANGES INTO 구체화된 뷰여야 합니다.

Azure Databricks의 SCD 형식 1 및 SCD 형식 2

다음 섹션에서는 원본 이벤트를 기반으로 대상 테이블을 업데이트하는 Delta Live Tables SCD 유형 1 및 형식 2 쿼리를 보여 주는 예제를 제공합니다.

  1. 새 사용자 레코드 만들기.
  2. 사용자 레코드 삭제.
  3. 사용자 레코드 업데이트. SCD 형식 1 예제에서 마지막 UPDATE 작업은 늦게 도착하여 대상 테이블에서 삭제되어 순서가 다른 이벤트의 처리를 보여 줍니다.

다음 예제에서는 Delta Live Tables 파이프라인을 구성하고 업데이트하는 데 익숙하다고 가정합니다. 자습서: 첫 번째 Delta Live Tables 파이프라인을 실행합니다.

이러한 예제를 실행하려면 먼저 샘플 데이터 세트를 만들어야 합니다. 테스트 데이터 생성을 참조하세요.

다음은 이러한 예제에 대한 입력 레코드입니다.

userId name city operation sequenceNum
124 Raul 오악사카 삽입 1
123 Isabel 몬테레이 삽입 1
125 Mercedes 티후아나 삽입 2
126 Lily Cancun 삽입 2
123 null null Delete 6
125 Mercedes 과달라하라 UPDATE 6
125 Mercedes 멕시칼리 UPDATE 5
123 Isabel Chihuahua UPDATE 5

예제 데이터에서 마지막 행의 주석 처리를 제거하면 레코드를 잘라야 하는 위치를 지정하는 다음 레코드가 삽입됩니다.

userId name city operation sequenceNum
null null null 잘라야 3

참고 항목

다음 예제에는 모두 작업과 TRUNCATE 작업을 모두 지정 DELETE 하는 옵션이 포함되어 있지만 각 옵션은 선택 사항입니다.

SCD 유형 1 업데이트 처리

다음 코드 예제에서는 SCD 형식 1 업데이트를 처리하는 방법을 보여 줍니다.

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.format("delta").table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  apply_as_truncates = expr("operation = 'TRUNCATE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = 1
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
APPLY AS TRUNCATE WHEN
  operation = "TRUNCATE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 1;

SCD 유형 1 예제를 실행하면 대상 테이블에 다음 레코드가 포함됩니다.

userId name city
124 Raul 오악사카
125 Mercedes 과달라하라
126 Lily Cancun

추가 TRUNCATE 레코드를 사용하여 SCD 형식 1 예제를 실행한 후 sequenceNum=3에서 TRUNCATE 작업으로 인해 레코드 124126이 잘리고 대상 테이블에는 다음 레코드가 포함됩니다.

userId name city
125 Mercedes 과달라하라

SCD 유형 2 업데이트 처리

다음 코드 예제에서는 SCD 형식 2 업데이트를 처리하는 방법을 보여 줍니다.

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.format("delta").table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2"
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2;

SCD 유형 2 예제를 실행하면 대상 테이블에 다음 레코드가 포함됩니다.

userId name city __START_AT __END_AT
123 Isabel 몬테레이 1 5
123 Isabel Chihuahua 5 6
124 Raul 오악사카 1 null
125 Mercedes 티후아나 2 5
125 Mercedes 멕시칼리 5 6
125 Mercedes 과달라하라 6 null
126 Lily Cancun 2 null

SCD 형식 2 쿼리는 대상 테이블의 기록에 대해 추적할 출력 열의 하위 집합을 지정할 수도 있습니다. 다른 열에 대한 변경 내용은 새 기록 레코드를 생성하는 대신 해당 위치에서 업데이트됩니다. 다음 예제에서는 추적에서 열을 제외하는 city 방법을 보여 줍니다.

다음 예제에서는 SCD 유형 2에서 트랙 기록을 사용하는 방법을 보여 줍니다.

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.format("delta").table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2",
  track_history_except_column_list = ["city"]
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2
TRACK HISTORY ON * EXCEPT
  (city)

추가 TRUNCATE 레코드 없이 이 예제를 실행한 후 대상 테이블에는 다음 레코드가 포함됩니다.

userId name city __START_AT __END_AT
123 Isabel Chihuahua 1 6
124 Raul 오악사카 1 null
125 Mercedes 과달라하라 2 null
126 Lily Cancun 2 null

테스트 데이터 생성

아래 코드는 이 자습서에 있는 예제 쿼리에서 사용할 예제 데이터 세트를 생성하기 위해 제공됩니다. 새 스키마를 만들고 새 테이블을 만들기 위한 적절한 자격 증명이 있다고 가정하면 Notebook 또는 Databricks SQL을 사용하여 이러한 문을 실행할 수 있습니다. 다음 코드는 Delta Live Tables 파이프라인의 일부로 실행되지 않습니다 .

CREATE SCHEMA IF NOT EXISTS cdc_data;

CREATE TABLE
  cdc_data.users
AS SELECT
  col1 AS userId,
  col2 AS name,
  col3 AS city,
  col4 AS operation,
  col5 AS sequenceNum
FROM (
  VALUES
  -- Initial load.
  (124, "Raul",     "Oaxaca",      "INSERT", 1),
  (123, "Isabel",   "Monterrey",   "INSERT", 1),
  -- New users.
  (125, "Mercedes", "Tijuana",     "INSERT", 2),
  (126, "Lily",     "Cancun",      "INSERT", 2),
  -- Isabel is removed from the system and Mercedes moved to Guadalajara.
  (123, null,       null,          "DELETE", 6),
  (125, "Mercedes", "Guadalajara", "UPDATE", 6),
  -- This batch of updates arrived out of order. The above batch at sequenceNum 5 will be the final state.
  (125, "Mercedes", "Mexicali",    "UPDATE", 5),
  (123, "Isabel",   "Chihuahua",   "UPDATE", 5)
  -- Uncomment to test TRUNCATE.
  -- ,(null, null,      null,          "TRUNCATE", 3)
);

대상 스트리밍 테이블에서 데이터 추가, 변경 또는 삭제

파이프라인이 Unity 카탈로그에 테이블을 게시하는 경우 삽입, 업데이트, 삭제 및 병합 문을 비롯한 DML(데이터 조작 언어) 문을 사용하여 문으로 APPLY CHANGES INTO 만든 대상 스트리밍 테이블을 수정할 수 있습니다.

참고 항목

  • 스트리밍 테이블의 테이블 스키마를 수정하는 DML 문은 지원되지 않습니다. DML 문이 테이블 스키마의 진화를 시도하지 않는지 확인합니다.
  • 스트리밍 테이블을 업데이트하는 DML 문은 Databricks Runtime 13.1 이상을 사용하여 공유 Unity 카탈로그 클러스터 또는 SQL 웨어하우스에서만 실행할 수 있습니다.
  • 스트리밍에는 추가 전용 데이터 원본이 필요하기 때문에 처리 시 변경 내용이 있는 원본 스트리밍 테이블에서 스트리밍이 필요한 경우(예: DML 문) 원본 스트리밍 테이블을 읽을 때 skipChangeCommits 플래그를 설정합니다. 설정되면 skipChangeCommits 원본 테이블에서 레코드를 삭제하거나 수정하는 트랜잭션은 무시됩니다. 처리에 스트리밍 테이블이 필요하지 않은 경우 구체화된 뷰(추가 전용 제한이 없음)를 대상 테이블로 사용할 수 있습니다.

Delta Live Tables는 지정된 SEQUENCE BY 열을 사용하고 적절한 시퀀싱 값을 __START_AT 대상 테이블의 열(__END_ATSCD 형식 2의 경우)에 전파하므로 DML 문이 이러한 열에 유효한 값을 사용하여 레코드의 적절한 순서를 기본 확인해야 합니다. 델타 라이브 테이블을 사용하여 CDC를 구현하는 방법을 참조 하세요.

스트리밍 테이블과 함께 DML 문을 사용하는 방법에 대한 자세한 내용은 스트리밍 테이블의 데이터 추가, 변경 또는 삭제를 참조 하세요.

다음 예제에서는 시작 시퀀스가 5인 활성 레코드를 삽입합니다.

INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);