Realice operaciones en masa con datos de Azure Cosmos DB

SE APLICA A: NoSQL

En este tutorial se proporcionan instrucciones sobre cómo realizar operaciones masivas en la Versión 4 del SDK de Java para Azure Cosmos DB. Esta versión del SDK ya incluye la biblioteca Bulk Executor. Si utiliza una versión anterior del SDK de Java, se recomienda migrar a la última versión. El SDK de Java V4 de Azure Cosmos DB es la solución recomendada actualmente para la compatibilidad masiva de Java.

Actualmente, la biblioteca Bulk Executor solo es compatible con las cuentas de Azure Cosmos DB for NoSQL y API para Gremlin. Para obtener información sobre cómo usar la biblioteca Bulk Executor de .NET con la API para Gremlin, consulte realizar operaciones en masa en Azure Cosmos DB para Gremlin.

Prerrequisitos

Clonación de la aplicación de ejemplo

Ahora trabajaremos con código mediante la descarga desde GitHub de un repositorio de ejemplos genérico para la versión 4 del SDK de Java para Azure Cosmos DB. Estas aplicaciones de ejemplo pueden realizar operaciones CRUD y otras operaciones comunes en Azure Cosmos DB. Para clonar el repositorio, abra un símbolo del sistema, vaya al directorio donde quiere copiar la aplicación y ejecute el siguiente comando:

 git clone https://github.com/Azure-Samples/azure-cosmos-java-sql-api-samples 

El repositorio clonado contiene una clase de ejemplo SampleBulkQuickStartAsync.java en la carpeta /azure-cosmos-java-sql-api-samples/tree/main/src/main/java/com/azure/cosmos/examples/bulk/async. Esa aplicación genera documentos y ejecuta operaciones para crear, actualizar o insertar (upsert), reemplazar y eliminar elementos de forma masiva en Azure Cosmos DB. En las siguientes secciones, se revisará el código de la aplicación de ejemplo.

Ejecución de operaciones masivas en Azure Cosmos DB

  1. Las cadenas de conexión de Azure Cosmos DB se leen como argumentos y se asignan a variables definidas en el archivo /examples/common/AccountSettings.java. Estas variables de entorno se deben establecer
ACCOUNT_HOST=your account hostname;ACCOUNT_KEY=your account primary key

Para ejecutar el ejemplo de operaciones masivas, especifique su clase principal:

com.azure.cosmos.examples.bulk.async.SampleBulkQuickStartAsync
  1. El objeto CosmosAsyncClient se inicializa con las siguientes instrucciones:

    client = new CosmosClientBuilder()
        .endpoint(AccountSettings.HOST)
        .key(AccountSettings.MASTER_KEY)
        .preferredRegions(preferredRegions)
        .contentResponseOnWriteEnabled(true)
        .consistencyLevel(ConsistencyLevel.SESSION).buildAsyncClient();
    
    
  2. El ejemplo creará una base de datos y un contenedor asincrónicos. A continuación, creará varios documentos en los que se ejecutarán las operaciones masivas. Estos documentos se agregarán a un objeto de secuencia reactiva Flux<Family>:

    Family andersenFamilyItem = Families.getAndersenFamilyItem();
    Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
    Family johnsonFamilyItem = Families.getJohnsonFamilyItem();
    Family smithFamilyItem = Families.getSmithFamilyItem();
    
    //  Setup family items to create
    Flux<Family> families = Flux.just(andersenFamilyItem, wakefieldFamilyItem, johnsonFamilyItem, smithFamilyItem);
    
  3. El ejemplo contiene métodos para crear, actualizar o insertar (upsert), reemplazar y eliminar de forma masiva. En cada método, asignaremos los documentos de familias de la secuencia BulkWriter Flux<Family> con varias llamadas de método en CosmosBulkOperations. Estas operaciones se agregarán a otro objeto Flux<CosmosItemOperation> de secuencia reactiva. A continuación, la secuencia se pasará al método executeBulkOperations del elemento asincrónico container que creamos al principio y las operaciones se ejecutarán en masa. Observe el siguiente método de creación masiva a modo de ejemplo:

    private void bulkCreateItems(Flux<Family> families) {
        Flux<CosmosItemOperation> cosmosItemOperations = families.map(
            family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
        container.executeBulkOperations(cosmosItemOperations).blockLast();
    }
    
  4. En el directorio donde se ubica la aplicación de ejemplo, también habrá una clase BulkWriter.java. Esta clase muestra cómo controlar los errores de limitación de velocidad (429) y tiempo de espera (408) que pueden producirse durante la ejecución masiva y cómo volver a intentar esas operaciones de forma eficaz. Se implementa en los métodos siguientes, que también muestran cómo implementar el control de rendimiento local y global.

    private void bulkUpsertItemsWithBulkWriterAbstraction() {
        Family andersenFamilyItem = Families.getAndersenFamilyItem();
        Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
        CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getUpsertItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
        CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getUpsertItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
        BulkWriter bulkWriter = new BulkWriter(container);
        bulkWriter.scheduleWrites(andersonItemOperation);
        bulkWriter.scheduleWrites(wakeFieldItemOperation);
        bulkWriter.execute().subscribe();
    }
    
    private void bulkUpsertItemsWithBulkWriterAbstractionAndLocalThroughPutControl() {
        ThroughputControlGroupConfig groupConfig =
                new ThroughputControlGroupConfigBuilder()
                        .setGroupName("group1")
                        .setTargetThroughput(200)
                        .build();
        container.enableLocalThroughputControlGroup(groupConfig);
        Family andersenFamilyItem = Families.getAndersenFamilyItem();
        Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
        CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getUpsertItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
        CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getUpsertItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
        BulkWriter bulkWriter = new BulkWriter(container);
        bulkWriter.scheduleWrites(andersonItemOperation);
        bulkWriter.scheduleWrites(wakeFieldItemOperation);
        bulkWriter.execute().subscribe();
    }
    
    private void bulkCreateItemsWithBulkWriterAbstractionAndGlobalThroughputControl() {
        String controlContainerId = "throughputControlContainer";
        CosmosAsyncContainer controlContainer = database.getContainer(controlContainerId);
        database.createContainerIfNotExists(controlContainer.getId(), "/groupId").block();
    
        ThroughputControlGroupConfig groupConfig =
                new ThroughputControlGroupConfigBuilder()
                        .setGroupName("group-" + UUID.randomUUID())
                        .setTargetThroughput(200)
                        .build();
    
        GlobalThroughputControlConfig globalControlConfig = this.client.createGlobalThroughputControlConfigBuilder(this.database.getId(), controlContainerId)
                .setControlItemRenewInterval(Duration.ofSeconds(5))
                .setControlItemExpireInterval(Duration.ofSeconds(20))
                .build();
    
        container.enableGlobalThroughputControlGroup(groupConfig, globalControlConfig);
        CosmosItemRequestOptions requestOptions = new CosmosItemRequestOptions();
        requestOptions.setThroughputControlGroupName(groupConfig.getGroupName());
        Family andersenFamilyItem = Families.getAndersenFamilyItem();
        Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
        CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getCreateItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
        CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getCreateItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
        BulkWriter bulkWriter = new BulkWriter(container);
        bulkWriter.scheduleWrites(andersonItemOperation);
        bulkWriter.scheduleWrites(wakeFieldItemOperation);
        bulkWriter.execute().subscribe();
    }
    
  5. Además, en el ejemplo hay métodos de creación masiva que muestran cómo agregar el procesamiento de respuestas y establecer las opciones de ejecución:

    private void bulkCreateItemsWithResponseProcessing(Flux<Family> families) {
        Flux<CosmosItemOperation> cosmosItemOperations = families.map(
            family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
        container.executeBulkOperations(cosmosItemOperations).flatMap(cosmosBulkOperationResponse -> {
    
            CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse();
            CosmosItemOperation cosmosItemOperation = cosmosBulkOperationResponse.getOperation();
    
            if (cosmosBulkOperationResponse.getException() != null) {
                logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException());
            } else if (cosmosBulkItemResponse == null ||
                !cosmosBulkOperationResponse.getResponse().isSuccessStatusCode()) {
    
                logger.error(
                    "The operation for Item ID: [{}]  Item PartitionKey Value: [{}] did not complete " +
                        "successfully with " + "a" + " {} response code.",
                    cosmosItemOperation.<Family>getItem().getId(),
                    cosmosItemOperation.<Family>getItem().getLastName(),
                    cosmosBulkItemResponse != null ? cosmosBulkItemResponse.getStatusCode() : "n/a");
            } else {
                logger.info(
                    "Item ID: [{}]  Item PartitionKey Value: [{}]",
                    cosmosItemOperation.<Family>getItem().getId(),
                    cosmosItemOperation.<Family>getItem().getLastName());
                logger.info("Status Code: {}", cosmosBulkItemResponse.getStatusCode());
                logger.info("Request Charge: {}", cosmosBulkItemResponse.getRequestCharge());
            }
            if (cosmosBulkItemResponse == null) {
                return Mono.error(new IllegalStateException("No response retrieved."));
            } else {
                return Mono.just(cosmosBulkItemResponse);
            }
        }).blockLast();
    }
    
    private void bulkCreateItemsWithExecutionOptions(Flux<Family> families) {
        CosmosBulkExecutionOptions bulkExecutionOptions = new CosmosBulkExecutionOptions();
    
        // The default value for maxMicroBatchConcurrency is 1.
        // By increasing it, it means more concurrent requests will be allowed to be sent to the server, which leads to increased RU usage.
        //
        // Before you increase the value, please examine the RU usage of your container - whether it has been saturated or not.
        // When the RU has already been under saturation, increasing the concurrency will not help the situation,
        // rather it may cause more 429 and request timeout.
        bulkExecutionOptions.setMaxMicroBatchConcurrency(2);
        Flux<CosmosItemOperation> cosmosItemOperations = families.map(family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
        container.executeBulkOperations(cosmosItemOperations, bulkExecutionOptions).blockLast();
    }
    

    Consejos de rendimiento

    Tenga en cuenta los siguientes puntos para mejorar el rendimiento al utilizar la biblioteca BulkExecutor:

    • Para obtener el mejor rendimiento, ejecute la aplicación desde una máquina virtual de Azure en la misma región que la región de escritura de la cuenta de Azure Cosmos DB.

    • Para lograr mayor rendimiento:

      • Establezca el tamaño del montón de JVM en un número lo suficientemente alto para evitar cualquier problema de memoria al controlar grandes volúmenes de documentos. Tamaño de montón sugerido: máx.(3 GB, 3 * tamaño de[todos los documentos pasados a la API de importación en bloque en un lote)].
      • Hay un tiempo de procesamiento, por el que se obtendrá un mayor rendimiento al realizar operaciones en masa con un gran número de documentos. Por tanto, si va a importar 10 000 000 documentos, es preferible ejecutar la importación en bloque diez veces de diez documentos masivos, cada uno de un tamaño de 1 000 0000, en lugar de ejecutar cien veces la importación en bloque de cien documentos masivos, cada uno de ellos de una tamaño de 100 000 documentos.
    • Se recomienda crear una única instancia de un objeto CosmosAsyncClient en toda la aplicación, dentro de una sola máquina virtual que se corresponda con un contenedor específico de Azure Cosmos DB.

    • Esto se debe a que una única ejecución de API de operaciones en masa consume un gran fragmento de E/S de red y de CPU del equipo cliente. Esto sucede al generar varias tareas internamente y al evitar la creación de varias tareas simultáneas dentro de su proceso de aplicación, donde cada una ejecuta llamadas API de operaciones en masa. Si una única llamada API de operaciones en bloque en una única máquina virtual no puede consumir la capacidad de proceso de todo el contenedor (si la capacidad de proceso del contenedor es superior a > 1 millón RU/s), es preferible crear máquinas virtuales independientes para ejecutar llamadas API de operaciones en bloque simultáneamente.

    Pasos siguientes