Migrate from the bulk executor library to the bulk support in Azure Cosmos DB Java V4 SDK

APPLIES TO: NoSQL

This article describes the steps required to migrate an existing application's code from the Java bulk executor library to the bulk support feature in the latest version of the Java SDK.

Enable bulk support

To use bulk support in the Java SDK, include the import below:

import com.azure.cosmos.models.*;
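The bulk samples later in this article run against a CosmosAsyncContainer instance named container. The following is a minimal sketch of creating that instance; the endpoint, key, database, and container names are placeholders rather than values from the original sample:

import com.azure.cosmos.CosmosAsyncClient;
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.CosmosClientBuilder;

// Placeholder account values - replace with your own endpoint and key.
CosmosAsyncClient client = new CosmosClientBuilder()
    .endpoint("<account-endpoint>")
    .key("<account-key>")
    .buildAsyncClient();

// The container that the bulk methods below operate on.
CosmosAsyncContainer container = client
    .getDatabase("<database-name>")
    .getContainer("<container-name>");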

Add documents to a reactive stream

Bulk support in the Java V4 SDK works by adding documents to a reactive stream object. For example, you can add each document individually:

Family andersenFamilyItem = Families.getAndersenFamilyItem();
Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
Family johnsonFamilyItem = Families.getJohnsonFamilyItem();
Family smithFamilyItem = Families.getSmithFamilyItem();

//  Setup family items to create
Flux<Family> families = Flux.just(andersenFamilyItem, wakefieldFamilyItem, johnsonFamilyItem, smithFamilyItem);

Or you can add the documents to the stream from a list, using fromIterable:

class SampleDoc {
    private String id = "";

    public SampleDoc() {
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }
}

// Build a list of five sample documents with ids "id-1" through "id-5"
List<SampleDoc> docList = new ArrayList<>();
for (int i = 1; i <= 5; i++) {
    SampleDoc doc = new SampleDoc();
    doc.setId("id-" + i);
    docList.add(doc);
}

Flux<SampleDoc> docs = Flux.fromIterable(docList);
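To show how a stream like docs is consumed, the following is a minimal sketch of bulk-creating those documents. It assumes the target container is partitioned on /id (an illustrative assumption, not part of the original sample); the pattern of mapping documents to operations is explained in more detail below:

// Assumes the container's partition key path is /id (illustrative assumption).
Flux<CosmosItemOperation> sampleDocOperations = docs.map(
    doc -> CosmosBulkOperations.getCreateItemOperation(doc, new PartitionKey(doc.getId())));
container.executeBulkOperations(sampleDocOperations).blockLast();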

If you want to bulk create or upsert items (similar to using DocumentBulkExecutor.importAll), pass the reactive stream to a method like the following:

private void bulkUpsertItems(Flux<Family> families) {
    Flux<CosmosItemOperation> cosmosItemOperations = families.map(
        family -> CosmosBulkOperations.getUpsertItemOperation(family, new PartitionKey(family.getLastName())));
    container.executeBulkOperations(cosmosItemOperations).blockLast();
}

You can also use a method like the following, which only creates items:

private void bulkCreateItems(Flux<Family> families) {
    Flux<CosmosItemOperation> cosmosItemOperations = families.map(
        family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
    container.executeBulkOperations(cosmosItemOperations).blockLast();
}

In the old bulk executor library, the DocumentBulkExecutor.importAll method was also used to bulk patch items, and DocumentBulkExecutor.mergeAll was also used for patching, but only with the set patch operation type. To do bulk patch operations in the V4 SDK, first create the patch operations:

CosmosPatchOperations patchOps = CosmosPatchOperations.create().add("/country", "United States")
        .set("/registered", 0);

Then pass the operations, along with the reactive stream of documents, to a method like the following. In this example, we apply both the add and set patch operation types. For the full set of supported patch operation types, see the overview of partial document update in Azure Cosmos DB.

private void bulkPatchItems(Flux<Family> families, CosmosPatchOperations operations) {
    Flux<CosmosItemOperation> cosmosItemOperations = families.map(
        family -> CosmosBulkOperations
            .getPatchItemOperation(family.getId(), new PartitionKey(family.getLastName()), operations));
    container.executeBulkOperations(cosmosItemOperations).blockLast();
}

Note

In the above example, we apply add and set to patch elements whose root parent exists. However, you can't patch elements whose root parent doesn't exist, because Azure Cosmos DB partial document update is inspired by JSON Patch RFC 6902. If you need to patch where the root parent doesn't exist, first read back the full documents, then use a method like the following to replace them (a sketch of reading the documents back follows the replace method):

private void bulkReplaceItems(Flux<Family> families) {
    Flux<CosmosItemOperation> cosmosItemOperations = families.map(
        family -> CosmosBulkOperations
            .getReplaceItemOperation(family.getId(), family, new PartitionKey(family.getLastName())));
    container.executeBulkOperations(cosmosItemOperations).blockLast();
}
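One way to read the documents back is with point reads. The following is a minimal sketch, not part of the original sample, that reads each Family document so the results can be fed into the replace method above:

private Flux<Family> readFamiliesBack(Flux<Family> families) {
    return families.flatMap(family ->
        container.readItem(family.getId(), new PartitionKey(family.getLastName()), Family.class)
            .map(itemResponse -> itemResponse.getItem()));
}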

To bulk delete items (similar to using DocumentBulkExecutor.deleteAll), pass the reactive stream to a method like the following:

private void bulkDeleteItems(Flux<Family> families) {
    Flux<CosmosItemOperation> cosmosItemOperations = families.map(
        family -> CosmosBulkOperations
            .getDeleteItemOperation(family.getId(), new PartitionKey(family.getLastName())));
    container.executeBulkOperations(cosmosItemOperations).blockLast();
}

Retries, timeouts, and throughput control

Bulk support in the Java V4 SDK doesn't handle retries and timeouts natively. You can refer to the guidance in Bulk Executor - Java Library, which includes a sample that implements an abstraction for handling retries and timeouts properly. The sample also includes examples of local and global throughput control. You can also refer to the section Should my application retry on errors for more guidance on the different kinds of errors that can occur and on best practices for handling retries.
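If you implement retries yourself, the per-operation responses returned by executeBulkOperations indicate which operations failed. The following is a minimal sketch, not the sample's abstraction, that inspects each response; the logging and the retry decision are placeholders:

private void bulkCreateItemsWithResponseChecks(Flux<Family> families) {
    Flux<CosmosItemOperation> cosmosItemOperations = families.map(
        family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));

    container.executeBulkOperations(cosmosItemOperations)
        .doOnNext(operationResponse -> {
            if (operationResponse.getException() != null) {
                // The operation failed with an exception - log it and decide whether to retry.
                System.err.println("Operation failed: " + operationResponse.getException().getMessage());
            } else if (operationResponse.getResponse() != null
                    && !operationResponse.getResponse().isSuccessStatusCode()) {
                // The service returned a non-success status code, for example 429 when throttled.
                System.err.println("Operation returned status code: "
                    + operationResponse.getResponse().getStatusCode());
            }
        })
        .blockLast();
}

For throughput control, the V4 SDK exposes throughput control groups that you can enable on the container. The following is a minimal sketch of a local throughput control group; the group name and target throughput are illustrative values:

ThroughputControlGroupConfig groupConfig = new ThroughputControlGroupConfigBuilder()
    .groupName("bulk-ingest")   // illustrative group name
    .targetThroughput(400)      // illustrative RU/s cap for this group
    .build();
container.enableLocalThroughputControlGroup(groupConfig);

// Reference the group from the bulk execution options so the bulk operations honor it,
// then pass the options to container.executeBulkOperations(cosmosItemOperations, bulkOptions).
CosmosBulkExecutionOptions bulkOptions = new CosmosBulkExecutionOptions();
bulkOptions.setThroughputControlGroupName("bulk-ingest");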

Next steps