Azure Databricks를 사용하는 스트림 처리

Azure Cosmos DB
Azure Databricks
Azure Event Hubs
Azure Log Analytics
Azure Monitor

이 참조 아키텍처는 엔드투엔드 스트림 처리 파이프라인을 보여줍니다. 이 유형의 파이프라인은 수집, 처리, 저장, 분석 및 보고의 4단계로 구성됩니다. 이 참조 아키텍처의 경우 파이프라인은 실시간으로 두 원본에서 데이터를 수집하고, 각 스트림의 관련 레코드에 대해 조인을 수행하고, 결과를 보강하고, 평균을 계산합니다. 추가 분석을 위해 결과가 저장됩니다.

GitHub logo 이 아키텍처에 대한 참조 구현은 GitHub에서 사용할 수 있습니다.

아키텍처

Diagram showing a reference architecture for stream processing with Azure Databricks.

이 아키텍처의 Visio 파일을 다운로드합니다.

워크플로

이 아키텍처는 다음과 같은 구성 요소로 구성됩니다.

데이터 원본. 이 아키텍처에는 실시간으로 데이터 스트림을 생성하는 두 개의 데이터 원본이 있습니다. 첫 번째 스트림에는 승객 정보가 포함되고 두 번째 스트림에는 요금 정보가 포함됩니다. 참조 아키텍처에는 정적 파일 집합을 읽고 Event Hubs에 데이터를 푸시하는 시뮬레이트된 데이터 생성기가 포함됩니다. 실제 애플리케이션의 데이터 원본은 택시에 설치되는 디바이스입니다.

Azure Event Hubs - Event Hubs는 이벤트 수집 서비스입니다. 이 아키텍처는 각 데이터 원본에 대해 하나씩 두 개의 이벤트 허브 인스턴스를 사용합니다. 각 데이터 원본은 연결된 이벤트 허브에 데이터 스트림을 보냅니다.

Azure Databricks. Databricks는 Microsoft Azure Cloud Services 플랫폼에 대해 최적화된 Apache Spark 기반 분석 플랫폼입니다. Databricks는 Databricks 파일 시스템에 저장된 환경 데이터를 사용하여 택시 운행과 요금 데이터 간의 상관 관계를 지정하고 상관 관계 데이터를 보강하는 데 사용됩니다.

Azure Cosmos DB. Azure Databricks 작업의 출력은 Azure Cosmos DB for Apache Cassandra에 기록되는 일련의 레코드입니다. Azure Cosmos DB for Apache Cassandra는 시계열 데이터 모델링을 지원하기 때문에 사용됩니다.

  • Azure Cosmos DB용 Azure Synapse Link를 사용하면 Azure Synapse 작업 영역에서 제공되는 두 가지 분석 엔진인 SQL 서버리스Spark 풀을 사용하여 트랜잭션 워크로드의 성능이나 비용에 영향을 주지 않고 Azure Cosmos DB의 운영 데이터에 대해 거의 실시간으로 분석을 실행할 수 있습니다.

Azure Log Analytics. Azure Monitor를 통해 수집되는 애플리케이션 로그 데이터는 Log Analytics 작업 영역에 저장됩니다. Log Analytics 쿼리는 메트릭을 분석 및 시각화하고 로그 메시지를 검사하여 애플리케이션 내부의 문제를 식별하는 데 사용할 수 있습니다.

대안

  • Synapse Link는 Azure Cosmos DB 데이터를 기반으로 하는 분석을 위한 Microsoft 기본 솔루션입니다.

시나리오 정보

시나리오: 택시 회사는 각 택시 여정에 대한 데이터를 수집합니다. 이 시나리오의 경우 두 개의 별도 디바이스가 데이터를 전송하고 있다고 가정합니다. 택시에는 각 승객의 기간, 거리, 승차 및 하차 위치에 대한 정보를 전송하는 미터가 있습니다. 별도 디바이스는 고객의 지불을 수락하고 요금에 대한 데이터를 보냅니다. 택시 회사는 승객 추세를 파악하기 위해 각 환경의 마일당 평균 팁을 실시간으로 계산하려고 합니다.

잠재적인 사용 사례

이 솔루션은 소매 업계에 최적화되어 있습니다.

