Dayanıklı İşlevler nedir?

Dayanıklı İşlevler, sunucusuz bir işlem ortamında durum bilgisi olan işlevler yazmanızı sağlayan bir Azure İşlevleri özelliğidir. Uzantı, Azure İşlevleri programlama modelini kullanarak varlık işlevleri yazarak orchestrator işlevleri ve durum bilgisi olan varlıklar yazarak durum bilgisi olan iş akışlarını tanımlamanızı sağlar. Arka planda uzantı sizin için durumu, denetim noktalarını ve yeniden başlatmaları yöneterek iş mantığınıza odaklanmanızı sağlar.

Desteklenen diller

Dayanıklı İşlevler tüm Azure İşlevleri programlama dilleri ile çalışacak şekilde tasarlanmıştır, ancak her dil için farklı minimum gereksinimleri olabilir. Aşağıdaki tabloda desteklenen en düşük uygulama yapılandırmaları gösterilmektedir:

Dil yığını çalışma zamanı sürümlerini Azure İşlevleri Dil çalışanı sürümü En düşük paket sürümü
.NET / C# / F# İşlevler 1.0+ İşlemde
İşlem dışı
yok
JavaScript/TypeScript (V3 prog. model) İşlevler 2.0+ Düğüm 8+ 2.x paketleri
JavaScript/TypeScript (V4 prog. model) İşlevler 4.25+ Düğüm 18+ 3,15'den fazla paket
Python İşlevler 2.0+ Python 3.7+ 2.x paketleri
Python (V2 prog. modeli) İşlevler 4.0+ Python 3.7+ 3,15'den fazla paket
PowerShell İşlevler 3.0+ PowerShell 7+ 2.x paketleri
Java İşlevler 4.0+ Java 8+ 4.x paket

Önemli

Bu makalede, Node.js programlama modelinin birden çok sürümünü desteklemek için sekmeler kullanılır. Genel kullanıma sunulan v4 modeli, JavaScript ve TypeScript geliştiricileri için daha esnek ve sezgisel bir deneyime sahip olacak şekilde tasarlanmıştır. v4 modelinin nasıl çalıştığı hakkında daha fazla bilgi için Azure İşlevleri Node.js geliştirici kılavuzuna bakın. v3 ile v4 arasındaki farklar hakkında daha fazla bilgi edinmek için geçiş kılavuzuna bakın.

Önemli

Bu makalede, Python programlama modelinin birden çok sürümünü desteklemek için sekmeler kullanılır. v2 modeli genel olarak kullanılabilir ve dekoratörler aracılığıyla işlev yazmak için daha kod odaklı bir yol sağlamak üzere tasarlanmıştır. v2 modelinin nasıl çalıştığı hakkında daha fazla bilgi için Azure İşlevleri Python geliştirici kılavuzuna bakın.

Azure İşlevleri gibi Visual Studio, Visual Studio Code ve Azure portalını kullanarak Dayanıklı İşlevler geliştirmenize yardımcı olacak şablonlar vardır.

Uygulama desenleri

Dayanıklı İşlevler için birincil kullanım örneği sunucusuz uygulamalarda karmaşık, durum bilgisi olan koordinasyon gereksinimlerini basitleştirmektir. Aşağıdaki bölümlerde, Dayanıklı İşlevler yararlanabilecek tipik uygulama desenleri açıklanmaktadır:

Desen #1: İşlev zincirleme

İşlev zincirleme düzeninde, bir işlev dizisi belirli bir sırada yürütülür. Bu düzende, bir işlevin çıkışı başka bir işlevin girişine uygulanır. Her işlev arasında kuyrukların kullanılması, bir işlevden diğerine bir denetim akışı olsa bile sistemin dayanıklı ve ölçeklenebilir kalmasını sağlar.

İşlev zincirleme deseninin diyagramı

aşağıdaki örnekte gösterildiği gibi işlev zincirleme desenini kısa bir şekilde uygulamak için Dayanıklı İşlevler kullanabilirsiniz.

Bu örnekte , , F2F3ve F4 değerleri F1aynı işlev uygulamasındaki diğer işlevlerin adlarıdır. Normal kesinlik temelli kodlama yapılarını kullanarak denetim akışı uygulayabilirsiniz. Kod yukarıdan aşağıya doğru yürütülür. Kod, koşullular ve döngüler gibi mevcut dil denetimi akışı semantiğini içerebilir. Bloklara try//catchfinally hata işleme mantığı ekleyebilirsiniz.

[FunctionName("Chaining")]
public static async Task<object> Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    try
    {
        var x = await context.CallActivityAsync<object>("F1", null);
        var y = await context.CallActivityAsync<object>("F2", x);
        var z = await context.CallActivityAsync<object>("F3", y);
        return  await context.CallActivityAsync<object>("F4", z);
    }
    catch (Exception)
    {
        // Error handling or compensation goes here.
    }
}

Diğer işlevleri ada göre çağırmak, parametreleri geçirmek ve işlev çıkışı döndürmek için parametresini kullanabilirsiniz context . Kod her çağırışındaawait, Dayanıklı İşlevler çerçevesi geçerli işlev örneğinin ilerleme durumunu denetler. İşlem veya sanal makine yürütme sırasında geri dönüşüme geçerse, işlev örneği önceki await çağrıdan devam eder. Daha fazla bilgi için desen #2: Fan out/fan in başlıklı sonraki bölüme bakın.

