Veri alımı işlem hattı için DevOps
Çoğu senaryoda veri alımı çözümü betiklerin, hizmet çağrılarının ve tüm etkinliklerin düzenleniyor olduğu bir işlem hattıdır. Bu makalede, veri alımı DevOps makine öğrenmesi modeli eğitimine hazır olan ortak bir veri alımı işlem hattının geliştirme yaşam döngüsüne uygulama hakkında bilgi edinebilirsiniz. İşlem hattı aşağıdaki Azure hizmetleri kullanılarak tasarlanmıştır:
- Azure Data Factory: Ham verileri okur ve veri hazırlığı sağlar.
- Azure Databricks: Verileri dönüştüren bir Python not defteri çalıştırır.
- Azure Pipelines: Sürekli tümleştirme ve geliştirme sürecini otomatik hale sağlar.
Veri alımı işlem hattı iş akışı
Veri alımı işlem hattı aşağıdaki iş akışını uygulamaya almaktadır:
- Ham veriler bir Azure Data Factory (ADF) işlem hattına okunur.
- ADF işlem hattı, verileri dönüştürmek için python Azure Databricks bir kümeye gönderir.
- Veriler bir blob kapsayıcısı içinde depolanır ve burada model eğitmek Azure Machine Learning veriler tarafından kullanılabilir.

Sürekli tümleştirme ve teslime genel bakış
Birçok yazılım çözümü gibi, üzerinde çalışan bir ekip (örneğin, Veri Mühendisleri) vardır. Azure Data Factory, Azure Databricks ve Azure Depolama gibi azure kaynakları üzerinde işbirliği Depolama paylaşır. Bu kaynakların koleksiyonu bir Geliştirme ortamıdır. Veri mühendisleri aynı kaynak kod tabanına katkıda bulunmalıdır.
Sürekli tümleştirme ve teslim sistemi, çözümü oluşturma, test etme ve dağıtma (dağıtma) sürecini otomatik hale getiriyor. Sürekli Tümleştirme (CI) işlemi aşağıdaki görevleri gerçekleştirir:
- Kodu bir araya toplar
- Kod kalitesi testleriyle denetler
- Birim testleri çalıştırır
- Test edilmiş kod ve şablon oluşturma gibi Azure Resource Manager oluşturur
Sürekli Teslim (CD) işlemi, yapıtları aşağı akış ortamlara dağıtır.

