데이터 파이프라인에서 종속성 관리

데이터 처리 파이프라인을 개발 하 고 배포 하려면 작업 간 복잡 한 종속성을 관리 해야 하는 경우가 많습니다. 예를 들어 파이프라인은 소스에서 데이터를 읽고, 데이터를 정리 하 고, 정리 된 데이터를 변환 하 고, 변환 된 데이터를 대상에 쓸 수 있습니다. 운영 때 데이터 파이프라인을 테스트 하 고 예약 하 고 문제를 해결 해야 합니다.

워크플로 시스템은 작업 간 종속성을 정의 하 고, 파이프라인이 실행 되는 일정을 예약 하 고, 워크플로를 모니터링할 수 있도록 하 여 이러한 문제를 해결 합니다. Databricks는 외부 시스템에 의존 하지 않고 워크플로를 관리 하는 여러 태스크가 포함 된 작업 을 권장 합니다. Azure Databricks 작업은 표준 인증 및 액세스 제어 메서드를 사용 하 여 작업 오케스트레이션을 제공 합니다. 친숙 한 사용자에 게 친숙 한 인터페이스를 사용 하 여 작업을 관리 하 여 복잡 한 워크플로를 만들고 관리할 수 있습니다. 여러 작업을 포함 하는 작업을 정의할 수 있습니다. 여기서 각 작업은 노트북 또는 JAR 등의 코드를 실행 하 고 두 작업 간의 종속성을 지정 하 여 작업의 작업 실행 순서를 제어 합니다. 작업의 태스크가 순서 또는 병렬 실행 되도록 구성할 수 있습니다.

Azure Databricks은 Azure Data Factory 또는 Apache 공기 흐름이있는 워크플로 관리도 지원 합니다.

Azure Data Factory

Azure Data Factory 는 데이터 저장소, 이동 및 처리 서비스를 자동화 된 데이터 파이프라인으로 작성할 수 있는 클라우드 데이터 통합 서비스입니다. Azure Data Factory 데이터 파이프라인에서 노트북 Databricks을 운영 수 있습니다. Azure Databricks 클러스터에서 Databricks 노트북을 실행 하는 Azure Data Factory 파이프라인을 만든 다음 Databricks 노트북을 실행 하 여 데이터를 변환하는 방법에 대 한 지침은 Azure Data Factory Databricks 노트북으로 Databricks 노트북 실행 을 참조 하세요.

Apache 공기 흐름

Apache 공기 흐름 은 데이터 파이프라인을 관리 하 고 예약 하기 위한 오픈 소스 솔루션입니다. 공기 흐름은 작업의 Dag (방향이 지정 된 비순환 그래프)로 데이터 파이프라인을 나타냅니다. Python 파일에서 워크플로를 정의 하 고 공기 흐름이 예약 및 실행을 관리 합니다.

공기 흐름은 Azure Databricks와 공기 흐름 간의 긴밀 한 통합을 제공 합니다. 공기 흐름 Azure Databricks 통합을 사용 하면 Azure Databricks에서 제공 하는 최적화 된 Spark 엔진을 사용 하 여 공기의 일정 기능을 활용할 수 있습니다.

요구 사항

  • 공기 흐름과 Azure Databricks 간의 통합은 1.9.0 이상 버전에서 사용할 수 있습니다. 이 문서의 예제는 공기 흐름 버전 2.1.0 테스트 됩니다.
  • 통풍에는 Python 3.6, 3.7 또는 3.8가 필요 합니다. 이 문서의 예제는 Python 3.8를 사용 하 여 테스트 되었습니다.

공기 흐름 Azure Databricks 통합 설치

공기 흐름 Azure Databricks 통합을 설치 하려면 터미널을 열고 다음 명령을 실행 합니다.

mkdir airflow
cd airflow
pipenv --python 3.8
pipenv shell
export AIRFLOW_HOME=$(pwd)
pipenv install apache-airflow==2.1.0
pipenv install apache-airflow-providers-databricks
mkdir dags
airflow db init
airflow users create --username admin --firstname <firstname> --lastname <lastname> --role Admin --email your@email.com

다음 명령을 실행합니다.

  1. 라는 디렉터리를 만들고 airflow 해당 디렉터리로 변경 합니다.
  2. pipenv를 사용 하 여 pipenv만들고 생성 합니다. Databricks는 Python 가상 환경을 사용 하 여 패키지 버전 및 코드 종속성을 해당 환경에 격리 하는 것을 권장 합니다. 이러한 격리를 통해 예기치 않은 패키지 버전 불일치 및 코드 종속성 충돌을 줄일 수 있습니다.
  3. AIRFLOW_HOME디렉터리의 경로에 설정 된 환경 변수를 초기화 airflow 합니다.
  4. 공기 흐름 및 공기 흐름 Databricks 공급자 패키지를 설치 합니다.
  5. airflow/dags 디렉터리를 만듭니다. 기류는 디렉터리를 사용 하 여 dags DAG 정의를 저장 합니다.
  6. 공기 흐름이 메타 데이터를 추적 하는 데 사용 하는 SQLite 데이터베이스를 초기화 합니다. 프로덕션 통풍 배포에서는 표준 데이터베이스를 사용 하 여 공기 흐름을 구성 합니다. 흐름 배포에 대 한 SQLite 데이터베이스 및 기본 구성은 디렉터리에서 초기화 됩니다 airflow .
  7. 공기 흐름에 대 한 관리 사용자를 만듭니다.

추가 기능 (예: 및)을 설치 하려면 password 다음을 실행 합니다.

pip install "apache-airflow[databricks, celery, password]"

통풍 웹 서버 및 스케줄러 시작

공기 흐름 UI를 보려면 통풍 웹 서버가 필요 합니다. 웹 서버를 시작 하려면 터미널을 열고 다음 명령을 실행 합니다.

airflow webserver

스케줄러는 Dag을 예약 하는 공기 흐름 구성 요소입니다. 이를 실행 하려면 새 터미널을 열고 다음 명령을 실행 합니다.

pipenv shell
export AIRFLOW_HOME=$(pwd)
airflow scheduler

공기 흐름 설치 테스트

공기 흐름이 설치 되었는지 확인 하려면 Dag에 포함 된 예제 중 하나를 실행할 수 있습니다.

  1. 브라우저 창에서를 엽니다 http://localhost:8080/home . 공기 흐름이 dag 화면이 나타납니다.
  2. 예를 들어 Dag /해제할 DAG 해제할를 클릭 하 여 예를 들면와 같이 .
  3. 시작 단추를 클릭 하 여 DAG 예제를 트리거합니다.
  4. DAG 이름을 클릭 하 여 DAG의 실행 상태를 포함 한 세부 정보를 확인 합니다.

공기 흐름에서 Azure Databricks 작업 실행

공기 흐름 Azure Databricks 통합은 작업을 트리거하는 두 가지 다른 연산자를 제공 합니다.

  • DatabricksRunNowOperator 에는 기존 Azure Databricks 작업이 필요 하며, 실행을 트리거하기 위해 새 작업 실행 ( ) API 요청 트리거를 사용 합니다. Databricks DatabricksRunNowOperator 작업 정의의 중복을 줄이고이 연산자를 사용 하 여 트리거된 작업 실행을 작업 DatabricksRunNowOperator에서 쉽게 찾을 수 있으므로를 사용 하는 것이 좋습니다.
  • DatabricksSubmitRunOperator 는 Azure Databricks에 존재 하는 작업이 필요 하지 않으며, Create and trigger a time run () API 요청을 사용 하 여 작업 사양을 제출 하 고 실행을 트리거합니다.

Databricks 공기 흐름이 적용 되는 경우 작업 실행 페이지 URL은 각 polling_period_seconds (기본값은 30 초)으로 흐름 로그에 기록 됩니다. 자세한 내용은 공기 흐름 웹 사이트의 apache-databricks 패키지 페이지를 참조 하세요.

예제

다음 예제에서는 로컬 컴퓨터에서 실행 되는 간단한 공기 흐름 배포를 만들고 Azure Databricks에서 실행을 트리거하는 예제 DAG를 배포 하는 방법을 보여 줍니다. 이 예에서는 다음을 수행 합니다.

  1. 새 노트북을 만들고 구성 된 매개 변수를 기반으로 인사말을 인쇄 하는 코드를 추가 합니다.
  2. 노트북을 실행 하는 단일 작업을 사용 하 여 Azure Databricks 작업을 만듭니다.
  3. Azure Databricks 작업 영역에 대 한 공기 흐름 연결을 구성 합니다.
  4. 노트북 작업을 트리거하는 공기 흐름 DAG을 만듭니다. 을 사용 하 여 Python 스크립트에서 DAG를 정의 합니다 DatabricksRunNowOperator .
  5. 공기 흐름 UI를 사용 하 여 DAG를 트리거하고 실행 상태를 확인 합니다.

Notebook 만들기

이 예제에서는 두 개의 셀이 포함 된 노트북을 사용 합니다.

  • 첫 번째 셀에는 기본 값으로 설정 된 변수를 정의 하는 Databricks 유틸리티 텍스트 위젯이 있습니다 world .
  • 두 번째 셀은 greeting 접두사가 접두사로 붙는 변수의 값을 인쇄 합니다 hello .

