Ändringsflödesprocessorn i Azure Cosmos DB

GÄLLER FÖR: SQL API

Ändringsflödesprocessorn är en del av Azure Cosmos DB SDK V3. Det gör det enklare att läsa ändringsflödet och distribuera händelsebearbetningen mellan flera konsumenter effektivt.

Den största fördelen med ändringsflödesprocessorbiblioteket är dess feltoleranta beteende som säkerställer en "minst en gång"-leverans av alla händelser i ändringsflödet.

Komponenter i ändringsflödesprocessorn

Det finns fyra huvudkomponenter i implementeringen av ändringsflödesprocessorn:

  1. Den övervakade containern: Den övervakade containern har de data som ändringsflödet genereras från. Infogningar och uppdateringar till den övervakade containern visas i containerns ändringsflöde.

  2. Lånecontainern: Lånecontainern fungerar som en lagerplats för tillstånd och samordnar bearbetning av ändringsflödet över flera arbetsroller. Lånecontainern kan lagras i samma konto som den övervakade containern eller i ett separat konto.

  3. Beräkningsinstansen: En beräkningsinstans är värd för ändringsflödesprocessorn för att lyssna efter ändringar. Beroende på plattformen kan den representeras av en virtuell dator, en kubernetes-podd, en Azure App Service-instans, en faktisk fysisk dator. Den har en unik identifierare som refereras till som instansnamnet i den här artikeln.

  4. Ombudet: Ombudet är den kod som definierar vad du, utvecklaren, vill göra med varje batch av ändringar som ändringsflödesprocessor läser.

För att ytterligare förstå hur dessa fyra element i ändringsflödesprocessorn fungerar tillsammans ska vi titta på ett exempel i följande diagram. Den övervakade containern lagrar dokument och använder "Stad" som partitionsnyckel. Vi ser att partitionsnyckelvärdena distribueras i intervall (varje intervall som representerar en fysisk partition) som innehåller objekt. Det finns två beräkningsinstanser och ändringsflödesprocessorn tilldelar olika intervall till varje instans för att maximera beräkningsdistributionen, varje instans har ett unikt och olika namn. Varje intervall läss parallellt och dess förlopp underhålls separat från andra intervall i lånecontainern via ett lånedokument . Kombinationen av lånen representerar det aktuella tillståndet för ändringsflödesprocessorn.

Change feed processor example

Implementera ändringsflödesprocessorn

Startpunkten är alltid den övervakade containern, från en Container instans som du anropar GetChangeFeedProcessorBuilder:

/// <summary>
/// Start the Change Feed Processor to listen for changes and process them with the HandleChangesAsync implementation.
/// </summary>
private static async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync(
    CosmosClient cosmosClient,
    IConfiguration configuration)
{
    string databaseName = configuration["SourceDatabaseName"];
    string sourceContainerName = configuration["SourceContainerName"];
    string leaseContainerName = configuration["LeasesContainerName"];

    Container leaseContainer = cosmosClient.GetContainer(databaseName, leaseContainerName);
    ChangeFeedProcessor changeFeedProcessor = cosmosClient.GetContainer(databaseName, sourceContainerName)
        .GetChangeFeedProcessorBuilder<ToDoItem>(processorName: "changeFeedSample", onChangesDelegate: HandleChangesAsync)
            .WithInstanceName("consoleHost")
            .WithLeaseContainer(leaseContainer)
            .Build();

    Console.WriteLine("Starting Change Feed Processor...");
    await changeFeedProcessor.StartAsync();
    Console.WriteLine("Change Feed Processor started.");
    return changeFeedProcessor;
}

Där den första parametern är ett distinkt namn som beskriver målet för den här processorn och det andra namnet är den delegatimplementering som hanterar ändringar.

Ett exempel på ett ombud är:

/// <summary>
/// The delegate receives batches of changes as they are generated in the change feed and can process them.
/// </summary>
static async Task HandleChangesAsync(
    ChangeFeedProcessorContext context,
    IReadOnlyCollection<ToDoItem> changes,
    CancellationToken cancellationToken)
{
    Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
    Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
    // SessionToken if needed to enforce Session consistency on another client instance
    Console.WriteLine($"SessionToken ${context.Headers.Session}");

    // We may want to track any operation's Diagnostics that took longer than some threshold
    if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
    {
        Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
    }

    foreach (ToDoItem item in changes)
    {
        Console.WriteLine($"Detected operation for item with id {item.id}, created at {item.creationTime}.");
        // Simulate some asynchronous operation
        await Task.Delay(10);
    }

    Console.WriteLine("Finished handling changes.");
}

Därefter definierar du namnet på beräkningsinstansen eller den unika identifieraren med WithInstanceName, detta bör vara unikt och olika i varje beräkningsinstans som du distribuerar och slutligen vilken container som ska underhålla lånetillståndet med WithLeaseContainer.