데이터 수집

데이터 원본을 시뮬레이션하기 위해 이 참조 아키텍처는 뉴욕시 택시 데이터 데이터 세트[1]를 사용합니다. 이 데이터 세트에는 4년(2010~2013) 간의 뉴욕시 택시 운행 데이터가 포함됩니다. 여기에는 승객 데이터 및 요금 데이터라는 두 가지 형식의 레코드가 포함됩니다. 승객 데이터에는 여정 기간, 여정 거리, 승차 및 하차 위치가 포함됩니다. 요금 데이터에는 요금, 세금 및 팁 금액이 포함됩니다. 레코드 형식 모두의 공통 필드에는 등록 번호, 택시 라이선스 및 공급 업체 ID가 포함됩니다. 이러한 세 필드는 택시와 드라이버를 고유하게 식별합니다. 데이터가 CSV 형식으로 저장됩니다.

[1] Donovan, Brian; Work, Dan (2016): 뉴욕시 택시 여정 데이터(2010-2013) University of Illinois at Urbana-Champaign https://doi.org/10.13012/J8PN93H8

데이터 생성기는 레코드를 읽고 Azure Event Hubs로 전송하는 .NET Core 애플리케이션입니다. 생성기는 승객 데이터를 JSON 형식으로 보내고, 요금 데이터를 CSV 형식으로 전송합니다.

Event Hubs는 파티션을 사용하여 데이터를 분할합니다. 파티션을 사용하면 소비자가 각 파티션을 병렬로 읽을 수 있습니다. Event Hubs에 데이터를 보낼 때 파티션 키를 명시적으로 지정할 수 있습니다. 그렇지 않으면 레코드는 라운드 로빈 방식으로 파티션에 할당됩니다.

이 시나리오에서 승객 데이터 및 요금 데이터는 지정된 택시에 대해 동일한 파티션 ID를 사용해야 합니다. 그러면 Databricks가 두 스트림의 상관 관계를 지정할 때 일정 수준의 병렬 처리를 적용할 수 있습니다. 승객 데이터의 n 파티션에 있는 레코드는 요금 데이터의 n 파티션에 있는 레코드와 일치합니다.

Diagram of stream processing with Azure Databricks and Event Hubs.

이 아키텍처의 Visio 파일을 다운로드합니다.

데이터 생성기에서 두 레코드 형식에 대한 공통 데이터 모델에는 Medallion, HackLicenseVendorId의 연결인 PartitionKey 속성이 있습니다.

public abstract class TaxiData
{
    public TaxiData()
    {
    }

    [JsonProperty]
    public long Medallion { get; set; }

    [JsonProperty]
    public long HackLicense { get; set; }

    [JsonProperty]
    public string VendorId { get; set; }

    [JsonProperty]
    public DateTimeOffset PickupTime { get; set; }

    [JsonIgnore]
    public string PartitionKey
    {
        get => $"{Medallion}_{HackLicense}_{VendorId}";
    }

이 속성을 사용하여 Event Hubs로 전송하는 경우 명시적인 파티션 키를 제공합니다.

using (var client = pool.GetObject())
{
    return client.Value.SendAsync(new EventData(Encoding.UTF8.GetBytes(
        t.GetData(dataFormat))), t.PartitionKey);
}

Event Hubs

Event Hubs의 처리량 용량은 처리량 단위로 제어됩니다. 자동 팽창을 사용하도록 설정하여 이벤트 허브를 자동 크기 조정할 수 있습니다. 그러면 트래픽에 따라 처리량 단위를 구성된 최댓값까지 자동으로 크기 조정합니다.

스트림 처리

Azure Databricks에서 데이터 처리는 작업을 통해 수행됩니다. 작업은 클러스터에 할당되고 클러스터에서 실행됩니다. 작업은 Java로 작성된 사용자 지정 코드이거나 Spark Notebook입니다.

이 참조 아키텍처의 작업은 클래스가 Java 및 Scala로 작성된 Java 아카이브입니다. Databricks 작업의 Java 아카이브를 지정할 때 Databricks 클러스터의 실행에 대한 클래스가 지정됩니다. 여기서 com.microsoft.pnp.TaxiCabReader 클래스의 메서드에는 데이터 처리 논리가 포함됩니다.

두 이벤트 허브 인스턴스에서 스트림 읽기

데이터 처리 논리는 Spark 구조적 스트리밍을 사용하여 두 Azure 이벤트 허브 인스턴스에서 스트림을 읽습니다.

val rideEventHubOptions = EventHubsConf(rideEventHubConnectionString)
      .setConsumerGroup(conf.taxiRideConsumerGroup())
      .setStartingPosition(EventPosition.fromStartOfStream)
    val rideEvents = spark.readStream
      .format("eventhubs")
      .options(rideEventHubOptions.toMap)
      .load

