Migrate from the bulk executor library to the bulk support in Azure Cosmos DB .NET V3 SDK

APPLIES TO: NoSQL

This article describes the required steps to migrate an existing application's code that uses the .NET bulk executor library to the bulk support feature in the latest version of the .NET SDK.

Enable bulk support

Enable bulk support on the CosmosClient instance through the AllowBulkExecution configuration:

new CosmosClient(endpoint, authKey, new CosmosClientOptions() { AllowBulkExecution = true });

Create Tasks for each operation

Bulk support in the .NET SDK works by leveraging the Task Parallel Library and grouping operations that occur concurrently.

The SDK has no single method that takes your list of documents or operations as an input parameter. Instead, you create a Task for each operation you want to execute in bulk, and then wait for all of them to complete.

For example, suppose your initial input is a list of items where each item has the following schema:

public class MyItem
{
    public string id { get; set; }

    public string pk { get; set; }

    public int operationCounter { get; set; } = 0;
}

If you want to do bulk import (similar to using BulkExecutor.BulkImportAsync), you need to have concurrent calls to CreateItemAsync. For example:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.CreateItemAsync(document, new PartitionKey(document.pk)), document));
}

If you want to do bulk update (similar to using BulkExecutor.BulkUpdateAsync), you need to have concurrent calls to the ReplaceItemAsync method after updating the item value. For example:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    document.operationCounter++;
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.ReplaceItemAsync(document, document.id, new PartitionKey(document.pk)), document));
}

And if you want to do bulk delete (similar to using BulkExecutor.BulkDeleteAsync), you need to have concurrent calls to DeleteItemAsync, with the id and partition key of each item. For example:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.DeleteItemAsync<MyItem>(document.id, new PartitionKey(document.pk)), document));
}

Capture task result state

The previous code examples create a list of concurrent tasks and call the CaptureOperationResponse method on each of them. This helper method lets us maintain a response schema similar to BulkExecutor's by capturing any errors and tracking request unit usage.

private static async Task<OperationResponse<T>> CaptureOperationResponse<T>(Task<ItemResponse<T>> task, T item)
{
    try
    {
        ItemResponse<T> response = await task;
        return new OperationResponse<T>()
        {
            Item = item,
            IsSuccessful = true,
            // Use the awaited response rather than reading task.Result again.
            RequestUnitsConsumed = response.RequestCharge
        };
    }
    catch (Exception ex)
    {
        if (ex is CosmosException cosmosException)
        {
            return new OperationResponse<T>()
            {
                Item = item,
                RequestUnitsConsumed = cosmosException.RequestCharge,
                IsSuccessful = false,
                CosmosException = cosmosException
            };
        }

        return new OperationResponse<T>()
        {
            Item = item,
            IsSuccessful = false,
            CosmosException = ex
        };
    }
}

OperationResponse is declared as:

public class OperationResponse<T>
{
    public T Item { get; set; }
    public double RequestUnitsConsumed { get; set; } = 0;
    public bool IsSuccessful { get; set; }
    public Exception CosmosException { get; set; }
}

Execute operations concurrently

To track the scope of the entire list of Tasks, we use this helper class:

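using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;

public class BulkOperations<T>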
{
    public readonly List<Task<OperationResponse<T>>> Tasks;

    private readonly Stopwatch stopwatch = Stopwatch.StartNew();

    public BulkOperations(int operationCount)
    {
        this.Tasks = new List<Task<OperationResponse<T>>>(operationCount);
    }

    public async Task<BulkOperationResponse<T>> ExecuteAsync()
    {
        await Task.WhenAll(this.Tasks);
        this.stopwatch.Stop();
        return new BulkOperationResponse<T>()
        {
            TotalTimeTaken = this.stopwatch.Elapsed,
            TotalRequestUnitsConsumed = this.Tasks.Sum(task => task.Result.RequestUnitsConsumed),
            SuccessfulDocuments = this.Tasks.Count(task => task.Result.IsSuccessful),
            Failures = this.Tasks.Where(task => !task.Result.IsSuccessful).Select(task => (task.Result.Item, task.Result.CosmosException)).ToList()
        };
    }
}

The ExecuteAsync method waits until all operations are completed. You can use it as follows:

BulkOperationResponse<MyItem> bulkOperationResponse = await bulkOperations.ExecuteAsync();

Capture statistics

The previous code waits until all operations are completed and calculates the required statistics. These statistics are similar to those of the bulk executor library's BulkImportResponse.

public class BulkOperationResponse<T>
{
    public TimeSpan TotalTimeTaken { get; set; }
    public int SuccessfulDocuments { get; set; } = 0;
    public double TotalRequestUnitsConsumed { get; set; } = 0;

    public IReadOnlyList<(T, Exception)> Failures { get; set; }
}

The BulkOperationResponse contains:

  1. The total time taken to process the list of operations through bulk support.
  2. The number of successful operations.
  3. The total number of request units consumed.
  4. If there are failures, a list of tuples, each containing the failed item and its associated exception, for logging and identification purposes.
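
As an illustration of how these four values might be consumed, the following sketch formats a BulkOperationResponse into a short report. It relies on the BulkOperationResponse class declared earlier in this article; the BulkReport class and its Summarize method are hypothetical names introduced only for this example.

```csharp
using System;
using System.Text;

// Hypothetical helper, not part of the SDK or the bulk executor library.
public static class BulkReport
{
    // Builds a human-readable summary from the statistics captured by ExecuteAsync.
    public static string Summarize<T>(BulkOperationResponse<T> response)
    {
        StringBuilder report = new StringBuilder();
        report.AppendLine($"Bulk operation finished in {response.TotalTimeTaken}");
        report.AppendLine($"Consumed {response.TotalRequestUnitsConsumed} RUs in total");
        report.AppendLine($"Succeeded: {response.SuccessfulDocuments}");
        report.AppendLine($"Failed: {response.Failures.Count}");

        // Each failure is a tuple of the original item and the exception captured for it.
        foreach ((T item, Exception exception) in response.Failures)
        {
            report.AppendLine($"Failure for item '{item}': {exception?.GetType().Name}: {exception?.Message}");
        }

        return report.ToString();
    }
}
```

After awaiting ExecuteAsync, you could write the report with Console.WriteLine(BulkReport.Summarize(bulkOperationResponse)) or send it to your own logging pipeline.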

Retry configuration

The bulk executor library's guidance recommended setting the MaxRetryWaitTimeInSeconds and MaxRetryAttemptsOnThrottledRequests properties of RetryOptions to 0 to delegate control to the library.

For bulk support in the .NET SDK, there is no hidden behavior. You can configure the retry options directly through the CosmosClientOptions.MaxRetryAttemptsOnRateLimitedRequests and CosmosClientOptions.MaxRetryWaitTimeOnRateLimitedRequests properties.

Note

When the provisioned request units are much lower than what the amount of data requires, consider setting these options to high values. The bulk operation takes longer, but the additional retries give it a higher chance of succeeding completely.
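
As a minimal sketch of this retry configuration, the following factory method creates a bulk-enabled client with raised retry limits. The BulkClientFactory class name is hypothetical, and the values 50 and 90 seconds are illustrative assumptions rather than recommendations. Tune them for your own workload and provisioned throughput.

```csharp
using System;
using Microsoft.Azure.Cosmos;

// Hypothetical helper introduced only for this example.
public static class BulkClientFactory
{
    public static CosmosClient Create(string endpoint, string authKey)
    {
        CosmosClientOptions options = new CosmosClientOptions()
        {
            AllowBulkExecution = true,
            // Illustrative values (assumptions): retry rate-limited (429) requests
            // more times and for longer before surfacing the failure.
            MaxRetryAttemptsOnRateLimitedRequests = 50,
            MaxRetryWaitTimeOnRateLimitedRequests = TimeSpan.FromSeconds(90)
        };

        return new CosmosClient(endpoint, authKey, options);
    }
}
```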

Performance improvements

As with other operations in the .NET SDK, using the stream APIs results in better performance and avoids unnecessary serialization.

Using stream APIs is only possible if the nature of the data you use matches that of a stream of bytes (for example, file streams). In such cases, using the CreateItemStreamAsync, ReplaceItemStreamAsync, or DeleteItemStreamAsync methods and working with ResponseMessage (instead of ItemResponse) increases the throughput that can be achieved.
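
The following sketch shows one way the stream APIs might be combined with the helper classes from this article. It assumes the OperationResponse and BulkOperations classes declared earlier; StreamBulkHelpers, CaptureStreamOperationResponse, and CreateFromStreams are hypothetical names, and the payload tuple shape is an assumption about how your application holds its data. Because a ResponseMessage carries a status code rather than a typed item, the sketch reads success from IsSuccessStatusCode instead of relying only on exceptions.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;

// Hypothetical helpers introduced only for this example.
public static class StreamBulkHelpers
{
    // Stream counterpart of CaptureOperationResponse: inspects the ResponseMessage
    // status code and request charge instead of an ItemResponse<T>.
    public static async Task<OperationResponse<T>> CaptureStreamOperationResponse<T>(Task<ResponseMessage> task, T item)
    {
        try
        {
            using (ResponseMessage response = await task)
            {
                return new OperationResponse<T>()
                {
                    Item = item,
                    IsSuccessful = response.IsSuccessStatusCode,
                    RequestUnitsConsumed = response.Headers.RequestCharge,
                    CosmosException = response.IsSuccessStatusCode
                        ? null
                        : new InvalidOperationException($"Status {(int)response.StatusCode}: {response.ErrorMessage}")
                };
            }
        }
        catch (Exception ex)
        {
            // Client-side or transport failures can still surface as exceptions.
            return new OperationResponse<T>() { Item = item, IsSuccessful = false, CosmosException = ex };
        }
    }

    // Queues one CreateItemStreamAsync call per payload; the Id is kept only to identify failures.
    public static BulkOperations<string> CreateFromStreams(
        Container container,
        IReadOnlyList<(Stream Payload, string Pk, string Id)> payloads)
    {
        BulkOperations<string> bulkOperations = new BulkOperations<string>(payloads.Count);
        foreach ((Stream payload, string pk, string id) in payloads)
        {
            bulkOperations.Tasks.Add(
                CaptureStreamOperationResponse(container.CreateItemStreamAsync(payload, new PartitionKey(pk)), id));
        }

        return bulkOperations;
    }
}
```

The returned BulkOperations<string> can then be awaited with ExecuteAsync in the same way as the typed examples.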

Next steps