Fan-out/fan-in scenario i Durable Functions – Exempel på molnsäkerhetskopiering

Utloggning/utloggning avser mönstret för att köra flera funktioner samtidigt och sedan utföra viss aggregering på resultaten. I den här artikeln förklaras ett exempel som använder Durable Functions för att implementera ett scenario med in-/ut fläkt. Exemplet är en beständig funktion som säkerhetskopierar allt eller en del av en apps webbplatsinnehåll i Azure Storage.

Anteckning

Version 4 av Node.js programmeringsmodell för Azure Functions är allmänt tillgänglig. Den nya v4-modellen är utformad för att ha en mer flexibel och intuitiv upplevelse för JavaScript- och TypeScript-utvecklare. Läs mer om skillnaderna mellan v3 och v4 i migreringsguiden.

I följande kodfragment anger JavaScript (PM4) programmeringsmodellen V4, den nya upplevelsen.

Förutsättningar

Översikt över scenario

I det här exemplet laddar funktionerna upp alla filer under en angiven katalog rekursivt till Blob Storage. De räknar också det totala antalet byte som har laddats upp.

Det går att skriva en enda funktion som tar hand om allt. Det största problemet du stöter på är skalbarhet. En enda funktionskörning kan bara köras på en enda virtuell dator, så dataflödet begränsas av dataflödet för den enskilda virtuella datorn. Ett annat problem är tillförlitlighet. Om det uppstår ett fel halvvägs, eller om hela processen tar mer än 5 minuter, kan säkerhetskopieringen misslyckas i ett delvis slutfört tillstånd. Den skulle då behöva startas om.

En mer robust metod är att skriva två vanliga funktioner: en skulle räkna upp filerna och lägga till filnamnen i en kö, och en annan skulle läsa från kön och ladda upp filerna till Blob Storage. Den här metoden är bättre när det gäller dataflöde och tillförlitlighet, men du måste etablera och hantera en kö. Ännu viktigare är att betydande komplexitet införs när det gäller tillståndshantering och samordning om du vill göra något mer, som att rapportera det totala antalet uppladdade byte.

En Durable Functions metod ger dig alla de nämnda fördelarna med mycket låga omkostnader.

Funktionerna

Den här artikeln förklarar följande funktioner i exempelappen:

  • E2_BackupSiteContent: En orkestreringsfunktion som anropar E2_GetFileList för att hämta en lista över filer som ska säkerhetskopieras och sedan anropar E2_CopyFileToBlob för att säkerhetskopiera varje fil.
  • E2_GetFileList: En aktivitetsfunktion som returnerar en lista med filer i en katalog.
  • E2_CopyFileToBlob: En aktivitetsfunktion som säkerhetskopierar en enda fil för att Azure Blob Storage.

E2_BackupSiteContent orchestrator-funktion

Den här orkestreringsfunktionen gör i princip följande:

  1. Tar ett rootDirectory värde som en indataparameter.
  2. Anropar en funktion för att hämta en rekursiv lista över filer under rootDirectory.
  3. Gör flera parallella funktionsanrop för att ladda upp varje fil till Azure Blob Storage.
  4. Väntar på att alla uppladdningar ska slutföras.
  5. Returnerar summan av totalt antal byte som laddats upp till Azure Blob Storage.

Här är koden som implementerar orchestrator-funktionen:

[FunctionName("E2_BackupSiteContent")]
public static async Task<long> Run(
    [OrchestrationTrigger] IDurableOrchestrationContext backupContext)
{
    string rootDirectory = backupContext.GetInput<string>()?.Trim();
    if (string.IsNullOrEmpty(rootDirectory))
    {
        rootDirectory = Directory.GetParent(typeof(BackupSiteContent).Assembly.Location).FullName;
    }

    string[] files = await backupContext.CallActivityAsync<string[]>(
        "E2_GetFileList",
        rootDirectory);

    var tasks = new Task<long>[files.Length];
    for (int i = 0; i < files.Length; i++)
    {
        tasks[i] = backupContext.CallActivityAsync<long>(
            "E2_CopyFileToBlob",
            files[i]);
    }

    await Task.WhenAll(tasks);

    long totalBytes = tasks.Sum(t => t.Result);
    return totalBytes;
}

Lägg märke till await Task.WhenAll(tasks); raden. Alla enskilda anrop till E2_CopyFileToBlob funktionen väntades inte , vilket gör att de kan köras parallellt. När vi skickar den här matrisen med aktiviteter till Task.WhenAllfår vi tillbaka en uppgift som inte slutförs förrän alla kopieringsåtgärder har slutförts. Om du är bekant med TPL (Task Parallel Library) i .NET är detta inte nytt för dig. Skillnaden är att dessa uppgifter kan köras på flera virtuella datorer samtidigt, och tillägget Durable Functions säkerställer att körningen från slutpunkt till slutpunkt är motståndskraftig mot processåtervinning.

Efter att ha väntat från Task.WhenAllvet vi att alla funktionsanrop har slutförts och har returnerat värden tillbaka till oss. Varje anrop till E2_CopyFileToBlob returnerar antalet uppladdade byte, så att beräkna det totala antalet byte handlar om att lägga till alla dessa returvärden tillsammans.

Aktivitetsfunktioner för hjälpkomponenter

Hjälpaktivitetsfunktionerna är precis som med andra exempel bara vanliga funktioner som använder utlösarbindningen activityTrigger .

E2_GetFileList aktivitetsfunktion

[FunctionName("E2_GetFileList")]
public static string[] GetFileList(
    [ActivityTrigger] string rootDirectory, 
    ILogger log)
{
    log.LogInformation($"Searching for files under '{rootDirectory}'...");
    string[] files = Directory.GetFiles(rootDirectory, "*", SearchOption.AllDirectories);
    log.LogInformation($"Found {files.Length} file(s) under {rootDirectory}.");

    return files;
}

Anteckning

Du kanske undrar varför du inte bara kunde placera den här koden direkt i orchestrator-funktionen. Du kan, men detta skulle bryta en av de grundläggande reglerna för orchestrator-funktioner, vilket är att de aldrig bör göra I/O, inklusive åtkomst till lokala filsystem. Mer information finns i Begränsningar för Orchestrator-funktionskod.

E2_CopyFileToBlob aktivitetsfunktion

[FunctionName("E2_CopyFileToBlob")]
public static async Task<long> CopyFileToBlob(
    [ActivityTrigger] string filePath,
    Binder binder,
    ILogger log)
{
    long byteCount = new FileInfo(filePath).Length;

    // strip the drive letter prefix and convert to forward slashes
    string blobPath = filePath
        .Substring(Path.GetPathRoot(filePath).Length)
        .Replace('\\', '/');
    string outputLocation = $"backups/{blobPath}";

    log.LogInformation($"Copying '{filePath}' to '{outputLocation}'. Total bytes = {byteCount}.");

    // copy the file contents into a blob
    using (Stream source = File.Open(filePath, FileMode.Open, FileAccess.Read, FileShare.Read))
    using (Stream destination = await binder.BindAsync<CloudBlobStream>(
        new BlobAttribute(outputLocation, FileAccess.Write)))
    {
        await source.CopyToAsync(destination);
    }

    return byteCount;
}

Anteckning

Du måste installera Microsoft.Azure.WebJobs.Extensions.Storage NuGet-paketet för att köra exempelkoden.

Funktionen använder vissa avancerade funktioner i Azure Functions bindningar (dvs. användningen av parameternBinder), men du behöver inte bekymra dig om informationen i den här genomgången.

Implementeringen läser in filen från disken och strömmar innehållet asynkront till en blob med samma namn i containern "säkerhetskopior". Returvärdet är det antal byte som kopieras till lagringen, som sedan används av orchestrator-funktionen för att beräkna aggregeringssumman.

Anteckning

Det här är ett perfekt exempel på hur du flyttar I/O-åtgärder till en activityTrigger funktion. Arbetet kan inte bara distribueras på många olika datorer, utan du får också fördelarna med att kontrollera förloppet. Om värdprocessen avslutas av någon anledning vet du vilka uppladdningar som redan har slutförts.

Kör exemplet

Du kan starta orkestreringen i Windows genom att skicka följande HTTP POST-begäran.

POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20

"D:\\home\\LogFiles"

I en Linux-funktionsapp (Python körs för närvarande bara på Linux för App Service) kan du också starta orkestreringen så här:

POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20

"/home/site/wwwroot"

Anteckning

Funktionen HttpStart som du anropar fungerar bara med JSON-formaterat innehåll. Därför Content-Type: application/json krävs rubriken och katalogsökvägen kodas som en JSON-sträng. Dessutom förutsätter HTTP-kodfragment att det finns en post i host.json filen som tar bort standardprefixet api/ från alla URL:er för HTTP-utlösarfunktioner. Du hittar koden för den här konfigurationen host.json i filen i exemplen.

Den här HTTP-begäran utlöser orkestreraren E2_BackupSiteContent och skickar strängen D:\home\LogFiles som en parameter. Svaret innehåller en länk för att hämta status för säkerhetskopieringsåtgärden:

HTTP/1.1 202 Accepted
Content-Length: 719
Content-Type: application/json; charset=utf-8
Location: http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}

(...trimmed...)

Den här åtgärden kan ta flera minuter att slutföra, beroende på hur många loggfiler du har i funktionsappen. Du kan få den senaste statusen genom att fråga URL:en i Location rubriken för föregående HTTP 202-svar.

GET http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}
HTTP/1.1 202 Accepted
Content-Length: 148
Content-Type: application/json; charset=utf-8
Location: http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}

{"runtimeStatus":"Running","input":"D:\\home\\LogFiles","output":null,"createdTime":"2019-06-29T18:50:55Z","lastUpdatedTime":"2019-06-29T18:51:16Z"}

I det här fallet körs funktionen fortfarande. Du kan se de indata som sparades i orkestreringstillståndet och den senaste uppdaterade tiden. Du kan fortsätta att använda Location rubrikvärdena för att söka efter slutförande. När statusen är "Slutförd" visas ett HTTP-svarsvärde som liknar följande:

HTTP/1.1 200 OK
Content-Length: 152
Content-Type: application/json; charset=utf-8

{"runtimeStatus":"Completed","input":"D:\\home\\LogFiles","output":452071,"createdTime":"2019-06-29T18:50:55Z","lastUpdatedTime":"2019-06-29T18:51:26Z"}

Nu kan du se att orkestreringen är klar och ungefär hur lång tid det tog att slutföra. Du ser också ett värde för output fältet, vilket indikerar att cirka 450 KB loggar laddades upp.

Nästa steg

Det här exemplet har visat hur du implementerar fan-out/fan-in-mönstret. Nästa exempel visar hur du implementerar övervakningsmönstret med hjälp av hållbara timers.