Durable Functions のファンアウト/ファンイン シナリオ - クラウド バックアップの例Fan-out/fan-in scenario in Durable Functions - Cloud backup example

"ファンアウト/ファンイン" は、複数の関数を同時に実行した後、その結果に対して集計を行うパターンを指します。Fan-out/fan-in refers to the pattern of executing multiple functions concurrently and then performing some aggregation on the results. この記事では、Durable Functions を使用してファンイン/ファンアウト シナリオを実装するサンプルについて説明します。This article explains a sample that uses Durable Functions to implement a fan-in/fan-out scenario. このサンプルは、アプリのサイトのコンテンツの一部またはすべてを Azure Storage にバックアップする永続関数です。The sample is a durable function that backs up all or some of an app's site content into Azure Storage.

前提条件Prerequisites

シナリオの概要Scenario overview

このサンプルでは、関数は、指定されたディレクトリの下にあるすべてのファイルを BLOB ストレージに再帰的にアップロードします。In this sample, the functions upload all files under a specified directory recursively into blob storage. さらに、アップロードされたバイトの合計数をカウントします。They also count the total number of bytes that were uploaded.

すべてを管理する単一の関数を記述できます。It's possible to write a single function that takes care of everything. その際に発生する最大の問題はスケーラビリティです。The main problem you would run into is scalability. 単一の関数は、単一の仮想マシンでのみ実行できるため、スループットは単一の VM のスループットによって制限されます。A single function execution can only run on a single virtual machine, so the throughput will be limited by the throughput of that single VM. 別の問題として、信頼性があります。Another problem is reliability. 途中でエラーが発生した場合、またはプロセス全体が 5 分以上かかる場合、バックアップは部分的に完了した状態で失敗する可能性があります。If there's a failure midway through, or if the entire process takes more than 5 minutes, the backup could fail in a partially completed state. これにより、再起動が必要になることがあります。It would then need to be restarted.

もっと堅牢な方法は、2 つの標準的な関数 (ファイルを列挙し、ファイル名をキューに追加する関数と、キューからファイルを読み取り、そのファイルを BLOB ストレージにアップロードする関数) を記述することです。A more robust approach would be to write two regular functions: one would enumerate the files and add the file names to a queue, and another would read from the queue and upload the files to blob storage. このアプローチにより、スループットと信頼性は向上しますが、キューのプロビジョニングと管理を行う必要があります。This approach is better in terms of throughput and reliability, but it requires you to provision and manage a queue. さらに重要なのは、アップロードされた合計バイト数の報告などを行うと、状態管理調整が非常に複雑になることです。More importantly, significant complexity is introduced in terms of state management and coordination if you want to do anything more, like report the total number of bytes uploaded.

Durable Functions を使用する方法は、上記の利点を非常に少ないオーバーヘッドで実現できます。A Durable Functions approach gives you all of the mentioned benefits with very low overhead.

関数The functions

この記事では、サンプル アプリで使用されている次の関数について説明します。This article explains the following functions in the sample app:

  • E2_BackupSiteContent:E2_GetFileList を呼び出してバックアップするファイルのリストを取得してから、E2_CopyFileToBlob を呼び出して各ファイルをバックアップするオーケストレーター関数E2_BackupSiteContent: An orchestrator function that calls E2_GetFileList to obtain a list of files to back up, then calls E2_CopyFileToBlob to back up each file.
  • E2_GetFileList:ディレクトリ内のファイルのリストを返すアクティビティ関数E2_GetFileList: An activity function that returns a list of files in a directory.
  • E2_CopyFileToBlob:1 つのファイルを Azure Blob Storage にバックアップするアクティビティ関数。E2_CopyFileToBlob: An activity function that backs up a single file to Azure Blob Storage.

E2_BackupSiteContent オーケストレーター関数E2_BackupSiteContent orchestrator function

このオーケストレーター関数は、基本的に次の操作を行います。This orchestrator function essentially does the following:

  1. rootDirectory 値を入力パラメーターとして使用します。Takes a rootDirectory value as an input parameter.
  2. rootDirectory の下のファイルの再帰リストを取得する関数を呼び出します。Calls a function to get a recursive list of files under rootDirectory.
  3. 複数の並列関数を呼び出して、各ファイルを Azure Blob Storage にアップロードします。Makes multiple parallel function calls to upload each file into Azure Blob Storage.
  4. すべてのアップロードが完了するまで待機します。Waits for all uploads to complete.
  5. Azure Blob ストレージにアップロードされたバイト数の合計を返します。Returns the sum total bytes that were uploaded to Azure Blob Storage.

オーケストレーター関数を実装するコードを次に示します。Here is the code that implements the orchestrator function:

[FunctionName("E2_BackupSiteContent")]
public static async Task<long> Run(
    [OrchestrationTrigger] IDurableOrchestrationContext backupContext)
{
    string rootDirectory = backupContext.GetInput<string>()?.Trim();
    if (string.IsNullOrEmpty(rootDirectory))
    {
        rootDirectory = Directory.GetParent(typeof(BackupSiteContent).Assembly.Location).FullName;
    }

    string[] files = await backupContext.CallActivityAsync<string[]>(
        "E2_GetFileList",
        rootDirectory);

    var tasks = new Task<long>[files.Length];
    for (int i = 0; i < files.Length; i++)
    {
        tasks[i] = backupContext.CallActivityAsync<long>(
            "E2_CopyFileToBlob",
            files[i]);
    }

    await Task.WhenAll(tasks);

    long totalBytes = tasks.Sum(t => t.Result);
    return totalBytes;
}

await Task.WhenAll(tasks); 行に注目してください。Notice the await Task.WhenAll(tasks); line. E2_CopyFileToBlob 関数への個々の呼び出しがすべて待機されていて並列実行が可能なわけではありませんAll the individual calls to the E2_CopyFileToBlob function were not awaited, which allows them to run in parallel. このタスク配列を Task.WhenAll に渡すと、"すべてのコピー操作が完了するまで" 完了することがない 1 つのタスクが戻ります。When we pass this array of tasks to Task.WhenAll, we get back a task that won't complete until all the copy operations have completed. .NET のタスク並列ライブラリ (TPL) を知っていれば、これは新しい事柄ではありません。If you're familiar with the Task Parallel Library (TPL) in .NET, then this is not new to you. 違いは、これらのタスクが複数の仮想マシンで同時に実行される可能性があることと、Durable Functions 拡張機能によって、プロセスのリサイクルに対してエンド ツー エンドの実行が回復することが保証されることです。The difference is that these tasks could be running on multiple virtual machines concurrently, and the Durable Functions extension ensures that the end-to-end execution is resilient to process recycling.

Task.WhenAll から応答が返ることは、すべての関数呼び出しが完了し、値が戻っていることを意味します。After awaiting from Task.WhenAll, we know that all function calls have completed and have returned values back to us. E2_CopyFileToBlob への各呼び出しがアップロードしたバイト数を返しているため、バイト数の合計を計算することは、これらの返された値をすべて合計するだけの操作です。Each call to E2_CopyFileToBlob returns the number of bytes uploaded, so calculating the sum total byte count is a matter of adding all those return values together.

ヘルパー アクティビティ関数Helper activity functions

ヘルパー アクティビティ関数は、他のサンプルと同じように、activityTrigger トリガー バインドを使う標準的な関数です。The helper activity functions, as with other samples, are just regular functions that use the activityTrigger trigger binding.

E2_GetFileList アクティビティ関数E2_GetFileList activity function

[FunctionName("E2_GetFileList")]
public static string[] GetFileList(
    [ActivityTrigger] string rootDirectory, 
    ILogger log)
{
    log.LogInformation($"Searching for files under '{rootDirectory}'...");
    string[] files = Directory.GetFiles(rootDirectory, "*", SearchOption.AllDirectories);
    log.LogInformation($"Found {files.Length} file(s) under {rootDirectory}.");

    return files;
}

注意

このコードをオーケストレーター関数に直接配置できないことを疑問に思うかもしれません。You might be wondering why you couldn't just put this code directly into the orchestrator function. 配置することは可能ですが、それを行うと、オーケストレーター関数の基本ルールの 1 つである、ローカル ファイル システムへのアクセスを含めて I/O 操作を行うべきではないというルールを破ることになります。You could, but this would break one of the fundamental rules of orchestrator functions, which is that they should never do I/O, including local file system access. 詳細については、「オーケストレーター関数コードの制約」を参照してください。For more information, see Orchestrator function code constraints.

E2_CopyFileToBlob アクティビティ関数E2_CopyFileToBlob activity function

[FunctionName("E2_CopyFileToBlob")]
public static async Task<long> CopyFileToBlob(
    [ActivityTrigger] string filePath,
    Binder binder,
    ILogger log)
{
    long byteCount = new FileInfo(filePath).Length;

    // strip the drive letter prefix and convert to forward slashes
    string blobPath = filePath
        .Substring(Path.GetPathRoot(filePath).Length)
        .Replace('\\', '/');
    string outputLocation = $"backups/{blobPath}";

    log.LogInformation($"Copying '{filePath}' to '{outputLocation}'. Total bytes = {byteCount}.");

    // copy the file contents into a blob
    using (Stream source = File.Open(filePath, FileMode.Open, FileAccess.Read, FileShare.Read))
    using (Stream destination = await binder.BindAsync<CloudBlobStream>(
        new BlobAttribute(outputLocation, FileAccess.Write)))
    {
        await source.CopyToAsync(destination);
    }

    return byteCount;
}

注意

サンプル コードを実行するには、Microsoft.Azure.WebJobs.Extensions.Storage NuGet パッケージをインストールする必要があります。You will need to install the Microsoft.Azure.WebJobs.Extensions.Storage NuGet package to run the sample code.

関数は、Azure Functions のバインドの高度な機能を使用します (つまり、Binder パラメーター の使用) が、このチュートリアルでは、詳細を気にする必要はありません。The function uses some advanced features of Azure Functions bindings (that is, the use of the Binder parameter), but you don't need to worry about those details for the purpose of this walkthrough.

この実装は、ディスクからファイルを読み込み、"backups" コンテナー内の同じ名前の BLOB に内容を非同期でストリーミングします。The implementation loads the file from disk and asynchronously streams the contents into a blob of the same name in the "backups" container. 戻り値はストレージにコピーされたバイト数であり、この数値がオーケストレーター関数によって集計の合計を計算するために使用されます。The return value is the number of bytes copied to storage, that is then used by the orchestrator function to compute the aggregate sum.

注意

これは、I/O 操作を activityTrigger 関数に移動させる完璧な例です。This is a perfect example of moving I/O operations into an activityTrigger function. 作業をさまざまなマシンに分散できるだけではなく、進行状況のチェックポイント処理のメリットも得ることができます。Not only can the work be distributed across many different machines, but you also get the benefits of checkpointing the progress. ホスト プロセスが何らかの理由で終了した場合でも、どのアップロードが完了しているかがわかります。If the host process gets terminated for any reason, you know which uploads have already completed.

サンプルを実行するRun the sample

次の HTTP POST 要求を送信してオーケストレーションを開始できます。You can start the orchestration by sending the following HTTP POST request.

POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20

"D:\\home\\LogFiles"

注意

呼び出している HttpStart 関数は、JSON 形式のコンテンツでのみ動作します。The HttpStart function that you are invoking only works with JSON-formatted content. このため、Content-Type: application/json ヘッダーは必須であり、ディレクトリ パスは JSON 文字列としてエンコードされます。For this reason, the Content-Type: application/json header is required and the directory path is encoded as a JSON string. さらに、HTTP スニペットでは、既定の api/ プレフィックスをすべての HTTP トリガー関数 URL から削除するエントリが host.json ファイルにあることを想定しています。Moreover, HTTP snippet assumes there is an entry in the host.json file which removes the default api/ prefix from all HTTP trigger functions URLs. この構成のマークアップはサンプルの host.json ファイルにあります。You can find the markup for this configuration in the host.json file in the samples.

この HTTP 要求で E2_BackupSiteContent オーケストレーターがトリガーされ、文字列 D:\home\LogFiles がパラメーターとして渡されます。This HTTP request triggers the E2_BackupSiteContent orchestrator and passes the string D:\home\LogFiles as a parameter. 応答は、このバックアップ操作の状態を取得するためのリンクを提供します。The response provides a link to get the status of the backup operation:

HTTP/1.1 202 Accepted
Content-Length: 719
Content-Type: application/json; charset=utf-8
Location: http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}

(...trimmed...)

この操作は、関数アプリ内のログ ファイルの数によっては、完了するまで数分かかる場合があります。Depending on how many log files you have in your function app, this operation could take several minutes to complete. 前の HTTP 202 応答の Location ヘッダー内の URL をクエリすることで、最新の状態を取得できます。You can get the latest status by querying the URL in the Location header of the previous HTTP 202 response.

GET http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}
HTTP/1.1 202 Accepted
Content-Length: 148
Content-Type: application/json; charset=utf-8
Location: http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}

{"runtimeStatus":"Running","input":"D:\\home\\LogFiles","output":null,"createdTime":"2019-06-29T18:50:55Z","lastUpdatedTime":"2019-06-29T18:51:16Z"}

ここでは、関数はまだ実行中です。In this case, the function is still running. オーケストレーターの状態と最終更新時間に保存された入力を確認できます。You are able to see the input that was saved into the orchestrator state and the last updated time. Location ヘッダーの値を引き続き使用して、完了するまでポーリングできます。You can continue to use the Location header values to poll for completion. 状態が "Completed" になると、次のような HTTP 応答値が表示されます。When the status is "Completed", you see an HTTP response value similar to the following:

HTTP/1.1 200 OK
Content-Length: 152
Content-Type: application/json; charset=utf-8

{"runtimeStatus":"Completed","input":"D:\\home\\LogFiles","output":452071,"createdTime":"2019-06-29T18:50:55Z","lastUpdatedTime":"2019-06-29T18:51:26Z"}

これで、オーケストレーションが完了したこと、完了までにかかったおおよその時間を確認できます。Now you can see that the orchestration is complete and approximately how much time it took to complete. output フィールドの値から、約 450 KB のログがアップロードされたことも確認できます。You also see a value for the output field, which indicates that around 450 KB of logs were uploaded.

次のステップNext steps

このサンプルでは、ファンアウト/ファンイン パターンの実装方法について説明しました。This sample has shown how to implement the fan-out/fan-in pattern. 次のサンプルでは、永続的タイマーを使用して監視パターンを実装する方法を示します。The next sample shows how to implement the monitor pattern using durable timers.