DevOps i en pipeline för datainmatning

I de flesta fall är en datainmatningslösning en sammansättning av skript, tjänstanrop och en pipeline som samordnar alla aktiviteter. I den här artikeln får du lära dig hur du tillämpar DevOps-metoder på utvecklingslivscykeln för en gemensam pipeline för datainmatning som förbereder data för maskininlärningsmodellträning. Pipelinen skapas med följande Azure-tjänster:

  • Azure Data Factory: Läser rådata och samordnar förberedelse av data.
  • Azure Databricks: Kör en Python-notebook-fil som transformerar data.
  • Azure Pipelines: Automatiserar en kontinuerlig integrerings- och utvecklingsprocess.

Arbetsflöde för datainmatningspipeline

Pipelinen för datainmatning implementerar följande arbetsflöde:

  1. Rådata läss in i en Azure Data Factory-pipeline (ADF).
  2. ADF-pipelinen skickar data till ett Azure Databricks-kluster, som kör en Python-notebook-fil för att transformera data.
  3. Data lagras i en blobcontainer, där de kan användas av Azure Machine Learning för att träna en modell.

data ingestion pipeline workflow

Översikt över kontinuerlig integrering och leverans

Precis som med många programvarulösningar finns det ett team (till exempel Dataingenjör) som arbetar med det. De samarbetar och delar samma Azure-resurser som Azure Data Factory, Azure Databricks och Azure Storage-konton. Samlingen av dessa resurser är en utvecklingsmiljö. Datateknikerna bidrar till samma källkodsbas.

Ett system för kontinuerlig integrering och leverans automatiserar processen med att skapa, testa och leverera (distribuera) lösningen. Processen för kontinuerlig integrering (CI) utför följande uppgifter:

  • Sammanställer koden
  • Kontrollerar det med kodkvalitetstesterna
  • Kör enhetstester
  • Skapar artefakter som testad kod och Azure Resource Manager-mallar

Processen kontinuerlig leverans (CD) distribuerar artefakterna till nedströmsmiljöerna.

cicd data ingestion diagram

Den här artikeln visar hur du automatiserar CI- och CD-processerna med Azure Pipelines.

Hantering av källkontroll

Källkontrollhantering krävs för att spåra ändringar och möjliggöra samarbete mellan gruppmedlemmar. Koden skulle till exempel lagras på en Azure DevOps-, GitHub- eller GitLab-lagringsplats. Arbetsflödet för samarbete baseras på en förgreningsmodell.

Python Notebook-källkod

Datatekniker arbetar med Python Notebook-källkoden antingen lokalt i en IDE (till exempel Visual Studio Code) eller direkt i Databricks-arbetsytan. När kodändringarna har slutförts sammanfogas de till lagringsplatsen efter en förgreningsprincip.

Dricks

Vi rekommenderar att du lagrar koden i filer i .py stället för i .ipynb Jupyter Notebook-format. Det förbättrar kodens läsbarhet och möjliggör automatiska kodkvalitetskontroller i CI-processen.

Källkod för Azure Data Factory

Källkoden för Azure Data Factory-pipelines är en samling JSON-filer som genereras av en Azure Data Factory-arbetsyta. Normalt arbetar datatekniker med en visuell designer på Azure Data Factory-arbetsytan i stället för med källkodsfilerna direkt.

Information om hur du konfigurerar arbetsytan för att använda en källkontrolllagringsplats finns i Redigera med Azure Repos Git-integrering.

Kontinuerlig integrering (CI)

Det slutliga målet med processen för kontinuerlig integrering är att samla in det gemensamma teamets arbete från källkoden och förbereda det för distributionen till de underordnade miljöerna. Precis som med källkodshanteringen skiljer sig den här processen åt för Python-notebook-filer och Azure Data Factory-pipelines.

Python Notebook CI

CI-processen för Python Notebooks hämtar koden från samarbetsgrenen (till exempel master eller develop) och utför följande aktiviteter:

  • Kodlintning
  • Enhetstestning
  • Spara koden som en artefakt

Följande kodfragment visar implementeringen av de här stegen i en Azure DevOps yaml-pipeline :

steps:
- script: |
   flake8 --output-file=$(Build.BinariesDirectory)/lint-testresults.xml --format junit-xml  
  workingDirectory: '$(Build.SourcesDirectory)'
  displayName: 'Run flake8 (code style analysis)'  
  
- script: |
   python -m pytest --junitxml=$(Build.BinariesDirectory)/unit-testresults.xml $(Build.SourcesDirectory)
  displayName: 'Run unit tests'

