APPLIES TO:
NoSQL
This article describes the required steps to migrate an existing application's code that uses the .NET bulk executor library to the bulk support feature in the latest version of the .NET SDK.
Enable bulk support on the CosmosClient instance through the AllowBulkExecution configuration option:
new CosmosClient(endpoint, authKey, new CosmosClientOptions() { AllowBulkExecution = true });
Bulk support in the .NET SDK works by leveraging the Task Parallel Library and grouping operations that occur concurrently.
There is no single method in the SDK that takes your list of documents or operations as an input parameter. Instead, you create a Task for each operation you want to execute in bulk, and then wait for them all to complete.
For example, if your initial input is a list of items where each item has the following schema:
public class MyItem
{
    public string id { get; set; }
    public string pk { get; set; }
    public int operationCounter { get; set; } = 0;
}
If you want to do a bulk import (similar to using BulkExecutor.BulkImportAsync), make concurrent calls to the CreateItemAsync method. For example:
BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.CreateItemAsync(document, new PartitionKey(document.pk)), document));
}
If you want to do a bulk update (similar to using BulkExecutor.BulkUpdateAsync), make concurrent calls to the ReplaceItemAsync method after updating the item value. For example:
BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    document.operationCounter++;
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.ReplaceItemAsync(document, document.id, new PartitionKey(document.pk)), document));
}
And if you want to do a bulk delete (similar to using BulkExecutor.BulkDeleteAsync), make concurrent calls to the DeleteItemAsync method with the id and partition key of each item. For example:
BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.DeleteItemAsync<MyItem>(document.id, new PartitionKey(document.pk)), document));
}
In the previous code examples, we created a concurrent list of tasks and wrapped each one in a call to the CaptureOperationResponse method. This method is an extension that lets us maintain a response schema similar to BulkExecutor's, by capturing any errors and tracking the request units usage.
private static async Task<OperationResponse<T>> CaptureOperationResponse<T>(Task<ItemResponse<T>> task, T item)
{
    try
    {
        ItemResponse<T> response = await task;
        return new OperationResponse<T>()
        {
            Item = item,
            IsSuccessful = true,
            RequestUnitsConsumed = response.RequestCharge
        };
    }
    catch (Exception ex)
    {
        if (ex is CosmosException cosmosException)
        {
            return new OperationResponse<T>()
            {
                Item = item,
                RequestUnitsConsumed = cosmosException.RequestCharge,
                IsSuccessful = false,
                CosmosException = cosmosException
            };
        }

        return new OperationResponse<T>()
        {
            Item = item,
            IsSuccessful = false,
            CosmosException = ex
        };
    }
}
Where the OperationResponse is declared as:
public class OperationResponse<T>
{
    public T Item { get; set; }
    public double RequestUnitsConsumed { get; set; } = 0;
    public bool IsSuccessful { get; set; }
    public Exception CosmosException { get; set; }
}
To track the scope of the entire list of Tasks, we use this helper class:
public class BulkOperations<T>
{
    public readonly List<Task<OperationResponse<T>>> Tasks;

    private readonly Stopwatch stopwatch = Stopwatch.StartNew();

    public BulkOperations(int operationCount)
    {
        this.Tasks = new List<Task<OperationResponse<T>>>(operationCount);
    }

    public async Task<BulkOperationResponse<T>> ExecuteAsync()
    {
        await Task.WhenAll(this.Tasks);
        this.stopwatch.Stop();
        return new BulkOperationResponse<T>()
        {
            TotalTimeTaken = this.stopwatch.Elapsed,
            TotalRequestUnitsConsumed = this.Tasks.Sum(task => task.Result.RequestUnitsConsumed),
            SuccessfulDocuments = this.Tasks.Count(task => task.Result.IsSuccessful),
            Failures = this.Tasks.Where(task => !task.Result.IsSuccessful).Select(task => (task.Result.Item, task.Result.CosmosException)).ToList()
        };
    }
}
The ExecuteAsync method waits until all operations are completed, and you can use it like so:
BulkOperationResponse<MyItem> bulkOperationResponse = await bulkOperations.ExecuteAsync();
The previous code waits until all operations are completed and calculates the required statistics. These statistics are similar to those of the bulk executor library's BulkImportResponse.
public class BulkOperationResponse<T>
{
    public TimeSpan TotalTimeTaken { get; set; }
    public int SuccessfulDocuments { get; set; } = 0;
    public double TotalRequestUnitsConsumed { get; set; } = 0;
    public IReadOnlyList<(T, Exception)> Failures { get; set; }
}
The BulkOperationResponse contains the total elapsed time, the total request units consumed, the count of documents processed successfully, and the list of failures, where each failure pairs the affected item with the exception that occurred.
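These statistics can be surfaced after ExecuteAsync completes. The following is a minimal, self-contained sketch; it re-declares the article's BulkOperationResponse type so the snippet compiles on its own, and the values are illustrative rather than the result of a real bulk run:

```csharp
using System;
using System.Collections.Generic;

// Mirrors the article's BulkOperationResponse<T> so this sketch is self-contained.
public class BulkOperationResponse<T>
{
    public TimeSpan TotalTimeTaken { get; set; }
    public int SuccessfulDocuments { get; set; } = 0;
    public double TotalRequestUnitsConsumed { get; set; } = 0;
    public IReadOnlyList<(T, Exception)> Failures { get; set; }
}

public static class Program
{
    public static void Main()
    {
        // In a real application this would come from: await bulkOperations.ExecuteAsync();
        var response = new BulkOperationResponse<string>
        {
            TotalTimeTaken = TimeSpan.FromSeconds(2),
            SuccessfulDocuments = 98,
            TotalRequestUnitsConsumed = 490.5,
            Failures = new List<(string, Exception)>()
        };

        Console.WriteLine($"Finished in {response.TotalTimeTaken.TotalSeconds}s");
        Console.WriteLine($"Consumed {response.TotalRequestUnitsConsumed} RUs");
        Console.WriteLine($"Created {response.SuccessfulDocuments} items; {response.Failures.Count} failures");
    }
}
```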
The bulk executor library's guidance recommended setting the MaxRetryWaitTimeInSeconds and MaxRetryAttemptsOnThrottledRequests properties of RetryOptions to 0 to delegate control to the library.
For bulk support in the .NET SDK, there is no hidden behavior. You can configure the retry options directly through the CosmosClientOptions.MaxRetryAttemptsOnRateLimitedRequests and CosmosClientOptions.MaxRetryWaitTimeOnRateLimitedRequests.
Note
In cases where the provisioned request units are much lower than expected based on the amount of data, consider setting these options to high values. The bulk operation takes longer, but it has a higher chance of succeeding completely because of the additional retries.
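The retry configuration can be sketched as follows. The numeric values here are illustrative assumptions, not recommendations, and the endpoint and key are placeholders:

```csharp
using System;
using Microsoft.Azure.Cosmos;

// Sketch: configure rate-limit retries alongside bulk support.
CosmosClientOptions options = new CosmosClientOptions()
{
    AllowBulkExecution = true,
    // Raise these when provisioned RU/s is low relative to the data volume.
    MaxRetryAttemptsOnRateLimitedRequests = 30,
    MaxRetryWaitTimeOnRateLimitedRequests = TimeSpan.FromMinutes(5)
};

CosmosClient client = new CosmosClient("<endpoint>", "<auth-key>", options);
```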
As with other operations in the .NET SDK, using the stream APIs results in better performance and avoids unnecessary serialization.
Using stream APIs is only possible when the nature of the data you use matches that of a stream of bytes (for example, file streams). In such cases, using the CreateItemStreamAsync, ReplaceItemStreamAsync, or DeleteItemStreamAsync methods and working with ResponseMessage (instead of ItemResponse) increases the throughput that can be achieved.
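A single stream-based create might look like the following sketch; in a bulk scenario you would collect one such task per payload, as in the earlier examples. The container parameter is assumed to be an existing Container from a client with bulk enabled, and the JSON payload is built from a string purely for illustration:

```csharp
using System;
using System.IO;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;

public static class StreamImportExample
{
    public static async Task CreateFromStreamAsync(Container container)
    {
        // Illustrative payload; real code would typically already have the bytes.
        using MemoryStream stream = new MemoryStream(
            Encoding.UTF8.GetBytes("{\"id\":\"1\",\"pk\":\"partition1\"}"));

        // The stream APIs return ResponseMessage and do not throw on
        // service errors, so check the status code explicitly.
        using ResponseMessage response = await container.CreateItemStreamAsync(
            stream, new PartitionKey("partition1"));

        if (!response.IsSuccessStatusCode)
        {
            Console.WriteLine($"Create failed: {response.StatusCode}");
        }
    }
}
```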