Perform bulk operations on Azure Cosmos DB data
APPLIES TO:
SQL API
This tutorial shows how to perform bulk operations with the Azure Cosmos DB Java V4 SDK. This version of the SDK comes with the bulk executor library built in. If you are using an older version of the Java SDK, we recommend that you migrate to the latest version. The Azure Cosmos DB Java V4 SDK is the currently recommended solution for Java bulk support.
Currently, the bulk executor library is supported only by Azure Cosmos DB SQL API and Gremlin API accounts. To learn about using bulk executor .NET library with Gremlin API, see perform bulk operations in Azure Cosmos DB Gremlin API.
Prerequisites
If you don't have an Azure subscription, create a free account before you begin.
You can try Azure Cosmos DB for free, without an Azure subscription and with no commitment. Or, you can use the Azure Cosmos DB Emulator with the https://localhost:8081 endpoint. The Primary Key is provided in Authenticating requests.
Java Development Kit (JDK) 1.8+
- On Ubuntu, run apt-get install default-jdk to install the JDK.
- Be sure to set the JAVA_HOME environment variable to point to the folder where the JDK is installed.
Download and install a Maven binary archive
- On Ubuntu, you can run apt-get install maven to install Maven.
Create an Azure Cosmos DB SQL API account by using the steps described in the create database account section of the Java quickstart article.
Clone the sample application
Now let's switch to working with code by downloading a generic samples repository for Java V4 SDK for Azure Cosmos DB from GitHub. These sample applications perform CRUD operations and other common operations on Azure Cosmos DB. To clone the repository, open a command prompt, navigate to the directory where you want to copy the application and run the following command:
git clone https://github.com/Azure-Samples/azure-cosmos-java-sql-api-samples
The cloned repository contains a sample, SampleBulkQuickStartAsync.java, in the /azure-cosmos-java-sql-api-samples/tree/main/src/main/java/com/azure/cosmos/examples/bulk/async folder. The application generates documents and executes operations to bulk create, upsert, replace, and delete items in Azure Cosmos DB. The next sections review the code in the sample app.
Bulk execution in Azure Cosmos DB
- The Azure Cosmos DB connection settings are read as arguments and assigned to variables defined in the /examples/common/AccountSettings.java file. These environment variables must be set:
  ACCOUNT_HOST=your account hostname;ACCOUNT_KEY=your account primary key
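As an illustration of failing fast when one of these variables is missing, here is a minimal sketch that uses only the JDK. The `EnvSettings` class and its `require` method are hypothetical names introduced for this example; they are not part of the sample's AccountSettings.java, which may resolve its values differently.

```java
import java.util.Map;

/** Hypothetical helper for illustration; not the sample's AccountSettings class. */
final class EnvSettings {
    private EnvSettings() {
    }

    /**
     * Returns the trimmed value of a required variable.
     * Throws IllegalStateException if the variable is missing or blank.
     */
    static String require(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Environment variable " + name + " must be set");
        }
        return value.trim();
    }
}
```

In an application, `EnvSettings.require(System.getenv(), "ACCOUNT_HOST")` and `EnvSettings.require(System.getenv(), "ACCOUNT_KEY")` would be called once at startup, before the client is built. Passing the map as a parameter keeps the check easy to test.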
To run the bulk sample, specify its Main Class:
com.azure.cosmos.examples.bulk.async.SampleBulkQuickStartAsync
The CosmosAsyncClient object is initialized by using the following statements:
    client = new CosmosClientBuilder()
        .endpoint(AccountSettings.HOST)
        .key(AccountSettings.MASTER_KEY)
        .preferredRegions(preferredRegions)
        .contentResponseOnWriteEnabled(true)
        .consistencyLevel(ConsistencyLevel.SESSION)
        .buildAsyncClient();
The sample creates an async database and container. It then creates multiple documents on which bulk operations will be executed. It adds these documents to a Flux<Family> reactive stream object:
    createDatabaseIfNotExists();
    createContainerIfNotExists();

    Family andersenFamilyItem = Families.getAndersenFamilyItem();
    Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
    Family johnsonFamilyItem = Families.getJohnsonFamilyItem();
    Family smithFamilyItem = Families.getSmithFamilyItem();

    // Setup family items to create
    Flux<Family> families = Flux.just(andersenFamilyItem, wakefieldFamilyItem, johnsonFamilyItem, smithFamilyItem);
The sample contains methods for bulk create, upsert, replace, and delete. Each method maps the family documents in the Flux<Family> stream to method calls in CosmosBulkOperations. The resulting operations form another reactive stream object, Flux<CosmosItemOperation>. That stream is then passed to the executeBulkOperations method of the async container created at the beginning, and the operations are executed in bulk. See the bulkCreateItems method below as an example:
    private void bulkCreateItems(Flux<Family> families) {
        // Map each Family document to a create operation keyed by its partition key value
        Flux<CosmosItemOperation> cosmosItemOperations = families.map(
            family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
        // Execute all operations in bulk and block until the last response arrives
        container.executeBulkOperations(cosmosItemOperations).blockLast();
    }
There is also a class, BulkWriter.java, in the same directory as the sample application. This class demonstrates how to handle rate limiting (429) and timeout (408) errors that may occur during bulk execution, and how to retry those operations effectively. It is used in the bulkCreateItemsSimple() method in the application:
    private void bulkCreateItemsSimple() {
        Family andersenFamilyItem = Families.getAndersenFamilyItem();
        Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
        CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getCreateItemOperation(
            andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
        CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getCreateItemOperation(
            wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
        // Schedule both operations on the BulkWriter, then execute them
        BulkWriter bulkWriter = new BulkWriter(container);
        bulkWriter.scheduleWrites(andersonItemOperation);
        bulkWriter.scheduleWrites(wakeFieldItemOperation);
        bulkWriter.execute().blockLast();
    }
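To make the retry idea concrete, the following is a minimal sketch of how an application might decide which responses to retry and how long to wait between attempts. It is not the implementation in BulkWriter.java: the `RetryPolicySketch` class, its method names, and the choice of capped exponential backoff are all assumptions made for illustration. Only the two status codes (429 and 408) come from the text above.

```java
/** Hypothetical retry helper for illustration; not the sample's BulkWriter logic. */
final class RetryPolicySketch {
    static final int REQUEST_TIMEOUT = 408;
    static final int TOO_MANY_REQUESTS = 429;

    private RetryPolicySketch() {
    }

    /** Returns true only for the two transient status codes named in this article. */
    static boolean isRetryable(int statusCode) {
        return statusCode == REQUEST_TIMEOUT || statusCode == TOO_MANY_REQUESTS;
    }

    /**
     * Capped exponential backoff: the delay starts at baseMillis for attempt 0
     * and doubles with each further attempt, never exceeding maxMillis.
     */
    static long backoffMillis(int attempt, long baseMillis, long maxMillis) {
        if (attempt < 0 || baseMillis <= 0 || maxMillis < baseMillis) {
            throw new IllegalArgumentException("Invalid backoff parameters");
        }
        long delay = baseMillis;
        for (int i = 0; i < attempt && delay < maxMillis; i++) {
            // Compare against maxMillis / 2 first so that doubling cannot overflow
            delay = (delay > maxMillis / 2) ? maxMillis : delay * 2;
        }
        return delay;
    }
}
```

A caller would check `isRetryable` on the status code of each failed operation, wait for `backoffMillis(attempt, ...)`, and then reschedule only those operations. Other failures, such as conflicts, are better surfaced to the caller than retried.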
Additionally, there are bulk create methods in the sample which illustrate how to add response processing, and set execution options:
    private void bulkCreateItemsWithResponseProcessing(Flux<Family> families) {
        Flux<CosmosItemOperation> cosmosItemOperations = families.map(
            family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
        container.executeBulkOperations(cosmosItemOperations).flatMap(cosmosBulkOperationResponse -> {
            CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse();
            CosmosItemOperation cosmosItemOperation = cosmosBulkOperationResponse.getOperation();

            if (cosmosBulkOperationResponse.getException() != null) {
                logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException());
            } else if (cosmosBulkItemResponse == null || !cosmosBulkItemResponse.isSuccessStatusCode()) {
                // The response can be null here, so guard before reading the status code
                Object statusCode = cosmosBulkItemResponse != null ? cosmosBulkItemResponse.getStatusCode() : "no";
                logger.error("The operation for Item ID: [{}] Item PartitionKey Value: [{}] did not complete "
                        + "successfully with a {} response code.",
                    cosmosItemOperation.<Family>getItem().getId(),
                    cosmosItemOperation.<Family>getItem().getLastName(),
                    statusCode);
            } else {
                logger.info("Item ID: [{}] Item PartitionKey Value: [{}]",
                    cosmosItemOperation.<Family>getItem().getId(),
                    cosmosItemOperation.<Family>getItem().getLastName());
                logger.info("Status Code: {}", String.valueOf(cosmosBulkItemResponse.getStatusCode()));
                logger.info("Request Charge: {}", String.valueOf(cosmosBulkItemResponse.getRequestCharge()));
            }
            // Mono.just rejects null, so emit an empty Mono when there is no response
            return Mono.justOrEmpty(cosmosBulkItemResponse);
        }).blockLast();
    }

    private void bulkCreateItemsWithExecutionOptions(Flux<Family> families) {
        CosmosBulkExecutionOptions bulkExecutionOptions = new CosmosBulkExecutionOptions();
        // Limit each micro-batch to 10 operations
        ImplementationBridgeHelpers
            .CosmosBulkExecutionOptionsHelper
            .getCosmosBulkExecutionOptionsAccessor()
            .setMaxMicroBatchSize(bulkExecutionOptions, 10);
        Flux<CosmosItemOperation> cosmosItemOperations = families.map(
            family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
        container.executeBulkOperations(cosmosItemOperations, bulkExecutionOptions).blockLast();
    }
Performance tips
Consider the following points for better performance when using the bulk executor library:
For best performance, run your application from an Azure VM in the same region as your Cosmos DB account write region.
For achieving higher throughput:
- Set the JVM's heap size to a number large enough to avoid memory issues when handling a large number of documents. Suggested heap size: max(3 GB, 3 * sizeof(all documents passed to bulk import API in one batch)).
- There is a preprocessing time, so throughput is higher when each bulk operation covers a large number of documents. For example, to import 10,000,000 documents, running bulk import 10 times on batches of 1,000,000 documents each is preferable to running bulk import 100 times on batches of 100,000 documents each.
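The two rules of thumb above can be worked through with a little arithmetic. The sketch below is illustrative only: the `BulkSizing` class and its method names are hypothetical, and it assumes that "3 GB" means 3 GiB (3 * 1024^3 bytes), which the article does not specify.

```java
/** Hypothetical sizing helper for illustration; names are not from the SDK. */
final class BulkSizing {
    /** Assumption: 1 GB is treated as 1 GiB (1024^3 bytes). */
    static final long GIB = 1024L * 1024 * 1024;

    private BulkSizing() {
    }

    /** Suggested heap size in bytes: max(3 GB, 3 * size of one batch). */
    static long suggestedHeapBytes(long batchBytes) {
        if (batchBytes < 0) {
            throw new IllegalArgumentException("batchBytes must not be negative");
        }
        // multiplyExact throws ArithmeticException instead of silently overflowing
        return Math.max(3 * GIB, Math.multiplyExact(3L, batchBytes));
    }

    /** Number of bulk import runs needed to cover all documents, rounding up. */
    static long batchCount(long totalDocuments, long documentsPerBatch) {
        if (totalDocuments < 0 || documentsPerBatch <= 0) {
            throw new IllegalArgumentException("Invalid document counts");
        }
        long fullBatches = totalDocuments / documentsPerBatch;
        return (totalDocuments % documentsPerBatch == 0) ? fullBatches : fullBatches + 1;
    }
}
```

For the example in the list, `batchCount(10_000_000, 1_000_000)` gives 10 runs and `batchCount(10_000_000, 100_000)` gives 100 runs. A batch of 2 GiB gives a suggested heap of 6 GiB, which would correspond to a JVM setting such as -Xmx6g.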
Instantiate a single CosmosAsyncClient object for the entire application within a single virtual machine that corresponds to a specific Azure Cosmos container.
A single bulk operation API execution consumes a large chunk of the client machine's CPU and network IO, because it spawns multiple tasks internally. For this reason, avoid spawning multiple concurrent tasks within your application process that each execute bulk operation API calls. If a single bulk operation API call running on a single virtual machine is unable to consume your entire container's throughput (for example, if your container's throughput is greater than 1 million RU/s), it's preferable to create separate virtual machines to concurrently execute bulk operation API calls.
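One way to keep bulk runs from overlapping inside a single process is to route them through a one-permit guard. The sketch below uses only the JDK; the `SingleBulkRunGuard` class is a hypothetical name, and the approach (a blocking semaphore) is an assumption made for illustration rather than something the SDK or the sample provides.

```java
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

/** Hypothetical guard that allows only one bulk run at a time in this process. */
final class SingleBulkRunGuard {
    private final Semaphore permit = new Semaphore(1);

    /** Waits until no other bulk run is active, runs the work, then frees the permit. */
    <T> T runExclusively(Supplier<T> bulkRun) {
        permit.acquireUninterruptibly();
        try {
            return bulkRun.get();
        } finally {
            // Release even if the bulk run throws, so later runs are not blocked forever
            permit.release();
        }
    }

    /** Returns true when no bulk run currently holds the permit. */
    boolean isIdle() {
        return permit.availablePermits() == 1;
    }
}
```

In the sample, the supplier passed to `runExclusively` would wrap a blocking call such as the body of bulkCreateItems. Because the guard blocks the calling thread, it suits code that already blocks on the result, as the sample does with blockLast().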
Next steps
- To learn about Maven package details and release notes of the bulk executor Java library, see bulk executor SDK details.