Надежное преобразование данных с помощью потоков данных для сопоставления

ОБЛАСТЬ ПРИМЕНЕНИЯ: Фабрика данных Azure Azure Synapse Analytics

Если вы еще не работали с фабрикой данных Azure, ознакомьтесь со статьей Введение в фабрику данных Azure.

В этом руководстве вы задействуете пользовательский интерфейс фабрики данных для создания конвейера, который копирует и преобразует данные из источника Data Lake Storage 2-го поколения в приемник Data Lake Storage 2-го поколения (разрешив обоим доступ только к выбранным сетям) с помощью потока данных для сопоставления в виртуальной сети, управляемой Фабрикой данных. Шаблон конфигурации, приведенный в этом кратком руководстве, можно расширить при преобразовании данных с использованием потоков данных для сопоставления.

Вот какие шаги выполняются в этом руководстве:

  • Создали фабрику данных.
  • создание конвейера с использованием действия "Поток данных";
  • создание потока данных для сопоставления с четырьмя преобразованиями;
  • тестовый запуск конвейера;
  • отслеживание действий потока данных.

Предварительные требования

  • Подписка Azure. Если у вас еще нет подписки Azure, создайте бесплатную учетную запись Azure, прежде чем начинать работу.
  • Учетная запись хранения Azure. Для хранения данных источника и приемника используется Data Lake Storage. Если у вас нет учетной записи хранения, создайте ее, следуя действиям в этом разделе. Убедитесь, что получить доступ к учетной записи хранения можно только из выбранных сетей.

Файл moviesDB.csv, который будет преобразован в этом руководстве, который можно найти на сайте содержимого GitHub. Чтобы извлечь файл из GitHub, скопируйте его содержимое в любой текстовый редактор, а затем сохраните его на локальном компьютере в виде CSV-файла. Сведения о передаче файла в учетную запись хранения см. в статье Отправка BLOB-объектов с помощью портала Azure. В примерах будет использоваться контейнер под названием sample-data.

Создание фабрики данных

На этом этапе вы создадите фабрику данных и откроете пользовательский интерфейс службы "Фабрика данных" для создания конвейера в фабрике данных.

  1. Откройте Microsoft Edge или Google Chrome. Сейчас только эти браузеры поддерживают пользовательский интерфейс фабрики данных.

  2. В меню слева выберите Создать ресурс > Аналитика > Фабрика данных.

  3. На странице Новая фабрика данных в поле Имя введите ADFTutorialDataFactory.

    Имя фабрики данных должно быть глобально уникальным. Если вы увидите следующую ошибку касательно значения имени, введите другое имя фабрики данных (например, yournameADFTutorialDataFactory). Дополнительные сведения о правилах именования артефактов фабрики данных см. в статье Фабрика данных Azure — правила именования.

  4. Выберите подписку Azure, в рамках которой вы хотите создать фабрику данных.

  5. Для группы ресурсов выполните одно из следующих действий:

    • Выберите Использовать существующую и укажите существующую группу ресурсов в раскрывающемся списке.
    • Выберите Создать новую и укажите имя группы ресурсов.

    Сведения о группах ресурсов см. в статье Общие сведения об Azure Resource Manager.

  6. В качестве версии выберите V2.

  7. В поле Расположение выберите расположение фабрики данных. В раскрывающемся списке отображаются только поддерживаемые расположения. Хранилища данных (например, Служба хранилища Azure и База данных Azure SQL) и вычислительные ресурсы (например, Azure HDInsight), используемые фабрикой данных, могут располагаться в других регионах.

  8. Нажмите кнопку создания.

  9. После завершения создания вы увидите уведомление в центре уведомлений. Выберите Перейти к ресурсу, чтобы открыть страницу фабрики данных.

  10. Выберите Создание и мониторинг, чтобы запустить на отдельной вкладке пользовательский интерфейс фабрики данных.

Создание среды выполнения интеграции Azure IR в управляемой виртуальной сети Фабрики данных

