Gerenciar a qualidade dos dados com o Delta Live Tables

Para definir restrições de qualidade de dados no conteúdo de um conjunto de dados são usadas expectativas. As expectativas permitem que você garanta que os dados que chegam às tabelas atendam aos requisitos de qualidade de dados e fornecem insights sobre a qualidade dos dados para cada atualização do pipeline. As expectativas são aplicadas às consultas com o uso de decoradores do Python ou cláusulas de restrição do SQL.

O que são as expectativas do Delta Live Tables?

Expectativas são cláusulas opcionais que você adiciona às declarações do conjunto de dados do Delta Live Tables que aplicam verificações de qualidade de dados em cada registro sendo examinado por uma consulta.

Uma expectativa consiste de três elementos:

  • Uma descrição, que atua como um identificador exclusivo e permite que você acompanhe as métricas para a restrição.
  • Uma instrução booliana que sempre retorna true ou false com base em alguma condição declarada.
  • Uma providência a ser tomada quando um registro não corresponde à expectativa, significando que a instrução booliana retorna um false.

A matriz a seguir mostra as três ações que você pode aplicar a registros inválidos:

Ação Resultado
avisar (padrão) Os registros inválidos são gravados no destino; a falha é reportada como uma métrica para o conjunto de dados.
remover Os registros inválidos são descartados antes que os dados sejam gravados no destino; a falha é reportada como uma métrica para o conjunto de dados.
fail Os registros inválidos impedem que a atualização seja bem-sucedida. Uma intervenção manual é necessária antes do reprocessamento.

Você pode exibir métricas de qualidade de dados, como o número de registros que violam uma expectativa consultando o log de eventos do Delta Live Tables. Consulte Monitorar pipelines de tabelas dinâmicas Delta.

Para obter uma referência completa da sintaxe da declaração do conjunto de dados do Delta Live Tables, confira a Referência da linguagem Python no Delta Live Tables ou a Referência da linguagem SQL no Delta Live Tables.

Observação

Embora você possa incluir várias cláusulas em qualquer expectativa, apenas o Python dá suporte à definição de ações baseadas em várias expectativas. Confira Várias expectativas.

Reter registros inválidos

Use o operador expect quando desejar manter registros que violem a expectativa. Registros que violem a expectativa são adicionados ao conjunto de dados de destino junto com os registos válidos:

Python

@dlt.expect("valid timestamp", "col(“timestamp”) > '2012-01-01'")

SQL

CONSTRAINT valid_timestamp EXPECT (timestamp > '2012-01-01')

Remover registros inválidos

Use o operador expect or drop para impedir o processamento adicional de registros inválidos. Registros que violem a expectativa são removidos do conjunto de dados de destino:

Python

@dlt.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")

SQL

CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL) ON VIOLATION DROP ROW

Falha em registros inválidos

Quando os registros inválidos forem inaceitáveis, use o operador expect or fail para interromper a execução imediatamente quando um registro for reprovado na validação. Se a operação for uma atualização de tabela, o sistema reverterá atomicamente a transação:

Python

@dlt.expect_or_fail("valid_count", "count > 0")

SQL

CONSTRAINT valid_count EXPECT (count > 0) ON VIOLATION FAIL UPDATE

Quando um pipeline falha devido a uma violação de expectativa, você deve corrigir o código do pipeline para que manipule os dados inválidos corretamente antes de executá-lo novamente.

As expectativas de falha modificam o plano de consulta do Spark das suas transformações para acompanhar as informações necessárias à detecção e ao relato de violações. Para muitas consultas, você pode usar essas informações para identificar qual registro de entrada resultou na violação. Este é um exemplo de exceção:

Expectation Violated:
{
  "flowName": "a-b",
  "verboseInfo": {
    "expectationsViolated": [
      "x1 is negative"
    ],
    "inputData": {
      "a": {"x1": 1,"y1": "a },
      "b": {
        "x2": 1,
        "y2": "aa"
      }
    },
    "outputRecord": {
      "x1": 1,
      "y1": "a",
      "x2": 1,
      "y2": "aa"
    },
    "missingInputData": false
  }
}

Várias expectativas

Você pode definir expectativas com uma ou mais restrições de qualidade de dados nos pipelines do Python. Esses decoradores aceitam um dicionário do Python como argumento, sendo a chave o nome da expectativa e o valor, a restrição de expectativa.

Use expect_all para especificar várias restrições de qualidade de dados quando registros que falhem na validação precisarem ser incluídos no conjunto de dados de destino:

@dlt.expect_all({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})

Use expect_all_or_drop para especificar várias restrições de qualidade de dados quando registros que falhem na validação precisarem ser removidos do conjunto de dados de destino:

@dlt.expect_all_or_drop({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})

Use expect_all_or_fail para especificar várias restrições de qualidade de dados quando registros que falhem na validação precisarem interromper a execução do pipeline:

@dlt.expect_all_or_fail({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})

Você também pode definir uma coleção de expectativas como variável e transmiti-la para uma ou mais consultas no seu pipeline:

valid_pages = {"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"}

@dlt.table
@dlt.expect_all(valid_pages)
def raw_data():
  # Create raw dataset

@dlt.table
@dlt.expect_all_or_drop(valid_pages)
def prepared_data():
  # Create cleaned and prepared dataset

Colocar dados inválidos em quarentena

O exemplo a seguir usa as expectativas em combinação com exibições e tabelas temporárias. Esse padrão lhe fornece métricas para registros submetidos a verificações de expectativas durante as atualizações de pipelines e uma maneira de processar registros válidos e inválidos por meio de diferentes caminhos downstream.

Observação

Esse exemplo lê amostras de dados incluídas nos conjuntos de dados do Databricks. Como os conjuntos de dados do Databricks não são compatíveis com pipelines que publiquem no Catálogo do Unity, esse exemplo funciona apenas com um pipeline configurado para publicar no metastore do Hive. No entanto, esse padrão também funciona com pipelines habilitados para o Catálogo do Unity, mas você vai precisar ler dados em locais externos. Para saber mais sobre como usar o Catálogo do Unity com o Delta Live Tables, confira Usar o Catálogo do Unity com seus pipelines do Delta Live Tables.

import dlt
from pyspark.sql.functions import expr

rules = {}
rules["valid_website"] = "(Website IS NOT NULL)"
rules["valid_location"] = "(Location IS NOT NULL)"
quarantine_rules = "NOT({0})".format(" AND ".join(rules.values()))

@dlt.table(
  name="raw_farmers_market"
)
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="farmers_market_quarantine",
  temporary=True,
  partition_cols=["is_quarantined"]
)
@dlt.expect_all(rules)
def farmers_market_quarantine():
  return (
    dlt.read("raw_farmers_market")
      .select("MarketName", "Website", "Location", "State",
              "Facebook", "Twitter", "Youtube", "Organic", "updateTime")
      .withColumn("is_quarantined", expr(quarantine_rules))
  )

@dlt.view(
  name="valid_farmers_market"
)
def get_valid_farmers_market():
  return (
    dlt.read("farmers_market_quarantine")
      .filter("is_quarantined=false")
  )

@dlt.view(
  name="invalid_farmers_market"
)
def get_invalid_farmers_market():
  return (
    dlt.read("farmers_market_quarantine")
      .filter("is_quarantined=true")
  )

Validar contagens de linha entre tabelas

Você pode adicionar ao seu pipeline uma tabela extra, que defina uma expectativa para comparar os números de linhas de duas tabelas dinâmicas. Os resultados dessa expectativa aparecem no log de eventos e na interface do usuário do Delta Live Tables. O exemplo a seguir valida os números de linhas iguais nas tabelas tbla e tblb:

CREATE OR REFRESH LIVE TABLE count_verification(
  CONSTRAINT no_rows_dropped EXPECT (a_count == b_count)
) AS SELECT * FROM
  (SELECT COUNT(*) AS a_count FROM LIVE.tbla),
  (SELECT COUNT(*) AS b_count FROM LIVE.tblb)

Executar validação avançada com as expectativas de Tabelas Dinâmicas Delta

Você pode definir tabelas dinâmicas usando consultas de agregação e junção e usar os resultados dessas consultas como parte da verificação de expectativas. Isso é útil se você precisar executar verificações complexas de qualidade de dados, como, por exemplo, se certificando de que uma tabela derivada contenha todos os registros da tabela de origem ou garantindo a igualdade entre colunas numéricas nas diferentes tabelas. Você pode usar a palavra-chave TEMPORARY para impedir que essas tabelas sejam publicadas no esquema de destino.

O exemplo a seguir valida que todos os registros esperados estão presentes na tabela report:

CREATE TEMPORARY LIVE TABLE report_compare_tests(
  CONSTRAINT no_missing_records EXPECT (r.key IS NOT NULL)
)
AS SELECT * FROM LIVE.validation_copy v
LEFT OUTER JOIN LIVE.report r ON v.key = r.key

O exemplo a seguir usa uma agregação para garantir a exclusividade de uma chave primária:

CREATE TEMPORARY LIVE TABLE report_pk_tests(
  CONSTRAINT unique_pk EXPECT (num_entries = 1)
)
AS SELECT pk, count(*) as num_entries
FROM LIVE.report
GROUP BY pk

Tornar as expectativas portáteis e reutilizáveis

Você pode manter as regras de qualidade de dados separadas das implementações do seu pipeline.

O Databricks recomenda armazenar as regras em uma tabela Delta com cada regra categorizada por uma tag. Essa tag deve ser usada nas definições de conjuntos de dados para determinar quais regras aplicar.

O exemplo a seguir cria uma tabela chamada rules para manter as regras:

CREATE OR REPLACE TABLE
  rules
AS SELECT
  col1 AS name,
  col2 AS constraint,
  col3 AS tag
FROM (
  VALUES
  ("website_not_null","Website IS NOT NULL","validity"),
  ("location_not_null","Location IS NOT NULL","validity"),
  ("state_not_null","State IS NOT NULL","validity"),
  ("fresh_data","to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'","maintained"),
  ("social_media_access","NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)","maintained")
)

O exemplo do Python a seguir define as expectativas de qualidade de dados com base nas regras armazenadas na tabela rules. A função get_rules() lê as regras na tabela de rules e retorna um dicionário do Python contendo regras que correspondem ao argumento tag repassado para a função. O dicionário é aplicado nos decoradores @dlt.expect_all_*() para impor restrições de qualidade de dados. Por exemplo, todo registro que falhar nas regras marcadas com validity será descartado da tabela raw_farmers_market:

Observação

Esse exemplo lê amostras de dados incluídas nos conjuntos de dados do Databricks. Como os conjuntos de dados do Databricks não são compatíveis com pipelines que publiquem no Catálogo do Unity, esse exemplo funciona apenas com um pipeline configurado para publicar no metastore do Hive. No entanto, esse padrão também funciona com pipelines habilitados para o Catálogo do Unity, mas você vai precisar ler dados em locais externos. Para saber mais sobre como usar o Catálogo do Unity com o Delta Live Tables, confira Usar o Catálogo do Unity com seus pipelines do Delta Live Tables.

import dlt
from pyspark.sql.functions import expr, col

def get_rules(tag):
  """
    loads data quality rules from a table
    :param tag: tag to match
    :return: dictionary of rules that matched the tag
  """
  rules = {}
  df = spark.read.table("rules")
  for row in df.filter(col("tag") == tag).collect():
    rules[row['name']] = row['constraint']
  return rules

@dlt.table(
  name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
  return (
    dlt.read("raw_farmers_market")
      .filter(expr("Organic = 'Y'"))
      .select("MarketName", "Website", "State",
        "Facebook", "Twitter", "Youtube", "Organic",
        "updateTime"
      )
  )

Em vez de criar uma tabela chamada rules para manter as regras, você pode criar um módulo Python para as regras principais, por exemplo, em um arquivo chamado rules_module.py na mesma pasta que o notebook:

def get_rules_as_list_of_dict():
  return [
    {
      "name": "website_not_null",
      "constraint": "Website IS NOT NULL",
      "tag": "validity"
    },
    {
      "name": "location_not_null",
      "constraint": "Location IS NOT NULL",
      "tag": "validity"
    },
    {
      "name": "state_not_null",
      "constraint": "State IS NOT NULL",
      "tag": "validity"
    },
    {
      "name": "fresh_data",
      "constraint": "to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'",
      "tag": "maintained"
    },
    {
      "name": "social_media_access",
      "constraint": "NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)",
      "tag": "maintained"
    }
  ]

Em seguida, modifique o notebook anterior importando o módulo e alterando a função get_rules() para ler do módulo em vez de ler da tabela rules:

import dlt
from rules_module import *
from pyspark.sql.functions import expr, col

df = spark.createDataFrame(get_rules_as_list_of_dict())

def get_rules(tag):
  """
    loads data quality rules from a table
    :param tag: tag to match
    :return: dictionary of rules that matched the tag
  """
  rules = {}
  for row in df.filter(col("tag") == tag).collect():
    rules[row['name']] = row['constraint']
  return rules

@dlt.table(
  name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
  return (
    dlt.read("raw_farmers_market")
      .filter(expr("Organic = 'Y'"))
      .select("MarketName", "Website", "State",
        "Facebook", "Twitter", "Youtube", "Organic",
        "updateTime"
      )
  )