ZASTOSUJ ZMIANY INTERFEJS API: Upraszczanie przechwytywania zmian danych w tabelach delta Live Tables
Funkcja Delta Live Tables upraszcza przechwytywanie zmian danych (CDC) za pomocą interfejsu APPLY CHANGES
API. Wcześniej instrukcja MERGE INTO
była często używana do przetwarzania rekordów CDC w usłudze Azure Databricks. MERGE INTO
Może jednak generować nieprawidłowe wyniki z powodu rekordów poza sekwencją lub wymagać złożonej logiki w celu ponownego porządkowenia rekordów.
Dzięki automatycznej obsłudze rekordów poza sekwencją interfejs APPLY CHANGES
API w tabelach Delta Live Tables zapewnia poprawne przetwarzanie rekordów CDC i eliminuje konieczność opracowania złożonej logiki do obsługi rekordów poza sekwencją.
Interfejs APPLY CHANGES
API jest obsługiwany w interfejsach DELTA Live Tables SQL i Python, w tym obsługę aktualizowania tabel z typem 1 protokołu SCD i typem 2:
- Użyj typu SCD 1, aby bezpośrednio aktualizować rekordy. Historia nie jest zachowywana dla rekordów, które są aktualizowane.
- Użyj typu SCD 2, aby zachować historię rekordów we wszystkich aktualizacjach lub aktualizacjach określonego zestawu kolumn.
Aby uzyskać informacje o składni i innych odwołaniach, zobacz:
- Zmienianie przechwytywania danych przy użyciu języka Python w tabelach delta live
- Zmienianie przechwytywania danych za pomocą bazy danych SQL w tabelach delta live
- Kontrolowanie zarządzania grobowcami dla zapytań typu SCD 1
Uwaga
W tym artykule opisano sposób aktualizowania tabel w potoku delta Live Tables na podstawie zmian w danych źródłowych. Aby dowiedzieć się, jak rejestrować i wykonywać zapytania dotyczące zmian na poziomie wiersza dla tabel delty, zobacz Use Delta Lake change data feed on Azure Databricks (Używanie zestawienia zmian usługi Delta Lake w usłudze Azure Databricks).
W jaki sposób usługa CDC jest implementowana za pomocą tabel delta live?
Należy określić kolumnę w danych źródłowych, na których mają być sekwencjonujące rekordy, które tabele Delta Live Tables interpretują jako monotonicznie zwiększającą reprezentację prawidłowego porządku danych źródłowych. Delta Live Tables automatycznie obsługuje dane odbierane z zamówienia. W przypadku zmian typu SCD 2 tabele delta live propagują odpowiednie wartości sekwencjonowania do __START_AT
kolumn i __END_AT
tabeli docelowej. Każda wartość sekwencjonowania powinna zawierać jedną odrębną aktualizację na klucz, a wartości sekwencjonowania NULL nie są obsługiwane.
Aby wykonać przetwarzanie CDC za pomocą tabel delta live, należy najpierw utworzyć tabelę przesyłania strumieniowego, a następnie użyć APPLY CHANGES INTO
instrukcji w celu określenia źródła, kluczy i sekwencjonowania dla zestawienia zmian. Aby utworzyć docelową tabelę przesyłania strumieniowego, użyj CREATE OR REFRESH STREAMING TABLE
instrukcji w języku SQL lub create_streaming_table()
funkcji w języku Python. Aby utworzyć instrukcję definiującą przetwarzanie CDC, użyj APPLY CHANGES
instrukcji w języku SQL lub apply_changes()
funkcji w języku Python. Aby uzyskać szczegółowe informacje o składni, zobacz Change data capture with SQL in Delta Live Tables (Zmienianie przechwytywania danych za pomocą języka SQL w tabelach delta live table) lub Change data capture with Python in Delta Live Tables (Przechwytywanie zmian danych za pomocą języka Python w tabelach na żywo funkcji Delta).
Jakie obiekty danych są używane do przetwarzania delta Live Tables CDC?
Po zadeklarowaniu tabeli docelowej w magazynie metadanych Hive tworzone są dwie struktury danych:
- Widok używający nazwy przypisanej do tabeli docelowej.
- Wewnętrzna tabela zapasowa używana przez tabele delta Live Tables do zarządzania przetwarzaniem CDC. Ta tabela ma nazwę prepending
__apply_changes_storage_
na nazwę tabeli docelowej.
Jeśli na przykład zadeklarujesz tabelę docelową o nazwie dlt_cdc_target
, zobaczysz widok o nazwie i tabelę o nazwie dlt_cdc_target
__apply_changes_storage_dlt_cdc_target
w magazynie metadanych. Utworzenie widoku umożliwia funkcji Delta Live Tables filtrowanie dodatkowych informacji (na przykład grobowce i wersje) wymaganych do obsługi danych poza kolejnością. Aby wyświetlić przetworzone dane, wykonaj zapytanie względem widoku docelowego. Ponieważ schemat __apply_changes_storage_
tabeli może ulec zmianie w celu obsługi przyszłych funkcji lub ulepszeń, nie należy wykonywać zapytań dotyczących tabeli pod kątem użycia w środowisku produkcyjnym. W przypadku ręcznego dodawania danych do tabeli zakłada się, że rekordy zostaną wprowadzone przed innymi zmianami, ponieważ brakuje kolumn wersji.
Jeśli potok publikuje w wykazie aparatu Unity, wewnętrzne tabele kopii zapasowych nie są dostępne dla użytkowników.
Pobieranie danych dotyczących rekordów przetwarzanych przez zapytanie CDC tabel delta Live Tables
Następujące metryki są przechwytywane przez apply changes
zapytania:
num_upserted_rows
: liczba wierszy wyjściowych upserted do zestawu danych podczas aktualizacji.num_deleted_rows
: liczba istniejących wierszy wyjściowych usuniętych z zestawu danych podczas aktualizacji.
num_output_rows
Metryka, która jest danymi wyjściowymi przepływów innych niż CDC, nie jest przechwytywana dla apply changes
zapytań.
Ograniczenia
Obiekt docelowy kwerendy APPLY CHANGES INTO
lub apply_changes
funkcji nie może być używany jako źródło dla tabeli przesyłania strumieniowego. Tabela, która odczytuje element docelowy APPLY CHANGES INTO
zapytania lub apply_changes
funkcji, musi być zmaterializowanym widokiem.
Typ scD 1 i typ SCD 2 w usłudze Azure Databricks
W poniższych sekcjach przedstawiono przykłady demonstrujące tabele delta Live Tables TYPU 1 i 2 zapytania aktualizujące tabele docelowe na podstawie zdarzeń źródłowych, które:
- Utwórz nowe rekordy użytkownika.
- Usuń rekord użytkownika.
- Aktualizowanie rekordów użytkowników. W przykładzie scD typu 1 ostatnie
UPDATE
operacje docierają późno i są usuwane z tabeli docelowej, pokazując obsługę zdarzeń poza kolejnością.
W poniższych przykładach założono, że znajomość konfigurowania i aktualizowania potoków tabel na żywo delty. Zobacz Samouczek: uruchamianie pierwszego potoku delty tabel na żywo.
Aby uruchomić te przykłady, musisz zacząć od utworzenia przykładowego zestawu danych. Zobacz Generowanie danych testowych.
Poniżej przedstawiono rekordy wejściowe dla tych przykładów:
Identyfikator użytkownika | name | miejscowość | rozdzielnicy | sequenceNum |
---|---|---|---|---|
124 | Raul | Oaxaca | INSERT | 1 |
123 | Isabel | Monterrey | INSERT | 1 |
125 | Mercedes | Tijuana | INSERT | 2 |
126 | Lily | Cancun | INSERT | 2 |
123 | null | null | DELETE | 6 |
125 | Mercedes | Guadalajara | UPDATE | 6 |
125 | Mercedes | Mexicali | UPDATE | 5 |
123 | Isabel | Chihuahua | UPDATE | 5 |
Jeśli usuniesz komentarz z końcowego wiersza w przykładowych danych, wstawi następujący rekord, który określa, gdzie rekordy powinny być obcięte:
Identyfikator użytkownika | name | miejscowość | rozdzielnicy | sequenceNum |
---|---|---|---|---|
null | null | null | OBCIĄĆ | 3 |
Uwaga
Wszystkie poniższe przykłady obejmują opcje określania operacji DELETE
i TRUNCATE
, ale każda z nich jest opcjonalna.
Przetwarzanie aktualizacji typu SCD 1
W poniższym przykładzie kodu przedstawiono przetwarzanie aktualizacji typu 1 protokołu SCD:
Python
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.format("delta").table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.apply_changes(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
apply_as_truncates = expr("operation = 'TRUNCATE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = 1
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
APPLY CHANGES INTO
live.target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
APPLY AS TRUNCATE WHEN
operation = "TRUNCATE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 1;
Po uruchomieniu przykładu typu SCD 1 tabela docelowa zawiera następujące rekordy:
Identyfikator użytkownika | name | miejscowość |
---|---|---|
124 | Raul | Oaxaca |
125 | Mercedes | Guadalajara |
126 | Lily | Cancun |
Po uruchomieniu przykładowego typu SCD 1 z dodatkowym TRUNCATE
rekordem rekordy 124
i 126
są obcinane z TRUNCATE
powodu operacji w sequenceNum=3
lokalizacji , a tabela docelowa zawiera następujący rekord:
Identyfikator użytkownika | name | miejscowość |
---|---|---|
125 | Mercedes | Guadalajara |
Przetwarzanie aktualizacji typu SCD 2
W poniższym przykładzie kodu przedstawiono przetwarzanie aktualizacji typu 2 protokołu SCD:
Python
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.format("delta").table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.apply_changes(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2"
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
APPLY CHANGES INTO
live.target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2;
Po uruchomieniu przykładowego typu SCD 2 tabela docelowa zawiera następujące rekordy:
Identyfikator użytkownika | name | miejscowość | __START_AT | __END_AT |
---|---|---|---|---|
123 | Isabel | Monterrey | 1 | 5 |
123 | Isabel | Chihuahua | 5 | 6 |
124 | Raul | Oaxaca | 1 | null |
125 | Mercedes | Tijuana | 2 | 5 |
125 | Mercedes | Mexicali | 5 | 6 |
125 | Mercedes | Guadalajara | 6 | null |
126 | Lily | Cancun | 2 | null |
Zapytanie typu SCD 2 może również określać podzestaw kolumn wyjściowych do śledzenia historii w tabeli docelowej. Zmiany w innych kolumnach są aktualizowane zamiast generowania nowych rekordów historii. W poniższym przykładzie pokazano wykluczenie kolumny city
ze śledzenia:
W poniższym przykładzie pokazano użycie historii śledzenia z typem SCD 2:
Python
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.format("delta").table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.apply_changes(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2",
track_history_except_column_list = ["city"]
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
APPLY CHANGES INTO
live.target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2
TRACK HISTORY ON * EXCEPT
(city)
Po uruchomieniu tego przykładu bez dodatkowego TRUNCATE
rekordu tabela docelowa zawiera następujące rekordy:
Identyfikator użytkownika | name | miejscowość | __START_AT | __END_AT |
---|---|---|---|---|
123 | Isabel | Chihuahua | 1 | 6 |
124 | Raul | Oaxaca | 1 | null |
125 | Mercedes | Guadalajara | 2 | null |
126 | Lily | Cancun | 2 | null |
Generowanie danych testowych
Poniższy kod służy do generowania przykładowego zestawu danych do użycia w przykładowych zapytaniach znajdujących się w tym samouczku. Zakładając, że masz odpowiednie poświadczenia, aby utworzyć nowy schemat i utworzyć nową tabelę, możesz wykonać te instrukcje za pomocą notesu lub usługi Databricks SQL. Poniższy kod nie jest przeznaczony do uruchomienia w ramach potoku delta Live Tables:
CREATE SCHEMA IF NOT EXISTS cdc_data;
CREATE TABLE
cdc_data.users
AS SELECT
col1 AS userId,
col2 AS name,
col3 AS city,
col4 AS operation,
col5 AS sequenceNum
FROM (
VALUES
-- Initial load.
(124, "Raul", "Oaxaca", "INSERT", 1),
(123, "Isabel", "Monterrey", "INSERT", 1),
-- New users.
(125, "Mercedes", "Tijuana", "INSERT", 2),
(126, "Lily", "Cancun", "INSERT", 2),
-- Isabel is removed from the system and Mercedes moved to Guadalajara.
(123, null, null, "DELETE", 6),
(125, "Mercedes", "Guadalajara", "UPDATE", 6),
-- This batch of updates arrived out of order. The above batch at sequenceNum 5 will be the final state.
(125, "Mercedes", "Mexicali", "UPDATE", 5),
(123, "Isabel", "Chihuahua", "UPDATE", 5)
-- Uncomment to test TRUNCATE.
-- ,(null, null, null, "TRUNCATE", 3)
);
Dodawanie, zmienianie lub usuwanie danych w docelowej tabeli przesyłania strumieniowego
Jeśli potok publikuje tabele w wykazie aparatu Unity, możesz użyć instrukcji języka manipulowania danymi (DML), w tym wstawiania, aktualizowania, usuwania i scalania instrukcji, aby modyfikować docelowe tabele przesyłania strumieniowego utworzone przez APPLY CHANGES INTO
instrukcje.
Uwaga
- Instrukcje DML modyfikujące schemat tabeli przesyłania strumieniowego nie są obsługiwane. Upewnij się, że instrukcje DML nie próbują rozwijać schematu tabeli.
- Instrukcje DML, które aktualizują tabelę przesyłania strumieniowego, mogą być uruchamiane tylko w udostępnionym klastrze wykazu aparatu Unity lub w usłudze SQL Warehouse przy użyciu środowiska Databricks Runtime 13.3 LTS lub nowszego.
- Ponieważ przesyłanie strumieniowe wymaga źródeł danych tylko do dołączania, jeśli przetwarzanie wymaga przesyłania strumieniowego ze źródłowej tabeli przesyłania strumieniowego ze zmianami (na przykład instrukcjami DML), ustaw flagę skipChangeCommits podczas odczytywania źródłowej tabeli przesyłania strumieniowego. Po
skipChangeCommits
ustawieniu transakcje, które usuwają lub modyfikują rekordy w tabeli źródłowej, są ignorowane. Jeśli przetwarzanie nie wymaga tabeli przesyłania strumieniowego, możesz użyć zmaterializowanego widoku (który nie ma ograniczenia tylko do dołączania) jako tabeli docelowej.
Ponieważ tabele delta live używa określonej SEQUENCE BY
kolumny i propagują odpowiednie wartości sekwencjonowania do __START_AT
kolumn i __END_AT
tabeli docelowej (dla typu SCD 2), należy upewnić się, że instrukcje DML używają prawidłowych wartości dla tych kolumn, aby zachować właściwą kolejność rekordów. Zobacz Jak usługa CDC jest implementowana za pomocą tabel delta live?.
Aby uzyskać więcej informacji na temat używania instrukcji DML z tabelami przesyłania strumieniowego, zobacz Dodawanie, zmienianie lub usuwanie danych w tabeli przesyłania strumieniowego.
Poniższy przykład wstawia aktywny rekord z sekwencją początkową 5:
INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);