Delta Live Tables 쿡북

이 문서에는 Delta Live Tables 파이프라인에서 일반적인 작업을 구현하기 위한 권장 사항 및 솔루션 컬렉션이 포함되어 있습니다.

예상을 이식 가능하고 재사용 가능하게 만듭니다.

시나리오

여러 테이블에 공통 데이터 품질 규칙 집합을 적용하려고 하거나 데이터 품질 규칙을 개발하고 유지 관리하는 팀 구성원이 파이프라인 개발자와 별개입니다.

솔루션

파이프라인 구현과 별도로 데이터 품질 규칙을 유지 관리합니다. 예를 들어 DBFS, 클라우드 스토리지 또는 Delta 테이블에 저장된 텍스트 파일과 같이 안정적이고 액세스 및 업데이트하기 쉬운 형식으로 규칙을 저장합니다. 다음 예에서는 DBFS에 저장된 rules.csv라는 CSV 파일을 사용하여 규칙을 유지 관리합니다. rules.csv의 각 규칙은 태그별로 분류됩니다. 데이터 세트 정의에서 이 태그를 사용하여 적용할 규칙을 결정합니다.

name, constraint, tag
website_not_null,"Website IS NOT NULL",validity
location_not_null,"Location IS NOT NULL",validity
state_not_null,"State IS NOT NULL",validity
fresh_data,"to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'",maintained
social_media_access,"NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)",maintained

다음 Python 예시는 rules.csv 파일에 저장된 규칙을 기반으로 데이터 품질 예상을 정의합니다. get_rules() 함수는 rules.csv에서 규칙을 읽고 함수에 전달된 tag 인수와 일치하는 규칙이 포함된 Python 사전을 반환합니다. 사전은 데이터 품질 제약 조건을 적용하기 위해 @dlt.expect_all_*() 데코레이터에 적용됩니다. 예를 들어, validity 태그가 지정된 규칙에 실패한 모든 레코드는 raw_farmers_market 테이블에서 삭제됩니다.

import dlt
from pyspark.sql.functions import expr, col

def get_rules(tag):
  """
    loads data quality rules from csv file
    :param tag: tag to match
    :return: dictionary of rules that matched the tag
  """
  rules = {}
  df = spark.read.format("csv").option("header", "true").load("/path/to/rules.csv")
  for row in df.filter(col("tag") == tag).collect():
    rules[row['name']] = row['constraint']
  return rules

@dlt.table(
  name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
  return (
    dlt.read("raw_farmers_market")
      .filter(expr("Organic = 'Y'"))
      .select("MarketName", "Website", "State",
        "Facebook", "Twitter", "Youtube", "Organic",
        "updateTime"
      )
  )

SQL에서 Python UDF 사용

시나리오

Delta Live Tables 데이터 세트를 정의하는 SQL의 단순성을 원하지만 SQL에서 직접 지원되지 않는 변환이 또한 필요합니다.

솔루션

SQL 쿼리에 Python UDF(사용자 정의 함수)를 사용합니다. 다음 예에서는 square() UDF를 정의 및 등록하여 입력 인수의 제곱을 반환하고 SQL 식에서 square() UDF를 호출합니다.

  1. UDF를 정의하고 등록합니다.

    기본 언어Python으로 설정된 Notebook을 만들고 다음을 셀에 추가합니다:

    def square(i: int) -> int:
      """
        Simple udf for squaring the parameter passed.
        :param i: column from Pyspark or SQL
        :return: squared value of the passed param.
      """
      return i * i
    
    spark.udf.register("makeItSquared", square) # register the square udf for Spark SQL
    
  2. UDF에 전화합니다.

    SQL Notebook을 만들고 셀에 다음 쿼리를 추가합니다.

    CREATE OR REFRESH LIVE TABLE raw_squared
    AS SELECT makeItSquared(2) AS numSquared;
    
  3. 파이프라인 만들기

    만든 Notebooks를 Notebooks 라이브러리에 추가하여 새로운 Delta Live Tables 파이프라인을 만듭니다. Notebooks 라이브러리 추가 단추를 사용하여 파이프라인 만들기 대화 상자에서 Notebooks를 추가하거나 Delta Live Tables 설정libraries 필드에서 Notebooks를 구성합니다.

Delta Live Tables 파이프라인에서 MLFlow 모델 사용

시나리오

파이프라인에서 MLFlow 학습된 모델을 사용하려고 합니다.

솔루션

Delta Live Tables 파이프라인에서 MLFlow 모델을 사용하려면:

  1. MLFlow 모델의 실행 ID 및 모델 이름을 가져옵니다. 실행 ID와 모델 이름은 MLFlow 모델의 URI를 구성하는 데 사용됩니다.
  2. URI를 사용하여 MLFlow 모델을 로드할 Spark UDF를 정의합니다.
  3. MLFlow 모델을 사용하려면 테이블 정의에서 UDF를 호출합니다.

다음 예는 대출 위험 데이터에 대해 학습된 MLFlow 모델을 로드하는 loaded_model이라는 Spark UDF를 정의합니다. 그런 다음 loaded_model UDF를 사용하여 gtb_scoring_train_datagtb_scoring_valid_data 테이블을 정의합니다.

%pip install mlflow

import dlt
import mlflow
from pyspark.sql.functions import struct

run_id= "mlflow_run_id"
model_name = "the_model_name_in_run"
model_uri = "runs:/{run_id}/{model_name}".format(run_id=run_id, model_name=model_name)
loaded_model = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)

categoricals = ["term", "home_ownership", "purpose",
  "addr_state","verification_status","application_type"]

numerics = ["loan_amnt", "emp_length", "annual_inc", "dti", "delinq_2yrs",
  "revol_util", "total_acc", "credit_length_in_years"]

features = categoricals + numerics

@dlt.table(
  comment="GBT ML scored training dataset based on Loan Risk",
  table_properties={
    "quality": "gold"
  }
)
def gtb_scoring_train_data():
  return dlt.read("train_data")
    .withColumn('predictions', loaded_model(struct(features)))

@dlt.table(
  comment="GBT ML scored valid dataset based on Loan Risk",
  table_properties={
    "quality": "gold"
  }
)
def gtb_scoring_valid_data():
  return dlt.read("valid_data")
    .withColumn('predictions', loaded_model(struct(features)))

개발 및 테스트를 위한 샘플 데이터 세트 만들기

시나리오

개발 또는 테스트를 위한 샘플 데이터 세트(예: 데이터의 하위 집합 또는 특정 레코드 유형이 포함된 데이터 세트)를 만들려고 합니다.

솔루션

단일 또는 공유 Notebooks 집합에서 변환 논리를 구현합니다. 그런 다음 별도의 Notebooks를 만들어 환경에 따라 여러 데이터 세트를 정의합니다. 예를 들어 프로덕션에서 파이프라인에 대한 전체 데이터 집합을 정의하는 Notebook을 만듭니다.

CREATE OR REFRESH STREAMING LIVE TABLE input_data AS SELECT * FROM cloud_files("/production/data", "json")

그런 다음 요구 사항에 따라 데이터 샘플을 정의하는 Notebooks를 만듭니다. 예를 들어 테스트를 위해 특정 레코드가 있는 작은 데이터 세트를 생성하려면 다음을 수행합니다.

CREATE OR REFRESH LIVE TABLE input_data AS
SELECT "2021/09/04" AS date, 22.4 as sensor_reading UNION ALL
SELECT "2021/09/05" AS date, 21.5 as sensor_reading

데이터를 필터링하여 개발 또는 테스트를 위한 프로덕션 데이터의 하위 집합을 만들 수도 있습니다.

CREATE OR REFRESH LIVE TABLE input_data AS SELECT * FROM prod.input_data WHERE date > current_date() - INTERVAL 1 DAY

이러한 다양한 데이터 세트를 사용하려면 변환 논리를 구현하는 Notebooks로 여러 파이프라인을 만듭니다. 각 파이프라인은 LIVE.input_data 데이터 세트에서 데이터를 읽을 수 있지만 환경에 특정한 데이터집합을 만드는 Notebook을 포함하도록 구성됩니다.

프로그래밍 방식으로 여러 라이브 테이블 관리 및 만들기

시나리오

소수의 매개 변수만 다른 여러 흐름 또는 데이터 세트 정의가 포함된 파이프라인이 있습니다. 이러한 중복으로 인해 오류가 발생하기 쉽고 유지 관리가 어려운 파이프라인이 생성됩니다. 예를 들어, 다음 다이어그램은 소방서 데이터 세트를 사용하여 다양한 범주의 비상 호출에 대해 가장 빠른 응답 시간을 가진 동네를 찾는 파이프라인의 그래프를 보여 줍니다. 이 예에서 병렬 흐름은 몇 가지 매개 변수만 다릅니다.

Fire dataset flow diagram

메타프로그래밍 패턴을 사용하여 중복 흐름 정의를 생성하고 유지 관리하는 오버헤드를 줄일 수 있습니다. Delta Live Tables의 메타프로그래밍은 Python 내부 함수를 사용하여 수행됩니다. 이러한 함수는 느리게 평가되기 때문에 입력 매개 변수를 제외하고 동일한 흐름을 만드는 데 사용할 수 있습니다. 각 호출에는 다음 예와 같이 각 테이블이 생성되어야 하는 방법을 제어하는 서로 다른 매개 변수 집합이 포함될 수 있습니다.

import dlt
from pyspark.sql.functions import *

@dlt.table(
  name="raw_fire_department",
  comment="raw table for fire department response"
)
@dlt.expect_or_drop("valid_received", "received IS NOT NULL")
@dlt.expect_or_drop("valid_response", "responded IS NOT NULL")
@dlt.expect_or_drop("valid_neighborhood", "neighborhood != 'None'")
def get_raw_fire_department():
  return (
    spark.read.format('csv')
      .option('header', 'true')
      .option('multiline', 'true')
      .load('/databricks-datasets/timeseries/Fires/Fire_Department_Calls_for_Service.csv')
      .withColumnRenamed('Call Type', 'call_type')
      .withColumnRenamed('Received DtTm', 'received')
      .withColumnRenamed('Response DtTm', 'responded')
      .withColumnRenamed('Neighborhooods - Analysis Boundaries', 'neighborhood')
    .select('call_type', 'received', 'responded', 'neighborhood')
  )

all_tables = []

def generate_tables(call_table, response_table, filter):
  @dlt.table(
    name=call_table,
    comment="top level tables by call type"
  )
  def create_call_table():
    return (
      spark.sql("""
        SELECT
          unix_timestamp(received,'M/d/yyyy h:m:s a') as ts_received,
          unix_timestamp(responded,'M/d/yyyy h:m:s a') as ts_responded,
          neighborhood
        FROM LIVE.raw_fire_department
        WHERE call_type = '{filter}'
      """.format(filter=filter))
    )

  @dlt.table(
    name=response_table,
    comment="top 10 neighborhoods with fastest response time "
  )
  def create_response_table():
    return (
      spark.sql("""
        SELECT
          neighborhood,
          AVG((ts_received - ts_responded)) as response_time
        FROM LIVE.{call_table}
        GROUP BY 1
        ORDER BY response_time
        LIMIT 10
      """.format(call_table=call_table))
    )

  all_tables.append(response_table)

generate_tables("alarms_table", "alarms_response", "Alarms")
generate_tables("fire_table", "fire_response", "Structure Fire")
generate_tables("medical_table", "medical_response", "Medical Incident")

@dlt.table(
  name="best_neighborhoods",
  comment="which neighbor appears in the best response time list the most"
)
def summary():
  target_tables = [dlt.read(t) for t in all_tables]
  unioned = functools.reduce(lambda x,y: x.union(y), target_tables)
  return (
    unioned.groupBy(col("neighborhood"))
      .agg(count("*").alias("score"))
      .orderBy(desc("score"))
  )

잘못된 데이터 격리

시나리오

데이터 품질 제약 조건을 위반하는 레코드를 필터링하기 위한 예상을 정의하는 것뿐만 아니라 분석을 위해 잘못된 레코드도 저장하는 것이 좋습니다.

솔루션

정의한 예상과 반대되는 규칙을 만들고 이러한 규칙을 사용하여 잘못된 레코드를 별도의 테이블에 저장합니다. 프로그래밍 방식으로 이러한 역 규칙을 만들 수 있습니다. 다음 예는 valid_websitevalid_location 데이터 품질 제약 조건을 통과하는 입력 레코드가 포함된 valid_farmers_market 테이블을 만들고 이러한 데이터 품질 제약 조건을 통과하지 못하는 레코드가 포함된 invalid_farmers_market 테이블도 만듭니다.

import dlt

rules = {}
quarantine_rules = {}

rules["valid_website"] = "(Website IS NOT NULL)"
rules["valid_location"] = "(Location IS NOT NULL)"

# concatenate inverse rules
quarantine_rules["invalid_record"] = "NOT({0})".format(" AND ".join(rules.values()))

@dlt.table(
  name="raw_farmers_market"
)
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="valid_farmers_market"
)
@dlt.expect_all_or_drop(rules)
def get_valid_farmers_market():
  return (
    dlt.read("raw_farmers_market")
      .select("MarketName", "Website", "Location", "State",
              "Facebook", "Twitter", "Youtube", "Organic", "updateTime")
  )

@dlt.table(
  name="invalid_farmers_market"
)
@dlt.expect_all_or_drop(quarantine_rules)
def get_invalid_farmers_market():
  return (
    dlt.read("raw_farmers_market")
      .select("MarketName", "Website", "Location", "State",
              "Facebook", "Twitter", "Youtube", "Organic", "updateTime")
  )

위 방법의 단점은 데이터를 두 번 처리하여 격리 테이블을 생성한다는 것입니다. 이러한 성능 오버헤드를 원하지 않는 경우 쿼리 내에서 직접 제약 조건을 사용하여 레코드의 유효성 검사 상태를 나타내는 열을 생성할 수 있습니다. 그런 다음 추가 최적화를 위해 이 열을 기준으로 테이블을 분할할 수 있습니다.

이 방법은 기대치를 사용하지 않으므로 데이터 품질 메트릭은 이벤트 로그 또는 파이프라인 UI에 표시되지 않습니다.

import dlt
from pyspark.sql.functions import expr

rules = {}
quarantine_rules = {}

rules["valid_website"] = "(Website IS NOT NULL)"
rules["valid_location"] = "(Location IS NOT NULL)"
quarantine_rules = "NOT({0})".format(" AND ".join(rules.values()))

@dlt.table(
  name="raw_farmers_market"
)
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="partitioned_farmers_market",
  partition_cols = [ 'Quarantine' ]
)
def get_partitioned_farmers_market():
  return (
    dlt.read("raw_farmers_market")
      .withColumn("Quarantine", expr(quarantine_rules))
      .select("MarketName", "Website", "Location", "State",
              "Facebook", "Twitter", "Youtube", "Organic", "updateTime",
              "Quarantine")
  )

테이블 전체의 행 수 유효성 검사

시나리오

행 삭제 없이 데이터가 성공적으로 처리되었는지 확인하려면 두 개의 라이브 테이블 간에 행 수를 비교해야 합니다.

솔루션

비교를 수행할 기대치를 정의하는 추가 테이블을 파이프라인에 추가합니다. 이 예상 결과는 이벤트 로그 및 Delta Live Tables UI에 나타납니다. 이 예에서는 tblatblb 테이블 간의 동일한 행 수의 유효성을 검사합니다.

CREATE OR REFRESH LIVE TABLE count_verification(
  CONSTRAINT no_rows_dropped EXPECT (a_count == b_count)
) AS SELECT * FROM
  (SELECT COUNT(*) AS a_count FROM LIVE.tbla),
  (SELECT COUNT(*) AS b_count FROM LIVE.tblb)

수동 삭제 또는 업데이트 유지

시나리오

애플리케이션은 테이블의 레코드를 임의로 삭제하거나 업데이트하고 모든 다운스트림 테이블을 다시 계산해야 합니다. 다음 다이어그램은 두 개의 스트리밍 라이브 테이블을 보여 줍니다.

  • raw_user_table은 원본에서 원시 사용자 데이터 집합을 수집합니다.
  • bmi_tableraw_user_table의 체중과 키를 사용하여 BMI 점수를 점진적으로 계산합니다.

개인 정보 보호 요구 사항을 준수하려면 raw_user_table에서 사용자 레코드를 삭제하거나 업데이트하고 bmi_table을 다시 계산해야 합니다.

Retain data diagram

raw_user_table에서 레코드를 수동으로 삭제하거나 업데이트하고 새로 고침 작업을 수행하여 다운스트림 테이블을 다시 계산할 수 있습니다. 그러나 삭제된 레코드가 원본 데이터에서 다시 로드되지 않도록 해야 합니다.

솔루션

pipelines.reset.allowed 테이블 속성을 사용하여 raw_user_table에 대한 전체 새로 고침을 사용하지 않도록 설정하여 의도한 변경 내용이 시간이 지나도 유지되도록 합니다.

CREATE OR REFRESH STREAMING LIVE TABLE raw_user_table
TBLPROPERTIES(pipelines.reset.allowed = false)
AS SELECT * FROM cloud_files("/databricks-datasets/iot-stream/data-user", "csv");

CREATE OR REFRESH STREAMING LIVE TABLE bmi_table
AS SELECT userid, (weight/2.2) / pow(height*0.0254,2) AS bmi FROM STREAM(LIVE.raw_user_table);

pipelines.reset.allowedfalse로 설정하면 raw_user_table에 대한 새로 고침이 방지되지만 테이블에 대한 증분 쓰기가 방지되거나 데이터가 테이블로 유입되는 것을 방지할 수는 없습니다.

게시에서 테이블 제외

시나리오

테이블을 게시하도록 target 설정을 구성했지만 게시하고 싶지 않은 테이블이 있습니다.

솔루션

temporary 테이블을 정의하여 Delta Live Tables가 테이블에 대한 메타데이터를 유지하지 않도록 지시합니다.

CREATE TEMPORARY LIVE TABLE customers_raw
AS SELECT * FROM json.`/data/customers/json/`
@dlt.table(
  comment="Raw customer data",
  temporary=True)
def customers_raw():
  return ("...")

파이프라인에서 비밀 사용

시나리오

파이프라인(예: 클라우드 데이터 스토리지 또는 데이터베이스)에서 데이터 원본에 인증해야 하며 Notebook 또는 구성에 자격 증명을 포함하지 않습니다.

솔루션

Azure Databricks 비밀을 사용하여 액세스 키 또는 암호와 같은 자격 증명을 저장합니다. 파이프라인에서 비밀을 구성하려면 파이프라인 설정 클러스터 구성에서 Spark 속성을 사용합니다.

다음 예제에서는 비밀과 자동 로더를 사용하여 ADLS Gen2(Azure Data Lake Storage Gen2) 스토리지 계정에서 입력 데이터를 읽는 데 필요한 액세스 키를 저장합니다. 이 동일한 방법을 사용하여 파이프라인에 필요한 비밀(예: S3에 액세스하기 위한 AWS 키 또는 Apache Hive 메타스토어에 대한 암호)을 구성할 수 있습니다.

ADLS Gen2 스토리지 계정, 액세스 키 및 액세스 키를 저장하는 Azure Databricks 비밀을 만드는 방법에 대한 자세한 내용은 Azure Data Lake Storage Gen2 시작을 참조하세요.

참고

비밀 값을 설정하는 spark_conf 구성 키에 spark.hadoop. 접두사를 추가해야 합니다.

{
    "id": "43246596-a63f-11ec-b909-0242ac120002",
    "clusters": [
      {
        "label": "default",
        "spark_conf": {
          "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
        },
        "autoscale": {
          "min_workers": 1,
          "max_workers": 5
        }
      },
      {
        "label": "maintenance",
        "spark_conf": {
          "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
        }
      }
    ],
    "development": true,
    "continuous": false,
    "libraries": [
      {
        "notebook": {
          "path": "/Users/user@databricks.com/DLT Notebooks/Delta Live Tables quickstart"
        }
      }
    ],
    "name": "DLT quickstart using ADLS2"
}

바꾸기

  • <storage-account-name>을 ADLS Gen2 스토리지 계정 이름으로 바꿉니다.
  • <scope-name>을 Azure Databricks 비밀 범위 이름으로 바꿉니다.
  • <secret-name>을 Azure Storage 계정 액세스 키가 포함된 키 이름으로 바꿉니다.
import dlt

json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dlt.create_table(
  comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load(json_path)
  )

바꾸기

  • 입력 데이터를 저장하는 Azure Storage 계정 컨테이너 이름을 사용하는 <container-name>입니다.
  • <storage-account-name>을 ADLS Gen2 스토리지 계정 이름으로 바꿉니다.
  • 입력 데이터 세트의 경로를 사용하는 <path-to-input-dataset>입니다.