DevOps для конвейера приема данных

В большинстве сценариев решение по приему данных представляет собой набор из скриптов, вызовов служб и управляющего всеми действиями конвейера. Из этой статьи вы узнаете, как применять методы DevOps к жизненному циклу разработки общего конвейера по приему данных, который готовит их для использования в модели машинного обучения. Конвейер создается с использованием таких служб Azure:

  • Фабрика данных Azure: считывает необработанные данные и осуществляет их подготовку.
  • Azure Databricks: запускает записную книжку Python, преобразующую данные.
  • Azure Pipelines: автоматизируют процесс непрерывной интеграции и разработки.

Рабочий процесс конвейера приема данных

Конвейер приема данных реализует следующий рабочий процесс:

  1. Необработанные данные считываются в конвейер Фабрики данных Azure (ADF).
  2. Конвейер ADF отправляет данные в кластер Azure Databricks, который запускает записную книжку Python для преобразования данных.
  3. Данные хранятся в контейнере больших двоичных объектов (BLOB), где их использует система Машинного обучения Azure для обучения модели.

data ingestion pipeline workflow

Обзор непрерывной интеграции и доставки

Как и в случае других программных решений, над этим тоже работает группа (например, специалисты по данным). Они ведут совместную работу и вместе используют одни и те же ресурсы Azure, такие как Фабрику данных Azure, Azure Databricks и учетные записи службы хранилища Azure. Коллекция этих ресурсов является средой разработки. Специалисты по данным развивают ту же базу исходного кода.

Система непрерывной интеграции и доставки автоматизирует процесс сборки, тестирования и доставки (развертывания) решения. Процесс непрерывной интеграции (CI) выполняет следующие задачи:

  • Сборка программного кода
  • Проверка с помощью тестов на качество кода
  • Выполнение тестов модулей
  • Создание артефактов, например, протестированного кода и шаблонов Azure Resource Manager

Процесс непрерывной поставки (CD) развертывает эти артефакты в нижестоящих средах.

cicd data ingestion diagram

В этой статье показано, как автоматизировать процессы CI и CD с помощью Azure Pipelines.

Управление системой управления версиями

Управление системой управления версиями требуется для контроля изменений и обеспечения совместной работы между членами команды. Например, код хранится в репозитории Azure DevOps, GitHub или GitLab. Рабочий процесс совместной работы основан на модели ветвления.

Исходный код Python Notebook

Специалисты по данным работают с исходным кодом для записной книжки Python локально в интегрированной среде разработки (например, в Visual Studio Code) или непосредственно в рабочей области Databricks. После внесения изменений в код они объединяются в репозиторий с последующим применением политики ветвления.

Совет

Мы рекомендуем хранить код в файлах .py, а не в формате .ipynb Jupyter Notebook. Он повышает удобочитаемость кода и включает автоматическую проверку его качества в процессе непрерывной интеграции.

Исходный код Фабрики данных Azure

Исходный код конвейеров Фабрики данных Azure — это коллекция JSON-файлов, созданных рабочей областью этой системы. Обычно специалисты по данным используют визуальный конструктор в рабочей области Фабрики данных Azure, а не работают напрямую с файлами исходного кода.

Сведения о настройке рабочей области для использования репозитория системы управления версиями см. в разделе Author with Azure Repos Git integration.

Непрерывная интеграция (CI)

Основная цель процесса непрерывной интеграции состоит в том, чтобы объединить совместную работу группы по исходному коду и подготовить этот код к развертыванию в нижестоящих средах. Как и в случае с управлением исходным кодом, этот процесс отличается для записных книжек Python и конвейеров Фабрики данных Azure.

Python Notebook по непрерывной интеграции

Процесс CI для записных книжек Python получает код из ветви совместной работы (например, master или develop) и выполняет следующие действия:

  • Сведение кода
  • Модульное тестирование
  • Сохранение кода как артефакта

В следующем фрагменте кода показано, как реализовать эти шаги в конвейере Azure DevOps yaml:

steps:
- script: |
   flake8 --output-file=$(Build.BinariesDirectory)/lint-testresults.xml --format junit-xml  
  workingDirectory: '$(Build.SourcesDirectory)'
  displayName: 'Run flake8 (code style analysis)'  
  
- script: |
   python -m pytest --junitxml=$(Build.BinariesDirectory)/unit-testresults.xml $(Build.SourcesDirectory)
  displayName: 'Run unit tests'

- task: PublishTestResults@2
  condition: succeededOrFailed()
  inputs:
    testResultsFiles: '$(Build.BinariesDirectory)/*-testresults.xml'
    testRunTitle: 'Linting & Unit tests'
    failTaskOnFailedTests: true
  displayName: 'Publish linting and unit test results'

- publish: $(Build.SourcesDirectory)
    artifact: di-notebooks

Конвейер использует flake8 для работы с кодом сведения Python. Он выполняет модульные тесты, определенные в исходном коде, и публикует сведение и результаты тестов, чтобы они были доступны на экране выполнения Azure Pipelines.

Если сведение и модульное тестирование выполнены успешно, конвейер скопирует исходный код в репозиторий артефактов для применения при последующих шагах развертывания.

Непрерывная интеграция Фабрики данных Azure

Процесс непрерывной интеграции для конвейера Фабрики данных Azure является узким местом конвейера приема данных. Непрерывная интеграция здесь отсутствует. Развертываемый артефакт Фабрики данных Azure — это коллекция шаблонов Azure Resource Manager. Единственный способ создать эти шаблоны — нажать кнопку Опубликовать в рабочей области Фабрики данных Azure.

  1. Специалисты по данным объединяют исходный код из компонентов в ветвь совместной работы, например master или develop.
  2. Пользователь с соответствующими правами нажимает кнопку Опубликовать, чтобы создать шаблоны Azure Resource Manager из исходного кода в ветви совместной работы.
  3. Рабочая область проверяет конвейеры (с учетом сведения и модульного тестирования), создает шаблоны Azure Resource Manager (считайте это построением) и сохраняет созданные шаблоны в техническом ветви adf_publish в том же репозитории кода (считайте это артефактами публикации). Эта ветвь создается автоматически рабочей областью Фабрики данных Azure.

Дополнительные сведения об этом процессе см. в статье "Непрерывная интеграция и доставка в Фабрике данных Azure".

Важно убедиться, что созданные шаблоны Azure Resource Manager не зависят от среды. Это означает, что все отличимые в разных средах значения являются параметризованным. Фабрика данных Azure достаточно интеллектуальна для предоставления большинства таких значений в качестве параметров. Например, в следующем шаблоне свойства подключения к рабочей области "Машинное обучение Azure" предоставляются в виде параметров:

{
    "$schema": "https://schema.management.azure.com/schemas/2015-01-01/deploymentParameters.json#",
    "contentVersion": "1.0.0.0",
    "parameters": {
        "factoryName": {
            "value": "devops-ds-adf"
        },
        "AzureMLService_servicePrincipalKey": {
            "value": ""
        },
        "AzureMLService_properties_typeProperties_subscriptionId": {
            "value": "0fe1c235-5cfa-4152-17d7-5dff45a8d4ba"
        },
        "AzureMLService_properties_typeProperties_resourceGroupName": {
            "value": "devops-ds-rg"
        },
        "AzureMLService_properties_typeProperties_servicePrincipalId": {
            "value": "6e35e589-3b22-4edb-89d0-2ab7fc08d488"
        },
        "AzureMLService_properties_typeProperties_tenant": {
            "value": "72f988bf-86f1-41af-912b-2d7cd611db47"
        }
    }
}

Тем не менее, вам могут понадобиться пользовательские свойства, которые не обрабатываются рабочей областью Фабрики данных Azure по умолчанию. В сценарии этой статьи конвейер Фабрики данных Azure вызывает записную книжку Python, обрабатывающую данные. Записная книжка принимает параметр с именем входного файла данных.

import pandas as pd
import numpy as np

data_file_name = getArgument("data_file_name")
data = pd.read_csv(data_file_name)

labels = np.array(data['target'])
...

Это имя отличается для сред Dev, QA, UAT и PROD. В сложном конвейере с несколькими действиями может быть несколько пользовательских свойств. Рекомендуется объединить все эти значения в одно целое и определить их как конвейер variables:

Screenshot shows a Notebook called PrepareData and M L Execute Pipeline called M L Execute Pipeline at the top with the Variables tab selected below with the option to add new variables, each with a name, type, and default value.

Действия конвейера могут ссылаться на переменные конвейера при их фактическом использовании:

Screenshot shows a Notebook called PrepareData and M L Execute Pipeline called M L Execute Pipeline at the top with the Settings tab selected below.

В рабочей области Фабрики данных Azure не предоставляются переменные конвейера по умолчанию в качестве параметров шаблонов Azure Resource Manager. В рабочей области используется шаблон параметризации по умолчанию, определяющий, какие свойства конвейера должны предоставляться как параметры шаблона Azure Resource Manager. Чтобы добавить в список переменные конвейера, обновите "Microsoft.DataFactory/factories/pipelines"раздел Шаблона параметризации по умолчанию, используя следующий фрагмент кода, и поместите файл result JSON в корень исходной папки:

"Microsoft.DataFactory/factories/pipelines": {
        "properties": {
            "variables": {
                "*": {
                    "defaultValue": "="
                }
            }
        }
    }

При этом Рабочая область Фабрики данных Azure будет добавлять переменные в список параметров при нажатии кнопки Опубликовать:

{
    "$schema": "https://schema.management.azure.com/schemas/2015-01-01/deploymentParameters.json#",
    "contentVersion": "1.0.0.0",
    "parameters": {
        "factoryName": {
            "value": "devops-ds-adf"
        },
        ...
        "data-ingestion-pipeline_properties_variables_data_file_name_defaultValue": {
            "value": "driver_prediction_train.csv"
        }        
    }
}

Значения в файле JSON являются значениями по умолчанию, настроенными в определении конвейера. Они должны переопределяться значениями целевой среды при развертывании шаблона Azure Resource Manager.

Непрерывная поставка (CD)

Процесс непрерывной доставки принимает артефакты и развертывает их в первую целевую среду. Это гарантирует работоспособность решения путем тестирования. В случае успешного выполнения он переходит к следующей среде.

Процесс CD в Azure Pipelines состоит из нескольких этапов, представляющих среды. На каждом этапе имеются развертывания и задания для выполнения таких шагов:

  • Развертывание Python Notebook в Azure в рабочей области Databricks
  • Развертывание конвейера Фабрики данных Azure.
  • Запуск конвейера
  • Проверка результата приема данных

Эти этапы конвейера можно настраивать с помощью утверждений и шлюзов, которые обеспечивают дополнительный контроль над прохождением развертывания через цепочку сред.

Развертывание Python Notebook

В следующем фрагменте кода определяется развертывание конвейера Azure, которое копирует записную книжку Python в кластер Databricks:

- stage: 'Deploy_to_QA'
  displayName: 'Deploy to QA'
  variables:
  - group: devops-ds-qa-vg
  jobs:
  - deployment: "Deploy_to_Databricks"
    displayName: 'Deploy to Databricks'
    timeoutInMinutes: 0
    environment: qa
    strategy:
      runOnce:
        deploy:
          steps:
            - task: UsePythonVersion@0
              inputs:
                versionSpec: '3.x'
                addToPath: true
                architecture: 'x64'
              displayName: 'Use Python3'

            - task: configuredatabricks@0
              inputs:
                url: '$(DATABRICKS_URL)'
                token: '$(DATABRICKS_TOKEN)'
              displayName: 'Configure Databricks CLI'    

            - task: deploynotebooks@0
              inputs:
                notebooksFolderPath: '$(Pipeline.Workspace)/di-notebooks'
                workspaceFolder: '/Shared/devops-ds'
              displayName: 'Deploy (copy) data processing notebook to the Databricks cluster'       

Артефакты, созданные непрерывной интеграцией CI, автоматически копируются в агент развертывания и доступны в папке $(Pipeline.Workspace). В этом случае задача развертывания ссылается на артефакт di-notebooks с записной книжкой Python. В этом развертывании для копирования файлов записной книжки в рабочую область Databricks используется расширение Databricks Azure DevOps.

Этап Deploy_to_QA содержит ссылку на группу переменных devops-ds-qa-vg, определенную в проекте Azure DevOps. Действия на этом этапе относятся к переменным из этой группы переменных (например, $(DATABRICKS_URL) и $(DATABRICKS_TOKEN)). Идея состоит в том, что следующий этап (например, Deploy_to_UAT) будет действовать с теми же именами переменных, которые определены в отдельной группе переменных с областью действия в UAT.

Развертывание конвейера Фабрики данных Azure.

Развертываемый артефакт Фабрики данных Azure — это шаблон Azure Resource Manager. Он развертывается вместе с задачей развертывания группы ресурсов Azure, как показано в следующем фрагменте кода:

  - deployment: "Deploy_to_ADF"
    displayName: 'Deploy to ADF'
    timeoutInMinutes: 0
    environment: qa
    strategy:
      runOnce:
        deploy:
          steps:
            - task: AzureResourceGroupDeployment@2
              displayName: 'Deploy ADF resources'
              inputs:
                azureSubscription: $(AZURE_RM_CONNECTION)
                resourceGroupName: $(RESOURCE_GROUP)
                location: $(LOCATION)
                csmFile: '$(Pipeline.Workspace)/adf-pipelines/ARMTemplateForFactory.json'
                csmParametersFile: '$(Pipeline.Workspace)/adf-pipelines/ARMTemplateParametersForFactory.json'
                overrideParameters: -data-ingestion-pipeline_properties_variables_data_file_name_defaultValue "$(DATA_FILE_NAME)"

Значение параметра файла данных filename берется из переменной $(DATA_FILE_NAME), определенной в группе переменных этапа контроля качества. Аналогичным образом все параметры, определенные в ARMTemplateForFactory.json, можно переопределить. Если этого сделать нельзя — используются значения по умолчанию.

Запуск конвейера и проверка результата приема данных

Следующий шаг — убедиться, что развернутое решение работает. Ниже приводится определение задания, которое запускает конвейер Фабрики данных Azure с помощью скрипта PowerShell и выполняет записную книжку Python в кластере Azure Databricks. Записная книжка проверяет правильность приема данных и файла данных результатов по имени $(bin_FILE_NAME).

  - job: "Integration_test_job"
    displayName: "Integration test job"
    dependsOn: [Deploy_to_Databricks, Deploy_to_ADF]
    pool:
      vmImage: 'ubuntu-latest'
    timeoutInMinutes: 0
    steps:
    - task: AzurePowerShell@4
      displayName: 'Execute ADF Pipeline'
      inputs:
        azureSubscription: $(AZURE_RM_CONNECTION)
        ScriptPath: '$(Build.SourcesDirectory)/adf/utils/Invoke-ADFPipeline.ps1'
        ScriptArguments: '-ResourceGroupName $(RESOURCE_GROUP) -DataFactoryName $(DATA_FACTORY_NAME) -PipelineName $(PIPELINE_NAME)'
        azurePowerShellVersion: LatestVersion
    - task: UsePythonVersion@0
      inputs:
        versionSpec: '3.x'
        addToPath: true
        architecture: 'x64'
      displayName: 'Use Python3'

    - task: configuredatabricks@0
      inputs:
        url: '$(DATABRICKS_URL)'
        token: '$(DATABRICKS_TOKEN)'
      displayName: 'Configure Databricks CLI'    

    - task: executenotebook@0
      inputs:
        notebookPath: '/Shared/devops-ds/test-data-ingestion'
        existingClusterId: '$(DATABRICKS_CLUSTER_ID)'
        executionParams: '{"bin_file_name":"$(bin_FILE_NAME)"}'
      displayName: 'Test data ingestion'

    - task: waitexecution@0
      displayName: 'Wait until the testing is done'

Последняя задача в задании проверяет результат выполнения записной книжки. Если она возвращает ошибку, то устанавливает для состояния выполнения конвейера значение Failed.

Совместное размещение элементов

Полный конвейер Azure CI/CD состоит из следующих этапов.

  • CI
  • Развертывание в QA
    • Развертывание в Databricks и развертывание в ADF
    • Тест интеграции

Конвейер содержит несколько этапов Deploy по количеству имеющихся целевых сред. Каждая стадия Deploy содержит два развертывания, которые выполняются параллельно, и задание, которое запускается после развертывания с целью тестирования решения в среде.

Пример реализации конвейера приводится в нижеследующем фрагменте кода yaml:

variables:
- group: devops-ds-vg

