Fan-out/fan-in refers to the pattern of executing multiple functions concurrently and then performing some aggregation on the results. This article explains a sample that uses Durable Functions to implement a fan-out/fan-in scenario. The sample is a durable function that backs up all or some of an app's site content into Azure Storage.
In this sample, the functions upload all files under a specified directory recursively into blob storage. They also count the total number of bytes that were uploaded.
It's possible to write a single function that takes care of everything. The main problem you would run into is scalability. A single function execution can only run on a single virtual machine, so the throughput will be limited by the throughput of that single VM. Another problem is reliability. If there's a failure midway through, or if the entire process takes more than 5 minutes, the backup could fail in a partially completed state. It would then need to be restarted.
A more robust approach would be to write two regular functions: one would enumerate the files and add the file names to a queue, and another would read from the queue and upload the files to blob storage. This approach is better in terms of throughput and reliability, but it requires you to provision and manage a queue. More importantly, significant complexity is introduced in terms of state management and coordination if you want to do anything more, like report the total number of bytes uploaded.
A Durable Functions approach gives you all of the mentioned benefits with very low overhead.
This article explains the following functions in the sample app:

- E2_BackupSiteContent: An orchestrator function that calls E2_GetFileList to obtain a list of files to back up, then calls E2_CopyFileToBlob to back up each file.
- E2_GetFileList: An activity function that returns a list of files in a directory.
- E2_CopyFileToBlob: An activity function that backs up a single file to Azure Blob Storage.

The E2_BackupSiteContent orchestrator function essentially does the following:

1. Takes a rootDirectory value as an input parameter.
2. Calls E2_GetFileList to get a recursive list of files under rootDirectory.
3. Makes multiple parallel calls to E2_CopyFileToBlob to upload each file into Azure Blob Storage.
4. Waits for all uploads to complete.
5. Returns the sum total of bytes that were uploaded to Azure Blob Storage.

The function uses the standard function.json for orchestrator functions.
{
"scriptFile": "__init__.py",
"bindings": [
{
"name": "context",
"type": "orchestrationTrigger",
"direction": "in"
}
]
}
Here is the code that implements the orchestrator function:
import azure.functions as func
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
root_directory: str = context.get_input()
if not root_directory:
raise Exception("A directory path is required as input")
files = yield context.call_activity("E2_GetFileList", root_directory)
tasks = []
for file in files:
tasks.append(context.call_activity("E2_CopyFileToBlob", file))
results = yield context.task_all(tasks)
total_bytes = sum(results)
return total_bytes
main = df.Orchestrator.create(orchestrator_function)
Notice the yield context.task_all(tasks) line. The individual calls to the E2_CopyFileToBlob function are not yielded, which allows them to run in parallel. When we pass this list of tasks to context.task_all, we get back a task that won't complete until all the copy operations have completed. If you're familiar with asyncio.gather in Python, then this pattern is not new to you. The difference is that these tasks could be running on multiple virtual machines concurrently, and the Durable Functions extension ensures that the end-to-end execution is resilient to process recycling.
Note
Although tasks are conceptually similar to Python awaitables, orchestrator functions should use yield
as well as the context.task_all
and context.task_any
APIs to manage task parallelization.
After yielding from context.task_all
, we know that all function calls have completed and have returned values back to us. Each call to E2_CopyFileToBlob
returns the number of bytes uploaded, so we can calculate the sum total byte count by adding all the return values together.
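To make the contrast with parallel fan-out concrete, here is a minimal sketch (not part of the sample; the orchestrator name is hypothetical) of what a sequential version would look like. Because each activity call is yielded on its own, the orchestrator waits for one copy to finish before scheduling the next, so the total duration is roughly the sum of all copy times rather than the duration of the slowest copy:

import azure.durable_functions as df

def sequential_orchestrator(context: df.DurableOrchestrationContext):
    # Hypothetical sequential variant, shown for comparison only:
    # yielding each activity call individually serializes the copies.
    files = yield context.call_activity("E2_GetFileList", context.get_input())
    total_bytes = 0
    for file in files:
        # The orchestrator suspends here until this single copy completes.
        bytes_for_file = yield context.call_activity("E2_CopyFileToBlob", file)
        total_bytes += bytes_for_file
    return total_bytes

main = df.Orchestrator.create(sequential_orchestrator)

The sample instead collects the tasks in a list and yields them once through context.task_all, so all copies are scheduled before the orchestrator waits.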
The helper activity functions, as with other samples, are just regular functions that use the activityTrigger
trigger binding.
The function.json file for E2_GetFileList
looks like the following:
{
"scriptFile": "__init__.py",
"bindings": [
{
"name": "rootDirectory",
"type": "activityTrigger",
"direction": "in"
}
]
}
And here is the implementation:
import os
from typing import List

def main(rootDirectory: str) -> List[str]:
    all_file_paths = []
    # Walk the directory tree under the given root
    for path, _, files in os.walk(rootDirectory):
        # Only back up the sample's own function folders (those prefixed with "E2_")
        if "E2_" in path:
            # Add the full path of each code and configuration file to the list
            for name in files:
                if name == "__init__.py" or name == "function.json":
                    file_path = os.path.join(path, name)
                    all_file_paths.append(file_path)
    return all_file_paths
Note
You might be wondering why you couldn't just put this code directly into the orchestrator function. You could, but this would break one of the fundamental rules of orchestrator functions, which is that they should never do I/O, including local file system access. For more information, see Orchestrator function code constraints.
The function.json file for E2_CopyFileToBlob
is similarly simple:
{
"scriptFile": "__init__.py",
"bindings": [
{
"name": "filePath",
"type": "activityTrigger",
"direction": "in"
}
]
}
The Python implementation uses the Azure Storage SDK for Python to upload the files to Azure Blob Storage.
import os
import pathlib
from azure.storage.blob import BlobServiceClient
from azure.core.exceptions import ResourceExistsError

connect_str = os.getenv('AzureWebJobsStorage')

def main(filePath: str) -> int:
    # Create the BlobServiceClient object used to get container and blob clients
    blob_service_client = BlobServiceClient.from_connection_string(connect_str)

    # Name of the container that holds the backups
    container_name = "backups"

    # Create the container if it does not already exist
    try:
        blob_service_client.create_container(container_name)
    except ResourceExistsError:
        pass

    # Build the blob name from the file's parent directory and file name
    parent_dir, fname = pathlib.Path(filePath).parts[-2:]  # Get last two path components
    blob_name = parent_dir + "_" + fname
    blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)

    # Count the bytes in the file
    byte_count = os.path.getsize(filePath)

    # Upload the file to the blob
    with open(filePath, "rb") as data:
        blob_client.upload_blob(data)

    return byte_count
The implementation loads the file from disk and streams the contents into a blob in the "backups" container, named after the file's parent directory and file name. The return value is the number of bytes copied to storage, which is then used by the orchestrator function to compute the aggregate sum.
Note
This is a perfect example of moving I/O operations into an activityTrigger
function. Not only can the work be distributed across many different machines, but you also get the benefits of checkpointing the progress. If the host process gets terminated for any reason, you know which uploads have already completed.
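One practical consequence of that re-execution model: if E2_CopyFileToBlob runs again for a file whose blob already exists, the upload_blob call shown above raises ResourceExistsError. A small adjustment, shown here only as a sketch and not part of the original sample, is to pass overwrite=True so a repeated execution simply rewrites the blob:

# Hypothetical rerun-tolerant variant of the upload step above (not part of
# the original sample): overwrite=True lets a retried activity rewrite the
# blob instead of failing with ResourceExistsError.
with open(filePath, "rb") as data:
    blob_client.upload_blob(data, overwrite=True)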
On Windows, you can start the orchestration by sending the following HTTP POST request:
POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20
"D:\\home\\LogFiles"
Alternatively, on a Linux function app (Python currently runs only on Linux for App Service), you can start the orchestration as follows:
POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20
"/home/site/wwwroot"
Note
The HttpStart function that you are invoking works only with JSON-formatted content. For this reason, the Content-Type: application/json header is required and the directory path is encoded as a JSON string. Moreover, the HTTP snippets assume that there is an entry in the host.json file that removes the default api/ prefix from all HTTP trigger function URLs. You can find the markup for this configuration in the host.json file of the samples.
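The HttpStart client function itself isn't reproduced in this article. For reference, a minimal HTTP-triggered durable client in the same Python programming model could look like the following sketch. It assumes a function.json (not shown here) with an httpTrigger binding whose route is orchestrators/{functionName} and a durableClient input binding named starter, matching the URLs used below:

import azure.functions as func
import azure.durable_functions as df

async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
    # Create a client for starting and managing orchestration instances
    client = df.DurableOrchestrationClient(starter)
    # The orchestrator name comes from the route (for example, E2_BackupSiteContent),
    # and the JSON request body (the directory path) is passed as its input.
    instance_id = await client.start_new(req.route_params["functionName"], None, req.get_json())
    # Return a 202 response that includes the Location header and
    # status-query URLs shown in the responses below.
    return client.create_check_status_response(req, instance_id)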
This HTTP request triggers the E2_BackupSiteContent
orchestrator and passes the string D:\home\LogFiles
as a parameter. The response provides a link to get the status of the backup operation:
HTTP/1.1 202 Accepted
Content-Length: 719
Content-Type: application/json; charset=utf-8
Location: http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}
(...trimmed...)
Depending on how many log files you have in your function app, this operation could take several minutes to complete. You can get the latest status by querying the URL in the Location
header of the previous HTTP 202 response.
GET http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}
HTTP/1.1 202 Accepted
Content-Length: 148
Content-Type: application/json; charset=utf-8
Location: http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}
{"runtimeStatus":"Running","input":"D:\\home\\LogFiles","output":null,"createdTime":"2019-06-29T18:50:55Z","lastUpdatedTime":"2019-06-29T18:51:16Z"}
In this case, the orchestration is still running. You can see the input that was saved into the orchestrator state and the last updated time. You can continue to use the Location header values to poll for completion. When the status is "Completed", you see an HTTP response value similar to the following:
HTTP/1.1 200 OK
Content-Length: 152
Content-Type: application/json; charset=utf-8
{"runtimeStatus":"Completed","input":"D:\\home\\LogFiles","output":452071,"createdTime":"2019-06-29T18:50:55Z","lastUpdatedTime":"2019-06-29T18:51:26Z"}
Now you can see that the orchestration is complete and approximately how much time it took to complete. You also see a value for the output
field, which indicates that around 450 KB of logs were uploaded.
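If you'd rather poll from a script than re-issue the GET request by hand, the following sketch (using the requests library, which isn't part of the sample) loops on the status endpoint from the Location header until the orchestration reaches a terminal state:

import time
import requests

def wait_for_completion(status_url: str, poll_seconds: float = 5.0) -> dict:
    # Poll the Durable Functions instance status endpoint until the
    # orchestration finishes, then return the final status document.
    terminal_states = {"Completed", "Failed", "Terminated", "Canceled"}
    while True:
        status = requests.get(status_url).json()
        if status["runtimeStatus"] in terminal_states:
            return status
        time.sleep(poll_seconds)

# Example usage (substitute the URL returned in the 202 response's Location header):
# result = wait_for_completion("http://{host}/runtime/webhooks/durabletask/instances/{instanceId}?...")
# print(result["output"])  # total number of bytes uploaded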
This sample has shown how to implement the fan-out/fan-in pattern. The next sample shows how to implement the monitor pattern using durable timers.