На этом шаге вы создадите среду выполнения интеграции Azure IR и включите управляемую виртуальную сеть Фабрики данных.

  1. На портале Фабрики данных перейдите в раздел Управление и выберите Создать, чтобы создать среду Azure IR.

    Снимок экрана, на котором показано создание Azure IR.

  2. На странице Integration runtime setup (Настройка среды выполнения интеграции) выберите, какую среду выполнения интеграции следует создать на основе требуемых возможностей. По условиям этого руководства выберите Azure и нажмите кнопку Продолжить.

  3. Выберите Azure и щелкните Продолжить, чтобы создать среду выполнения интеграции Azure.

    Снимок экрана, на котором показана новая среда Azure IR.

  4. В разделе Virtual network configuration (Preview) (Конфигурация виртуальной сети (предварительная версия)) выберите Включить.

    Снимок экрана, на котором показано включение новой среды Azure IR.

  5. Щелкните Создать.

Создание конвейера с помощью действия потока данных

На этом этапе вы создадите конвейер, содержащий действие потока данных.

  1. На странице Начало работы выберите Create pipeline (Создать конвейер).

    Снимок экрана: создание конвейера.

  2. На панели свойств для конвейера введите TransformMovies в поле имени конвейера.

  3. На верхней панели фабрики переместите ползунок Отладка Потока данных во включенное положение. Режим отладки позволяет в интерактивном режиме тестировать логику преобразования в динамическом кластере Spark. Для прогрева кластеров потоков данных требуется от пяти до семи минут. Если планируется разработка потока данных, сначала включите отладку Потока данных. Дополнительные сведения см. в статье Режим отладки.

    Снимок экрана, на котором показано, где находится ползунок отладки Потока данных.

  4. В области Действия разверните меню Перемещение и преобразование. Перетащите действие Поток данных из области на холст конвейера.

  5. Во всплывающем окне Добавление потока данных выберите Создать поток данных, а затем — Поток данных для сопоставления. Завершив настройку, нажмите кнопку ОК.

    Снимок экрана, на котором показан поток данных для сопоставления.

  6. Присвойте потоку данных имя TransformMovies на странице свойств.

Встраивание логики преобразования в холст потока данных

После создания потока данных он будет автоматически отправлен на холст потока данных. На этом шаге будет создан поток данных, который извлекает файл MoviesDB.csv из хранилища Data Lake Storage и вычисляет среднюю оценку комедий с 1910 по 2000 гг. Затем этот файл будет записан обратно в хранилище Data Lake Storage.

Добавление преобразования источника

На этом шаге будет настроен источник Data Lake Storage 2-го поколения.

  1. На холсте потока данных добавьте источник, выбрав поле Добавить источник.

  2. Присвойте источнику имя MoviesDB. Выберите Создать, чтобы создать исходный набор данных.

  3. Выберите Azure Data Lake Storage 2-го поколения и нажмите кнопку Продолжить.

  4. Выберите DelimitedText и нажмите кнопку Продолжить.

  5. Присвойте набору данных имя MoviesDB. В раскрывающемся списке связанных служб выберите Создать.

  6. На экране создания связанной службы присвойте связанной службе Data Lake Storage Gen2 имя ADLSGen2 и укажите метод проверки подлинности. Затем введите учетные данные подключения. В этом руководстве для подключения к нашей учетной записи хранения используется ключ учетной записи.

  7. Обязательно включите режим Интерактивная разработка. Его включение может занять около минуты.

    Снимок экрана: режим "Интерактивная разработка".

  8. Выберите Проверить подключение. Это должно привести к сбою, так как учетная запись хранения не разрешает к себе доступ к ней без создания и утверждения частной конечной точки. В сообщении об ошибке должна присутствовать ссылка, по которой вы можете перейти к интерфейсу создания управляемой частной конечной точки. Кроме того, можно сразу открыть вкладку Управление и выполнить инструкции из этого раздела, чтобы создать управляемую частную конечную точку.

  9. Не закрывая это диалоговое окно, перейдите к учетной записи хранения.

  10. Следуйте инструкциям в этом разделе, чтобы утвердить частную ссылку.

  11. Вернитесь к диалоговому окну. Выберите Проверить соединение, а затем нажмите кнопку Создать, чтобы развернуть связанную службу.

  12. На экране создания набора данных в поле Путь к файлу введите расположение файла. В этом руководстве файл moviesDB.csv находится в контейнере sample-data. Так как файл содержит заголовки, установите флажок Использовать первую строку как заголовок. Выберите Из подключения/хранилища, чтобы импортировать схему заголовка непосредственно из файла в хранилище. Завершив настройку, нажмите кнопку ОК.

    Снимок экрана, на котором показан исходный путь.

  13. Если кластер отладки запущен, откройте вкладку Предварительный просмотр данных преобразования источника и нажмите кнопку Обновить, чтобы получить моментальный снимок данных. Предварительный просмотр данных дает возможность убедиться, что преобразование настроено правильно.

    Снимок экрана, на котором показана вкладка "Предварительный просмотр".

Создание управляемой частной конечной точки

Если вы не переходили по гиперссылке при проверке подключения, перейдите по указанному пути. Здесь вам нужно создать управляемую частную конечную точку, которая будет подключаться к созданной связанной службе.

  1. Перейдите на вкладку Управление.

    Примечание

    Вкладка Управление может быть доступна не для всех экземпляров фабрики данных. Если она не отображается, вы можете получить доступ к частным конечным точкам, выбрав Создание > Подключения > Частная конечная точка.

  2. Перейдите в раздел Managed private endpoints (Управляемые частные конечные точки).

  3. В разделе управляемых частных конечных точек выберите + Создать.

    Снимок экрана: кнопка "Создать" в разделе Managed private endpoints (Управляемые частные конечные точки).

  4. Выберите плитку Azure Data Lake Storage 2-го поколения в списке и нажмите кнопку Продолжить.

  5. Введите имя созданной учетной записи хранения.

  6. Нажмите кнопку создания.

  7. Через несколько секунд для созданной частной ссылки отобразится состояние ожидания утверждения.

  8. Выберите созданную частную конечную точку. Вы увидите гиперссылку, которая ведет к интерфейсу утверждения частной конечной точки на уровне учетной записи хранения.

    Снимок экрана, на котором показана область управления частной конечной точки.

  1. В разделе Параметры для учетной записи хранения выберите Подключения частных конечных точек.

  2. Установите флажок для созданной частной конечной точки и выберите Утвердить.

    Снимок экрана, на котором показана кнопка "Утвердить" для частной конечной точки.

  3. Добавьте описание и выберите Да.

  4. Вернитесь к разделу Managed private endpoints (Управляемые частные конечные точки) на вкладке Управление для Фабрики данных.

  5. Примерно через минуту отобразится сообщение об утверждении частной конечной точки.

Добавление преобразования фильтрации

  1. Выберите значок "плюс" рядом с узлом источника на холсте потока данных, чтобы добавить новое преобразование. Первое добавляемое преобразование — Фильтр.

    Снимок экрана, на котором показано добавление фильтра.

  2. Назовите преобразование фильтра FilterYears. Выберите поле "Выражение" рядом с полем Фильтр, чтобы открыть построитель выражений. Здесь нужно указать условие фильтрации.

    Снимок экрана, на котором показано поле FilterYears.

  3. Построитель выражений потока данных позволяет интерактивно создавать выражения для использования в различных преобразованиях. Выражения могут включать встроенные функции, столбцы из входной схемы и задаваемые пользователем параметры. Дополнительные сведения о построении выражений см. в статье Построитель выражений Потока данных.

    • В этом руководстве будут отфильтрованы фильмы в жанре комедии, которые вышли между 1910 и 2000 годами. Поскольку в настоящее время год является строкой, его необходимо преобразовать в целое число с помощью функции toInteger(). Используйте операторы "больше или равно" (>=) и "меньше или равно" (<=) для сравнения значений года с литералами 1910 и 2000. Объедините эти выражения с помощью оператора AND (&&). Выражение будет выглядеть следующим образом:

      toInteger(year) >= 1910 && toInteger(year) <= 2000

    • Чтобы узнать, какие фильмы являются комедиями, можно использовать функцию rlike(), позволяющую найти слово "комедия" в столбце жанров. Объедините выражение rlike с выражением сравнения года и получите следующее выражение:

      toInteger(year) >= 1910 && toInteger(year) <= 2000 && rlike(genres, 'Comedy')

    • Если кластер отладки активен, можно проверить логику, нажав кнопку Обновить, чтобы увидеть результат выражения в сравнении с входными данными. Реализовать эту логику с помощью языка выражений потока данных можно разными способами.

      Снимок экрана, на котором показано выражение фильтра.

    • После завершения работы с выражением нажмите кнопку Сохранить и завершить.

  4. Загрузите результаты предварительного просмотра данных, чтобы убедиться, что фильтр работает правильно.

    Снимок экрана, на котором показаны отфильтрованные данные в режиме предварительного просмотра.

