Udostępnij za pośrednictwem


ZASTOSUJ ZMIANY INTERFEJS API: Upraszczanie przechwytywania zmian danych w tabelach delta Live Tables

Funkcja Delta Live Tables upraszcza przechwytywanie zmian danych (CDC) za pomocą interfejsu APPLY CHANGES API. Wcześniej instrukcja MERGE INTO była często używana do przetwarzania rekordów CDC w usłudze Azure Databricks. MERGE INTO Może jednak generować nieprawidłowe wyniki z powodu rekordów poza sekwencją lub wymagać złożonej logiki w celu ponownego porządkowenia rekordów.

Dzięki automatycznej obsłudze rekordów poza sekwencją interfejs APPLY CHANGES API w tabelach Delta Live Tables zapewnia poprawne przetwarzanie rekordów CDC i eliminuje konieczność opracowania złożonej logiki do obsługi rekordów poza sekwencją.

Interfejs APPLY CHANGES API jest obsługiwany w interfejsach DELTA Live Tables SQL i Python, w tym obsługę aktualizowania tabel z typem 1 protokołu SCD i typem 2:

  • Użyj typu SCD 1, aby bezpośrednio aktualizować rekordy. Historia nie jest zachowywana dla rekordów, które są aktualizowane.
  • Użyj typu SCD 2, aby zachować historię rekordów we wszystkich aktualizacjach lub aktualizacjach określonego zestawu kolumn.

Aby uzyskać informacje o składni i innych odwołaniach, zobacz:

Uwaga

W tym artykule opisano sposób aktualizowania tabel w potoku delta Live Tables na podstawie zmian w danych źródłowych. Aby dowiedzieć się, jak rejestrować i wykonywać zapytania dotyczące zmian na poziomie wiersza dla tabel delty, zobacz Use Delta Lake change data feed on Azure Databricks (Używanie zestawienia zmian usługi Delta Lake w usłudze Azure Databricks).

W jaki sposób usługa CDC jest implementowana za pomocą tabel delta live?

Należy określić kolumnę w danych źródłowych, na których mają być sekwencjonujące rekordy, które tabele Delta Live Tables interpretują jako monotonicznie zwiększającą reprezentację prawidłowego porządku danych źródłowych. Delta Live Tables automatycznie obsługuje dane odbierane z zamówienia. W przypadku zmian typu SCD 2 tabele delta live propagują odpowiednie wartości sekwencjonowania do __START_AT kolumn i __END_AT tabeli docelowej. Każda wartość sekwencjonowania powinna zawierać jedną odrębną aktualizację na klucz, a wartości sekwencjonowania NULL nie są obsługiwane.

Aby wykonać przetwarzanie CDC za pomocą tabel delta live, należy najpierw utworzyć tabelę przesyłania strumieniowego, a następnie użyć APPLY CHANGES INTO instrukcji w celu określenia źródła, kluczy i sekwencjonowania dla zestawienia zmian. Aby utworzyć docelową tabelę przesyłania strumieniowego, użyj CREATE OR REFRESH STREAMING TABLE instrukcji w języku SQL lub create_streaming_table() funkcji w języku Python. Aby utworzyć instrukcję definiującą przetwarzanie CDC, użyj APPLY CHANGES instrukcji w języku SQL lub apply_changes() funkcji w języku Python. Aby uzyskać szczegółowe informacje o składni, zobacz Change data capture with SQL in Delta Live Tables (Zmienianie przechwytywania danych za pomocą języka SQL w tabelach delta live table) lub Change data capture with Python in Delta Live Tables (Przechwytywanie zmian danych za pomocą języka Python w tabelach na żywo funkcji Delta).

Jakie obiekty danych są używane do przetwarzania delta Live Tables CDC?

Po zadeklarowaniu tabeli docelowej w magazynie metadanych Hive tworzone są dwie struktury danych:

  • Widok używający nazwy przypisanej do tabeli docelowej.
  • Wewnętrzna tabela zapasowa używana przez tabele delta Live Tables do zarządzania przetwarzaniem CDC. Ta tabela ma nazwę prepending __apply_changes_storage_ na nazwę tabeli docelowej.

Jeśli na przykład zadeklarujesz tabelę docelową o nazwie dlt_cdc_target, zobaczysz widok o nazwie i tabelę o nazwie dlt_cdc_target__apply_changes_storage_dlt_cdc_target w magazynie metadanych. Utworzenie widoku umożliwia funkcji Delta Live Tables filtrowanie dodatkowych informacji (na przykład grobowce i wersje) wymaganych do obsługi danych poza kolejnością. Aby wyświetlić przetworzone dane, wykonaj zapytanie względem widoku docelowego. Ponieważ schemat __apply_changes_storage_ tabeli może ulec zmianie w celu obsługi przyszłych funkcji lub ulepszeń, nie należy wykonywać zapytań dotyczących tabeli pod kątem użycia w środowisku produkcyjnym. W przypadku ręcznego dodawania danych do tabeli zakłada się, że rekordy zostaną wprowadzone przed innymi zmianami, ponieważ brakuje kolumn wersji.

