DevOps voor een gegevensopnamepijplijn
In de meeste scenario's is een oplossing voor gegevensinname een samenstelling van scripts, service-aanroepen en een pijplijn die alle activiteiten in orchestrating. In dit artikel leert u hoe u DevOps-procedures kunt toepassen op de ontwikkelingslevenscyclus van een algemene pijplijn voor gegevensingestie die gegevens voorbereidt voor het trainen machine learning model. De pijplijn is gebouwd met behulp van de volgende Azure-services:
- Azure Data Factory: leest de onbewerkte gegevens en orden gegevensvoorbereiding.
- Azure Databricks: voert een Python-notebook uit dat de gegevens transformeert.
- Azure Pipelines: automatiseert een continu integratie- en ontwikkelingsproces.
Werkstroom voor gegevens opnamepijplijn
De pijplijn voor gegevensingestie implementeert de volgende werkstroom:
- Onbewerkte gegevens worden ingelezen in Azure Data Factory ADF-pijplijn (ADF).
- De ADF-pijplijn verzendt de gegevens naar Azure Databricks cluster, waarin een Python-notebook wordt uitgevoerd om de gegevens te transformeren.
- De gegevens worden opgeslagen in een blobcontainer, waar ze kunnen worden gebruikt door Azure Machine Learning een model te trainen.

Overzicht van continue integratie en levering
Net als bij veel andere softwareoplossingen is er een team (bijvoorbeeld data engineers) aan het werk. Ze werken samen en delen dezelfde Azure-resources, Azure Data Factory, Azure Databricks en Azure Storage accounts. De verzameling van deze resources is een ontwikkelomgeving. De data engineers dragen bij aan dezelfde broncodebasis.
Een systeem voor continue integratie en levering automatiseert het proces van het bouwen, testen en leveren (implementeren) van de oplossing. Het CI-proces (Continue integratie) voert de volgende taken uit:
- De code samenstellen
- Controleert deze met de kwaliteitstests van de code
- Voert eenheidstests uit
- Produceert artefacten zoals geteste code en Azure Resource Manager sjablonen
Het CD-proces (Continuous Delivery) implementeert de artefacten in de downstreamomgevingen.

In dit artikel wordt gedemonstreerd hoe u de CI- en CD-processen automatiseert met Azure Pipelines.
Broncodebeheer
Broncodebeheer is nodig om wijzigingen bij te houden en samenwerking tussen teamleden mogelijk te maken. De code wordt bijvoorbeeld opgeslagen in een Azure DevOps-, GitHub- of GitLab-opslagplaats. De samenwerkingswerkstroom is gebaseerd op een vertakkingsmodel. Bijvoorbeeld GitFlow.
Python Notebook-broncode
De data engineers werken met de broncode van het Python-notebook, lokaal in een IDE (bijvoorbeeld Visual Studio Code) ofrechtstreeks in de Databricks-werkruimte. Zodra de codewijzigingen zijn voltooid, worden ze samengevoegd met de opslagplaats volgens een vertakkingsbeleid.
Tip
We raden u aan de code op te slaan in .py bestanden in plaats van in Jupyter Notebook .ipynb indeling. Het verbetert de leesbaarheid van code en maakt automatische kwaliteitscontroles van code in het CI-proces mogelijk.
Azure Data Factory broncode
De broncode van Azure Data Factory pijplijnen is een verzameling JSON-bestanden die worden gegenereerd door een Azure Data Factory werkruimte. Normaal gesproken werken de data engineers met een visuele ontwerper in Azure Data Factory werkruimte in plaats van rechtstreeks met de broncodebestanden.
Zie Author with Azure Repos Git integration (Maken met Git-integratie van Azure-opslagplaatsen) als u de werkruimte wilt configureren voor het gebruik van een opslagplaats voor broncodebeheer.
Continue integratie (CI)
Het uiteindelijke doel van het proces voor continue integratie is het verzamelen van het gezamenlijke teamwerk van de broncode en het voorbereiden voor de implementatie naar de downstreamomgevingen. Net als bij het beheer van de broncode is dit proces anders voor de Python-notebooks en Azure Data Factory pijplijnen.
Python Notebook CI
Het CI-proces voor de Python Notebooks haalt de code op uit de samenwerkingstak (bijvoorbeeld master _ of _develop**) en voert de volgende activiteiten uit:
- Code linten
- Het testen van modules
- De code opslaan als een artefact
Het volgende codefragment toont de implementatie van deze stappen in een Azure DevOps yaml-pijplijn:
steps:
- script: |
flake8 --output-file=$(Build.BinariesDirectory)/lint-testresults.xml --format junit-xml
workingDirectory: '$(Build.SourcesDirectory)'
displayName: 'Run flake8 (code style analysis)'
- script: |
python -m pytest --junitxml=$(Build.BinariesDirectory)/unit-testresults.xml $(Build.SourcesDirectory)
displayName: 'Run unit tests'
- task: PublishTestResults@2
condition: succeededOrFailed()
inputs:
testResultsFiles: '$(Build.BinariesDirectory)/*-testresults.xml'
testRunTitle: 'Linting & Unit tests'
failTaskOnFailedTests: true
displayName: 'Publish linting and unit test results'
- publish: $(Build.SourcesDirectory)
artifact: di-notebooks
De pijplijn maakt gebruik van : 8 om python-code linting uit te voeren. De eenheidstests die in de broncode zijn gedefinieerd, worden uitgevoerd en de linting- en testresultaten worden gepubliceerd, zodat ze beschikbaar zijn in het uitvoeringsscherm van Azure Pipeline:

Als het linten en het testen van de eenheid is geslaagd, kopieert de pijplijn de broncode naar de artefactopslagplaats die moet worden gebruikt voor de volgende implementatiestappen.
Azure Data Factory CI
Ci-proces voor een Azure Data Factory pijplijn is een knelpunt voor een pijplijn voor gegevensingestie. Er is geen continue integratie. Een implementeerbaar artefact voor Azure Data Factory is een verzameling Azure Resource Manager-sjablonen. De enige manier om deze sjablonen te maken, is door te klikken op de knop Publiceren in Azure Data Factory werkruimte.
- De data engineers voegen de broncode van hun functiebranches samen in de samenwerkingstak, bijvoorbeeld master _ of _develop**.
- Iemand met de verleende machtigingen klikt op de knop Publiceren om Azure Resource Manager genereren op basis van de broncode in de samenwerkingstak.
- De werkruimte valideert de pijplijnen (u kunt deze zien als linting en eenheidstests), genereert Azure Resource Manager-sjablonen (u kunt het zien als een gebouw) en slaat de gegenereerde sjablonen op in een technische vertakking adf_publish in dezelfde codeopslagplaats (u kunt dit zien als het publiceren van artefacten). Deze vertakking wordt automatisch gemaakt door de Azure Data Factory werkruimte.
Zie Continue integratie en levering in Azure Data Factory voor meer informatie over dit Azure Data Factory.
Het is belangrijk om ervoor te zorgen dat de gegenereerde Azure Resource Manager sjablonen omgevingsagnostisch zijn. Dit betekent dat alle waarden die kunnen verschillen tussen omgevingen worden geparametriseerd. Azure Data Factory is slim genoeg om het merendeel van dergelijke waarden, zoals parameters, weer te geven. In de volgende sjabloon worden bijvoorbeeld de verbindingseigenschappen voor een Azure Machine Learning beschikbaar gemaakt als parameters:
{
"$schema": "https://schema.management.azure.com/schemas/2015-01-01/deploymentParameters.json#",
"contentVersion": "1.0.0.0",
"parameters": {
"factoryName": {
"value": "devops-ds-adf"
},
"AzureMLService_servicePrincipalKey": {
"value": ""
},
"AzureMLService_properties_typeProperties_subscriptionId": {
"value": "0fe1c235-5cfa-4152-17d7-5dff45a8d4ba"
},
"AzureMLService_properties_typeProperties_resourceGroupName": {
"value": "devops-ds-rg"
},
"AzureMLService_properties_typeProperties_servicePrincipalId": {
"value": "6e35e589-3b22-4edb-89d0-2ab7fc08d488"
},
"AzureMLService_properties_typeProperties_tenant": {
"value": "72f988bf-86f1-41af-912b-2d7cd611db47"
}
}
}
Mogelijk wilt u echter uw aangepaste eigenschappen beschikbaar maken die niet standaard door de Azure Data Factory worden verwerkt. In het scenario van dit artikel roept een Azure Data Factory een Python-notebook aan dat de gegevens verwerkt. Het notebook accepteert een parameter met de naam van een invoergegevensbestand.
import pandas as pd
import numpy as np
data_file_name = getArgument("data_file_name")
data = pd.read_csv(data_file_name)
labels = np.array(data['target'])
...
Deze naam is anders voor Dev _, _QA,_ _UAT_ en _PROD-omgevingen._ In een complexe pijplijn met meerdere activiteiten kunnen er verschillende aangepaste eigenschappen zijn. Het is een goed idee om al deze waarden op één plek te verzamelen en ze te definiëren als pijplijnvariabelen _ **:

De pijplijnactiviteiten kunnen verwijzen naar de pijplijnvariabelen terwijl ze daadwerkelijk worden gebruikt:

In Azure Data Factory werkruimte worden pijplijnvariabelen niet standaard weergegeven als Azure Resource Manager sjablonenparameters. De werkruimte maakt gebruik van de standaardparametersjabloon die bepaalt welke pijplijneigenschappen moeten worden weergegeven als Azure Resource Manager sjabloonparameters. Als u pijplijnvariabelen wilt toevoegen aan de lijst, moet u de sectie van de standaardparameterisatiesjabloon bijwerken met het volgende codefragment en het JSON-resultaatbestand in de hoofdmap van de "Microsoft.DataFactory/factories/pipelines" bronmap plaatsen:
"Microsoft.DataFactory/factories/pipelines": {
"properties": {
"variables": {
"*": {
"defaultValue": "="
}
}
}
}
Als u dit doet, Azure Data Factory de werkruimte de variabelen toe te voegen aan de lijst met parameters wanneer op de knop Publiceren wordt geklikt:
{
"$schema": "https://schema.management.azure.com/schemas/2015-01-01/deploymentParameters.json#",
"contentVersion": "1.0.0.0",
"parameters": {
"factoryName": {
"value": "devops-ds-adf"
},
...
"data-ingestion-pipeline_properties_variables_data_file_name_defaultValue": {
"value": "driver_prediction_train.csv"
}
}
}
De waarden in het JSON-bestand zijn standaardwaarden die zijn geconfigureerd in de pijplijndefinitie. Ze worden naar verwachting overschrijven met de waarden van de doelomgeving wanneer de Azure Resource Manager sjabloon wordt geïmplementeerd.
Continue levering (CD)
Het proces voor continue levering neemt de artefacten en implementeert deze in de eerste doelomgeving. Het zorgt ervoor dat de oplossing werkt door tests uit te voeren. Als dit lukt, gaat het door naar de volgende omgeving.
De CD Azure-pijplijn bestaat uit meerdere fasen die de omgevingen vertegenwoordigen. Elke fase bevat implementaties en taken die de volgende stappen uitvoeren:
- Een Python Notebook implementeren in Azure Databricks werkruimte
- Een pijplijn Azure Data Factory implementeren
- De pijplijn uitvoeren
- Het resultaat van de gegevens opname controleren
De pijplijnfasen kunnen worden geconfigureerd met goedkeuringen en poorten die extra controle bieden over hoe het implementatieproces zich ontwikkelt via de keten van omgevingen.
Een Python-notebook implementeren
Het volgende codefragment definieert een Azure Pipeline-implementatie die een Python-notebook kopieert naar een Databricks-cluster:
- stage: 'Deploy_to_QA'
displayName: 'Deploy to QA'
variables:
- group: devops-ds-qa-vg
jobs:
- deployment: "Deploy_to_Databricks"
displayName: 'Deploy to Databricks'
timeoutInMinutes: 0
environment: qa
strategy:
runOnce:
deploy:
steps:
- task: UsePythonVersion@0
inputs:
versionSpec: '3.x'
addToPath: true
architecture: 'x64'
displayName: 'Use Python3'
- task: configuredatabricks@0
inputs:
url: '$(DATABRICKS_URL)'
token: '$(DATABRICKS_TOKEN)'
displayName: 'Configure Databricks CLI'
- task: deploynotebooks@0
inputs:
notebooksFolderPath: '$(Pipeline.Workspace)/di-notebooks'
workspaceFolder: '/Shared/devops-ds'
displayName: 'Deploy (copy) data processing notebook to the Databricks cluster'
De artefacten die door de CI worden geproduceerd, worden automatisch gekopieerd naar de implementatieagent en zijn beschikbaar in de $(Pipeline.Workspace) map . In dit geval verwijst de implementatietaak naar het di-notebooks artefact met het Python-notebook. Deze implementatie maakt gebruik van de Databricks Azure DevOps-extensie om de notebookbestanden te kopiëren naar de Databricks-werkruimte.
De Deploy_to_QA fase bevat een verwijzing naar de devops-ds-qa-vg variabelegroep die is gedefinieerd in het Azure DevOps-project. De stappen in deze fase verwijzen naar de variabelen uit deze variabelegroep (bijvoorbeeld $(DATABRICKS_URL) en $(DATABRICKS_TOKEN) ). Het idee is dat de volgende fase (bijvoorbeeld ) werkt met dezelfde variabelenamen die zijn gedefinieerd in de eigen Deploy_to_UAT variabelegroep met UAT-bereik.
Een pijplijn Azure Data Factory implementeren
Een implementeerbaar artefact voor Azure Data Factory is een Azure Resource Manager sjabloon. Deze wordt geïmplementeerd met de azure-resourcegroepimplementatietaak, zoals wordt gedemonstreerd in het volgende fragment:
- deployment: "Deploy_to_ADF"
displayName: 'Deploy to ADF'
timeoutInMinutes: 0
environment: qa
strategy:
runOnce:
deploy:
steps:
- task: AzureResourceGroupDeployment@2
displayName: 'Deploy ADF resources'
inputs:
azureSubscription: $(AZURE_RM_CONNECTION)
resourceGroupName: $(RESOURCE_GROUP)
location: $(LOCATION)
csmFile: '$(Pipeline.Workspace)/adf-pipelines/ARMTemplateForFactory.json'
csmParametersFile: '$(Pipeline.Workspace)/adf-pipelines/ARMTemplateParametersForFactory.json'
overrideParameters: -data-ingestion-pipeline_properties_variables_data_file_name_defaultValue "$(DATA_FILE_NAME)"
De waarde van de parameter data filename is afkomstig van de $(DATA_FILE_NAME) variabele die is gedefinieerd in een variabelegroep voor de QA-fase. Op dezelfde manier kunnen alle parameters die zijn gedefinieerd in ARMTemplateForFactory.json worden overschreven. Als dat niet zo is, worden de standaardwaarden gebruikt.
Voer de pijplijn uit en controleer het resultaat van de gegevens opname
De volgende stap is om ervoor te zorgen dat de geïmplementeerde oplossing werkt. Met de volgende taakdefinitie wordt Azure Data Factory pijplijn uitgevoerd met een PowerShell-script en wordt een Python-notebook uitgevoerd op een Azure Databricks cluster. Het notebook controleert of de gegevens correct zijn opgenomen en valideert het resultaatgegevensbestand met de $(bin_FILE_NAME) naam.
- job: "Integration_test_job"
displayName: "Integration test job"
dependsOn: [Deploy_to_Databricks, Deploy_to_ADF]
pool:
vmImage: 'ubuntu-latest'
timeoutInMinutes: 0
steps:
- task: AzurePowerShell@4
displayName: 'Execute ADF Pipeline'
inputs:
azureSubscription: $(AZURE_RM_CONNECTION)
ScriptPath: '$(Build.SourcesDirectory)/adf/utils/Invoke-ADFPipeline.ps1'
ScriptArguments: '-ResourceGroupName $(RESOURCE_GROUP) -DataFactoryName $(DATA_FACTORY_NAME) -PipelineName $(PIPELINE_NAME)'
azurePowerShellVersion: LatestVersion
- task: UsePythonVersion@0
inputs:
versionSpec: '3.x'
addToPath: true
architecture: 'x64'
displayName: 'Use Python3'
- task: configuredatabricks@0
inputs:
url: '$(DATABRICKS_URL)'
token: '$(DATABRICKS_TOKEN)'
displayName: 'Configure Databricks CLI'
- task: executenotebook@0
inputs:
notebookPath: '/Shared/devops-ds/test-data-ingestion'
existingClusterId: '$(DATABRICKS_CLUSTER_ID)'
executionParams: '{"bin_file_name":"$(bin_FILE_NAME)"}'
displayName: 'Test data ingestion'
- task: waitexecution@0
displayName: 'Wait until the testing is done'
De laatste taak in de taak controleert het resultaat van de uitvoering van het notebook. Als er een fout wordt weergegeven, wordt de status van de uitvoering van de pijplijn op mislukt.
Onderdelen samenstellen
De volledige CI/CD Azure-pijplijn bestaat uit de volgende fasen:
- CI
- Implementeren naar QA
- Implementeren naar Databricks + Implementeren naar ADF
- Integratietest
Deze bevat een aantal Implementatiefasen die gelijk zijn aan het aantal doelomgevingen dat u hebt. Elke fase __ Implementeren_ bevat twee implementaties die parallel worden uitgevoerd en een taak die wordt uitgevoerd na implementaties om de oplossing in de omgeving te testen.
In het volgende yaml-fragment wordt een voorbeeld van de implementatie van de pijplijn samengesteld:
variables:
- group: devops-ds-vg
stages:
- stage: 'CI'
displayName: 'CI'
jobs:
- job: "CI_Job"
displayName: "CI Job"
pool:
vmImage: 'ubuntu-latest'
timeoutInMinutes: 0
steps:
- task: UsePythonVersion@0
inputs:
versionSpec: '3.x'
addToPath: true
architecture: 'x64'
displayName: 'Use Python3'
- script: pip install --upgrade flake8 flake8_formatter_junit_xml
displayName: 'Install flake8'
- checkout: self
- script: |
flake8 --output-file=$(Build.BinariesDirectory)/lint-testresults.xml --format junit-xml
workingDirectory: '$(Build.SourcesDirectory)'
displayName: 'Run flake8 (code style analysis)'
- script: |
python -m pytest --junitxml=$(Build.BinariesDirectory)/unit-testresults.xml $(Build.SourcesDirectory)
displayName: 'Run unit tests'
- task: PublishTestResults@2
condition: succeededOrFailed()
inputs:
testResultsFiles: '$(Build.BinariesDirectory)/*-testresults.xml'
testRunTitle: 'Linting & Unit tests'
failTaskOnFailedTests: true
displayName: 'Publish linting and unit test results'
# The CI stage produces two artifacts (notebooks and ADF pipelines).
# The pipelines Azure Resource Manager templates are stored in a technical branch "adf_publish"
- publish: $(Build.SourcesDirectory)/$(Build.Repository.Name)/code/dataingestion
artifact: di-notebooks
- checkout: git://${{variables['System.TeamProject']}}@adf_publish
- publish: $(Build.SourcesDirectory)/$(Build.Repository.Name)/devops-ds-adf
artifact: adf-pipelines
- stage: 'Deploy_to_QA'
displayName: 'Deploy to QA'
variables:
- group: devops-ds-qa-vg
jobs:
- deployment: "Deploy_to_Databricks"
displayName: 'Deploy to Databricks'
timeoutInMinutes: 0
environment: qa
strategy:
runOnce:
deploy:
steps:
- task: UsePythonVersion@0
inputs:
versionSpec: '3.x'
addToPath: true
architecture: 'x64'
displayName: 'Use Python3'
- task: configuredatabricks@0
inputs:
url: '$(DATABRICKS_URL)'
token: '$(DATABRICKS_TOKEN)'
displayName: 'Configure Databricks CLI'
- task: deploynotebooks@0
inputs:
notebooksFolderPath: '$(Pipeline.Workspace)/di-notebooks'
workspaceFolder: '/Shared/devops-ds'
displayName: 'Deploy (copy) data processing notebook to the Databricks cluster'
- deployment: "Deploy_to_ADF"
displayName: 'Deploy to ADF'
timeoutInMinutes: 0
environment: qa
strategy:
runOnce:
deploy:
steps:
- task: AzureResourceGroupDeployment@2
displayName: 'Deploy ADF resources'
inputs:
azureSubscription: $(AZURE_RM_CONNECTION)
resourceGroupName: $(RESOURCE_GROUP)
location: $(LOCATION)
csmFile: '$(Pipeline.Workspace)/adf-pipelines/ARMTemplateForFactory.json'
csmParametersFile: '$(Pipeline.Workspace)/adf-pipelines/ARMTemplateParametersForFactory.json'
overrideParameters: -data-ingestion-pipeline_properties_variables_data_file_name_defaultValue "$(DATA_FILE_NAME)"
- job: "Integration_test_job"
displayName: "Integration test job"
dependsOn: [Deploy_to_Databricks, Deploy_to_ADF]
pool:
vmImage: 'ubuntu-latest'
timeoutInMinutes: 0
steps:
- task: AzurePowerShell@4
displayName: 'Execute ADF Pipeline'
inputs:
azureSubscription: $(AZURE_RM_CONNECTION)
ScriptPath: '$(Build.SourcesDirectory)/adf/utils/Invoke-ADFPipeline.ps1'
ScriptArguments: '-ResourceGroupName $(RESOURCE_GROUP) -DataFactoryName $(DATA_FACTORY_NAME) -PipelineName $(PIPELINE_NAME)'
azurePowerShellVersion: LatestVersion
- task: UsePythonVersion@0
inputs:
versionSpec: '3.x'
addToPath: true
architecture: 'x64'
displayName: 'Use Python3'
- task: configuredatabricks@0
inputs:
url: '$(DATABRICKS_URL)'
token: '$(DATABRICKS_TOKEN)'
displayName: 'Configure Databricks CLI'
- task: executenotebook@0
inputs:
notebookPath: '/Shared/devops-ds/test-data-ingestion'
existingClusterId: '$(DATABRICKS_CLUSTER_ID)'
executionParams: '{"bin_file_name":"$(bin_FILE_NAME)"}'
displayName: 'Test data ingestion'
- task: waitexecution@0
displayName: 'Wait until the testing is done'