첫 번째 구조적 스트리밍 워크로드 실행

이 문서에서는 Azure Databricks에서 첫 번째 구조적 스트리밍 쿼리를 실행하는 데 필요한 기본 개념에 대한 코드 예제 및 설명을 제공합니다. 거의 실시간 및 증분 처리 워크로드에 구조적 스트리밍을 사용할 수 있습니다.

구조적 스트리밍은 델타 라이브 테이블의 스트리밍 테이블에 전원을 공급하는 여러 기술 중 하나입니다. Databricks는 모든 새 ETL, 수집 및 구조적 스트리밍 워크로드에 델타 라이브 테이블을 사용하는 것이 좋습니다. 델타 라이브 테이블이란?을 참조하세요.

참고 항목

Delta Live Tables는 스트리밍 테이블을 선언하기 위해 약간 수정된 구문을 제공하지만 스트리밍 읽기 및 변환을 구성하기 위한 일반 구문은 Azure Databricks의 모든 스트리밍 사용 사례에 적용됩니다. 또한 Delta Live Tables는 상태 정보, 메타데이터 및 다양한 구성을 관리하여 스트리밍을 간소화합니다.

데이터 스트림에서 읽기

구조적 스트리밍을 사용하여 지원되는 데이터 원본에서 데이터를 증분 방식으로 수집할 수 있습니다. Azure Databricks 구조적 스트리밍 워크로드에 사용되는 가장 일반적인 데이터 원본 중 일부는 다음과 같습니다.

  • 클라우드 개체 스토리지의 데이터 파일
  • 메시지 버스 및 큐
  • Delta Lake

Databricks는 클라우드 개체 스토리지에서 스트리밍 수집에 자동 로더를 사용하는 것이 좋습니다. 자동 로더는 구조적 스트리밍에서 지원하는 대부분의 파일 형식을 지원합니다. 자동 로더란?을 참조하세요.

각 데이터 원본은 데이터 일괄 처리를 로드하는 방법을 지정하는 다양한 옵션을 제공합니다. 판독기 구성 중에 설정해야 할 수 있는 기본 옵션은 다음 범주로 분류됩니다.

  • 데이터 원본 또는 형식(예: 파일 형식, 구분 기호 및 스키마)을 지정하는 옵션입니다.
  • 원본 시스템에 대한 액세스를 구성하는 옵션(예: 포트 설정 및 자격 증명).
  • 스트림에서 시작할 위치를 지정하는 옵션(예: Kafka 오프셋 또는 모든 기존 파일 읽기).
  • 각 일괄 처리에서 처리되는 데이터의 양(예: 일괄 처리당 최대 오프셋, 파일 또는 바이트)을 제어하는 옵션입니다.

자동 로더를 사용하여 개체 스토리지에서 스트리밍 데이터 읽기

다음 예제에서는 형식 및 옵션을 나타내는 데 사용하는 자동 로더를 사용하여 cloudFiles JSON 데이터를 로드하는 방법을 보여 줍니다. 이 schemaLocation 옵션을 사용하면 스키마 유추 및 진화가 가능합니다. Databricks Notebook 셀에 다음 코드를 붙여넣고 셀을 실행하여 다음과 같은 스트리밍 DataFrame을 raw_df만듭니다.

file_path = "/databricks-datasets/structured-streaming/events"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"

raw_df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", checkpoint_path)
    .load(file_path)
)

Azure Databricks의 다른 읽기 작업과 마찬가지로 스트리밍 읽기를 구성해도 실제로 데이터가 로드되지는 않습니다. 스트림이 시작되기 전에 데이터에 대한 작업을 트리거해야 합니다.

참고 항목

스트리밍 DataFrame을 호출 display() 하면 스트리밍 작업이 시작됩니다. 대부분의 구조적 스트리밍 사용 사례의 경우 스트림을 트리거하는 작업은 싱크에 데이터를 작성해야 합니다. 프로덕션을 위한 구조적 스트리밍 코드 준비를 참조하세요.

스트리밍 변환 수행

구조적 스트리밍은 Azure Databricks 및 Spark SQL에서 사용할 수 있는 대부분의 변환을 지원합니다. MLflow 모델을 UDF로 로드하고 스트리밍 예측을 변환으로 만들 수도 있습니다.

다음 코드 예제에서는 Spark SQL 함수를 사용하여 수집된 JSON 데이터를 추가 정보로 보강하는 간단한 변환을 완료합니다.

from pyspark.sql.functions import col, current_timestamp

transformed_df = (raw_df.select(
    "*",
    col("_metadata.file_path").alias("source_file"),
    current_timestamp().alias("processing_time")
    )
)

결과에 transformed_df 는 데이터 원본에 도착할 때 각 레코드를 로드하고 변환하는 쿼리 지침이 포함됩니다.

참고 항목

구조적 스트리밍은 데이터 원본을 무제한 또는 무한 데이터 세트로 처리합니다. 따라서 일부 변환은 무한 개수의 항목을 정렬해야 하므로 구조적 스트리밍 워크로드에서 지원되지 않습니다.

대부분의 집계 및 많은 조인에는 워터마크, 창 및 출력 모드를 사용하여 상태 정보를 관리해야 합니다. 데이터 처리 임계값을 제어하려면 워터마크 적용을 참조 하세요.

데이터 싱크에 쓰기

데이터 싱크는 스트리밍 쓰기 작업의 대상입니다. Azure Databricks 스트리밍 워크로드에 사용되는 일반적인 싱크에는 다음이 포함됩니다.

  • Delta Lake
  • 메시지 버스 및 큐
  • 키-값 데이터베이스

데이터 원본과 마찬가지로 대부분의 데이터 싱크는 대상 시스템에 데이터를 쓰는 방법을 제어하는 다양한 옵션을 제공합니다. 기록기 구성 중에 설정해야 할 수 있는 기본 옵션은 다음 범주로 분류됩니다.

  • 출력 모드(기본적으로 추가).
  • 검사포인트 위치입니다(각 기록기에 필요).
  • 트리거 간격; 구조적 스트리밍 트리거 간격 구성을 참조 하세요.
  • 데이터 싱크 또는 형식을 지정하는 옵션(예: 파일 형식, 구분 기호 및 스키마).
  • 대상 시스템에 대한 액세스를 구성하는 옵션(예: 포트 설정 및 자격 증명).

Delta Lake에 증분 일괄 처리 쓰기 수행

다음 예제에서는 지정된 파일 경로 및 검사point를 사용하여 Delta Lake에 씁니다.

Important

항상 구성한 각 스트리밍 기록기에 대해 고유한 검사point 위치를 지정해야 합니다. 검사point는 스트림에 대한 고유 ID를 제공하여 처리된 모든 레코드와 스트리밍 쿼리와 관련된 상태 정보를 추적합니다.

트리거에 대한 설정은 availableNow 구조적 스트리밍이 원본 데이터 세트에서 이전에 처리되지 않은 모든 레코드를 처리한 다음 종료하도록 지시하므로 스트림을 실행하지 않고도 다음 코드를 안전하게 실행할 수 있습니다.

target_path = "/tmp/ss-tutorial/"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"

transformed_df.writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", checkpoint_path)
    .option("path", target_path)
    .start()

이 예제에서는 데이터 원본에 새 레코드가 도착하지 않으므로 이 코드의 반복 실행은 새 레코드를 수집하지 않습니다.

Warning

구조적 스트리밍 실행은 자동 종료가 컴퓨팅 리소스를 종료하는 것을 방지할 수 있습니다. 예기치 않은 비용을 방지하려면 스트리밍 쿼리를 종료해야 합니다.

프로덕션을 위한 구조적 스트리밍 코드 준비

