Meglévő folyamat futtatása a Workflow Orchestration Managerrel

A következőkre vonatkozik: Azure Data Factory Azure Synapse Analytics

Tipp.

Próbálja ki a Data Factoryt a Microsoft Fabricben, amely egy teljes körű elemzési megoldás a nagyvállalatok számára. A Microsoft Fabric az adattovábbítástól az adatelemzésig, a valós idejű elemzésig, az üzleti intelligenciáig és a jelentéskészítésig mindent lefed. Ismerje meg, hogyan indíthat új próbaverziót ingyenesen!

Feljegyzés

A Workflow Orchestration Managert az Apache Airflow működteti.

Feljegyzés

Az Azure Data Factory munkafolyamat-vezénylés-kezelője az Nyílt forráskód Apache Airflow-alkalmazásra támaszkodik. Az Airflow dokumentációja és további oktatóanyagai az Apache Airflow dokumentációjában vagy a közösségi oldalakon találhatók.

A Data Factory-folyamatok több mint 100 adatforrás-összekötőt biztosítanak, amelyek méretezhető és megbízható adatintegrációt/ adatfolyamokat biztosítanak. Vannak olyan helyzetek, amikor egy meglévő data factory-folyamatot szeretne futtatni az Apache Airflow DAG-ból. Ez az oktatóanyag bemutatja, hogyan teheti ezt meg.

Előfeltételek

  • Azure-előfizetés. Ha nem rendelkezik Azure-előfizetéssel, mindössze néhány perc alatt létrehozhat egy ingyenes Azure-fiókot a virtuális gép létrehozásának megkezdése előtt.
  • Egy Azure Storage-fiók. Ha még nem rendelkezik tárfiókkal, tekintse meg az Azure Storage-fiók létrehozásának lépéseit ismertető cikket. Győződjön meg arról, hogy a tárfiók csak a kiválasztott hálózatokról engedélyezi a hozzáférést.
  • Azure Data Factory-folyamat. Kövesse az oktatóanyagok bármelyikét, és hozzon létre egy új data factory-folyamatot, ha még nem rendelkezik ilyennel, vagy hozzon létre egyet az Első lépések területen , és próbálja ki az első data factory-folyamatot.
  • Szolgáltatásnév beállítása. Létre kell hoznia egy új szolgáltatásnevet, vagy egy meglévőt kell használnia, és engedélyt kell adnia neki a folyamat futtatásához (például közreműködői szerepkört abban az adat-előállítóban, ahol a meglévő folyamatok léteznek), még akkor is, ha a Munkafolyamat-vezényléskezelő környezet és a folyamatok ugyanabban az adat-előállítóban találhatók. Le kell kérnie a szolgáltatásnév ügyfél-azonosítóját és titkos ügyfélkulcsát (API-kulcsot).

Lépések

  1. Hozzon létre egy új Python-fájlt adf.py az alábbi tartalommal:

    from datetime import datetime, timedelta
    
    from airflow.models import DAG, BaseOperator
    
    try:
        from airflow.operators.empty import EmptyOperator
    except ModuleNotFoundError:
        from airflow.operators.dummy import DummyOperator as EmptyOperator  # type: ignore
    from airflow.providers.microsoft.azure.operators.data_factory import AzureDataFactoryRunPipelineOperator
    from airflow.providers.microsoft.azure.sensors.data_factory import AzureDataFactoryPipelineRunStatusSensor
    from airflow.utils.edgemodifier import Label
    
    with DAG(
        dag_id="example_adf_run_pipeline",
        start_date=datetime(2022, 5, 14),
        schedule_interval="@daily",
        catchup=False,
        default_args={
            "retries": 1,
            "retry_delay": timedelta(minutes=3),
            "azure_data_factory_conn_id": "<connection_id>", #This is a connection created on Airflow UI
            "factory_name": "<FactoryName>",  # This can also be specified in the ADF connection.
            "resource_group_name": "<ResourceGroupName>",  # This can also be specified in the ADF connection.
        },
        default_view="graph",
    ) as dag:
        begin = EmptyOperator(task_id="begin")
        end = EmptyOperator(task_id="end")
    
        # [START howto_operator_adf_run_pipeline]
        run_pipeline1: BaseOperator = AzureDataFactoryRunPipelineOperator(
            task_id="run_pipeline1",
            pipeline_name="<PipelineName>",
            parameters={"myParam": "value"},
        )
        # [END howto_operator_adf_run_pipeline]
    
        # [START howto_operator_adf_run_pipeline_async]
        run_pipeline2: BaseOperator = AzureDataFactoryRunPipelineOperator(
            task_id="run_pipeline2",
            pipeline_name="<PipelineName>",
            wait_for_termination=False,
        )
    
        pipeline_run_sensor: BaseOperator = AzureDataFactoryPipelineRunStatusSensor(
            task_id="pipeline_run_sensor",
            run_id=run_pipeline2.output["run_id"],
        )
        # [END howto_operator_adf_run_pipeline_async]
    
        begin >> Label("No async wait") >> run_pipeline1
        begin >> Label("Do async wait with sensor") >> run_pipeline2
        [run_pipeline1, pipeline_run_sensor] >> end
    
        # Task dependency created via `XComArgs`:
        #   run_pipeline2 >> pipeline_run_sensor
    

    Létre kell hoznia a kapcsolatot a Workflow Orchestration Manager felhasználói felületén Rendszergazda –> Csatlakozás ions –> "+" –> Válassza a "Csatlakozás ion type" (Csatlakozás ion típus) lehetőséget azure Data Factoryként, majd töltse ki a client_id, client_secret, tenant_id, subscription_id, resource_group_name, data_factory_name és pipeline_name.

  2. Töltse fel a adf.py fájlt a blobtárolóba egy DAGS nevű mappában.

  3. Importálja a DAGS mappát a Workflow Orchestration Manager-környezetbe. Ha nincs , hozzon létre egy újat

    Képernyőkép az data factory felügyeleti lapról, amelyen az Airflow szakasz van kiválasztva.