Добавление преобразования статистической обработки

  1. Следующее преобразование, которое необходимо добавить, — статистическая обработка в модификаторе схемы.

    Снимок экрана, на котором показано добавление статистической обработки.

  2. Назовите преобразование статистической обработки AggregateComedyRatings. На вкладке Группировка выберите год из раскрывающегося списка, чтобы сгруппировать агрегаты по году, в котором вышел фильм.

    Снимок экрана, на котором показана группа агрегации.

  3. Перейдите на вкладку Статистическая обработка. В левом текстовом поле присвойте столбцу имя AverageComedyRating. Выберите правое поле выражения, чтобы ввести статистическое выражение с помощью построителя выражений.

    Снимок экрана, на котором показано имя столбца агрегации.

  4. Чтобы получить среднее значение столбца Оценка, используйте агрегатную функцию avg(). Так как оценка является строковым значением, а avg() принимает числовые входные данные, необходимо преобразовать значение в число с помощью функции toInteger(). Это выражение выглядит следующим образом:

    avg(toInteger(Rating))

  5. По завершении нажмите кнопку Сохранить и завершить.

    Снимок экрана, на котором показано сохранение статистической обработки.

  6. Откройте вкладку Предварительный просмотр данных, чтобы просмотреть выходные данные преобразования. Обратите внимание, что здесь есть только два столбца: year и AverageComedyRating.

Добавление преобразования приемника

  1. Затем необходимо добавить преобразование Приемник в разделе Назначение.

    Снимок экрана, на котором показано добавление приемника.

  2. Назовите приемник Sink. Нажмите Создать, чтобы создать набор данных приемника.

    Снимок экрана, на котором показано создание приемника.

  3. На странице Новый набор данных выберите Azure Data Lake Storage Gen2, а затем нажмите кнопку Продолжить.

  4. На странице Выбор формата выберите DelimitedText и нажмите кнопку Продолжить.

  5. Назовите приемный набор данных MoviesSink. Для связанной службы выберите ту же связанную службу ADLSGen2, которая была создана для преобразования источника. Укажите выходную папку для записи данных. В этом кратком руководстве мы записываем данные в папку output в контейнере sample-data. Папка не обязательно должна существовать заранее и может быть создана динамически. Установите флажок Использовать первую строку как заголовок и выберите значение Нет для параметра Импорт схемы. Щелкните ОК.

    Снимок экрана, на котором показан путь к приемнику.

Теперь создание потока данных завершено. Все готово для его запуска в конвейере.

Запуск и мониторинг потока данных

Перед публикацией можно выполнить отладку конвейера. На этом шаге будет активирован запуск отладки конвейера потока данных. При предварительном просмотре данных они не записываются, а в режиме отладки данные записываются в место назначения приемника.

  1. Перейдите на холст конвейера. Нажмите кнопку Отладка, чтобы запустить отладку.

  2. При отладке конвейера для действий Потока данных используется активный кластер отладки, но инициализация все равно занимает не менее минуты. Ход выполнения можно отслеживать с помощью вкладки Вывод. После успешного выполнения щелкните значок очков для просмотра сведений о выполнении.

  3. На странице сведений можно увидеть количество строк и время, потраченное на каждый шаг преобразования.

    Снимок экрана, на котором показан запуск мониторинга.

  4. Щелкните преобразование, чтобы получить подробные сведения о столбцах и секционировании данных.

Если все действия в этом кратком руководстве выполнены правильно, то в папку приемника должны быть записаны 83 строки и 2 столбца. Данные можно проверить, проверив хранилище BLOB-объектов.

Сводка

В этом руководстве вы задействуете пользовательский интерфейс фабрики данных для создания конвейера, который копирует и преобразует данные из источника Data Lake Storage 2-го поколения в приемник Data Lake Storage 2-го поколения (разрешив обоим доступ только к выбранным сетям) с помощью потока данных для сопоставления в виртуальной сети, управляемой Фабрикой данных.