Executar um pipeline existente com o Workflow Orchestration Manager

APLICA-SE A: Azure Data Factory Azure Synapse Analytics

Gorjeta

Experimente o Data Factory no Microsoft Fabric, uma solução de análise tudo-em-um para empresas. O Microsoft Fabric abrange tudo, desde a movimentação de dados até ciência de dados, análises em tempo real, business intelligence e relatórios. Saiba como iniciar uma nova avaliação gratuitamente!

Nota

O Workflow Orchestration Manager é alimentado pelo Apache Airflow.

Nota

O Workflow Orchestration Manager para Azure Data Factory depende do aplicativo Apache Airflow de código aberto. Documentação e mais tutoriais sobre o fluxo de ar podem ser encontrados nas páginas de documentação ou comunidade do Apache Airflow.

Os pipelines do Data Factory fornecem 100+ conectores de fonte de dados que fornecem integração de dados/fluxos de dados escaláveis e confiáveis. Há cenários em que você gostaria de executar um pipeline de fábrica de dados existente a partir do seu Apache Airflow DAG. Este tutorial mostra como fazer exatamente isso.

Pré-requisitos

  • Subscrição do Azure. Se não tiver uma subscrição do Azure, crie uma conta do Azure gratuita antes de começar.
  • Conta de armazenamento do Azure. Se não tiver uma conta de armazenamento, veja Criar uma conta de armazenamento do Azure para seguir os passos para criar uma. Certifique-se de que a conta de armazenamento permite o acesso apenas a partir de redes selecionadas.
  • Pipeline do Azure Data Factory. Você pode seguir qualquer um dos tutoriais e criar um novo pipeline de data factory caso ainda não tenha um, ou criar um com uma seleção em Introdução e experimentar seu primeiro pipeline de data factory.
  • Configure uma entidade de serviço. Você precisará criar uma nova entidade de serviço ou usar uma existente e conceder-lhe permissão para executar o pipeline (exemplo - função de colaborador na fábrica de dados onde os pipelines existentes existem), mesmo que o ambiente do Workflow Orchestration Manager e os pipelines existam no mesmo data factory. Você precisará obter a ID do Cliente e o Segredo do Cliente (Chave de API) da Entidade de Serviço.

Passos

  1. Crie um novo arquivo Python adf.py com o conteúdo abaixo:

    from datetime import datetime, timedelta
    
    from airflow.models import DAG, BaseOperator
    
    try:
        from airflow.operators.empty import EmptyOperator
    except ModuleNotFoundError:
        from airflow.operators.dummy import DummyOperator as EmptyOperator  # type: ignore
    from airflow.providers.microsoft.azure.operators.data_factory import AzureDataFactoryRunPipelineOperator
    from airflow.providers.microsoft.azure.sensors.data_factory import AzureDataFactoryPipelineRunStatusSensor
    from airflow.utils.edgemodifier import Label
    
    with DAG(
        dag_id="example_adf_run_pipeline",
        start_date=datetime(2022, 5, 14),
        schedule_interval="@daily",
        catchup=False,
        default_args={
            "retries": 1,
            "retry_delay": timedelta(minutes=3),
            "azure_data_factory_conn_id": "<connection_id>", #This is a connection created on Airflow UI
            "factory_name": "<FactoryName>",  # This can also be specified in the ADF connection.
            "resource_group_name": "<ResourceGroupName>",  # This can also be specified in the ADF connection.
        },
        default_view="graph",
    ) as dag:
        begin = EmptyOperator(task_id="begin")
        end = EmptyOperator(task_id="end")
    
        # [START howto_operator_adf_run_pipeline]
        run_pipeline1: BaseOperator = AzureDataFactoryRunPipelineOperator(
            task_id="run_pipeline1",
            pipeline_name="<PipelineName>",
            parameters={"myParam": "value"},
        )
        # [END howto_operator_adf_run_pipeline]
    
        # [START howto_operator_adf_run_pipeline_async]
        run_pipeline2: BaseOperator = AzureDataFactoryRunPipelineOperator(
            task_id="run_pipeline2",
            pipeline_name="<PipelineName>",
            wait_for_termination=False,
        )
    
        pipeline_run_sensor: BaseOperator = AzureDataFactoryPipelineRunStatusSensor(
            task_id="pipeline_run_sensor",
            run_id=run_pipeline2.output["run_id"],
        )
        # [END howto_operator_adf_run_pipeline_async]
    
        begin >> Label("No async wait") >> run_pipeline1
        begin >> Label("Do async wait with sensor") >> run_pipeline2
        [run_pipeline1, pipeline_run_sensor] >> end
    
        # Task dependency created via `XComArgs`:
        #   run_pipeline2 >> pipeline_run_sensor
    

    Você terá que criar a conexão usando o Workflow Orchestration Manager UI Admin -> Connections -> '+' -> Escolha 'Tipo de conexão' como 'Azure Data Factory' e, em seguida, preencha sua client_id, client_secret, tenant_id, subscription_id, resource_group_name, data_factory_name e pipeline_name.

  2. Carregue o arquivo adf.py para seu armazenamento de blob dentro de uma pasta chamada DAGS.

  3. Importe a pasta DAGS para o ambiente do Workflow Orchestration Manager. Se você não tiver um, crie um novo

    Captura de tela mostrando a guia de gerenciamento de fábrica de dados com a seção Fluxo de ar selecionada.