다음을 통해 공유


Azure Event Hubs를 델타 라이브 테이블 데이터 원본으로 사용

이 문서에서는 Delta Live Tables를 사용하여 Azure Event Hubs의 메시지를 처리하는 방법을 설명합니다. 이 라이브러리는 Databricks 런타임의 일부로 사용할 수 없으며 델타 라이브 테이블은 타사 JVM 라이브러리를 사용할 수 없으므로 구조적 스트리밍 Event Hubs 커넥터를 사용할 수 없습니다.

Delta Live Tables는 Azure Event Hubs에 어떻게 연결할 수 있나요?

Azure Event Hubs는 Databricks Runtime에서 Azure Event Hubs 메시지를 처리하는 데 사용할 수 있는 구조적 스트리밍 Kafka 커넥터와 함께 Apache Kafka와 호환되는 엔드포인트를 제공합니다. Azure Event Hubs 및 Apache Kafka 호환성에 대한 자세한 내용은 Apache Kafka 애플리케이션에서 Azure Event Hubs 사용을 참조하세요.

다음 단계에서는 Delta Live Tables 파이프라인을 기존 Event Hubs 인스턴스에 연결하고 토픽에서 이벤트를 사용하는 방법을 설명합니다. 이러한 단계를 완료하려면 다음 Event Hubs 연결 값이 필요합니다.

  • Event Hubs 네임스페이스의 이름입니다.
  • Event Hubs 네임스페이스에 있는 이벤트 허브 인스턴스의 이름입니다.
  • Event Hubs에 대한 공유 액세스 정책 이름 및 정책 키입니다. 기본적으로 각 Event Hubs 네임스페이스에 대해 RootManageSharedAccessKey 정책이 만들어집니다. 이 정책에는 manage, send, listen 권한이 있습니다. 파이프라인이 Event Hubs에서만 읽는 경우 Databricks는 수신 대기 권한으로만 새 정책을 만드는 것이 좋습니다.

Event Hubs 연결 문자열에 대한 자세한 내용은 Event Hubs 연결 문자열 가져오기를 참조하세요.

참고 항목

  • Azure Event Hubs는 보안 리소스에 대한 액세스 권한을 부여하는 OAuth 2.0 및 SAS(공유 액세스 서명) 옵션을 모두 제공합니다. 이러한 지침은 SAS 기반 인증을 사용합니다.
  • Azure Portal에서 Event Hubs 연결 문자열을 가져오는 경우 EntityPath 값이 포함되지 않을 수 있습니다. EntityPath 값은 구조적 스트리밍 Event Hub 커넥터를 사용하는 경우에만 필요합니다. 구조적 스트리밍 Kafka 커넥터를 사용하려면 토픽 이름만 제공해야 합니다.

Azure Databricks 비밀에 정책 키 저장

정책 키는 중요한 정보이므로 Databricks는 파이프라인 코드의 값을 하드 코딩하지 않는 것이 좋습니다. 대신 Azure Databricks 비밀을 사용하여 키에 대한 액세스를 저장하고 관리합니다.

다음 예제에서는 Databricks CLI를 사용하여 비밀 범위를 만들고 해당 비밀 범위에 키를 저장합니다. 파이프라인 코드에서 및 dbutils.secrets.get() 함수를 scope-nameshared-policy-name과 함께 사용하여 키 값을 검색합니다.

databricks --profile <profile-name> secrets create-scope <scope-name>

databricks --profile <profile-name> secrets put-secret <scope-name> <shared-policy-name> --string-value <shared-policy-key>

Azure Databricks 비밀에 대한 자세한 내용은 비밀 관리를 참조하세요.

Notebook을 만들고 파이프라인 코드를 추가하여 이벤트 사용

다음 예제에서는 토픽에서 IoT 이벤트를 읽지만 애플리케이션의 요구 사항에 맞게 예제를 조정할 수 있습니다. Databricks는 Delta Live Tables 파이프라인 설정을 사용하여 애플리케이션 변수를 구성하는 것이 좋습니다. 그러면 파이프라인 코드는 spark.conf.get() 함수를 사용하여 값을 검색합니다. 파이프라인 설정을 사용하여 파이프라인을 매개 변수화하는 방법에 대한 자세한 내용은 파이프라인 매개 변수화를 참조하세요.

import dlt
import pyspark.sql.types as T
from pyspark.sql.functions import *

# Event Hubs configuration
EH_NAMESPACE                    = spark.conf.get("iot.ingestion.eh.namespace")
EH_NAME                         = spark.conf.get("iot.ingestion.eh.name")

EH_CONN_SHARED_ACCESS_KEY_NAME  = spark.conf.get("iot.ingestion.eh.accessKeyName")
SECRET_SCOPE                    = spark.conf.get("io.ingestion.eh.secretsScopeName")
EH_CONN_SHARED_ACCESS_KEY_VALUE = dbutils.secrets.get(scope = SECRET_SCOPE, key = EH_CONN_SHARED_ACCESS_KEY_NAME)

EH_CONN_STR                     = f"Endpoint=sb://{EH_NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={EH_CONN_SHARED_ACCESS_KEY_NAME};SharedAccessKey={EH_CONN_SHARED_ACCESS_KEY_VALUE}"
# Kafka Consumer configuration