    val fareEventHubOptions = EventHubsConf(fareEventHubConnectionString)
      .setConsumerGroup(conf.taxiFareConsumerGroup())
      .setStartingPosition(EventPosition.fromStartOfStream)
    val fareEvents = spark.readStream
      .format("eventhubs")
      .options(fareEventHubOptions.toMap)
      .load

환경 정보를 사용하여 데이터 보강

승객 데이터에는 승차 및 하차 위치의 위도와 경도 좌표가 포함됩니다. 이러한 좌표는 유용하기는 하지만 분석에 쉽게 사용하기 어렵습니다. 따라서 셰이프 파일에서 읽은 환경 데이터를 사용하여 이 데이터를 보강합니다.

셰이프 파일 형식은 이진이고 간단하게 구문 분석할 수 없지만, GeoTools 라이브러리에서 셰이프 파일 형식을 사용하는 지리 공간 데이터를 분석할 수 있는 도구를 제공합니다. 이 라이브러리는 com.microsoft.pnp.GeoFinder 클래스에 사용되어 승차 및 하차 좌표를 기준으로 환경 이름을 확인합니다.

val neighborhoodFinder = (lon: Double, lat: Double) => {
      NeighborhoodFinder.getNeighborhood(lon, lat).get()
    }

승객 및 요금 데이터 조인

먼저 승객 및 요금 데이터가 변환됩니다.

    val rides = transformedRides
      .filter(r => {
        if (r.isNullAt(r.fieldIndex("errorMessage"))) {
          true
        }
        else {
          malformedRides.add(1)
          false
        }
      })
      .select(
        $"ride.*",
        to_neighborhood($"ride.pickupLon", $"ride.pickupLat")
          .as("pickupNeighborhood"),
        to_neighborhood($"ride.dropoffLon", $"ride.dropoffLat")
          .as("dropoffNeighborhood")
      )
      .withWatermark("pickupTime", conf.taxiRideWatermarkInterval())

