Använda ändringsflödesestimatorn

GÄLLER FÖR: NoSQL

Den här artikeln beskriver hur du kan övervaka förloppet för dina instanser av ändringsflödesprocessorn när de läser ändringsflödet.

Varför är övervakningen av förloppet viktig?

Ändringsflödesprocessorn fungerar som en pekare som går framåt i ändringsflödet och levererar ändringarna till en delegatimplementering.

Distributionen av ändringsflödesprocessorn kan bearbeta ändringar med en viss hastighet baserat på dess tillgängliga resurser, till exempel CPU, minne, nätverk och så vidare.

Om den här hastigheten är långsammare än den hastighet med vilken dina ändringar sker i Azure Cosmos DB-containern börjar processorn släpa efter.

Genom att identifiera det här scenariot kan du förstå om vi behöver skala distributionen av ändringsflödesprocessorn.

Implementera ändringsflödesestimatorn

Som push-modell för automatiska meddelanden

Precis som ändringsflödesprocessorn kan ändringsflödesberäknaren fungera som en push-modell. Skattaren mäter skillnaden mellan det senast bearbetade objektet (definierat av tillståndet för lånecontainern) och den senaste ändringen i containern och push-överför det här värdet till ett ombud. Intervallet för mätningen kan också anpassas med ett standardvärde på 5 sekunder.

Om till exempel ändringsflödesprocessorn definieras så här:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedEstimator", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .Build();

Rätt sätt att initiera en skattare för att mäta att processorn skulle använda GetChangeFeedEstimatorBuilder så här:

ChangeFeedProcessor changeFeedEstimator = monitoredContainer
    .GetChangeFeedEstimatorBuilder("changeFeedEstimator", Program.HandleEstimationAsync, TimeSpan.FromMilliseconds(1000))
    .WithLeaseContainer(leaseContainer)
    .Build();

Där både processorn och skattaren delar samma leaseContainer och samma namn.

De andra två parametrarna är ombudet, som får ett tal som representerar hur många ändringar som ska läsas av processorn och det tidsintervall som du vill att mätningen ska utföras med.

Ett exempel på ett ombud som tar emot uppskattningen är:

static async Task HandleEstimationAsync(long estimation, CancellationToken cancellationToken)
{
    if (estimation > 0)
    {
        Console.WriteLine($"\tEstimator detected {estimation} items pending to be read by the Processor.");
    }

    await Task.Delay(0);
}

Du kan skicka den här uppskattningen till din övervakningslösning och använda den för att förstå hur dina framsteg fungerar över tid.

Som en detaljerad uppskattning på begäran

Till skillnad från push-modellen finns det ett alternativ som gör att du kan få uppskattningen på begäran. Den här modellen innehåller också mer detaljerad information:

  • Den beräknade fördröjningen per lån.
  • Instansen äger och bearbetar varje lån, så att du kan identifiera om det finns ett problem på en instans.

Om ändringsflödesprocessorn definieras så här:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedEstimator", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .Build();

Du kan skapa uppskattningen med samma lånekonfiguration:

ChangeFeedEstimator changeFeedEstimator = monitoredContainer
    .GetChangeFeedEstimator("changeFeedEstimator", leaseContainer);

Och när du vill ha det, med den frekvens du behöver, kan du få den detaljerade uppskattningen:

Console.WriteLine("Checking estimation...");
using FeedIterator<ChangeFeedProcessorState> estimatorIterator = changeFeedEstimator.GetCurrentStateIterator();
while (estimatorIterator.HasMoreResults)
{
    FeedResponse<ChangeFeedProcessorState> states = await estimatorIterator.ReadNextAsync();
    foreach (ChangeFeedProcessorState leaseState in states)
    {
        string host = leaseState.InstanceName == null ? $"not owned by any host currently" : $"owned by host {leaseState.InstanceName}";
        Console.WriteLine($"Lease [{leaseState.LeaseToken}] {host} reports {leaseState.EstimatedLag} as estimated lag.");
    }
}

Var ChangeFeedProcessorState och en innehåller låne- och fördröjningsinformationen, och även vem som är den aktuella instansen som äger den.

Beräkningsdistribution

Ändringsflödesberäknaren behöver inte distribueras som en del av ändringsflödesprocessorn eller vara en del av samma projekt. Vi rekommenderar att du distribuerar uppskattningen på en oberoende och helt annan instans än dina processorer. En enskild beräkningsinstans kan spåra förloppet för alla lån och instanser i distributionen av ändringsflödesprocessorn.

Varje uppskattning förbrukar enheter för begäranden från dina övervakade och leasade containrar. En frekvens på 1 minut däremellan är en bra startpunkt, desto lägre frekvens, desto högre förbrukas enheter för programbegäran.

Ytterligare resurser

Nästa steg

Du kan nu fortsätta att lära dig mer om ändringsflödesprocessorn i följande artiklar: