什麼是 Durable Functions?

Durable Functions 是 Azure Functions延伸模組,可讓您在無伺服器計算環境中撰寫具狀態函式。 此延伸模組可讓您使用 Azure Functions 程式設計模型撰寫實體函式,藉以撰寫協調器函式和具狀態實體來定義具狀態工作流程。 在幕後,延伸模組會為您管理狀態、檢查點和重新啟動,讓您可以專注於商務邏輯。

支援的語言

Durable Functions 的設計目的是要與所有 Azure Functions 程式設計語言搭配使用,但每個語言可能有不同的最低需求。 下表顯示支援的最小應用程式群組態:

語言堆疊 Azure Functions 運行時間版本 語言背景工作角色版本 最低套件組合版本
.NET / C# / F# Functions 1.0+ 同處理序
跨處理序
n/a
JavaScript/TypeScript (V3 prog. model) Functions 2.0+ 節點8+ 2.x 套件組合
JavaScript/TypeScript (V4 prog. model) Functions 4.25+ 節點 18+ 3.15+ 套件組合
Python Functions 2.0+ Python 3.7+ 2.x 套件組合
Python (V2 prog. model) Functions 4.0+ Python 3.7+ 3.15+ 套件組合
PowerShell Functions 3.0+ PowerShell 7+ 2.x 套件組合
Java Functions 4.0+ Java 8+ 4.x 套件組合

重要

本文使用索引標籤來支援 Node.js 程式設計模型的多個版本。 v4 模型已正式推出,其設計目的是為 JavaScript 和 TypeScript 開發人員提供更靈活且直覺的體驗。 如需 v4 模型運作方式的詳細資訊,請參閱 Azure Functions Node.js 開發人員指南。 若要深入瞭解 v3 與 v4 之間的差異,請參閱 移轉指南

重要

本文使用索引標籤來支援多個版本的 Python 程式設計模型。 v2 模型已正式推出,其設計目的是要透過裝飾專案提供更以程式代碼為中心的撰寫函式。 如需 v2 模型運作方式的詳細資訊,請參閱 Azure Functions Python 開發人員指南

如同 Azure Functions,有範本可協助您使用 Visual Studio、Visual Studio Code 和 Azure 入口網站 來開發 Durable Functions。

應用程式模式

Durable Functions 的主要使用案例是簡化無伺服器應用程式中的複雜、具狀態協調需求。 下列各節描述可受益於 Durable Functions 的典型應用程式模式:

模式 #1:函式鏈結

函式鏈結模式會按照特定順序執行一連串的函式。 在此模式中,一個函式的輸出會套用至另一個函式的輸入。 每個函式之間的佇列使用可確保系統保持持久且可調整,即使有一個函式到下一個函式的控制流程也一樣。

A diagram of the function chaining pattern

您可以使用 Durable Functions 來簡潔實作函式鏈結模式,如下列範例所示。

在此範例中,值 F1F2F3F4 是相同函式應用程式中其他函式的名稱。 您可以使用一般命令式編碼建構來實作控制流程。 程式碼從上到下執行。 程式碼可以涉及現有的語言控制流程語意,例如條件和迴圈。 您可以在 try/catch/finally 區塊中包含錯誤處理邏輯。

[FunctionName("Chaining")]
public static async Task<object> Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    try
    {
        var x = await context.CallActivityAsync<object>("F1", null);
        var y = await context.CallActivityAsync<object>("F2", x);
        var z = await context.CallActivityAsync<object>("F3", y);
        return  await context.CallActivityAsync<object>("F4", z);
    }
    catch (Exception)
    {
        // Error handling or compensation goes here.
    }
}

您可以使用 context 參數依名稱叫用其他函式、傳遞參數,以及傳回函式輸出。 每次程式代碼呼叫 await時,Durable Functions 架構都會檢查目前函式實例的進度。 如果進程或虛擬機在執行中途回收,函式實例會從上述 await 呼叫繼續。 如需詳細資訊,請參閱下一節:模式 #2:在 中展開扇出/風扇。

const df = require("durable-functions");

module.exports = df.orchestrator(function*(context) {
    try {
        const x = yield context.df.callActivity("F1");
        const y = yield context.df.callActivity("F2", x);
        const z = yield context.df.callActivity("F3", y);
        return    yield context.df.callActivity("F4", z);
    } catch (error) {
        // Error handling or compensation goes here.
    }
});

