Delta Live Tables를 사용하여 데이터 품질 관리

데이터 세트 내용에 대한 데이터 품질 제약 조건을 정의하기 위해 기대치를 사용합니다. 기대에 따라 테이블에 도착하는 데이터가 데이터 품질 요구 사항을 충족하도록 보장하고 각 파이프라인 업데이트의 데이터 품질에 대한 인사이트를 제공할 수 있습니다. Python 데코레이터 또는 SQL 제약 조건 절을 사용하여 쿼리에 기대치를 적용합니다.

Delta Live Tables의 기대치는 무엇인가요?

예상은 쿼리를 통과하는 각 레코드에 데이터 품질 검사 적용하는 Delta Live Tables 데이터 세트 선언에 추가하는 선택적 절입니다.

기대치는 다음 세 가지로 구성됩니다.

  • 고유 식별자 역할을 하며 제약 조건에 대한 메트릭을 추적할 수 있는 설명입니다.
  • 지정된 조건에 따라 항상 true 또는 false를 반환하는 부울 문입니다.
  • 레코드가 예상에 실패할 때 수행할 작업입니다. 즉, 부울이 false를 반환합니다.

다음 행렬은 잘못된 레코드에 적용할 수 있는 세 가지 작업을 보여 줍니다.

작업 결과
warn (기본값) 잘못된 레코드가 대상에 기록됩니다. 오류는 데이터 세트에 대한 메트릭으로 보고됩니다.
드롭 데이터가 대상에 기록되기 전에 잘못된 레코드가 삭제됩니다. 오류는 데이터 세트에 대한 메트릭으로 보고됩니다.
fail 레코드가 잘못되었어도 업데이트가 성공하지 않습니다. 다시 처리하기 전에 수동 개입이 필요합니다.

Delta Live Tables 이벤트 로그를 쿼리하여 예상을 위반한 레코드 수와 같은 데이터 품질 메트릭을 볼 수 있습니다. 델타 라이브 테이블 파이프라인 모니터링을 참조 하세요.

Delta Live Tables 데이터 세트 선언 구문에 대한 전체 참조는 Delta Live Tables Python 언어 참조 또는 Delta Live Tables SQL 언어 참조를 참조하세요.

참고 항목

모든 기대에 여러 절을 포함할 수 있지만 Python만 여러 기대에 따라 작업 정의를 지원합니다. 여러 기대치를 참조 하세요.

잘못된 레코드 보관

예상을 위반하는 레코드를 유지하려면 expect 연산자를 사용합니다. 예상을 위반하는 레코드는 유효한 레코드와 함께 대상 데이터 세트에 추가됩니다.

Python

@dlt.expect("valid timestamp", "col(“timestamp”) > '2012-01-01'")

SQL

CONSTRAINT valid_timestamp EXPECT (timestamp > '2012-01-01')

잘못된 레코드 삭제

연산자를 expect or drop 사용하여 잘못된 레코드의 추가 처리를 방지합니다. 예상을 위반하는 레코드는 대상 데이터 세트에서 삭제됩니다.

Python

@dlt.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")

SQL

CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL) ON VIOLATION DROP ROW

잘못된 레코드 실패

잘못된 레코드를 사용할 수 없는 경우 레코드 유효성 검사에 expect or fail 실패할 때 즉시 연산자를 사용하여 실행을 중지합니다. 작업이 테이블 업데이트인 경우 시스템은 트랜잭션을 원자성으로 롤백합니다.

Python

@dlt.expect_or_fail("valid_count", "count > 0")

SQL

CONSTRAINT valid_count EXPECT (count > 0) ON VIOLATION FAIL UPDATE

예상 위반으로 인해 파이프라인이 실패하면 파이프라인을 다시 실행하기 전에 잘못된 데이터를 올바르게 처리하도록 파이프라인 코드를 수정해야 합니다.

실패 예상은 위반을 쿼리하고 보고하는 데 필요한 정보를 추적하기 위해 변환의 Spark 쿼리 계획을 수정합니다. 많은 쿼리의 경우 이 정보를 사용하여 위반을 초래한 입력 레코드를 식별할 수 있습니다. 다음은 예외의 예입니다.

Expectation Violated:
{
  "flowName": "a-b",
  "verboseInfo": {
    "expectationsViolated": [
      "x1 is negative"
    ],
    "inputData": {
      "a": {"x1": 1,"y1": "a },
      "b": {
        "x2": 1,
        "y2": "aa"
      }
    },
    "outputRecord": {
      "x1": 1,
      "y1": "a",
      "x2": 1,
      "y2": "aa"
    },
    "missingInputData": false
  }
}

여러 예상

Python 파이프라인에서 하나 이상의 데이터 품질 제약 조건으로 기대치를 정의할 수 있습니다. 이러한 데코레이터는 Python 사전을 인수로 허용합니다. 여기서 키는 예상 이름이고 값은 예상 제약 조건입니다.

유효성 검사에 실패한 레코드가 대상 데이터 세트에 포함되어야 하는 경우 expect_all을 사용하여 여러 데이터 품질 제약 조건을 지정합니다.

@dlt.expect_all({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})

유효성 검사에 실패한 레코드를 대상 데이터 세트에서 삭제해야 하는 경우 expect_all_or_drop을 사용하여 여러 데이터 품질 제약 조건을 지정합니다.

@dlt.expect_all_or_drop({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})

유효성 검사에 실패한 레코드가 파이프라인 실행을 중지해야 하는 경우 expect_all_or_fail을 사용하여 여러 데이터 품질 제약 조건을 지정합니다.

@dlt.expect_all_or_fail({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})

예상 컬렉션을 변수로 정의하고 파이프라인의 하나 이상의 쿼리에 전달할 수도 있습니다.

valid_pages = {"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"}

@dlt.table
@dlt.expect_all(valid_pages)
def raw_data():
  # Create raw dataset

@dlt.table
@dlt.expect_all_or_drop(valid_pages)
def prepared_data():
  # Create cleaned and prepared dataset

잘못된 데이터 격리

다음 예제에서는 임시 테이블 및 뷰와 함께 기대치를 사용합니다. 이 패턴은 파이프라인 업데이트 중에 예상 검사 전달하는 레코드에 대한 메트릭을 제공하고 다른 다운스트림 경로를 통해 유효하고 잘못된 레코드를 처리하는 방법을 제공합니다.

참고 항목

이 예제에서는 Databricks 데이터 세트포함된 샘플 데이터를 읽습니다. Databricks 데이터 세트는 Unity 카탈로그에 게시되는 파이프라인에서 지원되지 않으므로 이 예제는 Hive 메타스토어에 게시하도록 구성된 파이프라인에서만 작동합니다. 그러나 이 패턴은 Unity 카탈로그 사용 파이프라인에서도 작동하지만 외부 위치에서 데이터를 읽어야 합니다. Delta Live Tables과 함께 Unity 카탈로그를 사용하는 방법에 대해 자세히 알아보려면 Delta Live Tables 파이프라인과 함께 Unity 카탈로그 사용을 참조하세요.

import dlt
from pyspark.sql.functions import expr

rules = {}
rules["valid_website"] = "(Website IS NOT NULL)"
rules["valid_location"] = "(Location IS NOT NULL)"
quarantine_rules = "NOT({0})".format(" AND ".join(rules.values()))

@dlt.table(
  name="raw_farmers_market"
)
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="farmers_market_quarantine",
  temporary=True,
  partition_cols=["is_quarantined"]
)
@dlt.expect_all(rules)
def farmers_market_quarantine():
  return (
    dlt.read("raw_farmers_market")
      .select("MarketName", "Website", "Location", "State",
              "Facebook", "Twitter", "Youtube", "Organic", "updateTime")
      .withColumn("is_quarantined", expr(quarantine_rules))
  )

@dlt.view(
  name="valid_farmers_market"
)
def get_valid_farmers_market():
  return (
    dlt.read("farmers_market_quarantine")
      .filter("is_quarantined=false")
  )

@dlt.view(
  name="invalid_farmers_market"
)
def get_invalid_farmers_market():
  return (
    dlt.read("farmers_market_quarantine")
      .filter("is_quarantined=true")
  )

테이블 전체의 행 수 유효성 검사

파이프라인에 두 개의 라이브 테이블 간의 행 개수를 비교할 수 있는 기대치를 정의하는 테이블을 추가할 수 있습니다. 이 예상 결과는 이벤트 로그 및 Delta Live Tables UI에 나타납니다. 다음 예제에서는 테이블과 tblb 테이블 간의 동일한 행 개수의 유효성을 tbla 검사합니다.

CREATE OR REFRESH LIVE TABLE count_verification(
  CONSTRAINT no_rows_dropped EXPECT (a_count == b_count)
) AS SELECT * FROM
  (SELECT COUNT(*) AS a_count FROM LIVE.tbla),
  (SELECT COUNT(*) AS b_count FROM LIVE.tblb)

Delta Live Tables 기대치로 고급 유효성 검사 수행

집계 및 조인 쿼리를 사용하여 라이브 테이블을 정의하고 이러한 쿼리의 결과를 기대치 검사의 일부로 사용할 수 있습니다. 예를 들어 파생 테이블에 원본 테이블의 모든 레코드가 포함되도록 하거나 테이블 간에 숫자 열의 같음을 보장하는 등 복잡한 데이터 품질 검사 수행하려는 경우에 유용합니다. 키워드(keyword) 사용하여 TEMPORARY 이러한 테이블이 대상 스키마에 게시되지 않도록 할 수 있습니다.

다음 예에서는 모든 예상 레코드가 report 테이블에 있는지 유효성을 검사합니다.

CREATE TEMPORARY LIVE TABLE report_compare_tests(
  CONSTRAINT no_missing_records EXPECT (r.key IS NOT NULL)
)
AS SELECT * FROM LIVE.validation_copy v
LEFT OUTER JOIN LIVE.report r ON v.key = r.key