const df = require("durable-functions");

module.exports = df.orchestrator(function*(context) {
    try {
        const x = yield context.df.callActivity("F1");
        const y = yield context.df.callActivity("F2", x);
        const z = yield context.df.callActivity("F3", y);
        return    yield context.df.callActivity("F4", z);
    } catch (error) {
        // Error handling or compensation goes here.
    }
});

Diğer işlevleri ada göre çağırmak, parametreleri geçirmek ve işlev çıkışı döndürmek için nesnesini kullanabilirsiniz context.df . Kod her çağırışındayield, Dayanıklı İşlevler çerçevesi geçerli işlev örneğinin ilerleme durumunu denetler. İşlem veya sanal makine yürütme sırasında geri dönüşüme geçerse, işlev örneği önceki yield çağrıdan devam eder. Daha fazla bilgi için desen #2: Fan out/fan in başlıklı sonraki bölüme bakın.

Not

context JavaScript'teki nesnesi tüm işlev bağlamını temsil eder. Ana bağlamdaki özelliğini kullanarak df Dayanıklı İşlevler bağlama erişin.

import azure.functions as func
import azure.durable_functions as df


def orchestrator_function(context: df.DurableOrchestrationContext):
    x = yield context.call_activity("F1", None)
    y = yield context.call_activity("F2", x)
    z = yield context.call_activity("F3", y)
    result = yield context.call_activity("F4", z)
    return result


main = df.Orchestrator.create(orchestrator_function)

Diğer işlevleri ada göre çağırmak, parametreleri geçirmek ve işlev çıkışı döndürmek için nesnesini kullanabilirsiniz context . Kod her çağırışındayield, Dayanıklı İşlevler çerçevesi geçerli işlev örneğinin ilerleme durumunu denetler. İşlem veya sanal makine yürütme sırasında geri dönüşüme geçerse, işlev örneği önceki yield çağrıdan devam eder. Daha fazla bilgi için desen #2: Fan out/fan in başlıklı sonraki bölüme bakın.

Not

context Python'daki nesnesi düzenleme bağlamını temsil eder. Düzenleme bağlamında özelliğini kullanarak function_context ana Azure İşlevleri bağlama erişin.

param($Context)

$X = Invoke-DurableActivity -FunctionName 'F1'
$Y = Invoke-DurableActivity -FunctionName 'F2' -Input $X
$Z = Invoke-DurableActivity -FunctionName 'F3' -Input $Y
Invoke-DurableActivity -FunctionName 'F4' -Input $Z

Komutunu kullanarak Invoke-DurableActivity diğer işlevleri ada göre çağırabilir, parametreleri geçirebilir ve işlev çıktısı döndürebilirsiniz. Kod anahtar olmadan NoWait her çağırışındaInvoke-DurableActivity, Dayanıklı İşlevler çerçevesi geçerli işlev örneğinin ilerleme durumunu denetler. İşlem veya sanal makine yürütme sırasında geri dönüşüme geçerse, işlev örneği önceki Invoke-DurableActivity çağrıdan devam eder. Daha fazla bilgi için desen #2: Fan out/fan in başlıklı sonraki bölüme bakın.

@FunctionName("Chaining")
public double functionChaining(
        @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
    String input = ctx.getInput(String.class);
    int x = ctx.callActivity("F1", input, int.class).await();
    int y = ctx.callActivity("F2", x, int.class).await();
    int z = ctx.callActivity("F3", y, int.class).await();
    return  ctx.callActivity("F4", z, double.class).await();
}

Diğer işlevleri ada göre çağırmak, parametreleri geçirmek ve işlev çıkışı döndürmek için nesnesini kullanabilirsiniz ctx . Bu yöntemlerin çıkışı, çağrılan işlev tarafından döndürülen veri türü olan bir Task<V> nesnedir V . her çağırdığınızdaTask<V>.await(), Dayanıklı İşlevler çerçevesi geçerli işlev örneğinin ilerleme durumunu denetler. İşlem yürütme sırasında beklenmedik şekilde geri dönüşüme neden olursa, işlev örneği önceki Task<V>.await() çağrıdan devam eder. Daha fazla bilgi için desen #2: Fan out/fan in başlıklı sonraki bölüme bakın.

Desen #2: Fan çıkışı/fan

Fan çıkışı/fan deseninde birden çok işlevi paralel olarak yürütür ve ardından tüm işlevlerin tamamlanmasını beklersiniz. Genellikle, işlevlerden döndürülen sonuçlar üzerinde bazı toplama işleri yapılır.

Fan çıkışı/fan deseninin diyagramı

Normal işlevlerle, işlevin kuyruğa birden çok ileti göndermesini sağlayarak bu özelliği kullanabilirsiniz. Geri dönmek çok daha zor. Normal bir işlevde, kuyrukla tetiklenen işlevlerin ne zaman sona ereceğini izlemek için kod yazar ve ardından işlev çıkışlarını depolarsınız.

Dayanıklı İşlevler uzantısı bu düzeni nispeten basit bir kodla işler:

[FunctionName("FanOutFanIn")]
public static async Task Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    var parallelTasks = new List<Task<int>>();

    // Get a list of N work items to process in parallel.
    object[] workBatch = await context.CallActivityAsync<object[]>("F1", null);
    for (int i = 0; i < workBatch.Length; i++)
    {
        Task<int> task = context.CallActivityAsync<int>("F2", workBatch[i]);
        parallelTasks.Add(task);
    }

    await Task.WhenAll(parallelTasks);

    // Aggregate all N outputs and send the result to F3.
    int sum = parallelTasks.Sum(t => t.Result);
    await context.CallActivityAsync("F3", sum);
}

Fan-out çalışması işlevin F2 birden çok örneğine dağıtılır. Çalışma, dinamik bir görev listesi kullanılarak izlenir. Task.WhenAll çağrılan tüm işlevlerin bitmesini beklemek için çağrılır. Ardından işlev F2 çıkışları dinamik görev listesinden toplanır ve işleve F3 geçirilir.

çağrısında awaitTask.WhenAll gerçekleşen otomatik denetim noktası oluşturma, olası bir orta yol kilitlenmesinin veya yeniden başlatmanın zaten tamamlanmış bir görevin yeniden başlatılmasını gerektirmemesini sağlar.

const df = require("durable-functions");

module.exports = df.orchestrator(function*(context) {
    const parallelTasks = [];

    // Get a list of N work items to process in parallel.
    const workBatch = yield context.df.callActivity("F1");
    for (let i = 0; i < workBatch.length; i++) {
        parallelTasks.push(context.df.callActivity("F2", workBatch[i]));
    }

    yield context.df.Task.all(parallelTasks);

    // Aggregate all N outputs and send the result to F3.
    const sum = parallelTasks.reduce((prev, curr) => prev + curr, 0);
    yield context.df.callActivity("F3", sum);
});

Fan-out çalışması işlevin F2 birden çok örneğine dağıtılır. Çalışma, dinamik bir görev listesi kullanılarak izlenir. context.df.Task.all Çağrılan tüm işlevlerin tamamlanmasını beklemek için API çağrılır. Ardından işlev F2 çıkışları dinamik görev listesinden toplanır ve işleve F3 geçirilir.

çağrısında yieldcontext.df.Task.all gerçekleşen otomatik denetim noktası oluşturma, olası bir orta yol kilitlenmesinin veya yeniden başlatmanın zaten tamamlanmış bir görevin yeniden başlatılmasını gerektirmemesini sağlar.

import azure.durable_functions as df


def orchestrator_function(context: df.DurableOrchestrationContext):
    # Get a list of N work items to process in parallel.
    work_batch = yield context.call_activity("F1", None)

    parallel_tasks = [ context.call_activity("F2", b) for b in work_batch ]

    outputs = yield context.task_all(parallel_tasks)

    # Aggregate all N outputs and send the result to F3.
    total = sum(outputs)
    yield context.call_activity("F3", total)


main = df.Orchestrator.create(orchestrator_function)

Fan-out çalışması işlevin F2 birden çok örneğine dağıtılır. Çalışma, dinamik bir görev listesi kullanılarak izlenir. context.task_all Çağrılan tüm işlevlerin tamamlanmasını beklemek için API çağrılır. Ardından işlev F2 çıkışları dinamik görev listesinden toplanır ve işleve F3 geçirilir.

çağrısında yieldcontext.task_all gerçekleşen otomatik denetim noktası oluşturma, olası bir orta yol kilitlenmesinin veya yeniden başlatmanın zaten tamamlanmış bir görevin yeniden başlatılmasını gerektirmemesini sağlar.

param($Context)

# Get a list of work items to process in parallel.
$WorkBatch = Invoke-DurableActivity -FunctionName 'F1'

$ParallelTasks =
    foreach ($WorkItem in $WorkBatch) {
        Invoke-DurableActivity -FunctionName 'F2' -Input $WorkItem -NoWait
    }

$Outputs = Wait-ActivityFunction -Task $ParallelTasks

# Aggregate all outputs and send the result to F3.
$Total = ($Outputs | Measure-Object -Sum).Sum
Invoke-DurableActivity -FunctionName 'F3' -Input $Total

Fan-out çalışması işlevin F2 birden çok örneğine dağıtılır. İşlev çağrısında NoWaitF2 anahtarın kullanımına dikkat edin: Bu anahtar, düzenleyicinin etkinlik tamamlanmasını beklemeden çağırmaya F2 devam etmesine olanak tanır. Çalışma, dinamik bir görev listesi kullanılarak izlenir. Komut Wait-ActivityFunction , çağrılan tüm işlevlerin tamamlanmasını beklemek için çağrılır. Ardından işlev F2 çıkışları dinamik görev listesinden toplanır ve işleve F3 geçirilir.

Çağrıda Wait-ActivityFunction gerçekleşen otomatik denetim noktası oluşturma, olası bir orta yol kilitlenmesinin veya yeniden başlatmanın zaten tamamlanmış bir görevin yeniden başlatılmasını gerektirmemesini sağlar.

@FunctionName("FanOutFanIn")
public Integer fanOutFanInOrchestrator(
        @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
    // Get the list of work-items to process in parallel
    List<?> batch = ctx.callActivity("F1", List.class).await();

    // Schedule each task to run in parallel
    List<Task<Integer>> parallelTasks = batch.stream()
            .map(item -> ctx.callActivity("F2", item, Integer.class))
            .collect(Collectors.toList());

    // Wait for all tasks to complete, then return the aggregated sum of the results
    List<Integer> results = ctx.allOf(parallelTasks).await();
    return results.stream().reduce(0, Integer::sum);
}