您可以使用 context.df 物件,依名稱叫用其他函式、傳遞參數,以及傳回函式輸出。 每次程式代碼呼叫 yield時,Durable Functions 架構都會檢查目前函式實例的進度。 如果進程或虛擬機在執行中途回收,函式實例會從上述 yield 呼叫繼續。 如需詳細資訊,請參閱下一節:模式 #2:在 中展開扇出/風扇。

注意

context JavaScript 中的物件代表整個函式內容。 使用 df 主要內容上的 屬性存取 Durable Functions 內容。

import azure.functions as func
import azure.durable_functions as df


def orchestrator_function(context: df.DurableOrchestrationContext):
    x = yield context.call_activity("F1", None)
    y = yield context.call_activity("F2", x)
    z = yield context.call_activity("F3", y)
    result = yield context.call_activity("F4", z)
    return result


main = df.Orchestrator.create(orchestrator_function)

您可以使用 context 物件,依名稱叫用其他函式、傳遞參數,以及傳回函式輸出。 每次程式代碼呼叫 yield時,Durable Functions 架構都會檢查目前函式實例的進度。 如果進程或虛擬機在執行中途回收,函式實例會從上述 yield 呼叫繼續。 如需詳細資訊,請參閱下一節:模式 #2:在 中展開扇出/風扇。

注意

context Python 中的物件代表協調流程內容。 使用 function_context 協調流程內容上的 屬性存取主要 Azure Functions 內容。

param($Context)

$X = Invoke-DurableActivity -FunctionName 'F1'
$Y = Invoke-DurableActivity -FunctionName 'F2' -Input $X
$Z = Invoke-DurableActivity -FunctionName 'F3' -Input $Y
Invoke-DurableActivity -FunctionName 'F4' -Input $Z

您可以使用 Invoke-DurableActivity 命令依名稱叫用其他函式、傳遞參數,以及傳回函式輸出。 每次程式代碼呼叫 Invoke-DurableActivity 時沒有 NoWait 參數,Durable Functions 架構會檢查目前函式實例的進度。 如果進程或虛擬機在執行中途回收,函式實例會從上述 Invoke-DurableActivity 呼叫繼續。 如需詳細資訊,請參閱下一節:模式 #2:在 中展開扇出/風扇。

@FunctionName("Chaining")
public double functionChaining(
        @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
    String input = ctx.getInput(String.class);
    int x = ctx.callActivity("F1", input, int.class).await();
    int y = ctx.callActivity("F2", x, int.class).await();
    int z = ctx.callActivity("F3", y, int.class).await();
    return  ctx.callActivity("F4", z, double.class).await();
}

您可以使用 ctx 物件,依名稱叫用其他函式、傳遞參數,以及傳回函式輸出。 這些方法的輸出是 Task<V> 物件,其中 V 是叫用函式所傳回的數據類型。 每次呼叫 Task<V>.await()時,Durable Functions 架構都會檢查目前函式實例的進度。 如果進程在執行中途意外回收,函式實例會從上述 Task<V>.await() 呼叫繼續。 如需詳細資訊,請參閱下一節:模式 #2:在 中展開扇出/風扇。

模式 #2:在 中展開扇出/風扇

在展開傳送/收合傳送模式中,您會以平行方式執行多個函式,然後等候所有函式完成。 某些彙總工作通常會在函式傳回結果時執行。

A diagram of the fan out/fan pattern

透過一般函式,您可以讓函式將多則訊息傳送至佇列,來進行展開傳送。 回溯更具有挑戰性。 若要在一般函式中加入,您可以撰寫程式代碼來追蹤佇列觸發函式何時結束,然後儲存函式輸出。

Durable Functions 擴充功能會以相對簡單的程式代碼處理此模式:

[FunctionName("FanOutFanIn")]
public static async Task Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    var parallelTasks = new List<Task<int>>();

    // Get a list of N work items to process in parallel.
    object[] workBatch = await context.CallActivityAsync<object[]>("F1", null);
    for (int i = 0; i < workBatch.Length; i++)
    {
        Task<int> task = context.CallActivityAsync<int>("F2", workBatch[i]);
        parallelTasks.Add(task);
    }

    await Task.WhenAll(parallelTasks);

    // Aggregate all N outputs and send the result to F3.
    int sum = parallelTasks.Sum(t => t.Result);
    await context.CallActivityAsync("F3", sum);
}

