DevOps para um pipeline de ingestão de dados

Na maioria dos cenários, uma solução de ingestão de dados é uma composição de scripts, invocações de serviço e um pipeline que orquestra todas as atividades. Neste artigo, você aprende a aplicar as práticas de DevOps ao ciclo de vida de desenvolvimento de um pipeline de ingestão de dados comum que prepara dados para treinamento de modelo de aprendizado de máquina. O pipeline é criado usando os seguintes serviços do Azure:

  • Azure Data Factory: lê os dados brutos e orquestra a preparação de dados.
  • Azure Databricks: executa um notebook Python que transforma os dados.
  • Azure Pipelines: automatiza um processo contínuo de integração e desenvolvimento.

Fluxo de trabalho do pipeline de ingestão de dados

O pipeline de ingestão de dados implementa o seguinte fluxo de trabalho:

  1. Os dados brutos são lidos em um pipeline do ADF (Azure Data Factory).
  2. O pipeline do ADF envia os dados para um cluster do Azure Databricks, que executa um notebook Python para transformar os dados.
  3. Os dados são armazenados em um contêiner de blob, onde podem ser usados pelo Azure Machine Learning para treinar um modelo.

data ingestion pipeline workflow

Visão geral de integração e entrega contínuas

Assim como ocorre com muitas soluções de software, há uma equipe (por exemplo, Engenheiros de Dados) trabalhando nisso. Eles colaboram e compartilham os mesmos recursos do Azure, como o Azure Data Factory, Azure Databricks e contas de Armazenamento do Azure. A coleção desses recursos é um ambiente de Desenvolvimento. Os engenheiros de dados contribuem para a mesma base de código-fonte.

Um sistema de integração e entrega contínuas automatiza o processo de criação, teste e fornecimento (implantação) da solução. O processo de CI (Integração Contínua) executa as seguintes tarefas:

  • Monta o código
  • Verifica com base nos testes de qualidade de código
  • Executar testes de unidade
  • Produz artefatos como código testado e modelos do Azure Resource Manager

O processo de CD (Entrega Contínua) implanta os artefatos nos ambientes downstream.

cicd data ingestion diagram

Este artigo demonstra como automatizar os processos de CI e CD com o Azure Pipelines.

Gerenciamento do controle do código-fonte

O gerenciamento de controle do código-fonte é necessário para acompanhar as alterações e habilitar a colaboração entre os membros da equipe. Por exemplo, o código seria armazenado em um repositório do Azure DevOps, GitHub ou GitLab. O fluxo de trabalho de colaboração baseia-se em um modelo de ramificação.

Código-fonte do Notebook Python

Os engenheiros de dados trabalham com o código-fonte do notebook Python localmente em um IDE (por exemplo, Visual Studio Code) ou diretamente no workspace do Databricks. Depois que as alterações de código são concluídas, elas são mescladas ao repositório após uma política de ramificação.

Dica

É recomendável armazenar o código em arquivos .py ao invés do formato Jupyter Notebook .ipynb. Ele melhora a capacidade de leitura do código e permite verificações automáticas de qualidade de código no processo de CI.

Código-Fonte do Azure Data Factory

O código-fonte dos pipelines do Azure Data Factory é uma coleção de arquivos JSON gerados por um workspace do Azure Data Factory. Normalmente, os engenheiros de dados trabalham com um designer visual no workspace do Azure Data Factory e não com os arquivos de código-fonte diretamente.

Para configurar o workspace para usar um repositório de controle do código-fonte, consulte Criar com a integração do Git ao Azure Repos.

CI (Integração contínua)

O objetivo final do processo de Integração Contínua é reunir o trabalho conjunto em equipe do código-fonte, e prepará-lo para a implantação nos ambientes downstream. Assim como no gerenciamento de código-fonte, esse processo é diferente para os notebooks Python e pipelines do Azure Data Factory.

CI do Notebook Python

O processo de CI para os Notebooks Python obtém o código do branch de colaboração (por exemplo,master ou develop) e executa as seguintes atividades:

  • Lint de código
  • Teste de unidade
  • Salva o código como um artefato

O trecho de código a seguir demonstra a implementação dessas etapas em um pipeline yaml do Azure DevOps:

steps:
- script: |
   flake8 --output-file=$(Build.BinariesDirectory)/lint-testresults.xml --format junit-xml  
  workingDirectory: '$(Build.SourcesDirectory)'
  displayName: 'Run flake8 (code style analysis)'  
  
- script: |
   python -m pytest --junitxml=$(Build.BinariesDirectory)/unit-testresults.xml $(Build.SourcesDirectory)
  displayName: 'Run unit tests'

- task: PublishTestResults@2
  condition: succeededOrFailed()
  inputs:
    testResultsFiles: '$(Build.BinariesDirectory)/*-testresults.xml'
    testRunTitle: 'Linting & Unit tests'
    failTaskOnFailedTests: true
  displayName: 'Publish linting and unit test results'

- publish: $(Build.SourcesDirectory)
    artifact: di-notebooks

O pipeline usa flake8 para fazer o lint do código Python. Ele executa os testes de unidade definidos no código-fonte e publica os resultados de lint e de teste para que fiquem disponíveis na tela de execução do Azure Pipelines.

Se os testes de unidade e lint forem bem-sucedidos, o pipeline copiará o código-fonte para o repositório de artefatos a ser usado pelas etapas de implantação subsequentes.

CI do Azure Data Factory

O processo de CI para um pipeline do Azure Data Factory é um gargalo para um pipeline de ingestão de dados. Não há integração contínua. Um artefato implantável do Azure Data Factory é uma coleção de modelos do Azure Resource Manager. A única maneira de produzir esses modelos é clicando no botão publicar no workspace do Azure Data Factory.

  1. Os engenheiros de dados mesclam o código-fonte de suas ramificações de recursos no branch de colaboração, por exemplo, master ou develop.
  2. Uma pessoa com as permissões concedidas clica no botão publicar para gerar modelos do Azure Resource Manager do código-fonte no branch de colaboração.
  3. O workspace valida os pipelines (considere isso como teste de unidade e lint), gera modelos do Azure Resource Manager (considere isso como compilação) e salva os modelos gerados em uma ramificação técnica adf_publish no mesmo repositório de código (considere isso como artefatos de publicação). Essa ramificação é criada automaticamente pelo workspace do Azure Data Factory.

Para obter mais informações sobre este processo, consulte Integração e entrega contínuas em Azure data Factory.

É importante garantir que os modelos gerados do Azure Resource Manager sejam independentes do ambiente. Isso significa que todos os valores que podem ser diferentes entre ambientes são parametrizados. O Azure Data Factory é inteligente o suficiente para expor a maioria dos valores como parâmetros. Por exemplo, no modelo a seguir, as propriedades de conexão com um workspace do Azure Machine Learning são expostas como parâmetros:

{
    "$schema": "https://schema.management.azure.com/schemas/2015-01-01/deploymentParameters.json#",
    "contentVersion": "1.0.0.0",
    "parameters": {
        "factoryName": {
            "value": "devops-ds-adf"
        },
        "AzureMLService_servicePrincipalKey": {
            "value": ""
        },
        "AzureMLService_properties_typeProperties_subscriptionId": {
            "value": "0fe1c235-5cfa-4152-17d7-5dff45a8d4ba"
        },
        "AzureMLService_properties_typeProperties_resourceGroupName": {
            "value": "devops-ds-rg"
        },
        "AzureMLService_properties_typeProperties_servicePrincipalId": {
            "value": "6e35e589-3b22-4edb-89d0-2ab7fc08d488"
        },
        "AzureMLService_properties_typeProperties_tenant": {
            "value": "72f988bf-86f1-41af-912b-2d7cd611db47"
        }
    }
}

No entanto, talvez você queira expor suas propriedades personalizadas que não são manipuladas pelo workspace do Azure Data Factory por padrão. No cenário deste artigo, um pipeline do Azure Data Factory invoca um notebook Python que está processando os dados. O notebook aceita um parâmetro com o nome de um arquivo de dados de entrada.

import pandas as pd
import numpy as np

data_file_name = getArgument("data_file_name")
data = pd.read_csv(data_file_name)

labels = np.array(data['target'])
...

Esse nome é diferente para ambientes Dev, QA, UAT, e PROD Em um pipeline complexo com várias atividades, pode haver várias propriedades personalizadas. É uma boa prática coletar todos esses valores em um único lugar e defini-los como variables de pipeline:

Screenshot shows a Notebook called PrepareData and M L Execute Pipeline called M L Execute Pipeline at the top with the Variables tab selected below with the option to add new variables, each with a name, type, and default value.

As atividades de pipeline podem se referir às variáveis de pipeline ao usá-las:

Screenshot shows a Notebook called PrepareData and M L Execute Pipeline called M L Execute Pipeline at the top with the Settings tab selected below.

O workspace do Azure Data Factory não expõe variáveis de pipeline como parâmetros de modelos do Azure Resource Manager por padrão. O workspace usa o Modelo de Parametrização Padrão que dita quais propriedades de pipeline devem ser expostas como parâmetros do modelo do Azure Resource Manager. Para adicionar variáveis de pipeline à lista, atualize a seção "Microsoft.DataFactory/factories/pipelines" do Modelo de Parametrização Padrão com o trecho de código a seguir, e coloque o arquivo json resultante na raiz da pasta de origem:

"Microsoft.DataFactory/factories/pipelines": {
        "properties": {
            "variables": {
                "*": {
                    "defaultValue": "="
                }
            }
        }
    }

Isso forçará o workspace do Azure Data Factory a adicionar as variáveis à lista de parâmetros quando o botão publicar for clicado:

{
    "$schema": "https://schema.management.azure.com/schemas/2015-01-01/deploymentParameters.json#",
    "contentVersion": "1.0.0.0",
    "parameters": {
        "factoryName": {
            "value": "devops-ds-adf"
        },
        ...
        "data-ingestion-pipeline_properties_variables_data_file_name_defaultValue": {
            "value": "driver_prediction_train.csv"
        }        
    }
}

Os valores no arquivo JSON são valores padrão configurados na definição do pipeline. Eles devem ser substituídos pelos valores do ambiente de destino quando o modelo do Azure Resource Manager é implantado.

CD (Entrega contínua)

O processo de Entrega Contínua usa os artefatos e os implanta no primeiro ambiente de destino. Ele verifica se a solução funciona executando testes. Se for bem-sucedido, ele continuará no próximo ambiente.

O Azure Pipelines de CD consiste em várias fases que representam os ambientes. Cada estágio contém implantações e trabalhos que executam as seguintes etapas:

  • Implantar um Notebook Python no workspace do Azure Databricks
  • Implantar um pipeline do Azure Data Factory
  • Executar o pipeline
  • Verificar o resultado da ingestão de dados

Os estágios de pipeline podem ser configurados com aprovações e portões que fornecem controle adicional sobre como o processo de implantação evolui pela cadeia de ambientes.

Implantar um Notebook Python

O trecho de código a seguir define uma implantação de pipeline do Azure que copia um notebook Python para um cluster do Databricks:

- stage: 'Deploy_to_QA'
  displayName: 'Deploy to QA'
  variables:
  - group: devops-ds-qa-vg
  jobs:
  - deployment: "Deploy_to_Databricks"
    displayName: 'Deploy to Databricks'
    timeoutInMinutes: 0
    environment: qa
    strategy:
      runOnce:
        deploy:
          steps:
            - task: UsePythonVersion@0
              inputs:
                versionSpec: '3.x'
                addToPath: true
                architecture: 'x64'
              displayName: 'Use Python3'

            - task: configuredatabricks@0
              inputs:
                url: '$(DATABRICKS_URL)'
                token: '$(DATABRICKS_TOKEN)'
              displayName: 'Configure Databricks CLI'    

            - task: deploynotebooks@0
              inputs:
                notebooksFolderPath: '$(Pipeline.Workspace)/di-notebooks'
                workspaceFolder: '/Shared/devops-ds'
              displayName: 'Deploy (copy) data processing notebook to the Databricks cluster'       

Os artefatos produzidos pela CI são copiados automaticamente para o agente de implantação e estão disponíveis na pasta $(Pipeline.Workspace). Nesse caso, a tarefa de implantação refere-se ao artefato di-notebooks que contém o notebook Python. Essa implantação usa a extensão DevOps do Databricks do Azure para copiar os arquivos do notebook para o workspace do Databricks.

O estágio Deploy_to_QA contém uma referência ao grupo de variáveis devops-ds-qa-vg definido no Projeto do Azure DevOps. As etapas neste estágio referem-se às variáveis desse grupo de variáveis (por exemplo, $(DATABRICKS_URL) e $(DATABRICKS_TOKEN)). A ideia é que o próximo estágio (por exemplo, Deploy_to_UAT) opere com os mesmos nomes de variável definidos em seu próprio grupo de variáveis com escopo UAT.

Implantar um pipeline do Azure Data Factory