Fan-out çalışması işlevin F2 birden çok örneğine dağıtılır. Çalışma, dinamik bir görev listesi kullanılarak izlenir. ctx.allOf(parallelTasks).await() çağrılan tüm işlevlerin bitmesini beklemek için çağrılır. Ardından işlev F2 çıkışları dinamik görev listesinden toplanır ve orchestrator işlevinin çıkışı olarak döndürülür.

Çağrısında .await()ctx.allOf(parallelTasks) gerçekleşen otomatik denetim noktası, beklenmeyen bir işlemin geri dönüştürülmesinin zaten tamamlanmış görevlerin yeniden başlatılmasını gerektirmemesini sağlar.

Not

Nadir durumlarda, bir etkinlik işlevi tamamlandıktan sonra ancak tamamlanması düzenleme geçmişine kaydedilmeden önce pencerede bir kilitlenme oluşabilir. Böyle bir durumda, işlem kurtarıldıktan sonra etkinlik işlevi baştan yeniden çalıştırılır.

Desen #3: Zaman Uyumsuz HTTP API'leri

Zaman uyumsuz HTTP API düzeni, uzun süre çalışan işlemlerin durumunu dış istemcilerle koordine etme sorununu giderir. Bu düzeni uygulamanın yaygın bir yolu, http uç noktasının uzun süre çalışan eylemi tetikletirmektir. Ardından, işlemi tamamladığınızda öğrenmek için istemciyi istemcinin yoklama yaptığı bir durum uç noktasına yönlendirin.

HTTP API deseninin diyagramı

Dayanıklı İşlevler, uzun süre çalışan işlev yürütmeleriyle etkileşime geçmek için yazmanız gereken kodu basitleştirerek ve hatta kaldırarak bu düzen için yerleşik destek sağlar. Örneğin, Dayanıklı İşlevler hızlı başlangıç örnekleri (C#, JavaScript, TypeScript, Python, PowerShell ve Java), yeni orchestrator işlev örneklerini başlatmak için kullanabileceğiniz basit bir REST komutu gösterir. Bir örnek başlatıldıktan sonra uzantı, orchestrator işlev durumunu sorgulayan web kancası HTTP API'lerini kullanıma sunar.

Aşağıdaki örnekte, bir düzenleyici başlatan ve durumunu sorgulayan REST komutları gösterilmektedir. Netlik sağlamak için, bazı protokol ayrıntıları örnekten atlanır.

> curl -X POST https://myfunc.azurewebsites.net/api/orchestrators/DoWork -H "Content-Length: 0" -i
HTTP/1.1 202 Accepted
Content-Type: application/json
Location: https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec

{"id":"b79baf67f717453ca9e86c5da21e03ec", ...}

> curl https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec -i
HTTP/1.1 202 Accepted
Content-Type: application/json
Location: https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec

{"runtimeStatus":"Running","lastUpdatedTime":"2019-03-16T21:20:47Z", ...}

> curl https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec -i
HTTP/1.1 200 OK
Content-Length: 175
Content-Type: application/json

{"runtimeStatus":"Completed","lastUpdatedTime":"2019-03-16T21:20:57Z", ...}

Dayanıklı İşlevler çalışma zamanı sizin için durumu yönettiğinden, kendi durum izleme mekanizmanızı uygulamanız gerekmez.

Dayanıklı İşlevler uzantısı, uzun süre çalışan düzenlemeleri yöneten yerleşik HTTP API'lerini kullanıma sunar. Alternatif olarak, kendi işlev tetikleyicilerinizi (HTTP, kuyruk veya Azure Event Hubs gibi) ve dayanıklı istemci bağlamasını kullanarak bu düzeni kendiniz uygulayabilirsiniz. Örneğin, sonlandırmayı tetikleme amacıyla bir kuyruk iletisi kullanabilirsiniz. Alternatif olarak, kimlik doğrulaması için oluşturulan anahtarı kullanan yerleşik HTTP API'leri yerine Microsoft Entra kimlik doğrulama ilkesi tarafından korunan bir HTTP tetikleyicisi de kullanabilirsiniz.

Daha fazla bilgi için, Dayanıklı İşlevler uzantısını kullanarak HTTP üzerinden zaman uyumsuz, uzun süre çalışan işlemleri nasıl kullanıma açabileceğinizi açıklayan HTTP özellikleri makalesine bakın.

Desen #4: İzleyici

İzleyici deseni, bir iş akışındaki esnek, yinelenen bir işleme başvurur. Belirli koşullar karşılanıncaya kadar yoklama işlemi örnek olarak verilmiştir. Düzenli bir temizleme işi gibi temel senaryoları ele almak için normal bir zamanlayıcı tetikleyicisi kullanabilirsiniz, ancak aralığı statiktir ve örnek yaşam sürelerini yönetmek karmaşık hale gelir. Esnek yineleme aralıkları oluşturmak, görev yaşam sürelerini yönetmek ve tek bir düzenlemeden birden çok monitör işlemi oluşturmak için Dayanıklı İşlevler kullanabilirsiniz.

İzleyici deseninin bir örneği, önceki zaman uyumsuz HTTP API senaryosunu tersine çevirmektir. Uzun süre çalışan bir işlemi izlemek üzere bir dış istemcinin uç noktasını ortaya çıkarmak yerine, uzun süre çalışan izleyici bir dış uç nokta tüketir ve ardından durum değişikliğini bekler.

monitör deseninin diyagramı

Birkaç kod satırı içinde, rastgele uç noktaları gözlemleyen birden çok izleyici oluşturmak için Dayanıklı İşlevler kullanabilirsiniz. Bir koşul karşılandığında izleyiciler yürütmeyi sonlandırabilir veya başka bir işlev izleyicileri sonlandırmak için dayanıklı düzenleme istemcisini kullanabilir. Belirli bir koşula göre (örneğin, üstel geri alma) bir izleyicinin wait aralığını değiştirebilirsiniz.

Aşağıdaki kod temel bir izleyici uygular:

[FunctionName("MonitorJobStatus")]
public static async Task Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    int jobId = context.GetInput<int>();
    int pollingInterval = GetPollingInterval();
    DateTime expiryTime = GetExpiryTime();

    while (context.CurrentUtcDateTime < expiryTime)
    {
        var jobStatus = await context.CallActivityAsync<string>("GetJobStatus", jobId);
        if (jobStatus == "Completed")
        {
            // Perform an action when a condition is met.
            await context.CallActivityAsync("SendAlert", jobId);
            break;
        }

        // Orchestration sleeps until this time.
        var nextCheck = context.CurrentUtcDateTime.AddSeconds(pollingInterval);
        await context.CreateTimer(nextCheck, CancellationToken.None);
    }

    // Perform more work here, or let the orchestration end.
}
const df = require("durable-functions");
const moment = require("moment");

