Asynchrone Request-Reply patroon

Azure
Logic Apps

Ontkoppel de back-endverwerking van een front-endhost waar de back-endverwerking asynchroon moet zijn, maar de front-end nog steeds een duidelijke respons nodig heeft.

Context en probleem

Bij moderne toepassingsontwikkeling is het normaal voor clienttoepassingen ( vaak code die wordt uitgevoerd in een webclient (browser) om afhankelijk te zijn van externe API's om bedrijfslogica en functionaliteit op te stellen. Deze API's zijn mogelijk rechtstreeks gerelateerd aan de toepassing of kunnen gedeelde services zijn die worden geleverd door een derde partij. Deze API-aanroepen vinden meestal plaats via het HTTP(S)-protocol en volgen REST-semantiek.

In de meeste gevallen zijn API's voor een clienttoepassing ontworpen om snel te reageren, op volgorde van 100 ms of minder. Veel factoren kunnen van invloed zijn op de reactielatentie, waaronder:

  • De hostingstack van een toepassing.
  • Beveiligingsonderdelen.
  • De relatieve geografische locatie van de aanroeper en de back-end.
  • Netwerkinfrastructuur.
  • Huidige belasting.
  • De grootte van de nettolading van de aanvraag.
  • De lengte van de wachtrij verwerken.
  • De tijd voordat de back-end de aanvraag verwerkt.

Een van deze factoren kan latentie toevoegen aan het antwoord. Sommige kunnen worden verzacht door de back-end uit te schalen. Andere, zoals netwerkinfrastructuur, zijn grotendeels niet onder controle van de toepassingsontwikkelaar. De meeste API's kunnen snel genoeg reageren voor antwoorden om terug te komen via dezelfde verbinding. Toepassingscode kan een synchrone API-aanroep op een niet-blokkerende manier maken, waardoor asynchrone verwerking wordt weergegeven. Dit wordt aanbevolen voor I/O-gebonden bewerkingen.

In sommige scenario's kan het werk dat door de back-end wordt uitgevoerd, langlopend zijn, in de volgorde van seconden, of mogelijk een achtergrondproces dat in minuten of zelfs uren wordt uitgevoerd. In dat geval is het niet haalbaar om te wachten totdat het werk is voltooid voordat u op de aanvraag reageert. Deze situatie is een potentieel probleem voor elk synchrone aanvraag-antwoordpatroon.

Sommige architecturen lossen dit probleem op met behulp van een berichtenbroker om de aanvraag- en antwoordfasen te scheiden. Deze scheiding wordt vaak bereikt door gebruik te maken van het patroon Load Leveling op basis van wachtrijen. Door deze scheiding kunnen het clientproces en de back-end-API onafhankelijk worden geschaald. Maar deze scheiding brengt ook extra complexiteit met zich mee wanneer de client een melding voor succes vereist, omdat deze stap asynchroon moet worden.

Veel van dezelfde overwegingen die voor clienttoepassingen worden besproken, zijn ook van toepassing op SERVER-naar-server REST API-aanroepen in gedistribueerde systemen, bijvoorbeeld in een microservicesarchitectuur.

Oplossing

Een oplossing voor dit probleem is het gebruik van HTTP-polling. Polling is handig voor code aan de clientzijde, omdat het lastig kan zijn om aanroepeindpunten te bieden of langdurige verbindingen te gebruiken. Zelfs wanneer callbacks mogelijk zijn, kunnen de extra bibliotheken en services die nodig zijn, soms te veel extra complexiteit toevoegen.

  • De clienttoepassing maakt een synchrone aanroep naar de API, waardoor een langlopende bewerking op de back-end wordt geactiveerd.

  • De API reageert synchroon zo snel mogelijk. Er wordt een HTTP 202-statuscode (geaccepteerd) geretourneerd, waarbij wordt erkend dat de aanvraag is ontvangen voor verwerking.

    Notitie

    De API moet zowel de aanvraag als de actie valideren die moet worden uitgevoerd voordat het langdurige proces wordt gestart. Als de aanvraag ongeldig is, reageert u onmiddellijk met een foutcode zoals HTTP 400 (Ongeldige aanvraag).

  • Het antwoord bevat een locatieverwijzing die verwijst naar een eindpunt dat de client kan peilen om het resultaat van de langdurige bewerking te controleren.

  • De API offload de verwerking naar een ander onderdeel, zoals een berichtenwachtrij.

  • Voor elke geslaagde aanroep naar het statuseindpunt wordt HTTP 202 geretourneerd. Terwijl het werk nog in behandeling is, retourneert het statuseindpunt een resource die aangeeft dat het werk nog wordt uitgevoerd. Zodra het werk is voltooid, kan het statuseindpunt een resource retourneren die aangeeft dat deze is voltooid of omleiden naar een andere resource-URL. Als met de asynchrone bewerking bijvoorbeeld een nieuwe resource wordt gemaakt, wordt het statuseindpunt omgeleid naar de URL voor die resource.

In het volgende diagram ziet u een typische stroom:

Request and response flow for asynchronous HTTP requests

  1. De client verzendt een aanvraag en ontvangt een HTTP 202-antwoord (geaccepteerd).
  2. De client verzendt een HTTP GET-aanvraag naar het statuseindpunt. Het werk is nog in behandeling, dus deze aanroep retourneert HTTP 200.
  3. Op een bepaald moment is het werk voltooid en retourneert het statuseindpunt 302 (Gevonden) die wordt omgeleid naar de resource.
  4. De client haalt de resource op bij de opgegeven URL.

Problemen en overwegingen

  • Er zijn een aantal mogelijke manieren om dit patroon te implementeren via HTTP en niet alle upstream-services hebben dezelfde semantiek. De meeste services retourneren bijvoorbeeld geen HTTP 202-antwoord van een GET-methode wanneer een extern proces niet is voltooid. Na pure REST-semantiek moeten ze HTTP 404 (Niet gevonden) retourneren. Dit antwoord is logisch wanneer u het resultaat van de aanroep beschouwt als deze nog niet aanwezig is.

  • Een HTTP 202-antwoord moet de locatie en frequentie aangeven die de client voor het antwoord moet peilen. Deze moet de volgende extra headers hebben:

    Header Beschrijving Notities
    Locatie Een URL die de client moet peilen naar een antwoordstatus. Deze URL kan een SAS-token zijn waarbij het valetsleutelpatroon geschikt is als deze locatie toegangsbeheer nodig heeft. Het valetsleutelpatroon is ook geldig wanneer de polling van antwoorden offloading naar een andere back-end nodig heeft
    Retry-After Een schatting van wanneer de verwerking is voltooid Deze header is ontworpen om te voorkomen dat polling-clients de back-end overweldigen met nieuwe pogingen.
  • Mogelijk moet u een verwerkingsproxy of gevel gebruiken om de antwoordheaders of nettolading te bewerken, afhankelijk van de onderliggende services die worden gebruikt.

  • Als het statuseindpunt wordt omgeleid na voltooiing, zijn HTTP 302 of HTTP 303 de juiste retourcodes, afhankelijk van de exacte semantiek die u ondersteunt.

  • Na een geslaagde verwerking moet de resource die is opgegeven door de locatieheader, een geschikte HTTP-antwoordcode retourneren, zoals 200 (OK), 201 (gemaakt) of 204 (geen inhoud).

  • Als er tijdens de verwerking een fout optreedt, moet u de fout behouden in de resource-URL die wordt beschreven in de locatieheader en in het ideale geval een geschikte antwoordcode retourneren aan de client van die resource (4xx-code).

  • Niet alle oplossingen implementeren dit patroon op dezelfde manier en sommige services bevatten extra of alternatieve headers. Azure Resource Manager gebruikt bijvoorbeeld een gewijzigde variant van dit patroon. Zie Azure Resource Manager Async-bewerkingen voor meer informatie.

  • Verouderde clients bieden mogelijk geen ondersteuning voor dit patroon. In dat geval moet u mogelijk een gevel over de asynchrone API plaatsen om de asynchrone verwerking van de oorspronkelijke client te verbergen. Azure Logic Apps ondersteunt dit patroon bijvoorbeeld systeemeigen als integratielaag tussen een asynchrone API en een client die synchrone aanroepen uitvoert. Zie Langlopende taken uitvoeren met het webhookactiepatroon.

  • In sommige scenario's wilt u mogelijk een manier bieden voor clients om een langlopende aanvraag te annuleren. In dat geval moet de back-endservice een vorm van annuleringsinstructie ondersteunen.

Wanneer dit patroon gebruiken

Gebruik dit patroon voor:

  • Code aan de clientzijde, zoals browsertoepassingen, waarbij het lastig is om eindpunten voor aanroepen te bieden, of het gebruik van langlopende verbindingen voegt te veel extra complexiteit toe.

  • Serviceaanroepen waarbij alleen het HTTP-protocol beschikbaar is en de retourservice kan geen callbacks activeren vanwege firewallbeperkingen aan de clientzijde.

  • Serviceaanroepen die moeten worden ge├»ntegreerd met verouderde architecturen die geen ondersteuning bieden voor moderne callbacktechnologie├źn, zoals WebSockets of webhooks.

Dit patroon is mogelijk niet geschikt wanneer:

  • U kunt in plaats daarvan een service gebruiken die is gebouwd voor asynchrone meldingen, zoals Azure Event Grid.
  • Antwoorden moeten in realtime naar de client worden gestreamd.
  • De client moet veel resultaten verzamelen en de latentie van deze resultaten is belangrijk. Overweeg in plaats daarvan een Service Bus-patroon.
  • U kunt permanente netwerkverbindingen aan de serverzijde gebruiken, zoals WebSockets of SignalR. Deze services kunnen worden gebruikt om de beller op de hoogte te stellen van het resultaat.
  • Met het netwerkontwerp kunt u poorten openen om asynchrone callbacks of webhooks te ontvangen.

Voorbeeld

De volgende code toont fragmenten van een toepassing die gebruikmaakt van Azure Functions om dit patroon te implementeren. Er zijn drie functies in de oplossing:

  • Het asynchrone API-eindpunt.
  • Het statuseindpunt.
  • Een back-endfunctie die werkitems in de wachtrij neemt en uitvoert.

Image of the structure of the Async Request Reply pattern in Functions

GitHub logoDit voorbeeld is beschikbaar op GitHub.

De functie AsyncProcessingWorkAcceptor

De AsyncProcessingWorkAcceptor functie implementeert een eindpunt dat werk accepteert vanuit een clienttoepassing en deze in een wachtrij plaatst voor verwerking.

  • De functie genereert een aanvraag-id en voegt deze toe als metagegevens aan het wachtrijbericht.
  • Het HTTP-antwoord bevat een locatieheader die verwijst naar een statuseindpunt. De aanvraag-id maakt deel uit van het URL-pad.
public static class AsyncProcessingWorkAcceptor
{
    [FunctionName("AsyncProcessingWorkAcceptor")]
    public static async Task<IActionResult> Run(
        [HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = null)] CustomerPOCO customer,
        [ServiceBus("outqueue", Connection = "ServiceBusConnectionAppSetting")] IAsyncCollector<ServiceBusMessage> OutMessages,
        ILogger log)
    {
        if (String.IsNullOrEmpty(customer.id) || String.IsNullOrEmpty(customer.customername))
        {
            return new BadRequestResult();
        }

        string reqid = Guid.NewGuid().ToString();

        string rqs = $"http://{Environment.GetEnvironmentVariable("WEBSITE_HOSTNAME")}/api/RequestStatus/{reqid}";

        var messagePayload = JsonConvert.SerializeObject(customer);
        var message = new ServiceBusMessage(messagePayload);
        message.ApplicationProperties["RequestGUID"] = reqid;
        message.ApplicationProperties["RequestSubmittedAt"] = DateTime.Now;
        message.ApplicationProperties["RequestStatusURL"] = rqs;

        await OutMessages.AddAsync(message);  

        return (ActionResult) new AcceptedResult(rqs, $"Request Accepted for Processing{Environment.NewLine}ProxyStatus: {rqs}");
    }
}

De functie AsyncProcessingBackgroundWorker

De AsyncProcessingBackgroundWorker functie haalt de bewerking uit de wachtrij op, werkt wat op basis van de nettolading van het bericht en schrijft het resultaat naar de locatie van de SAS-handtekening.

public static class AsyncProcessingBackgroundWorker
{
    [FunctionName("AsyncProcessingBackgroundWorker")]
    public static async Task RunAsync(
        [ServiceBusTrigger("outqueue", Connection = "ServiceBusConnectionAppSetting")]ServiceBusMessage myQueueItem,
        [Blob("data", FileAccess.ReadWrite, Connection = "StorageConnectionAppSetting")] BlobContainerClient inputContainer,
        ILogger log)
    {
        // Perform an actual action against the blob data source for the async readers to be able to check against.
        // This is where your actual service worker processing will be performed.

        var id = myQueueItem.ApplicationProperties["RequestGUID"] as string;

        BlobClient blob = inputContainer.GetBlobClient($"{id}.blobdata");

        // Now write the results to blob storage.
        await blob.UploadAsync(myQueueItem.Body);
    }
}

De functie AsyncOperationStatusChecker

Met AsyncOperationStatusChecker de functie wordt het statuseindpunt geïmplementeerd. Met deze functie wordt eerst gecontroleerd of de aanvraag is voltooid

  • Als de aanvraag is voltooid, retourneert de functie een valetsleutel naar het antwoord of stuurt de aanroep onmiddellijk door naar de valetsleutel-URL.
  • Als de aanvraag nog in behandeling is, moeten we een 202 retourneren die is geaccepteerd met een zelfverwijzende locatieheader, waarbij een ETA wordt weergegeven voor een voltooid antwoord in de http-Retry-After-header.
public static class AsyncOperationStatusChecker
{
    [FunctionName("AsyncOperationStatusChecker")]
    public static async Task<IActionResult> Run(
        [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = "RequestStatus/{thisGUID}")] HttpRequest req,
        [Blob("data/{thisGuid}.blobdata", FileAccess.Read, Connection = "StorageConnectionAppSetting")] BlockBlobClient inputBlob, string thisGUID,
        ILogger log)
    {

        OnCompleteEnum OnComplete = Enum.Parse<OnCompleteEnum>(req.Query["OnComplete"].FirstOrDefault() ?? "Redirect");
        OnPendingEnum OnPending = Enum.Parse<OnPendingEnum>(req.Query["OnPending"].FirstOrDefault() ?? "Accepted");

        log.LogInformation($"C# HTTP trigger function processed a request for status on {thisGUID} - OnComplete {OnComplete} - OnPending {OnPending}");

        // Check to see if the blob is present.
        if (await inputBlob.ExistsAsync())
        {
            // If it's present, depending on the value of the optional "OnComplete" parameter choose what to do.
            return await OnCompleted(OnComplete, inputBlob, thisGUID);
        }
        else
        {
            // If it's NOT present, check the optional "OnPending" parameter.
            string rqs = $"http://{Environment.GetEnvironmentVariable("WEBSITE_HOSTNAME")}/api/RequestStatus/{thisGUID}";

            switch (OnPending)
            {
                case OnPendingEnum.Accepted:
                    {
                        // Return an HTTP 202 status code.
                        return (ActionResult)new AcceptedResult() { Location = rqs };
                    }

                case OnPendingEnum.Synchronous:
                    {
                        // Back off and retry. Time out if the backoff period hits one minute
                        int backoff = 250;

                        while (!await inputBlob.ExistsAsync() && backoff < 64000)
                        {
                            log.LogInformation($"Synchronous mode {thisGUID}.blob - retrying in {backoff} ms");
                            backoff = backoff * 2;
                            await Task.Delay(backoff);
                        }

                        if (await inputBlob.ExistsAsync())
                        {
                            log.LogInformation($"Synchronous Redirect mode {thisGUID}.blob - completed after {backoff} ms");
                            return await OnCompleted(OnComplete, inputBlob, thisGUID);
                        }
                        else
                        {
                            log.LogInformation($"Synchronous mode {thisGUID}.blob - NOT FOUND after timeout {backoff} ms");
                            return (ActionResult)new NotFoundResult();
                        }
                    }

                default:
                    {
                        throw new InvalidOperationException($"Unexpected value: {OnPending}");
                    }
            }
        }
    }

    private static async Task<IActionResult> OnCompleted(OnCompleteEnum OnComplete, BlockBlobClient inputBlob, string thisGUID)
    {
        switch (OnComplete)
        {
            case OnCompleteEnum.Redirect:
                {
                    // Redirect to the SAS URI to blob storage
                    return (ActionResult)new RedirectResult(inputBlob.GenerateSASURI());
                }

            case OnCompleteEnum.Stream:
                {
                    // Download the file and return it directly to the caller.
                    // For larger files, use a stream to minimize RAM usage.
                    return (ActionResult)new OkObjectResult(await inputBlob.DownloadContentAsync());
                }

            default:
                {
                    throw new InvalidOperationException($"Unexpected value: {OnComplete}");
                }
        }
    }
}

public enum OnCompleteEnum {

    Redirect,
    Stream
}

public enum OnPendingEnum {

    Accepted,
    Synchronous
}

Volgende stappen

De volgende informatie kan relevant zijn bij het implementeren van dit patroon: