Share via


Gegevenskwaliteit beheren met Delta Live Tables

U gebruikt verwachtingen om beperkingen voor gegevenskwaliteit te definiëren voor de inhoud van een gegevensset. Met de verwachtingen kunt u garanderen dat gegevens die binnenkomen in tabellen voldoen aan de vereisten voor gegevenskwaliteit en inzicht krijgen in de gegevenskwaliteit voor elke pijplijnupdate. U past verwachtingen toe op query's met behulp van Python-decorators of SQL-beperkingsclausules.

Wat zijn de verwachtingen van Delta Live Tables?

Verwachtingen zijn optionele componenten die u toevoegt aan declaraties van gegevenssets van Delta Live Tables die controles van gegevenskwaliteit toepassen op elke record die via een query wordt doorgegeven.

Een verwachting bestaat uit drie dingen:

  • Een beschrijving, die fungeert als een unieke id en waarmee u metrische gegevens voor de beperking kunt bijhouden.
  • Een Booleaanse instructie die altijd waar of onwaar retourneert op basis van een bepaalde voorwaarde.
  • Een actie die moet worden uitgevoerd wanneer een record de verwachting mislukt, wat betekent dat de Booleaanse waarde onwaar retourneert.

In de volgende matrix ziet u de drie acties die u kunt toepassen op ongeldige records:

Actie Result
waarschuwen (standaard) Ongeldige records worden naar het doel geschreven; fout wordt gerapporteerd als een metrische waarde voor de gegevensset.
Drop Ongeldige records worden verwijderd voordat gegevens naar het doel worden geschreven; fout wordt gerapporteerd als metrische gegevens voor de gegevensset.
Niet Ongeldige records verhinderen dat de update slaagt. Handmatige interventie is vereist voordat de verwerking opnieuw wordt uitgevoerd.

U kunt metrische gegevens over de kwaliteit van gegevens bekijken, zoals het aantal records dat een verwachting schendt door een query uit te voeren op het gebeurtenislogboek van Delta Live Tables. Zie Delta Live Tables-pijplijnen bewaken.

Zie voor een volledig overzicht van de declaratiesyntaxis van Delta Live Tables-gegevenssets delta livetabellen python of sql-taalreferentie voor Delta Live Tables.

Notitie

Hoewel u meerdere componenten in elke verwachting kunt opnemen, ondersteunt alleen Python het definiëren van acties op basis van meerdere verwachtingen. Bekijk meerdere verwachtingen.

Ongeldige records behouden

Gebruik de expect operator als u records wilt bewaren die in strijd zijn met de verwachting. Records die in strijd zijn met de verwachting, worden toegevoegd aan de doelgegevensset, samen met geldige records:

Python

@dlt.expect("valid timestamp", "col(“timestamp”) > '2012-01-01'")

SQL

CONSTRAINT valid_timestamp EXPECT (timestamp > '2012-01-01')

Ongeldige records verwijderen

Gebruik de expect or drop operator om verdere verwerking van ongeldige records te voorkomen. Records die in strijd zijn met de verwachting, worden verwijderd uit de doelgegevensset:

Python

@dlt.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")

SQL

CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL) ON VIOLATION DROP ROW

Mislukt bij ongeldige records

Wanneer ongeldige records onacceptabel zijn, gebruikt u de operator om de expect or fail uitvoering onmiddellijk te stoppen wanneer de validatie van een record mislukt. Als de bewerking een tabelupdate is, wordt de transactie door het systeem atomisch teruggedraaid:

Python

@dlt.expect_or_fail("valid_count", "count > 0")

SQL

CONSTRAINT valid_count EXPECT (count > 0) ON VIOLATION FAIL UPDATE

Wanneer een pijplijn mislukt vanwege een schending van de verwachting, moet u de pijplijncode corrigeren om de ongeldige gegevens correct te verwerken voordat u de pijplijn opnieuw uitvoert.

Bij het mislukken van de verwachtingen wordt het Spark-queryplan van uw transformaties gewijzigd om informatie bij te houden die nodig is om schendingen te detecteren en te rapporteren. Voor veel query's kunt u deze informatie gebruiken om te bepalen welke invoerrecord heeft geleid tot de schending. Hier volgt een voorbeeld van een uitzondering:

Expectation Violated:
{
  "flowName": "a-b",
  "verboseInfo": {
    "expectationsViolated": [
      "x1 is negative"
    ],
    "inputData": {
      "a": {"x1": 1,"y1": "a },
      "b": {
        "x2": 1,
        "y2": "aa"
      }
    },
    "outputRecord": {
      "x1": 1,
      "y1": "a",
      "x2": 1,
      "y2": "aa"
    },
    "missingInputData": false
  }
}

Meerdere verwachtingen

U kunt verwachtingen definiëren met een of meer beperkingen voor gegevenskwaliteit in Python-pijplijnen. Deze decorators accepteren een Python-woordenlijst als argument, waarbij de sleutel de naam van de verwachting is en de waarde de verwachtingsbeperking is.

Gebruik expect_all dit om meerdere beperkingen voor gegevenskwaliteit op te geven wanneer records die niet kunnen worden gevalideerd, moeten worden opgenomen in de doelgegevensset:

@dlt.expect_all({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})

Gebruik expect_all_or_drop dit om meerdere beperkingen voor gegevenskwaliteit op te geven wanneer records die niet kunnen worden gevalideerd, moeten worden verwijderd uit de doelgegevensset:

@dlt.expect_all_or_drop({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})

Gebruik expect_all_or_fail dit om meerdere beperkingen voor gegevenskwaliteit op te geven wanneer records die mislukken bij validatie, de uitvoering van pijplijnen moeten stoppen:

@dlt.expect_all_or_fail({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})

U kunt ook een verzameling verwachtingen definiëren als een variabele en deze doorgeven aan een of meer query's in uw pijplijn:

valid_pages = {"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"}

@dlt.table
@dlt.expect_all(valid_pages)
def raw_data():
  # Create raw dataset

@dlt.table
@dlt.expect_all_or_drop(valid_pages)
def prepared_data():
  # Create cleaned and prepared dataset

Ongeldige gegevens in quarantaine plaatsen

In het volgende voorbeeld worden verwachtingen gebruikt in combinatie met tijdelijke tabellen en weergaven. Dit patroon biedt u metrische gegevens voor records die verwachtingen tijdens pijplijnupdates doorgeven en biedt een manier om geldige en ongeldige records via verschillende downstreampaden te verwerken.

Notitie

In dit voorbeeld worden voorbeeldgegevens gelezen die zijn opgenomen in de Databricks-gegevenssets. Omdat de Databricks-gegevenssets niet worden ondersteund met een pijplijn die naar Unity Catalog wordt gepubliceerd, werkt dit voorbeeld alleen met een pijplijn die is geconfigureerd om te publiceren naar de Hive-metastore. Dit patroon werkt echter ook met pijplijnen met Unity Catalog, maar u moet gegevens van externe locaties lezen. Raadpleeg Unity Catalog gebruiken met uw Delta Live Tables-pijplijnen voor meer informatie over het gebruik van Unity Catalog met Delta Live Tables.

import dlt
from pyspark.sql.functions import expr

rules = {}
rules["valid_website"] = "(Website IS NOT NULL)"
rules["valid_location"] = "(Location IS NOT NULL)"
quarantine_rules = "NOT({0})".format(" AND ".join(rules.values()))

@dlt.table(
  name="raw_farmers_market"
)
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="farmers_market_quarantine",
  temporary=True,
  partition_cols=["is_quarantined"]
)
@dlt.expect_all(rules)
def farmers_market_quarantine():
  return (
    dlt.read("raw_farmers_market")
      .select("MarketName", "Website", "Location", "State",
              "Facebook", "Twitter", "Youtube", "Organic", "updateTime")
      .withColumn("is_quarantined", expr(quarantine_rules))
  )

@dlt.view(
  name="valid_farmers_market"
)
def get_valid_farmers_market():
  return (
    dlt.read("farmers_market_quarantine")
      .filter("is_quarantined=false")
  )

@dlt.view(
  name="invalid_farmers_market"
)
def get_invalid_farmers_market():
  return (
    dlt.read("farmers_market_quarantine")
      .filter("is_quarantined=true")
  )

Aantal rijen valideren in verschillende tabellen

U kunt een extra tabel toevoegen aan uw pijplijn waarmee een verwachting wordt gedefinieerd voor het vergelijken van het aantal rijen tussen twee livetabellen. De resultaten van deze verwachting worden weergegeven in het gebeurtenislogboek en de gebruikersinterface van Delta Live Tables. In dit volgende voorbeeld worden gelijke aantal rijen tussen de tbla en tblb tabellen gevalideerd:

CREATE OR REFRESH LIVE TABLE count_verification(
  CONSTRAINT no_rows_dropped EXPECT (a_count == b_count)
) AS SELECT * FROM
  (SELECT COUNT(*) AS a_count FROM LIVE.tbla),
  (SELECT COUNT(*) AS b_count FROM LIVE.tblb)

Geavanceerde validatie uitvoeren met de verwachtingen van Delta Live Tables

U kunt livetabellen definiëren met behulp van aggregaties en joinquery's en de resultaten van deze query's gebruiken als onderdeel van uw verwachtingscontrole. Dit is handig als u complexe gegevenskwaliteitscontroles wilt uitvoeren, bijvoorbeeld om ervoor te zorgen dat een afgeleide tabel alle records uit de brontabel bevat of de gelijkheid van een numerieke kolom tussen tabellen garandeert. U kunt het TEMPORARY trefwoord gebruiken om te voorkomen dat deze tabellen worden gepubliceerd naar het doelschema.

In het volgende voorbeeld wordt gecontroleerd of alle verwachte records aanwezig zijn in de report tabel:

CREATE TEMPORARY LIVE TABLE report_compare_tests(
  CONSTRAINT no_missing_records EXPECT (r.key IS NOT NULL)
)
AS SELECT * FROM LIVE.validation_copy v
LEFT OUTER JOIN LIVE.report r ON v.key = r.key