KAFKA_OPTIONS = {
  "kafka.bootstrap.servers"  : f"{EH_NAMESPACE}.servicebus.windows.net:9093",
  "subscribe"                : EH_NAME,
  "kafka.sasl.mechanism"     : "PLAIN",
  "kafka.security.protocol"  : "SASL_SSL",
  "kafka.sasl.jaas.config"   : f"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{EH_CONN_STR}\";",
  "kafka.request.timeout.ms" : spark.conf.get("iot.ingestion.kafka.requestTimeout"),
  "kafka.session.timeout.ms" : spark.conf.get("iot.ingestion.kafka.sessionTimeout"),
  "maxOffsetsPerTrigger"     : spark.conf.get("iot.ingestion.spark.maxOffsetsPerTrigger"),
  "failOnDataLoss"           : spark.conf.get("iot.ingestion.spark.failOnDataLoss"),
  "startingOffsets"          : spark.conf.get("iot.ingestion.spark.startingOffsets")
}

# PAYLOAD SCHEMA
payload_ddl = """battery_level BIGINT, c02_level BIGINT, cca2 STRING, cca3 STRING, cn STRING, device_id BIGINT, device_name STRING, humidity BIGINT, ip STRING, latitude DOUBLE, lcd STRING, longitude DOUBLE, scale STRING, temp  BIGINT, timestamp BIGINT"""
payload_schema = T._parse_datatype_string(payload_ddl)

# Basic record parsing and adding ETL audit columns
def parse(df):
  return (df
    .withColumn("records", col("value").cast("string"))
    .withColumn("parsed_records", from_json(col("records"), payload_schema))
    .withColumn("iot_event_timestamp", expr("cast(from_unixtime(parsed_records.timestamp / 1000) as timestamp)"))
    .withColumn("eh_enqueued_timestamp", expr("timestamp"))
    .withColumn("eh_enqueued_date", expr("to_date(timestamp)"))
    .withColumn("etl_processed_timestamp", col("current_timestamp"))
    .withColumn("etl_rec_uuid", expr("uuid()"))
    .drop("records", "value", "key")
  )

@dlt.create_table(
  comment="Raw IOT Events",
  table_properties={
    "quality": "bronze",
    "pipelines.reset.allowed": "false" # preserves the data in the delta table if you do full refresh
  },
  partition_cols=["eh_enqueued_date"]
)
@dlt.expect("valid_topic", "topic IS NOT NULL")
@dlt.expect("valid records", "parsed_records IS NOT NULL")
def iot_raw():
  return (
   spark.readStream
    .format("kafka")
    .options(**KAFKA_OPTIONS)
    .load()
    .transform(parse)
  )

파이프라인 만들기

다음 설정을 사용하여 새 파이프라인을 만들고 자리 표시자 값을 환경에 적합한 값으로 바꿉니다.

{
  "clusters": [
    {
      "spark_conf": {
        "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
      },
      "num_workers": 4
    }
  ],
  "development": true,
  "continuous": false,
  "channel": "CURRENT",
  "edition": "ADVANCED",
  "photon": false,
  "libraries": [
    {
      "notebook": {
        "path": "<path-to-notebook>"
      }
    }
  ],
  "name": "dlt_eventhub_ingestion_using_kafka",
  "storage": "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/iot/",
  "configuration": {
    "iot.ingestion.eh.namespace": "<eh-namespace>",
    "iot.ingestion.eh.accessKeyName": "<eh-policy-name>",
    "iot.ingestion.eh.name": "<eventhub>",
    "io.ingestion.eh.secretsScopeName": "<secret-scope-name>",
    "iot.ingestion.spark.maxOffsetsPerTrigger": "50000",
    "iot.ingestion.spark.startingOffsets": "latest",
    "iot.ingestion.spark.failOnDataLoss": "false",
    "iot.ingestion.kafka.requestTimeout": "60000",
    "iot.ingestion.kafka.sessionTimeout": "30000"
  },
  "target": "<target-database-name>"
}

바꾸기

  • <container-name>을 Azure Storage 계정 컨테이너의 이름으로 바꿉니다.
  • <storage-account-name>을 ADLS Gen2 스토리지 계정 이름으로 바꿉니다.
  • <eh-namespace>을 Event Hubs 네임스페이스 이름으로 바꿉니다.
  • <eh-policy-name>을 Event Hubs 정책 키에 대한 비밀 범위 키로 바꿉니다.
  • <eventhub>을 이벤트 허브 인스턴스의 이름으로 바꿉니다.
  • <secret-scope-name>을 Event Hubs 정책 키가 포함된 Azure Databricks 비밀 범위의 이름으로 바꿉니다.

이 파이프라인은 기본 DBFS 스토리지 경로를 사용하지 않고 대신 ADLS Gen2(Azure Data Lake Storage Gen2) 스토리지 계정을 사용하는 것이 좋습니다. ADLS Gen2 스토리지 계정에 대한 인증을 구성하는 방법에 대한 자세한 내용은 파이프라인에서 비밀을 사용하여 스토리지 자격 증명에 안전하게 액세스하는 방법을 참조하세요.