Databricks는 대부분의 구조적 스트리밍 워크로드에 델타 라이브 테이블을 사용하는 것이 좋습니다. 다음 권장 사항은 프로덕션을 위한 구조적 스트리밍 워크로드를 준비하기 위한 시작점을 제공합니다.

  • 와 같은 displaycount결과를 반환하는 Notebook에서 불필요한 코드를 제거합니다.
  • 대화형 클러스터에서 구조적 스트리밍 워크로드를 실행하지 마세요. 항상 스트림을 작업으로 예약합니다.
  • 스트리밍 작업이 자동으로 복구되도록 하려면 무한 다시 시도로 작업을 구성합니다.
  • 구조적 스트리밍을 사용하는 워크로드에는 자동 크기 조정을 사용하지 마세요.

자세한 권장 사항은 구조적 스트리밍에 대한 프로덕션 고려 사항을 참조 하세요.

Delta Lake에서 데이터를 읽고, 변환하고, Delta Lake에 쓰기

Delta Lake는 구조적 스트리밍을 원본 및 싱크로 사용할 수 있는 광범위한 지원을 제공합니다. 델타 테이블 스트리밍 읽기 및 쓰기를 참조 하세요.

다음 예제에서는 델타 테이블에서 모든 새 레코드를 증분 방식으로 로드하고, 다른 Delta 테이블의 스냅샷 조인하고, Delta 테이블에 쓰는 예제 구문을 보여 줍니다.

(spark.readStream
    .table("<table-name1>")
    .join(spark.read.table("<table-name2>"), on="<id>", how="left")
    .writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", "<checkpoint-path>")
    .toTable("<table-name3>")
)

원본 테이블을 읽고 대상 테이블과 지정된 검사point 위치에 쓰도록 적절한 권한을 구성해야 합니다. 데이터 원본 및 싱크에 대한 관련 값을 사용하여 꺾쇠 괄호(<>)로 표시된 모든 매개 변수를 채웁니다.

참고 항목

Delta Live Tables는 Delta Lake 파이프라인을 만들기 위한 완전한 선언적 구문을 제공하고 트리거 및 검사포인트와 같은 속성을 자동으로 관리합니다. 델타 라이브 테이블이란?을 참조하세요.

Kafka에서 데이터를 읽고, 변환하고, Kafka에 쓰기

Apache Kafka 및 기타 메시징 버스는 큰 데이터 세트에 사용할 수 있는 가장 낮은 대기 시간의 일부를 제공합니다. Azure Databricks를 사용하여 Kafka에서 수집된 데이터에 변환을 적용한 다음, Kafka에 데이터를 다시 쓸 수 있습니다.

참고 항목

클라우드 개체 스토리지에 데이터를 쓰면 대기 시간 오버헤드가 추가됩니다. Delta Lake의 메시징 버스에서 데이터를 저장하지만 스트리밍 워크로드에 대해 가능한 가장 낮은 대기 시간이 필요한 경우 Databricks는 레이크하우스에 데이터를 수집하고 다운스트림 메시징 버스 싱크에 대해 거의 실시간으로 변환을 적용하도록 별도의 스트리밍 작업을 구성하는 것이 좋습니다.

다음 코드 예제에서는 델타 테이블의 데이터와 조인한 다음 Kafka에 다시 작성하여 Kafka에서 데이터를 보강하는 간단한 패턴을 보여 줍니다.

(spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<server:ip>")
    .option("subscribe", "<topic>")
    .option("startingOffsets", "latest")
    .load()
    .join(spark.read.table("<table-name>"), on="<id>", how="left")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<server:ip>")
    .option("topic", "<topic>")
    .option("checkpointLocation", "<checkpoint-path>")
    .start()
)

Kafka 서비스에 액세스할 수 있도록 적절한 권한이 구성되어 있어야 합니다. 데이터 원본 및 싱크에 대한 관련 값을 사용하여 꺾쇠 괄호(<>)로 표시된 모든 매개 변수를 채웁니다. Apache Kafka 및 Azure Databricks를 사용한 스트림 처리를 참조 하세요.