module.exports = df.orchestrator(function*(context) {
    const jobId = context.df.getInput();
    const pollingInterval = getPollingInterval();
    const expiryTime = getExpiryTime();

    while (moment.utc(context.df.currentUtcDateTime).isBefore(expiryTime)) {
        const jobStatus = yield context.df.callActivity("GetJobStatus", jobId);
        if (jobStatus === "Completed") {
            // Perform an action when a condition is met.
            yield context.df.callActivity("SendAlert", jobId);
            break;
        }

        // Orchestration sleeps until this time.
        const nextCheck = moment.utc(context.df.currentUtcDateTime).add(pollingInterval, 's');
        yield context.df.createTimer(nextCheck.toDate());
    }

    // Perform more work here, or let the orchestration end.
});
import azure.durable_functions as df
import json
from datetime import timedelta 


def orchestrator_function(context: df.DurableOrchestrationContext):
    job = json.loads(context.get_input())
    job_id = job["jobId"]
    polling_interval = job["pollingInterval"]
    expiry_time = job["expiryTime"]

    while context.current_utc_datetime < expiry_time:
        job_status = yield context.call_activity("GetJobStatus", job_id)
        if job_status == "Completed":
            # Perform an action when a condition is met.
            yield context.call_activity("SendAlert", job_id)
            break

        # Orchestration sleeps until this time.
        next_check = context.current_utc_datetime + timedelta(seconds=polling_interval)
        yield context.create_timer(next_check)

    # Perform more work here, or let the orchestration end.


main = df.Orchestrator.create(orchestrator_function)
param($Context)

$output = @()

$jobId = $Context.Input.JobId
$machineId = $Context.Input.MachineId
$pollingInterval = New-TimeSpan -Seconds $Context.Input.PollingInterval
$expiryTime = $Context.Input.ExpiryTime

while ($Context.CurrentUtcDateTime -lt $expiryTime) {
    $jobStatus = Invoke-DurableActivity -FunctionName 'GetJobStatus' -Input $jobId
    if ($jobStatus -eq "Completed") {
        # Perform an action when a condition is met.
        $output += Invoke-DurableActivity -FunctionName 'SendAlert' -Input $machineId
        break
    }

    # Orchestration sleeps until this time.
    Start-DurableTimer -Duration $pollingInterval
}

# Perform more work here, or let the orchestration end.

$output
@FunctionName("Monitor")
public String monitorOrchestrator(
        @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
    JobInfo jobInfo = ctx.getInput(JobInfo.class);
    String jobId = jobInfo.getJobId();
    Instant expiryTime = jobInfo.getExpirationTime();

    while (ctx.getCurrentInstant().compareTo(expiryTime) < 0) {
        String status = ctx.callActivity("GetJobStatus", jobId, String.class).await();

        // Perform an action when a condition is met
        if (status.equals("Completed")) {
            // send an alert and exit
            ctx.callActivity("SendAlert", jobId).await();
            break;
        }

        // wait N minutes before doing the next poll
        Duration pollingDelay = jobInfo.getPollingDelay();
        ctx.createTimer(pollingDelay).await();
    }

    return "done";
}

bir istek alındığında, bu iş kimliği için yeni bir düzenleme örneği oluşturulur. Örnek, bir koşul karşılanıncaya veya zaman aşımı süresi dolana kadar durumu yoklar. Dayanıklı zamanlayıcı yoklama aralığını denetler. Daha sonra daha fazla iş gerçekleştirilebilir veya düzenleme sona erebilir.

Desen #5: İnsan etkileşimi

Birçok otomatik işlem bir tür insan etkileşimi içerir. İnsanlar bulut hizmetleri kadar yüksek oranda kullanılabilir ve hızlı yanıt vermediğinden insanları otomatik bir sürece dahil etmek zordur. Otomatik bir işlem, zaman aşımları ve telafi mantığı kullanarak bu etkileşime izin verebilir.

Onay süreci, insan etkileşimi içeren bir iş süreci örneğidir. Belirli bir dolar tutarını aşan bir gider raporu için yöneticiden onay gerekebilir. Yönetici 72 saat içinde gider raporunu onaylamazsa (yönetici tatile çıkmış olabilir), başka birinden (belki de yöneticinin yöneticisinden) onay almak için bir yükseltme işlemi başlatılır.

İnsan etkileşimi deseninin diyagramı

Bu örnekteki deseni bir orchestrator işlevi kullanarak uygulayabilirsiniz. Düzenleyici, onay istemek için dayanıklı bir zamanlayıcı kullanır. Zaman aşımı oluşursa düzenleyici yükseltir. Düzenleyici, bir insan etkileşimi tarafından oluşturulan bildirim gibi bir dış olayı bekler.

Bu örnekler, insan etkileşimi düzenini göstermek için bir onay süreci oluşturur:

[FunctionName("ApprovalWorkflow")]
public static async Task Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    await context.CallActivityAsync("RequestApproval", null);
    using (var timeoutCts = new CancellationTokenSource())
    {
        DateTime dueTime = context.CurrentUtcDateTime.AddHours(72);
        Task durableTimeout = context.CreateTimer(dueTime, timeoutCts.Token);

        Task<bool> approvalEvent = context.WaitForExternalEvent<bool>("ApprovalEvent");
        if (approvalEvent == await Task.WhenAny(approvalEvent, durableTimeout))
        {
            timeoutCts.Cancel();
            await context.CallActivityAsync("ProcessApproval", approvalEvent.Result);
        }
        else
        {
            await context.CallActivityAsync("Escalate", null);
        }
    }
}

Dayanıklı zamanlayıcıyı oluşturmak için öğesini çağırın context.CreateTimer. Bildirim tarafından context.WaitForExternalEventalınır. Ardından, Task.WhenAny yükseltme (önce zaman aşımı gerçekleşir) veya onayı işleme (onay zaman aşımından önce alınır) karar vermek için çağrılır.

const df = require("durable-functions");
const moment = require('moment');

module.exports = df.orchestrator(function*(context) {
    yield context.df.callActivity("RequestApproval");

    const dueTime = moment.utc(context.df.currentUtcDateTime).add(72, 'h');
    const durableTimeout = context.df.createTimer(dueTime.toDate());

    const approvalEvent = context.df.waitForExternalEvent("ApprovalEvent");
    const winningEvent = yield context.df.Task.any([approvalEvent, durableTimeout]);
    if (winningEvent === approvalEvent) {
        durableTimeout.cancel();
        yield context.df.callActivity("ProcessApproval", approvalEvent.result);
    } else {
        yield context.df.callActivity("Escalate");
    }
});

Dayanıklı zamanlayıcıyı oluşturmak için öğesini çağırın context.df.createTimer. Bildirim tarafından context.df.waitForExternalEventalınır. Ardından, context.df.Task.any yükseltme (önce zaman aşımı gerçekleşir) veya onayı işleme (onay zaman aşımından önce alınır) karar vermek için çağrılır.

import azure.durable_functions as df
import json
from datetime import timedelta 


def orchestrator_function(context: df.DurableOrchestrationContext):
    yield context.call_activity("RequestApproval", None)

    due_time = context.current_utc_datetime + timedelta(hours=72)
    durable_timeout_task = context.create_timer(due_time)
    approval_event_task = context.wait_for_external_event("ApprovalEvent")

    winning_task = yield context.task_any([approval_event_task, durable_timeout_task])

    if approval_event_task == winning_task:
        durable_timeout_task.cancel()
        yield context.call_activity("ProcessApproval", approval_event_task.result)
    else:
        yield context.call_activity("Escalate", None)


main = df.Orchestrator.create(orchestrator_function)

Dayanıklı zamanlayıcıyı oluşturmak için öğesini çağırın context.create_timer. Bildirim tarafından context.wait_for_external_eventalınır. Ardından, context.task_any yükseltme (önce zaman aşımı gerçekleşir) veya onayı işleme (onay zaman aşımından önce alınır) karar vermek için çağrılır.

param($Context)

$output = @()

$duration = New-TimeSpan -Seconds $Context.Input.Duration
$managerId = $Context.Input.ManagerId

$output += Invoke-DurableActivity -FunctionName "RequestApproval" -Input $managerId

$durableTimeoutEvent = Start-DurableTimer -Duration $duration -NoWait
$approvalEvent = Start-DurableExternalEventListener -EventName "ApprovalEvent" -NoWait

$firstEvent = Wait-DurableTask -Task @($approvalEvent, $durableTimeoutEvent) -Any

if ($approvalEvent -eq $firstEvent) {
    Stop-DurableTimerTask -Task $durableTimeoutEvent
    $output += Invoke-DurableActivity -FunctionName "ProcessApproval" -Input $approvalEvent
}
else {
    $output += Invoke-DurableActivity -FunctionName "EscalateApproval"
}

$output

Dayanıklı zamanlayıcıyı oluşturmak için öğesini çağırın Start-DurableTimer. Bildirim tarafından Start-DurableExternalEventListeneralınır. Ardından, Wait-DurableTask yükseltme (önce zaman aşımı gerçekleşir) veya onayı işleme (onay zaman aşımından önce alınır) karar vermek için çağrılır.

