The change feed processor in .NET is available for latest version mode and all versions and deletes mode. All versions and deletes mode is in preview and is supported for the change feed processor beginning in version 3.40.0-preview.0. The point of entry for both modes is always the monitored container.
To read using latest version mode, call GetChangeFeedProcessorBuilder on a Container instance:
/// <summary>
/// Start the Change Feed Processor to listen for changes and process them with the HandleChangesAsync implementation.
/// </summary>
private static async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync(
    CosmosClient cosmosClient,
    IConfiguration configuration)
{
    string databaseName = configuration["SourceDatabaseName"];
    string sourceContainerName = configuration["SourceContainerName"];
    string leaseContainerName = configuration["LeasesContainerName"];
    Container leaseContainer = cosmosClient.GetContainer(databaseName, leaseContainerName);
    ChangeFeedProcessor changeFeedProcessor = cosmosClient.GetContainer(databaseName, sourceContainerName)
        .GetChangeFeedProcessorBuilder<ToDoItem>(processorName: "changeFeedSample", onChangesDelegate: HandleChangesAsync)
            .WithInstanceName("consoleHost")
            .WithLeaseContainer(leaseContainer)
            .Build();
    Console.WriteLine("Starting Change Feed Processor...");
    await changeFeedProcessor.StartAsync();
    Console.WriteLine("Change Feed Processor started.");
    return changeFeedProcessor;
}
To read using all versions and deletes mode, call GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes on the Container instance:
Container leaseContainer = client.GetContainer(Program.databaseName, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(Program.databaseName, containerName);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes<ToDoItem>(processorName: "changeFeedBasic", onChangesDelegate: Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .Build();
For both modes, the first parameter (processorName) is a distinct name that describes the goal of this processor. The second parameter (onChangesDelegate) is the delegate implementation that handles changes.
Here's an example of a delegate for latest version mode:
/// <summary>
/// The delegate receives batches of changes as they are generated in the change feed and can process them.
/// </summary>
static async Task HandleChangesAsync(
    ChangeFeedProcessorContext context,
    IReadOnlyCollection<ToDoItem> changes,
    CancellationToken cancellationToken)
{
    Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
    Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
    // SessionToken if needed to enforce Session consistency on another client instance
    Console.WriteLine($"SessionToken {context.Headers.Session}");
    // We may want to track any operation's Diagnostics that took longer than some threshold
    if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
    {
        Console.WriteLine($"Change Feed request took longer than expected. Diagnostics: {context.Diagnostics}");
    }
    foreach (ToDoItem item in changes)
    {
        Console.WriteLine($"Detected operation for item with id {item.id}, created at {item.creationTime}.");
        // Simulate some asynchronous operation; honor cancellation when the processor stops
        await Task.Delay(10, cancellationToken);
    }
    Console.WriteLine("Finished handling changes.");
}
Here's an example of a delegate for all versions and deletes mode:
static async Task HandleChangesAsync(
    ChangeFeedProcessorContext context,
    IReadOnlyCollection<ChangeFeedItem<ToDoItem>> changes,
    CancellationToken cancellationToken)
{
    Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
    Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
    // SessionToken if needed to enforce Session consistency on another client instance
    Console.WriteLine($"SessionToken {context.Headers.Session}");
    // We may want to track any operation's Diagnostics that took longer than some threshold
    if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
    {
        Console.WriteLine($"Change Feed request took longer than expected. Diagnostics: {context.Diagnostics}");
    }
    foreach (ChangeFeedItem<ToDoItem> item in changes)
    {
        if (item.Metadata.OperationType == ChangeFeedOperationType.Delete)
        {
            // For deletes, there's no current version of the item
            Console.WriteLine($"\tDetected {item.Metadata.OperationType} operation for item.");
        }
        else
        {
            Console.WriteLine($"\tDetected {item.Metadata.OperationType} operation for item with id {item.Current.id}.");
        }
        // Simulate work; honor cancellation when the processor stops
        await Task.Delay(1, cancellationToken);
    }
}
Afterward, you define the compute instance name or unique identifier by using WithInstanceName. The compute instance name should be unique and different for each compute instance you're deploying. You set the container to maintain the lease state by using WithLeaseContainer.
Calling Build gives you the processor instance that you can start by calling StartAsync.
The normal life cycle of a host instance is:
1. Read the change feed.
2. If there are no changes, sleep for a predefined amount of time (customizable by using WithPollInterval in the builder) and go to #1.
3. If there are changes, send them to the delegate.
4. When the delegate finishes processing the changes successfully, update the lease store with the latest processed point in time and go to #1.
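The four steps above can be sketched as a minimal model of one lease's read loop. It is written in Java, the other SDK language this article covers, and it isn't the SDK's implementation. Several assumptions keep it simple: the change feed is an in-memory list of strings, the lease checkpoint is a list index, and the class LeaseLoopSketch and its members are hypothetical. The point it illustrates is that the checkpoint moves only after the delegate returns.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical in-memory model of one lease's read loop (not SDK code).
// "feed" stands in for the monitored container's change feed and
// "checkpoint" for the position saved in the lease container.
class LeaseLoopSketch {
    final List<String> feed = new ArrayList<>(); // changes, in commit order
    int checkpoint = 0;                          // index of the next unread change
    int emptyPolls = 0;                          // how many times step 2 (sleep) ran

    /** Runs the four-step loop for a fixed number of iterations. */
    void run(int iterations, int maxBatch, long pollIntervalMillis,
             Consumer<List<String>> delegate) {
        for (int i = 0; i < iterations; i++) {
            // Step 1: read the change feed from the last saved checkpoint.
            int end = Math.min(feed.size(), checkpoint + maxBatch);
            List<String> batch = new ArrayList<>(feed.subList(checkpoint, end));
            if (batch.isEmpty()) {
                // Step 2: nothing new, so sleep for the poll interval and retry.
                emptyPolls++;
                try {
                    Thread.sleep(pollIntervalMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                continue;
            }
            // Step 3: hand the batch to the user delegate.
            delegate.accept(batch);
            // Step 4: only after the delegate returns, persist the new checkpoint.
            checkpoint = end;
        }
    }
}
```

With three changes, a batch size of two, and four iterations, the loop delivers two batches and then polls twice without finding anything new.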
The change feed processor is resilient to user code errors. If your delegate implementation throws an unhandled exception (step #3), the lease store isn't updated (step #4 doesn't run), the thread that's processing that particular batch of changes stops, and a new thread is eventually created. The new thread checks the latest point in time that the lease store has saved for that range of partition key values and restarts from there, effectively sending the same batch of changes to the delegate again. This behavior continues until your delegate processes the changes correctly, and it's the reason the change feed processor has an "at least once" guarantee.
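The retry behavior described above can be illustrated with a hypothetical in-memory model in Java that simulates a delegate failing once. It isn't SDK code. Because the checkpoint isn't advanced on failure, the whole batch is delivered again, including changes in that batch that didn't cause the failure. AtLeastOnceSketch, process, and the string-based feed are illustrative assumptions.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical model of at-least-once delivery (not SDK code).
class AtLeastOnceSketch {
    /**
     * Processes "feed" in batches of "batchSize". If the delegate throws, the
     * checkpoint isn't advanced, so the next attempt re-reads the same batch.
     * Returns how many times each change was delivered to the delegate.
     */
    static Map<String, Integer> process(List<String> feed, int batchSize,
                                        Consumer<List<String>> delegate, int maxAttempts) {
        Map<String, Integer> deliveries = new HashMap<>();
        int checkpoint = 0; // last position saved in the lease store
        int attempts = 0;
        while (checkpoint < feed.size() && attempts < maxAttempts) {
            attempts++;
            int end = Math.min(feed.size(), checkpoint + batchSize);
            List<String> batch = feed.subList(checkpoint, end);
            batch.forEach(c -> deliveries.merge(c, 1, Integer::sum));
            try {
                delegate.accept(batch);
                checkpoint = end; // success: persist progress
            } catch (RuntimeException e) {
                // Failure: processing restarts from the saved checkpoint,
                // so this same batch is delivered again on the next attempt.
            }
        }
        return deliveries;
    }
}
```

In a run where only change "c" triggers a one-time failure, change "d" (in the same batch) is also delivered twice. For that reason, delegate logic is usually written to tolerate duplicates.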
Note
There's only one scenario in which a batch of changes isn't retried. If the failure happens on the first-ever delegate execution, the lease store has no previously saved state to use on the retry. In that case, the retry uses the initial starting configuration, which might or might not include the last batch.
To prevent your change feed processor from getting "stuck" continuously retrying the same batch of changes, you should add logic in your delegate code to write documents, upon exception, to an errored-message queue. This design ensures that you can keep track of unprocessed changes while still being able to continue to process future changes. The errored-message queue might be another Azure Cosmos DB container. The exact data store doesn't matter. You simply want the unprocessed changes to be persisted.
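A minimal sketch of the errored-message pattern, assuming each change can be handled independently. The delegate catches failures per change and appends them to a store instead of rethrowing, so it returns normally and the checkpoint can advance. DeadLetterSketch and its in-memory lists are hypothetical stand-ins. In a real deployment, the dead-letter list would be a durable store such as another Azure Cosmos DB container.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of the errored-message pattern (not SDK code).
class DeadLetterSketch {
    final List<String> deadLetters = new ArrayList<>(); // stands in for a durable store
    final List<String> processed = new ArrayList<>();

    /** Delegate body: handle each change on its own so one bad change can't block the batch. */
    void handleBatch(List<String> batch, Consumer<String> businessLogic) {
        for (String change : batch) {
            try {
                businessLogic.accept(change);
                processed.add(change);
            } catch (RuntimeException e) {
                // Persist the failed change and the reason instead of rethrowing,
                // so the delegate completes and the lease checkpoint can move on.
                deadLetters.add(change + ": " + e.getMessage());
            }
        }
    }
}
```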
You also can use the change feed estimator to monitor the progress of your change feed processor instances as they read the change feed, or you can use life cycle notifications to detect underlying failures.
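The estimator measures the difference between the latest change in the monitored container and the last processed point recorded in the lease container. As a rough mental model of that measurement, the sketch below sums a per-lease gap. It is a simplified assumption, not the SDK's estimator: positions are plain numbers, and LagEstimateSketch and totalLag are hypothetical names.

```java
import java.util.Map;

// Simplified, hypothetical model of estimated lag (not the SDK's estimator).
class LagEstimateSketch {
    /**
     * For each lease, the estimated lag is the distance between the latest change
     * in that lease's range and the checkpoint saved in the lease container.
     * Returns the total across all leases.
     */
    static long totalLag(Map<String, Long> latestPositionByLease, Map<String, Long> checkpointByLease) {
        long total = 0;
        for (Map.Entry<String, Long> e : latestPositionByLease.entrySet()) {
            long checkpoint = checkpointByLease.getOrDefault(e.getKey(), 0L);
            total += Math.max(0L, e.getValue() - checkpoint);
        }
        return total;
    }
}
```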
You can connect the change feed processor to any relevant event in its life cycle. You can choose to be notified of one or all of them. At a minimum, we recommend registering the error notification:
- Register a handler for WithLeaseAcquireNotification to be notified when the current host acquires a lease to start processing it.
- Register a handler for WithLeaseReleaseNotification to be notified when the current host releases a lease and stops processing it.
- Register a handler for WithErrorNotification to be notified when the current host encounters an exception during processing. You need to be able to distinguish whether the source is the user delegate (an unhandled exception) or an error that the processor encounters when it tries to access the monitored container (for example, networking issues).
Life cycle notifications are available in both change feed modes. Here's an example of life cycle notifications in latest version mode:
Container.ChangeFeedMonitorLeaseAcquireDelegate onLeaseAcquiredAsync = (string leaseToken) =>
{
    Console.WriteLine($"Lease {leaseToken} is acquired and will start processing");
    return Task.CompletedTask;
};
Container.ChangeFeedMonitorLeaseReleaseDelegate onLeaseReleaseAsync = (string leaseToken) =>
{
    Console.WriteLine($"Lease {leaseToken} is released and processing is stopped");
    return Task.CompletedTask;
};
Container.ChangeFeedMonitorErrorDelegate onErrorAsync = (string leaseToken, Exception exception) =>
{
    if (exception is ChangeFeedProcessorUserException userException)
    {
        // The exception was thrown by the user delegate
        Console.WriteLine($"Lease {leaseToken} processing failed with unhandled exception from user delegate {userException.InnerException}");
    }
    else
    {
        // The processor itself failed, for example while accessing the monitored container
        Console.WriteLine($"Lease {leaseToken} failed with {exception}");
    }
    return Task.CompletedTask;
};
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedNotifications", handleChanges)
        .WithLeaseAcquireNotification(onLeaseAcquiredAsync)
        .WithLeaseReleaseNotification(onLeaseReleaseAsync)
        .WithErrorNotification(onErrorAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .Build();
A single change feed processor deployment unit consists of one or more compute instances that have the same value for processorName and the same lease container configuration, but different instance names. You can have many deployment units in which each unit has a different business flow for the changes and each deployment unit consists of one or more instances.
For example, you might have one deployment unit that triggers an external API each time there's a change in your container. Another deployment unit might move data in real time each time there's a change. When a change happens in your monitored container, all your deployment units are notified.
As mentioned earlier, within a deployment unit, you can have one or more compute instances. To take advantage of the compute distribution within the deployment unit, the only key requirements are that:
- All instances should have the same lease container configuration.
- All instances should have the same value for processorName.
- Each instance needs to have a different instance name (WithInstanceName).
If these three conditions apply, then the change feed processor distributes all the leases that are in the lease container across all running instances of that deployment unit, and it parallelizes compute by using an equal-distribution algorithm. A lease is owned by one instance at any time, so the number of instances shouldn't be greater than the number of leases.
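One simple way to picture an equal-distribution assignment is round-robin dealing of leases to instances, sketched below in Java. This is an illustrative assumption, not the processor's actual balancing algorithm, which coordinates through lease ownership in the lease container. LeaseBalanceSketch and assign are hypothetical names. The sketch does show why instances beyond the lease count end up idle.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of equal lease distribution (not the SDK's algorithm).
class LeaseBalanceSketch {
    /** Deals leases round-robin, so each instance owns either floor(n/k) or ceil(n/k) leases. */
    static Map<String, List<String>> assign(List<String> leases, List<String> instances) {
        Map<String, List<String>> owned = new LinkedHashMap<>();
        for (String instance : instances) {
            owned.put(instance, new ArrayList<>());
        }
        for (int i = 0; i < leases.size(); i++) {
            // Each lease has exactly one owner at a time.
            String owner = instances.get(i % instances.size());
            owned.get(owner).add(leases.get(i));
        }
        return owned;
    }
}
```

Calling assign again with a longer or shorter instance list models instances joining or leaving. Each lease still ends up with exactly one owner. With four leases and six instances, two instances receive nothing.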
The number of instances can grow and shrink. The change feed processor dynamically adjusts the load by redistributing it accordingly.
Moreover, the change feed processor can dynamically adjust to a container's scale if the container's throughput or storage increases. When your container grows, the change feed processor transparently handles the scenario by dynamically increasing the leases and distributing the new leases among existing instances.
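The sketch below models what happens to a lease when the range it covers splits. The parent lease is replaced by two child leases that both resume from the parent's saved checkpoint, so no changes are skipped. Splitting at the numeric midpoint, the Lease record, and the class name LeaseSplitSketch are simplifying assumptions for illustration, not the SDK's lease format.

```java
import java.util.List;

// Hypothetical model of a lease split (not SDK code).
class LeaseSplitSketch {
    // A lease covers the half-open range [rangeMin, rangeMax) and stores a checkpoint.
    record Lease(int rangeMin, int rangeMax, long continuation) {}

    /** Replaces one lease with two children that inherit the parent's checkpoint. */
    static List<Lease> split(Lease parent) {
        int mid = parent.rangeMin() + (parent.rangeMax() - parent.rangeMin()) / 2;
        return List.of(
            new Lease(parent.rangeMin(), mid, parent.continuation()),
            new Lease(mid, parent.rangeMax(), parent.continuation()));
    }
}
```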
By default, when a change feed processor starts for the first time, it initializes the lease container and starts its processing life cycle. Any changes that happened in the monitored container before the change feed processor is initialized for the first time aren't detected.
Reading from a previous date and time
It's possible to initialize the change feed processor to read changes starting at a specific date and time by passing an instance of DateTime to the WithStartTime builder extension:
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedTime", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .WithStartTime(particularPointInTime)
        .Build();
The change feed processor is initialized for that specific date and time, and it starts to read the changes that happened afterward.
Reading from the beginning
In other scenarios, like in data migrations or if you're analyzing the entire history of a container, you need to read the change feed from the beginning of that container's lifetime. You can use WithStartTime on the builder extension, but pass DateTime.MinValue.ToUniversalTime(), which generates the UTC representation of the minimum DateTime value, as in this example:
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedBeginning", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .WithStartTime(DateTime.MinValue.ToUniversalTime())
        .Build();
The change feed processor is initialized, and it starts reading changes from the beginning of the lifetime of the container.
Note
These customization options work only to set up the starting point in time of the change feed processor. After the lease container is initialized for the first time, changing these options has no effect.
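The rule in this note can be modeled like this: a configured start point is consulted only when there's no saved lease state. The Java sketch below assumes, for simplicity, that a checkpoint is an Instant. Real leases store continuation tokens. StartPointSketch and resolveStart are hypothetical names.

```java
import java.time.Instant;
import java.util.Optional;

// Hypothetical model of how a configured start time interacts with saved lease state.
class StartPointSketch {
    static Instant resolveStart(Optional<Instant> savedCheckpoint, Instant configuredStart) {
        // Once leases exist, the saved checkpoint wins and the configured start is ignored.
        return savedCheckpoint.orElse(configuredStart);
    }
}
```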
Customizing the starting point is only available for latest version change feed mode. When using all versions and deletes mode, you must start reading from the time the processor is started, or resume from a prior lease state that is within the continuous backup retention period of your account.
In Java, an example of a delegate implementation for reading the change feed in latest version mode is:
private static Consumer<List<JsonNode>> handleChanges() {
    return (List<JsonNode> docs) -> {
        logger.info("Start handleChanges()");
        for (JsonNode document : docs) {
            try {
                // Change Feed hands the document to you in the form of a JsonNode.
                // As a developer you have two options for handling the JsonNode document provided to you by Change Feed.
                // One option is to operate on the document in the form of a JsonNode, as shown below. This is great
                // especially if you do not have a single uniform data model for all documents.
                logger.info("Document received: " + OBJECT_MAPPER.writerWithDefaultPrettyPrinter()
                    .writeValueAsString(document));
                // You can also transform the JsonNode to a POJO having the same structure as the JsonNode,
                // as shown below. Then you can operate on the POJO.
                CustomPOJO2 pojo_doc = OBJECT_MAPPER.treeToValue(document, CustomPOJO2.class);
                logger.info("id: " + pojo_doc.getId());
            } catch (JsonProcessingException e) {
                e.printStackTrace();
            }
        }
        isWorkCompleted = true;
        logger.info("End handleChanges()");
    };
}
Note
In this example, you pass a variable options of type ChangeFeedProcessorOptions, which can be used to set various values, including setStartFromBeginning:
options = new ChangeFeedProcessorOptions();
options.setStartFromBeginning(false);
options.setLeasePrefix("myChangeFeedDeploymentUnit");
options.setFeedPollDelay(Duration.ofSeconds(5));
options.setFeedPollThroughputControlConfig(throughputControlGroupConfig);
The delegate implementation for reading the change feed in all versions and deletes mode is similar, but instead of calling .handleChanges(), call .handleAllVersionsAndDeletesChanges(). All versions and deletes mode is in preview and is available in Java SDK version 4.42.0 and later.
Here's an example:
public static ChangeFeedProcessor getChangeFeedProcessorForAllVersionsAndDeletesMode(String hostName, CosmosAsyncContainer feedContainer, CosmosAsyncContainer leaseContainer) {
    return new ChangeFeedProcessorBuilder()
        .hostName(hostName)
        .options(options)
        .feedContainer(feedContainer)
        .leaseContainer(leaseContainer)
        .handleAllVersionsAndDeletesChanges((List<ChangeFeedProcessorItem> changeFeedProcessorItems) -> {
            logger.info("--->handleAllVersionsAndDeletesChanges() START");
            for (ChangeFeedProcessorItem item : changeFeedProcessorItems) {
                try {
                    // AllVersionsAndDeletes Change Feed hands the document to you in the form of ChangeFeedProcessorItem
                    // As a developer you have two options for handling the ChangeFeedProcessorItem provided to you by Change Feed
                    // One option is to operate on the item as it is and call the different getters for different states, as shown below.
                    logger.info("---->DOCUMENT RECEIVED: {}", OBJECT_MAPPER.writerWithDefaultPrettyPrinter()
                        .writeValueAsString(item));
                    logger.info("---->CURRENT RECEIVED: {}", item.getCurrent());
                    logger.info("---->PREVIOUS RECEIVED: {}", item.getPrevious());
                    logger.info("---->METADATA RECEIVED: {}", item.getChangeFeedMetaData());
                    // You can also transform the ChangeFeedProcessorItem to JsonNode and work on the generic json structure.
                    // This is great especially if you do not have a single uniform data model for all documents.
                    logger.info("----=>JsonNode received: " + item.toJsonNode());
                } catch (JsonProcessingException e) {
                    e.printStackTrace();
                }
            }
            logger.info("--->handleAllVersionsAndDeletesChanges() END");
        })
        .buildChangeFeedProcessor();
}
In either change feed mode, you can assign the built processor to changeFeedProcessorInstance and pass the parameters of the compute instance name (hostName), the monitored container (here called feedContainer), and the leaseContainer. Then start the change feed processor:
logger.info("Start Change Feed Processor on worker (handles changes asynchronously)");
ChangeFeedProcessor changeFeedProcessorInstance = new ChangeFeedProcessorBuilder()
    .hostName("SampleHost_1")
    .feedContainer(feedContainer)
    .leaseContainer(leaseContainer)
    .handleChanges(handleChanges())
    .options(options)
    .buildChangeFeedProcessor();
changeFeedProcessorInstance.start()
    .subscribeOn(Schedulers.boundedElastic())
    .subscribe();
The normal life cycle of a host instance is:
1. Read the change feed.
2. If there are no changes, sleep for a predefined amount of time (customizable with options.setFeedPollDelay) and go to #1.
3. If there are changes, send them to the delegate.
4. When the delegate finishes processing the changes successfully, update the lease store by using the latest processed point in time and go to #1.
The change feed processor is resilient to user code errors. If your delegate implementation throws an unhandled exception (step #3), the lease store isn't updated (step #4 doesn't run), the thread that's processing that particular batch of changes is stopped, and a new thread is created. The new thread checks the latest point in time that the lease store has saved for that range of partition key values, and it restarts from there, effectively sending the same batch of changes to the delegate again. This behavior continues until your delegate processes the changes correctly. It's the reason why the change feed processor has an "at least once" guarantee.
Note
There's only one scenario in which a batch of changes isn't retried. If the failure happens on the first-ever delegate execution, the lease store has no previously saved state to use on the retry. In that case, the retry uses the initial starting configuration, which might or might not include the last batch.
To prevent your change feed processor from getting "stuck" continuously retrying the same batch of changes, you should add logic in your delegate code to write documents, upon exception, to an errored-message queue. This design ensures that you can keep track of unprocessed changes while still being able to continue to process future changes. The errored-message queue might be another Azure Cosmos DB container. The exact data store doesn't matter. You simply want the unprocessed changes to be persisted.
You also can use the change feed estimator to monitor the progress of your change feed processor instances as they read the change feed.
A single change feed processor deployment unit consists of one or more compute instances that have the same lease container configuration and the same leasePrefix, but different hostName values. You can have many deployment units in which each one has a different business flow for the changes, and each deployment unit consists of one or more instances.
For example, you might have one deployment unit that triggers an external API each time there's a change in your container. Another deployment unit might move data in real time each time there's a change. When a change happens in your monitored container, all your deployment units are notified.
As mentioned earlier, within a deployment unit, you can have one or more compute instances. To take advantage of the compute distribution within the deployment unit, the only key requirements are that:
- All instances should have the same lease container configuration.
- All instances should have the same value set in options.setLeasePrefix (or none set at all).
- Each instance needs to have a different hostName.
If these three conditions apply, then the change feed processor distributes all the leases in the lease container across all running instances of that deployment unit, and it parallelizes compute by using an equal-distribution algorithm. A lease is owned by one instance at any time, so the number of instances shouldn't be greater than the number of leases.
The number of instances can grow and shrink. The change feed processor dynamically adjusts the load by redistributing it accordingly. Deployment units can share the same lease container, but they should each have a different leasePrefix value.
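Here is a minimal model of why deployment units can share one lease container as long as their prefixes differ. If each lease document's id starts with the unit's lease prefix, two units never overwrite each other's checkpoints, and each one independently reads every change. The id scheme (prefix plus range), SharedLeaseContainerSketch, and readNew are assumptions for illustration, not the SDK's actual lease format.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a lease container shared by several deployment units (not SDK code).
class SharedLeaseContainerSketch {
    final Map<String, Integer> leaseStore = new HashMap<>(); // lease id -> next unread position

    /** Reads all unread changes for one deployment unit and advances only that unit's checkpoint. */
    List<String> readNew(String leasePrefix, String range, List<String> feed) {
        String leaseId = leasePrefix + range; // assumed id scheme, for illustration only
        int from = leaseStore.getOrDefault(leaseId, 0);
        List<String> batch = List.copyOf(feed.subList(from, feed.size()));
        leaseStore.put(leaseId, feed.size());
        return batch;
    }
}
```

In this model, a unit with the prefix "apiTrigger" and a unit with the prefix "dataMover" both receive every change. Each keeps its own lease entry in the shared store.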
Moreover, the change feed processor can dynamically adjust to a container's scale if the container's throughput or storage increases. When your container grows, the change feed processor transparently handles the scenario by dynamically increasing the leases and distributing the new leases among existing instances.
By default, when a change feed processor starts for the first time, it initializes the lease container and starts its processing life cycle. Any changes that happened in the monitored container before the change feed processor was initialized for the first time aren't detected.
Note
Modifying the starting time of the change feed processor isn't available when you use all versions and deletes mode. Currently, you must use the default start time.
Reading from a previous date and time
It's possible to initialize the change feed processor to read changes starting at a specific date and time by setting setStartTime in options. The change feed processor is initialized for that specific date and time, and it starts reading the changes that happened afterward.
Reading from the beginning
In the sample, setStartFromBeginning is set to false, which is the same as the default value. In other scenarios, like in data migrations or if you're analyzing the entire history of a container, you need to read the change feed from the beginning of that container's lifetime. To do that, you can set setStartFromBeginning to true. The change feed processor is initialized, and it starts reading changes from the beginning of the lifetime of the container.
Note
These customization options work only to set up the starting point in time of the change feed processor. After the lease container is initialized for the first time, changing them has no effect.