Effectuer des opérations en bloc sur des données Azure Cosmos DB

S’APPLIQUE À : NoSQL

Ce tutoriel fournit des instructions sur l’exécution d’opérations en bloc dans le Kit de développement logiciel (SDK) Java V4 Azure Cosmos DB. Cette version du Kit de développement logiciel (SDK) est fournie avec la bibliothèque d’exécuteurs en bloc intégrée. Si vous utilisez une version antérieure du SDK Java, il est recommandé de migrer vers la version la plus récente. Le kit de développement logiciel (SDK) Java Azure Cosmos DB V4 est la solution actuellement recommandée pour la prise en charge en bloc Java.

Actuellement, la bibliothèque de l’exécuteur en bloc est prise en charge uniquement par les comptes Azure Cosmos DB for NoSQL et API pour Gremlin. Pour en savoir plus sur l’utilisation de la bibliothèque .NET de l’exécuteur en bloc avec l’API pour Gremlin, consultez Effectuer des opérations en bloc dans Azure Cosmos DB pour Gremlin.

Prérequis

Clonage de l’exemple d’application

Nous allons maintenant passer à l’utilisation de code en téléchargeant un référentiel d’exemples génériques pour le Kit de développement logiciel (SDK) Java V4 pour Azure Cosmos DB à partir de GitHub. Ces exemples d’applications effectuent des opérations CRUD et d’autres opérations courantes sur Azure Cosmos DB. Pour cloner le répertoire, ouvrez une invite de commandes, accédez au répertoire dans lequel souhaitez copier l’application, puis exécutez la commande suivante :

 git clone https://github.com/Azure-Samples/azure-cosmos-java-sql-api-samples 

Le référentiel cloné contient un exemple SampleBulkQuickStartAsync.java dans le dossier /azure-cosmos-java-sql-api-samples/tree/main/src/main/java/com/azure/cosmos/examples/bulk/async. L’application génère des documents et exécute des opérations en bloc de création, d’upsert, de remplacement et de suppression d’éléments dans Azure Cosmos DB. Dans les sections suivantes, nous allons examiner le code de l’exemple d’application.

Exécution en bloc dans Azure Cosmos DB

  1. Les chaînes de connexion d’Azure Cosmos DB sont lues en tant qu’arguments, et affectées à des variables définies dans le fichier /examples/common/AccountSettings.java. Ces variables d’environnement doivent être définies
ACCOUNT_HOST=your account hostname;ACCOUNT_KEY=your account primary key

Pour exécuter l’exemple d’opération en bloc, spécifiez sa Classe principale :