Um artefato implantável do Azure Data Factory é um modelo do Azure Resource Manager. Ele será implantado com a tarefa Implantação do Grupo de Recursos do Azure, conforme demonstrado no trecho de código a seguir:

  - deployment: "Deploy_to_ADF"
    displayName: 'Deploy to ADF'
    timeoutInMinutes: 0
    environment: qa
    strategy:
      runOnce:
        deploy:
          steps:
            - task: AzureResourceGroupDeployment@2
              displayName: 'Deploy ADF resources'
              inputs:
                azureSubscription: $(AZURE_RM_CONNECTION)
                resourceGroupName: $(RESOURCE_GROUP)
                location: $(LOCATION)
                csmFile: '$(Pipeline.Workspace)/adf-pipelines/ARMTemplateForFactory.json'
                csmParametersFile: '$(Pipeline.Workspace)/adf-pipelines/ARMTemplateParametersForFactory.json'
                overrideParameters: -data-ingestion-pipeline_properties_variables_data_file_name_defaultValue "$(DATA_FILE_NAME)"

O valor do parâmetro nome de arquivo de dados é derivado da variável $(DATA_FILE_NAME) definida em um grupo de variáveis na fase de garantia de qualidade. Da mesma forma, todos os parâmetros definidos em ARMTemplateForFactory.json podem ser substituídos. Caso contrário, os valores padrão serão usados.

Executar o pipeline e verificar o resultado da ingestão de dados

A próxima etapa é garantir que a solução implantada está funcionando. A definição de trabalho a seguir executa um pipeline do Azure Data Factory com um script do PowerShell e executa um notebook Python em um cluster do Azure Databricks. O notebook verifica se os dados foram ingeridos corretamente e valida o arquivo de dados resultante com o nome $(bin_FILE_NAME).

  - job: "Integration_test_job"
    displayName: "Integration test job"
    dependsOn: [Deploy_to_Databricks, Deploy_to_ADF]
    pool:
      vmImage: 'ubuntu-latest'
    timeoutInMinutes: 0
    steps:
    - task: AzurePowerShell@4
      displayName: 'Execute ADF Pipeline'
      inputs:
        azureSubscription: $(AZURE_RM_CONNECTION)
        ScriptPath: '$(Build.SourcesDirectory)/adf/utils/Invoke-ADFPipeline.ps1'
        ScriptArguments: '-ResourceGroupName $(RESOURCE_GROUP) -DataFactoryName $(DATA_FACTORY_NAME) -PipelineName $(PIPELINE_NAME)'
        azurePowerShellVersion: LatestVersion
    - task: UsePythonVersion@0
      inputs:
        versionSpec: '3.x'
        addToPath: true
        architecture: 'x64'
      displayName: 'Use Python3'

    - task: configuredatabricks@0
      inputs:
        url: '$(DATABRICKS_URL)'
        token: '$(DATABRICKS_TOKEN)'
      displayName: 'Configure Databricks CLI'    

    - task: executenotebook@0
      inputs:
        notebookPath: '/Shared/devops-ds/test-data-ingestion'
        existingClusterId: '$(DATABRICKS_CLUSTER_ID)'
        executionParams: '{"bin_file_name":"$(bin_FILE_NAME)"}'
      displayName: 'Test data ingestion'

    - task: waitexecution@0
      displayName: 'Wait until the testing is done'

A tarefa final no trabalho verifica o resultado da execução do notebook. Se retornar um erro, ele define o status da execução do pipeline como com falha.

Como juntar as peças

O Pipeline do Azure de CI/CD completo consiste nos seguintes estágios:

  • CI
  • Implantar na Garantia de Qualidade
    • Implantar no Databricks + Implantar no ADF
    • Teste de Integração

Ele contém um número de estágios Deploy igual ao número de ambientes de destino que você tem. Cada estágio Deploy contém duas implantações que são executados em paralelo e um trabalho que é executado após implantações para testar a solução no ambiente.

Uma implementação de exemplo do pipeline é montada no trecho de códigoyaml a seguir:

variables:
- group: devops-ds-vg