다음 예에서는 집계를 사용하여 기본 키의 고유성을 보장합니다.

CREATE TEMPORARY LIVE TABLE report_pk_tests(
  CONSTRAINT unique_pk EXPECT (num_entries = 1)
)
AS SELECT pk, count(*) as num_entries
FROM LIVE.report
GROUP BY pk

예상을 이동 가능하고 재사용 가능하게 만들기

파이프라인 구현과 별도로 데이터 품질 규칙을 기본 수 있습니다.

Databricks는 태그로 분류된 각 규칙을 사용하여 델타 테이블에 규칙을 저장하는 것이 좋습니다. 데이터 세트 정의에서 이 태그를 사용하여 적용할 규칙을 결정합니다.

다음 예제에서는 기본 규칙을 기본 테이블을 rules 만듭니다.

CREATE OR REPLACE TABLE
  rules
AS SELECT
  col1 AS name,
  col2 AS constraint,
  col3 AS tag
FROM (
  VALUES
  ("website_not_null","Website IS NOT NULL","validity"),
  ("location_not_null","Location IS NOT NULL","validity"),
  ("state_not_null","State IS NOT NULL","validity"),
  ("fresh_data","to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'","maintained"),
  ("social_media_access","NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)","maintained")
)

다음 Python 예제에서는 테이블에 저장된 규칙을 기반으로 데이터 품질 기대치를 rules 정의합니다. 함수는 get_rules() 테이블에서 규칙을 읽고 함수에 rules 전달된 인수와 일치하는 tag 규칙을 포함하는 Python 사전을 반환합니다. 사전은 데이터 품질 제약 조건을 적용하기 위해 @dlt.expect_all_*() 데코레이터에 적용됩니다. 예를 들어, validity 태그가 지정된 규칙에 실패한 모든 레코드는 raw_farmers_market 테이블에서 삭제됩니다.

참고 항목

이 예제에서는 Databricks 데이터 세트포함된 샘플 데이터를 읽습니다. Databricks 데이터 세트는 Unity 카탈로그에 게시되는 파이프라인에서 지원되지 않으므로 이 예제는 Hive 메타스토어에 게시하도록 구성된 파이프라인에서만 작동합니다. 그러나 이 패턴은 Unity 카탈로그 사용 파이프라인에서도 작동하지만 외부 위치에서 데이터를 읽어야 합니다. Delta Live Tables과 함께 Unity 카탈로그를 사용하는 방법에 대해 자세히 알아보려면 Delta Live Tables 파이프라인과 함께 Unity 카탈로그 사용을 참조하세요.

import dlt
from pyspark.sql.functions import expr, col

def get_rules(tag):
  """
    loads data quality rules from a table
    :param tag: tag to match
    :return: dictionary of rules that matched the tag
  """
  rules = {}
  df = spark.read.table("rules")
  for row in df.filter(col("tag") == tag).collect():
    rules[row['name']] = row['constraint']
  return rules

@dlt.table(
  name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
  return (
    dlt.read("raw_farmers_market")
      .filter(expr("Organic = 'Y'"))
      .select("MarketName", "Website", "State",
        "Facebook", "Twitter", "Youtube", "Organic",
        "updateTime"
      )
  )

규칙을 기본이라는 테이블을 rules 만드는 대신 Notebook과 동일한 폴더에 있는 rules_module.py 파일에서 규칙을 기본 Python 모듈을 만들 수 있습니다.

def get_rules_as_list_of_dict():
  return [
    {
      "name": "website_not_null",
      "constraint": "Website IS NOT NULL",
      "tag": "validity"
    },
    {
      "name": "location_not_null",
      "constraint": "Location IS NOT NULL",
      "tag": "validity"
    },
    {
      "name": "state_not_null",
      "constraint": "State IS NOT NULL",
      "tag": "validity"
    },
    {
      "name": "fresh_data",
      "constraint": "to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'",
      "tag": "maintained"
    },
    {
      "name": "social_media_access",
      "constraint": "NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)",
      "tag": "maintained"
    }
  ]

그런 다음, 모듈을 가져오고 테이블 대신 모듈에서 읽을 함수를 변경 get_rules() 하여 이전 Notebook을 rules 수정합니다.

import dlt
from rules_module import *
from pyspark.sql.functions import expr, col

df = spark.createDataFrame(get_rules_as_list_of_dict())

def get_rules(tag):
  """
    loads data quality rules from a table
    :param tag: tag to match
    :return: dictionary of rules that matched the tag
  """
  rules = {}
  for row in df.filter(col("tag") == tag).collect():
    rules[row['name']] = row['constraint']
  return rules

@dlt.table(
  name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
  return (
    dlt.read("raw_farmers_market")
      .filter(expr("Organic = 'Y'"))
      .select("MarketName", "Website", "State",
        "Facebook", "Twitter", "Youtube", "Organic",
        "updateTime"
      )
  )