Az adatminőség kezelése Delta Live Tables használatával

Az elvárásokat az adathalmaz tartalmára vonatkozó adatminőségi korlátozások meghatározására használja. Az elvárások lehetővé teszik, hogy a táblákba érkező adatok megfeleljenek az adatminőségi követelményeknek, és betekintést nyújt az egyes folyamatfrissítések adatminőségébe. Elvárásokat alkalmazhat a Python-dekorátorok vagy SQL-kényszer záradékok használatával végzett lekérdezésekre.

Mik a Delta Live Tables elvárásai?

Az elvárások olyan opcionális záradékok, amelyeket a Delta Live Tables adatkészlet-deklarációihoz ad hozzá, amelyek adatminőség-ellenőrzést alkalmaznak a lekérdezésen áthaladó minden rekordon.

A várakozás három dologból áll:

  • Egy leírás, amely egyedi azonosítóként működik, és lehetővé teszi a korlátozás metrikáinak nyomon követését.
  • Logikai utasítás, amely mindig igaz vagy hamis értéket ad vissza valamilyen megadott feltétel alapján.
  • Olyan művelet, amely akkor hajtható végre, ha egy rekord nem felel meg a várakozásnak, vagyis a logikai érték hamis értéket ad vissza.

Az alábbi mátrix az érvénytelen rekordokra alkalmazható három műveletet mutatja be:

Művelet Eredmény
figyelmeztetés (alapértelmezett) Érvénytelen rekordok vannak megírva a célhoz; hibajelentés az adathalmaz metrikájaként történik.
Csepp Az adatok célba írása előtt érvénytelen rekordok kerülnek elvetésére; hibajelentés az adathalmaz metrikájaként történik.
Nem Érvénytelen rekordok megakadályozzák a frissítés sikerességét. Az újrafeldolgozás előtt manuális beavatkozásra van szükség.

A Delta Live Tables eseménynaplójának lekérdezésével megtekintheti az adatminőségi metrikákat, például a várakozást sértő rekordok számát. Lásd: Delta Live Tables-folyamatok figyelése.

A Delta Live Tables adatkészlet-deklarációs szintaxisának teljes referenciáját lásd: Delta Live Tables Python nyelvi referencia vagy Delta Live Tables SQL nyelvi referencia.

Feljegyzés

Bár bármilyen elvárásba több záradékot is belefoglalhat, csak a Python támogatja a műveletek több elváráson alapuló definiálását. Tekintse meg a több elvárást.

Érvénytelen rekordok megőrzése

Használja az operátort expect , ha olyan rekordokat szeretne tárolni, amelyek megsértik a várakozást. A várakozást sértő rekordok hozzáadódnak a céladatkészlethez az érvényes rekordokkal együtt:

Python

@dlt.expect("valid timestamp", "col(“timestamp”) > '2012-01-01'")

SQL

CONSTRAINT valid_timestamp EXPECT (timestamp > '2012-01-01')

Érvénytelen rekordok elvetése

Az operátorral expect or drop megakadályozhatja az érvénytelen rekordok további feldolgozását. A várakozást sértő rekordokat a rendszer elveti a céladatkészletből:

Python

@dlt.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")

SQL

CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL) ON VIOLATION DROP ROW

Érvénytelen rekordok meghiúsulnak

Ha érvénytelen rekordok nem fogadhatók el, az operátorral azonnal leállíthatja a expect or fail végrehajtást, ha egy rekord érvényesítése meghiúsul. Ha a művelet táblafrissítés, a rendszer atomi módon visszaállítja a tranzakciót:

Python

@dlt.expect_or_fail("valid_count", "count > 0")

SQL

CONSTRAINT valid_count EXPECT (count > 0) ON VIOLATION FAIL UPDATE

Ha egy folyamat egy várakozási szabálysértés miatt meghiúsul, a folyamat újrafuttatása előtt ki kell javítania a folyamat kódját, hogy megfelelően kezelje az érvénytelen adatokat.

A sikertelen várakozások úgy módosítják az átalakítások Spark-lekérdezési tervét, hogy nyomon kövessék a szabálysértések észleléséhez és jelentéséhez szükséges információkat. Számos lekérdezés esetén ezekkel az információkkal azonosíthatja, hogy melyik bemeneti rekord eredményezte a szabálysértést. A következő példa kivétel:

Expectation Violated:
{
  "flowName": "a-b",
  "verboseInfo": {
    "expectationsViolated": [
      "x1 is negative"
    ],
    "inputData": {
      "a": {"x1": 1,"y1": "a },
      "b": {
        "x2": 1,
        "y2": "aa"
      }
    },
    "outputRecord": {
      "x1": 1,
      "y1": "a",
      "x2": 1,
      "y2": "aa"
    },
    "missingInputData": false
  }
}

Több elvárás

A Python-folyamatokban egy vagy több adatminőségi korlátozással definiálhat elvárásokat. Ezek a dekorátorok argumentumként elfogadnak egy Python-szótárat, ahol a kulcs a várakozás neve, az érték pedig a várakozási kényszer.

Több adatminőségi korlátozás megadására használható expect_all , ha a sikertelen érvényesítést tartalmazó rekordokat fel kell venni a céladatkészletbe:

@dlt.expect_all({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})

Több adatminőségi korlátozás megadására használható expect_all_or_drop , ha a sikertelen érvényesítést tartalmazó rekordokat el kell dobni a céladatkészletből:

@dlt.expect_all_or_drop({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})

Több adatminőségi korlátozás megadására használható expect_all_or_fail , ha a sikertelen érvényesítést tartalmazó rekordoknak le kell állítaniuk a folyamat végrehajtását:

@dlt.expect_all_or_fail({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})

Az elvárások gyűjteményét változóként is definiálhatja, és átadhatja a folyamat egy vagy több lekérdezésének:

valid_pages = {"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"}

@dlt.table
@dlt.expect_all(valid_pages)
def raw_data():
  # Create raw dataset

@dlt.table
@dlt.expect_all_or_drop(valid_pages)
def prepared_data():
  # Create cleaned and prepared dataset

Érvénytelen adatok karanténba helyezése

Az alábbi példa az elvárásokat használja ideiglenes táblákkal és nézetekkel kombinálva. Ez a minta metrikákat biztosít a folyamatfrissítések során a várakozási ellenőrzéseket átadó rekordokhoz, és lehetővé teszi az érvényes és érvénytelen rekordok feldolgozását különböző lefelé irányuló útvonalakon keresztül.

Feljegyzés

Ez a példa a Databricks-adathalmazokban szereplő mintaadatokat olvassa be. Mivel a Databricks-adatkészletek nem támogatottak a Unity Catalogban közzétett folyamatokkal, ez a példa csak a Hive metaadattárban való közzétételre konfigurált folyamattal működik. Ez a minta azonban a Unity Catalog által engedélyezett folyamatokkal is működik, de külső helyekről kell adatokat olvasnia. Ha többet szeretne tudni a Unity Catalog és a Delta Live Tables használatáról, olvassa el a Unity Catalog használata a Delta Live Tables-folyamatokkal című témakört.

import dlt
from pyspark.sql.functions import expr

rules = {}
rules["valid_website"] = "(Website IS NOT NULL)"
rules["valid_location"] = "(Location IS NOT NULL)"
quarantine_rules = "NOT({0})".format(" AND ".join(rules.values()))

@dlt.table(
  name="raw_farmers_market"
)
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="farmers_market_quarantine",
  temporary=True,
  partition_cols=["is_quarantined"]
)
@dlt.expect_all(rules)
def farmers_market_quarantine():
  return (
    dlt.read("raw_farmers_market")
      .select("MarketName", "Website", "Location", "State",
              "Facebook", "Twitter", "Youtube", "Organic", "updateTime")
      .withColumn("is_quarantined", expr(quarantine_rules))
  )

@dlt.view(
  name="valid_farmers_market"
)
def get_valid_farmers_market():
  return (
    dlt.read("farmers_market_quarantine")
      .filter("is_quarantined=false")
  )

@dlt.view(
  name="invalid_farmers_market"
)
def get_invalid_farmers_market():
  return (
    dlt.read("farmers_market_quarantine")
      .filter("is_quarantined=true")
  )

Sorok számának ellenőrzése táblák között

Hozzáadhat egy további táblát a folyamathoz, amely meghatározza a sorok számának összehasonlítására vonatkozó várakozást két élő tábla között. Ennek a várakozásnak az eredményei megjelennek az eseménynaplóban és a Delta Live Tables felhasználói felületén. Az alábbi példa az egyenlő sorok számát ellenőrzi a tbla táblák között tblb :

CREATE OR REFRESH LIVE TABLE count_verification(
  CONSTRAINT no_rows_dropped EXPECT (a_count == b_count)
) AS SELECT * FROM
  (SELECT COUNT(*) AS a_count FROM LIVE.tbla),
  (SELECT COUNT(*) AS b_count FROM LIVE.tblb)

Speciális ellenőrzés végrehajtása a Delta Live Tables elvárásaival

Az élő táblákat összesítő és összekapcsoló lekérdezésekkel határozhatja meg, és a lekérdezések eredményeit a várakozási ellenőrzés részeként használhatja. Ez akkor hasznos, ha összetett adatminőség-ellenőrzéseket szeretne végezni, például biztosíthatja, hogy egy származtatott tábla tartalmazza a forrástáblából származó összes rekordot, vagy garantálja a numerikus oszlopok egyenlőségét a táblák között. A kulcsszóval TEMPORARY megakadályozhatja, hogy ezek a táblák közzé legyenek téve a célséma számára.

Az alábbi példa ellenőrzi, hogy az összes várt rekord szerepel-e a report táblában:

CREATE TEMPORARY LIVE TABLE report_compare_tests(
  CONSTRAINT no_missing_records EXPECT (r.key IS NOT NULL)
)
AS SELECT * FROM LIVE.validation_copy v
LEFT OUTER JOIN LIVE.report r ON v.key = r.key