stages:
- stage: 'CI'
  displayName: 'CI'
  jobs:
  - job: "CI_Job"
    displayName: "CI Job"
    pool:
      vmImage: 'ubuntu-latest'
    timeoutInMinutes: 0
    steps:
    - task: UsePythonVersion@0
      inputs:
        versionSpec: '3.x'
        addToPath: true
        architecture: 'x64'
      displayName: 'Use Python3'
    - script: pip install --upgrade flake8 flake8_formatter_junit_xml
      displayName: 'Install flake8'
    - checkout: self
    - script: |
       flake8 --output-file=$(Build.BinariesDirectory)/lint-testresults.xml --format junit-xml  
    workingDirectory: '$(Build.SourcesDirectory)'
    displayName: 'Run flake8 (code style analysis)'  
    - script: |
       python -m pytest --junitxml=$(Build.BinariesDirectory)/unit-testresults.xml $(Build.SourcesDirectory)
    displayName: 'Run unit tests'
    - task: PublishTestResults@2
    condition: succeededOrFailed()
    inputs:
        testResultsFiles: '$(Build.BinariesDirectory)/*-testresults.xml'
        testRunTitle: 'Linting & Unit tests'
        failTaskOnFailedTests: true
    displayName: 'Publish linting and unit test results'    

    # The CI stage produces two artifacts (notebooks and ADF pipelines).
    # The pipelines Azure Resource Manager templates are stored in a technical branch "adf_publish"
    - publish: $(Build.SourcesDirectory)/$(Build.Repository.Name)/code/dataingestion
      artifact: di-notebooks
    - checkout: git://${{variables['System.TeamProject']}}@adf_publish    
    - publish: $(Build.SourcesDirectory)/$(Build.Repository.Name)/devops-ds-adf
      artifact: adf-pipelines

- stage: 'Deploy_to_QA'
  displayName: 'Deploy to QA'
  variables:
  - group: devops-ds-qa-vg
  jobs:
  - deployment: "Deploy_to_Databricks"
    displayName: 'Deploy to Databricks'
    timeoutInMinutes: 0
    environment: qa
    strategy:
      runOnce:
        deploy:
          steps:
            - task: UsePythonVersion@0
              inputs:
                versionSpec: '3.x'
                addToPath: true
                architecture: 'x64'
              displayName: 'Use Python3'

            - task: configuredatabricks@0
              inputs:
                url: '$(DATABRICKS_URL)'
                token: '$(DATABRICKS_TOKEN)'
              displayName: 'Configure Databricks CLI'    

            - task: deploynotebooks@0
              inputs:
                notebooksFolderPath: '$(Pipeline.Workspace)/di-notebooks'
                workspaceFolder: '/Shared/devops-ds'
              displayName: 'Deploy (copy) data processing notebook to the Databricks cluster'             
  - deployment: "Deploy_to_ADF"
    displayName: 'Deploy to ADF'
    timeoutInMinutes: 0
    environment: qa
    strategy:
      runOnce:
        deploy:
          steps:
            - task: AzureResourceGroupDeployment@2
              displayName: 'Deploy ADF resources'
              inputs:
                azureSubscription: $(AZURE_RM_CONNECTION)
                resourceGroupName: $(RESOURCE_GROUP)
                location: $(LOCATION)
                csmFile: '$(Pipeline.Workspace)/adf-pipelines/ARMTemplateForFactory.json'
                csmParametersFile: '$(Pipeline.Workspace)/adf-pipelines/ARMTemplateParametersForFactory.json'
                overrideParameters: -data-ingestion-pipeline_properties_variables_data_file_name_defaultValue "$(DATA_FILE_NAME)"
  - job: "Integration_test_job"
    displayName: "Integration test job"
    dependsOn: [Deploy_to_Databricks, Deploy_to_ADF]
    pool:
      vmImage: 'ubuntu-latest'
    timeoutInMinutes: 0
    steps:
    - task: AzurePowerShell@4
      displayName: 'Execute ADF Pipeline'
      inputs:
        azureSubscription: $(AZURE_RM_CONNECTION)
        ScriptPath: '$(Build.SourcesDirectory)/adf/utils/Invoke-ADFPipeline.ps1'
        ScriptArguments: '-ResourceGroupName $(RESOURCE_GROUP) -DataFactoryName $(DATA_FACTORY_NAME) -PipelineName $(PIPELINE_NAME)'
        azurePowerShellVersion: LatestVersion
    - task: UsePythonVersion@0
      inputs:
        versionSpec: '3.x'
        addToPath: true
        architecture: 'x64'
      displayName: 'Use Python3'

    - task: configuredatabricks@0
      inputs:
        url: '$(DATABRICKS_URL)'
        token: '$(DATABRICKS_TOKEN)'
      displayName: 'Configure Databricks CLI'    

    - task: executenotebook@0
      inputs:
        notebookPath: '/Shared/devops-ds/test-data-ingestion'
        existingClusterId: '$(DATABRICKS_CLUSTER_ID)'
        executionParams: '{"bin_file_name":"$(bin_FILE_NAME)"}'
      displayName: 'Test data ingestion'

    - task: waitexecution@0
      displayName: 'Wait until the testing is done'                

Próximas etapas