Gérer la qualité des données avec Delta Live Tables

Vous utilisez les attentes pour définir les contraintes de qualité des données sur le contenu d’un jeu de données. Les attentes vous permettent de garantir que les données arrivant dans des tables répondent aux exigences de qualité des données et fournissent des insights sur la qualité des données pour chaque mise à jour de pipeline. Vous appliquez des attentes aux requêtes à l’aide d’éléments décoratifs Python ou de clauses de contrainte SQL.

Quelles sont les attentes Delta Live Tables ?

Les attentes sont des clauses facultatives que vous ajoutez aux déclarations de jeu de données Delta Live Tables qui appliquent des vérifications de qualité des données sur chaque enregistrement passant par une requête.

Une attente se compose de trois éléments :

  • Une description, qui agit en tant qu’identificateur unique et vous permet de suivre les métriques de la contrainte.
  • Une instruction booléenne qui retourne toujours true ou false en fonction d’une condition indiquée.
  • Une action à entreprendre lorsqu’un enregistrement ne répond pas à l’attente, ce qui signifie que l’instruction booléenne retourne false.

La matrice suivante montre les trois actions que vous pouvez appliquer aux enregistrements non valides :

Pour Résultat
avertir (par défaut) Les enregistrements non valides sont écrits dans la cible ; l’échec est signalé comme une métrique pour le jeu de données.
supprimer Les enregistrements non valides sont supprimés avant que les données ne soient écrites dans la cible ; l’échec est signalé en tant que métrique pour le jeu de données.
fail Les enregistrements non valides empêchent la mise à jour de réussir. Une intervention manuelle est nécessaire avant le nouveau traitement.

Vous pouvez afficher les mesures de qualité des données, telles que le nombre d’enregistrements qui violent une attente, en interrogeant le journal des événements de Delta Live Tables. Consultez Surveiller les pipelines Delta Live Tables.

Pour obtenir une référence complète de la syntaxe de déclaration de jeu de données Delta Live Tables, consultez la référence du langage Python Delta Live Tables ou la référence du langage SQL Delta Live Tables.

Remarque

Bien que vous puissiez inclure plusieurs clauses dans n’importe quelle attente, seul Python prend en charge la définition d’actions basées sur plusieurs attentes. Consultez Attentes multiples.

Conserver les enregistrements non valides

Utilisez l’opérateur expect lorsque vous souhaitez conserver les enregistrements qui violent l’attente. Les enregistrements qui violent l’attente sont ajoutés au jeu de données cible avec des enregistrements valides :

Python

@dlt.expect("valid timestamp", "col(“timestamp”) > '2012-01-01'")

SQL

CONSTRAINT valid_timestamp EXPECT (timestamp > '2012-01-01')

Supprimer les enregistrements non valides

Utilisez l’opérateur expect or drop pour empêcher le traitement ultérieur des enregistrements non valides. Les enregistrements qui violent l’attente sont supprimés du jeu de données cible :

Python

@dlt.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")

SQL

CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL) ON VIOLATION DROP ROW

Échec sur les enregistrements non valides

Lorsque des enregistrements non valides sont inacceptables, utilisez l’opérateur expect or fail pour arrêter immédiatement l’exécution lorsque la validation de l’enregistrement échoue. Si l’opération est une mise à jour de table, le système restaure atomiquement la transaction :

Python

@dlt.expect_or_fail("valid_count", "count > 0")

SQL

CONSTRAINT valid_count EXPECT (count > 0) ON VIOLATION FAIL UPDATE

Lorsqu’un pipeline échoue en raison d’une violation des attentes, vous devez corriger le code du pipeline pour gérer correctement les données non valides avant de réexécuter le pipeline.

Les attentes d’échec modifient le plan de requête Spark de vos transformations pour suivre les informations nécessaires à la détection et au signalement des violations. Pour de nombreuses requêtes, vous pouvez utiliser ces informations pour identifier l’enregistrement d’entrée qui a provoqué la violation. Voici un exemple d’exception :

Expectation Violated:
{
  "flowName": "a-b",
  "verboseInfo": {
    "expectationsViolated": [
      "x1 is negative"
    ],
    "inputData": {
      "a": {"x1": 1,"y1": "a },
      "b": {
        "x2": 1,
        "y2": "aa"
      }
    },
    "outputRecord": {
      "x1": 1,
      "y1": "a",
      "x2": 1,
      "y2": "aa"
    },
    "missingInputData": false
  }
}

Attentes multiples

Vous pouvez définir des attentes avec une ou plusieurs contraintes de qualité des données dans des pipelines Python. Ces éléments décoratifs acceptent un dictionnaire Python comme argument, où la clé est le nom de l’attente et la valeur est la contrainte de l’attente.

Utilisez expect_all pour spécifier plusieurs contraintes de qualité des données lorsque les enregistrements dont la validation échoue doivent être inclus dans le jeu de données cible :

@dlt.expect_all({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})

Utilisez expect_all_or_drop pour spécifier plusieurs contraintes de qualité des données lorsque les enregistrements dont la validation échoue doivent être supprimés du jeu de données cible :

@dlt.expect_all_or_drop({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})

Utilisez expect_all_or_fail pour spécifier plusieurs contraintes de qualité des données lorsque les enregistrements dont la validation échoue doivent interrompre l’exécution du pipeline :

@dlt.expect_all_or_fail({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})

Vous pouvez également définir une collection d’attentes en tant que variable et la transmettre à une ou plusieurs requêtes de votre pipeline :

valid_pages = {"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"}

@dlt.table
@dlt.expect_all(valid_pages)
def raw_data():
  # Create raw dataset

@dlt.table
@dlt.expect_all_or_drop(valid_pages)
def prepared_data():
  # Create cleaned and prepared dataset

Mettre en quarantaine des données non valides

L’exemple suivant utilise des attentes en combinaison avec des tables et des vues temporaires. Ce modèle vous fournit des métriques pour les enregistrements qui passent des vérifications d’attente pendant les mises à jour du pipeline et fournit un moyen de traiter des enregistrements valides et non valides via différents chemins en aval.

Remarque

Cet exemple lit des exemples de données incluses dans les jeux de données Databricks. Étant donné que les jeux de données Databricks ne sont pas pris en charge avec un pipeline qui publie dans Unity Catalog, cet exemple fonctionne uniquement avec un pipeline configuré pour publier sur le metastore Hive. Toutefois, ce modèle fonctionne également avec les pipelines compatibles avec Unity Catalog, mais vous devez lire des données à partir d’emplacements externes. Pour en savoir plus sur l’utilisation de Unity Catalog avec Delta Live Tables, consultez l’article Utiliser Unity Catalog avec vos pipelines Delta Live Tables.

import dlt
from pyspark.sql.functions import expr

rules = {}
rules["valid_website"] = "(Website IS NOT NULL)"
rules["valid_location"] = "(Location IS NOT NULL)"
quarantine_rules = "NOT({0})".format(" AND ".join(rules.values()))

@dlt.table(
  name="raw_farmers_market"
)
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="farmers_market_quarantine",
  temporary=True,
  partition_cols=["is_quarantined"]
)
@dlt.expect_all(rules)
def farmers_market_quarantine():
  return (
    dlt.read("raw_farmers_market")
      .select("MarketName", "Website", "Location", "State",
              "Facebook", "Twitter", "Youtube", "Organic", "updateTime")
      .withColumn("is_quarantined", expr(quarantine_rules))
  )

@dlt.view(
  name="valid_farmers_market"
)
def get_valid_farmers_market():
  return (
    dlt.read("farmers_market_quarantine")
      .filter("is_quarantined=false")
  )

@dlt.view(
  name="invalid_farmers_market"
)
def get_invalid_farmers_market():
  return (
    dlt.read("farmers_market_quarantine")
      .filter("is_quarantined=true")
  )

Valider le nombre de lignes dans les tables

Vous pouvez ajouter une table supplémentaire à votre pipeline qui définit une attente pour comparer les nombres de lignes entre deux tables actives. Les résultats de cette attente s’affichent dans le journal des événements et l’interface utilisateur des tables dynamiques Delta. L’exemple suivant valide les nombres de lignes égaux entre les tables tbla et tblb :

CREATE OR REFRESH LIVE TABLE count_verification(
  CONSTRAINT no_rows_dropped EXPECT (a_count == b_count)
) AS SELECT * FROM
  (SELECT COUNT(*) AS a_count FROM LIVE.tbla),
  (SELECT COUNT(*) AS b_count FROM LIVE.tblb)

Effectuer une validation avancée avec des attentes Delta Live Tables

Vous pouvez définir des tables actives à l'aide de requêtes d'agrégation et de jointure et utiliser les résultats de ces requêtes dans le cadre de votre vérification des attentes. Cela est utile si vous souhaitez effectuer des vérifications complexes de la qualité des données, par exemple pour vous assurer qu’une table dérivée contient tous les enregistrements de la table source ou pour garantir l’égalité d’une colonne numérique entre les tables. Vous pouvez utiliser le mot clé TEMPORARY pour empêcher la publication de ces tables dans le schéma cible.

L’exemple suivant vérifie que tous les enregistrements attendus sont présents dans la table report :

CREATE TEMPORARY LIVE TABLE report_compare_tests(
  CONSTRAINT no_missing_records EXPECT (r.key IS NOT NULL)
)
AS SELECT * FROM LIVE.validation_copy v
LEFT OUTER JOIN LIVE.report r ON v.key = r.key

L’exemple suivant utilise un agrégat pour garantir l’unicité d’une clé primaire :

CREATE TEMPORARY LIVE TABLE report_pk_tests(
  CONSTRAINT unique_pk EXPECT (num_entries = 1)
)
AS SELECT pk, count(*) as num_entries
FROM LIVE.report
GROUP BY pk

Rendre les attentes portables et réutilisables

Vous pouvez gérer les règles de qualité des données séparément de vos implémentations de pipeline.

Databricks recommande de stocker les règles dans une table Delta avec chaque règle catégorisée par une balise. Vous utilisez cette balise dans les définitions de jeu de données pour déterminer les règles à appliquer.

L’exemple suivant crée une table nommée rules pour gérer les règles :

CREATE OR REPLACE TABLE
  rules
AS SELECT
  col1 AS name,
  col2 AS constraint,
  col3 AS tag
FROM (
  VALUES
  ("website_not_null","Website IS NOT NULL","validity"),
  ("location_not_null","Location IS NOT NULL","validity"),
  ("state_not_null","State IS NOT NULL","validity"),
  ("fresh_data","to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'","maintained"),
  ("social_media_access","NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)","maintained")
)

L’exemple Python suivant définit des attentes de qualité des données en fonction des règles stockées dans la table rules. La fonction get_rules() lit les règles à partir de la table rules et retourne un dictionnaire Python contenant des règles correspondant à l’argument tag passé à la fonction. Le dictionnaire est appliqué dans les décorateurs @dlt.expect_all_*() pour appliquer les contraintes de qualité des données. Par exemple, tous les enregistrements qui échouent avec les règles marquées avec validity sont supprimés de la table raw_farmers_market :

Remarque

Cet exemple lit des exemples de données incluses dans les jeux de données Databricks. Étant donné que les jeux de données Databricks ne sont pas pris en charge avec un pipeline qui publie dans Unity Catalog, cet exemple fonctionne uniquement avec un pipeline configuré pour publier sur le metastore Hive. Toutefois, ce modèle fonctionne également avec les pipelines compatibles avec Unity Catalog, mais vous devez lire des données à partir d’emplacements externes. Pour en savoir plus sur l’utilisation de Unity Catalog avec Delta Live Tables, consultez l’article Utiliser Unity Catalog avec vos pipelines Delta Live Tables.

import dlt
from pyspark.sql.functions import expr, col

def get_rules(tag):
  """
    loads data quality rules from a table
    :param tag: tag to match
    :return: dictionary of rules that matched the tag
  """
  rules = {}
  df = spark.read.table("rules")
  for row in df.filter(col("tag") == tag).collect():
    rules[row['name']] = row['constraint']
  return rules

@dlt.table(
  name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
  return (
    dlt.read("raw_farmers_market")
      .filter(expr("Organic = 'Y'"))
      .select("MarketName", "Website", "State",
        "Facebook", "Twitter", "Youtube", "Organic",
        "updateTime"
      )
  )

Au lieu de créer une table nommée rules pour gérer des règles, vous pouvez créer un module Python pour les gérer, par exemple dans un fichier nommé rules_module.py dans le même dossier que le notebook :

def get_rules_as_list_of_dict():
  return [
    {
      "name": "website_not_null",
      "constraint": "Website IS NOT NULL",
      "tag": "validity"
    },
    {
      "name": "location_not_null",
      "constraint": "Location IS NOT NULL",
      "tag": "validity"
    },
    {
      "name": "state_not_null",
      "constraint": "State IS NOT NULL",
      "tag": "validity"
    },
    {
      "name": "fresh_data",
      "constraint": "to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'",
      "tag": "maintained"
    },
    {
      "name": "social_media_access",
      "constraint": "NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)",
      "tag": "maintained"
    }
  ]

Modifiez ensuite le notebook précédent en important le module et en modifiant la fonction get_rules() pour lire à partir du module, et non à partir de la table rules :

import dlt
from rules_module import *
from pyspark.sql.functions import expr, col

df = spark.createDataFrame(get_rules_as_list_of_dict())

def get_rules(tag):
  """
    loads data quality rules from a table
    :param tag: tag to match
    :return: dictionary of rules that matched the tag
  """
  rules = {}
  for row in df.filter(col("tag") == tag).collect():
    rules[row['name']] = row['constraint']
  return rules

@dlt.table(
  name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
  return (
    dlt.read("raw_farmers_market")
      .filter(expr("Organic = 'Y'"))
      .select("MarketName", "Website", "State",
        "Facebook", "Twitter", "Youtube", "Organic",
        "updateTime"
      )
  )