Delta Live Tables를 사용하여 데이터 로드

Delta Live Tables를 사용하여 Azure Databricks의 Apache Spark에서 지원하는 모든 데이터 원본에서 데이터를 로드할 수 있습니다. 스트리밍 DataFrames 및 Pandas for Spark DataFrames를 포함하여 Spark DataFrame을 반환하는 모든 쿼리에 대해 Delta Live Tables에서 데이터 세트(테이블 및 뷰)를 정의할 수 있습니다. 데이터 수집 작업의 경우 Databricks는 대부분의 사용 사례에 대해 스트리밍 테이블을 사용할 것을 권장합니다. 스트리밍 테이블은 Auto Loader를 사용하는 클라우드 개체 스토리지 또는 Kafka와 같은 메시지 버스에서 데이터를 수집하는 데 적합합니다. 아래 예는 몇 가지 일반적인 패턴을 보여줍니다.

Important

모든 데이터 원본에 SQL이 지원되는 것은 아닙니다. Delta Live Tables 파이프라인에서 SQL 및 Python 전자 필기장을 혼합하여 수집 이외의 모든 작업에 SQL을 사용할 수 있습니다.

기본적으로 Delta Live Tables에 패키징되지 않은 라이브러리 작업에 대한 자세한 내용은 파이프라인 종속성을 참조하세요.

클라우드 개체 스토리지에서 파일 로드

Databricks는 클라우드 개체 스토리지에서 대부분의 데이터 수집 작업에 델타 라이브 테이블과 함께 자동 로더를 사용하는 것이 좋습니다. 자동 로더 및 델타 라이브 테이블은 클라우드 스토리지에 도착할 때 지속적으로 증가하는 데이터를 증분 및 멱등하게 로드하도록 설계되었습니다. 다음 예제에서는 자동 로더를 사용하여 CSV 및 JSON 파일에서 데이터 세트를 만듭니다.

참고 항목

Unity 카탈로그를 사용하는 파이프라인에서 자동 로더를 사용하여 파일을 로드하려면 외부 위치를 사용해야 합니다. Delta Live Tables과 함께 Unity 카탈로그를 사용하는 방법에 대해 자세히 알아보려면 Delta Live Tables 파이프라인과 함께 Unity 카탈로그 사용을 참조하세요.

Python

@dlt.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/databricks-datasets/retail-org/customers/")
  )

@dlt.table
def sales_orders_raw():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("/databricks-datasets/retail-org/sales_orders/")
  )

SQL

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/sales_orders/", "json")

자동 로더란?자동 로더 SQL 구문을 참조하세요.

Warning

파일 알림과 함께 자동 로더를 사용하고 파이프라인 또는 스트리밍 테이블에 대해 전체 새로 고침을 실행하는 경우 리소스를 수동으로 클린 합니다. Notebook에서 CloudFilesResourceManager를 사용하여 클린 수행할 수 있습니다.

메시지 버스에서 데이터 로드

스트리밍 테이블을 사용하여 메시지 버스에서 데이터를 수집하도록 Delta Live Tables 파이프라인을 구성할 수 있습니다. Databricks는 스트리밍 테이블을 연속 실행 및 향상된 자동 크기 조정과 결합하여 메시지 버스에서 대기 시간이 짧은 로드에 가장 효율적인 수집을 제공하는 것이 좋습니다. 향상된 자동 크기 조정이란?을 참조하세요.

예를 들어 다음 코드는 Kafka에서 데이터를 수집하도록 스트리밍 테이블을 구성합니다.

import dlt

@dlt.table
def kafka_raw():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "<server:ip>")
      .option("subscribe", "topic1")
      .option("startingOffsets", "latest")
      .load()
  )

다음 예제와 같이 순수 SQL에서 다운스트림 작업을 작성하여 이 데이터에 대한 스트리밍 변환을 수행할 수 있습니다.

CREATE OR REFRESH STREAMING TABLE streaming_silver_table
AS SELECT
  *
FROM
  STREAM(LIVE.kafka_raw)
WHERE ...

Event Hubs 를 사용하는 예제는 Azure Event Hubs를 델타 라이브 테이블 데이터 원본으로 사용하세요.

스트리밍 데이터 원본 구성을 참조 하세요.

외부 시스템에서 데이터 로드

Delta Live Tables는 Azure Databricks에서 지원하는 모든 데이터 원본에서 데이터 로드를 지원합니다. 데이터 원본에 대한 커넥트 참조하세요. 지원되는 데이터 원본에 대해 Lakehouse Federation을 사용하여 외부 데이터를 로드할 수도 있습니다. 레이크하우스 페더레이션에는 Databricks Runtime 13.1 이상이 필요하므로 Lakehouse 페더레이션을 사용하려면 미리 보기 채널을 사용하도록 파이프라인을 구성해야 합니다.

일부 데이터 원본은 SQL에서 동등한 지원을 제공하지 않습니다. 이러한 데이터 원본 중 하나에서 Lakehouse Federation을 사용할 수 없는 경우 독립 실행형 Python Notebook을 사용하여 원본에서 데이터를 수집할 수 있습니다. 그런 다음 이 Notebook을 SQL Notebook을 사용하여 원본 라이브러리로 추가하여 Delta Live Tables 파이프라인을 빌드할 수 있습니다. 다음 예제에서는 구체화된 뷰를 선언하여 원격 PostgreSQL 테이블의 현재 데이터 상태에 액세스합니다.

import dlt

@dlt.table
def postgres_raw():
  return (
    spark.read
      .format("postgresql")
      .option("dbtable", table_name)
      .option("host", database_host_url)
      .option("port", 5432)
      .option("database", database_name)
      .option("user", username)
      .option("password", password)
      .load()
  )

클라우드 개체 스토리지에서 소규모 또는 정적 데이터 세트 로드

Apache Spark 로드 구문을 사용하여 작거나 정적 데이터 세트를 로드할 수 있습니다. Delta Live Tables는 Azure Databricks의 Apache Spark에서 지원하는 모든 파일 형식을 지원합니다. 전체 목록은 데이터 형식 옵션을 참조 하세요.

다음 예제에서는 델타 라이브 테이블 테이블을 만들기 위해 JSON을 로드하는 방법을 보여 줍니다.

Python

@dlt.table
def clickstream_raw():
  return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))

SQL

CREATE OR REFRESH LIVE TABLE clickstream_raw
AS SELECT * FROM json.`/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json`;

참고 항목

SELECT * FROM format.`path`; SQL 구문은 Azure Databricks의 모든 SQL 환경에 공통적으로 적용됩니다. 델타 라이브 테이블과 함께 SQL을 사용하는 직접 파일 액세스에 권장되는 패턴입니다.

파이프라인에서 비밀을 사용하여 스토리지 자격 증명에 안전하게 액세스

Azure Databricks 비밀을 사용하여 액세스 키 또는 암호와 같은 자격 증명을 저장할 수 있습니다. 파이프라인에서 비밀을 구성하려면 파이프라인 설정 클러스터 구성에서 Spark 속성을 사용합니다. 컴퓨팅 설정 구성을 참조하세요.

다음 예제에서는 비밀과 자동 로더를 사용하여 ADLS Gen2(Azure Data Lake Storage Gen2) 스토리지 계정에서 입력 데이터를 읽는 데 필요한 액세스 키를 저장합니다. 이 동일한 방법을 사용하여 파이프라인에 필요한 비밀(예: S3에 액세스하기 위한 AWS 키 또는 Apache Hive 메타스토어에 대한 암호)을 구성할 수 있습니다.

Azure Data Lake Storage Gen2 작업에 대한 자세한 내용은 Azure Data Lake Storage Gen2 및 Blob Storage에 대한 커넥트 참조하세요.

참고 항목

비밀 값을 설정하는 spark_conf 구성 키에 spark.hadoop. 접두사를 추가해야 합니다.

{
    "id": "43246596-a63f-11ec-b909-0242ac120002",
    "clusters": [
      {
        "spark_conf": {
          "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
        },
        "autoscale": {
          "min_workers": 1,
          "max_workers": 5,
          "mode": "ENHANCED"
        }
      }
    ],
    "development": true,
    "continuous": false,
    "libraries": [
      {
        "notebook": {
          "path": "/Users/user@databricks.com/DLT Notebooks/Delta Live Tables quickstart"
        }
      }
    ],
    "name": "DLT quickstart using ADLS2"
}

바꾸기

  • <storage-account-name>을 ADLS Gen2 스토리지 계정 이름으로 바꿉니다.
  • <scope-name>을 Azure Databricks 비밀 범위 이름으로 바꿉니다.
  • <secret-name>을 Azure Storage 계정 액세스 키가 포함된 키 이름으로 바꿉니다.
import dlt

json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dlt.create_table(
  comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load(json_path)
  )

바꾸기

  • 입력 데이터를 저장하는 Azure Storage 계정 컨테이너 이름을 사용하는 <container-name>입니다.
  • <storage-account-name>을 ADLS Gen2 스토리지 계정 이름으로 바꿉니다.
  • 입력 데이터 세트의 경로를 사용하는 <path-to-input-dataset>입니다.