Zarządzanie jakością danych za pomocą tabel delta live

Oczekiwania są używane do definiowania ograniczeń jakości danych dotyczących zawartości zestawu danych. Oczekiwania pozwalają zagwarantować, że dane przychodzące w tabelach spełniają wymagania dotyczące jakości danych i zapewniają wgląd w jakość danych dla każdej aktualizacji potoku. Oczekiwania są stosowane do zapytań przy użyciu dekoratorów języka Python lub klauzul ograniczeń SQL.

Co to są oczekiwania usługi Delta Live Tables?

Oczekiwania to opcjonalne klauzule dodawane do deklaracji zestawu danych delta Live Tables, które stosują kontrole jakości danych dla każdego rekordu przechodzącego przez zapytanie.

Oczekiwanie składa się z trzech rzeczy:

  • Opis, który działa jako unikatowy identyfikator i umożliwia śledzenie metryk dla ograniczenia.
  • Instrukcja logiczna, która zawsze zwraca wartość true lub false na podstawie określonego warunku.
  • Akcja do wykonania, gdy rekord zakończy się niepowodzeniem oczekiwania, co oznacza, że wartość logiczna zwraca wartość false.

Poniższa macierz przedstawia trzy akcje, które można zastosować do nieprawidłowych rekordów:

Akcja Result
ostrzegaj (ustawienie domyślne) Nieprawidłowe rekordy są zapisywane w obiekcie docelowym; błąd jest zgłaszany jako metryka dla zestawu danych.
Upuść Nieprawidłowe rekordy są porzucane, zanim dane zostaną zapisane w obiekcie docelowym; błąd jest zgłaszany jako metryki dla zestawu danych.
Nie Nieprawidłowe rekordy uniemożliwiają pomyślne zaktualizowanie. Interwencja ręczna jest wymagana przed ponownym przetworzeniem.

Możesz wyświetlić metryki jakości danych, takie jak liczba rekordów, które naruszają oczekiwania, wysyłając zapytanie do dziennika zdarzeń usługi Delta Live Tables. Zobacz Monitor Delta Live Tables pipelines (Monitorowanie potoków tabel na żywo funkcji Delta).

Aby uzyskać pełną dokumentację składni deklaracji zestawu danych delta Live Tables, zobacz Dokumentacja języka Python tabel delta Live Tables lub Dokumentacja języka SQL tabel delta live tables.

Uwaga

Chociaż można uwzględnić wiele klauzul w dowolnym oczekiwaniu, tylko język Python obsługuje definiowanie akcji na podstawie wielu oczekiwań. Zobacz Wiele oczekiwań.

Zachowaj nieprawidłowe rekordy

expect Użyj operatora , jeśli chcesz przechowywać rekordy naruszające oczekiwania. Rekordy naruszające oczekiwania są dodawane do docelowego zestawu danych wraz z prawidłowymi rekordami:

Python

@dlt.expect("valid timestamp", "col(“timestamp”) > '2012-01-01'")

SQL

CONSTRAINT valid_timestamp EXPECT (timestamp > '2012-01-01')

Usuwanie nieprawidłowych rekordów

Użyj operatora , expect or drop aby zapobiec dalszemu przetwarzaniu nieprawidłowych rekordów. Rekordy naruszające oczekiwania są porzucane z docelowego zestawu danych:

Python

@dlt.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")

SQL

CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL) ON VIOLATION DROP ROW

Niepowodzenie w nieprawidłowych rekordach

Jeśli nieprawidłowe rekordy są niedopuszczalne, użyj expect or fail operatora , aby zatrzymać wykonywanie natychmiast, gdy rekord zakończy się niepowodzeniem weryfikacji. Jeśli operacja jest aktualizacją tabeli, system niepodzieal cofa transakcję:

Python

@dlt.expect_or_fail("valid_count", "count > 0")

SQL

CONSTRAINT valid_count EXPECT (count > 0) ON VIOLATION FAIL UPDATE

Gdy potok zakończy się niepowodzeniem z powodu naruszenia oczekiwań, należy naprawić kod potoku, aby poprawnie obsłużyć nieprawidłowe dane przed ponownym uruchomieniem potoku.

Oczekiwania dotyczące niepowodzenia modyfikują plan zapytania platformy Spark przekształceń, aby śledzić informacje wymagane do wykrywania naruszeń i zgłaszania ich. W przypadku wielu zapytań można użyć tych informacji, aby określić, który rekord wejściowy spowodował naruszenie. Oto przykładowy wyjątek:

Expectation Violated:
{
  "flowName": "a-b",
  "verboseInfo": {
    "expectationsViolated": [
      "x1 is negative"
    ],
    "inputData": {
      "a": {"x1": 1,"y1": "a },
      "b": {
        "x2": 1,
        "y2": "aa"
      }
    },
    "outputRecord": {
      "x1": 1,
      "y1": "a",
      "x2": 1,
      "y2": "aa"
    },
    "missingInputData": false
  }
}

Wiele oczekiwań

Oczekiwania można zdefiniować przy użyciu co najmniej jednego ograniczenia jakości danych w potokach języka Python. Te dekoratory akceptują słownik języka Python jako argument, gdzie klucz jest nazwą oczekiwania, a wartość jest ograniczeniem oczekiwania.

Służy expect_all do określania wielu ograniczeń dotyczących jakości danych, gdy rekordy, które nie powiodły się weryfikacji, powinny być uwzględnione w docelowym zestawie danych:

@dlt.expect_all({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})

Służy expect_all_or_drop do określania wielu ograniczeń dotyczących jakości danych, gdy rekordy, które nie powiodły się weryfikacji, powinny zostać usunięte z docelowego zestawu danych:

@dlt.expect_all_or_drop({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})

Użyj expect_all_or_fail polecenia , aby określić wiele ograniczeń dotyczących jakości danych, gdy rekordy, które nie powiodły się, walidacja powinna zatrzymać wykonywanie potoku:

@dlt.expect_all_or_fail({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})

Możesz również zdefiniować kolekcję oczekiwań jako zmienną i przekazać ją do co najmniej jednego zapytania w potoku:

valid_pages = {"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"}

@dlt.table
@dlt.expect_all(valid_pages)
def raw_data():
  # Create raw dataset

@dlt.table
@dlt.expect_all_or_drop(valid_pages)
def prepared_data():
  # Create cleaned and prepared dataset

Kwarantanna nieprawidłowych danych

Poniższy przykład używa oczekiwań w połączeniu z tabelami tymczasowymi i widokami. Ten wzorzec zawiera metryki dla rekordów, które przechodzą testy oczekiwań podczas aktualizacji potoku, oraz umożliwiają przetwarzanie prawidłowych i nieprawidłowych rekordów za pośrednictwem różnych ścieżek podrzędnych.

Uwaga

W tym przykładzie są odczytywane przykładowe dane zawarte w zestawach danych usługi Databricks. Ponieważ zestawy danych usługi Databricks nie są obsługiwane w potoku publikowanym w wykazie aparatu Unity, ten przykład działa tylko z potokiem skonfigurowanym do publikowania w magazynie metadanych Hive. Jednak ten wzorzec działa również z potokami obsługującymi wykaz aparatu Unity, ale musisz odczytywać dane z lokalizacji zewnętrznych. Aby dowiedzieć się więcej o korzystaniu z rozwiązania Unity Catalog z tabelami Delta Live Tables, zobacz Używanie ozwiązania Unity Catalog z potokami platformy Delta Live Tables.

import dlt
from pyspark.sql.functions import expr

rules = {}
rules["valid_website"] = "(Website IS NOT NULL)"
rules["valid_location"] = "(Location IS NOT NULL)"
quarantine_rules = "NOT({0})".format(" AND ".join(rules.values()))

@dlt.table(
  name="raw_farmers_market"
)
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="farmers_market_quarantine",
  temporary=True,
  partition_cols=["is_quarantined"]
)
@dlt.expect_all(rules)
def farmers_market_quarantine():
  return (
    dlt.read("raw_farmers_market")
      .select("MarketName", "Website", "Location", "State",
              "Facebook", "Twitter", "Youtube", "Organic", "updateTime")
      .withColumn("is_quarantined", expr(quarantine_rules))
  )

@dlt.view(
  name="valid_farmers_market"
)
def get_valid_farmers_market():
  return (
    dlt.read("farmers_market_quarantine")
      .filter("is_quarantined=false")
  )

@dlt.view(
  name="invalid_farmers_market"
)
def get_invalid_farmers_market():
  return (
    dlt.read("farmers_market_quarantine")
      .filter("is_quarantined=true")
  )

Weryfikowanie liczby wierszy w tabelach

Do potoku można dodać dodatkową tabelę, która definiuje oczekiwania dotyczące porównywania liczby wierszy między dwiema tabelami na żywo. Wyniki tego oczekiwania pojawiają się w dzienniku zdarzeń i interfejsie użytkownika tabel delta Live Tables. W poniższym przykładzie sprawdza, czy liczba wierszy między tabelami tbla i tblb jest równa:

CREATE OR REFRESH LIVE TABLE count_verification(
  CONSTRAINT no_rows_dropped EXPECT (a_count == b_count)
) AS SELECT * FROM
  (SELECT COUNT(*) AS a_count FROM LIVE.tbla),
  (SELECT COUNT(*) AS b_count FROM LIVE.tblb)

Przeprowadzanie zaawansowanej walidacji przy użyciu oczekiwań tabel delta Live Tables

Tabele na żywo można definiować przy użyciu zapytań agregujących i sprzężonych oraz używać wyników tych zapytań w ramach sprawdzania oczekiwań. Jest to przydatne, jeśli chcesz wykonać złożone kontrole jakości danych, na przykład zapewnienie, że tabela pochodna zawiera wszystkie rekordy z tabeli źródłowej lub gwarantuje równość kolumny liczbowej między tabelami. Możesz użyć słowa kluczowego TEMPORARY , aby zapobiec opublikowaniu tych tabel w schemacie docelowym.

Poniższy przykład sprawdza, czy wszystkie oczekiwane rekordy znajdują się w report tabeli:

CREATE TEMPORARY LIVE TABLE report_compare_tests(
  CONSTRAINT no_missing_records EXPECT (r.key IS NOT NULL)
)
AS SELECT * FROM LIVE.validation_copy v
LEFT OUTER JOIN LIVE.report r ON v.key = r.key

W poniższym przykładzie użyto agregacji w celu zapewnienia unikatowości klucza podstawowego:

CREATE TEMPORARY LIVE TABLE report_pk_tests(
  CONSTRAINT unique_pk EXPECT (num_entries = 1)
)
AS SELECT pk, count(*) as num_entries
FROM LIVE.report
GROUP BY pk

Oczekiwanie dotyczące przenoszenia i wielokrotnego użytku

Reguły jakości danych można zachować niezależnie od implementacji potoku.

Usługa Databricks zaleca przechowywanie reguł w tabeli delty z każdą regułą skategoryzowaną według tagu. Ten tag jest używany w definicjach zestawu danych, aby określić, które reguły mają być stosowane.

Poniższy przykład tworzy tabelę o nazwie rules w celu zachowania reguł:

CREATE OR REPLACE TABLE
  rules
AS SELECT
  col1 AS name,
  col2 AS constraint,
  col3 AS tag
FROM (
  VALUES
  ("website_not_null","Website IS NOT NULL","validity"),
  ("location_not_null","Location IS NOT NULL","validity"),
  ("state_not_null","State IS NOT NULL","validity"),
  ("fresh_data","to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'","maintained"),
  ("social_media_access","NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)","maintained")
)

Poniższy przykład w języku Python definiuje oczekiwania dotyczące jakości danych na podstawie reguł przechowywanych w rules tabeli. Funkcja get_rules() odczytuje reguły z rules tabeli i zwraca słownik języka Python zawierający reguły pasujące do argumentu tag przekazanego do funkcji. Słownik jest stosowany w dekoratorach w @dlt.expect_all_*() celu wymuszania ograniczeń dotyczących jakości danych. Na przykład wszystkie rekordy zakończone niepowodzeniem reguł oznaczonych tagiem validity zostaną usunięte z raw_farmers_market tabeli:

Uwaga

W tym przykładzie są odczytywane przykładowe dane zawarte w zestawach danych usługi Databricks. Ponieważ zestawy danych usługi Databricks nie są obsługiwane w potoku publikowanym w wykazie aparatu Unity, ten przykład działa tylko z potokiem skonfigurowanym do publikowania w magazynie metadanych Hive. Jednak ten wzorzec działa również z potokami obsługującymi wykaz aparatu Unity, ale musisz odczytywać dane z lokalizacji zewnętrznych. Aby dowiedzieć się więcej o korzystaniu z rozwiązania Unity Catalog z tabelami Delta Live Tables, zobacz Używanie ozwiązania Unity Catalog z potokami platformy Delta Live Tables.

import dlt
from pyspark.sql.functions import expr, col

def get_rules(tag):
  """
    loads data quality rules from a table
    :param tag: tag to match
    :return: dictionary of rules that matched the tag
  """
  rules = {}
  df = spark.read.table("rules")
  for row in df.filter(col("tag") == tag).collect():
    rules[row['name']] = row['constraint']
  return rules

@dlt.table(
  name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
  return (
    dlt.read("raw_farmers_market")
      .filter(expr("Organic = 'Y'"))
      .select("MarketName", "Website", "State",
        "Facebook", "Twitter", "Youtube", "Organic",
        "updateTime"
      )
  )

Zamiast tworzyć tabelę o nazwie rules do obsługi reguł, można utworzyć moduł języka Python do głównych reguł, na przykład w pliku o nazwie rules_module.py w tym samym folderze co notes:

def get_rules_as_list_of_dict():
  return [
    {
      "name": "website_not_null",
      "constraint": "Website IS NOT NULL",
      "tag": "validity"
    },
    {
      "name": "location_not_null",
      "constraint": "Location IS NOT NULL",
      "tag": "validity"
    },
    {
      "name": "state_not_null",
      "constraint": "State IS NOT NULL",
      "tag": "validity"
    },
    {
      "name": "fresh_data",
      "constraint": "to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'",
      "tag": "maintained"
    },
    {
      "name": "social_media_access",
      "constraint": "NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)",
      "tag": "maintained"
    }
  ]

Następnie zmodyfikuj powyższy notes, importując moduł i zmieniając get_rules() funkcję na odczyt z modułu rules zamiast z tabeli:

import dlt
from rules_module import *
from pyspark.sql.functions import expr, col

df = spark.createDataFrame(get_rules_as_list_of_dict())

def get_rules(tag):
  """
    loads data quality rules from a table
    :param tag: tag to match
    :return: dictionary of rules that matched the tag
  """
  rules = {}
  for row in df.filter(col("tag") == tag).collect():
    rules[row['name']] = row['constraint']
  return rules

@dlt.table(
  name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
  return (
    dlt.read("raw_farmers_market")
      .filter(expr("Organic = 'Y'"))
      .select("MarketName", "Website", "State",
        "Facebook", "Twitter", "Youtube", "Organic",
        "updateTime"
      )
  )