Perform bulk operations on Azure Cosmos DB data

APPLIES TO: SQL API

This tutorial provides instructions on performing bulk operations with the Azure Cosmos DB Java V4 SDK. This version of the SDK comes with the bulk executor library built in. If you are using an older version of the Java SDK, we recommend migrating to the latest version. The Azure Cosmos DB Java V4 SDK is the current recommended solution for Java bulk support.

Currently, the bulk executor library is supported only by Azure Cosmos DB SQL API and Gremlin API accounts. To learn about using the bulk executor .NET library with the Gremlin API, see Perform bulk operations in Azure Cosmos DB Gremlin API.

Prerequisites

Clone the sample application

Now let's switch to working with code by downloading a generic samples repository for the Java V4 SDK for Azure Cosmos DB from GitHub. These sample applications perform CRUD operations and other common operations on Azure Cosmos DB. To clone the repository, open a command prompt, navigate to the directory where you want to copy the application, and run the following command:

 git clone https://github.com/Azure-Samples/azure-cosmos-java-sql-api-samples 

The cloned repository contains a sample, SampleBulkQuickStartAsync.java, in the /azure-cosmos-java-sql-api-samples/tree/main/src/main/java/com/azure/cosmos/examples/bulk/async folder. The application generates documents and executes operations to bulk create, upsert, replace, and delete items in Azure Cosmos DB. In the next sections, we will review the code in the sample app.

Bulk execution in Azure Cosmos DB

  1. The Azure Cosmos DB connection credentials are read as arguments and assigned to variables defined in the /examples/common/AccountSettings.java file. These environment variables must be set:
ACCOUNT_HOST=your account hostname;ACCOUNT_KEY=your account primary key

To run the bulk sample, specify its Main Class:

com.azure.cosmos.examples.bulk.async.SampleBulkQuickStartAsync
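The required-variable check that an AccountSettings-style class performs can be modeled as a small, self-contained sketch. The class name, map, and values below are hypothetical stand-ins (the real sample reads the variables via System.getenv()); only the "fail fast when a credential is missing" idea is illustrated:

```java
import java.util.Map;

// Hedged sketch of AccountSettings-style credential lookup: require that
// ACCOUNT_HOST and ACCOUNT_KEY are present, and fail fast otherwise.
// Parameterized with a Map so it can be exercised without real credentials.
public class AccountSettingsSketch {

    static String require(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.isEmpty()) {
            throw new IllegalStateException("Environment variable " + name + " must be set");
        }
        return value;
    }

    public static void main(String[] args) {
        // Hypothetical values; the real sample supplies these via System.getenv().
        Map<String, String> env = Map.of(
                "ACCOUNT_HOST", "https://example.documents.azure.com:443/",
                "ACCOUNT_KEY", "<primary-key>");
        System.out.println(require(env, "ACCOUNT_HOST"));
    }
}
```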
  2. The CosmosAsyncClient object is initialized by using the following statements:

         client = new CosmosClientBuilder().endpoint(AccountSettings.HOST).key(AccountSettings.MASTER_KEY)
                 .preferredRegions(preferredRegions).contentResponseOnWriteEnabled(true)
                 .consistencyLevel(ConsistencyLevel.SESSION).buildAsyncClient();
    
  3. The sample creates an async database and container. It then creates multiple documents on which bulk operations will be executed, and adds these documents to a Flux<Family> reactive stream object:

        createDatabaseIfNotExists();
        createContainerIfNotExists();
    
        Family andersenFamilyItem = Families.getAndersenFamilyItem();
        Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
        Family johnsonFamilyItem = Families.getJohnsonFamilyItem();
        Family smithFamilyItem = Families.getSmithFamilyItem();
    
        //  Setup family items to create
        Flux<Family> families = Flux.just(andersenFamilyItem, wakefieldFamilyItem, johnsonFamilyItem, smithFamilyItem);
    
  4. The sample contains methods for bulk create, upsert, replace, and delete. In each method, we map the Family documents in the Flux<Family> stream to multiple method calls in CosmosBulkOperations. These operations are added to another reactive stream object, Flux<CosmosItemOperation>. The stream is then passed to the executeBulkOperations method of the async container we created at the beginning, and the operations are executed in bulk. See the bulkCreateItems method below as an example:

     private void bulkCreateItems(Flux<Family> families) {
         Flux<CosmosItemOperation> cosmosItemOperations =
                 families.map(family -> CosmosBulkOperations.getCreateItemOperation(family,
                         new PartitionKey(family.getLastName())));
         container.executeBulkOperations(cosmosItemOperations).blockLast();
     }
    
  5. There is also a class, BulkWriter.java, in the same directory as the sample application. This class demonstrates how to handle rate-limiting (429) and request-timeout (408) errors that may occur during bulk execution, and how to retry those operations effectively. It is used by the bulkCreateItemsSimple() method in the application:

        private void bulkCreateItemsSimple() {
            Family andersenFamilyItem = Families.getAndersenFamilyItem();
            Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
            CosmosItemOperation andersenItemOperation = CosmosBulkOperations.getCreateItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
            CosmosItemOperation wakefieldItemOperation = CosmosBulkOperations.getCreateItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
            BulkWriter bulkWriter = new BulkWriter(container);
            bulkWriter.scheduleWrites(andersenItemOperation);
            bulkWriter.scheduleWrites(wakefieldItemOperation);
            bulkWriter.execute().blockLast();
        }
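The full reactive implementation of BulkWriter lives in the repository. As a rough, SDK-free sketch of the underlying idea (re-run an operation whose status code is 429 or 408, up to a capped number of attempts), consider the following model; the class name, MAX_ATTEMPTS constant, and status-code suppliers are illustrative inventions, not the SDK's API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntSupplier;

// Simplified, SDK-free model of the retry idea behind BulkWriter:
// re-schedule an operation when it returns 429 (rate limited) or
// 408 (request timeout), up to MAX_ATTEMPTS tries.
public class RetryModel {

    static final int MAX_ATTEMPTS = 3;

    // Executes each "operation" (modeled as a status-code supplier) and
    // retries transient failures. Returns the final status code per operation.
    static List<Integer> execute(List<IntSupplier> operations) {
        List<Integer> finalStatuses = new ArrayList<>();
        for (IntSupplier op : operations) {
            int status = op.getAsInt();
            int attempts = 1;
            while ((status == 429 || status == 408) && attempts < MAX_ATTEMPTS) {
                // A real implementation would honor the retry-after hint
                // from the response before re-submitting the operation.
                status = op.getAsInt();
                attempts++;
            }
            finalStatuses.add(status);
        }
        return finalStatuses;
    }

    public static void main(String[] args) {
        // One operation that succeeds immediately, and one that is
        // throttled once (429) before succeeding on retry.
        int[] throttledThenOk = {429, 201};
        final int[] idx = {0};
        List<IntSupplier> ops = List.of(
                () -> 201,
                () -> throttledThenOk[idx[0]++]);
        System.out.println(execute(ops)); // prints [201, 201]
    }
}
```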
    
  6. Additionally, there are bulk create methods in the sample that illustrate how to add response processing and set execution options:

    private void bulkCreateItemsWithResponseProcessing(Flux<Family> families) {
        Flux<CosmosItemOperation> cosmosItemOperations =
                families.map(family -> CosmosBulkOperations.getCreateItemOperation(family,
                        new PartitionKey(family.getLastName())));
        container.executeBulkOperations(cosmosItemOperations).flatMap(cosmosBulkOperationResponse -> {
            CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse();
            CosmosItemOperation cosmosItemOperation = cosmosBulkOperationResponse.getOperation();
    
            if (cosmosBulkOperationResponse.getException() != null) {
                logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException());
            } else if (cosmosBulkOperationResponse.getResponse() == null || !cosmosBulkOperationResponse.getResponse().isSuccessStatusCode()) {
                logger.error("The operation for Item ID: [{}]  Item PartitionKey Value: [{}] did not complete successfully with " +
                                "a" + " {} response code.", cosmosItemOperation.<Family>getItem().getId(),
                        cosmosItemOperation.<Family>getItem().getLastName(), cosmosBulkItemResponse.getStatusCode());
            } else {
                logger.info("Item ID: [{}]  Item PartitionKey Value: [{}]", cosmosItemOperation.<Family>getItem().getId(),
                        cosmosItemOperation.<Family>getItem().getLastName());
                logger.info("Status Code: {}", String.valueOf(cosmosBulkItemResponse.getStatusCode()));
                logger.info("Request Charge: {}", String.valueOf(cosmosBulkItemResponse.getRequestCharge()));
            }
            return Mono.just(cosmosBulkItemResponse);
        }).blockLast();
    }
    
    private void bulkCreateItemsWithExecutionOptions(Flux<Family> families) {
        CosmosBulkExecutionOptions bulkExecutionOptions = new CosmosBulkExecutionOptions();
        ImplementationBridgeHelpers
                .CosmosBulkExecutionOptionsHelper
                .getCosmosBulkExecutionOptionsAccessor()
                .setMaxMicroBatchSize(bulkExecutionOptions, 10);
        Flux<CosmosItemOperation> cosmosItemOperations =
                families.map(family -> CosmosBulkOperations.getCreateItemOperation(family,
                        new PartitionKey(family.getLastName())));
        container.executeBulkOperations(cosmosItemOperations, bulkExecutionOptions).blockLast();
    }
    
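The setMaxMicroBatchSize(bulkExecutionOptions, 10) call caps how many operations the SDK groups into a single request batch. A hedged, SDK-free sketch of that partitioning behavior (class and method names are invented for illustration):

```java
import java.util.ArrayList;
import java.util.List;

// SDK-free illustration of what a micro-batch size cap means: a stream of
// item operations is split into request batches of at most maxMicroBatchSize
// operations each (the sample above caps this at 10).
public class MicroBatching {

    static <T> List<List<T>> partition(List<T> operations, int maxMicroBatchSize) {
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < operations.size(); i += maxMicroBatchSize) {
            batches.add(operations.subList(i, Math.min(i + maxMicroBatchSize, operations.size())));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> ops = new ArrayList<>();
        for (int i = 0; i < 25; i++) ops.add(i);
        List<List<Integer>> batches = partition(ops, 10);
        System.out.println(batches.size());        // prints 3 (batches of 10, 10, 5)
        System.out.println(batches.get(2).size()); // prints 5
    }
}
```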

Performance tips

Consider the following points for better performance when using the bulk executor library:

• For best performance, run your application from an Azure VM in the same region as your Azure Cosmos DB account's write region.

• For achieving higher throughput:

  • Set the JVM's heap size to a number large enough to avoid any memory issues when handling a large number of documents. Suggested heap size: max(3 GB, 3 * sizeof(all documents passed to the bulk import API in one batch)).
  • There is a preprocessing overhead, so you will get higher throughput when performing bulk operations with a large number of documents. For example, if you want to import 10,000,000 documents, running bulk import 10 times with batches of 1,000,000 documents each is preferable to running bulk import 100 times with batches of 100,000 documents each.
• It's recommended to instantiate a single CosmosAsyncClient object for the entire application, within a single virtual machine, corresponding to a specific Azure Cosmos DB container.

• A single bulk operation API execution consumes a large chunk of the client machine's CPU and network IO by spawning multiple tasks internally, so avoid spawning multiple concurrent tasks within your application process that each execute bulk operation API calls. If a single bulk operation API call running on a single virtual machine is unable to consume your entire container's throughput (if your container's throughput > 1 million RU/s), it's preferable to create separate virtual machines to concurrently execute bulk operation API calls.
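The heap-size heuristic above can be made concrete with a little arithmetic. The following sketch computes max(3 GB, 3 × batch size in bytes); the class name and the document sizes in main are hypothetical:

```java
// Illustrative arithmetic for the suggested heap size:
// max(3 GB, 3 * size of all documents passed to one bulk batch).
public class HeapSizing {

    static long suggestedHeapBytes(long batchBytes) {
        long threeGb = 3L * 1024 * 1024 * 1024;
        return Math.max(threeGb, 3L * batchBytes);
    }

    public static void main(String[] args) {
        // Hypothetical batch: 1,000,000 documents of ~2 KB each (~2 GB total),
        // so 3x the batch size exceeds the 3 GB floor.
        long batchBytes = 1_000_000L * 2 * 1024;
        System.out.println(suggestedHeapBytes(batchBytes)); // prints 6144000000 (~6 GB)
    }
}
```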

Next steps