Bu makalede, Azure Pipelines ile CI ve CD işlemlerini otomatikleştirme işlemi Azure Pipelines.
Kaynak denetimi yönetimi
Değişiklikleri izlemek ve ekip üyeleri arasında işbirliğini etkinleştirmek için kaynak denetimi yönetimi gerekir. Örneğin, kod bir Azure DevOps, GitHub veya GitLab deposunda depolanır. İşbirliği iş akışı bir dallama modelini temel alan bir iş akışıdır. Örneğin, GitFlow.
Python Not Defteri Kaynak Kodu
Veri mühendisleri Python not defteri kaynak koduyla yerel olarak (örneğin,Visual Studio Code ) veya doğrudan Databricks çalışma alanında çalışır. Kod değişiklikleri tamamlandıktan sonra, dallama ilkesinden sonra depoyla birleştirilir.
İpucu
Kodun dosya biçiminde değil .py dosyalarda .ipynb depolanması Jupyter Notebook önerilir. Kodun okunabilirliğini artırır ve CI sürecinde otomatik kod kalitesi denetimlerine olanak sağlar.
Azure Data Factory Kaynak Kodu
İşlem hatlarının Azure Data Factory kodu, bir çalışma alanı tarafından oluşturulan JSON Azure Data Factory koleksiyonudur. Normalde veri mühendisleri doğrudan kaynak kod dosyaları yerine Azure Data Factory çalışma alanında bir görsel tasarımcıyla çalışır.
Çalışma alanını kaynak denetim deposu kullanmak üzere yapılandırmak için bkz. Git tümleştirmesi ile Azure Repos yazma.
Sürekli tümleştirme (CI)
Sürekli Tümleştirme sürecinin nihai amacı, kaynak koddan ortak ekip çalışmalarını toplamak ve aşağı akış ortamlarına dağıtım için hazırlamaktır. Kaynak kodu yönetiminde olduğu gibi bu işlem de Python not defterleri ve iş Azure Data Factory farklıdır.
Python Notebook CI
Python Notebooks için CI işlemi, işbirliği daldan kodu alır (örneğin, ana _ veya _geliştirme**) ve aşağıdaki etkinlikleri gerçekleştirir:
- Kod lint
- Birim testi
- Kodu yapıt olarak kaydetme
Aşağıdaki kod parçacığı, bu adımların bir yaml işlem hattında Azure DevOps gösteriyor:
steps:
- script: |
flake8 --output-file=$(Build.BinariesDirectory)/lint-testresults.xml --format junit-xml
workingDirectory: '$(Build.SourcesDirectory)'
displayName: 'Run flake8 (code style analysis)'
- script: |
python -m pytest --junitxml=$(Build.BinariesDirectory)/unit-testresults.xml $(Build.SourcesDirectory)
displayName: 'Run unit tests'
- task: PublishTestResults@2
condition: succeededOrFailed()
inputs:
testResultsFiles: '$(Build.BinariesDirectory)/*-testresults.xml'
testRunTitle: 'Linting & Unit tests'
failTaskOnFailedTests: true
displayName: 'Publish linting and unit test results'
- publish: $(Build.SourcesDirectory)
artifact: di-notebooks
İşlem hattı, Python kod lintleri yapmak için flake8 kullanır. Kaynak kodda tanımlanan birim testlerini çalıştırır ve lint ve test sonuçlarını Yayımlar, böylece Azure Pipeline yürütme ekranında kullanılabilir:

Lint ve birim testi başarılı olursa, işlem hattı kaynak kodu sonraki dağıtım adımları tarafından kullanılacak yapıt deposuna kopyalar.
Azure Data Factory CI
Bir işlem hattı Azure Data Factory CI işlemi, veri alımı işlem hattında performans sorununa neden olur. Sürekli tümleştirme yoktur. Azure Data Factory için dağıtılabilir yapıt, Azure Resource Manager koleksiyonudur. Bu şablonları oluşturmanın tek yolu, Azure Data Factory çalışma alanında yayımla düğmesine tıklamaktır.
- Veri mühendisleri, özellik dallarından kaynak kodu işbirliği dalı ile birleşiyor. Örneğin, ana _ veya _geliştirme**.
- İzin verilen birisi yayımla düğmesine tıklayarak işbirliği Azure Resource Manager kaynak kodundan şablon oluşturabilir.
- Çalışma alanı işlem hatlarını doğrular (lint ve birim testi olarak düşünebilirsiniz), Azure Resource Manager şablonları oluşturur (oluşturma olarak düşünebilirsiniz) ve oluşturulan şablonları aynı kod deposundaki teknik bir dal adf_publish'ye kaydeder (yapıt yayımlama olarak düşünebilirsiniz). Bu dal, çalışma alanı tarafından Azure Data Factory oluşturulur.
Bu işlem hakkında daha fazla bilgi için bkz. Azure Data Factory.
Oluşturulan şablon şablonlarının ortamdan bağımsız Azure Resource Manager emin olmak önemlidir. Bu, ortamlar arasında farklılık gösterebilir tüm değerlerin karşılaştırılmış olduğu anlamına gelir. Azure Data Factory parametre gibi değerlerin çoğunu ortaya çıkarmak için yeterince akıllıdır. Örneğin, aşağıdaki şablonda bir çalışma alanına Azure Machine Learning özellikleri parametre olarak ortaya çıkar:
{
"$schema": "https://schema.management.azure.com/schemas/2015-01-01/deploymentParameters.json#",
"contentVersion": "1.0.0.0",
"parameters": {
"factoryName": {
"value": "devops-ds-adf"
},
"AzureMLService_servicePrincipalKey": {
"value": ""
},
"AzureMLService_properties_typeProperties_subscriptionId": {
"value": "0fe1c235-5cfa-4152-17d7-5dff45a8d4ba"
},
"AzureMLService_properties_typeProperties_resourceGroupName": {
"value": "devops-ds-rg"
},
"AzureMLService_properties_typeProperties_servicePrincipalId": {
"value": "6e35e589-3b22-4edb-89d0-2ab7fc08d488"
},
"AzureMLService_properties_typeProperties_tenant": {
"value": "72f988bf-86f1-41af-912b-2d7cd611db47"
}
}
}
Ancak, varsayılan olarak çalışma alanı tarafından iş Azure Data Factory özel özelliklerinizi açığa çıkarmak istiyor olabilirsiniz. Bu makalenin senaryosunda bir Azure Data Factory işlem hattı, verileri işlemeye bir Python not defteri çağırır. Not defteri, giriş veri dosyası adıyla bir parametre kabul eder.
import pandas as pd
import numpy as np
data_file_name = getArgument("data_file_name")
data = pd.read_csv(data_file_name)
labels = np.array(data['target'])
...
Bu ad Geliştirme _, _QA,_ _UAT_ ve _PROD ortamları için farklıdır._ Birden çok etkinlik içeren karmaşık bir işlem hattında birkaç özel özellik olabilir. Tüm bu değerleri tek bir yerde toplamak ve bunları işlem hattı _ değişkenleri ** olarak tanımlamak iyi bir uygulamadır:

İşlem hattı etkinlikleri, bunları kullanırken işlem hattı değişkenlerine başvurur:

Çalışma Azure Data Factory, işlem hattı değişkenlerini varsayılan olarak Azure Resource Manager olarak açığa çıkarmaz. Çalışma alanı, şablon parametrelerinde hangi işlem hattı özelliklerinin kullanılabilir olması gerektiğini belirten Varsayılan Parametreleştirme Azure Resource Manager kullanır. Listeye işlem hattı değişkenleri eklemek için Varsayılan Parametreleştirme Şablonu bölümünü aşağıdaki kod parçacığıyla güncelleştirin ve sonuç json dosyasını kaynak klasörün "Microsoft.DataFactory/factories/pipelines" köküne ekleyin:
"Microsoft.DataFactory/factories/pipelines": {
"properties": {
"variables": {
"*": {
"defaultValue": "="
}
}
}
}
Bunu yapmak, Azure Data Factory çalışma alanını, yayımla düğmesine tıkıldığında değişkenleri parametreler listesine eklemeye zorlar:
{
"$schema": "https://schema.management.azure.com/schemas/2015-01-01/deploymentParameters.json#",
"contentVersion": "1.0.0.0",
"parameters": {
"factoryName": {
"value": "devops-ds-adf"
},
...
"data-ingestion-pipeline_properties_variables_data_file_name_defaultValue": {
"value": "driver_prediction_train.csv"
}
}
}
JSON dosyasındaki değerler, işlem hattı tanımında yapılandırılan varsayılan değerlerdir. Azure Resource Manager şablonu dağıtıldığında hedef ortam değerleriyle geçersiz kılınmaları beklenir.
Sürekli teslim (CD)
Sürekli Teslim işlemi yapıtları alır ve ilk hedef ortama dağıtır. Testleri çalıştırarak çözümün çalışır olduğundan emin olur. Başarılı olursa, sonraki ortama devam eder.
CD Azure Pipelines, ortamları temsil eden birden çok aşamadan oluşur. Her aşama aşağıdaki adımları gerçekleştiren dağıtımları ve işleri içerir:
- Çalışma alanına Python Not Defteri Azure Databricks dağıtma
- İşlem hattı Azure Data Factory dağıtma
- İşlem hattını çalıştırma
- Veri alımının sonucundaki sonuçları denetleme
İşlem hattı aşamaları, ortam zinciri aracılığıyla dağıtım işleminin nasıl geliştiğini denetlemeye ek denetim sağlayan onaylar ve geçitlerle yalıtılır.
Python Not Defteri Dağıtma
Aşağıdaki kod parçacığı, Bir Python not defterini Databricks kümesine kopyaleyen bir Azure Pipeline dağıtımını tanımlar:
- stage: 'Deploy_to_QA'
displayName: 'Deploy to QA'
variables:
- group: devops-ds-qa-vg
jobs:
- deployment: "Deploy_to_Databricks"
displayName: 'Deploy to Databricks'
timeoutInMinutes: 0
environment: qa
strategy:
runOnce:
deploy:
steps:
- task: UsePythonVersion@0
inputs:
versionSpec: '3.x'
addToPath: true
architecture: 'x64'
displayName: 'Use Python3'
- task: configuredatabricks@0
inputs:
url: '$(DATABRICKS_URL)'
token: '$(DATABRICKS_TOKEN)'
displayName: 'Configure Databricks CLI'
- task: deploynotebooks@0
inputs:
notebooksFolderPath: '$(Pipeline.Workspace)/di-notebooks'
workspaceFolder: '/Shared/devops-ds'
displayName: 'Deploy (copy) data processing notebook to the Databricks cluster'
CI tarafından üretilen yapıtlar otomatik olarak dağıtım aracıya kopyalanır ve klasöründe $(Pipeline.Workspace) bulunur. Bu durumda, dağıtım görevi Python not defterini di-notebooks içeren yapıtı ifade eder. Bu dağıtım, not defteri Azure DevOps Databricks çalışma alanına kopyalamak için Databricks Azure DevOps uzantısını kullanır.
Aşama, Deploy_to_QA Azure DevOps devops-ds-qa-vg projesinde tanımlanan değişken grubuna başvuru içerir. Bu aşamadaki adımlar, bu değişken grubundan (örneğin, ve ) değişkenlere $(DATABRICKS_URL) $(DATABRICKS_TOKEN) başvurur. Burada fikir, sonraki aşamanın (örneğin, ) kendi UAT kapsamlı değişken grubunda tanımlanan aynı değişken Deploy_to_UAT adlarına sahip olmasıdır.
İşlem hattı Azure Data Factory dağıtma
Azure Data Factory için dağıtılabilir yapıt, Azure Resource Manager şablonudur. Aşağıdaki kod parçacığında da olduğu gibi Azure Kaynak Grubu Dağıtımı göreviyle birlikte dağıtılacaktır:
- deployment: "Deploy_to_ADF"
displayName: 'Deploy to ADF'
timeoutInMinutes: 0
environment: qa
strategy:
runOnce:
deploy:
steps:
- task: AzureResourceGroupDeployment@2
displayName: 'Deploy ADF resources'
inputs:
azureSubscription: $(AZURE_RM_CONNECTION)
resourceGroupName: $(RESOURCE_GROUP)
location: $(LOCATION)
csmFile: '$(Pipeline.Workspace)/adf-pipelines/ARMTemplateForFactory.json'
csmParametersFile: '$(Pipeline.Workspace)/adf-pipelines/ARMTemplateParametersForFactory.json'
overrideParameters: -data-ingestion-pipeline_properties_variables_data_file_name_defaultValue "$(DATA_FILE_NAME)"
Veri dosya adı parametresinin değeri, QA aşaması $(DATA_FILE_NAME) değişken grubunda tanımlanan değişkenden gelir. Benzer şekilde ARMTemplateForFactory.json içinde tanımlanan tüm parametreler geçersiz kılınabilir. Yoksa varsayılan değerler kullanılır.
İşlem hattını çalıştırma ve veri alımının sonucundaki sonuçları denetleme
Sonraki adım, dağıtılan çözümün çalışa olduğundan emin olmaktır. Aşağıdaki iş tanımı, PowerShell betiği Azure Data Factory bir işlem hattı çalıştırır ve bir Python not defterini bir Azure Databricks yürütür. Not defteri, verilerin doğru şekilde alındı olup olmadığını denetler ve sonuç veri dosyasını adıyla $(bin_FILE_NAME) doğrular.
- job: "Integration_test_job"
displayName: "Integration test job"
dependsOn: [Deploy_to_Databricks, Deploy_to_ADF]
pool:
vmImage: 'ubuntu-latest'
timeoutInMinutes: 0
steps:
- task: AzurePowerShell@4
displayName: 'Execute ADF Pipeline'
inputs:
azureSubscription: $(AZURE_RM_CONNECTION)
ScriptPath: '$(Build.SourcesDirectory)/adf/utils/Invoke-ADFPipeline.ps1'
ScriptArguments: '-ResourceGroupName $(RESOURCE_GROUP) -DataFactoryName $(DATA_FACTORY_NAME) -PipelineName $(PIPELINE_NAME)'
azurePowerShellVersion: LatestVersion
- task: UsePythonVersion@0
inputs:
versionSpec: '3.x'
addToPath: true
architecture: 'x64'
displayName: 'Use Python3'
- task: configuredatabricks@0
inputs:
url: '$(DATABRICKS_URL)'
token: '$(DATABRICKS_TOKEN)'
displayName: 'Configure Databricks CLI'
- task: executenotebook@0
inputs:
notebookPath: '/Shared/devops-ds/test-data-ingestion'
existingClusterId: '$(DATABRICKS_CLUSTER_ID)'
executionParams: '{"bin_file_name":"$(bin_FILE_NAME)"}'
displayName: 'Test data ingestion'
- task: waitexecution@0
displayName: 'Wait until the testing is done'
İşle ilgili son görev, not defteri yürütmenin sonucu denetler. Hata döndürürse işlem hattı yürütme durumunu başarısız olarak ayarlar.
Parçaları bir araya koyma
Ci/CD Azure Pipeline'ın tam olarak aşağıdaki aşamalardan oluşur:
- CI
- QA'ya dağıtma
- Databricks'e Dağıtma + ADF'ye Dağıtma
- Tümleştirme Testi
Sahip olduğu hedef ortam sayısına eşit bir dizi * Dağıtma _ aşaması içerir. Her _ Dağıtma* aşaması, paralel olarak çalışan iki dağıtım ve çözümü ortamda test etmek için dağıtımların ardından çalışan bir iş içerir.
İşlem hattının örnek uygulaması aşağıdaki yaml kod parçacığında bir araya gelir:
variables:
- group: devops-ds-vg
stages:
- stage: 'CI'
displayName: 'CI'
jobs:
- job: "CI_Job"
displayName: "CI Job"
pool:
vmImage: 'ubuntu-latest'
timeoutInMinutes: 0
steps:
- task: UsePythonVersion@0
inputs:
versionSpec: '3.x'
addToPath: true
architecture: 'x64'
displayName: 'Use Python3'
- script: pip install --upgrade flake8 flake8_formatter_junit_xml
displayName: 'Install flake8'
- checkout: self
- script: |
flake8 --output-file=$(Build.BinariesDirectory)/lint-testresults.xml --format junit-xml
workingDirectory: '$(Build.SourcesDirectory)'
displayName: 'Run flake8 (code style analysis)'
- script: |
python -m pytest --junitxml=$(Build.BinariesDirectory)/unit-testresults.xml $(Build.SourcesDirectory)
displayName: 'Run unit tests'
- task: PublishTestResults@2
condition: succeededOrFailed()
inputs:
testResultsFiles: '$(Build.BinariesDirectory)/*-testresults.xml'
testRunTitle: 'Linting & Unit tests'
failTaskOnFailedTests: true
displayName: 'Publish linting and unit test results'
# The CI stage produces two artifacts (notebooks and ADF pipelines).
# The pipelines Azure Resource Manager templates are stored in a technical branch "adf_publish"
- publish: $(Build.SourcesDirectory)/$(Build.Repository.Name)/code/dataingestion
artifact: di-notebooks
- checkout: git://${{variables['System.TeamProject']}}@adf_publish
- publish: $(Build.SourcesDirectory)/$(Build.Repository.Name)/devops-ds-adf
artifact: adf-pipelines
- stage: 'Deploy_to_QA'
displayName: 'Deploy to QA'
variables:
- group: devops-ds-qa-vg
jobs:
- deployment: "Deploy_to_Databricks"
displayName: 'Deploy to Databricks'
timeoutInMinutes: 0
environment: qa
strategy:
runOnce:
deploy:
steps:
- task: UsePythonVersion@0
inputs:
versionSpec: '3.x'
addToPath: true
architecture: 'x64'
displayName: 'Use Python3'
- task: configuredatabricks@0
inputs:
url: '$(DATABRICKS_URL)'
token: '$(DATABRICKS_TOKEN)'
displayName: 'Configure Databricks CLI'
- task: deploynotebooks@0
inputs:
notebooksFolderPath: '$(Pipeline.Workspace)/di-notebooks'
workspaceFolder: '/Shared/devops-ds'
displayName: 'Deploy (copy) data processing notebook to the Databricks cluster'
- deployment: "Deploy_to_ADF"
displayName: 'Deploy to ADF'
timeoutInMinutes: 0
environment: qa
strategy:
runOnce:
deploy:
steps:
- task: AzureResourceGroupDeployment@2
displayName: 'Deploy ADF resources'
inputs:
azureSubscription: $(AZURE_RM_CONNECTION)
resourceGroupName: $(RESOURCE_GROUP)
location: $(LOCATION)
csmFile: '$(Pipeline.Workspace)/adf-pipelines/ARMTemplateForFactory.json'
csmParametersFile: '$(Pipeline.Workspace)/adf-pipelines/ARMTemplateParametersForFactory.json'
overrideParameters: -data-ingestion-pipeline_properties_variables_data_file_name_defaultValue "$(DATA_FILE_NAME)"
- job: "Integration_test_job"
displayName: "Integration test job"
dependsOn: [Deploy_to_Databricks, Deploy_to_ADF]
pool:
vmImage: 'ubuntu-latest'
timeoutInMinutes: 0
steps:
- task: AzurePowerShell@4
displayName: 'Execute ADF Pipeline'
inputs:
azureSubscription: $(AZURE_RM_CONNECTION)
ScriptPath: '$(Build.SourcesDirectory)/adf/utils/Invoke-ADFPipeline.ps1'
ScriptArguments: '-ResourceGroupName $(RESOURCE_GROUP) -DataFactoryName $(DATA_FACTORY_NAME) -PipelineName $(PIPELINE_NAME)'
azurePowerShellVersion: LatestVersion
- task: UsePythonVersion@0
inputs:
versionSpec: '3.x'
addToPath: true
architecture: 'x64'
displayName: 'Use Python3'
- task: configuredatabricks@0
inputs:
url: '$(DATABRICKS_URL)'
token: '$(DATABRICKS_TOKEN)'
displayName: 'Configure Databricks CLI'
- task: executenotebook@0
inputs:
notebookPath: '/Shared/devops-ds/test-data-ingestion'
existingClusterId: '$(DATABRICKS_CLUSTER_ID)'
executionParams: '{"bin_file_name":"$(bin_FILE_NAME)"}'
displayName: 'Test data ingestion'
- task: waitexecution@0
displayName: 'Wait until the testing is done'