    val fares = transformedFares
      .filter(r => {
        if (r.isNullAt(r.fieldIndex("errorMessage"))) {
          true
        }
        else {
          malformedFares.add(1)
          false
        }
      })
      .select(
        $"fare.*",
        $"pickupTime"
      )
      .withWatermark("pickupTime", conf.taxiFareWatermarkInterval())

그런 다음, 승객 데이터가 요금 데이터에 조인됩니다.

val mergedTaxiTrip = rides.join(fares, Seq("medallion", "hackLicense", "vendorId", "pickupTime"))

데이터를 처리하여 Azure Cosmos DB에 삽입

지정된 시간 간격마다 각 환경의 평균 요금이 계산됩니다.

val maxAvgFarePerNeighborhood = mergedTaxiTrip.selectExpr("medallion", "hackLicense", "vendorId", "pickupTime", "rateCode", "storeAndForwardFlag", "dropoffTime", "passengerCount", "tripTimeInSeconds", "tripDistanceInMiles", "pickupLon", "pickupLat", "dropoffLon", "dropoffLat", "paymentType", "fareAmount", "surcharge", "mtaTax", "tipAmount", "tollsAmount", "totalAmount", "pickupNeighborhood", "dropoffNeighborhood")
      .groupBy(window($"pickupTime", conf.windowInterval()), $"pickupNeighborhood")
      .agg(
        count("*").as("rideCount"),
        sum($"fareAmount").as("totalFareAmount"),
        sum($"tipAmount").as("totalTipAmount"),
        (sum($"fareAmount")/count("*")).as("averageFareAmount"),
        (sum($"tipAmount")/count("*")).as("averageTipAmount")
      )
      .select($"window.start", $"window.end", $"pickupNeighborhood", $"rideCount", $"totalFareAmount", $"totalTipAmount", $"averageFareAmount", $"averageTipAmount")

계산된 요금은 Azure Cosmos DB에 삽입됩니다.

maxAvgFarePerNeighborhood
      .writeStream
      .queryName("maxAvgFarePerNeighborhood_cassandra_insert")
      .outputMode(OutputMode.Append())
      .foreach(new CassandraSinkForeach(connector))
      .start()
      .awaitTermination()

고려 사항

이러한 고려 사항은 워크로드의 품질을 향상시키는 데 사용할 수 있는 일단의 지침 원칙인 Azure Well-Architected Framework의 핵심 요소를 구현합니다. 자세한 내용은 Microsoft Azure Well-Architected Framework를 참조하세요.

보안

우수한 보안은 중요한 데이터 및 시스템에 대한 고의적인 공격과 악용을 방어합니다. 자세한 내용은 보안 요소의 개요를 참조하세요.

Azure Databricks 작업 영역에 대한 액세스는 관리자 콘솔을 사용하여 제어됩니다. 관리자 콘솔에는 사용자를 추가하고, 사용자 권한을 관리하고, Single Sign-On을 설정하는 기능이 포함되어 있습니다. 관리자 콘솔을 통해 작업 영역, 클러스터, 작업 및 테이블에 대한 액세스 제어를 설정할 수도 있습니다.

암호 관리

Azure Databricks에는 연결 문자열, 액세스 키, 사용자 이름 및 암호를 포함하여 비밀을 저장하는 데 사용되는 비밀 저장소가 포함되어 있습니다. Azure Databricks 비밀 저장소의 비밀은 범위에 따라 분할됩니다.

databricks secrets create-scope --scope "azure-databricks-job"

비밀은 범위 수준에서 추가됩니다.

databricks secrets put --scope "azure-databricks-job" --key "taxi-ride"

참고

기본 Azure Databricks 범위 대신 Azure Key Vault에서 지원되는 범위를 사용할 수 있습니다. 자세한 내용은 Azure Key Vault에서 지원하는 범위를 참조하세요.

코드에서 비밀은 Azure Databricks 비밀 유틸리티를 통해 액세스됩니다.

모니터링

Azure Databricks는 Apache Spark를 기반으로 하며, 둘 다 로깅의 표준 라이브러리로 log4j를 사용합니다. Apache Spark에서 제공하는 기본 로깅 외에도 Azure Databricks 모니터링 문서에 따라 Azure Log Analytics에 대한 로깅을 구현할 수 있습니다.

com.microsoft.pnp.TaxiCabReader 클래스가 승객 및 요금 메시지를 처리할 때 둘 중 하나의 형식이 잘못되어 유효하지 않을 수 있습니다. 프로덕션 환경에서는 신속히 문제를 해결하여 데이터 손실을 방지할 수 있도록 이처럼 잘못된 형식의 메시지를 분석하여 데이터 원본의 문제를 파악하는 것이 중요합니다. com.microsoft.pnp.TaxiCabReader 클래스는 형식이 잘못된 요금 및 승객 레코드 수를 추적하는 Apache Spark 누적기를 등록합니다.