In het volgende voorbeeld wordt een statistische functie gebruikt om de uniekheid van een primaire sleutel te garanderen:

CREATE TEMPORARY LIVE TABLE report_pk_tests(
  CONSTRAINT unique_pk EXPECT (num_entries = 1)
)
AS SELECT pk, count(*) as num_entries
FROM LIVE.report
GROUP BY pk

Verwachtingen draagbaar en herbruikbaar maken

U kunt regels voor gegevenskwaliteit afzonderlijk van uw pijplijn-implementaties onderhouden.

Databricks raadt aan de regels op te slaan in een Delta-tabel, waarbij elke regel wordt gecategoriseerd door een tag. U gebruikt deze tag in gegevenssetdefinities om te bepalen welke regels moeten worden toegepast.

In het volgende voorbeeld wordt een tabel gemaakt met de naam rules voor het onderhouden van regels:

CREATE OR REPLACE TABLE
  rules
AS SELECT
  col1 AS name,
  col2 AS constraint,
  col3 AS tag
FROM (
  VALUES
  ("website_not_null","Website IS NOT NULL","validity"),
  ("location_not_null","Location IS NOT NULL","validity"),
  ("state_not_null","State IS NOT NULL","validity"),
  ("fresh_data","to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'","maintained"),
  ("social_media_access","NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)","maintained")
)

In het volgende Python-voorbeeld worden de verwachtingen voor gegevenskwaliteit gedefinieerd op basis van de regels die zijn opgeslagen in de rules tabel. De get_rules() functie leest de regels uit de rules tabel en retourneert een Python-woordenlijst met regels die overeenkomen met het tag argument dat aan de functie is doorgegeven. De woordenlijst wordt toegepast in de @dlt.expect_all_*() decorators om beperkingen voor gegevenskwaliteit af te dwingen. Records die niet voldoen aan de regels die zijn validity getagd, worden bijvoorbeeld verwijderd uit de raw_farmers_market tabel:

Notitie

In dit voorbeeld worden voorbeeldgegevens gelezen die zijn opgenomen in de Databricks-gegevenssets. Omdat de Databricks-gegevenssets niet worden ondersteund met een pijplijn die naar Unity Catalog wordt gepubliceerd, werkt dit voorbeeld alleen met een pijplijn die is geconfigureerd om te publiceren naar de Hive-metastore. Dit patroon werkt echter ook met pijplijnen met Unity Catalog, maar u moet gegevens van externe locaties lezen. Raadpleeg Unity Catalog gebruiken met uw Delta Live Tables-pijplijnen voor meer informatie over het gebruik van Unity Catalog met Delta Live Tables.

import dlt
from pyspark.sql.functions import expr, col

def get_rules(tag):
  """
    loads data quality rules from a table
    :param tag: tag to match
    :return: dictionary of rules that matched the tag
  """
  rules = {}
  df = spark.read.table("rules")
  for row in df.filter(col("tag") == tag).collect():
    rules[row['name']] = row['constraint']
  return rules

@dlt.table(
  name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
  return (
    dlt.read("raw_farmers_market")
      .filter(expr("Organic = 'Y'"))
      .select("MarketName", "Website", "State",
        "Facebook", "Twitter", "Youtube", "Organic",
        "updateTime"
      )
  )

In plaats van een tabel met de naam rules regels te maken, kunt u een Python-module maken voor de belangrijkste regels, bijvoorbeeld in een bestand met de naam rules_module.py in dezelfde map als het notebook:

def get_rules_as_list_of_dict():
  return [
    {
      "name": "website_not_null",
      "constraint": "Website IS NOT NULL",
      "tag": "validity"
    },
    {
      "name": "location_not_null",
      "constraint": "Location IS NOT NULL",
      "tag": "validity"
    },
    {
      "name": "state_not_null",
      "constraint": "State IS NOT NULL",
      "tag": "validity"
    },
    {
      "name": "fresh_data",
      "constraint": "to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'",
      "tag": "maintained"
    },
    {
      "name": "social_media_access",
      "constraint": "NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)",
      "tag": "maintained"
    }
  ]

Wijzig vervolgens het voorgaande notebook door de module te importeren en de get_rules() functie te wijzigen die uit de module moet worden gelezen in plaats van uit de rules tabel:

import dlt
from rules_module import *
from pyspark.sql.functions import expr, col

df = spark.createDataFrame(get_rules_as_list_of_dict())

def get_rules(tag):
  """
    loads data quality rules from a table
    :param tag: tag to match
    :return: dictionary of rules that matched the tag
  """
  rules = {}
  for row in df.filter(col("tag") == tag).collect():
    rules[row['name']] = row['constraint']
  return rules

@dlt.table(
  name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
  return (
    dlt.read("raw_farmers_market")
      .filter(expr("Organic = 'Y'"))
      .select("MarketName", "Website", "State",
        "Facebook", "Twitter", "Youtube", "Organic",
        "updateTime"
      )
  )