Fan-out/fan-in scenario in Durable Functions - Cloud backup example

Fan-out/fan-in refers to the pattern of executing multiple functions concurrently and then aggregating their results. This article explains a sample that uses Durable Functions to implement a fan-out/fan-in scenario. The sample is a durable function that backs up all or some of an app's site content into Azure Storage.

Note

Version 4 of the Node.js programming model for Azure Functions is generally available. The new v4 model is designed to have a more flexible and intuitive experience for JavaScript and TypeScript developers. Learn more about the differences between v3 and v4 in the migration guide.

In the following code snippets, JavaScript (PM4) denotes programming model V4, the new experience.

Prerequisites

Scenario overview

In this sample, the functions upload all files under a specified directory recursively into blob storage. They also count the total number of bytes that were uploaded.

It's possible to write a single function that takes care of everything. The main problem you would run into is scalability. A single function execution can only run on a single virtual machine, so the throughput is limited by the throughput of that single VM. Another problem is reliability. If there's a failure midway through, or if the entire process takes longer than the function timeout (5 minutes by default on the Consumption plan), the backup could fail in a partially completed state. It would then need to be restarted from the beginning.

A more robust approach would be to write two regular functions: one would enumerate the files and add the file names to a queue, and another would read from the queue and upload the files to blob storage. This approach is better in terms of throughput and reliability, but it requires you to provision and manage a queue. More importantly, significant complexity is introduced in terms of state management and coordination if you want to do anything more, like report the total number of bytes uploaded.

A Durable Functions approach gives you all of the mentioned benefits with very low overhead.

The functions

This article explains the following functions in the sample app:

  • E2_BackupSiteContent: An orchestrator function that calls E2_GetFileList to obtain a list of files to back up, then calls E2_CopyFileToBlob to back up each file.
  • E2_GetFileList: An activity function that returns a list of files in a directory.
  • E2_CopyFileToBlob: An activity function that backs up a single file to Azure Blob Storage.

E2_BackupSiteContent orchestrator function

This orchestrator function essentially does the following:

  1. Takes a rootDirectory value as an input parameter.
  2. Calls a function to get a recursive list of files under rootDirectory.
  3. Makes multiple parallel function calls to upload each file into Azure Blob Storage.
  4. Waits for all uploads to complete.
  5. Returns the sum total bytes that were uploaded to Azure Blob Storage.

The function uses the standard function.json for orchestrator functions.

{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "name": "context",
      "type": "orchestrationTrigger",
      "direction": "in"
    }
  ]
}

Here is the code that implements the orchestrator function:

import azure.functions as func
import azure.durable_functions as df


def orchestrator_function(context: df.DurableOrchestrationContext):

    root_directory: str = context.get_input()

    if not root_directory:
        raise Exception("A directory path is required as input")

    files = yield context.call_activity("E2_GetFileList", root_directory)
    tasks = []
    for file in files:
        tasks.append(context.call_activity("E2_CopyFileToBlob", file))
    
    results = yield context.task_all(tasks)
    total_bytes = sum(results)
    return total_bytes

main = df.Orchestrator.create(orchestrator_function)

Notice the yield context.task_all(tasks) line. The individual calls to the E2_CopyFileToBlob function are not yielded one by one, which allows them to run in parallel. When we pass this list of tasks to context.task_all, we get back a task that won't complete until all the copy operations have completed. If you're familiar with asyncio.gather in Python, this pattern will look familiar. The difference is that these tasks could be running on multiple virtual machines concurrently, and the Durable Functions extension ensures that the end-to-end execution is resilient to process recycling.

Note

Although tasks are conceptually similar to Python awaitables, orchestrator functions should use yield as well as the context.task_all and context.task_any APIs to manage task parallelization.

After yielding from context.task_all, we know that all function calls have completed and have returned values back to us. Each call to E2_CopyFileToBlob returns the number of bytes uploaded, so we can calculate the sum total byte count by adding all the return values together.
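For readers who know asyncio, the same fan-out/fan-in shape can be sketched in plain Python. This is only an in-process analogy, not Durable Functions code: copy_file is a hypothetical stand-in for E2_CopyFileToBlob that returns a made-up byte count, and nothing here is checkpointed or distributed across machines.

```python
import asyncio
from typing import List


async def copy_file(path: str) -> int:
    """Hypothetical stand-in for E2_CopyFileToBlob; returns a fake byte count."""
    await asyncio.sleep(0)  # hand control back to the event loop, as real I/O would
    return len(path)


async def backup(paths: List[str]) -> int:
    # Fan out: create one coroutine per file without awaiting each in turn.
    pending = [copy_file(path) for path in paths]
    # Fan in: gather resolves once every copy has finished, preserving input order.
    results = await asyncio.gather(*pending)
    return sum(results)


def run_backup(paths: List[str]) -> int:
    return asyncio.run(backup(paths))
```

In the orchestrator, yield context.task_all(tasks) plays the role of await asyncio.gather(...), and the sum over the results is the fan-in step.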

Helper activity functions

The helper activity functions, as with other samples, are just regular functions that use the activityTrigger trigger binding.

E2_GetFileList activity function

The function.json file for E2_GetFileList looks like the following:

{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "name": "rootDirectory",
      "type": "activityTrigger",
      "direction": "in"
    }
  ]
}

And here is the implementation:

import os
from typing import List

def main(rootDirectory: str) -> List[str]:

    all_file_paths = []
    # Walk the directory tree under rootDirectory
    for path, _, files in os.walk(rootDirectory):
        # Only back up the sample's own function folders (names containing "E2_")
        if "E2_" in path:
            # Add the full path of each function's code and binding file to the list
            for name in files:
                if name == "__init__.py" or name == "function.json":
                    file_path = os.path.join(path, name)
                    all_file_paths.append(file_path)

    return all_file_paths

Note

You might be wondering why you couldn't just put this code directly into the orchestrator function. You could, but this would break one of the fundamental rules of orchestrator functions, which is that they should never do I/O, including local file system access. For more information, see Orchestrator function code constraints.
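To see why I/O in an orchestrator is a problem, it can help to picture replay with a toy model. The sketch below is a deliberately simplified, sequential illustration and not how the Durable Functions extension is actually implemented: run_with_replay, the tuple-based call format, and both activities are hypothetical. It re-runs the orchestrator from the top after each activity completes and feeds recorded results back in.

```python
from typing import Any, Callable, Dict, Generator, List, Tuple

ActivityCall = Tuple[str, Any]  # (activity name, activity input)


def run_with_replay(
    orchestrator: Callable[[], Generator[ActivityCall, Any, Any]],
    activities: Dict[str, Callable[[Any], Any]],
) -> Tuple[Any, int]:
    """Toy driver: restarts the orchestrator after every completed activity."""
    history: List[Any] = []  # recorded activity results, in call order
    executions = 0
    while True:
        executions += 1
        gen = orchestrator()
        try:
            call = next(gen)
            # Replay: hand back recorded results instead of re-running activities.
            for recorded in history:
                call = gen.send(recorded)
        except StopIteration as done:
            return done.value, executions
        # First time this call is reached: run the activity and record its result.
        name, arg = call
        history.append(activities[name](arg))


activity_runs = {"get_file_list": 0, "copy_file": 0}


def get_file_list(root: str) -> List[str]:
    activity_runs["get_file_list"] += 1
    return [root + "/a.txt", root + "/b.txt"]


def copy_file(path: str) -> int:
    activity_runs["copy_file"] += 1
    return len(path)


def backup_orchestrator() -> Generator[ActivityCall, Any, int]:
    files = yield ("get_file_list", "/site")
    total = 0
    for path in files:
        copied = yield ("copy_file", path)
        total += copied
    return total


total_bytes, orchestrator_executions = run_with_replay(
    backup_orchestrator,
    {"get_file_list": get_file_list, "copy_file": copy_file},
)
```

In this toy run the orchestrator body executes several times while each activity runs only once. Any file system access placed directly in the orchestrator would be repeated on every pass and could return different results each time, which is why that work belongs in an activity.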

E2_CopyFileToBlob activity function

The function.json file for E2_CopyFileToBlob is similarly simple:

{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "name": "filePath",
      "type": "activityTrigger",
      "direction": "in"
    }
  ]
}

The Python implementation uses the Azure Storage SDK for Python to upload the files to Azure Blob Storage.

import os
import pathlib
from azure.storage.blob import BlobServiceClient
from azure.core.exceptions import ResourceExistsError

connect_str = os.getenv('AzureWebJobsStorage')

def main(filePath: str) -> int:
    # Create the BlobServiceClient object which will be used to create a container client
    blob_service_client = BlobServiceClient.from_connection_string(connect_str)
    
    # Name of the container that stores the backups
    container_name = "backups"
    
    # Create the container if it does not exist
    try:
        blob_service_client.create_container(container_name)
    except ResourceExistsError:
        pass

    # Create a blob client using the local file name as the name for the blob
    parent_dir, fname = pathlib.Path(filePath).parts[-2:] # Get last two path components
    blob_name = parent_dir + "_" + fname
    blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)

    # Count bytes in file
    byte_count = os.path.getsize(filePath)

    # Upload the created file
    with open(filePath, "rb") as data:
        blob_client.upload_blob(data)

    return byte_count

The implementation opens the file from disk and uploads its contents to a blob in the "backups" container. The blob name combines the file's parent directory name and its file name. The return value is the number of bytes in the file, which the orchestrator function then uses to compute the aggregate sum.
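The blob naming step can be checked in isolation. The following sketch pulls it into a hypothetical helper, blob_name_for, and uses PurePosixPath instead of the sample's pathlib.Path so that it behaves the same on any operating system; the example paths are illustrative.

```python
from pathlib import PurePosixPath


def blob_name_for(file_path: str) -> str:
    """Hypothetical helper mirroring the sample's naming: <parent dir>_<file name>."""
    # Take the last two path components, as the activity function does.
    parent_dir, fname = PurePosixPath(file_path).parts[-2:]
    return parent_dir + "_" + fname


example_name = blob_name_for("/home/site/wwwroot/E2_GetFileList/function.json")
```

Because only the last two components are used, two files with the same parent folder name and file name in different parts of the tree would map to the same blob name, and a path with fewer than two components raises a ValueError when unpacked.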

Note

This is a perfect example of moving I/O operations into an activityTrigger function. Not only can the work be distributed across many different machines, but you also get the benefits of checkpointing the progress. If the host process gets terminated for any reason, you know which uploads have already completed.

Run the sample

On Windows, you can start the orchestration by sending the following HTTP POST request:

POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20

"D:\\home\\LogFiles"

Alternatively, on a Linux Function App (Python currently only runs on Linux for App Service), you can start the orchestration like so:

POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20

"/home/site/wwwroot"

Note

The HttpStart function that you are invoking only works with JSON-formatted content. For this reason, the Content-Type: application/json header is required and the directory path is encoded as a JSON string. Moreover, the HTTP snippet assumes there is an entry in the host.json file that removes the default api/ prefix from all HTTP trigger function URLs. You can find the markup for this configuration in the host.json file in the samples.

This HTTP request triggers the E2_BackupSiteContent orchestrator and passes the string D:\home\LogFiles as a parameter. The response provides a link to get the status of the backup operation:

HTTP/1.1 202 Accepted
Content-Length: 719
Content-Type: application/json; charset=utf-8
Location: http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}

(...trimmed...)

Depending on how many log files you have in your function app, this operation could take several minutes to complete. You can get the latest status by querying the URL in the Location header of the previous HTTP 202 response.

GET http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}
HTTP/1.1 202 Accepted
Content-Length: 148
Content-Type: application/json; charset=utf-8
Location: http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}

{"runtimeStatus":"Running","input":"D:\\home\\LogFiles","output":null,"createdTime":"2019-06-29T18:50:55Z","lastUpdatedTime":"2019-06-29T18:51:16Z"}

In this case, the function is still running. You can see the input that was saved into the orchestrator state and the last updated time. You can continue to use the Location header value to poll for completion. When the status is "Completed", you see an HTTP response similar to the following:

HTTP/1.1 200 OK
Content-Length: 152
Content-Type: application/json; charset=utf-8

{"runtimeStatus":"Completed","input":"D:\\home\\LogFiles","output":452071,"createdTime":"2019-06-29T18:50:55Z","lastUpdatedTime":"2019-06-29T18:51:26Z"}

Now you can see that the orchestration is complete and approximately how much time it took to complete. You also see a value for the output field, which indicates that around 450 KB of logs were uploaded.
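The start-and-poll flow above can also be scripted. The following is a minimal sketch using only the Python standard library. The helper names, the polling interval, the attempt limit, and the set of statuses treated as final are assumptions made for illustration, and the host value passed in is whatever your function app uses.

```python
import json
import time
import urllib.request
from typing import Any, Callable, Dict

# Assumption: these runtimeStatus values are treated as final by this sketch.
TERMINAL_STATUSES = {"Completed", "Failed", "Terminated"}


def build_start_request(host: str, root_directory: str) -> urllib.request.Request:
    """Build the POST request; the directory path is sent as a JSON string."""
    body = json.dumps(root_directory).encode("utf-8")
    return urllib.request.Request(
        url=f"http://{host}/orchestrators/E2_BackupSiteContent",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def fetch_status(status_url: str) -> Dict[str, Any]:
    """GET the status URL taken from the Location header and parse the JSON body."""
    with urllib.request.urlopen(status_url) as response:
        return json.loads(response.read().decode("utf-8"))


def poll_until_done(
    status_url: str,
    fetch: Callable[[str], Dict[str, Any]] = fetch_status,
    interval_seconds: float = 5.0,
    max_attempts: int = 60,
) -> Dict[str, Any]:
    """Poll until runtimeStatus is final, or give up after max_attempts."""
    for _ in range(max_attempts):
        status = fetch(status_url)
        if status.get("runtimeStatus") in TERMINAL_STATUSES:
            return status
        time.sleep(interval_seconds)
    raise TimeoutError("orchestration did not finish within the polling window")
```

To use it, send the request with urllib.request.urlopen(build_start_request(host, path)), read the Location header from the 202 response, and pass that URL to poll_until_done. The fetch parameter exists so the polling loop can be exercised without a network call.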

Next steps

This sample has shown how to implement the fan-out/fan-in pattern. The next sample shows how to implement the monitor pattern using durable timers.