    @transient val appMetrics = new AppMetrics(spark.sparkContext)
    appMetrics.registerGauge("metrics.malformedrides", AppAccumulators.getRideInstance(spark.sparkContext))
    appMetrics.registerGauge("metrics.malformedfares", AppAccumulators.getFareInstance(spark.sparkContext))
    SparkEnv.get.metricsSystem.registerSource(appMetrics)

Apache Spark는 Dropwizard 라이브러리를 사용하여 메트릭을 보내며, 네이티브 Dropwizard 메트릭 필드 중 일부는 Azure Log Analytics와 호환되지 않습니다. 이러한 이유로 이 참조 아키텍처에는 사용자 지정 Dropwizard 싱크 및 보고자가 포함되어 있습니다. 사용자 지정 Dropwizard 싱크 및 보고자는 Azure Log Analytics에서 예상하는 형식으로 메트릭을 포맷합니다. Apache Spark가 메트릭을 보고할 때 잘못된 승객 및 요금 데이터에 대한 사용자 지정 메트릭도 전송됩니다.

다음은 Azure Log Analytics 작업 영역에서 스트리밍 작업의 실행을 모니터링하는 데 사용할 수 있는 예제 쿼리입니다. 각 쿼리의 인수 ago(1d)는 마지막 날에 생성된 모든 레코드를 반환하며 다른 기간을 보려면 조정할 수 있습니다.

스트림 쿼리를 실행하는 동안 기록된 예외

SparkLoggingEvent_CL
| where TimeGenerated > ago(1d)
| where Level == "ERROR"

잘못된 형식의 요금 및 승객 데이터의 누적

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedrides"
| project value_d, TimeGenerated, applicationId_s
| render timechart

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedfares"
| project value_d, TimeGenerated, applicationId_s
| render timechart

시간에 따른 작업 실행

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "driver.DAGScheduler.job.allJobs"
| project value_d, TimeGenerated, applicationId_s
| render timechart

자세한 내용은 Azure Databricks 모니터링을 참조하세요.

DevOps

  • 프로덕션, 개발 및 테스트 환경에 대해 별도의 리소스 그룹을 만듭니다. 별도의 리소스 그룹을 만들면 배포 관리, 테스트 배포 삭제, 액세스 권한 할당 등이 더 간단해집니다.

  • Azure Resource Manager 템플릿을 사용하여 IaC(코드 제공 인프라) 프로세스에 따라 Azure 리소스를 배포합니다. 템플릿을 사용하면 Azure DevOps Services 또는 기타 CI/CD 솔루션을 사용하여 배포를 자동화하는 것이 더 쉽습니다.

  • 각 워크로드를 별도의 배포 템플릿에 배치하고 리소스를 소스 제어 시스템에 저장합니다. 템플릿을 CI/CD 프로세스의 일부로 함께 또는 개별적으로 배포하여 자동화 프로세스를 더 쉽게 수행할 수 있습니다.

    이 아키텍처에서 Azure Event Hubs, Log Analytics, Azure Cosmos DB는 단일 워크로드로 식별됩니다. 이러한 리소스는 단일 ARM 템플릿에 포함됩니다.

  • 워크로드를 준비하는 것이 좋습니다. 다음 단계로 이동하기 전에 다양한 단계에 배포하고 각 단계에서 유효성 검사를 실행합니다. 이렇게 하면 고도로 통제된 방식으로 프로덕션 환경에 업데이트를 푸시하고 예상치 못한 배포 문제를 최소화할 수 있습니다.

    이 아키텍처에는 여러 배포 단계가 있습니다. Azure DevOps 파이프라인을 만들고 해당 단계를 추가하는 것이 좋습니다. 자동화할 수 있는 단계의 몇 가지 예는 다음과 같습니다.

    • Databricks 클러스터 시작
    • Databricks CLI 구성
    • Scala 도구 설치
    • Databricks 비밀 추가

    또한 Databricks 코드 및 수명 주기의 품질과 안정성을 개선하기 위해 자동화된 통합 테스트를 작성하는 것이 좋습니다.