Jeśli potok publikuje w wykazie aparatu Unity, wewnętrzne tabele kopii zapasowych nie są dostępne dla użytkowników.

Pobieranie danych dotyczących rekordów przetwarzanych przez zapytanie CDC tabel delta Live Tables

Następujące metryki są przechwytywane przez apply changes zapytania:

  • num_upserted_rows: liczba wierszy wyjściowych upserted do zestawu danych podczas aktualizacji.
  • num_deleted_rows: liczba istniejących wierszy wyjściowych usuniętych z zestawu danych podczas aktualizacji.

num_output_rows Metryka, która jest danymi wyjściowymi przepływów innych niż CDC, nie jest przechwytywana dla apply changes zapytań.

Ograniczenia

Obiekt docelowy kwerendy APPLY CHANGES INTO lub apply_changes funkcji nie może być używany jako źródło dla tabeli przesyłania strumieniowego. Tabela, która odczytuje element docelowy APPLY CHANGES INTO zapytania lub apply_changes funkcji, musi być zmaterializowanym widokiem.

Typ scD 1 i typ SCD 2 w usłudze Azure Databricks

W poniższych sekcjach przedstawiono przykłady demonstrujące tabele delta Live Tables TYPU 1 i 2 zapytania aktualizujące tabele docelowe na podstawie zdarzeń źródłowych, które:

  1. Utwórz nowe rekordy użytkownika.
  2. Usuń rekord użytkownika.
  3. Aktualizowanie rekordów użytkowników. W przykładzie scD typu 1 ostatnie UPDATE operacje docierają późno i są usuwane z tabeli docelowej, pokazując obsługę zdarzeń poza kolejnością.

W poniższych przykładach założono, że znajomość konfigurowania i aktualizowania potoków tabel na żywo delty. Zobacz Samouczek: uruchamianie pierwszego potoku delty tabel na żywo.

Aby uruchomić te przykłady, musisz zacząć od utworzenia przykładowego zestawu danych. Zobacz Generowanie danych testowych.

Poniżej przedstawiono rekordy wejściowe dla tych przykładów:

Identyfikator użytkownika name miejscowość rozdzielnicy sequenceNum
124 Raul Oaxaca INSERT 1
123 Isabel Monterrey INSERT 1
125 Mercedes Tijuana INSERT 2
126 Lily Cancun INSERT 2
123 null null DELETE 6
125 Mercedes Guadalajara UPDATE 6
125 Mercedes Mexicali UPDATE 5
123 Isabel Chihuahua UPDATE 5

Jeśli usuniesz komentarz z końcowego wiersza w przykładowych danych, wstawi następujący rekord, który określa, gdzie rekordy powinny być obcięte:

Identyfikator użytkownika name miejscowość rozdzielnicy sequenceNum
null null null OBCIĄĆ 3

Uwaga

Wszystkie poniższe przykłady obejmują opcje określania operacji DELETE i TRUNCATE , ale każda z nich jest opcjonalna.

Przetwarzanie aktualizacji typu SCD 1

