تشغيل مسار موجود باستخدام Workflow Orchestration Manager

ينطبق على:Azure Data Factory Azure Synapse Analytics

تلميح

جرب Data Factory في Microsoft Fabric، وهو حل تحليلي متكامل للمؤسسات. يغطي Microsoft Fabric كل شيء بدءا من حركة البيانات إلى علم البيانات والتحليلات في الوقت الحقيقي والمعلومات المهنية وإعداد التقارير. تعرف على كيفية بدء إصدار تجريبي جديد مجانا!

إشعار

يتم تشغيل إدارة تنسيق سير العمل بواسطة Apache Airflow.

إشعار

يعتمد مدير تنسيق سير العمل ل Azure Data Factory على تطبيق مصدر مفتوح Apache Airflow. يمكن العثور على وثائق والمزيد من البرامج التعليمية ل Airflow على وثائق Apache Airflow أو صفحات المجتمع.

توفر مسارات Data Factory أكثر من 100 موصل مصدر بيانات توفر تكامل البيانات/ تدفقات البيانات القابلة للتطوير والموثوقة. هناك سيناريوهات حيث ترغب في تشغيل مسار مصنع بيانات موجود من Apache Airflow DAG. يوضح لك هذا البرنامج التعليمي كيفية القيام بذلك فقط.

المتطلبات الأساسية

  • اشتراك Azure. إذا لم تكن مشتركًا في Azure، فيمكنك إنشاء حساب مجاني على Azure قبل البدء.
  • حساب Azure Storage. إذا لم يكن لديك حساب تخزين، فشاهد إنشاء حساب تخزين Azure للحصول على خطوات لإنشاء حساب. تأكد من أن حساب التخزين يسمح بالوصول فقط من الشبكات المحددة.
  • البنية الأساسية لبرنامج ربط العمليات التجارية ل Azure Data Factory. يمكنك متابعة أي من البرامج التعليمية وإنشاء مسار مصنع بيانات جديد في حالة عدم وجود واحد بالفعل، أو إنشاء واحد مع تحديد واحد في بدء الاستخدام وتجربة خط أنابيب مصنع البيانات الأول.
  • إعداد كيان الخدمة. ستحتاج إلى إنشاء كيان خدمة جديد أو استخدام كيان موجود ومنحه الإذن لتشغيل البنية الأساسية لبرنامج ربط العمليات التجارية (مثال - دور المساهم في مصنع البيانات حيث توجد المسارات الحالية)، حتى إذا كانت بيئة إدارة تنسيق سير العمل والتدفقات موجودة في نفس مصنع البيانات. ستحتاج إلى الحصول على معرف عميل كيان الخدمة وسر العميل (مفتاح API).

‏‏الخطوات

  1. إنشاء ملف Python جديد adf.py بالمحتويات أدناه:

    from datetime import datetime, timedelta
    
    from airflow.models import DAG, BaseOperator
    
    try:
        from airflow.operators.empty import EmptyOperator
    except ModuleNotFoundError:
        from airflow.operators.dummy import DummyOperator as EmptyOperator  # type: ignore
    from airflow.providers.microsoft.azure.operators.data_factory import AzureDataFactoryRunPipelineOperator
    from airflow.providers.microsoft.azure.sensors.data_factory import AzureDataFactoryPipelineRunStatusSensor
    from airflow.utils.edgemodifier import Label
    
    with DAG(
        dag_id="example_adf_run_pipeline",
        start_date=datetime(2022, 5, 14),
        schedule_interval="@daily",
        catchup=False,
        default_args={
            "retries": 1,
            "retry_delay": timedelta(minutes=3),
            "azure_data_factory_conn_id": "<connection_id>", #This is a connection created on Airflow UI
            "factory_name": "<FactoryName>",  # This can also be specified in the ADF connection.
            "resource_group_name": "<ResourceGroupName>",  # This can also be specified in the ADF connection.
        },
        default_view="graph",
    ) as dag:
        begin = EmptyOperator(task_id="begin")
        end = EmptyOperator(task_id="end")
    
        # [START howto_operator_adf_run_pipeline]
        run_pipeline1: BaseOperator = AzureDataFactoryRunPipelineOperator(
            task_id="run_pipeline1",
            pipeline_name="<PipelineName>",
            parameters={"myParam": "value"},
        )
        # [END howto_operator_adf_run_pipeline]
    
        # [START howto_operator_adf_run_pipeline_async]
        run_pipeline2: BaseOperator = AzureDataFactoryRunPipelineOperator(
            task_id="run_pipeline2",
            pipeline_name="<PipelineName>",
            wait_for_termination=False,
        )
    
        pipeline_run_sensor: BaseOperator = AzureDataFactoryPipelineRunStatusSensor(
            task_id="pipeline_run_sensor",
            run_id=run_pipeline2.output["run_id"],
        )
        # [END howto_operator_adf_run_pipeline_async]
    
        begin >> Label("No async wait") >> run_pipeline1
        begin >> Label("Do async wait with sensor") >> run_pipeline2
        [run_pipeline1, pipeline_run_sensor] >> end
    
        # Task dependency created via `XComArgs`:
        #   run_pipeline2 >> pipeline_run_sensor
    

    سيتعين عليك إنشاء الاتصال باستخدام مسؤول واجهة مستخدم إدارة تنسيق سير العمل -> الاتصال -> '+' -> اختر "نوع الاتصال" ك "Azure Data Factory"، ثم قم بتعبئة client_id client_secret tenant_id subscription_id resource_group_name data_factory_name pipeline_name.

  2. قم بتحميل ملف adf.py إلى تخزين الكائن الثنائي كبير الحجم داخل مجلد يسمى DAGS.

  3. استيراد مجلد DAGS إلى بيئة إدارة تنسيق سير العمل. إذا لم يكن لديك واحد، فبادر بإنشاء واحد جديد

    لقطة شاشة تعرض علامة تبويب إدارة مصنع البيانات مع تحديد قسم Airflow.