- task: PublishTestResults@2
  condition: succeededOrFailed()
  inputs:
    testResultsFiles: '$(Build.BinariesDirectory)/*-testresults.xml'
    testRunTitle: 'Linting & Unit tests'
    failTaskOnFailedTests: true
  displayName: 'Publish linting and unit test results'

- publish: $(Build.SourcesDirectory)
    artifact: di-notebooks

Pipelinen använder flake8 för att göra Python-kodlindning. Den kör enhetstesterna som definieras i källkoden och publicerar linting- och testresultaten så att de är tillgängliga på körningsskärmen för Azure Pipelines.

Om linting- och enhetstestningen lyckas kopierar pipelinen källkoden till artefaktlagringsplatsen som ska användas av efterföljande distributionssteg.

Azure Data Factory CI

CI-processen för en Azure Data Factory-pipeline är en flaskhals för en datainmatningspipeline. Det finns ingen kontinuerlig integrering. En distributionsbar artefakt för Azure Data Factory är en samling Azure Resource Manager-mallar. Det enda sättet att skapa dessa mallar är att klicka på publiceringsknappen på Azure Data Factory-arbetsytan.

  1. Datateknikerna sammanfogar källkoden från sina funktionsgrenar till samarbetsgrenen, till exempel master eller develop.
  2. Någon med de beviljade behörigheterna klickar på publiceringsknappen för att generera Azure Resource Manager-mallar från källkoden i samarbetsgrenen.
  3. Arbetsytan validerar pipelines (tänk på det som linting och enhetstestning), genererar Azure Resource Manager-mallar (tänk på det som att skapa) och sparar de genererade mallarna till en teknisk gren adf_publish på samma kodlagringsplats (tänk på det som publicering av artefakter). Den här grenen skapas automatiskt av Azure Data Factory-arbetsytan.

Mer information om den här processen finns i Kontinuerlig integrering och leverans i Azure Data Factory.

Det är viktigt att se till att de genererade Azure Resource Manager-mallarna är miljöagnostiska. Det innebär att alla värden som kan skilja sig åt mellan miljöer parametriseras. Azure Data Factory är tillräckligt smart för att exponera de flesta av dessa värden som parametrar. I följande mall exponeras till exempel anslutningsegenskaperna till en Azure Machine Learning-arbetsyta som parametrar:

{
    "$schema": "https://schema.management.azure.com/schemas/2015-01-01/deploymentParameters.json#",
    "contentVersion": "1.0.0.0",
    "parameters": {
        "factoryName": {
            "value": "devops-ds-adf"
        },
        "AzureMLService_servicePrincipalKey": {
            "value": ""
        },
        "AzureMLService_properties_typeProperties_subscriptionId": {
            "value": "0fe1c235-5cfa-4152-17d7-5dff45a8d4ba"
        },
        "AzureMLService_properties_typeProperties_resourceGroupName": {
            "value": "devops-ds-rg"
        },
        "AzureMLService_properties_typeProperties_servicePrincipalId": {
            "value": "6e35e589-3b22-4edb-89d0-2ab7fc08d488"
        },
        "AzureMLService_properties_typeProperties_tenant": {
            "value": "72f988bf-86f1-41af-912b-2d7cd611db47"
        }
    }
}

Men du kanske vill exponera dina anpassade egenskaper som inte hanteras av Azure Data Factory-arbetsytan som standard. I scenariot med den här artikeln anropar en Azure Data Factory-pipeline en Python-notebook-fil som bearbetar data. Notebook-filen accepterar en parameter med namnet på en indatafil.

import pandas as pd
import numpy as np

data_file_name = getArgument("data_file_name")
data = pd.read_csv(data_file_name)

labels = np.array(data['target'])
...

Det här namnet skiljer sig åt för Miljöer med utveckling, QA, UAT och PROD. I en komplex pipeline med flera aktiviteter kan det finnas flera anpassade egenskaper. Det är bra att samla in alla dessa värden på ett ställe och definiera dem som pipelinevariabler:

Screenshot shows a Notebook called PrepareData and M L Execute Pipeline called M L Execute Pipeline at the top with the Variables tab selected below with the option to add new variables, each with a name, type, and default value.

Pipelineaktiviteterna kan referera till pipelinevariablerna när de faktiskt används:

Screenshot shows a Notebook called PrepareData and M L Execute Pipeline called M L Execute Pipeline at the top with the Settings tab selected below.

Azure Data Factory-arbetsytan exponerar inte pipelinevariabler som Azure Resource Manager-mallparametrar som standard. Arbetsytan använder standardparametermallen som dikterar vilka pipelineegenskaper som ska exponeras som Azure Resource Manager-mallparametrar. Om du vill lägga till pipelinevariabler i listan uppdaterar "Microsoft.DataFactory/factories/pipelines" du avsnittet i standardparametermallen med följande kodfragment och placerar resultat-json-filen i roten i källmappen:

"Microsoft.DataFactory/factories/pipelines": {
        "properties": {
            "variables": {
                "*": {
                    "defaultValue": "="
                }
            }
        }
    }

Detta tvingar Azure Data Factory-arbetsytan att lägga till variablerna i parameterlistan när publiceringsknappen klickas på:

{
    "$schema": "https://schema.management.azure.com/schemas/2015-01-01/deploymentParameters.json#",
    "contentVersion": "1.0.0.0",
    "parameters": {
        "factoryName": {
            "value": "devops-ds-adf"
        },
        ...
        "data-ingestion-pipeline_properties_variables_data_file_name_defaultValue": {
            "value": "driver_prediction_train.csv"
        }        
    }
}

Värdena i JSON-filen är standardvärden som konfigurerats i pipelinedefinitionen. De förväntas åsidosättas med målmiljövärdena när Azure Resource Manager-mallen distribueras.

Kontinuerlig leverans (CD)

Processen kontinuerlig leverans tar artefakterna och distribuerar dem till den första målmiljön. Det ser till att lösningen fungerar genom att köra tester. Om det lyckas fortsätter den till nästa miljö.

CD Azure Pipelines består av flera steg som representerar miljöerna. Varje steg innehåller distributioner och jobb som utför följande steg:

  • Distribuera en Python Notebook till Azure Databricks-arbetsytan
  • Distribuera en Azure Data Factory-pipeline
  • Köra pipelinen
  • Kontrollera datainmatningsresultatet

Pipelinestegen kan konfigureras med godkännanden och grindar som ger ytterligare kontroll över hur distributionsprocessen utvecklas genom miljökedjan.

Distribuera en Python Notebook

Följande kodfragment definierar en Azure Pipeline-distribution som kopierar en Python-anteckningsbok till ett Databricks-kluster:

- stage: 'Deploy_to_QA'
  displayName: 'Deploy to QA'
  variables:
  - group: devops-ds-qa-vg
  jobs:
  - deployment: "Deploy_to_Databricks"
    displayName: 'Deploy to Databricks'
    timeoutInMinutes: 0
    environment: qa
    strategy:
      runOnce:
        deploy:
          steps:
            - task: UsePythonVersion@0
              inputs:
                versionSpec: '3.x'
                addToPath: true
                architecture: 'x64'
              displayName: 'Use Python3'

            - task: configuredatabricks@0
              inputs:
                url: '$(DATABRICKS_URL)'
                token: '$(DATABRICKS_TOKEN)'
              displayName: 'Configure Databricks CLI'    

            - task: deploynotebooks@0
              inputs:
                notebooksFolderPath: '$(Pipeline.Workspace)/di-notebooks'
                workspaceFolder: '/Shared/devops-ds'
              displayName: 'Deploy (copy) data processing notebook to the Databricks cluster'       

Artefakterna som skapas av CI kopieras automatiskt till distributionsagenten och är tillgängliga i $(Pipeline.Workspace) mappen. I det här fallet refererar distributionsuppgiften till artefakten di-notebooks som innehåller Python-notebook-filen. Den här distributionen använder Databricks Azure DevOps-tillägget för att kopiera notebook-filerna till Databricks-arbetsytan.

Deploy_to_QA Fasen innehåller en referens till variabelgruppen devops-ds-qa-vg som definierats i Azure DevOps-projektet. Stegen i det här steget refererar till variablerna från den här variabelgruppen (till exempel $(DATABRICKS_URL) och $(DATABRICKS_TOKEN)). Tanken är att nästa steg (till exempel Deploy_to_UAT) ska fungera med samma variabelnamn som definierats i en egen UAT-omfångsvariabelgrupp.

Distribuera en Azure Data Factory-pipeline

En distributionsbar artefakt för Azure Data Factory är en Azure Resource Manager-mall. Den kommer att distribueras med azure-resursgruppsdistributionsuppgiften, vilket visas i följande kodfragment:

  - deployment: "Deploy_to_ADF"
    displayName: 'Deploy to ADF'
    timeoutInMinutes: 0
    environment: qa
    strategy:
      runOnce:
        deploy:
          steps:
            - task: AzureResourceGroupDeployment@2
              displayName: 'Deploy ADF resources'
              inputs:
                azureSubscription: $(AZURE_RM_CONNECTION)
                resourceGroupName: $(RESOURCE_GROUP)
                location: $(LOCATION)
                csmFile: '$(Pipeline.Workspace)/adf-pipelines/ARMTemplateForFactory.json'
                csmParametersFile: '$(Pipeline.Workspace)/adf-pipelines/ARMTemplateParametersForFactory.json'
                overrideParameters: -data-ingestion-pipeline_properties_variables_data_file_name_defaultValue "$(DATA_FILE_NAME)"

Värdet för parametern datafilnamn kommer från variabeln $(DATA_FILE_NAME) som definierats i en variabelgrupp för QA-steg. På samma sätt kan alla parametrar som definierats i ARMTemplateForFactory.json åsidosättas. Om de inte är det används standardvärdena.

Kör pipelinen och kontrollera datainmatningsresultatet

Nästa steg är att se till att den distribuerade lösningen fungerar. Följande jobbdefinition kör en Azure Data Factory-pipeline med ett PowerShell-skript och kör en Python-notebook-fil i ett Azure Databricks-kluster. Notebook-filen kontrollerar om data har matats in korrekt och verifierar resultatdatafilen med $(bin_FILE_NAME) namn.

  - job: "Integration_test_job"
    displayName: "Integration test job"
    dependsOn: [Deploy_to_Databricks, Deploy_to_ADF]
    pool:
      vmImage: 'ubuntu-latest'
    timeoutInMinutes: 0
    steps:
    - task: AzurePowerShell@4
      displayName: 'Execute ADF Pipeline'
      inputs:
        azureSubscription: $(AZURE_RM_CONNECTION)
        ScriptPath: '$(Build.SourcesDirectory)/adf/utils/Invoke-ADFPipeline.ps1'
        ScriptArguments: '-ResourceGroupName $(RESOURCE_GROUP) -DataFactoryName $(DATA_FACTORY_NAME) -PipelineName $(PIPELINE_NAME)'
        azurePowerShellVersion: LatestVersion
    - task: UsePythonVersion@0
      inputs:
        versionSpec: '3.x'
        addToPath: true
        architecture: 'x64'
      displayName: 'Use Python3'

    - task: configuredatabricks@0
      inputs:
        url: '$(DATABRICKS_URL)'
        token: '$(DATABRICKS_TOKEN)'
      displayName: 'Configure Databricks CLI'    

    - task: executenotebook@0
      inputs:
        notebookPath: '/Shared/devops-ds/test-data-ingestion'
        existingClusterId: '$(DATABRICKS_CLUSTER_ID)'
        executionParams: '{"bin_file_name":"$(bin_FILE_NAME)"}'
      displayName: 'Test data ingestion'

    - task: waitexecution@0
      displayName: 'Wait until the testing is done'

Den sista uppgiften i jobbet kontrollerar resultatet av notebook-körningen. Om det returnerar ett fel anges statusen för pipelinekörningen till misslyckad.

Sätta ihop bitar

Den fullständiga CI/CD Azure-pipelinen består av följande steg:

  • CI
  • Distribuera till QA
    • Distribuera till Databricks + Distribuera till ADF
    • Integrationstest

Den innehåller ett antal Distributionssteg som är lika med antalet målmiljöer som du har. Varje Distributionssteg innehåller två distributioner som körs parallellt och ett jobb som körs efter distributioner för att testa lösningen i miljön.

En exempelimplementering av pipelinen monteras i följande yaml-kodfragment:

variables:
- group: devops-ds-vg

stages:
- stage: 'CI'
  displayName: 'CI'
  jobs:
  - job: "CI_Job"
    displayName: "CI Job"
    pool:
      vmImage: 'ubuntu-latest'
    timeoutInMinutes: 0
    steps:
    - task: UsePythonVersion@0
      inputs:
        versionSpec: '3.x'
        addToPath: true
        architecture: 'x64'
      displayName: 'Use Python3'
    - script: pip install --upgrade flake8 flake8_formatter_junit_xml
      displayName: 'Install flake8'
    - checkout: self
    - script: |
       flake8 --output-file=$(Build.BinariesDirectory)/lint-testresults.xml --format junit-xml  
    workingDirectory: '$(Build.SourcesDirectory)'
    displayName: 'Run flake8 (code style analysis)'  
    - script: |
       python -m pytest --junitxml=$(Build.BinariesDirectory)/unit-testresults.xml $(Build.SourcesDirectory)
    displayName: 'Run unit tests'
    - task: PublishTestResults@2
    condition: succeededOrFailed()
    inputs:
        testResultsFiles: '$(Build.BinariesDirectory)/*-testresults.xml'
        testRunTitle: 'Linting & Unit tests'
        failTaskOnFailedTests: true
    displayName: 'Publish linting and unit test results'    

    # The CI stage produces two artifacts (notebooks and ADF pipelines).
    # The pipelines Azure Resource Manager templates are stored in a technical branch "adf_publish"
    - publish: $(Build.SourcesDirectory)/$(Build.Repository.Name)/code/dataingestion
      artifact: di-notebooks
    - checkout: git://${{variables['System.TeamProject']}}@adf_publish    
    - publish: $(Build.SourcesDirectory)/$(Build.Repository.Name)/devops-ds-adf
      artifact: adf-pipelines

- stage: 'Deploy_to_QA'
  displayName: 'Deploy to QA'
  variables:
  - group: devops-ds-qa-vg
  jobs:
  - deployment: "Deploy_to_Databricks"
    displayName: 'Deploy to Databricks'
    timeoutInMinutes: 0
    environment: qa
    strategy:
      runOnce:
        deploy:
          steps:
            - task: UsePythonVersion@0
              inputs:
                versionSpec: '3.x'
                addToPath: true
                architecture: 'x64'
              displayName: 'Use Python3'

            - task: configuredatabricks@0
              inputs:
                url: '$(DATABRICKS_URL)'
                token: '$(DATABRICKS_TOKEN)'
              displayName: 'Configure Databricks CLI'    

            - task: deploynotebooks@0
              inputs:
                notebooksFolderPath: '$(Pipeline.Workspace)/di-notebooks'
                workspaceFolder: '/Shared/devops-ds'
              displayName: 'Deploy (copy) data processing notebook to the Databricks cluster'             
  - deployment: "Deploy_to_ADF"
    displayName: 'Deploy to ADF'
    timeoutInMinutes: 0
    environment: qa
    strategy:
      runOnce:
        deploy:
          steps:
            - task: AzureResourceGroupDeployment@2
              displayName: 'Deploy ADF resources'
              inputs:
                azureSubscription: $(AZURE_RM_CONNECTION)
                resourceGroupName: $(RESOURCE_GROUP)
                location: $(LOCATION)
                csmFile: '$(Pipeline.Workspace)/adf-pipelines/ARMTemplateForFactory.json'
                csmParametersFile: '$(Pipeline.Workspace)/adf-pipelines/ARMTemplateParametersForFactory.json'
                overrideParameters: -data-ingestion-pipeline_properties_variables_data_file_name_defaultValue "$(DATA_FILE_NAME)"
  - job: "Integration_test_job"
    displayName: "Integration test job"
    dependsOn: [Deploy_to_Databricks, Deploy_to_ADF]
    pool:
      vmImage: 'ubuntu-latest'
    timeoutInMinutes: 0
    steps:
    - task: AzurePowerShell@4
      displayName: 'Execute ADF Pipeline'
      inputs:
        azureSubscription: $(AZURE_RM_CONNECTION)
        ScriptPath: '$(Build.SourcesDirectory)/adf/utils/Invoke-ADFPipeline.ps1'
        ScriptArguments: '-ResourceGroupName $(RESOURCE_GROUP) -DataFactoryName $(DATA_FACTORY_NAME) -PipelineName $(PIPELINE_NAME)'
        azurePowerShellVersion: LatestVersion
    - task: UsePythonVersion@0
      inputs:
        versionSpec: '3.x'
        addToPath: true
        architecture: 'x64'
      displayName: 'Use Python3'

    - task: configuredatabricks@0
      inputs:
        url: '$(DATABRICKS_URL)'
        token: '$(DATABRICKS_TOKEN)'
      displayName: 'Configure Databricks CLI'    

    - task: executenotebook@0
      inputs:
        notebookPath: '/Shared/devops-ds/test-data-ingestion'
        existingClusterId: '$(DATABRICKS_CLUSTER_ID)'
        executionParams: '{"bin_file_name":"$(bin_FILE_NAME)"}'
      displayName: 'Test data ingestion'

    - task: waitexecution@0
      displayName: 'Wait until the testing is done'                

Nästa steg