Tutorial - Menjalankan beban kerja paralel Azure Batch menggunakan Python API

Gunakan Azure Batch untuk menjalankan tugas batch komputasi paralel dan kinerja tinggi (HPC) skala besar secara efisien di Azure. Tutorial ini berjalan melalui contoh Python menjalankan beban kerja paralel menggunakan Batch. Anda mempelajari alur kerja aplikasi Batch umum dan cara berinteraksi secara terprogram dengan sumber daya Batch dan Storage.

  • Autentikasi dengan akun Batch dan Storage.
  • Unggah file input ke Storage.
  • Buat kumpulan simpul komputasi untuk menjalankan aplikasi.
  • Buat pekerjaan dan tugas untuk memproses file input.
  • Pantau eksekusi tugas.
  • Mengambil file output.

Dalam tutorial ini, Anda mengonversi file media MP4 ke format MP3, secara paralel, dengan menggunakan alat sumber terbuka ffmpeg .

Jika Anda tidak memiliki Langganan Azure, buat Akun gratis Azure sebelum memulai.

Prasyarat

Masuk ke Azure

Masuk ke portal Azure.

Mendapatkan kredensial akun

Untuk contoh ini, Anda harus memberikan kredensial untuk akun Batch dan Storage Anda. Cara mudah untuk mendapatkan kredensial yang diperlukan ada di portal Microsoft Azure. (Anda juga bisa mendapatkan kredensial ini menggunakan Azure API atau alat baris perintah.)

  1. Pilih Semua layanan>Akun Batch, lalu pilih nama akun Batch Anda.

  2. Untuk melihat kredensial Batch, pilih Kunci. Salin nilai akun Batch, URL, dan Kunci akses primer ke editor teks.

  3. Untuk melihat nama dan kunci akun Storage, pilih Akun Storage. Salin nilai nama akun Storage dan Key1 ke editor teks.

Unduh dan jalankan aplikasi sampel

Mengunduh aplikasi sampel

Unduh atau klon aplikasi contoh dari GitHub. Untuk mengkloning repo aplikasi sampel dengan klien Git, gunakan perintah berikut:

git clone https://github.com/Azure-Samples/batch-python-ffmpeg-tutorial.git

Navigasi ke direktori yang berisi file batch_python_tutorial_ffmpeg.py.

Di lingkungan Python Anda, instal paket yang diperlukan menggunakan pip.

pip install -r requirements.txt

Gunakan editor kode untuk membuka file config.py. Perbarui string informasi masuk akun Batch dan Storage dengan nilai yang unik untuk akun Anda. Contohnya:

_BATCH_ACCOUNT_NAME = 'yourbatchaccount'
_BATCH_ACCOUNT_KEY = 'xxxxxxxxxxxxxxxxE+yXrRvJAqT9BlXwwo1CwF+SwAYOxxxxxxxxxxxxxxxx43pXi/gdiATkvbpLRl3x14pcEQ=='
_BATCH_ACCOUNT_URL = 'https://yourbatchaccount.yourbatchregion.batch.azure.com'
_STORAGE_ACCOUNT_NAME = 'mystorageaccount'
_STORAGE_ACCOUNT_KEY = 'xxxxxxxxxxxxxxxxy4/xxxxxxxxxxxxxxxxfwpbIC5aAWA8wDu+AFXZB827Mt9lybZB1nUcQbQiUrkPtilK5BQ=='

Menjalankan aplikasi

Untuk menjalankan skrip:

python batch_python_tutorial_ffmpeg.py

Saat Anda menjalankan aplikasi sampel, output konsol serupa dengan yang berikut ini. Selama eksekusi, Anda mengalami jeda pada Monitoring all tasks for 'Completed' state, timeout in 00:30:00... ketika node komputasi kumpulan dimulai.

Sample start: 11/28/2018 3:20:21 PM

Container [input] created.
Container [output] created.
Uploading file LowPriVMs-1.mp4 to container [input]...
Uploading file LowPriVMs-2.mp4 to container [input]...
Uploading file LowPriVMs-3.mp4 to container [input]...
Uploading file LowPriVMs-4.mp4 to container [input]...
Uploading file LowPriVMs-5.mp4 to container [input]...
Creating pool [LinuxFFmpegPool]...
Creating job [LinuxFFmpegJob]...
Adding 5 tasks to job [LinuxFFmpegJob]...
Monitoring all tasks for 'Completed' state, timeout in 00:30:00...
Success! All tasks completed successfully within the specified timeout period.
Deleting container [input]....

Sample end: 11/28/2018 3:29:36 PM
Elapsed time: 00:09:14.3418742

Buka akun Batch Anda di portal Microsoft Azure untuk memantau kumpulan, node komputasi, pekerjaan, dan tugas. Misalnya, untuk melihat peta panas simpul komputasi di kumpulan Anda, pilih Kumpulan>LinuxFmpegPool.

Saat tugas berjalan, peta panas serupa dengan yang berikut ini:

Screenshot of Pool heat map.

Waktu eksekusi umum adalah sekitar 5 menit ketika Anda menjalankan aplikasi dalam konfigurasi defaultnya. Pembuatan kumpulan membutuhkan waktu paling lama.

Mengambil file output

Anda dapat menggunakan portal Microsoft Azure untuk mengunduh file MP3 output yang dihasilkan oleh tugas ffmpeg.

  1. Klik Semua layanan>Akun Storage, lalu klik nama akun penyimpanan Anda.
  2. Klik Blob>output.
  3. Klik kanan salah satu file MP3 output, lalu klik Unduh. Ikuti perintah di browser Anda untuk membuka atau menyimpan file.

Download output file

Meskipun tidak ditampilkan dalam sampel ini, Anda juga dapat mengunduh file secara terprogram dari node komputasi atau dari kontainer penyimpanan.

Mengulas kode

Bagian berikut ini memecah aplikasi sampel ke dalam langkah-langkah yang dijalankan untuk memproses beban kerja di layanan Batch. Lihat kode Python saat Anda membaca sisa artikel ini, karena tidak setiap baris kode dalam sampel dibahas.

Mengautentikasi klien Blob dan Batch

Untuk berinteraksi dengan akun penyimpanan, aplikasi ini menggunakan paket azure-storage-blob untuk membuat objek BlockBlobService.

blob_client = azureblob.BlockBlobService(
    account_name=_STORAGE_ACCOUNT_NAME,
    account_key=_STORAGE_ACCOUNT_KEY)

Aplikasi ini membuat objek BatchServiceClient untuk membuat dan mengelola kumpulan, pekerjaan, dan tugas dalam layanan Batch. Klien Batch dalam sampel ini menggunakan autentikasi kunci bersama. Batch juga mendukung autentikasi melalui ID Microsoft Entra, untuk mengautentikasi pengguna individual atau aplikasi tanpa pengawas.

credentials = batchauth.SharedKeyCredentials(_BATCH_ACCOUNT_NAME,
                                             _BATCH_ACCOUNT_KEY)

batch_client = batch.BatchServiceClient(
    credentials,
    base_url=_BATCH_ACCOUNT_URL)

Mengunggah file input

Aplikasi ini menggunakan referensi blob_client, membuat kontainer penyimpanan untuk file MP4 input dan kontainer untuk output tugas. Kemudian, ia memanggil upload_file_to_container fungsi untuk mengunggah file MP4 di direktori InputFiles lokal ke kontainer. File dalam penyimpanan didefinisikan sebagai objek Batch ResourceFile yang nantinya dapat diunduh Batch ke node komputasi.

blob_client.create_container(input_container_name, fail_on_exist=False)
blob_client.create_container(output_container_name, fail_on_exist=False)
input_file_paths = []

for folder, subs, files in os.walk(os.path.join(sys.path[0], './InputFiles/')):
    for filename in files:
        if filename.endswith(".mp4"):
            input_file_paths.append(os.path.abspath(
                os.path.join(folder, filename)))

# Upload the input files. This is the collection of files that are to be processed by the tasks.
input_files = [
    upload_file_to_container(blob_client, input_container_name, file_path)
    for file_path in input_file_paths]

Membuat kumpulan simpul komputasi

Selanjutnya, sampel membuat kumpulan node komputasi di akun Batch dengan panggilan ke create_pool. Fungsi yang ditentukan ini menggunakan class Batch PoolAddParameter untuk mengatur jumlah node, ukuran VM, dan konfigurasi kumpulan. Di sini, objek VirtualMachineConfiguration menentukan ImageReference ke gambar Ubuntu Server 20.04 LTS yang diterbitkan di Azure Marketplace. Batch mendukung berbagai citra VM di Azure Marketplace, serta citra VM kustom.

Jumlah node dan ukuran VM diatur menggunakan konstanta yang ditentukan. Batch mendukung node khusus dan node Spot, dan Anda dapat menggunakan salah satu atau keduanya di kumpulan Anda. Simpul khusus dicadangkan untuk kumpulan Anda. Node Spot ditawarkan dengan harga lebih murah dari kelebihan kapasitas mesin virtual di Azure. Simpul spot menjadi tidak tersedia jika Azure tidak memiliki kapasitas yang cukup. Sampel secara default membuat kumpulan yang hanya berisi lima simpul Spot dalam ukuran Standard_A1_v2.

Selain properti node fisik, konfigurasi kumpulan ini mencakup objek StartTask. StartTask dijalankan pada setiap node saat node tersebut bergabung dengan kumpulan, dan setiap kali node dimulai ulang. Dalam contoh ini, StartTask menjalankan perintah shell Bash untuk menginstal paket ffmpeg dan dependensi pada node.

Metode pool.add mengirimkan kumpulan ke layanan Batch.

new_pool = batch.models.PoolAddParameter(
    id=pool_id,
    virtual_machine_configuration=batchmodels.VirtualMachineConfiguration(
        image_reference=batchmodels.ImageReference(
            publisher="Canonical",
            offer="UbuntuServer",
            sku="20.04-LTS",
            version="latest"
        ),
        node_agent_sku_id="batch.node.ubuntu 20.04"),
    vm_size=_POOL_VM_SIZE,
    target_dedicated_nodes=_DEDICATED_POOL_NODE_COUNT,
    target_low_priority_nodes=_LOW_PRIORITY_POOL_NODE_COUNT,
    start_task=batchmodels.StartTask(
        command_line="/bin/bash -c \"apt-get update && apt-get install -y ffmpeg\"",
        wait_for_success=True,
        user_identity=batchmodels.UserIdentity(
            auto_user=batchmodels.AutoUserSpecification(
                scope=batchmodels.AutoUserScope.pool,
                elevation_level=batchmodels.ElevationLevel.admin)),
    )
)
batch_service_client.pool.add(new_pool)

Membuat pekerjaan

Tugas Batch menentukan kumpulan untuk menjalankan tugas dan pengaturan opsional seperti prioritas dan jadwal untuk pekerjaan. Sampel membuat pekerjaan dengan panggilan ke create_job. Fungsi yang ditentukan ini menggunakan kelas JobAddParameter untuk membuat pekerjaan pada kumpulan Anda. Metode job.add mengirimkan kumpulan ke layanan Batch. Awalnya pekerjaan tersebut tidak memiliki tugas.

job = batch.models.JobAddParameter(
    id=job_id,
    pool_info=batch.models.PoolInformation(pool_id=pool_id))

batch_service_client.job.add(job)

Membuat tugas

Aplikasi ini membuat tugas dalam pekerjaan dengan panggilan ke add_tasks. Fungsi yang telah ditentukan ini membuat daftar objek tugas menggunakan kelas TaskAddParameter. Setiap tugas menjalankan ffmpeg untuk memproses objek input resource_files menggunakan parameter command_line. ffmpeg telah diinstal sebelumnya pada setiap node ketika kumpulan dibuat. Di sini, baris perintah menjalankan ffmpeg untuk mengonversi setiap file MP4 (video) input ke file MP3 (audio).

Sampel membuat objek OutputFile untuk file MP3 setelah menjalankan baris perintah. Setiap file output tugas (satu, dalam kasus ini) diunggah ke kontainer di akun penyimpanan tertaut, menggunakan properti output_files tugas.

Kemudian, aplikasi menambahkan tugas ke pekerjaan dengan metode task.add_collection, yang mengurutkan tugas untuk berjalan pada node komputasi.

tasks = list()

for idx, input_file in enumerate(input_files):
    input_file_path = input_file.file_path
    output_file_path = "".join((input_file_path).split('.')[:-1]) + '.mp3'
    command = "/bin/bash -c \"ffmpeg -i {} {} \"".format(
        input_file_path, output_file_path)
    tasks.append(batch.models.TaskAddParameter(
        id='Task{}'.format(idx),
        command_line=command,
        resource_files=[input_file],
        output_files=[batchmodels.OutputFile(
            file_pattern=output_file_path,
            destination=batchmodels.OutputFileDestination(
                container=batchmodels.OutputFileBlobContainerDestination(
                    container_url=output_container_sas_url)),
            upload_options=batchmodels.OutputFileUploadOptions(
                upload_condition=batchmodels.OutputFileUploadCondition.task_success))]
    )
    )
batch_service_client.task.add_collection(job_id, tasks)

Memantau tugas

Ketika tugas ditambahkan ke pekerjaan, Batch secara otomatis mengurutkan dan menjadwalkannya untuk dieksekusi pada node komputasi dalam kumpulan terkait. Berdasarkan pengaturan yang Anda tentukan, Batch menangani semua antrean tugas, penjadwalan, upaya untuk mencoba kembali, dan tugas administrasi tugas lainnya.

Ada banyak pendekatan untuk memantau eksekusi tugas. Fungsi wait_for_tasks_to_complete dalam contoh ini menggunakan objek TaskState untuk memantau tugas untuk status tertentu, dalam hal ini status selesai, dalam batas waktu.

while datetime.datetime.now() < timeout_expiration:
    print('.', end='')
    sys.stdout.flush()
    tasks = batch_service_client.task.list(job_id)

    incomplete_tasks = [task for task in tasks if
                        task.state != batchmodels.TaskState.completed]
    if not incomplete_tasks:
        print()
        return True
    else:
        time.sleep(1)
...

Membersihkan sumber daya

Setelah menjalankan tugas, aplikasi secara otomatis menghapus kontainer penyimpanan input yang dibuatnya, dan memberi Anda opsi untuk menghapus kumpulan dan pekerjaan Batch. Kelas JobOperations dan PoolOperations BatchClient memiliki metode penghapusan, yang dipanggil jika Anda mengonfirmasi penghapusan. Meskipun Anda tidak perlu membayar untuk pekerjaan dan tugas tersebut, node komputasi akan dikenakan biaya. Oleh karenanya, kami menyarankan Anda untuk mengalokasikan kumpulan hanya jika diperlukan. Saat Anda menghapus kumpulan, semua output tugas di simpul akan dihapus. Namun, file input dan output tetap ada di akun penyimpanan.

Saat tidak lagi diperlukan, hapus grup sumber daya, akun Batch, dan akun penyimpanan. Untuk melakukannya di portal Azure, pilih grup sumber daya untuk akun Batch dan pilih Hapus grup sumber daya.

Langkah berikutnya

Dalam tutorial ini, Anda mempelajari cara:

  • Autentikasi dengan akun Batch dan Storage.
  • Unggah file input ke Storage.
  • Buat kumpulan simpul komputasi untuk menjalankan aplikasi.
  • Buat pekerjaan dan tugas untuk memproses file input.
  • Pantau eksekusi tugas.
  • Mengambil file output.

Untuk contoh selengkapnya tentang menggunakan Python API untuk menjadwalkan dan memproses beban kerja Batch, lihat sampel Batch Python di GitHub.