@FunctionName("ApprovalWorkflow")
public void approvalWorkflow(
        @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
    ApprovalInfo approvalInfo = ctx.getInput(ApprovalInfo.class);
    ctx.callActivity("RequestApproval", approvalInfo).await();

    Duration timeout = Duration.ofHours(72);
    try {
        // Wait for an approval. A TaskCanceledException will be thrown if the timeout expires.
        boolean approved = ctx.waitForExternalEvent("ApprovalEvent", timeout, boolean.class).await();
        approvalInfo.setApproved(approved);

        ctx.callActivity("ProcessApproval", approvalInfo).await();
    } catch (TaskCanceledException timeoutEx) {
        ctx.callActivity("Escalate", approvalInfo).await();
    }
}

yöntem ctx.waitForExternalEvent(...).await() çağrısı, yükü olan boolean adlı ApprovalEventbir olayı alana kadar düzenlemeyi duraklatır. Olay alınırsa, onay sonucunu işlemek için bir etkinlik işlevi çağrılır. Ancak, (72 saat) süresi dolmadan böyle timeout bir olay alınmazsa, bir TaskCanceledException oluşturulur ve Escalate etkinlik işlevi çağrılır.

Not

Tüketim planında çalışırken dış olayları beklerken harcanan süre için ücret alınmaz.

Dış istemci, yerleşik HTTP API'lerini kullanarak olay bildirimini bekleyen bir düzenleyici işlevine teslim edebilir:

curl -d "true" http://localhost:7071/runtime/webhooks/durabletask/instances/{instanceId}/raiseEvent/ApprovalEvent -H "Content-Type: application/json"

Bir olay, aynı işlev uygulamasındaki başka bir işlevden dayanıklı düzenleme istemcisi kullanılarak da oluşturulabilir:

[FunctionName("RaiseEventToOrchestration")]
public static async Task Run(
    [HttpTrigger] string instanceId,
    [DurableClient] IDurableOrchestrationClient client)
{
    bool isApproved = true;
    await client.RaiseEventAsync(instanceId, "ApprovalEvent", isApproved);
}
const df = require("durable-functions");

module.exports = async function (context) {
    const client = df.getClient(context);
    const isApproved = true;
    await client.raiseEvent(instanceId, "ApprovalEvent", isApproved);
};
import azure.durable_functions as df


async def main(client: str):
    durable_client = df.DurableOrchestrationClient(client)
    is_approved = True
    await durable_client.raise_event(instance_id, "ApprovalEvent", is_approved)

Send-DurableExternalEvent -InstanceId $InstanceId -EventName "ApprovalEvent" -EventData "true"

@FunctionName("RaiseEventToOrchestration")
public void raiseEventToOrchestration(
        @HttpTrigger(name = "instanceId") String instanceId,
        @DurableClientInput(name = "durableContext") DurableClientContext durableContext) {

    DurableTaskClient client = durableContext.getClient();
    client.raiseEvent(instanceId, "ApprovalEvent", true);
}

Desen #6: Toplayıcı (durum bilgisi olan varlıklar)

Altıncı düzen, olay verilerini belirli bir süre boyunca tek ve adreslenebilir bir varlığa toplamaya yöneliktir. Bu düzende, toplanan veriler birden çok kaynaktan gelebilir, toplu olarak teslim edilebilir veya uzun sürelere dağılmış olabilir. Toplayıcının olay verileri geldikçe eylem gerçekleştirmesi ve dış istemcilerin toplanan verileri sorgulaması gerekebilir.

Toplayıcı diyagramı

Bu düzeni normal ve durum bilgisi olmayan işlevlerle uygulamaya çalışmayla ilgili karmaşık olan şey, eşzamanlılık denetiminin büyük bir zorluk haline gelmesidir. Aynı anda birden çok iş parçacığının aynı verileri değiştirmesi konusunda endişelenmeniz değil, toplayıcının aynı anda yalnızca tek bir VM üzerinde çalıştığından emin olmanız da gerekir.

Bu düzeni tek bir işlev olarak kolayca uygulamak için Dayanıklı varlıkları kullanabilirsiniz.

[FunctionName("Counter")]
public static void Counter([EntityTrigger] IDurableEntityContext ctx)
{
    int currentValue = ctx.GetState<int>();
    switch (ctx.OperationName.ToLowerInvariant())
    {
        case "add":
            int amount = ctx.GetInput<int>();
            ctx.SetState(currentValue + amount);
            break;
        case "reset":
            ctx.SetState(0);
            break;
        case "get":
            ctx.Return(currentValue);
            break;
    }
}

Dayanıklı varlıklar .NET'te sınıf olarak da modellenebilir. bu model, işlem listesi sabitse ve büyük olursa yararlı olabilir. Aşağıdaki örnek, .NET sınıflarını ve yöntemlerini kullanan varlığın Counter eşdeğer bir uygulamasıdır.

public class Counter
{
    [JsonProperty("value")]
    public int CurrentValue { get; set; }

    public void Add(int amount) => this.CurrentValue += amount;

    public void Reset() => this.CurrentValue = 0;

    public int Get() => this.CurrentValue;

    [FunctionName(nameof(Counter))]
    public static Task Run([EntityTrigger] IDurableEntityContext ctx)
        => ctx.DispatchAsync<Counter>();
}
const df = require("durable-functions");

module.exports = df.entity(function(context) {
    const currentValue = context.df.getState(() => 0);
    switch (context.df.operationName) {
        case "add":
            const amount = context.df.getInput();
            context.df.setState(currentValue + amount);
            break;
        case "reset":
            context.df.setState(0);
            break;
        case "get":
            context.df.return(currentValue);
            break;
    }
});
import azure.functions as func
import azure.durable_functions as df


