您现在访问的是微软AZURE全球版技术文档网站,若需要访问由世纪互联运营的MICROSOFT AZURE中国区技术文档网站,请访问 https://docs.azure.cn.

教程:使用 Python API 通过 Azure Batch 运行并行工作负荷Tutorial: Run a parallel workload with Azure Batch using the Python API

使用 Azure Batch 在 Azure 中高效运行大规模并行和高性能计算 (HPC) 批处理作业。Use Azure Batch to run large-scale parallel and high-performance computing (HPC) batch jobs efficiently in Azure. 本教程通过一个 Python 示例演示了如何使用 Batch 运行并行工作负荷。This tutorial walks through a Python example of running a parallel workload using Batch. 你可以学习常用的 Batch 应用程序工作流,以及如何以编程方式与 Batch 和存储资源交互。You learn a common Batch application workflow and how to interact programmatically with Batch and Storage resources. 学习如何:You learn how to:

  • 通过 Batch 和存储帐户进行身份验证Authenticate with Batch and Storage accounts
  • 将输入文件上传到存储Upload input files to Storage
  • 创建运行应用程序所需的计算节点池Create a pool of compute nodes to run an application
  • 创建用于处理输入文件的作业和任务Create a job and tasks to process input files
  • 监视任务执行情况Monitor task execution
  • 检索输出文件Retrieve output files

本教程使用 ffmpeg 开源工具将 MP4 媒体文件并行转换为 MP3 格式。In this tutorial, you convert MP4 media files in parallel to MP3 format using the ffmpeg open-source tool.

如果还没有 Azure 订阅,可以在开始前创建一个免费帐户If you don't have an Azure subscription, create a free account before you begin.

先决条件Prerequisites

登录 AzureSign in to Azure

https://portal.azure.com 中登录 Azure 门户。Sign in to the Azure portal at https://portal.azure.com.

获取帐户凭据Get account credentials

就此示例来说,需为 Batch 帐户和存储帐户提供凭据。For this example, you need to provide credentials for your Batch and Storage accounts. 若要获取所需凭据,一种直接的方法是使用 Azure 门户。A straightforward way to get the necessary credentials is in the Azure portal. (也可使用 Azure API 或命令行工具来获取这些凭据。)(You can also get these credentials using the Azure APIs or command-line tools.)

  1. 选择“所有服务” > “Batch 帐户”,然后选择 Batch 帐户的名称。Select All services > Batch accounts, and then select the name of your Batch account.

  2. 若要查看 Batch 凭据,请选择“密钥” 。To see the Batch credentials, select Keys. 将“Batch 帐户”、“URL”和“主访问密钥”的值复制到文本编辑器。 Copy the values of Batch account, URL, and Primary access key to a text editor.

  3. 若要查看存储帐户名称和密钥,请选择“存储帐户” 。To see the Storage account name and keys, select Storage account. 将“存储帐户名称”和“Key1”的值复制到文本编辑器。 Copy the values of Storage account name and Key1 to a text editor.

下载并运行示例Download and run the sample

下载示例Download the sample

从 GitHub 下载或克隆示例应用Download or clone the sample app from GitHub. 若要使用 Git 客户端克隆示例应用存储库,请使用以下命令:To clone the sample app repo with a Git client, use the following command:

git clone https://github.com/Azure-Samples/batch-python-ffmpeg-tutorial.git

导航到包含 batch_python_tutorial_ffmpeg.py 文件的目录。Navigate to the directory that contains the file batch_python_tutorial_ffmpeg.py.

在 Python 环境中使用 pip 安装所需的包。In your Python environment, install the required packages using pip.

pip install -r requirements.txt

打开 config.py 文件。Open the file config.py. 使用特定于帐户的值更新 Batch 帐户和存储帐户凭据字符串。Update the Batch and storage account credential strings with the values unique to your accounts. 例如:For example:

_BATCH_ACCOUNT_NAME = 'mybatchaccount'
_BATCH_ACCOUNT_KEY = 'xxxxxxxxxxxxxxxxE+yXrRvJAqT9BlXwwo1CwF+SwAYOxxxxxxxxxxxxxxxx43pXi/gdiATkvbpLRl3x14pcEQ=='
_BATCH_ACCOUNT_URL = 'https://mybatchaccount.mybatchregion.batch.azure.com'
_STORAGE_ACCOUNT_NAME = 'mystorageaccount'
_STORAGE_ACCOUNT_KEY = 'xxxxxxxxxxxxxxxxy4/xxxxxxxxxxxxxxxxfwpbIC5aAWA8wDu+AFXZB827Mt9lybZB1nUcQbQiUrkPtilK5BQ=='

运行应用Run the app

若要运行该脚本,请执行以下操作:To run the script:

python batch_python_tutorial_ffmpeg.py

运行示例应用程序时,控制台输出如下所示。When you run the sample application, the console output is similar to the following. 在执行期间启动池的计算节点时,会遇到暂停并看到Monitoring all tasks for 'Completed' state, timeout in 00:30:00...During execution, you experience a pause at Monitoring all tasks for 'Completed' state, timeout in 00:30:00... while the pool's compute nodes are started.

Sample start: 11/28/2018 3:20:21 PM

Container [input] created.
Container [output] created.
Uploading file LowPriVMs-1.mp4 to container [input]...
Uploading file LowPriVMs-2.mp4 to container [input]...
Uploading file LowPriVMs-3.mp4 to container [input]...
Uploading file LowPriVMs-4.mp4 to container [input]...
Uploading file LowPriVMs-5.mp4 to container [input]...
Creating pool [LinuxFFmpegPool]...
Creating job [LinuxFFmpegJob]...
Adding 5 tasks to job [LinuxFFmpegJob]...
Monitoring all tasks for 'Completed' state, timeout in 00:30:00...
Success! All tasks completed successfully within the specified timeout period.
Deleting container [input]....

Sample end: 11/28/2018 3:29:36 PM
Elapsed time: 00:09:14.3418742

转到 Azure 门户中的 Batch 帐户,监视池、计算节点、作业和任务。Go to your Batch account in the Azure portal to monitor the pool, compute nodes, job, and tasks. 例如,若要查看池中计算节点的热度地图,请单击“池” > “LinuxFFmpegPool”。For example, to see a heat map of the compute nodes in your pool, click Pools > LinuxFFmpegPool.

任务正在运行时,热度地图如下所示:When tasks are running, the heat map is similar to the following:

池热度地图

以默认配置运行应用程序时,典型的执行时间大约为 5 分钟Typical execution time is approximately 5 minutes when you run the application in its default configuration. 池创建过程需要最多时间。Pool creation takes the most time.

检索输出文件Retrieve output files

可以使用 Azure 门户下载 ffmpeg 任务生成的输出 MP3 文件。You can use the Azure portal to download the output MP3 files generated by the ffmpeg tasks.

  1. 单击“所有服务” > “存储帐户”,然后单击存储帐户的名称。Click All services > Storage accounts, and then click the name of your storage account.
  2. 单击“Blob” > “输出”。Click Blobs > output.
  3. 右键单击一个输出 MP3 文件,然后单击“下载” 。Right-click one of the output MP3 files and then click Download. 在浏览器中按提示打开或保存该文件。Follow the prompts in your browser to open or save the file.

下载输出文件

也可以编程方式从计算节点或存储容器下载这些文件(但在本示例中未演示)。Although not shown in this sample, you can also download the files programmatically from the compute nodes or from the storage container.

查看代码Review the code

以下部分将示例应用程序细分为多个执行步骤,用于处理 Batch 服务中的工作负荷。The following sections break down the sample application into the steps that it performs to process a workload in the Batch service. 在阅读本文的其余内容时,请参阅 Python 代码,因为我们并没有讨论示例中的每行代码。Refer to the Python code while you read the rest of this article, since not every line of code in the sample is discussed.

对 Blob 和 Batch 客户端进行身份验证Authenticate Blob and Batch clients

为了与存储帐户交互,应用使用 azure-storage-blob 包来创建 BlockBlobService 对象。To interact with a storage account, the app uses the azure-storage-blob package to create a BlockBlobService object.

blob_client = azureblob.BlockBlobService(
    account_name=_STORAGE_ACCOUNT_NAME,
    account_key=_STORAGE_ACCOUNT_KEY)

应用创建的 BatchServiceClient 对象用于创建和管理 Batch 服务中的池、作业和任务。The app creates a BatchServiceClient object to create and manage pools, jobs, and tasks in the Batch service. 示例中的 Batch 客户端使用共享密钥身份验证。The Batch client in the sample uses shared key authentication. Batch 还支持通过 Azure Active Directory 进行身份验证,以便对单个用户或无人参与应用程序进行身份验证。Batch also supports authentication through Azure Active Directory, to authenticate individual users or an unattended application.

credentials = batchauth.SharedKeyCredentials(_BATCH_ACCOUNT_NAME,
                                             _BATCH_ACCOUNT_KEY)

batch_client = batch.BatchServiceClient(
    credentials,
    base_url=_BATCH_ACCOUNT_URL)

上传输入文件Upload input files

应用使用 blob_client 引用为输入 MP4 文件创建一个存储容器,并为任务输出创建一个容器。The app uses the blob_client reference create a storage container for the input MP4 files and a container for the task output. 然后,它会调用 upload_file_to_container 函数,将本地 InputFiles 目录中的 MP4 文件上传到容器。Then, it calls the upload_file_to_container function to upload MP4 files in the local InputFiles directory to the container. 存储中的文件定义为 Batch ResourceFile 对象,Batch 随后可以将这些对象下载到计算节点。The files in storage are defined as Batch ResourceFile objects that Batch can later download to compute nodes.

blob_client.create_container(input_container_name, fail_on_exist=False)
blob_client.create_container(output_container_name, fail_on_exist=False)
input_file_paths = []

for folder, subs, files in os.walk(os.path.join(sys.path[0], './InputFiles/')):
    for filename in files:
        if filename.endswith(".mp4"):
            input_file_paths.append(os.path.abspath(
                os.path.join(folder, filename)))

# Upload the input files. This is the collection of files that are to be processed by the tasks.
input_files = [
    upload_file_to_container(blob_client, input_container_name, file_path)
    for file_path in input_file_paths]

创建计算节点池Create a pool of compute nodes

然后,该示例会调用 create_pool 以在 Batch 帐户中创建计算节点池。Next, the sample creates a pool of compute nodes in the Batch account with a call to create_pool. 这个定义的函数使用 Batch PoolAddParameter 类来设置节点数、VM 大小和池配置。This defined function uses the Batch PoolAddParameter class to set the number of nodes, VM size, and a pool configuration. 在这里,VirtualMachineConfiguration 对象指定对 Azure 市场中发布的 Ubuntu Server 18.04 LTS 映像的 ImageReferenceHere, a VirtualMachineConfiguration object specifies an ImageReference to an Ubuntu Server 18.04 LTS image published in the Azure Marketplace. Batch 支持 Azure 市场中的各种 VM 映像以及自定义 VM 映像。Batch supports a wide range of VM images in the Azure Marketplace, as well as custom VM images.

节点数和 VM 大小使用定义的常数进行设置。The number of nodes and VM size are set using defined constants. Batch 支持专用节点和低优先级节点。可以在池中使用这其中的一种,或者两种都使用。Batch supports dedicated nodes and low-priority nodes, and you can use either or both in your pools. 专用节点为池保留。Dedicated nodes are reserved for your pool. 低优先级节点在 Azure 有剩余 VM 容量时以优惠价提供。Low-priority nodes are offered at a reduced price from surplus VM capacity in Azure. 如果 Azure 没有足够的容量,低优先级节点会变得不可用。Low-priority nodes become unavailable if Azure does not have enough capacity. 默认情况下,此示例创建的池只包含 5 个大小为 Standard_A1_v2 的低优先级节点。The sample by default creates a pool containing only 5 low-priority nodes in size Standard_A1_v2.

除了物理节点属性,此池配置还包括 StartTask 对象。In addition to physical node properties, this pool configuration includes a StartTask object. StartTask 在每个节点加入池以及每次重新启动节点时在该节点上运行。The StartTask executes on each node as that node joins the pool, and each time a node is restarted. 在此示例中,StartTask 运行的 Bash shell 命令用于在节点上安装 ffmpeg 包和依赖项。In this example, the StartTask runs Bash shell commands to install the ffmpeg package and dependencies on the nodes.

pool.add 方法将池提交到 Batch 服务。The pool.add method submits the pool to the Batch service.

new_pool = batch.models.PoolAddParameter(
    id=pool_id,
    virtual_machine_configuration=batchmodels.VirtualMachineConfiguration(
        image_reference=batchmodels.ImageReference(
            publisher="Canonical",
            offer="UbuntuServer",
            sku="18.04-LTS",
            version="latest"
        ),
        node_agent_sku_id="batch.node.ubuntu 18.04"),
    vm_size=_POOL_VM_SIZE,
    target_dedicated_nodes=_DEDICATED_POOL_NODE_COUNT,
    target_low_priority_nodes=_LOW_PRIORITY_POOL_NODE_COUNT,
    start_task=batchmodels.StartTask(
        command_line="/bin/bash -c \"apt-get update && apt-get install -y ffmpeg\"",
        wait_for_success=True,
        user_identity=batchmodels.UserIdentity(
            auto_user=batchmodels.AutoUserSpecification(
                scope=batchmodels.AutoUserScope.pool,
                elevation_level=batchmodels.ElevationLevel.admin)),
    )
)
batch_service_client.pool.add(new_pool)

创建作业Create a job

Batch 作业可指定在其中运行任务的池以及可选设置,例如工作的优先级和计划。A Batch job specifies a pool to run tasks on and optional settings such as a priority and schedule for the work. 此示例通过调用 create_job 创建一个作业。The sample creates a job with a call to create_job. 这个定义的函数使用 JobAddParameter 类在池中创建作业。This defined function uses the JobAddParameter class to create a job on your pool. job.add 方法将池提交到 Batch 服务。The job.add method submits the pool to the Batch service. 作业一开始没有任务。Initially the job has no tasks.

job = batch.models.JobAddParameter(
    id=job_id,
    pool_info=batch.models.PoolInformation(pool_id=pool_id))

batch_service_client.job.add(job)

创建任务Create tasks

应用通过调用 add_tasks 在作业中创建任务。The app creates tasks in the job with a call to add_tasks. 这个定义的函数使用 TaskAddParameter 类创建任务对象的列表。This defined function creates a list of task objects using the TaskAddParameter class. 每个任务都运行 ffmpeg,使用 command_line 参数来处理输入 resource_files 对象。Each task runs ffmpeg to process an input resource_files object using a command_line parameter. ffmpeg 此前已在创建池时安装在每个节点上。ffmpeg was previously installed on each node when the pool was created. 在这里,命令行运行 ffmpeg 将每个输入 MP4(视频)文件转换为 MP3(音频)文件。Here, the command line runs ffmpeg to convert each input MP4 (video) file to an MP3 (audio) file.

此示例在运行命令行后为 MP3 文件创建 OutputFile 对象。The sample creates an OutputFile object for the MP3 file after running the command line. 每个任务的输出文件(在此示例中为一个)都会使用任务的 output_files 属性上传到关联的存储帐户中的一个容器。Each task's output files (one, in this case) are uploaded to a container in the linked storage account, using the task's output_files property.

然后,应用使用 task.add_collection 方法将任务添加到作业,使任务按顺序在计算节点上运行。Then, the app adds tasks to the job with the task.add_collection method, which queues them to run on the compute nodes.

tasks = list()

for idx, input_file in enumerate(input_files):
    input_file_path = input_file.file_path
    output_file_path = "".join((input_file_path).split('.')[:-1]) + '.mp3'
    command = "/bin/bash -c \"ffmpeg -i {} {} \"".format(
        input_file_path, output_file_path)
    tasks.append(batch.models.TaskAddParameter(
        id='Task{}'.format(idx),
        command_line=command,
        resource_files=[input_file],
        output_files=[batchmodels.OutputFile(
            file_pattern=output_file_path,
            destination=batchmodels.OutputFileDestination(
                container=batchmodels.OutputFileBlobContainerDestination(
                    container_url=output_container_sas_url)),
            upload_options=batchmodels.OutputFileUploadOptions(
                upload_condition=batchmodels.OutputFileUploadCondition.task_success))]
    )
    )
batch_service_client.task.add_collection(job_id, tasks)

监视任务Monitor tasks

将任务添加到作业时,Batch 自动对任务排队并进行计划,方便其在关联的池中的计算节点上执行。When tasks are added to a job, Batch automatically queues and schedules them for execution on compute nodes in the associated pool. Batch 根据指定的设置处理所有任务排队、计划、重试和其他任务管理工作。Based on the settings you specify, Batch handles all task queuing, scheduling, retrying, and other task administration duties.

监视任务的执行有许多方法。There are many approaches to monitoring task execution. 此示例中的 wait_for_tasks_to_complete 函数使用 TaskState 对象监视任务在某个时限内是否出现某种状态(在此示例中为“已完成”状态)。The wait_for_tasks_to_complete function in this example uses the TaskState object to monitor tasks for a certain state, in this case the completed state, within a time limit.

while datetime.datetime.now() < timeout_expiration:
    print('.', end='')
    sys.stdout.flush()
    tasks = batch_service_client.task.list(job_id)

    incomplete_tasks = [task for task in tasks if
                        task.state != batchmodels.TaskState.completed]
    if not incomplete_tasks:
        print()
        return True
    else:
        time.sleep(1)
...

清理资源Clean up resources

运行任务之后,应用自动删除所创建的输入存储容器,并允许你选择是否删除 Batch 池和作业。After it runs the tasks, the app automatically deletes the input storage container it created, and gives you the option to delete the Batch pool and job. BatchClient 的 JobOperationsPoolOperations 类都有删除方法(在确认删除时调用)。The BatchClient's JobOperations and PoolOperations classes both have delete methods, which are called if you confirm deletion. 虽然作业和任务本身不收费,但计算节点收费。Although you're not charged for jobs and tasks themselves, you are charged for compute nodes. 因此,建议只在需要的时候分配池。Thus, we recommend that you allocate pools only as needed. 删除池时会删除节点上的所有任务输出。When you delete the pool, all task output on the nodes is deleted. 但是,输入和输出文件保留在存储帐户中。However, the input and output files remain in the storage account.

若不再需要资源组、Batch 帐户和存储帐户,请将其删除。When no longer needed, delete the resource group, Batch account, and storage account. 为此,请在 Azure 门户中选择 Batch 帐户所在的资源组,然后单击“删除资源组”。 To do so in the Azure portal, select the resource group for the Batch account and click Delete resource group.

后续步骤Next steps

本教程介绍了如何:In this tutorial, you learned about how to:

  • 通过 Batch 和存储帐户进行身份验证Authenticate with Batch and Storage accounts
  • 将输入文件上传到存储Upload input files to Storage
  • 创建运行应用程序所需的计算节点池Create a pool of compute nodes to run an application
  • 创建用于处理输入文件的作业和任务Create a job and tasks to process input files
  • 监视任务执行情况Monitor task execution
  • 检索输出文件Retrieve output files

如需更多示例,以便了解如何使用 Python API 来计划和处理 Batch 工作负荷,请参阅 GitHub 上的示例。For more examples of using the Python API to schedule and process Batch workloads, see the samples on GitHub.