Use the change feed estimator

APPLIES TO: SQL API

This article describes how you can monitor the progress of your change feed processor instances as they read the change feed.

Why is monitoring progress important?

The change feed processor acts as a pointer that moves forward across your change feed and delivers the changes to a delegate implementation.

Your change feed processor deployment can process changes only at a certain rate, which depends on its available resources, such as CPU, memory, and network.

If this rate is slower than the rate at which changes occur in your Azure Cosmos container, your processor starts to lag behind.

Identifying this scenario helps you determine whether you need to scale your change feed processor deployment.
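To make the lag argument concrete, the following sketch models the backlog as a simple queue: changes arrive at one constant rate and the processor drains them at another. BacklogMath and ProjectBacklog are hypothetical names used only for illustration, and the constant-rate assumption is a simplification; real change feed traffic is usually burstier.

```csharp
using System;

// Hypothetical helper for illustration only (not part of the Azure Cosmos DB SDK).
// Assumes changes arrive and are processed at constant rates.
public static class BacklogMath
{
    // Returns the number of pending changes after `seconds`, starting from `pending`,
    // when changes arrive at `arrivalPerSecond` and the processor handles
    // `processedPerSecond`. The backlog can never drop below zero.
    public static double ProjectBacklog(
        double pending,
        double arrivalPerSecond,
        double processedPerSecond,
        double seconds) =>
        Math.Max(0.0, pending + (arrivalPerSecond - processedPerSecond) * seconds);
}
```

For example, with changes arriving at 1,200 per second and a deployment that handles 1,000 per second, ProjectBacklog(0, 1200, 1000, 60) returns 12,000: the backlog keeps growing by that amount every minute until processing capacity catches up with the write rate.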

Implement the change feed estimator

As a push model for automatic notifications

Like the change feed processor, the change feed estimator can work as a push model. The estimator measures the difference between the last processed item (defined by the state of the leases container) and the latest change in the container, and pushes this value to a delegate. The interval at which the measurement is taken is configurable and defaults to 5 seconds.

As an example, if your change feed processor is defined like this:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedEstimator", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .Build();

To measure that processor, initialize an estimator by using GetChangeFeedEstimatorBuilder:

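ChangeFeedProcessor changeFeedEstimator = monitoredContainer
    .GetChangeFeedEstimatorBuilder("changeFeedEstimator", Program.HandleEstimationAsync, TimeSpan.FromMilliseconds(1000))
    .WithLeaseContainer(leaseContainer)
    .Build();

// The estimator pushes measurements to the delegate only after it has been started.
await changeFeedEstimator.StartAsync();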

Both the processor and the estimator use the same leaseContainer and the same name ("changeFeedEstimator"); this is how the estimator finds the leases of the processor it measures.

The other two parameters are the delegate, which receives a number representing how many changes are still pending to be read by the processor, and the interval at which the measurement is taken (1 second in this example).

An example of a delegate that receives the estimation is:

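static Task HandleEstimationAsync(long estimation, CancellationToken cancellationToken)
{
    // estimation is the number of changes the processor has not read yet.
    if (estimation > 0)
    {
        Console.WriteLine($"\tEstimator detected {estimation} items pending to be read by the Processor.");
    }

    // Nothing is awaited here, so return a completed task instead of awaiting Task.Delay(0).
    return Task.CompletedTask;
}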

You can send this estimation to your monitoring solution and use it to understand how your progress is behaving over time.
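One way to act on the pushed values is to look at their trend rather than at any single reading: a backlog that keeps growing across several consecutive measurements suggests that the processor can't keep up. The following is a minimal sketch of that idea. LagTrendTracker, Record, and IsFallingBehind are hypothetical names, not part of the Azure Cosmos DB SDK, and the "non-decreasing across the whole window" rule is only one possible heuristic.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper (not part of the Azure Cosmos DB SDK): keeps the most recent
// estimator readings and reports whether the backlog has been growing across them.
public sealed class LagTrendTracker
{
    private readonly int _windowSize;
    private readonly Queue<long> _samples = new Queue<long>();

    public LagTrendTracker(int windowSize)
    {
        if (windowSize < 2)
        {
            throw new ArgumentOutOfRangeException(nameof(windowSize), "At least two samples are needed to detect a trend.");
        }
        _windowSize = windowSize;
    }

    // Call this from the estimation delegate with each pushed value.
    // Only the last _windowSize readings are kept.
    public void Record(long estimation)
    {
        _samples.Enqueue(estimation);
        if (_samples.Count > _windowSize)
        {
            _samples.Dequeue();
        }
    }

    // True when the window is full, no reading is smaller than the one before it,
    // and the newest reading is strictly larger than the oldest one.
    public bool IsFallingBehind()
    {
        if (_samples.Count < _windowSize)
        {
            return false;
        }

        long[] values = _samples.ToArray(); // Queue<T>.ToArray returns oldest first.
        for (int i = 1; i < values.Length; i++)
        {
            if (values[i] < values[i - 1])
            {
                return false;
            }
        }
        return values[values.Length - 1] > values[0];
    }
}
```

Inside HandleEstimationAsync, you could call Record(estimation) on a shared tracker instance and raise an alert or a scale-out signal whenever IsFallingBehind() returns true. The window size controls how long the growth must persist before you react, which helps avoid alerting on a single burst.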

As an on-demand detailed estimation

As an alternative to the push model, you can obtain the estimation on demand. This model also provides more detailed information:

  • The estimated lag per lease.
  • The instance that owns and processes each lease, so you can identify whether a particular instance has an issue.

If your change feed processor is defined like this:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedEstimator", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .Build();

You can create the estimator with the same lease configuration:

ChangeFeedEstimator changeFeedEstimator = monitoredContainer
    .GetChangeFeedEstimator("changeFeedEstimator", leaseContainer);

You can then obtain the detailed estimation whenever you need it, as often as you require:

Console.WriteLine("Checking estimation...");
using FeedIterator<ChangeFeedProcessorState> estimatorIterator = changeFeedEstimator.GetCurrentStateIterator();
while (estimatorIterator.HasMoreResults)
{
    FeedResponse<ChangeFeedProcessorState> states = await estimatorIterator.ReadNextAsync();
    foreach (ChangeFeedProcessorState leaseState in states)
    {
        string host = leaseState.InstanceName == null ? "not owned by any host currently" : $"owned by host {leaseState.InstanceName}";
        Console.WriteLine($"Lease [{leaseState.LeaseToken}] {host} reports {leaseState.EstimatedLag} as estimated lag.");
    }
}

Each ChangeFeedProcessorState contains the lease token, its estimated lag, and the name of the instance that currently owns it (null if no instance owns it).
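To spot an instance with an issue, it can help to aggregate the per-lease lag by owner. The sketch below assumes a stand-in type, LeaseLag, that mirrors only the three ChangeFeedProcessorState properties used in the loop above (LeaseToken, EstimatedLag, InstanceName) so that it compiles without the Microsoft.Azure.Cosmos package; LeaseLag, LagReport, and TotalLagByInstance are hypothetical names.

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for ChangeFeedProcessorState, reduced to the properties
// read in the estimator loop. InstanceName is null when no instance owns the lease.
public sealed record LeaseLag(string LeaseToken, long EstimatedLag, string InstanceName);

public static class LagReport
{
    // Key used for leases that no instance currently owns.
    public const string Unowned = "(unowned)";

    // Sums the estimated lag of every lease per owning instance. Unowned leases are
    // grouped under Unowned so that their lag isn't silently dropped.
    public static Dictionary<string, long> TotalLagByInstance(IEnumerable<LeaseLag> states) =>
        states
            .GroupBy(s => s.InstanceName ?? Unowned)
            .ToDictionary(g => g.Key, g => g.Sum(s => s.EstimatedLag));
}
```

In the estimator loop, each leaseState could be mapped to new LeaseLag(leaseState.LeaseToken, leaseState.EstimatedLag, leaseState.InstanceName) before aggregating. An instance whose total is much higher than its peers may be struggling, and lag under Unowned means that no instance is currently processing those leases.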

Note

The change feed estimator doesn't need to be deployed as part of your change feed processor or be part of the same project. It can run independently, in a completely different instance, which is the recommended approach. It only needs to use the same processor name and lease configuration.