  • Azure Monitor를 사용하여 스트림 처리 파이프라인의 성능을 분석하는 것이 좋습니다. 자세한 내용은 Azure Databricks 모니터링을 참조하세요.

자세한 내용은 Microsoft Azure Well-Architected Framework의 DevOps 섹션을 참조하세요.

비용 최적화

비용 최적화는 불필요한 비용을 줄이고 운영 효율성을 높이는 방법을 찾는 것입니다. 자세한 내용은 비용 최적화 핵심 요소 개요를 참조하세요.

Azure 가격 계산기를 사용하여 비용을 예측합니다. 다음은 이 참조 아키텍처에서 사용되는 서비스에 대한 몇 가지 고려 사항입니다.

Event Hubs

이 참조 아키텍처는 표준 계층에 Event Hubs를 배포합니다. 가격 책정 모델은 처리량 단위, 수신 이벤트, 캡처 이벤트를 기반으로 합니다. 수신 이벤트는 64KB 이하의 데이터 단위입니다. 더 큰 메시지는 64KB의 배수로 청구됩니다. Azure Portal 또는 Event Hubs 관리 API를 통해 처리량 단위를 지정합니다.

보존 기간이 더 필요한 경우 전용 계층을 고려하세요. 이 계층은 가장 까다로운 요구 사항을 가진 단일 테넌트 배포를 제공합니다. 이 제공은 처리량 단위로 제한되지 않는 CU(용량 단위)를 기반으로 클러스터를 구축합니다.

또한 표준 계층은 수신 이벤트 및 처리량 단위에 따라 요금이 청구됩니다.

Event Hubs 가격에 대한 자세한 내용은 Event Hubs 가격 책정을 참조하세요.

Azure Databricks

Azure Databricks는 각각 3개의 워크로드를 지원하는 두 계층 표준프리미엄을 제공합니다. 이 참조 아키텍처는 프리미엄 계층에 Azure Databricks 작업 영역을 배포합니다.

데이터 엔지니어링데이터 엔지니어링 Light 워크로드는 데이터 엔지니어가 작업을 빌드하고 실행하는 데 사용됩니다. 데이터 분석 워크로드는 데이터 과학자가 데이터와 인사이트를 대화식으로 탐색, 시각화, 조작, 공유하기 위한 것입니다.

Azure Databricks는 다양한 가격 책정 모델을 제공합니다.

  • 종량제 계획

    선택한 VM 인스턴스에 따라 클러스터 및 DBU(Databricks 단위)에 프로비저닝된 VM(가상 머신)에 대한 요금이 청구됩니다. DBU는 초당 사용량에 따라 청구되는 처리 기능 단위입니다. DBU 사용량은 Azure Databricks를 실행하는 인스턴스의 크기 및 유형에 따라 달라집니다. 가격 책정은 선택한 워크로드 및 계층에 따라 달라집니다.

  • 사전 구매 계획

    1년 또는 3년 동안 Azure DBU(Databricks 단위)를 DBCU(Databricks 커밋 단위)로 커밋합니다. 종량제 모델과 비교할 때 최대 37%까지 절약할 수 있습니다.

자세한 내용은 Azure Databricks 가격 책정을 참조하세요.

Azure Cosmos DB

이 아키텍처에서 일련의 레코드는 Azure Databricks 작업에 의해 Azure Cosmos DB에 기록됩니다. 삽입 작업을 수행하는 데 사용되는 RU/초(초당 요청 단위)로 표현된 예약 용량에 대한 요금이 청구됩니다. 청구 단위는 시간당 100RU/초입니다. 예를 들어 100KB 항목을 작성하는 비용은 50RU/초입니다.

쓰기 작업의 경우 초당 필요한 쓰기 수를 지원하기에 충분한 용량을 프로비저닝합니다. 쓰기 작업을 수행하기 전에 포털 또는 Azure CLI를 사용하여 프로비저닝된 처리량을 늘린 다음, 해당 작업이 완료된 후 처리량을 줄일 수 있습니다. 쓰기 기간의 처리량은 지정된 데이터에 필요한 최소 처리량에 다른 작업 부하가 실행되고 있지 않다고 가정할 때 삽입 작업에 필요한 처리량을 더한 값입니다.

비용 분석 예제

컨테이너에서 처리량 값 1,000RU/초를 구성한다고 가정합니다. 30일 동안 24시간, 총 720시간 동안 배포됩니다.

컨테이너는 매시간 시간당 100RU/초의 10단위 요금이 청구됩니다. $0.008(시간당 100RU/초당)의 10단위는 시간당 $0.08의 요금이 청구됩니다.

720시간 또는 7,200단위(100RU)의 경우 해당 월에 대해 $57.60의 요금이 청구됩니다.

저장된 데이터 및 인덱스에 사용된 각 GB에 대해 스토리지도 청구됩니다. 자세한 내용은 Azure Cosmos DB 가격 책정 모델을 참조하세요.

Azure Cosmos DB 용량 계산기를 사용하여 워크로드 비용을 빠르게 예측할 수 있습니다.

자세한 내용은 Microsoft Azure Well-Architected Framework의 비용 섹션을 참조하세요.

시나리오 배포

참조 구현을 배포하고 실행하려면 GitHub readme의 단계를 따릅니다.

다음 단계