Exécuter un pipeline existant avec le gestionnaire du flux de travail

S’APPLIQUE À : Azure Data Factory Azure Synapse Analytics

Conseil

Essayez Data Factory dans Microsoft Fabric, une solution d’analyse tout-en-un pour les entreprises. Microsoft Fabric couvre tous les aspects, du déplacement des données à la science des données, en passant par l’analyse en temps réel, l’aide à la décision et la création de rapports. Découvrez comment démarrer un nouvel essai gratuitement !

Remarque

Le gestionnaire du flux de travail est basé sur Apache Airflow.

Remarque

Le gestionnaire du flux de travail pour Azure Data Factory s’appuie sur l’application Apache Airflow open source. Vous trouverez de la documentation et d’autres tutoriels sur Airflow dans les pages Documentation ou Communauté d’Apache Airflow.

Les pipelines Data Factory fournissent plus de 100 connecteurs de source de données qui fournissent une intégration de données/des flux de données évolutifs et fiables. Il existe des scénarios dans lesquels vous souhaitez exécuter un pipeline de fabrique de données existant à partir de votre DAG Apache Airflow. Ce tutoriel vous explique comment faire exactement cela.

Prérequis

  • Abonnement Azure. Si vous n’avez pas d’abonnement Azure, créez un compte Azure gratuit avant de commencer.
  • Compte Azure Storage. Si vous ne possédez pas de compte de stockage, consultez l’article Créer un compte de stockage Azure pour découvrir comment en créer un. Vérifiez que le compte de stockage autorise l’accès provenant des réseaux sélectionnés uniquement.
  • Pipeline Azure Data Factory. Vous pouvez suivre l’un des tutoriels et créer un pipeline de fabrique de données au cas où vous n’en avez pas déjà un, ou en créer un en une seule sélection dans Démarrer et tester votre premier pipeline de fabrique de données.
  • Configurer un principal de service. Vous devez créer un principal de service ou en utiliser un qui existe déjà et lui octroyer l’autorisation d’exécuter le pipeline (par exemple, le rôle Contributeur dans la fabrique de données contenant les pipelines existants), même si l’environnement du gestionnaire du flux de travail et les pipelines sont dans la même fabrique de données. Vous devez obtenir l’ID client et la clé secrète client (clé d’API) du principal de service.

Étapes

  1. Créez un fichier Python adf.py avec le contenu ci-dessous :

    from datetime import datetime, timedelta
    
    from airflow.models import DAG, BaseOperator
    
    try:
        from airflow.operators.empty import EmptyOperator
    except ModuleNotFoundError:
        from airflow.operators.dummy import DummyOperator as EmptyOperator  # type: ignore
    from airflow.providers.microsoft.azure.operators.data_factory import AzureDataFactoryRunPipelineOperator
    from airflow.providers.microsoft.azure.sensors.data_factory import AzureDataFactoryPipelineRunStatusSensor
    from airflow.utils.edgemodifier import Label
    
    with DAG(
        dag_id="example_adf_run_pipeline",
        start_date=datetime(2022, 5, 14),
        schedule_interval="@daily",
        catchup=False,
        default_args={
            "retries": 1,
            "retry_delay": timedelta(minutes=3),
            "azure_data_factory_conn_id": "<connection_id>", #This is a connection created on Airflow UI
            "factory_name": "<FactoryName>",  # This can also be specified in the ADF connection.
            "resource_group_name": "<ResourceGroupName>",  # This can also be specified in the ADF connection.
        },
        default_view="graph",
    ) as dag:
        begin = EmptyOperator(task_id="begin")
        end = EmptyOperator(task_id="end")
    
        # [START howto_operator_adf_run_pipeline]
        run_pipeline1: BaseOperator = AzureDataFactoryRunPipelineOperator(
            task_id="run_pipeline1",
            pipeline_name="<PipelineName>",
            parameters={"myParam": "value"},
        )
        # [END howto_operator_adf_run_pipeline]
    
        # [START howto_operator_adf_run_pipeline_async]
        run_pipeline2: BaseOperator = AzureDataFactoryRunPipelineOperator(
            task_id="run_pipeline2",
            pipeline_name="<PipelineName>",
            wait_for_termination=False,
        )
    
        pipeline_run_sensor: BaseOperator = AzureDataFactoryPipelineRunStatusSensor(
            task_id="pipeline_run_sensor",
            run_id=run_pipeline2.output["run_id"],
        )
        # [END howto_operator_adf_run_pipeline_async]
    
        begin >> Label("No async wait") >> run_pipeline1
        begin >> Label("Do async wait with sensor") >> run_pipeline2
        [run_pipeline1, pipeline_run_sensor] >> end
    
        # Task dependency created via `XComArgs`:
        #   run_pipeline2 >> pipeline_run_sensor
    

    Vous devez créer la connexion à l’aide de l’interface utilisateur du gestionnaire du flux de travail Administration -> Connexions -> '+' -> Choisissez 'Type de connexion' comme 'Azure Data Factory', puis renseignez vos client_id, client_secret, tenant_id, subscription_id, resource_group_name, data_factory_name et pipeline_name.

  2. Chargez le fichier adf.py dans votre stockage d’objets blob dans un dossier appelé DAGS.

  3. Importez le dossier DAGS dans votre environnement du gestionnaire du flux de travail. Si vous n’en avez pas, créez-en un

    Capture d’écran montrant l’onglet de gestion de la fabrique de données avec la section Airflow sélectionnée.