W poniższym przykładzie kodu przedstawiono przetwarzanie aktualizacji typu 1 protokołu SCD:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.format("delta").table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  apply_as_truncates = expr("operation = 'TRUNCATE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = 1
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
APPLY AS TRUNCATE WHEN
  operation = "TRUNCATE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 1;

Po uruchomieniu przykładu typu SCD 1 tabela docelowa zawiera następujące rekordy:

Identyfikator użytkownika name miejscowość
124 Raul Oaxaca
125 Mercedes Guadalajara
126 Lily Cancun

Po uruchomieniu przykładowego typu SCD 1 z dodatkowym TRUNCATE rekordem rekordy 124 i 126 są obcinane z TRUNCATE powodu operacji w sequenceNum=3lokalizacji , a tabela docelowa zawiera następujący rekord:

Identyfikator użytkownika name miejscowość
125 Mercedes Guadalajara

Przetwarzanie aktualizacji typu SCD 2

W poniższym przykładzie kodu przedstawiono przetwarzanie aktualizacji typu 2 protokołu SCD:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.format("delta").table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2"
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2;

Po uruchomieniu przykładowego typu SCD 2 tabela docelowa zawiera następujące rekordy:

Identyfikator użytkownika name miejscowość __START_AT __END_AT
123 Isabel Monterrey 1 5
123 Isabel Chihuahua 5 6
124 Raul Oaxaca 1 null
125 Mercedes Tijuana 2 5
125 Mercedes Mexicali 5 6
125 Mercedes Guadalajara 6 null
126 Lily Cancun 2 null

Zapytanie typu SCD 2 może również określać podzestaw kolumn wyjściowych do śledzenia historii w tabeli docelowej. Zmiany w innych kolumnach są aktualizowane zamiast generowania nowych rekordów historii. W poniższym przykładzie pokazano wykluczenie kolumny city ze śledzenia:

W poniższym przykładzie pokazano użycie historii śledzenia z typem SCD 2:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.format("delta").table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2",
  track_history_except_column_list = ["city"]
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2
TRACK HISTORY ON * EXCEPT
  (city)

Po uruchomieniu tego przykładu bez dodatkowego TRUNCATE rekordu tabela docelowa zawiera następujące rekordy:

Identyfikator użytkownika name miejscowość __START_AT __END_AT
123 Isabel Chihuahua 1 6
124 Raul Oaxaca 1 null
125 Mercedes Guadalajara 2 null
126 Lily Cancun 2 null

Generowanie danych testowych

Poniższy kod służy do generowania przykładowego zestawu danych do użycia w przykładowych zapytaniach znajdujących się w tym samouczku. Zakładając, że masz odpowiednie poświadczenia, aby utworzyć nowy schemat i utworzyć nową tabelę, możesz wykonać te instrukcje za pomocą notesu lub usługi Databricks SQL. Poniższy kod nie jest przeznaczony do uruchomienia w ramach potoku delta Live Tables:

CREATE SCHEMA IF NOT EXISTS cdc_data;

CREATE TABLE
  cdc_data.users
AS SELECT
  col1 AS userId,
  col2 AS name,
  col3 AS city,
  col4 AS operation,
  col5 AS sequenceNum
FROM (
  VALUES
  -- Initial load.
  (124, "Raul",     "Oaxaca",      "INSERT", 1),
  (123, "Isabel",   "Monterrey",   "INSERT", 1),
  -- New users.
  (125, "Mercedes", "Tijuana",     "INSERT", 2),
  (126, "Lily",     "Cancun",      "INSERT", 2),
  -- Isabel is removed from the system and Mercedes moved to Guadalajara.
  (123, null,       null,          "DELETE", 6),
  (125, "Mercedes", "Guadalajara", "UPDATE", 6),
  -- This batch of updates arrived out of order. The above batch at sequenceNum 5 will be the final state.
  (125, "Mercedes", "Mexicali",    "UPDATE", 5),
  (123, "Isabel",   "Chihuahua",   "UPDATE", 5)
  -- Uncomment to test TRUNCATE.
  -- ,(null, null,      null,          "TRUNCATE", 3)
);

Dodawanie, zmienianie lub usuwanie danych w docelowej tabeli przesyłania strumieniowego

Jeśli potok publikuje tabele w wykazie aparatu Unity, możesz użyć instrukcji języka manipulowania danymi (DML), w tym wstawiania, aktualizowania, usuwania i scalania instrukcji, aby modyfikować docelowe tabele przesyłania strumieniowego utworzone przez APPLY CHANGES INTO instrukcje.

Uwaga

  • Instrukcje DML modyfikujące schemat tabeli przesyłania strumieniowego nie są obsługiwane. Upewnij się, że instrukcje DML nie próbują rozwijać schematu tabeli.
  • Instrukcje DML, które aktualizują tabelę przesyłania strumieniowego, mogą być uruchamiane tylko w udostępnionym klastrze wykazu aparatu Unity lub w usłudze SQL Warehouse przy użyciu środowiska Databricks Runtime 13.3 LTS lub nowszego.
  • Ponieważ przesyłanie strumieniowe wymaga źródeł danych tylko do dołączania, jeśli przetwarzanie wymaga przesyłania strumieniowego ze źródłowej tabeli przesyłania strumieniowego ze zmianami (na przykład instrukcjami DML), ustaw flagę skipChangeCommits podczas odczytywania źródłowej tabeli przesyłania strumieniowego. Po skipChangeCommits ustawieniu transakcje, które usuwają lub modyfikują rekordy w tabeli źródłowej, są ignorowane. Jeśli przetwarzanie nie wymaga tabeli przesyłania strumieniowego, możesz użyć zmaterializowanego widoku (który nie ma ograniczenia tylko do dołączania) jako tabeli docelowej.

Ponieważ tabele delta live używa określonej SEQUENCE BY kolumny i propagują odpowiednie wartości sekwencjonowania do __START_AT kolumn i __END_AT tabeli docelowej (dla typu SCD 2), należy upewnić się, że instrukcje DML używają prawidłowych wartości dla tych kolumn, aby zachować właściwą kolejność rekordów. Zobacz Jak usługa CDC jest implementowana za pomocą tabel delta live?.

Aby uzyskać więcej informacji na temat używania instrukcji DML z tabelami przesyłania strumieniowego, zobacz Dodawanie, zmienianie lub usuwanie danych w tabeli przesyłania strumieniowego.

Poniższy przykład wstawia aktywny rekord z sekwencją początkową 5:

INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);