Meglévő folyamat futtatása a Workflow Orchestration Managerrel
A következőkre vonatkozik: Azure Data Factory Azure Synapse Analytics
Tipp.
Próbálja ki a Data Factoryt a Microsoft Fabricben, amely egy teljes körű elemzési megoldás a nagyvállalatok számára. A Microsoft Fabric az adattovábbítástól az adatelemzésig, a valós idejű elemzésig, az üzleti intelligenciáig és a jelentéskészítésig mindent lefed. Ismerje meg, hogyan indíthat új próbaverziót ingyenesen!
Feljegyzés
A Workflow Orchestration Managert az Apache Airflow működteti.
Feljegyzés
Az Azure Data Factory munkafolyamat-vezénylés-kezelője az Nyílt forráskód Apache Airflow-alkalmazásra támaszkodik. Az Airflow dokumentációja és további oktatóanyagai az Apache Airflow dokumentációjában vagy a közösségi oldalakon találhatók.
A Data Factory-folyamatok több mint 100 adatforrás-összekötőt biztosítanak, amelyek méretezhető és megbízható adatintegrációt/ adatfolyamokat biztosítanak. Vannak olyan helyzetek, amikor egy meglévő data factory-folyamatot szeretne futtatni az Apache Airflow DAG-ból. Ez az oktatóanyag bemutatja, hogyan teheti ezt meg.
Előfeltételek
- Azure-előfizetés. Ha nem rendelkezik Azure-előfizetéssel, mindössze néhány perc alatt létrehozhat egy ingyenes Azure-fiókot a virtuális gép létrehozásának megkezdése előtt.
- Egy Azure Storage-fiók. Ha még nem rendelkezik tárfiókkal, tekintse meg az Azure Storage-fiók létrehozásának lépéseit ismertető cikket. Győződjön meg arról, hogy a tárfiók csak a kiválasztott hálózatokról engedélyezi a hozzáférést.
- Azure Data Factory-folyamat. Kövesse az oktatóanyagok bármelyikét, és hozzon létre egy új data factory-folyamatot, ha még nem rendelkezik ilyennel, vagy hozzon létre egyet az Első lépések területen , és próbálja ki az első data factory-folyamatot.
- Szolgáltatásnév beállítása. Létre kell hoznia egy új szolgáltatásnevet, vagy egy meglévőt kell használnia, és engedélyt kell adnia neki a folyamat futtatásához (például közreműködői szerepkört abban az adat-előállítóban, ahol a meglévő folyamatok léteznek), még akkor is, ha a Munkafolyamat-vezényléskezelő környezet és a folyamatok ugyanabban az adat-előállítóban találhatók. Le kell kérnie a szolgáltatásnév ügyfél-azonosítóját és titkos ügyfélkulcsát (API-kulcsot).
Lépések
Hozzon létre egy új Python-fájlt adf.py az alábbi tartalommal:
from datetime import datetime, timedelta from airflow.models import DAG, BaseOperator try: from airflow.operators.empty import EmptyOperator except ModuleNotFoundError: from airflow.operators.dummy import DummyOperator as EmptyOperator # type: ignore from airflow.providers.microsoft.azure.operators.data_factory import AzureDataFactoryRunPipelineOperator from airflow.providers.microsoft.azure.sensors.data_factory import AzureDataFactoryPipelineRunStatusSensor from airflow.utils.edgemodifier import Label with DAG( dag_id="example_adf_run_pipeline", start_date=datetime(2022, 5, 14), schedule_interval="@daily", catchup=False, default_args={ "retries": 1, "retry_delay": timedelta(minutes=3), "azure_data_factory_conn_id": "<connection_id>", #This is a connection created on Airflow UI "factory_name": "<FactoryName>", # This can also be specified in the ADF connection. "resource_group_name": "<ResourceGroupName>", # This can also be specified in the ADF connection. }, default_view="graph", ) as dag: begin = EmptyOperator(task_id="begin") end = EmptyOperator(task_id="end") # [START howto_operator_adf_run_pipeline] run_pipeline1: BaseOperator = AzureDataFactoryRunPipelineOperator( task_id="run_pipeline1", pipeline_name="<PipelineName>", parameters={"myParam": "value"}, ) # [END howto_operator_adf_run_pipeline] # [START howto_operator_adf_run_pipeline_async] run_pipeline2: BaseOperator = AzureDataFactoryRunPipelineOperator( task_id="run_pipeline2", pipeline_name="<PipelineName>", wait_for_termination=False, ) pipeline_run_sensor: BaseOperator = AzureDataFactoryPipelineRunStatusSensor( task_id="pipeline_run_sensor", run_id=run_pipeline2.output["run_id"], ) # [END howto_operator_adf_run_pipeline_async] begin >> Label("No async wait") >> run_pipeline1 begin >> Label("Do async wait with sensor") >> run_pipeline2 [run_pipeline1, pipeline_run_sensor] >> end # Task dependency created via `XComArgs`: # run_pipeline2 >> pipeline_run_sensor
Létre kell hoznia a kapcsolatot a Workflow Orchestration Manager felhasználói felületén Rendszergazda –> Csatlakozás ions –> "+" –> Válassza a "Csatlakozás ion type" (Csatlakozás ion típus) lehetőséget azure Data Factoryként, majd töltse ki a client_id, client_secret, tenant_id, subscription_id, resource_group_name, data_factory_name és pipeline_name.
Töltse fel a adf.py fájlt a blobtárolóba egy DAGS nevű mappában.
Importálja a DAGS mappát a Workflow Orchestration Manager-környezetbe. Ha nincs , hozzon létre egy újat