Azure Functions で Python アプリのスループット パフォーマンスを向上させる

Python を使用して Azure Functions 向けに開発する場合、関数がどのように実行され、そのパフォーマンスが関数アプリのスケーリング方法にどのように影響するかを理解する必要があります。 高パフォーマンスのアプリを設計するときには、この必要性がより重要になります。 関数アプリを設計、作成、構成するときに考慮すべき主な要素は、水平スケーリングとスループット パフォーマンスの構成です。

水平スケーリング

既定では、Azure Functions は、アプリケーションの負荷を自動的に監視し、必要に応じて Python 用に追加のホスト インスタンスを作成します。 Azure Functions では、さまざまなトリガーの種類の組み込みしきい値 (メッセージの経過時間や QueueTrigger のキュー サイズなど) を使用して、インスタンスを追加するタイミングを決定します。 これらのしきい値は、ユーザーが構成することはできません。 詳細については、「Azure Functions でのイベント ドリブン スケーリング」を参照してください。

スループットのパフォーマンスの向上

ほとんどの Azure Functions アプリケーションの場合、既定の構成が適しています。 ただし、ワークロード プロファイルに基づく構成を採用することで、アプリケーションのスループットのパフォーマンスを向上させることができます。 最初の手順は、実行しているワークロードの種類を理解することです。

ワークロードの種類 関数アプリの特性
I/O バウンド • アプリで多くの同時呼び出しを処理する必要がある。
• アプリは、ネットワーク呼び出しやディスクの読み取り/書き込みなど、大量の I/O イベントを処理します。
• Web API
CPU バウンド • アプリでは、イメージのサイズ変更など、長時間実行される計算が行われます。
• アプリでは、データの変換が行われます。
• データ処理
• 機械学習推論

実際の関数ワークロードは、通常、I/O バインドと CPU バインドが混在しているため、運用環境の現実的な負荷の下でアプリをプロファイリングする必要があります。

パフォーマンス固有の構成

関数アプリのワークロード プロファイルを理解したら、次は関数のスループット パフォーマンスを向上させるために使用できる構成を行います。

Async

Python はシングルスレッド ランタイムであるため、Python のホスト インスタンスが処理できる関数呼び出しは、既定で一度に 1 つに限られます。 大量の I/O イベントを処理するアプリケーションや、I/O バインドされているアプリケーションでは、関数を非同期に実行することによってパフォーマンスを著しく向上させることができます。

関数を非同期に実行するには、async def ステートメントを使用します。これにより、asyncio を使用して関数が直接実行されます。

async def main():
    await some_nonblocking_socket_io_op()

aiohttp http クライアントを使用する HTTP トリガーを使用した関数の例を次に示します。

import aiohttp

import azure.functions as func

async def main(req: func.HttpRequest) -> func.HttpResponse:
    async with aiohttp.ClientSession() as client:
        async with client.get("PUT_YOUR_URL_HERE") as response:
            return func.HttpResponse(await response.text())

    return func.HttpResponse(body='NotFound', status_code=404)

async キーワードのない関数は、自動的に ThreadPoolExecutor スレッド プールで実行されます。

# Runs in an ThreadPoolExecutor threadpool. Number of threads is defined by PYTHON_THREADPOOL_THREAD_COUNT. 
# The example is intended to show how default synchronous function are handled.

def main():
    some_blocking_socket_io()

関数を非同期的に実行する利点を最大限に活用するには、コード内で使用されている I/O 操作/ライブラリにも async を実装する必要があります。 非同期として定義されている関数で同期 I/O 操作を使用すると、全体的なパフォーマンスが 低下する可能性が あります。 使用しているライブラリに非同期バージョンが実装されていない場合でも、アプリでイベント ループを管理して、コードを非同期で実行することでメリットが得られる可能性があります。

非同期パターンを実装したクライアント ライブラリのいくつかの例を次に示します。

  • aiohttp - asyncio の Http クライアント/サーバー
  • Streams API -ネットワーク接続を操作するための高レベルの async/await 対応プリミティブ
  • Janus Queue - Python 用のスレッドセーフな asyncio 対応キュー
  • pyzmq -ZeroMQ 用の Python バインド
Python Worker の async について

関数シグネチャの前に async を定義すると、Python はその関数をコルーチンとしてマークします。 コルーチンを呼び出すときに、イベント ループにタスクとしてスケジュールできます。 非同期関数で await を呼び出すと、イベント ループに継続が登録され、イベント ループは待機時間中に次のタスクを処理できるようになります。

Python Worker では、ワーカーは顧客の async 関数とイベント ループを共有し、複数の要求を同時に処理できます。 asyncio 互換ライブラリ (aiohttppyzmq など) を使用することを強くお勧めします。 これらの推奨事項を採用すると、同期的に実装されたライブラリと比較して、関数のスループットが大幅に向上します。

注意

関数が実装内で await なしで async として宣言されている場合、イベント ループがブロックされ、Python Worker が同時要求を処理できなくなるため、関数のパフォーマンスに深刻な影響を及ぼします。

複数の言語ワーカー プロセスを使用する

既定では、すべての Functions ホスト インスタンスに 1 つの言語ワーカー プロセスがあります。 FUNCTIONS_WORKER_PROCESS_COUNT アプリケーション設定を使用して、ホストごとのワーカー プロセスの数を増やすことができます (最大 10)。 次に、Azure Functions は、これらのワーカー間で同時関数呼び出しを均等に分散しようとします。

CPU バインド アプリの場合、言語ワーカーの数は、関数アプリごとに使用できるコアの数と同じかそれ以上である必要があります。 詳細については、「利用可能インスタンス SKU」を参照してください。

I/O バインド アプリでも、使用可能なコア数を超えてワーカー プロセスの数を増やすことでメリットが得られる場合もあります。 ワーカー数を大きく設定し過ぎると、必要なコンテキスト切り替えの数が増えるため、全体的なパフォーマンスに影響する可能性があることに注意してください。

FUNCTIONS_WORKER_PROCESS_COUNT は、需要に応じてアプリケーションをスケールアウトするときに Functions が作成する各ホストに適用されます。

言語ワーカー プロセス内のワーカーの最大数を設定する

async に関するセクションで説明したように、Python 言語ワーカーは関数とコルーチンを別々に扱います。 コルーチンは、言語ワーカーが実行されている同じイベント ループ内で実行されます。 一方、関数呼び出しは、言語ワーカーによって管理されている ThreadPoolExecutor 内でスレッドとして実行されます。

PYTHON_THREADPOOL_THREAD_COUNT アプリケーション設定を使用して、同期関数を実行する際に許可される最大ワーカー数の値を設定できます。 この値により、ThreadPoolExecutor オブジェクトの max_worker 引数が設定されます。これにより、Python は最大 max_worker スレッドのプールを使用して、非同期で呼び出しを実行できます。 PYTHON_THREADPOOL_THREAD_COUNT は、Functions ホストが作成する各ワーカーに適用されます。Python は、新しいスレッドを作成するか、アイドル状態の既存のスレッドを再利用するかを決定します。 Python の古いバージョン (3.83.73.6) では、max_worker 値は 1 に設定されています。 Python バージョン 3.9 では、max_workerNone に設定されています。

CPU にバインドされたアプリの場合、この設定を小さい数値にしておきます。1 から始めて、ワークロードを試しながら増やしていきます。 この提案は、コンテキスト切り替えに費やす時間を減らし、CPU にバインドされたタスクを終了できるようにすることを目的としています。

I/O にバインドされたアプリの場合、各呼び出しで動作するスレッドの数を増やすことで、スループットが大幅に向上します。 Python の既定値 (コア数 + 4) から始め、実際のスループット値に基づいて調整することをお勧めします。

混合ワークロード アプリの場合、スループットを最大化するには、FUNCTIONS_WORKER_PROCESS_COUNTPYTHON_THREADPOOL_THREAD_COUNT の両方の構成のバランスを取る必要があります。 関数アプリが何に最も多くの時間を費やしているかを理解するために、アプリをプロファイリングし、その動作に応じた値を設定することをお勧めします。 FUNCTIONS_WORKER_PROCESS_COUNT アプリケーション設定の詳細については、こちらのセクションも参照してください。

注意

これらの推奨事項は、HTTP によってトリガーされる関数とトリガーされない関数の両方に適用されますが、関数アプリから期待されるパフォーマンスを得るために、HTTP によってトリガーされない関数では、トリガー固有の他の構成を調整することが必要な場合があります。 この詳細については、こちらの記事をご覧ください。

イベント ループを管理する

asyncio 互換のサードパーティ製ライブラリを使用します。 ニーズを満たすサードパーティ製ライブラリがない場合は、Azure Functions でイベント ループを管理することもできます。 イベント ループを管理すると、コンピューティング リソース管理の柔軟性が向上します。また、同期 I/O ライブラリをコルーチンにラップすることも可能になります。

組み込みの asyncio ライブラリを使用した コルーチンとタスクおよび イベント ループについて説明している有用な Python 公式ドキュメントが多数あります。

例として、次の requests ライブラリを見てみましょう。このコード スニペットでは、asyncio ライブラリを使用して requests.get() メソッドをコルーチンにラップし、SAMPLE_URL への複数の Web 要求を同時に実行します。

import asyncio
import json
import logging

import azure.functions as func
from time import time
from requests import get, Response


async def invoke_get_request(eventloop: asyncio.AbstractEventLoop) -> Response:
    # Wrap requests.get function into a coroutine
    single_result = await eventloop.run_in_executor(
        None,  # using the default executor
        get,  # each task call invoke_get_request
        'SAMPLE_URL'  # the url to be passed into the requests.get function
    )
    return single_result

async def main(req: func.HttpRequest) -> func.HttpResponse:
    logging.info('Python HTTP trigger function processed a request.')

    eventloop = asyncio.get_event_loop()

    # Create 10 tasks for requests.get synchronous call
    tasks = [
        asyncio.create_task(
            invoke_get_request(eventloop)
        ) for _ in range(10)
    ]

    done_tasks, _ = await asyncio.wait(tasks)
    status_codes = [d.result().status_code for d in done_tasks]

    return func.HttpResponse(body=json.dumps(status_codes),
                             mimetype='application/json')

垂直方向のスケーリング

特に CPU にバインドされた操作で処理ユニットを増やす場合、仕様が高い Premium プランにアップグレードすることで、これを実現できる可能性があります。 より高度な処理ユニットを使用すると、使用可能なコアの数に応じてワーカー プロセス数を調整し、並列処理の次数を増やすことができます。

次のステップ

Python による Azure Functions 開発の詳細については、次のリソースを参照してください。