展開工作會散發至函式的 F2 多個實例。 系統會使用動態工作清單來追蹤此工作。 Task.WhenAll 會呼叫 以等候所有呼叫的函式完成。 然後,F2 函式輸出會從動態工作清單彙總,並傳遞至 F3 函式。

在 呼叫 Task.WhenAllawait 發生的自動檢查點可確保潛在的中途損毀或重新開機不需要重新開機已完成的工作。

const df = require("durable-functions");

module.exports = df.orchestrator(function*(context) {
    const parallelTasks = [];

    // Get a list of N work items to process in parallel.
    const workBatch = yield context.df.callActivity("F1");
    for (let i = 0; i < workBatch.length; i++) {
        parallelTasks.push(context.df.callActivity("F2", workBatch[i]));
    }

    yield context.df.Task.all(parallelTasks);

    // Aggregate all N outputs and send the result to F3.
    const sum = parallelTasks.reduce((prev, curr) => prev + curr, 0);
    yield context.df.callActivity("F3", sum);
});

展開工作會散發至函式的 F2 多個實例。 系統會使用動態工作清單來追蹤此工作。 context.df.Task.all 呼叫 API 以等候所有呼叫的函式完成。 然後,F2 函式輸出會從動態工作清單彙總,並傳遞至 F3 函式。

在 呼叫 context.df.Task.allyield 發生的自動檢查點可確保潛在的中途損毀或重新開機不需要重新開機已完成的工作。

import azure.durable_functions as df


def orchestrator_function(context: df.DurableOrchestrationContext):
    # Get a list of N work items to process in parallel.
    work_batch = yield context.call_activity("F1", None)

    parallel_tasks = [ context.call_activity("F2", b) for b in work_batch ]

    outputs = yield context.task_all(parallel_tasks)

    # Aggregate all N outputs and send the result to F3.
    total = sum(outputs)
    yield context.call_activity("F3", total)


main = df.Orchestrator.create(orchestrator_function)

展開工作會散發至函式的 F2 多個實例。 系統會使用動態工作清單來追蹤此工作。 context.task_all 呼叫 API 以等候所有呼叫的函式完成。 然後,F2 函式輸出會從動態工作清單彙總,並傳遞至 F3 函式。

在 呼叫 context.task_allyield 發生的自動檢查點可確保潛在的中途損毀或重新開機不需要重新開機已完成的工作。

param($Context)

# Get a list of work items to process in parallel.
$WorkBatch = Invoke-DurableActivity -FunctionName 'F1'

$ParallelTasks =
    foreach ($WorkItem in $WorkBatch) {
        Invoke-DurableActivity -FunctionName 'F2' -Input $WorkItem -NoWait
    }

$Outputs = Wait-ActivityFunction -Task $ParallelTasks

# Aggregate all outputs and send the result to F3.
$Total = ($Outputs | Measure-Object -Sum).Sum
Invoke-DurableActivity -FunctionName 'F3' -Input $Total

展開工作會散發至函式的 F2 多個實例。 請注意函式調用上的 F2 參數使用 NoWait 方式:此參數可讓協調器繼續叫 F2 用,而不需要等待活動完成。 系統會使用動態工作清單來追蹤此工作。 呼叫 Wait-ActivityFunction 命令以等候所有呼叫的函式完成。 然後,F2 函式輸出會從動態工作清單彙總,並傳遞至 F3 函式。

在呼叫時 Wait-ActivityFunction 發生的自動檢查點可確保潛在的中途損毀或重新開機不需要重新開機已完成的工作。

@FunctionName("FanOutFanIn")
public Integer fanOutFanInOrchestrator(
        @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
    // Get the list of work-items to process in parallel
    List<?> batch = ctx.callActivity("F1", List.class).await();

    // Schedule each task to run in parallel
    List<Task<Integer>> parallelTasks = batch.stream()
            .map(item -> ctx.callActivity("F2", item, Integer.class))
            .collect(Collectors.toList());

    // Wait for all tasks to complete, then return the aggregated sum of the results
    List<Integer> results = ctx.allOf(parallelTasks).await();
    return results.stream().reduce(0, Integer::sum);
}

展開工作會散發至函式的 F2 多個實例。 系統會使用動態工作清單來追蹤此工作。 ctx.allOf(parallelTasks).await() 會呼叫 以等候所有呼叫的函式完成。 然後,函 F2 式輸出會從動態工作清單匯總,並以協調器函式的輸出傳回。

在 呼叫 ctx.allOf(parallelTasks).await() 發生的自動檢查點可確保未預期的進程回收不需要重新開機任何已完成的工作。

注意

在極少數情況下,活動函式完成之後,可能會在視窗中發生當機,但在其完成之前儲存到協調流程歷程記錄中。 如果發生這種情況,活動函式會在進程復原之後從頭重新執行。

模式 #3:非同步 HTTP API

非同步 HTTP API 模式可處理與外部用戶端協調長時間執行作業狀態的問題。 實作此模式的常見方式是讓 HTTP 端點觸發長時間執行的動作。 然後,將用戶端重新導向至用戶端輪詢的狀態端點,以了解作業何時完成。

A diagram of the HTTP API pattern

Durable Functions 為此模式提供內建支援,簡化或甚至移除與長時間執行的函式執行互動所需撰寫的程式碼。 例如,Durable Functions 快速入門範例( C# JavaScript、 TypeScript Python PowerShell JAVA )會顯示簡單的 REST 命令,可用來啟動新的協調器函式實例。 在執行個體啟動之後,延伸模組會公開 Webhook HTTP API,其會查詢協調器函式狀態。

下列範例顯示啟動協調器並查詢其狀態的 REST 命令。 為了清楚起見,範例中會省略某些通訊協定詳細資料。

> curl -X POST https://myfunc.azurewebsites.net/api/orchestrators/DoWork -H "Content-Length: 0" -i
HTTP/1.1 202 Accepted
Content-Type: application/json
Location: https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec

{"id":"b79baf67f717453ca9e86c5da21e03ec", ...}

> curl https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec -i
HTTP/1.1 202 Accepted
Content-Type: application/json
Location: https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec

{"runtimeStatus":"Running","lastUpdatedTime":"2019-03-16T21:20:47Z", ...}

> curl https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec -i
HTTP/1.1 200 OK
Content-Length: 175
Content-Type: application/json

{"runtimeStatus":"Completed","lastUpdatedTime":"2019-03-16T21:20:57Z", ...}

由於 Durable Functions 執行時間會為您管理狀態,因此您不需要實作自己的狀態追蹤機制。

Durable Functions 延伸模組會公開內建的 HTTP API,其會管理長時間執行的協調流程。 或者,您可以使用自己的函式觸發程式(例如 HTTP、佇列或Azure 事件中樞)和 永久性用戶端系 結,自行實作此模式。 例如,您可以使用佇列訊息來觸發終止。 或者,您可以使用受 Microsoft Entra 驗證原則保護的 HTTP 觸發程式,而不是使用產生的金鑰進行驗證的內建 HTTP API。

如需詳細資訊,請參閱 HTTP 功能 一文,其中說明如何使用 Durable Functions 擴充功能透過 HTTP 公開非同步且長時間執行的進程。

模式 #4:監視

監視模式是指工作流程中靈活的週期性流程。 一項範例是輪詢直到滿足特定條件。 您可以使用一般 計時器觸發 程式來解決基本案例,例如定期清除作業,但其間隔是靜態的,而且管理實例存留期變得很複雜。 您可以使用 Durable Functions 來建立彈性的週期間隔、管理工作存留期,以及從單一協調流程建立多個監視流程。

監視模式的範例是反轉先前的非同步 HTTP API 案例。 長時間執行的監視器不會公開外部用戶端的端點來監視長時間執行的作業,而是會取用外部端點,然後等候狀態變更。

A diagram of the monitor pattern

在幾行程式碼中,您可以使用 Durable Functions,建立多個觀察任意端點的監視器。 監視器可以在符合條件時結束執行,或者另一個函式可以使用永久性協調流程用戶端來終止監視器。 您可以根據特定條件來變更監視器的 wait 間隔(例如指數輪詢。

下列程式碼會實作基本監視器:

[FunctionName("MonitorJobStatus")]
public static async Task Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    int jobId = context.GetInput<int>();
    int pollingInterval = GetPollingInterval();
    DateTime expiryTime = GetExpiryTime();

    while (context.CurrentUtcDateTime < expiryTime)
    {
        var jobStatus = await context.CallActivityAsync<string>("GetJobStatus", jobId);
        if (jobStatus == "Completed")
        {
            // Perform an action when a condition is met.
            await context.CallActivityAsync("SendAlert", jobId);
            break;
        }

        // Orchestration sleeps until this time.
        var nextCheck = context.CurrentUtcDateTime.AddSeconds(pollingInterval);
        await context.CreateTimer(nextCheck, CancellationToken.None);
    }

    // Perform more work here, or let the orchestration end.
}
const df = require("durable-functions");
const moment = require("moment");

module.exports = df.orchestrator(function*(context) {
    const jobId = context.df.getInput();
    const pollingInterval = getPollingInterval();
    const expiryTime = getExpiryTime();

    while (moment.utc(context.df.currentUtcDateTime).isBefore(expiryTime)) {
        const jobStatus = yield context.df.callActivity("GetJobStatus", jobId);
        if (jobStatus === "Completed") {
            // Perform an action when a condition is met.
            yield context.df.callActivity("SendAlert", jobId);
            break;
        }

        // Orchestration sleeps until this time.
        const nextCheck = moment.utc(context.df.currentUtcDateTime).add(pollingInterval, 's');
        yield context.df.createTimer(nextCheck.toDate());
    }

    // Perform more work here, or let the orchestration end.
});
import azure.durable_functions as df
import json
from datetime import timedelta 


def orchestrator_function(context: df.DurableOrchestrationContext):
    job = json.loads(context.get_input())
    job_id = job["jobId"]
    polling_interval = job["pollingInterval"]
    expiry_time = job["expiryTime"]

    while context.current_utc_datetime < expiry_time:
        job_status = yield context.call_activity("GetJobStatus", job_id)
        if job_status == "Completed":
            # Perform an action when a condition is met.
            yield context.call_activity("SendAlert", job_id)
            break

        # Orchestration sleeps until this time.
        next_check = context.current_utc_datetime + timedelta(seconds=polling_interval)
        yield context.create_timer(next_check)

    # Perform more work here, or let the orchestration end.


main = df.Orchestrator.create(orchestrator_function)
param($Context)

$output = @()

$jobId = $Context.Input.JobId
$machineId = $Context.Input.MachineId
$pollingInterval = New-TimeSpan -Seconds $Context.Input.PollingInterval
$expiryTime = $Context.Input.ExpiryTime

while ($Context.CurrentUtcDateTime -lt $expiryTime) {
    $jobStatus = Invoke-DurableActivity -FunctionName 'GetJobStatus' -Input $jobId
    if ($jobStatus -eq "Completed") {
        # Perform an action when a condition is met.
        $output += Invoke-DurableActivity -FunctionName 'SendAlert' -Input $machineId
        break
    }

    # Orchestration sleeps until this time.
    Start-DurableTimer -Duration $pollingInterval
}

# Perform more work here, or let the orchestration end.

$output
@FunctionName("Monitor")
public String monitorOrchestrator(
        @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
    JobInfo jobInfo = ctx.getInput(JobInfo.class);
    String jobId = jobInfo.getJobId();
    Instant expiryTime = jobInfo.getExpirationTime();

    while (ctx.getCurrentInstant().compareTo(expiryTime) < 0) {
        String status = ctx.callActivity("GetJobStatus", jobId, String.class).await();

        // Perform an action when a condition is met
        if (status.equals("Completed")) {
            // send an alert and exit
            ctx.callActivity("SendAlert", jobId).await();
            break;
        }

        // wait N minutes before doing the next poll
        Duration pollingDelay = jobInfo.getPollingDelay();
        ctx.createTimer(pollingDelay).await();
    }

    return "done";
}

收到要求時,會針對該作業識別碼建立新的協調流程實例。 實例會輪詢狀態,直到符合條件或逾時到期為止。 長期計時器會控制輪詢間隔。 然後,可以執行更多工作,或者協調流程可以結束。

模式 #5:人為互動

許多自動化流程涉及某種人類互動。 讓人類參與自動化流程相當棘手,因為人員並不像雲端服務那樣經常有空且回應迅速。 自動化流程可能會使用逾時和補償邏輯來允許此互動。

核准程式是涉及人為互動的商務程式範例。 對於超過特定金額的費用報表,可能需要經理核准。 如果經理在 72 小時內未核准費用報告(可能是經理休假),升級程式就會開始取得其他人的核准(也許是經理經理)。

A diagram of the human interaction pattern

您可以在此範例中使用協調器函式實作模式。 協調器會使用 長期計時器 來要求核准。 如果發生逾時,協調器就會呈報。 協調器會 等候外來事件 ,例如人為互動所產生的通知。

這些範例會建立核准程式來示範人類互動模式:

[FunctionName("ApprovalWorkflow")]
public static async Task Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    await context.CallActivityAsync("RequestApproval", null);
    using (var timeoutCts = new CancellationTokenSource())
    {
        DateTime dueTime = context.CurrentUtcDateTime.AddHours(72);
        Task durableTimeout = context.CreateTimer(dueTime, timeoutCts.Token);

        Task<bool> approvalEvent = context.WaitForExternalEvent<bool>("ApprovalEvent");
        if (approvalEvent == await Task.WhenAny(approvalEvent, durableTimeout))
        {
            timeoutCts.Cancel();
            await context.CallActivityAsync("ProcessApproval", approvalEvent.Result);
        }
        else
        {
            await context.CallActivityAsync("Escalate", null);
        }
    }
}

若要建立長期計時器,請呼叫 context.CreateTimer 。 通知會由 context.WaitForExternalEvent 接收。 然後, Task.WhenAny 呼叫 以決定是否要呈報 (逾時先發生)或處理核准 (在逾時之前收到核准)。

const df = require("durable-functions");
const moment = require('moment');

module.exports = df.orchestrator(function*(context) {
    yield context.df.callActivity("RequestApproval");

    const dueTime = moment.utc(context.df.currentUtcDateTime).add(72, 'h');
    const durableTimeout = context.df.createTimer(dueTime.toDate());

    const approvalEvent = context.df.waitForExternalEvent("ApprovalEvent");
    const winningEvent = yield context.df.Task.any([approvalEvent, durableTimeout]);
    if (winningEvent === approvalEvent) {
        durableTimeout.cancel();
        yield context.df.callActivity("ProcessApproval", approvalEvent.result);
    } else {
        yield context.df.callActivity("Escalate");
    }
});

若要建立長期計時器,請呼叫 context.df.createTimer 。 通知會由 context.df.waitForExternalEvent 接收。 然後, context.df.Task.any 呼叫 以決定是否要呈報 (逾時先發生)或處理核准 (在逾時之前收到核准)。

import azure.durable_functions as df
import json
from datetime import timedelta 


def orchestrator_function(context: df.DurableOrchestrationContext):
    yield context.call_activity("RequestApproval", None)

    due_time = context.current_utc_datetime + timedelta(hours=72)
    durable_timeout_task = context.create_timer(due_time)
    approval_event_task = context.wait_for_external_event("ApprovalEvent")

    winning_task = yield context.task_any([approval_event_task, durable_timeout_task])

    if approval_event_task == winning_task:
        durable_timeout_task.cancel()
        yield context.call_activity("ProcessApproval", approval_event_task.result)
    else:
        yield context.call_activity("Escalate", None)


main = df.Orchestrator.create(orchestrator_function)

若要建立長期計時器,請呼叫 context.create_timer 。 通知會由 context.wait_for_external_event 接收。 然後, context.task_any 呼叫 以決定是否要呈報 (逾時先發生)或處理核准 (在逾時之前收到核准)。

param($Context)

$output = @()

$duration = New-TimeSpan -Seconds $Context.Input.Duration
$managerId = $Context.Input.ManagerId

$output += Invoke-DurableActivity -FunctionName "RequestApproval" -Input $managerId

$durableTimeoutEvent = Start-DurableTimer -Duration $duration -NoWait
$approvalEvent = Start-DurableExternalEventListener -EventName "ApprovalEvent" -NoWait

$firstEvent = Wait-DurableTask -Task @($approvalEvent, $durableTimeoutEvent) -Any

if ($approvalEvent -eq $firstEvent) {
    Stop-DurableTimerTask -Task $durableTimeoutEvent
    $output += Invoke-DurableActivity -FunctionName "ProcessApproval" -Input $approvalEvent
}
else {
    $output += Invoke-DurableActivity -FunctionName "EscalateApproval"
}

$output

若要建立長期計時器,請呼叫 Start-DurableTimer 。 通知會由 Start-DurableExternalEventListener 接收。 然後, Wait-DurableTask 呼叫 以決定是否要呈報 (逾時先發生)或處理核准 (在逾時之前收到核准)。

@FunctionName("ApprovalWorkflow")
public void approvalWorkflow(
        @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
    ApprovalInfo approvalInfo = ctx.getInput(ApprovalInfo.class);
    ctx.callActivity("RequestApproval", approvalInfo).await();

    Duration timeout = Duration.ofHours(72);
    try {
        // Wait for an approval. A TaskCanceledException will be thrown if the timeout expires.
        boolean approved = ctx.waitForExternalEvent("ApprovalEvent", timeout, boolean.class).await();
        approvalInfo.setApproved(approved);

        ctx.callActivity("ProcessApproval", approvalInfo).await();
    } catch (TaskCanceledException timeoutEx) {
        ctx.callActivity("Escalate", approvalInfo).await();
    }
}

方法 ctx.waitForExternalEvent(...).await() 呼叫會暫停協調流程,直到它收到名為 ApprovalEvent 的事件,其具有 boolean 承載。 如果收到事件,則會呼叫活動函式來處理核准結果。 不過,如果在 (72 小時) 到期之前 timeout 未收到這類事件, TaskCanceledException 則會引發 ,並 Escalate 呼叫活動函式。

注意

在取用方案中執行時,等候外來事件所花費的時間不收費。

外部用戶端可以使用內建 HTTP API,將事件通知傳遞至等候的協調器函

curl -d "true" http://localhost:7071/runtime/webhooks/durabletask/instances/{instanceId}/raiseEvent/ApprovalEvent -H "Content-Type: application/json"

您也可以使用來自相同函式應用程式中另一個函式的永久性協調流程用戶端來引發事件:

[FunctionName("RaiseEventToOrchestration")]
public static async Task Run(
    [HttpTrigger] string instanceId,
    [DurableClient] IDurableOrchestrationClient client)
{
    bool isApproved = true;
    await client.RaiseEventAsync(instanceId, "ApprovalEvent", isApproved);
}
const df = require("durable-functions");

module.exports = async function (context) {
    const client = df.getClient(context);
    const isApproved = true;
    await client.raiseEvent(instanceId, "ApprovalEvent", isApproved);
};
import azure.durable_functions as df


async def main(client: str):
    durable_client = df.DurableOrchestrationClient(client)
    is_approved = True
    await durable_client.raise_event(instance_id, "ApprovalEvent", is_approved)

Send-DurableExternalEvent -InstanceId $InstanceId -EventName "ApprovalEvent" -EventData "true"

@FunctionName("RaiseEventToOrchestration")
public void raiseEventToOrchestration(
        @HttpTrigger(name = "instanceId") String instanceId,
        @DurableClientInput(name = "durableContext") DurableClientContext durableContext) {

    DurableTaskClient client = durableContext.getClient();
    client.raiseEvent(instanceId, "ApprovalEvent", true);
}

模式 #6:匯總工具(具狀態實體)

第六個模式是將一段時間內的事件資料匯總成單一可 定址實體 。 在此模式中,匯總的資料可能來自多個來源、可能以批次方式傳遞,或可能分散在很長一段時間內。 匯總工具可能需要在事件資料送達時採取動作,而外部用戶端可能需要查詢匯總的資料。

Aggregator diagram

嘗試使用一般無狀態函式實作此模式的棘手問題在於並行控制會成為巨大的挑戰。 您不僅需要擔心多個執行緒同時修改相同的資料,您也需要擔心確保匯總工具一次只在單一 VM 上執行。

您可以使用 Durable 實體 ,輕鬆地將此模式實作為單一函式。

[FunctionName("Counter")]
public static void Counter([EntityTrigger] IDurableEntityContext ctx)
{
    int currentValue = ctx.GetState<int>();
    switch (ctx.OperationName.ToLowerInvariant())
    {
        case "add":
            int amount = ctx.GetInput<int>();
            ctx.SetState(currentValue + amount);
            break;
        case "reset":
            ctx.SetState(0);
            break;
        case "get":
            ctx.Return(currentValue);
            break;
    }
}

長期實體也可以模型化為 .NET 中的類別。 如果作業清單已固定且變大,此模型就很有用。 下列範例是使用 .NET 類別和方法之實體的 Counter 對等實作。

public class Counter
{
    [JsonProperty("value")]
    public int CurrentValue { get; set; }

    public void Add(int amount) => this.CurrentValue += amount;

    public void Reset() => this.CurrentValue = 0;

    public int Get() => this.CurrentValue;

    [FunctionName(nameof(Counter))]
    public static Task Run([EntityTrigger] IDurableEntityContext ctx)
        => ctx.DispatchAsync<Counter>();
}
const df = require("durable-functions");

module.exports = df.entity(function(context) {
    const currentValue = context.df.getState(() => 0);
    switch (context.df.operationName) {
        case "add":
            const amount = context.df.getInput();
            context.df.setState(currentValue + amount);
            break;
        case "reset":
            context.df.setState(0);
            break;
        case "get":
            context.df.return(currentValue);
            break;
    }
});
import azure.functions as func
import azure.durable_functions as df


def entity_function(context: df.DurableOrchestrationContext):

    current_value = context.get_state(lambda: 0)
    operation = context.operation_name
    if operation == "add":
        amount = context.get_input()
        current_value += amount
        context.set_result(current_value)
    elif operation == "reset":
        current_value = 0
    elif operation == "get":
        context.set_result(current_value)

    context.set_state(current_value)

main = df.Entity.create(entity_function)

注意

PowerShell 目前不支援長期實體。

注意

JAVA 目前不支援長期實體。

用戶端可以使用實體用戶端系 結,將實體函 式排入佇列 作業 (也稱為「訊號」)。

[FunctionName("EventHubTriggerCSharp")]
public static async Task Run(
    [EventHubTrigger("device-sensor-events")] EventData eventData,
    [DurableClient] IDurableEntityClient entityClient)
{
    var metricType = (string)eventData.Properties["metric"];
    var delta = BitConverter.ToInt32(eventData.Body, eventData.Body.Offset);

    // The "Counter/{metricType}" entity is created on-demand.
    var entityId = new EntityId("Counter", metricType);
    await entityClient.SignalEntityAsync(entityId, "add", delta);
}

注意

動態產生的 Proxy 也可在 .NET 中使用,以型別安全的方式向實體發出訊號。 除了發出訊號之外,用戶端還可以使用 協調流程用戶端系結上的型別安全方法 來查詢實體函式的狀態。

const df = require("durable-functions");
const { app } = require("@azure/functions");

module.exports = async function (context) {
    const client = df.getClient(context);
    const entityId = new df.EntityId("Counter", "myCounter");
    await client.signalEntity(entityId, "add", 1);
};
import azure.functions as func
import azure.durable_functions as df

async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
    client = df.DurableOrchestrationClient(starter)
    entity_id = df.EntityId("Counter", "myCounter")
    instance_id = await client.signal_entity(entity_id, "add", 1)
    return func.HttpResponse("Entity signaled")

注意

PowerShell 目前不支援長期實體。

注意

JAVA 目前不支援長期實體。

實體函式適用于 C#、JavaScript 和 Python 的 Durable Functions 2.0 和更新版本。

這項技術

在幕後,Durable Functions 延伸模組建置在 Durable Task Framework 之上 ,這是 GitHub 上的開放原始碼程式庫,用來在程式碼中建置工作流程。 如同 Azure Functions 是 Azure WebJobs 的無伺服器演進,Durable Functions 是 Durable Task Framework 的無伺服器演進。 Microsoft 和其他組織會廣泛使用長期工作架構,將任務關鍵性程式自動化。 它很適合無伺服器 Azure Functions 環境。

程式碼條件約束

為了提供可靠且長時間執行的執行保證,協調器函式具有一組必須遵循的編碼規則。 如需詳細資訊,請參閱 Orchestrator 函 式程式碼條件約束 一文。

計費

Durable Functions 的計費方式與 Azure Functions 相同。 如需詳細資訊,請參閱 Azure Functions 定價 。 在 Azure Functions 取用方案中 執行協調器函式時,有一些計費行為需要注意。 如需這些行為的詳細資訊,請參閱 Durable Functions 計費 一文。

直接跳入

您可以完成下列其中一個特定語言快速入門教學課程,在 10 分鐘內開始使用 Durable Functions:

在這些快速入門中,您會在本機建立及測試 「hello world」 耐久函式。 接著,您會將函式程式碼發佈至 Azure。 您建立的函式會協調並鏈結至其他函式的呼叫。

發行集

Durable Functions 是與 Microsoft Research 共同開發的。 因此,Durable Functions 小組會積極產生研究論文和成品:這些包括:

深入了解

下列影片強調 Durable Functions 的優點:

由於 Durable Functions 是 Azure Functions 進階擴充功能,因此不適用於所有應用程式。 如需與其他 Azure 協調流程技術的比較,請參閱 比較 Azure Functions 和 Azure Logic Apps

下一步