def entity_function(context: df.DurableOrchestrationContext):

    current_value = context.get_state(lambda: 0)
    operation = context.operation_name
    if operation == "add":
        amount = context.get_input()
        current_value += amount
        context.set_result(current_value)
    elif operation == "reset":
        current_value = 0
    elif operation == "get":
        context.set_result(current_value)

    context.set_state(current_value)

main = df.Entity.create(entity_function)

Not

Dayanıklı varlıklar şu anda PowerShell'de desteklenmemektedir.

Not

Dayanıklı varlıklar şu anda Java'da desteklenmemektedir.

İstemciler, varlık istemci bağlamasını kullanarak bir varlık işlevi için ("sinyal" olarak da bilinir) işlemleri sıralayabilir.

[FunctionName("EventHubTriggerCSharp")]
public static async Task Run(
    [EventHubTrigger("device-sensor-events")] EventData eventData,
    [DurableClient] IDurableEntityClient entityClient)
{
    var metricType = (string)eventData.Properties["metric"];
    var delta = BitConverter.ToInt32(eventData.Body, eventData.Body.Offset);

    // The "Counter/{metricType}" entity is created on-demand.
    var entityId = new EntityId("Counter", metricType);
    await entityClient.SignalEntityAsync(entityId, "add", delta);
}

Not

Dinamik olarak oluşturulan proxy'ler, varlıkların türü güvenli bir şekilde sinyalle işaretlenmesi için .NET'te de kullanılabilir. Ayrıca istemciler, sinyal göndermeye ek olarak, düzenleme istemci bağlamasında tür açısından güvenli yöntemleri kullanarak varlık işlevinin durumunu da sorgulayabilir.

const df = require("durable-functions");
const { app } = require("@azure/functions");

module.exports = async function (context) {
    const client = df.getClient(context);
    const entityId = new df.EntityId("Counter", "myCounter");
    await client.signalEntity(entityId, "add", 1);
};
import azure.functions as func
import azure.durable_functions as df

async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
    client = df.DurableOrchestrationClient(starter)
    entity_id = df.EntityId("Counter", "myCounter")
    instance_id = await client.signal_entity(entity_id, "add", 1)
    return func.HttpResponse("Entity signaled")

Not

Dayanıklı varlıklar şu anda PowerShell'de desteklenmemektedir.

Not

Dayanıklı varlıklar şu anda Java'da desteklenmemektedir.

Varlık işlevleri C#, JavaScript ve Python için Dayanıklı İşlevler 2.0 ve üzeri sürümleriyle kullanılabilir.

Teknoloji

Arka planda Dayanıklı İşlevler uzantısı, GitHub'da kodda iş akışları oluşturmak için kullanılan bir açık kaynak kitaplığı olan Dayanıklı Görev Çerçevesi'nin üzerine kurulmuştur. Azure İşlevleri, Azure Webjobs'un sunucusuz evrimi gibi, Dayanıklı İşlevler dayanıklı görev çerçevesinin sunucusuz evrimidir. Microsoft ve diğer kuruluşlar, görev açısından kritik süreçleri otomatikleştirmek için Dayanıklı Görev Çerçevesi'ni kapsamlı bir şekilde kullanır. Sunucusuz Azure İşlevleri ortamı için doğal bir uyum sağlar.

Kod kısıtlamaları

Güvenilir ve uzun süre çalışan yürütme garantileri sağlamak için orchestrator işlevlerinin izlenmesi gereken bir dizi kodlama kuralı vardır. Daha fazla bilgi için Orchestrator işlev kodu kısıtlamaları makalesine bakın.

Faturalandırma

Dayanıklı İşlevler Azure İşlevleri ile aynı şekilde faturalandırılır. Daha fazla bilgi için bkz. fiyatlandırma Azure İşlevleri. Azure İşlevleri Tüketim planında orchestrator işlevlerini yürütürken dikkat etmeniz gereken bazı faturalama davranışları vardır. Bu davranışlar hakkında daha fazla bilgi için Dayanıklı İşlevler faturalama makalesine bakın.

Doğrudan içeri atla

Bu dile özgü hızlı başlangıç öğreticilerinden birini tamamlayarak 10 dakikadan kısa bir zaman içinde Dayanıklı İşlevler kullanmaya başlayabilirsiniz:

Bu hızlı başlangıçlarda, "merhaba dünya" dayanıklı işlevini yerel olarak oluşturup test edebilirsiniz. Ardından işlev kodunu Azure’da yayımlayacaksınız. Oluşturduğunuz işlev, diğer işlevlere çağrıları düzenler ve zincirler.

Yayın

Dayanıklı İşlevler, Microsoft Research ile işbirliği içinde geliştirilmiştir. Sonuç olarak, Dayanıklı İşlevler ekibi aktif olarak araştırma çalışmaları ve yapıtlar üretir; bunlar şunlardır:

Daha fazla bilgi edinin

Aşağıdaki videoda Dayanıklı İşlevler avantajları vurgulanmaktadır:

Dayanıklı İşlevler Azure İşlevleri için gelişmiş bir uzantı olduğundan, tüm uygulamalar için uygun değildir. Diğer Azure düzenleme teknolojileriyle karşılaştırma için bkz. Azure İşlevleri ve Azure Logic Apps karşılaştırması.

Sonraki adımlar