com.azure.cosmos.examples.bulk.async.SampleBulkQuickStartAsync
  1. L’objet CosmosAsyncClient est initialisé à l’aide des instructions suivantes :

    client = new CosmosClientBuilder()
        .endpoint(AccountSettings.HOST)
        .key(AccountSettings.MASTER_KEY)
        .preferredRegions(preferredRegions)
        .contentResponseOnWriteEnabled(true)
        .consistencyLevel(ConsistencyLevel.SESSION).buildAsyncClient();
    
    
  2. L’exemple crée une base de données et un conteneur asynchrones. Il crée ensuite plusieurs documents sur lesquels des opérations en bloc seront exécutées. Il ajoute ces documents à un objet flux réactif Flux<Family> :

    Family andersenFamilyItem = Families.getAndersenFamilyItem();
    Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
    Family johnsonFamilyItem = Families.getJohnsonFamilyItem();
    Family smithFamilyItem = Families.getSmithFamilyItem();
    
    //  Setup family items to create
    Flux<Family> families = Flux.just(andersenFamilyItem, wakefieldFamilyItem, johnsonFamilyItem, smithFamilyItem);
    
  3. L’exemple contient des méthodes pour l’exécution en bloc d’opérations de création, d’upsert, de remplacement et de suppression. Dans chaque méthode, nous mappons les documents de familles dans le flux BulkWriter Flux<Family> à plusieurs appels de méthode dans CosmosBulkOperations. Ces opérations sont ajoutées à un autre objet flux réactif Flux<CosmosItemOperation>. Le flux est ensuite passé à la méthode executeBulkOperations du container asynchrone que nous avons créé au début, et les opérations sont exécutées en bloc. La méthode de création en bloc ci-dessous présente un exemple :

    private void bulkCreateItems(Flux<Family> families) {
        Flux<CosmosItemOperation> cosmosItemOperations = families.map(
            family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
        container.executeBulkOperations(cosmosItemOperations).blockLast();
    }
    
  4. Il existe également une classe BulkWriter.java dans le même répertoire que l’exemple d’application. Cette classe montre comment gérer les erreurs de limitation de débit (429) et de délai d’expiration (408) qui peuvent se produire pendant une exécution en bloc, et réessayer efficacement d’effectuer ces opérations. Elle est implémentée dans les méthodes ci-dessous, montrant également comment implémenter le contrôle du débit local et global.

    private void bulkUpsertItemsWithBulkWriterAbstraction() {
        Family andersenFamilyItem = Families.getAndersenFamilyItem();
        Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
        CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getUpsertItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
        CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getUpsertItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
        BulkWriter bulkWriter = new BulkWriter(container);
        bulkWriter.scheduleWrites(andersonItemOperation);
        bulkWriter.scheduleWrites(wakeFieldItemOperation);
        bulkWriter.execute().subscribe();
    }
    
    private void bulkUpsertItemsWithBulkWriterAbstractionAndLocalThroughPutControl() {
        ThroughputControlGroupConfig groupConfig =
                new ThroughputControlGroupConfigBuilder()
                        .setGroupName("group1")
                        .setTargetThroughput(200)
                        .build();
        container.enableLocalThroughputControlGroup(groupConfig);
        Family andersenFamilyItem = Families.getAndersenFamilyItem();
        Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
        CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getUpsertItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
        CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getUpsertItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
        BulkWriter bulkWriter = new BulkWriter(container);
        bulkWriter.scheduleWrites(andersonItemOperation);
        bulkWriter.scheduleWrites(wakeFieldItemOperation);
        bulkWriter.execute().subscribe();
    }
    
    private void bulkCreateItemsWithBulkWriterAbstractionAndGlobalThroughputControl() {
        String controlContainerId = "throughputControlContainer";
        CosmosAsyncContainer controlContainer = database.getContainer(controlContainerId);
        database.createContainerIfNotExists(controlContainer.getId(), "/groupId").block();
    
        ThroughputControlGroupConfig groupConfig =
                new ThroughputControlGroupConfigBuilder()
                        .setGroupName("group-" + UUID.randomUUID())
                        .setTargetThroughput(200)
                        .build();
    
        GlobalThroughputControlConfig globalControlConfig = this.client.createGlobalThroughputControlConfigBuilder(this.database.getId(), controlContainerId)
                .setControlItemRenewInterval(Duration.ofSeconds(5))
                .setControlItemExpireInterval(Duration.ofSeconds(20))
                .build();
    
        container.enableGlobalThroughputControlGroup(groupConfig, globalControlConfig);
        CosmosItemRequestOptions requestOptions = new CosmosItemRequestOptions();
        requestOptions.setThroughputControlGroupName(groupConfig.getGroupName());
        Family andersenFamilyItem = Families.getAndersenFamilyItem();
        Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
        CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getCreateItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
        CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getCreateItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
        BulkWriter bulkWriter = new BulkWriter(container);
        bulkWriter.scheduleWrites(andersonItemOperation);
        bulkWriter.scheduleWrites(wakeFieldItemOperation);
        bulkWriter.execute().subscribe();
    }
    
  5. En outre, il existe des méthodes de création en bloc dans l’exemple, qui illustrent la manière d’ajouter un traitement de réponse et de définir des options d’exécution :

    private void bulkCreateItemsWithResponseProcessing(Flux<Family> families) {
        Flux<CosmosItemOperation> cosmosItemOperations = families.map(
            family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
        container.executeBulkOperations(cosmosItemOperations).flatMap(cosmosBulkOperationResponse -> {
    
            CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse();
            CosmosItemOperation cosmosItemOperation = cosmosBulkOperationResponse.getOperation();
    
            if (cosmosBulkOperationResponse.getException() != null) {
                logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException());
            } else if (cosmosBulkItemResponse == null ||
                !cosmosBulkOperationResponse.getResponse().isSuccessStatusCode()) {
    
                logger.error(
                    "The operation for Item ID: [{}]  Item PartitionKey Value: [{}] did not complete " +
                        "successfully with " + "a" + " {} response code.",
                    cosmosItemOperation.<Family>getItem().getId(),
                    cosmosItemOperation.<Family>getItem().getLastName(),
                    cosmosBulkItemResponse != null ? cosmosBulkItemResponse.getStatusCode() : "n/a");
            } else {
                logger.info(
                    "Item ID: [{}]  Item PartitionKey Value: [{}]",
                    cosmosItemOperation.<Family>getItem().getId(),
                    cosmosItemOperation.<Family>getItem().getLastName());
                logger.info("Status Code: {}", cosmosBulkItemResponse.getStatusCode());
                logger.info("Request Charge: {}", cosmosBulkItemResponse.getRequestCharge());
            }
            if (cosmosBulkItemResponse == null) {
                return Mono.error(new IllegalStateException("No response retrieved."));
            } else {
                return Mono.just(cosmosBulkItemResponse);
            }
        }).blockLast();
    }
    
    private void bulkCreateItemsWithExecutionOptions(Flux<Family> families) {
        CosmosBulkExecutionOptions bulkExecutionOptions = new CosmosBulkExecutionOptions();
    
        // The default value for maxMicroBatchConcurrency is 1.
        // By increasing it, it means more concurrent requests will be allowed to be sent to the server, which leads to increased RU usage.
        //
        // Before you increase the value, please examine the RU usage of your container - whether it has been saturated or not.
        // When the RU has already been under saturation, increasing the concurrency will not help the situation,
        // rather it may cause more 429 and request timeout.
        bulkExecutionOptions.setMaxMicroBatchConcurrency(2);
        Flux<CosmosItemOperation> cosmosItemOperations = families.map(family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
        container.executeBulkOperations(cosmosItemOperations, bulkExecutionOptions).blockLast();
    }
    

    Conseils sur les performances

    Pour bénéficier de meilleures performances lors de l’utilisation de la bibliothèque de l’exécuteur en bloc, considérez les points suivants :

    • Pour de meilleures performances, exécutez votre application à partir d’une machine virtuelle Azure qui se trouve dans la région d’écriture du compte Azure Cosmos DB.

    • Pour atteindre un débit plus élevé :

      • Affectez à la taille du tas de la machine virtuelle Java une valeur suffisamment élevée pour éviter tout problème de mémoire lors du traitement d’un grand nombre de documents. Suggestion de taille de tas : max(3 GB, 3 * sizeof(tous les documents transmis à l’API d’importation en bloc dans un lot)).
      • Il y a un temps de prétraitement, grâce auquel vous obtiendrez un débit supérieur lors de l’exécution d’opérations en bloc avec un grand nombre de documents. Si vous souhaitez importer 10 000 000 documents, il est préférable d’exécuter une importation en bloc 10 fois sur 10 lots de documents en contenant chacun 1 000 000, plutôt que d’exécuter une importation en bloc 100 fois sur 100 lots de documents en contenant chacun 100 000.
    • Nous vous recommandons d’instancier un objet CosmosAsyncClient unique pour l’ensemble de l’application au sein d’une seule machine virtuelle, qui correspond à un conteneur Azure Cosmos DB spécifique.

    • L’exécution d’une API d’opération en bloc consomme une grande partie des E/S réseau et du processeur de l’ordinateur client. Cela est dû à la génération automatique de plusieurs tâches en interne. Évitez de générer plusieurs tâches simultanées dans votre processus d’application, exécutant chacune des appels d’API d’opérations en bloc. Si un appel d’API d’opération en bloc en cours d’exécution sur une seule machine virtuelle ne peut pas consommer le débit complet de votre conteneur (si le débit de votre conteneur est supérieur à 1 million RU/s), il est préférable de créer des machines virtuelles distinctes pour exécuter simultanément les appels d’API d’opérations en bloc.

    Étapes suivantes