Databricks에서 엔드 투 엔드 데이터 파이프라인 빌드

이 문서에서는 원시 데이터를 수집하고, 데이터를 변환하고, 처리된 데이터에 대한 분석을 실행하는 방법을 포함하여 엔드 투 엔드 데이터 처리 파이프라인을 만들고 배포하는 방법을 보여 줍니다.

참고 항목

이 문서에서는 Databricks Notebook 및 Azure Databricks 작업을 사용하여 워크플로를 오케스트레이션하는 전체 데이터 파이프라인을 만드는 방법을 보여 주지만 Databricks는 신뢰할 수 있고 기본 달성 가능하며 테스트 가능한 데이터 처리 파이프라인을 빌드하기 위한 선언적 인터페이스인 Delta Live Tables를 사용하는 것이 좋습니다.

데이터 파이프라인이란?

데이터 파이프라인은 원본 시스템에서 데이터를 이동하고, 요구 사항에 따라 해당 데이터를 변환하고, 대상 시스템에 데이터를 저장하는 데 필요한 단계를 구현합니다. 데이터 파이프라인에는 원시 데이터를 사용자가 사용할 수 있는 준비된 데이터로 전환하는 데 필요한 모든 프로세스가 포함됩니다. 예를 들어 데이터 파이프라인은 데이터 분석가와 데이터 과학자가 분석 및 보고를 통해 데이터에서 가치를 추출할 수 있도록 데이터를 준비할 수 있습니다.

ETL(추출, 변환 및 로드) 워크플로는 데이터 파이프라인의 일반적인 예입니다. ETL 처리에서 데이터는 원본 시스템에서 수집되어 준비 영역에 기록되고, 요구 사항(데이터 품질 보장, 레코드 중복 제거 등)에 따라 변환된 다음, 데이터 웨어하우스 또는 데이터 레이크와 같은 대상 시스템에 기록됩니다.

데이터 파이프라인 단계

Azure Databricks에서 데이터 파이프라인 빌드를 시작하는 데 도움이 되도록 이 문서에 포함된 예제에서는 데이터 처리 워크플로를 만드는 방법을 안내합니다.

  • Azure Databricks 기능을 사용하여 원시 데이터 세트를 탐색합니다.
  • 원시 원본 데이터를 수집하고 대상 테이블에 원시 데이터를 쓰는 Databricks Notebook을 만듭니다.
  • Databricks Notebook을 만들어 원시 원본 데이터를 변환하고 변환된 데이터를 대상 테이블에 씁니다.
  • Databricks Notebook을 만들어 변환된 데이터를 쿼리합니다.
  • Azure Databricks 작업을 사용하여 데이터 파이프라인을 자동화합니다.

요구 사항

  • Azure Databricks 및 데이터 과학 및 엔지니어링 작업 영역에 로그인했습니다.
  • 클러스터를 만들거나 클러스터에 액세스할 수 있는 권한이 있습니다.
  • (선택 사항) Unity 카탈로그에 테이블을 게시하려면 Unity 카탈로그에서 카탈로그스키마 를 만들어야 합니다.

예: 백만 송 데이터 세트

이 예제에 사용된 데이터 세트는 밀리언 송 데이터 세트하위 집합으로, 현대 음악 트랙에 대한 기능 및 메타데이터 컬렉션입니다. 이 데이터 세트는 Azure Databricks 작업 영역에 포함된 샘플 데이터 세트에서 사용할 수 있습니다.

1단계: 클러스터 만들기

이 예제에서 데이터 처리 및 분석을 수행하려면 명령을 실행하는 데 필요한 컴퓨팅 리소스를 제공하는 클러스터를 만듭니다.

참고 항목

이 예제에서는 DBFS에 저장된 샘플 데이터 세트를 사용하고 테이블을 Unity 카탈로그유지하는 것이 좋습니다. 단일 사용자 액세스 모드로 구성된 클러스터를 만듭니다. 단일 사용자 액세스 모드는 DBFS에 대한 모든 권한을 제공하는 동시에 Unity 카탈로그에 대한 액세스를 사용하도록 설정합니다. DBFS 및 Unity 카탈로그에 대한 모범 사례를 참조하세요.

  1. 사이드바에서 컴퓨팅을 클릭합니다.
  2. 클러스터 페이지에서 클러스터 만들기를 클릭합니다.
  3. 새 클러스터 페이지에서 클러스터의 고유한 이름을 입력합니다.
  4. 액세스 모드에서 단일 사용자를 선택합니다.
  5. 단일 사용자 또는 서비스 주체 액세스에서 사용자 이름을 선택합니다.
  6. 다시 기본 값을 기본 상태로 두고 클러스터 만들기를 클릭합니다.

Databricks 클러스터에 대한 자세한 내용은 Compute를 참조 하세요.

2단계: 원본 데이터 탐색

Azure Databricks 인터페이스를 사용하여 원시 원본 데이터를 탐색하는 방법을 알아보려면 데이터 파이프라인에 대한 원본 데이터 탐색을 참조하세요. 데이터 수집 및 준비로 직접 이동하려면 3단계: 원시 데이터 수집을 계속합니다.

3단계: 원시 데이터 수집

이 단계에서는 원시 데이터를 테이블에 로드하여 추가 처리에 사용할 수 있도록 합니다. 테이블과 같은 Databricks 플랫폼에서 데이터 자산을 관리하기 위해 Databricks는 Unity 카탈로그를 권장합니다. 그러나 Unity 카탈로그에 테이블을 게시하는 데 필요한 카탈로그 및 스키마를 만들 수 있는 권한이 없는 경우에도 Hive 메타스토어에 테이블을 게시하여 다음 단계를 완료할 수 있습니다.

데이터를 수집하기 위해 Databricks는 자동 로더를 사용하는 것이 좋습니다. 자동 로더는 클라우드 개체 스토리지에 도착하는 새 파일을 자동으로 감지하고 처리합니다.

로드된 데이터의 스키마를 자동으로 감지하도록 자동 로더를 구성하여 데이터 스키마를 명시적으로 선언하지 않고 테이블을 초기화하고, 새 열이 도입될 때 테이블 스키마를 진화시킬 수 있습니다. 이렇게 하면 시간이 지남에 따라 스키마 변경 내용을 수동으로 추적하고 적용할 필요가 없습니다. Databricks는 자동 로더를 사용할 때 스키마 유추를 권장합니다. 그러나 데이터 탐색 단계에서 볼 수 있듯이 노래 데이터에는 헤더 정보가 포함되지 않습니다. 헤더는 데이터와 함께 저장되지 않으므로 다음 예제와 같이 스키마를 명시적으로 정의해야 합니다.

  1. 사이드바에서 새로 만들기를 클릭하고 New Icon메뉴에서 전자 필기장을 선택합니다. Notebook 만들기 대화 상자가 나타납니다.

  2. 전자 필기장 이름을 입력합니다(예: Ingest songs data.). 기본적으로:

    • Python 은 선택한 언어입니다.
    • Notebook은 사용한 마지막 클러스터에 연결됩니다. 이 경우 1단계에서 만든 클러스터: 클러스터를 만듭니다.
  3. Notebook의 첫 번째 셀에 다음을 입력합니다.

    from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField
    
    # Define variables used in the code below
    file_path = "/databricks-datasets/songs/data-001/"
    table_name = "<table-name>"
    checkpoint_path = "/tmp/pipeline_get_started/_checkpoint/song_data"
    
    schema = StructType(
      [
        StructField("artist_id", StringType(), True),
        StructField("artist_lat", DoubleType(), True),
        StructField("artist_long", DoubleType(), True),
        StructField("artist_location", StringType(), True),
        StructField("artist_name", StringType(), True),
        StructField("duration", DoubleType(), True),
        StructField("end_of_fade_in", DoubleType(), True),
        StructField("key", IntegerType(), True),
        StructField("key_confidence", DoubleType(), True),
        StructField("loudness", DoubleType(), True),
        StructField("release", StringType(), True),
        StructField("song_hotnes", DoubleType(), True),
        StructField("song_id", StringType(), True),
        StructField("start_of_fade_out", DoubleType(), True),
        StructField("tempo", DoubleType(), True),
        StructField("time_signature", DoubleType(), True),
        StructField("time_signature_confidence", DoubleType(), True),
        StructField("title", StringType(), True),
        StructField("year", IntegerType(), True),
        StructField("partial_sequence", IntegerType(), True)
      ]
    )
    
    (spark.readStream
      .format("cloudFiles")
      .schema(schema)
      .option("cloudFiles.format", "csv")
      .option("sep","\t")
      .load(file_path)
      .writeStream
      .option("checkpointLocation", checkpoint_path)
      .trigger(availableNow=True)
      .toTable(table_name)
    )
    

    Unity 카탈로그를 사용하는 경우 수집된 레코드(예data_pipelines.songs_data.raw_song_data: )를 포함하도록 카탈로그, 스키마 및 테이블 이름으로 바꿉 <table-name> 니다. 그렇지 않으면 수집된 레코드를 포함할 테이블의 이름으로 바꿉 <table-name> 습니다. 예를 들면 다음과 같습니다 raw_song_data.

    예를 들어 /tmp/pipeline_get_started/_checkpoint/song_data검사포인트 파일을 기본 DBFS의 디렉터리에 대한 경로로 바꿉 <checkpoint-path> 니다.

  4. 를 클릭하고 Run Menu 실행을 선택합니다. 이 예제에서는 정보를 README사용하여 데이터 스키마를 정의하고, 포함된 file_path모든 파일에서 노래 데이터를 수집하고, 지정된 table_name테이블에 데이터를 씁니다.

4단계: 원시 데이터 준비

분석을 위해 원시 데이터를 준비하기 위해 다음 단계에서는 불필요한 열을 필터링하고 새 레코드를 만들기 위한 타임스탬프가 포함된 새 필드를 추가하여 원시 노래 데이터를 변환합니다.

  1. 사이드바에서 새로 만들기를 클릭하고 New Icon메뉴에서 전자 필기장을 선택합니다. Notebook 만들기 대화 상자가 나타납니다.

  2. 전자 필기장 이름을 입력합니다. 예들 들어 Prepare songs data입니다. 기본 언어를 SQL변경합니다.

  3. Notebook의 첫 번째 셀에 다음을 입력합니다.

    CREATE OR REPLACE TABLE
      <table-name> (
        artist_id STRING,
        artist_name STRING,
        duration DOUBLE,
        release STRING,
        tempo DOUBLE,
        time_signature DOUBLE,
        title STRING,
        year DOUBLE,
        processed_time TIMESTAMP
      );
    
    INSERT INTO
      <table-name>
    SELECT
      artist_id,
      artist_name,
      duration,
      release,
      tempo,
      time_signature,
      title,
      year,
      current_timestamp()
    FROM
      <raw-songs-table-name>
    

    Unity 카탈로그를 사용하는 경우 카탈로그, 스키마 및 테이블 이름으로 바꿔 <table-name> 필터링되고 변환된 레코드(예: data_pipelines.songs_data.prepared_song_data)를 포함합니다. 그렇지 않으면 필터링 및 변환된 레코드를 포함할 테이블의 이름으로 바꿉 <table-name> 니다(예: prepared_song_data).

    이전 단계에서 수집한 원시 노래 레코드를 포함하는 테이블의 이름으로 바꿉 <raw-songs-table-name> 습니다.

  4. 를 클릭하고 Run Menu 실행을 선택합니다.

5단계: 변환된 데이터 쿼리

이 단계에서는 노래 데이터를 분석하는 쿼리를 추가하여 처리 파이프라인을 확장합니다. 이러한 쿼리는 이전 단계에서 만든 준비된 레코드를 사용합니다.

  1. 사이드바에서 새로 만들기를 클릭하고 New Icon메뉴에서 전자 필기장을 선택합니다. Notebook 만들기 대화 상자가 나타납니다.

  2. 전자 필기장 이름을 입력합니다. 예들 들어 Analyze songs data입니다. 기본 언어를 SQL변경합니다.

  3. Notebook의 첫 번째 셀에 다음을 입력합니다.

    -- Which artists released the most songs each year?
    SELECT
      artist_name,
      count(artist_name)
    AS
      num_songs,
      year
    FROM
      <prepared-songs-table-name>
    WHERE
      year > 0
    GROUP BY
      artist_name,
      year
    ORDER BY
      num_songs DESC,
      year DESC
    

    준비된 데이터가 포함된 테이블의 이름으로 바꿉 <prepared-songs-table-name> 습니다. 예들 들어 data_pipelines.songs_data.prepared_song_data입니다.

  4. 셀 작업 메뉴를 클릭하고 Down Caret 아래 셀 추가를 선택하고 새 셀에 다음을 입력합니다.

     -- Find songs for your DJ list
     SELECT
       artist_name,
       title,
       tempo
     FROM
       <prepared-songs-table-name>
     WHERE
       time_signature = 4
       AND
       tempo between 100 and 140;
    

    이전 단계에서 만든 준비된 테이블의 이름으로 바꿉 <prepared-songs-table-name> 습니다. 예들 들어 data_pipelines.songs_data.prepared_song_data입니다.

  5. 쿼리를 실행하고 출력을 보려면 모두 실행을 클릭합니다.

6단계: 파이프라인을 실행하는 Azure Databricks 작업 만들기

Azure Databricks 작업을 사용하여 데이터 수집, 처리 및 분석 단계 실행을 자동화하는 워크플로를 만들 수 있습니다.

  1. 데이터 과학 및 엔지니어링 작업 영역에서 다음 중 하나를 수행합니다.
    • 사이드바에서 워크플로를 클릭하고 을 클릭합니다Create Job Button.Jobs Icon
    • 사이드바에서 새로 만들기를 클릭하고 New Icon작업을 선택합니다.
  2. 작업작업 대화 상자에서 작업 이름 추가... 예를 들어 "노래 워크플로"입니다.
  3. 작업 이름첫 번째 작업의 이름을 입력합니다(예: Ingest_songs_data.).
  4. 형식에서 Notebook 작업 유형을 선택합니다.
  5. 원본에서 작업 영역을 선택합니다.
  6. 파일 브라우저를 사용하여 데이터 수집 전자 필기장을 찾고 전자 필기장 이름을 클릭한 다음 확인을 클릭합니다.
  7. 클러스터에서 Shared_job_cluster 또는 단계에서 만든 클러스터를 Create a cluster 선택합니다.
  8. 만들기를 클릭합니다.
  9. 방금 만든 작업 아래를 클릭하고 Add Task Button 전자 필기장을 선택합니다.
  10. 작업 이름에 작업 이름(예: Prepare_songs_data)을 입력합니다.
  11. 형식에서 Notebook 작업 유형을 선택합니다.
  12. 원본에서 작업 영역을 선택합니다.
  13. 파일 브라우저를 사용하여 데이터 준비 전자 필기장을 찾고 전자 필기장 이름을 클릭한 다음 확인을 클릭합니다.
  14. 클러스터에서 Shared_job_cluster 또는 단계에서 만든 클러스터를 Create a cluster 선택합니다.
  15. 만들기를 클릭합니다.
  16. 방금 만든 작업 아래를 클릭하고 Add Task Button 전자 필기장을 선택합니다.
  17. 작업 이름에 작업 이름(예: Analyze_songs_data)을 입력합니다.
  18. 형식에서 Notebook 작업 유형을 선택합니다.
  19. 원본에서 작업 영역을 선택합니다.
  20. 파일 브라우저를 사용하여 데이터 분석 Notebook을 찾고 전자 필기장 이름을 클릭한 다음 확인을 클릭합니다.
  21. 클러스터에서 Shared_job_cluster 또는 단계에서 만든 클러스터를 Create a cluster 선택합니다.
  22. 만들기를 클릭합니다.
  23. 워크플로를 실행하려면 을 클릭합니다 Run Now Button. 실행 세부 정보를 보려면 작업 실행 보기에서 실행의 시작 시간 열에 있는 링크를 클릭합니다. 각 작업을 클릭하여 작업 실행에 대한 세부 정보를 봅니다.
  24. 워크플로가 완료된 경우 결과를 보려면 최종 데이터 분석 작업을 클릭합니다. 출력 페이지가 나타나고 쿼리 결과가 표시됩니다.

7단계: 데이터 파이프라인 작업 예약

참고 항목

Azure Databricks 작업을 사용하여 예약된 워크플로를 오케스트레이션하는 방법을 보여주기 위해 이 시작 예제는 수집, 준비 및 분석 단계를 별도의 Notebook으로 구분하고 각 Notebook을 사용하여 작업에서 작업을 만듭니다. 모든 처리가 단일 Notebook에 포함된 경우 Azure Databricks Notebook UI에서 직접 Notebook을 쉽게 예약할 수 있습니다. 예약된 Notebook 작업 만들기 및 관리를 참조하세요.

일반적인 요구 사항은 일정에 따라 데이터 파이프라인을 실행하는 것입니다. 파이프라인을 실행하는 작업에 대한 일정을 정의하려면 다음을 수행합니다.

  1. 사이드바에서 워크플로를 클릭합니다Jobs Icon.
  2. 이름 열에서 작업 이름을 클릭합니다. 측면 패널에 작업 세부 정보가 표시됩니다.
  3. 작업 세부 정보 패널에서 트리거 추가를 클릭하고 트리거 유형에서 예약을 선택합니다.
  4. 기간, 시작 시간 및 표준 시간대를 지정합니다. 필요에 따라 Cron 구문 표시 확인란을 선택하여 Quartz Cron 구문에서 일정을 표시하고 편집합니다.
  5. 저장을 클릭합니다.

자세한 정보

  • Databricks Notebook에 대한 자세한 내용은 Databricks Notebook 소개를 참조 하세요.
  • Azure Databricks 작업에 대한 자세한 내용은 Azure Databricks 작업란?을 참조하세요.
  • Delta Lake에 대한 자세한 내용은 Delta Lake란?을 참조하세요.
  • Delta Live Tables를 사용한 데이터 처리 파이프라인에 대한 자세한 내용은 델타 라이브 테이블이란?을 참조하세요.