노트북을 만들려면 다음을 수행 합니다.

  1. Azure Databricks 방문 페이지로 이동 하 여 빈 노트북 만들기 를 선택 하거나 Create Icon 사이드바에서 만들기 를 클릭 하 고 메뉴에서 노트북 을 선택 합니다. [ 노트북 만들기 ] 대화 상자가 나타납니다.

  2. 노트북 만들기 대화 상자에서 Hello 공기와 같은 노트북 이름을 지정 합니다. 기본 언어Python으로 설정 합니다. 클러스터 를 기본값으로 설정 된 상태로 둡니다. 이 노트북을 사용 하는 작업을 만들 때 클러스터를 구성 합니다.

  3. 만들기를 클릭합니다.

  4. 다음 Python 코드를 복사 하 여 노트북의 첫 번째 셀에 붙여 넣습니다.

    dbutils.widgets.text("greeting", "world", "Greeting")
    greeting = dbutils.widgets.get("greeting")
    
  5. 첫 번째 셀 아래에 새 셀을 추가 하 고 다음 Python 코드를 복사 하 여 새 셀에 붙여 넣습니다.

    print("hello {}".format(greeting))
    

작업 만들기

  1. Jobs Icon사이드바에서 작업 을 클릭 합니다.

  2. Create Job Button을 클릭합니다.

    작업 탭은 작업 만들기 대화 상자와 함께 표시 됩니다.

    Create first task dialog

  3. 작업 이름에 작업 이름 추가 를 바꿉니다.

  4. 작업 이름 필드에 작업 이름 (예: 인사말-작업)을 입력 합니다.

  5. 유형 드롭다운에서 노트북을 선택 합니다.

  6. 파일 브라우저를 사용 하 여 만든 노트북을 찾고, 노트북 이름을 클릭 하 고, 확인을 클릭 합니다.

  7. 매개 변수아래에서 추가 를 클릭 합니다. 필드에를 입력 합니다. 필드에를 입력 합니다.

  8. 작업 만들기를 클릭 합니다.

작업 실행

작업을 즉시 실행 하려면 Run Now Button 오른쪽 위 모서리를 클릭 합니다. 실행 탭을 클릭하고 활성 실행 테이블에서 지금 실행을 클릭하여 작업을 실행할 수도 있습니다.

실행 세부 정보 보기

  1. 실행 탭을 클릭하고 활성 실행 테이블 또는 완료된 실행(지난 60일) 테이블에서 자세히 보기를 클릭합니다.

  2. 작업 ID 값을 복사합니다. 이 값은 Airflow에서 작업을 트리거하는 데 필요합니다.

    View example job results

Azure Databricks 개인용 액세스 토큰 만들기

Airflow는 AZURE DATABRICKS PAT(개인용 액세스 토큰)를 사용하여 Databricks에 연결합니다. PAT를 만드는 방법에 대한 지침은 개인용 액세스 토큰을 참조하세요.

Azure Databricks 연결 구성

Airflow 설치에는 Azure Databricks 대한 기본 연결이 포함되어 있습니다. 위에서 만든 개인용 액세스 토큰을 사용하여 작업 영역에 연결하도록 연결을 업데이트하려면 다음을 수행합니다.

  1. 브라우저 창에서 를 http://localhost:8080/connection/list/ 엽니다.

  2. Conn ID아래에서 databricks_default 찾아 레코드 편집 단추를 클릭합니다.

  3. 호스트 필드의 값을 Azure Databricks 배포의 작업 영역 인스턴스 이름으로 대체합니다.

  4. 추가 필드에 다음 값을 입력합니다.

    {"token": "PERSONAL_ACCESS_TOKEN"}
    

    PERSONAL_ACCESS_TOKEN를 Azure Databricks 개인용 액세스 토큰으로 대체합니다.

새 DAG 만들기

Python 파일에서 Airflow DAG를 정의합니다. 예제 Notebook 작업을 트리거하는 DAG를 만들려면 다음을 수행합니다.

  1. 텍스트 편집기 또는 IDE에서 다음 내용이 있는 라는 새 파일을 databricks_dag.py 만듭니다.

    from airflow import DAG
    from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator
    from airflow.utils.dates import days_ago
    
    default_args = {
      'owner': 'airflow'
    }
    
    with DAG('databricks_dag',
      start_date = days_ago(2),
      schedule_interval = None,
      default_args = default_args
      ) as dag:
    
      opr_run_now = DatabricksRunNowOperator(
        task_id = 'run_now',
        databricks_conn_id = 'databricks_default',
        job_id = JOB_ID
      )
    

    JOB_ID를 이전에 저장한 작업 ID의 값으로 대체합니다.

  2. airflow/dags파일을 디렉터리에 저장합니다. Airflow는 에 저장된 DAG 파일을 자동으로 읽고 airflow/dags/ 설치합니다.

Airflow에서 DAG 설치 및 확인

Airflow UI에서 DAG를 트리거하고 확인하려면 다음을 수행합니다.

  1. 브라우저 창에서 를 http://localhost:8080/home 엽니다. Airflow DAG 화면이 나타납니다.
  2. databricks_dagdatabricks_dag 토글을 찾아서 클릭하여 DAG를 일시 중지 해제합니다.
  3. 시작 단추를 클릭하여 DAG를 트리거합니다.
  4. 실행 열에서 실행을 클릭하여 실행의 상태 및 세부 정보를 확인합니다.