Köra en befintlig pipeline med Workflow Orchestration Manager

GÄLLER FÖR: Azure Data Factory Azure Synapse Analytics

Dricks

Prova Data Factory i Microsoft Fabric, en allt-i-ett-analyslösning för företag. Microsoft Fabric omfattar allt från dataflytt till datavetenskap, realtidsanalys, business intelligence och rapportering. Lär dig hur du startar en ny utvärderingsversion kostnadsfritt!

Kommentar

Workflow Orchestration Manager drivs av Apache Airflow.

Kommentar

Workflow Orchestration Manager för Azure Data Factory förlitar sig på öppen källkod Apache Airflow-programmet. Dokumentation och fler självstudier för Airflow finns på Apache Airflow-dokumentationen eller community-sidorna.

Data Factory-pipelines tillhandahåller över 100 anslutningsappar för datakällor som ger skalbar och tillförlitlig dataintegrering/dataflöden. Det finns scenarier där du vill köra en befintlig datafabrikspipeline från Apache Airflow DAG. Den här självstudien visar hur du gör just det.

Förutsättningar

  • Azure-prenumeration. Om du inte har en Azure-prenumeration kan du skapa ett kostnadsfritt Azure-konto innan du börjar.
  • Azure Storage-konto. Om du inte har ett lagringskonto finns det anvisningar om hur du skapar ett i Skapa ett Azure Storage-konto. Kontrollera att lagringskontot endast tillåter åtkomst från valda nätverk.
  • Azure Data Factory-pipeline. Du kan följa någon av självstudierna och skapa en ny datafabrikspipeline om du inte redan har en, eller skapa en med ett val i Kom igång och prova din första datafabrikspipeline.
  • Konfigurera ett huvudnamn för tjänsten. Du måste skapa ett nytt huvudnamn för tjänsten eller använda ett befintligt och ge det behörighet att köra pipelinen (till exempel deltagarrollen i datafabriken där befintliga pipelines finns), även om arbetsflödesorkestreringshanterarens miljö och pipelines finns i samma datafabrik. Du måste hämta tjänstens huvudnamns klient-ID och klienthemlighet (API-nyckel).

Steg

  1. Skapa en ny Python-fil adf.py med innehållet nedan:

    from datetime import datetime, timedelta
    
    from airflow.models import DAG, BaseOperator
    
    try:
        from airflow.operators.empty import EmptyOperator
    except ModuleNotFoundError:
        from airflow.operators.dummy import DummyOperator as EmptyOperator  # type: ignore
    from airflow.providers.microsoft.azure.operators.data_factory import AzureDataFactoryRunPipelineOperator
    from airflow.providers.microsoft.azure.sensors.data_factory import AzureDataFactoryPipelineRunStatusSensor
    from airflow.utils.edgemodifier import Label
    
    with DAG(
        dag_id="example_adf_run_pipeline",
        start_date=datetime(2022, 5, 14),
        schedule_interval="@daily",
        catchup=False,
        default_args={
            "retries": 1,
            "retry_delay": timedelta(minutes=3),
            "azure_data_factory_conn_id": "<connection_id>", #This is a connection created on Airflow UI
            "factory_name": "<FactoryName>",  # This can also be specified in the ADF connection.
            "resource_group_name": "<ResourceGroupName>",  # This can also be specified in the ADF connection.
        },
        default_view="graph",
    ) as dag:
        begin = EmptyOperator(task_id="begin")
        end = EmptyOperator(task_id="end")
    
        # [START howto_operator_adf_run_pipeline]
        run_pipeline1: BaseOperator = AzureDataFactoryRunPipelineOperator(
            task_id="run_pipeline1",
            pipeline_name="<PipelineName>",
            parameters={"myParam": "value"},
        )
        # [END howto_operator_adf_run_pipeline]
    
        # [START howto_operator_adf_run_pipeline_async]
        run_pipeline2: BaseOperator = AzureDataFactoryRunPipelineOperator(
            task_id="run_pipeline2",
            pipeline_name="<PipelineName>",
            wait_for_termination=False,
        )
    
        pipeline_run_sensor: BaseOperator = AzureDataFactoryPipelineRunStatusSensor(
            task_id="pipeline_run_sensor",
            run_id=run_pipeline2.output["run_id"],
        )
        # [END howto_operator_adf_run_pipeline_async]
    
        begin >> Label("No async wait") >> run_pipeline1
        begin >> Label("Do async wait with sensor") >> run_pipeline2
        [run_pipeline1, pipeline_run_sensor] >> end
    
        # Task dependency created via `XComArgs`:
        #   run_pipeline2 >> pipeline_run_sensor
    

    Du måste skapa anslutningen med hjälp av UI-administratören för Arbetsflödesorkestreringshanteraren –> Anslut ions –> "+" –> Välj "Anslut ionstyp" som "Azure Data Factory" och fyll sedan i dina client_id, client_secret, tenant_id, subscription_id, resource_group_name, data_factory_name och pipeline_name.

  2. Ladda upp adf.py-filen till bloblagringen i en mapp med namnet DAGS.

  3. Importera MAPPEN DAGS till arbetsflödesorkestreringshanterarens miljö. Om du inte har en skapar du en ny

    Skärmbild som visar fliken datafabrikshantering med avsnittet Airflow valt.