När du anropar Build får du den processorinstans som du kan starta genom att anropa StartAsync.

Bearbetningslivscykel

Den normala livscykeln för en värdinstans är:

  1. Läs ändringsflödet.
  2. Om det inte finns några ändringar kan du sätta instansen i viloläge under en viss tid (anpassa med WithPollInterval i Builder) och gå till #1.
  3. Om det finns ändringar skickar du dem till ombudet.
  4. När ombudet har slutfört bearbetningen av ändringarna uppdaterar du lånearkivet med den senaste bearbetade tidpunkten och går till #1.

Felhantering

Ändringsflödesprocessorn är motståndskraftig mot fel med användarkod. Det innebär att om implementeringen av ombudet har ett ohanterat undantag (steg 4) stoppas trådbearbetningen av just den batchen med ändringar och en ny tråd skapas. Den nya tråden kontrollerar vilken tidpunkt som var den senaste tidpunkten som lånearkivet har för det intervallet med partitionsnyckelvärden och startar om därifrån, vilket effektivt skickar samma batch med ändringar till ombudet. Det här beteendet fortsätter tills ombudet bearbetar ändringarna korrekt och det är anledningen till att ändringsflödesprocessorn har en "minst en gång"-garanti.

Anteckning

Det finns bara ett scenario där en batch med ändringar inte görs på nytt. Om felet inträffar vid den första ombudskörningen någonsin har lånearkivet inget tidigare sparat tillstånd som ska användas vid återförsöket. I dessa fall skulle återförsöket använda den inledande startkonfigurationen, som kanske eller kanske inte innehåller den sista batchen.

Om du vill förhindra att ändringsflödesprocessorn "fastnar" och kontinuerligt försöker utföra samma batch med ändringar igen, bör du lägga till logik i ombudskoden för att skriva dokument, om undantagsvis, till en kö med obeställbara meddelanden. Den här designen säkerställer att du kan hålla reda på obearbetade ändringar samtidigt som du kan fortsätta att bearbeta framtida ändringar. Kön med obeställbara meddelanden kan vara en annan Cosmos-container. Det exakta datalagret spelar ingen roll, helt enkelt att de obearbetade ändringarna sparas.

Dessutom kan du använda ändringsflödesestimatorn för att övervaka förloppet för dina instanser av ändringsflödesprocessorn när de läser ändringsflödet eller använder livscykelmeddelandena för att identifiera underliggande fel.

Livscykelmeddelanden

Med ändringsflödesprocessorn kan du koppla till relevanta händelser i livscykeln, du kan välja att meddelas till en eller alla av dem. Rekommendationen är att minst registrera felmeddelandet:

  • Registrera en hanterare för att meddelas när den aktuella värden skaffar ett lån för WithLeaseAcquireNotification att börja bearbeta det.
  • Registrera en hanterare för WithLeaseReleaseNotification att meddelas när den aktuella värden släpper ett lån och slutar bearbeta det.
  • Registrera en hanterare för WithErrorNotification att meddelas när den aktuella värden stöter på ett undantag under bearbetningen, kan skilja om källan är användardelegaten (ohanterat undantag) eller ett fel som processorn stöter på försöker komma åt den övervakade containern (till exempel nätverksproblem).
Container.ChangeFeedMonitorLeaseAcquireDelegate onLeaseAcquiredAsync = (string leaseToken) =>
{
    Console.WriteLine($"Lease {leaseToken} is acquired and will start processing");
    return Task.CompletedTask;
};

Container.ChangeFeedMonitorLeaseReleaseDelegate onLeaseReleaseAsync = (string leaseToken) =>
{
    Console.WriteLine($"Lease {leaseToken} is released and processing is stopped");
    return Task.CompletedTask;
};

