Ветвление и создание цепочки действий в конвейере Фабрики данных Azure с помощью портала Azure

ОБЛАСТЬ ПРИМЕНЕНИЯ: Фабрика данных Azure Azure Synapse Analytics

В этом руководстве создается конвейер фабрики данных, который демонстрирует некоторые функции потока управления. Этот конвейер просто копирует данные из контейнера в хранилище BLOB-объектов Azure в другой контейнер в той же учетной записи хранения. Если действие копирования завершается успешно, конвейер отправляет по электронной почте подробную информацию об успешной операции копирования (например, количество записанных данных). Если происходит сбой действия копирования, конвейер отправляет по электронной почте данные об ошибке копирования (например, текст сообщения об ошибке). В этом руководстве вы научитесь передавать параметры.

Обзор основных элементов сценария. Diagram shows Azure Blob Storage, which is the target of a copy, which, on success, sends an email with details or, on failure, sends an email with error details.

В этом руководстве вы выполните следующие шаги:

  • Создали фабрику данных.
  • Создание связанной службы хранилища Azure.
  • Создание набора данных больших двоичных объектов Azure
  • Создание конвейера, содержащего действия копирования и веб-действие.
  • Отправка выходных данных действий для последующих действий.
  • Использование передачи параметров и системных переменных.
  • Запуск конвейера.
  • Мониторинг конвейера и выполнения действий.

В этом руководстве используется портал Azure. Вы можете использовать другие механизмы для взаимодействия с фабрикой данных Azure (см. раздел "Быстрое начало работы" в оглавлении).

Предварительные требования

  • Подписка Azure. Если у вас еще нет подписки Azure, создайте бесплатную учетную запись Azure, прежде чем начинать работу.
  • Учетная запись хранения Azure. В этом руководстве в качестве источника будет использоваться хранилище BLOB-объектов. Если у вас нет учетной записи хранения Azure, ознакомьтесь с разделом Создание учетной записи хранения.
  • База данных SQL Azure. Используйте базу данных как хранилище данных-приемник. Если у вас нет базы данных в службе "База данных SQL Azure", вы можете создать ее, выполнив инструкции из статьи Создание отдельной базы данных в Базе данных SQL Azure.

Создание таблицы больших двоичных объектов

  1. Запустите Блокнот. Скопируйте следующий текст и сохраните его в файл input.txt на диске.

    John,Doe
    Jane,Doe
    
  2. Используйте специальные инструменты, например Обозреватель службы хранилища Azure, чтобы выполнить следующие действия.

    1. Создайте контейнер adfv2branch.
    2. Создайте папку input в контейнере adfv2branch.
    3. Передайте файл input.txt в этот контейнер.

Создание конечных точек рабочего процесса электронной почты

Чтобы инициировать отправку сообщения электронной почты из конвейера, определите рабочий процесс с помощью Logic Apps. Сведения о создании рабочего процесса приложения логики см. в статье Создание первого рабочего процесса приложения логики для автоматизации процессов между облачными приложениями и облачными службами.

Рабочий процесс успешной отправки сообщения электронной почты

Создайте рабочий процесс приложения логики с именем CopySuccessEmail. Определите триггер рабочего процесса When an HTTP request is received и добавьте действие Office 365 Outlook – Send an email.

Success email workflow

Для триггера запроса укажите в Request Body JSON Schema следующий фрагмент кода JSON:

{
    "properties": {
        "dataFactoryName": {
            "type": "string"
        },
        "message": {
            "type": "string"
        },
        "pipelineName": {
            "type": "string"
        },
        "receiver": {
            "type": "string"
        }
    },
    "type": "object"
}

В конструкторе приложения логики запрос должен выглядеть так, как на следующем изображении:

Logic App designer - request

Для действия Отправка электронного сообщения настройте способ форматирования электронного сообщения, используя свойства, переданные в схеме запроса текста JSON. Например:

Logic App designer - send email action

Сохраните рабочий процесс. Запишите URL-адрес запроса HTTP Post рабочего процесса успешной отправки электронной почты:

//Success Request Url
https://prodxxx.eastus.logic.azure.com:443/workflows/000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=000000

Рабочий процесс сбоя отправки сообщения электронной почты

Повторите этот же процесс, чтобы создать еще один рабочий процесс Logic Apps с именем CopyFailEmail. В триггере запроса действие Request Body JSON schema такое же. Измените формат сообщения электронной почты, например поле Subject, чтобы создать другое сообщение на случай сбоя. Например:

Logic App designer - fail email workflow

Сохраните рабочий процесс. Запишите URL-адрес запроса HTTP Post для рабочего процесса успешной отправки электронной почты:

//Fail Request Url
https://prodxxx.eastus.logic.azure.com:443/workflows/000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=000000

Теперь у вас будут два URL-адреса рабочих процессов.

//Success Request Url
https://prodxxx.eastus.logic.azure.com:443/workflows/000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=000000

//Fail Request Url
https://prodxxx.eastus.logic.azure.com:443/workflows/000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=000000

Создание фабрики данных

  1. Запустите веб-браузер Microsoft Edge или Google Chrome. Сейчас только эти браузеры поддерживают пользовательский интерфейс фабрики данных.

  2. В меню слева выберите Создать ресурс>Данные и аналитика>Фабрика данных.

    Data Factory selection in the "New" pane

  3. На странице Новая фабрика данных введите ADFTutorialDataFactory в поле Имя.

    New data factory page

    Имя фабрики данных Azure должно быть глобально уникальным. При возникновении указанной ниже ошибки измените имя фабрики данных (например, на ваше_имя_ADFTutorialDataFactory) и попробуйте создать фабрику данных снова. Ознакомьтесь со статьей Фабрика данных Azure — правила именования, чтобы узнать правила именования для артефактов службы "Фабрика данных".

    Имя фабрики данных ADFTutorialDataFactory недоступно.

  4. Выберите подписку Azure, в рамках которой вы хотите создать фабрику данных.

  5. Для группы ресурсов выполните одно из следующих действий.

  6. Укажите V2 при выборе версии.

  7. Укажите расположение фабрики данных. В раскрывающемся списке отображаются только поддерживаемые расположения. Хранилища данных (служба хранилища Azure, база данных SQL Azure и т. д.) и вычисления (HDInsight и т. д.), используемые фабрикой данных, могут располагаться в других регионах.

  8. Кроме того, установите флажок Закрепить на панели мониторинга.

  9. Нажмите кнопку Создать.

  10. На панели мониторинга вы увидите приведенный ниже элемент с состоянием Deploying data factory (Развертывание фабрики данных).

    deploying data factory tile

  11. Когда завершится создание, откроется страница Фабрика данных, как показано на рисунке ниже.

    Data factory home page

  12. Щелкните плитку Создание и мониторинг, чтобы открыть на отдельной вкладке пользовательский интерфейс Фабрики данных Azure.

Создание конвейера

На этом этапе вы создадите конвейер с одним действием копирования и двумя веб-действиями. Для этого конвейера вам потребуются следующие компоненты:

  • параметры конвейера, определяющие доступ на основе наборов данных;
  • веб-действие для вызова рабочих процессов приложений логики, которые отправляют электронные письма при успешном завершении или при сбое;
  • связи между разными действиями (после успешного выполнения и сбоя);
  • использование выходных данных действия в качестве входных данных для последующего действия.
  1. На домашней странице пользовательского интерфейса Фабрики данных выберите элемент Orchestrate (Оркестрация).

    Screenshot that shows the ADF home page.

  2. В окне свойств для конвейера перейдите на вкладку Параметры и нажмите кнопку Создать, чтобы добавить три строковых параметра sourceBlobContainer, sinkBlobContainer и receiver.

    • Параметр sourceBlobContainer в конвейере используется в наборе данных для исходного BLOB-объекта.
    • Параметр sinkBlobContainer в конвейере используется в наборе данных для целевого BLOB-объекта.
    • Параметр receiver используется в конвейере двумя веб-действиями, которые отправляют сообщения электронной почты об успешном выполнении или сбое на адрес электронной почты, указанный в этом параметре.

    New pipeline menu

  3. На панели элементов Действия разверните Потоки данных и перетащите действие Копирование в область конструктора конвейера.

    Drag-drop copy activity

  4. В окне Свойства для второго действия Копирование в нижней части страницы перейдите на вкладку Источник и щелкните + Создать. На этом шаге вы создаете исходный набор данных для действия копирования.

    Screenshot that shows how to create a source dataset for teh copy activity.

  5. В окне Новый набор данных выберите Хранилище BLOB-объектов Azure и щелкните Готово.

    Select Azure Blob Storage

  6. Вы увидите новую вкладку с именем AzureBlob1. Измените имя этого набора данных на SourceBlobDataset.

    Dataset general settings

  7. Перейдите на вкладку Подключение в окне Свойства и щелкните ссылку создания связанной службы. Это действие создает связанную службу, которая свяжет учетную запись хранения Azure с фабрикой данных.

    Dataset connection - new linked service

  8. В окне New Linked Service (Новая связанная служба) выполните следующие действия:

    1. Введите AzureStorageLinkedService в поле имени.
    2. Выберите учетную запись хранения Azure в списке Имя учетной записи хранения.
    3. Выберите команду Сохранить.

    New Azure Storage linked service

  9. Введите @pipeline().parameters.sourceBlobContainer в качестве имени папки и emp.txt в качестве имени файла. Параметр sourceBlobContainer в конвейере позволяет задать для набора данных путь к папке с данными.

    Source dataset settings

  10. Перейдите на вкладку Конвейер или щелкните конвейер в представлении в виде дерева. Убедитесь, что в списке Source Dataset (Исходный набор данных) выбрано значение SourceBlobDataset.

Source dataset

  1. В окне свойств перейдите на вкладку Приемник и нажмите кнопку + Создать в поле Sink Dataset (Целевой набор данных). В этом шаге вы создадите целевой набор данных, точно так же, как исходный набор данных ранее.

    New sink dataset button

  2. В окне Новый набор данных выберите Хранилище BLOB-объектов Azure и щелкните Готово.

  3. На странице настроек Общие для набора данных введите SinkBlobDataset в поле Имя.

  4. Перейдите на вкладку Подключения и выполните следующие действия:

    1. Выберите AzureStorageLinkedService в списке Связанная служба.

    2. Введите имя папки @pipeline().parameters.sinkBlobContainer.

    3. Введите имя файла @CONCAT(pipeline().RunId, '.txt'). Это выражение использует идентификатор текущего запуска конвейера для создания имени файла. Списки поддерживаемых системных переменных и выражений вы найдете в статьях Системные переменные, поддерживаемые в фабрике данных Azure и Выражения и функции в фабрике данных Azure.

      Sink dataset settings

  5. Перейдите на вкладку Конвейер вверху. Разверните Общие в панели элементов Действия и перетащите действие Веб в область конструктора конвейера. Присвойте этому действию имя SendSuccessEmailActivity. Веб-действие разрешает выполнять вызов любой конечной точки REST. Дополнительные сведения о действиях см. в статье Веб-действие в фабрике данных Azure. Этот конвейер использует веб-действие для вызова рабочего процесса электронной почты Logic Apps.

    Drag-drop first Web activity

  6. Перейдите на вкладку Настройки из вкладки Общие и выполните следующие действия:

    1. В поле URL-адрес укажите URL-адрес для рабочего процесса приложений логики, который отправляет сообщение электронной почты об успешном выполнении.

    2. Выберите POST в поле Метод.

    3. Щелкните ссылку + Добавить заголовок в разделе Заголовки.

    4. Добавьте заголовок с именем Content-Type и значением application/json.

    5. Внесите следующий код JSON в поле Текст.

      {
          "message": "@{activity('Copy1').output.dataWritten}",
          "dataFactoryName": "@{pipeline().DataFactory}",
          "pipelineName": "@{pipeline().Pipeline}",
          "receiver": "@pipeline().parameters.receiver"
      }
      

      Текст сообщения содержит следующие свойства.

      • Сообщение — передает значение @{activity('Copy1').output.dataWritten. Обращается к свойству предыдущего действия копирования и передает значение dataWritten. В случае сбоя передает выходные данные ошибки вместо @{activity('CopyBlobtoBlob').error.message.

      • Имя фабрики данных — передает значение @{pipeline().DataFactory}. Это системная переменная, которая позволяет получить доступ к соответствующему имени фабрики данных. Список поддерживаемых системных переменных см. в статье Системные переменные, поддерживаемые фабрикой данных Azure.

      • Имя конвейера — передает значение @{pipeline().Pipeline}. Это системная переменная, которая позволяет обращаться к соответствующему имени конвейера.

      • Получатель — передает значение "@pipeline().parameters.receiver"). Получает доступ к параметрам конвейера.

        Settings for the first Web activity

  7. Подключите действие Копирование к веб-действию, перетащив зеленую кнопку рядом с действием копирования в блок веб-действия.

    Connect Copy activity with the first Web activity

  8. Перетащите второе Веб-действие из панели элементов "Действия" в область конструктора конвейера и задайте для него ИмяSendFailureEmailActivity.

    Name of the second Web activity

  9. Перейдите на вкладку Настройки и выполните здесь следующие действия:

    1. В поле URL-адрес укажите URL-адрес для рабочего процесса приложений логики, который отправляет сообщение электронной почты об ошибке при выполнении.

    2. Выберите POST в поле Метод.

    3. Щелкните ссылку + Добавить заголовок в разделе Заголовки.

    4. Добавьте заголовок с именем Content-Type и значением application/json.

    5. Внесите следующий код JSON в поле Текст.

      {
          "message": "@{activity('Copy1').error.message}",
          "dataFactoryName": "@{pipeline().DataFactory}",
          "pipelineName": "@{pipeline().Pipeline}",
          "receiver": "@pipeline().parameters.receiver"
      }
      

      Settings for the second Web activity

  10. Выберите действие Копирование в конструкторе конвейера и нажмите кнопку +->, затем выберите Ошибка.

    Screenshot that shows how to select Error on the Copy activity in the pipeline designer.

  11. Перетащите красную кнопку, расположенную рядом с действием копирования, на второе веб-действие SendFailureEmailActivity. Переставьте действия в области конструктора, чтобы конвейер выглядел примерно так:

    Full pipeline with all activities

  12. Чтобы проверить работу конвейера, нажмите кнопку Проверка на панели инструментов. Закройте окно Pipeline Validation Output (Выходные данные проверки конвейера), нажав кнопку >> .

    Validate pipeline

  13. Чтобы опубликовать сущности (наборы данных, конвейеры и т. д.) в службу фабрики данных, щелкните Опубликовать все. Дождитесь сообщения Successfully published (Публикация выполнена).

    Publish

Запуск конвейера, который будет выполнен успешно

  1. Чтобы запустить конвейер, щелкните Триггер на панели инструментов, а затем Trigger Now (Запустить сейчас).

    Trigger a pipeline run

  2. В окне Запуск конвейера выполните следующие действия.

    1. Введите adftutorial/adfv2branch/input в качестве значения для параметра sourceBlobContainer.

    2. Введите adftutorial/adfv2branch/output в качестве значения для параметра sinkBlobContainer.

    3. Введите адрес электронной почты для получателя.

    4. Нажмите кнопку Готово.

      Pipeline run parameters

Мониторинг успешного выполнения конвейера

  1. Чтобы отследить выполнение конвейера, перейдите на вкладку Мониторинг слева. Вы увидите здесь запуск конвейера, который вы ранее выполнили вручную. Нажмите кнопку Обновить, чтобы обновить этот список.

    Successful pipeline run

  2. Чтобы просмотреть запуски действий, связанные с этим запуском конвейера, щелкните первую ссылку в столбце Действия. Вы можете переключиться к предыдущему представлению, щелкнув раздел Конвейеры вверху страницы. Нажмите кнопку Обновить, чтобы обновить этот список.

    Screenshot that shows how to view the list of activity runs.

Запуск конвейера, который завершится сбоем

  1. Перейдите на вкладку Правка слева.

  2. Чтобы запустить конвейер, щелкните Триггер на панели инструментов, а затем Trigger Now (Запустить сейчас).

  3. В окне Запуск конвейера выполните следующие действия.

    1. Введите adftutorial/dummy/input в качестве значения для параметра sourceBlobContainer. Убедитесь, что папка dummy не существует в контейнере adftutorial.
    2. Введите adftutorial/dummy/output в качестве значения для параметра sinkBlobContainer.
    3. Введите адрес электронной почты для получателя.
    4. Нажмите кнопку Готово.

Отслеживание неудачного запуска конвейера.

  1. Чтобы отследить выполнение конвейера, перейдите на вкладку Мониторинг слева. Вы увидите здесь запуск конвейера, который вы ранее выполнили вручную. Нажмите кнопку Обновить, чтобы обновить этот список.

    Failure pipeline run

  2. Щелкните ссылку Ошибка рядом с запуском конвейера, чтобы просмотреть сведения об этой ошибке.

    Pipeline error

  3. Чтобы просмотреть запуски действий, связанные с этим запуском конвейера, щелкните первую ссылку в столбце Действия. Нажмите кнопку Обновить, чтобы обновить этот список. Обратите внимание, что действие копирования в конвейере завершилось сбоем. Веб-действие успешно выполнено, то есть отправило указанному получателю сообщение электронной почты о сбое.

    Activity runs

  4. Щелкните ссылку Ошибка в столбце Действия, чтобы просмотреть сведения об этой ошибке.

    Activity run error

Дальнейшие действия

В этом руководстве вы выполнили следующие шаги:

  • Создали фабрику данных.
  • Создание связанной службы хранилища Azure.
  • Создание набора данных больших двоичных объектов Azure
  • Создание конвейера, содержащего действия копирования и веб-действие.
  • Отправка выходных данных действий для последующих действий.
  • Использование передачи параметров и системных переменных.
  • Запуск конвейера.
  • Мониторинг конвейера и выполнения действий.

Теперь вы можете перейти к разделу ключевых концепций, чтобы получить дополнительные сведения о фабрике данных Azure.