Azure Databricks에서 첫 번째 ETL 워크로드 실행

Azure Databricks에서 프로덕션 지원 도구를 사용하여 데이터 오케스트레이션을 위한 첫 번째 ETL(추출, 변환 및 로드) 파이프라인을 개발하고 배포하는 방법을 알아봅니다.

이 문서의 학습을 마치면 다음 개념을 이해할 수 있습니다.

  1. Databricks 다목적 컴퓨팅 클러스터 시작.
  2. Databricks Notebook 만들기
  3. 자동 로더를 사용하여 Delta Lake로 증분 데이터 수집 구성.
  4. Notebook 셀을 실행하여 데이터 처리, 쿼리 및 미리 보기
  5. Databricks 작업으로 Notebook 예약

이 자습서에서는 대화형 Notebook을 사용하여 Python 또는 Scala에서 공통 ETL 작업을 완료합니다.

Delta Live Tables를 사용하여 ETL 파이프라인을 빌드할 수도 있습니다. Databricks는 프로덕션 ETL 파이프라인을 빌드, 배포 및 유지 관리하는 복잡성을 줄이기 위해 Delta Live Tables를 만들었습니다. 자습서: 첫 번째 Delta Live Tables 파이프라인을 실행합니다.

Databricks Terraform 공급자를 사용하여 이 문서의 리소스를 만들 수도 있습니다. Terraform을 사용하여 클러스터, Notebooks 및 작업 만들기를 참조하세요.

요구 사항

참고 항목

클러스터 제어 권한이 없어도 클러스터 액세스 권한이 있는 한 아래 단계를 대부분 수행할 수 있습니다.

1단계: 클러스터 만들기

탐색 데이터 분석 및 데이터 엔지니어링을 수행하기 위해 명령 실행에 필요한 컴퓨팅 리소스를 제공하도록 클러스터를 만듭니다.

  1. 사이드바에서 컴퓨팅 아이콘컴퓨팅을 클릭합니다.
  2. 클러스터 페이지에서 클러스터 만들기를 클릭합니다. 그러면 새 클러스터 페이지가 열립니다.
  3. 클러스터의 고유한 이름을 지정하고 나머지 값을 기본 상태로 두고 클러스터 만들기를 클릭합니다.

Databricks 클러스터에 대한 자세한 내용은 Compute를 참조 하세요.

2단계: Databricks Notebook 만들기

Azure Databricks에서 대화형 코드를 작성하고 실행하기 위해 Notebook을 만듭니다.

  1. 사이드바에서 새로 만들기새 아이콘를 클릭한 다음 전자 필기장을 클릭합니다.
  2. 전자 필기장 만들기 페이지에서 다음을 수행합니다.
    • Notebook에 고유한 이름을 지정합니다.
    • 기본 언어가 Python 또는 Scala로 설정되었는지 확인합니다.
    • 클러스터 드롭다운에서 1단계에서 만든 클러스터를 선택합니다.
    • 만들기를 클릭합니다.

맨 위에 빈 셀이 있는 Notebook이 열립니다.

Notebook 만들기 및 관리에 대한 자세한 내용은 Notebook 관리를 참조하세요.

3단계: Delta Lake로 데이터를 수집하도록 자동 로더 구성

Databricks는 증분 데이터 수집을 위해 자동 로더를 사용할 것을 권장합니다. 자동 로더는 클라우드 개체 스토리지에 도착하는 새 파일을 자동으로 감지하고 처리합니다.

Databricks에서는 Delta Lake를 사용하여 데이터를 저장하는 것이 좋습니다. Delta Lake는 ACID 트랜잭션을 제공하고 데이터 레이크하우스를 가능하게 하는 오픈 소스 스토리지 레이어입니다. Delta Lake는 Databricks에서 만든 테이블의 기본 형식입니다.

Delta Lake 테이블에 데이터를 수집하도록 자동 로더를 구성하려면 다음 코드를 복사하여 Notebook의 빈 셀에 붙여넣습니다.

Python

# Import functions
from pyspark.sql.functions import col, current_timestamp

# Define variables used in code below
file_path = "/databricks-datasets/structured-streaming/events"
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
table_name = f"{username}_etl_quickstart"
checkpoint_path = f"/tmp/{username}/_checkpoint/etl_quickstart"

# Clear out data from previous demo execution
spark.sql(f"DROP TABLE IF EXISTS {table_name}")
dbutils.fs.rm(checkpoint_path, True)

# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(file_path)
  .select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(availableNow=True)
  .toTable(table_name))

Scala

// Imports
import org.apache.spark.sql.functions.current_timestamp
import org.apache.spark.sql.streaming.Trigger
import spark.implicits._

// Define variables used in code below
val file_path = "/databricks-datasets/structured-streaming/events"
val username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first.get(0)
val table_name = s"${username}_etl_quickstart"
val checkpoint_path = s"/tmp/${username}/_checkpoint"

// Clear out data from previous demo execution
spark.sql(s"DROP TABLE IF EXISTS ${table_name}")
dbutils.fs.rm(checkpoint_path, true)

// Configure Auto Loader to ingest JSON data to a Delta table
spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(file_path)
  .select($"*", $"_metadata.file_path".as("source_file"), current_timestamp.as("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(Trigger.AvailableNow)
  .toTable(table_name)

참고 항목

이 코드에 정의된 변수를 사용하면 기존 작업 영역 자산 또는 다른 사용자와의 충돌 위험 없이 안전하게 실행할 수 있습니다. 제한된 네트워크 또는 스토리지 권한은 이 코드를 실행할 때 오류를 발생시킵니다. 이러한 제한 사항을 해결하려면 작업 영역 관리자에게 문의하세요.

자동 로더에 대한 자세한 내용은 자동 로더란?을 참조하세요.

4단계: 데이터 처리 및 상호 작용

Notebook은 셀 단위로 논리 셀을 실행합니다. 셀에서 논리를 실행하려면 다음을 수행합니다.

  1. 이전 단계에서 완료한 셀을 실행하려면 셀을 선택하고 Shift+Enter를 누릅니다.

  2. 방금 만든 테이블을 쿼리하려면 다음 코드를 복사하여 빈 셀에 붙여넣은 다음, Shift+Enter를 눌러 셀을 실행합니다.

    Python

    df = spark.read.table(table_name)
    

    Scala

    val df = spark.read.table(table_name)
    
  3. DataFrame에서 데이터를 미리 보려면 다음 코드를 복사하여 빈 셀에 붙여넣은 다음, Shift+Enter를 눌러 셀을 실행합니다.

    Python

    display(df)
    

    Scala

    display(df)
    

데이터 시각화를 위한 대화형 옵션에 대한 자세한 내용은 Databricks Notebook의 시각화를 참조 하세요.

5단계: 작업 예약

Databricks 작업에 추가하여 Databricks Notebook을 프로덕션 스크립트로 실행할 수 있습니다. 이 단계에서는 수동으로 트리거할 수 있는 새 작업을 만듭니다.

Notebook을 작업으로 예약하려면 다음을 수행합니다.

  1. 머리글 표시줄 오른쪽에서 예약을 클릭합니다.
  2. 작업 이름에 고유한 이름을 입력합니다.
  3. 수동을 클릭합니다.
  4. 클러스터 드롭다운에서 1단계에서 만든 클러스터를 선택합니다.
  5. 만들기를 클릭합니다.
  6. 표시된 창에서 지금 실행을 클릭합니다.
  7. 작업 실행 결과를 보려면 마지막 실행 타임스탬프 옆에 있는 아이콘을 클릭합니다외부 링크.

작업에 대한 자세한 내용은 Azure Databricks 작업란?을 참조하세요.

추가 통합

Azure Databricks를 사용한 데이터 엔지니어링을 위한 통합 및 도구에 대해 자세히 알아봅니다.