stages:
- stage: 'CI'
  displayName: 'CI'
  jobs:
  - job: "CI_Job"
    displayName: "CI Job"
    pool:
      vmImage: 'ubuntu-latest'
    timeoutInMinutes: 0
    steps:
    - task: UsePythonVersion@0
      inputs:
        versionSpec: '3.x'
        addToPath: true
        architecture: 'x64'
      displayName: 'Use Python3'
    - script: pip install --upgrade flake8 flake8_formatter_junit_xml
      displayName: 'Install flake8'
    - checkout: self
    - script: |
       flake8 --output-file=$(Build.BinariesDirectory)/lint-testresults.xml --format junit-xml  
    workingDirectory: '$(Build.SourcesDirectory)'
    displayName: 'Run flake8 (code style analysis)'  
    - script: |
       python -m pytest --junitxml=$(Build.BinariesDirectory)/unit-testresults.xml $(Build.SourcesDirectory)
    displayName: 'Run unit tests'
    - task: PublishTestResults@2
    condition: succeededOrFailed()
    inputs:
        testResultsFiles: '$(Build.BinariesDirectory)/*-testresults.xml'
        testRunTitle: 'Linting & Unit tests'
        failTaskOnFailedTests: true
    displayName: 'Publish linting and unit test results'    

    # The CI stage produces two artifacts (notebooks and ADF pipelines).
    # The pipelines Azure Resource Manager templates are stored in a technical branch "adf_publish"
    - publish: $(Build.SourcesDirectory)/$(Build.Repository.Name)/code/dataingestion
      artifact: di-notebooks
    - checkout: git://${{variables['System.TeamProject']}}@adf_publish    
    - publish: $(Build.SourcesDirectory)/$(Build.Repository.Name)/devops-ds-adf
      artifact: adf-pipelines

- stage: 'Deploy_to_QA'
  displayName: 'Deploy to QA'
  variables:
  - group: devops-ds-qa-vg
  jobs:
  - deployment: "Deploy_to_Databricks"
    displayName: 'Deploy to Databricks'
    timeoutInMinutes: 0
    environment: qa
    strategy:
      runOnce:
        deploy:
          steps:
            - task: UsePythonVersion@0
              inputs:
                versionSpec: '3.x'
                addToPath: true
                architecture: 'x64'
              displayName: 'Use Python3'

            - task: configuredatabricks@0
              inputs:
                url: '$(DATABRICKS_URL)'
                token: '$(DATABRICKS_TOKEN)'
              displayName: 'Configure Databricks CLI'    

            - task: deploynotebooks@0
              inputs:
                notebooksFolderPath: '$(Pipeline.Workspace)/di-notebooks'
                workspaceFolder: '/Shared/devops-ds'
              displayName: 'Deploy (copy) data processing notebook to the Databricks cluster'             
  - deployment: "Deploy_to_ADF"
    displayName: 'Deploy to ADF'
    timeoutInMinutes: 0
    environment: qa
    strategy:
      runOnce:
        deploy:
          steps:
            - task: AzureResourceGroupDeployment@2
              displayName: 'Deploy ADF resources'
              inputs:
                azureSubscription: $(AZURE_RM_CONNECTION)
                resourceGroupName: $(RESOURCE_GROUP)
                location: $(LOCATION)
                csmFile: '$(Pipeline.Workspace)/adf-pipelines/ARMTemplateForFactory.json'
                csmParametersFile: '$(Pipeline.Workspace)/adf-pipelines/ARMTemplateParametersForFactory.json'
                overrideParameters: -data-ingestion-pipeline_properties_variables_data_file_name_defaultValue "$(DATA_FILE_NAME)"
  - job: "Integration_test_job"
    displayName: "Integration test job"
    dependsOn: [Deploy_to_Databricks, Deploy_to_ADF]
    pool:
      vmImage: 'ubuntu-latest'
    timeoutInMinutes: 0
    steps:
    - task: AzurePowerShell@4
      displayName: 'Execute ADF Pipeline'
      inputs:
        azureSubscription: $(AZURE_RM_CONNECTION)
        ScriptPath: '$(Build.SourcesDirectory)/adf/utils/Invoke-ADFPipeline.ps1'
        ScriptArguments: '-ResourceGroupName $(RESOURCE_GROUP) -DataFactoryName $(DATA_FACTORY_NAME) -PipelineName $(PIPELINE_NAME)'
        azurePowerShellVersion: LatestVersion
    - task: UsePythonVersion@0
      inputs:
        versionSpec: '3.x'
        addToPath: true
        architecture: 'x64'
      displayName: 'Use Python3'

    - task: configuredatabricks@0
      inputs:
        url: '$(DATABRICKS_URL)'
        token: '$(DATABRICKS_TOKEN)'
      displayName: 'Configure Databricks CLI'    

    - task: executenotebook@0
      inputs:
        notebookPath: '/Shared/devops-ds/test-data-ingestion'
        existingClusterId: '$(DATABRICKS_CLUSTER_ID)'
        executionParams: '{"bin_file_name":"$(bin_FILE_NAME)"}'
      displayName: 'Test data ingestion'

    - task: waitexecution@0
      displayName: 'Wait until the testing is done'                

Следующие шаги