Az alábbi példa egy összesítést használ az elsődleges kulcs egyediségének biztosítására:

CREATE TEMPORARY LIVE TABLE report_pk_tests(
  CONSTRAINT unique_pk EXPECT (num_entries = 1)
)
AS SELECT pk, count(*) as num_entries
FROM LIVE.report
GROUP BY pk

Az elvárások hordozhatóvá és újrafelhasználhatóvá tétele

Az adatminőségi szabályokat a folyamat implementációitól elkülönítve tarthatja fenn.

A Databricks azt javasolja, hogy a szabályokat egy Delta-táblában tárolja, minden szabályt címkével kategorizálva. Ezt a címkét az adathalmaz-definíciókban használja annak meghatározásához, hogy mely szabályokat kell alkalmazni.

Az alábbi példa létrehoz egy táblát a rules szabályok fenntartásához:

CREATE OR REPLACE TABLE
  rules
AS SELECT
  col1 AS name,
  col2 AS constraint,
  col3 AS tag
FROM (
  VALUES
  ("website_not_null","Website IS NOT NULL","validity"),
  ("location_not_null","Location IS NOT NULL","validity"),
  ("state_not_null","State IS NOT NULL","validity"),
  ("fresh_data","to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'","maintained"),
  ("social_media_access","NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)","maintained")
)

Az alábbi Python-példa a táblában tárolt szabályok alapján határozza meg az adatminőséggel rules kapcsolatos elvárásokat. A get_rules() függvény beolvassa a szabályokat a rules táblából, és visszaad egy Python-szótárt, amely a függvénynek átadott argumentumnak megfelelő tag szabályokat tartalmazza. A szótár a dekorátorokban @dlt.expect_all_*() az adatminőségi korlátozások kikényszerítéséhez lesz alkalmazva. Ha például a szabályok nem hajthatóak be, a rendszer elveti a táblából a raw_farmers_market következő validity rekordokat:

Feljegyzés

Ez a példa a Databricks-adathalmazokban szereplő mintaadatokat olvassa be. Mivel a Databricks-adatkészletek nem támogatottak a Unity Catalogban közzétett folyamatokkal, ez a példa csak a Hive metaadattárban való közzétételre konfigurált folyamattal működik. Ez a minta azonban a Unity Catalog által engedélyezett folyamatokkal is működik, de külső helyekről kell adatokat olvasnia. Ha többet szeretne tudni a Unity Catalog és a Delta Live Tables használatáról, olvassa el a Unity Catalog használata a Delta Live Tables-folyamatokkal című témakört.

import dlt
from pyspark.sql.functions import expr, col

def get_rules(tag):
  """
    loads data quality rules from a table
    :param tag: tag to match
    :return: dictionary of rules that matched the tag
  """
  rules = {}
  df = spark.read.table("rules")
  for row in df.filter(col("tag") == tag).collect():
    rules[row['name']] = row['constraint']
  return rules

@dlt.table(
  name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
  return (
    dlt.read("raw_farmers_market")
      .filter(expr("Organic = 'Y'"))
      .select("MarketName", "Website", "State",
        "Facebook", "Twitter", "Youtube", "Organic",
        "updateTime"
      )
  )

A szabályok fenntartásához elnevezett rules tábla létrehozása helyett létrehozhat egy Python-modult a fő szabályokhoz, például egy olyan fájlban, amely ugyanabban a mappában található rules_module.py , mint a jegyzetfüzet:

def get_rules_as_list_of_dict():
  return [
    {
      "name": "website_not_null",
      "constraint": "Website IS NOT NULL",
      "tag": "validity"
    },
    {
      "name": "location_not_null",
      "constraint": "Location IS NOT NULL",
      "tag": "validity"
    },
    {
      "name": "state_not_null",
      "constraint": "State IS NOT NULL",
      "tag": "validity"
    },
    {
      "name": "fresh_data",
      "constraint": "to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'",
      "tag": "maintained"
    },
    {
      "name": "social_media_access",
      "constraint": "NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)",
      "tag": "maintained"
    }
  ]

Ezután módosítsa az előző jegyzetfüzetet úgy, hogy importálja a modult, és a get_rules() függvényt úgy módosítja, hogy a táblázat helyett a modulból olvasson rules :

import dlt
from rules_module import *
from pyspark.sql.functions import expr, col

df = spark.createDataFrame(get_rules_as_list_of_dict())

def get_rules(tag):
  """
    loads data quality rules from a table
    :param tag: tag to match
    :return: dictionary of rules that matched the tag
  """
  rules = {}
  for row in df.filter(col("tag") == tag).collect():
    rules[row['name']] = row['constraint']
  return rules

@dlt.table(
  name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
  return (
    dlt.read("raw_farmers_market")
      .filter(expr("Organic = 'Y'"))
      .select("MarketName", "Website", "State",
        "Facebook", "Twitter", "Youtube", "Organic",
        "updateTime"
      )
  )