Eseguire una pipeline esistente con Workflow Orchestration Manager

SI APPLICA A: Azure Data Factory Azure Synapse Analytics

Suggerimento

Provare Data Factory in Microsoft Fabric, una soluzione di analisi completa per le aziende. Microsoft Fabric copre tutti gli elementi, dallo spostamento dei dati all'analisi scientifica dei dati, all'analisi in tempo reale, alla business intelligence e alla creazione di report. Scopri come avviare gratuitamente una nuova versione di valutazione .

Nota

Workflow Orchestration Manager è basato su Apache Airflow.

Nota

Workflow Orchestration Manager per Azure Data Factory si basa sull'applicazione Apache Airflow open source. La documentazione e altre esercitazioni per Airflow sono disponibili nelle pagine della documentazione o della community di Apache Airflow.

Le pipeline di Data Factory offrono oltre 100 connettori di origine dati che offrono flussi di dati/integrazione dati scalabili e affidabili. Esistono scenari in cui si vuole eseguire una pipeline di data factory esistente dal daG Apache Airflow. Questa esercitazione illustra come eseguire questa operazione.

Prerequisiti

  • Sottoscrizione di Azure. Se non si ha una sottoscrizione di Azure, creare un account Azure gratuito prima di iniziare.
  • Account di archiviazione di Azure. Se non si ha un account di archiviazione, vedere Creare un account di archiviazione di Azure per informazioni su come crearne uno. Assicurarsi che l'account di archiviazione consenta l'accesso solo da reti selezionate.
  • Pipeline di Azure Data Factory. È possibile seguire una qualsiasi delle esercitazioni e creare una nuova pipeline di data factory nel caso in cui non ne sia già disponibile una o crearne una con una selezione in Introduzione e provare la prima pipeline della data factory.
  • Configurare un'entità servizio. Sarà necessario creare una nuova entità servizio o usarne una esistente e concedergli l'autorizzazione per eseguire la pipeline (ad esempio il ruolo collaboratore nella data factory in cui esistono le pipeline esistenti), anche se l'ambiente di Workflow Orchestration Manager e le pipeline esistono nella stessa data factory. Sarà necessario ottenere l'ID client dell'entità servizio e il segreto client (chiave API).

Passaggi

  1. Creare un nuovo file Python adf.py con il contenuto seguente:

    from datetime import datetime, timedelta
    
    from airflow.models import DAG, BaseOperator
    
    try:
        from airflow.operators.empty import EmptyOperator
    except ModuleNotFoundError:
        from airflow.operators.dummy import DummyOperator as EmptyOperator  # type: ignore
    from airflow.providers.microsoft.azure.operators.data_factory import AzureDataFactoryRunPipelineOperator
    from airflow.providers.microsoft.azure.sensors.data_factory import AzureDataFactoryPipelineRunStatusSensor
    from airflow.utils.edgemodifier import Label
    
    with DAG(
        dag_id="example_adf_run_pipeline",
        start_date=datetime(2022, 5, 14),
        schedule_interval="@daily",
        catchup=False,
        default_args={
            "retries": 1,
            "retry_delay": timedelta(minutes=3),
            "azure_data_factory_conn_id": "<connection_id>", #This is a connection created on Airflow UI
            "factory_name": "<FactoryName>",  # This can also be specified in the ADF connection.
            "resource_group_name": "<ResourceGroupName>",  # This can also be specified in the ADF connection.
        },
        default_view="graph",
    ) as dag:
        begin = EmptyOperator(task_id="begin")
        end = EmptyOperator(task_id="end")
    
        # [START howto_operator_adf_run_pipeline]
        run_pipeline1: BaseOperator = AzureDataFactoryRunPipelineOperator(
            task_id="run_pipeline1",
            pipeline_name="<PipelineName>",
            parameters={"myParam": "value"},
        )
        # [END howto_operator_adf_run_pipeline]
    
        # [START howto_operator_adf_run_pipeline_async]
        run_pipeline2: BaseOperator = AzureDataFactoryRunPipelineOperator(
            task_id="run_pipeline2",
            pipeline_name="<PipelineName>",
            wait_for_termination=False,
        )
    
        pipeline_run_sensor: BaseOperator = AzureDataFactoryPipelineRunStatusSensor(
            task_id="pipeline_run_sensor",
            run_id=run_pipeline2.output["run_id"],
        )
        # [END howto_operator_adf_run_pipeline_async]
    
        begin >> Label("No async wait") >> run_pipeline1
        begin >> Label("Do async wait with sensor") >> run_pipeline2
        [run_pipeline1, pipeline_run_sensor] >> end
    
        # Task dependency created via `XComArgs`:
        #   run_pipeline2 >> pipeline_run_sensor
    

    Sarà necessario creare la connessione usando l'interfaccia utente di Workflow Orchestration Manager Amministrazione -> Connessione ions -> '+' -> Scegliere "tipo di Connessione ion" come "Azure Data Factory", quindi compilare il client_id, client_secret, tenant_id, subscription_id, resource_group_name, data_factory_name e pipeline_name.

  2. Caricare il file adf.py nell'archivio BLOB all'interno di una cartella denominata DAGS.

  3. Importare la cartella DAGS nell'ambiente workflow Orchestration Manager. Se non ne hai uno, crearne uno nuovo

    Screenshot che mostra la scheda di gestione della data factory con la sezione Airflow selezionata.