Container.ChangeFeedMonitorErrorDelegate onErrorAsync = (string LeaseToken, Exception exception) =>
{
    if (exception is ChangeFeedProcessorUserException userException)
    {
        Console.WriteLine($"Lease {LeaseToken} processing failed with unhandled exception from user delegate {userException.InnerException}");
    }
    else
    {
        Console.WriteLine($"Lease {LeaseToken} failed with {exception}");
    }

    return Task.CompletedTask;
};

ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedNotifications", handleChanges)
        .WithLeaseAcquireNotification(onLeaseAcquiredAsync)
        .WithLeaseReleaseNotification(onLeaseReleaseAsync)
        .WithErrorNotification(onErrorAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .Build();

Distributionsenhet

En distributionsenhet för en ändringsflödesprocessor består av en eller flera beräkningsinstanser med samma processorName och lånecontainerkonfiguration men olika instansnamn. Du kan ha många distributionsenheter där var och en har olika affärsflöde för ändringarna och varje distributionsenhet som består av en eller flera instanser.

Du kan till exempel ha en distributionsenhet som utlöser ett externt API när det sker en ändring i containern. En annan distributionsenhet kan flytta data i realtid varje gång det sker en ändring. När en ändring sker i din övervakade container meddelas alla dina distributionsenheter.

Dynamisk skalning

Som tidigare nämnts kan du i en distributionsenhet ha en eller flera beräkningsinstanser. För att dra nytta av beräkningsdistributionen i distributionsenheten är de enda viktiga kraven:

  1. Alla instanser ska ha samma lånecontainerkonfiguration.
  2. Alla instanser ska ha samma processorName.
  3. Varje instans måste ha ett unikt instansnamn (WithInstanceName).

Om dessa tre villkor gäller distribuerar ändringsflödesprocessorn, med en lika fördelningsalgoritm, alla lån i lånecontainern över alla pågående instanser av distributionsenheten och parallelliserar beräkning. Ett lån kan bara ägas av en instans vid en viss tidpunkt, så det maximala antalet instanser är lika med antalet lån.

Antalet instanser kan växa och krympa, och ändringsflödesprocessorn justerar belastningen dynamiskt genom att omdistribuera därefter.

Dessutom kan ändringsflödesprocessorn dynamiskt justeras till containrars skalning på grund av att dataflödet eller lagringen ökar. När containern växer hanterar ändringsflödesprocessorn transparent dessa scenarier genom att dynamiskt öka lånen och distribuera de nya lånen mellan befintliga instanser.

Ändra feed och etablerat dataflöde

Läsåtgärder för ändringsflöde i den övervakade containern förbrukar enheter för begäranden. Kontrollera att den övervakade containern inte har någon begränsning, annars uppstår fördröjningar i mottagandet av ändringsflödeshändelser på dina processorer.

Åtgärder i lånecontainern (uppdatering och underhåll av tillstånd) förbrukar enheter för begäran. Desto högre antal instanser som använder samma lånecontainer, desto högre blir den potentiella förbrukningen för enheter för begäran. Kontrollera att lånecontainern inte har begränsningar, annars kan det uppstå fördröjningar i mottagandet av ändringsflödeshändelser på dina processorer, i vissa fall där begränsningen är hög kan processorerna sluta bearbeta helt.

Starttid

När en ändringsflödesprocessor startar första gången initieras som standard lånecontainern och dess bearbetningslivscykel startas. Ändringar som har inträffat i den övervakade containern innan ändringsflödesprocessorn initierades för första gången identifieras inte.

Läsa från ett tidigare datum och en tidigare tid

Det går att initiera ändringsflödesprocessorn för att läsa ändringar som börjar vid ett visst datum och en viss tid genom att skicka en instans av en DateTime till WithStartTime builder-tillägget:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedTime", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .WithStartTime(particularPointInTime)
        .Build();

Ändringsflödesprocessorn initieras för det specifika datumet och tiden och börjar läsa ändringarna som inträffade efteråt.

Anteckning

Det går inte att starta ändringsflödesprocessorn vid ett visst datum och en viss tid i skrivkonton i flera regioner.

Läsa från början

I andra scenarier som datamigreringar eller analys av hela historiken för en container måste vi läsa ändringsflödet från början av containerns livslängd. För att göra det kan vi använda WithStartTime på builder-tillägget, men genom att skicka DateTime.MinValue.ToUniversalTime(), vilket skulle generera UTC-representationen av minimivärdet DateTime , så här:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedBeginning", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .WithStartTime(DateTime.MinValue.ToUniversalTime())
        .Build();

Ändringsflödesprocessorn initieras och börjar läsa ändringar från början av containerns livslängd.

Anteckning

De här anpassningsalternativen fungerar bara för att konfigurera startpunkten i tid för ändringsflödesprocessorn. När lånecontainern har initierats för första gången har det ingen effekt att ändra dem.

Dela lånecontainern

Du kan dela lånecontainern mellan flera distributionsenheter. Varje distributionsenhet lyssnar på en annan övervakad container eller har en annan processorName. Med den här konfigurationen skulle varje distributionsenhet upprätthålla ett oberoende tillstånd för lånecontainern. Granska förbrukningen för begärandeenheten i lånecontainern för att kontrollera att det etablerade dataflödet räcker för alla distributionsenheter.

Var du ska vara värd för ändringsflödesprocessorn

Ändringsflödesprocessorn kan finnas på valfri plattform som stöder tidskrävande processer eller uppgifter:

Även om ändringsflödesprocessorn kan köras i kortvariga miljöer, eftersom lånecontainern behåller tillståndet, kommer startcykeln för dessa miljöer att lägga till fördröjning i mottagandet av meddelandena (på grund av kostnaden för att starta processorn varje gång miljön startas).

Ytterligare resurser

Nästa steg

Du kan nu fortsätta att lära dig mer om ändringsflödesprocessorn i följande artiklar: