Co je Durable Functions?

Durable Functions je rozšíření Azure Functions, které umožňuje psát stavové funkce v bezserverovém výpočetním prostředí. Rozšíření umožňuje definovat stavové pracovní postupy tím, že pomocí programovacího modelu Azure Functions napíšete funkce orchestrátoru a stavové entity. Rozšíření na pozadí spravuje stav, kontrolní body a restartuje, takže se můžete soustředit na obchodní logiku.

Podporované jazyky

Durable Functions je navržený tak, aby fungoval se všemi programovacími jazyky Azure Functions, ale pro každý jazyk může mít jiné minimální požadavky. Následující tabulka uvádí minimální podporované konfigurace aplikací:

Zásobník jazyků Verze modulu runtime Azure Functions Verze pracovního procesu jazyka Minimální verze sad
.NET / C# / F# Funkce 1.0+ Probíhá proces
Mimoprocesové
Není k dispozici
JavaScript/TypeScript (model verze 3) Funkce 2.0+ Uzel 8 a novější Sady 2.x
JavaScript/TypeScript (model v4 prog) Funkce 4.25 a novější Uzel 18 nebo novější 3.15+ sady
Python Funkce 2.0+ Python 3.7 nebo novější Sady 2.x
Python (v2 prog. model) Funkce 4.0 a novější Python 3.7 nebo novější 3.15+ sady
PowerShell Funkce 3.0 a novější PowerShell 7 a novější Sady 2.x
Java Funkce 4.0 a novější Java 8 a novější Sady 4.x

Důležité

Tento článek používá karty pro podporu více verzí programovacího modelu Node.js. Model v4 je obecně dostupný a je navržený tak, aby měl flexibilnější a intuitivnější prostředí pro vývojáře v JavaScriptu a TypeScriptu. Další podrobnosti o tom, jak model v4 funguje, najdete v příručce pro vývojáře Azure Functions Node.js. Další informace o rozdílech mezi v3 a v4 najdete v průvodci migrací.

Důležité

Tento článek používá karty pro podporu více verzí programovacího modelu Pythonu. Model v2 je obecně dostupný a je navržený tak, aby poskytoval přístupnější způsob vytváření funkcí prostřednictvím dekorátorů. Další podrobnosti o tom, jak model v2 funguje, najdete v příručce pro vývojáře v Pythonu pro Azure Functions.

Podobně jako Azure Functions existují šablony, které vám pomůžou vyvíjet Durable Functions pomocí sady Visual Studio, Visual Studio Code a webu Azure Portal.

Vzory aplikací

Primárním případem použití Durable Functions je zjednodušení složitých a stavových požadavků koordinace v bezserverových aplikacích. Následující části popisují typické vzory aplikací, které můžou využívat Durable Functions:

Pattern č. 1: Řetězení funkcí

V modelu zřetězení funkcí se posloupnost funkcí provede v určitém pořadí. V tomto vzoru se výstup jedné funkce použije na vstup jiné funkce. Použití front mezi jednotlivými funkcemi zajišťuje, že systém zůstane trvalý a škálovatelný, i když existuje tok řízení z jedné funkce do další.

A diagram of the function chaining pattern

Durable Functions můžete použít k implementaci modelu řetězení funkcí stručně, jak je znázorněno v následujícím příkladu.

V tomto příkladu jsou hodnoty F1, F2, F3a F4 jsou názvy jiných funkcí ve stejné aplikaci funkcí. Tok řízení můžete implementovat pomocí normálních imperativních konstruktorů kódování. Kód se spustí shora dolů. Kód může zahrnovat sémantiku toku řízení jazyka, jako jsou podmíněné výrazy a smyčky. Logiku zpracování chyb můžete zahrnout do try//catchfinally bloků.

[FunctionName("Chaining")]
public static async Task<object> Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    try
    {
        var x = await context.CallActivityAsync<object>("F1", null);
        var y = await context.CallActivityAsync<object>("F2", x);
        var z = await context.CallActivityAsync<object>("F3", y);
        return  await context.CallActivityAsync<object>("F4", z);
    }
    catch (Exception)
    {
        // Error handling or compensation goes here.
    }
}

Tento parametr můžete použít context k vyvolání dalších funkcí podle názvu, předání parametrů a výstupu návratové funkce. Pokaždé, když kód volá await, Durable Functions framework kontrolní body průběh aktuální instance funkce. Pokud proces nebo virtuální počítač recykluje uprostřed provádění, instance funkce se obnoví z předchozího await volání. Další informace najdete v další části Vzor č. 2: Ventilátor nebo ventilátor.

const df = require("durable-functions");

module.exports = df.orchestrator(function*(context) {
    try {
        const x = yield context.df.callActivity("F1");
        const y = yield context.df.callActivity("F2", x);
        const z = yield context.df.callActivity("F3", y);
        return    yield context.df.callActivity("F4", z);
    } catch (error) {
        // Error handling or compensation goes here.
    }
});

Objekt můžete použít context.df k vyvolání dalších funkcí podle názvu, předání parametrů a návratového výstupu funkce. Pokaždé, když kód volá yield, Durable Functions framework kontrolní body průběh aktuální instance funkce. Pokud proces nebo virtuální počítač recykluje uprostřed provádění, instance funkce se obnoví z předchozího yield volání. Další informace najdete v další části Vzor č. 2: Ventilátor nebo ventilátor.

Poznámka:

Objekt context v JavaScriptu představuje celý kontext funkce. Přístup k kontextu Durable Functions pomocí df vlastnosti v hlavním kontextu.

import azure.functions as func
import azure.durable_functions as df


def orchestrator_function(context: df.DurableOrchestrationContext):
    x = yield context.call_activity("F1", None)
    y = yield context.call_activity("F2", x)
    z = yield context.call_activity("F3", y)
    result = yield context.call_activity("F4", z)
    return result


main = df.Orchestrator.create(orchestrator_function)

Objekt můžete použít context k vyvolání dalších funkcí podle názvu, předání parametrů a návratového výstupu funkce. Pokaždé, když kód volá yield, Durable Functions framework kontrolní body průběh aktuální instance funkce. Pokud proces nebo virtuální počítač recykluje uprostřed provádění, instance funkce se obnoví z předchozího yield volání. Další informace najdete v další části Vzor č. 2: Ventilátor nebo ventilátor.

Poznámka:

Objekt context v Pythonu představuje kontext orchestrace. Přístup k hlavnímu function_context kontextu Azure Functions pomocí vlastnosti v kontextu orchestrace.

param($Context)

$X = Invoke-DurableActivity -FunctionName 'F1'
$Y = Invoke-DurableActivity -FunctionName 'F2' -Input $X
$Z = Invoke-DurableActivity -FunctionName 'F3' -Input $Y
Invoke-DurableActivity -FunctionName 'F4' -Input $Z

Příkaz můžete použít Invoke-DurableActivity k vyvolání dalších funkcí podle názvu, předání parametrů a výstupu návratové funkce. Pokaždé, když kód volá Invoke-DurableActivity bez NoWait přepínače, architektura Durable Functions zkontroluje průběh aktuální instance funkce. Pokud proces nebo virtuální počítač recykluje uprostřed provádění, instance funkce se obnoví z předchozího Invoke-DurableActivity volání. Další informace najdete v další části Vzor č. 2: Ventilátor nebo ventilátor.

@FunctionName("Chaining")
public double functionChaining(
        @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
    String input = ctx.getInput(String.class);
    int x = ctx.callActivity("F1", input, int.class).await();
    int y = ctx.callActivity("F2", x, int.class).await();
    int z = ctx.callActivity("F3", y, int.class).await();
    return  ctx.callActivity("F4", z, double.class).await();
}

Objekt můžete použít ctx k vyvolání dalších funkcí podle názvu, předání parametrů a návratového výstupu funkce. Výstup těchto metod je objekt, Task<V> kde V je typ dat vrácený vyvolanou funkcí. Pokaždé, když zavoláte Task<V>.await(), Durable Functions framework kontrolní body průběh aktuální instance funkce. Pokud proces neočekávaně recykluje uprostřed provádění, instance funkce se obnoví z předchozího Task<V>.await() volání. Další informace najdete v další části Vzor č. 2: Ventilátor nebo ventilátor.

Vzorek č. 2: Ventilátor nebo ventilátor v

Ve vzorci ventilátoru/ventilátoru spustíte paralelně více funkcí a pak počkáte na dokončení všech funkcí. U výsledků vrácených z funkcí se často provádí určitá agregační práce.

A diagram of the fan out/fan pattern

Díky normálním funkcím můžete posílat více zpráv do fronty. Fanning vzadu je mnohem náročnější. Pokud chcete ventilátorovat, napíšete v normální funkci kód, který bude sledovat, kdy funkce aktivované frontou končí, a pak uloží výstupy funkce.

Rozšíření Durable Functions zpracovává tento vzor s relativně jednoduchým kódem:

[FunctionName("FanOutFanIn")]
public static async Task Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    var parallelTasks = new List<Task<int>>();

    // Get a list of N work items to process in parallel.
    object[] workBatch = await context.CallActivityAsync<object[]>("F1", null);
    for (int i = 0; i < workBatch.Length; i++)
    {
        Task<int> task = context.CallActivityAsync<int>("F2", workBatch[i]);
        parallelTasks.Add(task);
    }

    await Task.WhenAll(parallelTasks);

    // Aggregate all N outputs and send the result to F3.
    int sum = parallelTasks.Sum(t => t.Result);
    await context.CallActivityAsync("F3", sum);
}

Práce s ventilátorem se distribuuje F2 do několika instancí funkce. Práce se sleduje pomocí dynamického seznamu úkolů. Task.WhenAll je volána k čekání na dokončení všech volané funkce. F2 Výstupy funkce se pak agregují ze seznamu dynamických úkolů a předají funkciF3.

Automatické vytváření kontrolních bodů, které se děje při await volání, Task.WhenAll zajišťuje, že potenciální chybové ukončení nebo restartování uprostřed cesty nevyžaduje restartování již dokončené úlohy.

const df = require("durable-functions");

module.exports = df.orchestrator(function*(context) {
    const parallelTasks = [];

    // Get a list of N work items to process in parallel.
    const workBatch = yield context.df.callActivity("F1");
    for (let i = 0; i < workBatch.length; i++) {
        parallelTasks.push(context.df.callActivity("F2", workBatch[i]));
    }

    yield context.df.Task.all(parallelTasks);

    // Aggregate all N outputs and send the result to F3.
    const sum = parallelTasks.reduce((prev, curr) => prev + curr, 0);
    yield context.df.callActivity("F3", sum);
});

Práce s ventilátorem se distribuuje F2 do několika instancí funkce. Práce se sleduje pomocí dynamického seznamu úkolů. context.df.Task.all Rozhraní API se volá, aby čekalo na dokončení všech volavaných funkcí. F2 Výstupy funkce se pak agregují ze seznamu dynamických úkolů a předají funkciF3.

Automatické vytváření kontrolních bodů, které se děje při yield volání, context.df.Task.all zajišťuje, že potenciální chybové ukončení nebo restartování uprostřed cesty nevyžaduje restartování již dokončené úlohy.

import azure.durable_functions as df


def orchestrator_function(context: df.DurableOrchestrationContext):
    # Get a list of N work items to process in parallel.
    work_batch = yield context.call_activity("F1", None)

    parallel_tasks = [ context.call_activity("F2", b) for b in work_batch ]

    outputs = yield context.task_all(parallel_tasks)

    # Aggregate all N outputs and send the result to F3.
    total = sum(outputs)
    yield context.call_activity("F3", total)


main = df.Orchestrator.create(orchestrator_function)

Práce s ventilátorem se distribuuje F2 do několika instancí funkce. Práce se sleduje pomocí dynamického seznamu úkolů. context.task_all Rozhraní API se volá, aby čekalo na dokončení všech volavaných funkcí. F2 Výstupy funkce se pak agregují ze seznamu dynamických úkolů a předají funkciF3.

Automatické vytváření kontrolních bodů, které se děje při yield volání, context.task_all zajišťuje, že potenciální chybové ukončení nebo restartování uprostřed cesty nevyžaduje restartování již dokončené úlohy.

param($Context)

# Get a list of work items to process in parallel.
$WorkBatch = Invoke-DurableActivity -FunctionName 'F1'

$ParallelTasks =
    foreach ($WorkItem in $WorkBatch) {
        Invoke-DurableActivity -FunctionName 'F2' -Input $WorkItem -NoWait
    }

$Outputs = Wait-ActivityFunction -Task $ParallelTasks

# Aggregate all outputs and send the result to F3.
$Total = ($Outputs | Measure-Object -Sum).Sum
Invoke-DurableActivity -FunctionName 'F3' -Input $Total

Práce s ventilátorem se distribuuje F2 do několika instancí funkce. Všimněte si použití NoWait přepínače F2 při vyvolání funkce: tento přepínač umožňuje orchestrátoru pokračovat v volání F2 bez čekání na dokončení aktivity. Práce se sleduje pomocí dynamického seznamu úkolů. Příkaz Wait-ActivityFunction se volá, aby počkal, až se dokončí všechny volané funkce. F2 Výstupy funkce se pak agregují ze seznamu dynamických úkolů a předají funkciF3.

Automatické vytváření kontrolních bodů, které se děje při Wait-ActivityFunction volání, zajišťuje, že potenciální chybové ukončení nebo restartování uprostřed cesty nevyžaduje restartování již dokončené úlohy.

@FunctionName("FanOutFanIn")
public Integer fanOutFanInOrchestrator(
        @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
    // Get the list of work-items to process in parallel
    List<?> batch = ctx.callActivity("F1", List.class).await();

    // Schedule each task to run in parallel
    List<Task<Integer>> parallelTasks = batch.stream()
            .map(item -> ctx.callActivity("F2", item, Integer.class))
            .collect(Collectors.toList());

    // Wait for all tasks to complete, then return the aggregated sum of the results
    List<Integer> results = ctx.allOf(parallelTasks).await();
    return results.stream().reduce(0, Integer::sum);
}

Práce s ventilátorem se distribuuje F2 do několika instancí funkce. Práce se sleduje pomocí dynamického seznamu úkolů. ctx.allOf(parallelTasks).await() je volána k čekání na dokončení všech volané funkce. F2 Výstupy funkce se pak agregují ze seznamu dynamických úloh a vrátí se jako výstup funkce orchestrátoru.

Automatické vytváření kontrolních bodů, které se děje při .await() volání, ctx.allOf(parallelTasks) zajišťuje, že neočekávaný recyklace procesu nevyžaduje restartování již dokončených úkolů.

Poznámka:

Ve výjimečných případech je možné, že po dokončení funkce aktivity dojde k chybovému ukončení v okně, ale před uložením jeho dokončení do historie orchestrace. Pokud k tomu dojde, funkce aktivity by se po obnovení procesu znovu spustila od začátku.

Vzor č. 3: Asynchronní rozhraní HTTP API

Asynchronní vzor rozhraní HTTP API řeší problém koordinace stavu dlouhotrvajících operací s externími klienty. Běžným způsobem implementace tohoto modelu je, že koncový bod HTTP aktivuje dlouhotrvající akci. Pak přesměrujte klienta na koncový bod stavu, který klient dotazuje, až se operace dokončí.

A diagram of the HTTP API pattern

Durable Functions poskytuje integrovanou podporu pro tento vzor, zjednodušuje nebo dokonce odebírá kód, který potřebujete k zápisu pro interakci s dlouhotrvajícími spouštěními funkcí. Ukázky rychlého startu Durable Functions (C#, JavaScript, TypeScript, Python, PowerShell a Java) ukazují jednoduchý příkaz REST, který můžete použít ke spuštění nových instancí funkcí orchestrátoru. Po spuštění instance rozšíření zveřejňuje rozhraní API HTTP webhooku, která se dotazují na stav funkce orchestrátoru.

Následující příklad ukazuje příkazy REST, které spouští orchestrátor a dotazuje se na jeho stav. Kvůli přehlednosti jsou některé podrobnosti protokolu z příkladu vynechány.

> curl -X POST https://myfunc.azurewebsites.net/api/orchestrators/DoWork -H "Content-Length: 0" -i
HTTP/1.1 202 Accepted
Content-Type: application/json
Location: https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec

{"id":"b79baf67f717453ca9e86c5da21e03ec", ...}

> curl https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec -i
HTTP/1.1 202 Accepted
Content-Type: application/json
Location: https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec

{"runtimeStatus":"Running","lastUpdatedTime":"2019-03-16T21:20:47Z", ...}

> curl https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec -i
HTTP/1.1 200 OK
Content-Length: 175
Content-Type: application/json

{"runtimeStatus":"Completed","lastUpdatedTime":"2019-03-16T21:20:57Z", ...}

Vzhledem k tomu, že modul runtime Durable Functions spravuje stav za vás, nemusíte implementovat vlastní mechanismus sledování stavu.

Rozšíření Durable Functions zveřejňuje integrovaná rozhraní API HTTP, která spravují dlouhotrvající orchestrace. Tento vzor můžete implementovat sami pomocí vlastních triggerů funkcí (například HTTP, fronty nebo služby Azure Event Hubs) a trvalé vazby klienta. K aktivaci ukončení můžete například použít zprávu fronty. Nebo můžete použít trigger HTTP, který je chráněný zásadami ověřování Microsoft Entra místo integrovaných rozhraní HTTP API, která k ověřování používají vygenerovaný klíč.

Další informace najdete v článku o funkcích HTTP, který vysvětluje, jak můžete zveřejnit asynchronní dlouhotrvající procesy přes HTTP pomocí rozšíření Durable Functions.

Vzor č. 4: Monitorování

Vzor monitorování odkazuje na flexibilní opakující se proces v pracovním postupu. Příkladem je dotazování, dokud nebudou splněny konkrétní podmínky. Pomocí běžného triggeru časovače můžete řešit základní scénář, jako je například pravidelná úloha čištění, ale jeho interval je statický a správa životností instancí je složitá. Durable Functions můžete použít k vytváření flexibilních intervalů opakování, správě životností úloh a vytváření více procesů monitorování z jedné orchestrace.

Příkladem modelu monitorování je obrácení předchozího asynchronního scénáře rozhraní HTTP API. Místo vystavení koncového bodu pro externího klienta pro monitorování dlouhotrvající operace využívá dlouhotrvající monitorování externí koncový bod a pak čeká na změnu stavu.

A diagram of the monitor pattern

V několika řádcích kódu můžete pomocí Durable Functions vytvořit několik monitorů, které sledují libovolné koncové body. Monitorování můžou ukončit provádění, pokud je splněna podmínka, nebo jiná funkce může k ukončení monitorování použít trvalý orchestrační klient. Interval monitorování wait můžete změnit na základě konkrétní podmínky (například exponenciální zpoždnění).)

Následující kód implementuje základní monitorování:

[FunctionName("MonitorJobStatus")]
public static async Task Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    int jobId = context.GetInput<int>();
    int pollingInterval = GetPollingInterval();
    DateTime expiryTime = GetExpiryTime();

    while (context.CurrentUtcDateTime < expiryTime)
    {
        var jobStatus = await context.CallActivityAsync<string>("GetJobStatus", jobId);
        if (jobStatus == "Completed")
        {
            // Perform an action when a condition is met.
            await context.CallActivityAsync("SendAlert", jobId);
            break;
        }

        // Orchestration sleeps until this time.
        var nextCheck = context.CurrentUtcDateTime.AddSeconds(pollingInterval);
        await context.CreateTimer(nextCheck, CancellationToken.None);
    }

    // Perform more work here, or let the orchestration end.
}
const df = require("durable-functions");
const moment = require("moment");

module.exports = df.orchestrator(function*(context) {
    const jobId = context.df.getInput();
    const pollingInterval = getPollingInterval();
    const expiryTime = getExpiryTime();

    while (moment.utc(context.df.currentUtcDateTime).isBefore(expiryTime)) {
        const jobStatus = yield context.df.callActivity("GetJobStatus", jobId);
        if (jobStatus === "Completed") {
            // Perform an action when a condition is met.
            yield context.df.callActivity("SendAlert", jobId);
            break;
        }

        // Orchestration sleeps until this time.
        const nextCheck = moment.utc(context.df.currentUtcDateTime).add(pollingInterval, 's');
        yield context.df.createTimer(nextCheck.toDate());
    }

    // Perform more work here, or let the orchestration end.
});
import azure.durable_functions as df
import json
from datetime import timedelta 


def orchestrator_function(context: df.DurableOrchestrationContext):
    job = json.loads(context.get_input())
    job_id = job["jobId"]
    polling_interval = job["pollingInterval"]
    expiry_time = job["expiryTime"]

    while context.current_utc_datetime < expiry_time:
        job_status = yield context.call_activity("GetJobStatus", job_id)
        if job_status == "Completed":
            # Perform an action when a condition is met.
            yield context.call_activity("SendAlert", job_id)
            break

        # Orchestration sleeps until this time.
        next_check = context.current_utc_datetime + timedelta(seconds=polling_interval)
        yield context.create_timer(next_check)

    # Perform more work here, or let the orchestration end.


main = df.Orchestrator.create(orchestrator_function)
param($Context)

$output = @()

$jobId = $Context.Input.JobId
$machineId = $Context.Input.MachineId
$pollingInterval = New-TimeSpan -Seconds $Context.Input.PollingInterval
$expiryTime = $Context.Input.ExpiryTime

while ($Context.CurrentUtcDateTime -lt $expiryTime) {
    $jobStatus = Invoke-DurableActivity -FunctionName 'GetJobStatus' -Input $jobId
    if ($jobStatus -eq "Completed") {
        # Perform an action when a condition is met.
        $output += Invoke-DurableActivity -FunctionName 'SendAlert' -Input $machineId
        break
    }

    # Orchestration sleeps until this time.
    Start-DurableTimer -Duration $pollingInterval
}

# Perform more work here, or let the orchestration end.

$output
@FunctionName("Monitor")
public String monitorOrchestrator(
        @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
    JobInfo jobInfo = ctx.getInput(JobInfo.class);
    String jobId = jobInfo.getJobId();
    Instant expiryTime = jobInfo.getExpirationTime();

    while (ctx.getCurrentInstant().compareTo(expiryTime) < 0) {
        String status = ctx.callActivity("GetJobStatus", jobId, String.class).await();

        // Perform an action when a condition is met
        if (status.equals("Completed")) {
            // send an alert and exit
            ctx.callActivity("SendAlert", jobId).await();
            break;
        }

        // wait N minutes before doing the next poll
        Duration pollingDelay = jobInfo.getPollingDelay();
        ctx.createTimer(pollingDelay).await();
    }

    return "done";
}

Po přijetí požadavku se pro toto ID úlohy vytvoří nová instance orchestrace. Instance se dotazuje na stav, dokud není splněna podmínka nebo dokud nevyprší časový limit. Trvalý časovač řídí interval dotazování. Pak je možné provést více práce nebo orchestrace může končit.

Vzor č. 5: Lidská interakce

Mnoho automatizovaných procesů zahrnuje určitý druh lidské interakce. Zapojení lidí do automatizovaného procesu je složité, protože lidé nejsou tak vysoce dostupné a reagují jako cloudové služby. Automatizovaný proces může této interakci umožnit pomocí časových limitů a logiky kompenzace.

Proces schvalování je příkladem obchodního procesu, který zahrnuje lidskou interakci. Schválení manažera může být vyžadováno pro sestavu výdajů, která překračuje určitou částku v dolarech. Pokud manažer zprávu o výdajích neschválí do 72 hodin (může jít o manažera na dovolenou), zahájí se proces eskalace, který získá schválení od někoho jiného (možná manažera).

A diagram of the human interaction pattern

Vzor v tomto příkladu můžete implementovat pomocí funkce orchestrátoru. Orchestrátor používá trvalý časovač k vyžádání schválení. Orchestrátor eskaluje, pokud dojde k vypršení časového limitu. Orchestrátor čeká na externí událost, například na oznámení vygenerované lidskou interakcí.

Tyto příklady vytvoří schvalovací proces, který předvede vzor lidské interakce:

[FunctionName("ApprovalWorkflow")]
public static async Task Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    await context.CallActivityAsync("RequestApproval", null);
    using (var timeoutCts = new CancellationTokenSource())
    {
        DateTime dueTime = context.CurrentUtcDateTime.AddHours(72);
        Task durableTimeout = context.CreateTimer(dueTime, timeoutCts.Token);

        Task<bool> approvalEvent = context.WaitForExternalEvent<bool>("ApprovalEvent");
        if (approvalEvent == await Task.WhenAny(approvalEvent, durableTimeout))
        {
            timeoutCts.Cancel();
            await context.CallActivityAsync("ProcessApproval", approvalEvent.Result);
        }
        else
        {
            await context.CallActivityAsync("Escalate", null);
        }
    }
}

Chcete-li vytvořit trvalý časovač, volání context.CreateTimer. Oznámení obdrží context.WaitForExternalEvent. Pak se zavolá, aby se rozhodlo, Task.WhenAny jestli se má eskalovat (dojde k vypršení časového limitu), nebo zpracovat schválení (schválení se obdrží před vypršením časového limitu).

const df = require("durable-functions");
const moment = require('moment');

module.exports = df.orchestrator(function*(context) {
    yield context.df.callActivity("RequestApproval");

    const dueTime = moment.utc(context.df.currentUtcDateTime).add(72, 'h');
    const durableTimeout = context.df.createTimer(dueTime.toDate());

    const approvalEvent = context.df.waitForExternalEvent("ApprovalEvent");
    const winningEvent = yield context.df.Task.any([approvalEvent, durableTimeout]);
    if (winningEvent === approvalEvent) {
        durableTimeout.cancel();
        yield context.df.callActivity("ProcessApproval", approvalEvent.result);
    } else {
        yield context.df.callActivity("Escalate");
    }
});

Chcete-li vytvořit trvalý časovač, volání context.df.createTimer. Oznámení obdrží context.df.waitForExternalEvent. Pak se zavolá, aby se rozhodlo, context.df.Task.any jestli se má eskalovat (dojde k vypršení časového limitu), nebo zpracovat schválení (schválení se obdrží před vypršením časového limitu).

import azure.durable_functions as df
import json
from datetime import timedelta 


def orchestrator_function(context: df.DurableOrchestrationContext):
    yield context.call_activity("RequestApproval", None)

    due_time = context.current_utc_datetime + timedelta(hours=72)
    durable_timeout_task = context.create_timer(due_time)
    approval_event_task = context.wait_for_external_event("ApprovalEvent")

    winning_task = yield context.task_any([approval_event_task, durable_timeout_task])

    if approval_event_task == winning_task:
        durable_timeout_task.cancel()
        yield context.call_activity("ProcessApproval", approval_event_task.result)
    else:
        yield context.call_activity("Escalate", None)


main = df.Orchestrator.create(orchestrator_function)

Chcete-li vytvořit trvalý časovač, volání context.create_timer. Oznámení obdrží context.wait_for_external_event. Pak se zavolá, aby se rozhodlo, context.task_any jestli se má eskalovat (dojde k vypršení časového limitu), nebo zpracovat schválení (schválení se obdrží před vypršením časového limitu).

param($Context)

$output = @()

$duration = New-TimeSpan -Seconds $Context.Input.Duration
$managerId = $Context.Input.ManagerId

$output += Invoke-DurableActivity -FunctionName "RequestApproval" -Input $managerId

$durableTimeoutEvent = Start-DurableTimer -Duration $duration -NoWait
$approvalEvent = Start-DurableExternalEventListener -EventName "ApprovalEvent" -NoWait

$firstEvent = Wait-DurableTask -Task @($approvalEvent, $durableTimeoutEvent) -Any

if ($approvalEvent -eq $firstEvent) {
    Stop-DurableTimerTask -Task $durableTimeoutEvent
    $output += Invoke-DurableActivity -FunctionName "ProcessApproval" -Input $approvalEvent
}
else {
    $output += Invoke-DurableActivity -FunctionName "EscalateApproval"
}

$output

Chcete-li vytvořit trvalý časovač, volání Start-DurableTimer. Oznámení obdrží Start-DurableExternalEventListener. Pak se zavolá, aby se rozhodlo, Wait-DurableTask jestli se má eskalovat (dojde k vypršení časového limitu), nebo zpracovat schválení (schválení se obdrží před vypršením časového limitu).

@FunctionName("ApprovalWorkflow")
public void approvalWorkflow(
        @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
    ApprovalInfo approvalInfo = ctx.getInput(ApprovalInfo.class);
    ctx.callActivity("RequestApproval", approvalInfo).await();

    Duration timeout = Duration.ofHours(72);
    try {
        // Wait for an approval. A TaskCanceledException will be thrown if the timeout expires.
        boolean approved = ctx.waitForExternalEvent("ApprovalEvent", timeout, boolean.class).await();
        approvalInfo.setApproved(approved);

        ctx.callActivity("ProcessApproval", approvalInfo).await();
    } catch (TaskCanceledException timeoutEx) {
        ctx.callActivity("Escalate", approvalInfo).await();
    }
}

Volání ctx.waitForExternalEvent(...).await() metody pozastaví orchestraci, dokud neobdrží událost s názvem ApprovalEvent, která má datovou boolean část. Pokud je událost přijata, volá se funkce aktivity pro zpracování výsledku schválení. Pokud však žádná taková událost není přijata před vypršením timeout platnosti (72 hodin), vyvolá se vyvolání TaskCanceledException a Escalate zavolá se funkce aktivity.

Poznámka:

Za dobu strávenou čekáním na externí události při spuštění v plánu Consumption se neúčtují žádné poplatky.

Externí klient může doručit oznámení události do funkce čekající orchestrátoru pomocí integrovaných rozhraní API HTTP:

curl -d "true" http://localhost:7071/runtime/webhooks/durabletask/instances/{instanceId}/raiseEvent/ApprovalEvent -H "Content-Type: application/json"

Událost lze vyvolat také pomocí klienta trvalé orchestrace z jiné funkce ve stejné aplikaci funkcí:

[FunctionName("RaiseEventToOrchestration")]
public static async Task Run(
    [HttpTrigger] string instanceId,
    [DurableClient] IDurableOrchestrationClient client)
{
    bool isApproved = true;
    await client.RaiseEventAsync(instanceId, "ApprovalEvent", isApproved);
}
const df = require("durable-functions");

module.exports = async function (context) {
    const client = df.getClient(context);
    const isApproved = true;
    await client.raiseEvent(instanceId, "ApprovalEvent", isApproved);
};
import azure.durable_functions as df


async def main(client: str):
    durable_client = df.DurableOrchestrationClient(client)
    is_approved = True
    await durable_client.raise_event(instance_id, "ApprovalEvent", is_approved)

Send-DurableExternalEvent -InstanceId $InstanceId -EventName "ApprovalEvent" -EventData "true"

@FunctionName("RaiseEventToOrchestration")
public void raiseEventToOrchestration(
        @HttpTrigger(name = "instanceId") String instanceId,
        @DurableClientInput(name = "durableContext") DurableClientContext durableContext) {

    DurableTaskClient client = durableContext.getClient();
    client.raiseEvent(instanceId, "ApprovalEvent", true);
}

Vzor č. 6: Agregátor (stavové entity)

Šestý vzor se týká agregace dat událostí v určitém časovém období do jedné adresovatelné entity. V tomto modelu můžou agregovaná data pocházet z více zdrojů, mohou být doručena v dávkách nebo mohou být rozptýlená po dlouhou dobu. Agregátor může potřebovat provést akci s daty událostí při jejich doručení a externí klienti se můžou muset dotazovat na agregovaná data.

Aggregator diagram

Složitá věc, když se snažíte implementovat tento vzor s normálními bezstavovými funkcemi, je, že řízení souběžnosti se stává obrovskou výzvou. Nemusíte si dělat starosti jenom s několika vlákny, které upravují stejná data současně, ale potřebujete se také starat o to, aby agregátor běžel jenom na jednom virtuálním počítači najednou.

Durable entity můžete použít k snadné implementaci tohoto vzoru jako jedné funkce.

[FunctionName("Counter")]
public static void Counter([EntityTrigger] IDurableEntityContext ctx)
{
    int currentValue = ctx.GetState<int>();
    switch (ctx.OperationName.ToLowerInvariant())
    {
        case "add":
            int amount = ctx.GetInput<int>();
            ctx.SetState(currentValue + amount);
            break;
        case "reset":
            ctx.SetState(0);
            break;
        case "get":
            ctx.Return(currentValue);
            break;
    }
}

Odolné entity lze také modelovat jako třídy v .NET. Tento model může být užitečný, pokud je seznam operací pevný a bude velký. Následující příklad je ekvivalentní implementací Counter entity pomocí tříd a metod .NET.

public class Counter
{
    [JsonProperty("value")]
    public int CurrentValue { get; set; }

    public void Add(int amount) => this.CurrentValue += amount;

    public void Reset() => this.CurrentValue = 0;

    public int Get() => this.CurrentValue;

    [FunctionName(nameof(Counter))]
    public static Task Run([EntityTrigger] IDurableEntityContext ctx)
        => ctx.DispatchAsync<Counter>();
}
const df = require("durable-functions");

module.exports = df.entity(function(context) {
    const currentValue = context.df.getState(() => 0);
    switch (context.df.operationName) {
        case "add":
            const amount = context.df.getInput();
            context.df.setState(currentValue + amount);
            break;
        case "reset":
            context.df.setState(0);
            break;
        case "get":
            context.df.return(currentValue);
            break;
    }
});
import azure.functions as func
import azure.durable_functions as df


def entity_function(context: df.DurableOrchestrationContext):

    current_value = context.get_state(lambda: 0)
    operation = context.operation_name
    if operation == "add":
        amount = context.get_input()
        current_value += amount
        context.set_result(current_value)
    elif operation == "reset":
        current_value = 0
    elif operation == "get":
        context.set_result(current_value)

    context.set_state(current_value)

main = df.Entity.create(entity_function)

Poznámka:

Odolné entity se v PowerShellu v současné době nepodporují.

Poznámka:

Odolné entity se v javě v současné době nepodporují.

Klienti mohou zavázat operace pro funkci entity (označovanou také jako "signaling") pomocí vazby klienta entity.

[FunctionName("EventHubTriggerCSharp")]
public static async Task Run(
    [EventHubTrigger("device-sensor-events")] EventData eventData,
    [DurableClient] IDurableEntityClient entityClient)
{
    var metricType = (string)eventData.Properties["metric"];
    var delta = BitConverter.ToInt32(eventData.Body, eventData.Body.Offset);

    // The "Counter/{metricType}" entity is created on-demand.
    var entityId = new EntityId("Counter", metricType);
    await entityClient.SignalEntityAsync(entityId, "add", delta);
}

Poznámka:

Dynamicky generované proxy servery jsou také k dispozici v .NET pro signalizaci entit způsobem bezpečným způsobem. Kromě signalizace mohou klienti také dotazovat stav funkce entity pomocí metod bezpečných typů na vazbě klienta orchestrace.

const df = require("durable-functions");
const { app } = require("@azure/functions");

module.exports = async function (context) {
    const client = df.getClient(context);
    const entityId = new df.EntityId("Counter", "myCounter");
    await client.signalEntity(entityId, "add", 1);
};
import azure.functions as func
import azure.durable_functions as df

async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
    client = df.DurableOrchestrationClient(starter)
    entity_id = df.EntityId("Counter", "myCounter")
    instance_id = await client.signal_entity(entity_id, "add", 1)
    return func.HttpResponse("Entity signaled")

Poznámka:

Odolné entity se v PowerShellu v současné době nepodporují.

Poznámka:

Odolné entity se v javě v současné době nepodporují.

Funkce entit jsou k dispozici v Durable Functions 2.0 a vyšší pro C#, JavaScript a Python.

Technologie

Rozšíření Durable Functions je na pozadí postavené na platformě Durable Task Framework, opensourcové knihovně na GitHubu, která se používá k vytváření pracovních postupů v kódu. Stejně jako Azure Functions je bezserverový vývoj webových úloh Azure, Durable Functions je bezserverový vývoj architektury Durable Task Framework. Microsoft a další organizace používají architekturu Durable Task Framework k rozsáhlé automatizaci důležitých procesů. Je přirozeným prostředím bezserverové služby Azure Functions.

Omezení kódu

Aby bylo možné poskytovat spolehlivé a dlouhotrvající záruky provádění, mají funkce orchestrátoru sadu pravidel kódování, která je potřeba dodržovat. Další informace najdete v článku Omezení kódu funkce nástroje Orchestrator.

Fakturace

Durable Functions se účtují stejně jako Azure Functions. Další informace najdete v tématu Ceny služby Azure Functions. Při spouštění funkcí orchestrátoru v plánu Consumption služby Azure Functions existuje několik chování fakturace, o nichž je potřeba vědět. Další informace o těchto chováních najdete v článku fakturace Durable Functions.

Přeskočte doprava

S Durable Functions můžete začít pracovat za méně než 10 minut dokončením některého z těchto kurzů rychlých startů pro konkrétní jazyk:

V těchto rychlých startech místně vytvoříte a otestujete odolnou funkci "hello world". Kód funkce potom publikujete do Azure. Funkce, kterou vytvoříte, orchestruje a zřetědí volání dalších funkcí.

Publikace

Durable Functions se vyvíjí ve spolupráci s Microsoft Research. V důsledku toho tým Durable Functions aktivně vytváří výzkumné studie a artefakty; mezi ně patří:

Další informace

Následující video ukazuje výhody Durable Functions:

Vzhledem k tomu, že Durable Functions je rozšířené rozšíření pro Azure Functions, není vhodné pro všechny aplikace. Porovnání s jinými technologiemi orchestrace Azure najdete v tématu Porovnání Azure Functions a Azure Logic Apps.

Další kroky