Выполнение массовых операций с данными в Azure Cosmos DB

ОБЛАСТЬ ПРИМЕНЕНИЯ: NoSQL

В этом руководстве приведены инструкции по выполнению массовых операций в пакете SDK Azure Cosmos DB для Java версии 4. В состав этой версии пакета SDK входит библиотека исполнителя массовых операций. Если вы используете более старую версию пакета SDK Java, мы рекомендуем перейти на последнюю версию. Пакет SDK Azure Cosmos DB для Java версии 4 — это рекомендуемое решение для поддержки пакетной службы Java.

В настоящее время библиотека массового исполнителя поддерживается только azure Cosmos DB для NoSQL и API для учетных записей Gremlin. Дополнительные сведения об использовании библиотеки .NET массового исполнителя с API для Gremlin см. в статье о выполнении массовых операций в Azure Cosmos DB для Gremlin.

Необходимые компоненты

Клонирование примера приложения

Теперь перейдем к написанию кода. Для этого скачаем репозиторий универсальных примеров для пакета SDK Azure Cosmos DB для Java версии 4 из GitHub. Эти примеры приложений выполняют операции CRUD и другие распространенные операции в Azure Cosmos DB. Чтобы клонировать репозиторий, откройте командную строку, перейдите в каталог, в который вы хотите скопировать приложение, и выполните следующую команду:

 git clone https://github.com/Azure-Samples/azure-cosmos-java-sql-api-samples 

Пример SampleBulkQuickStartAsync.java находится в папке /azure-cosmos-java-sql-api-samples/tree/main/src/main/java/com/azure/cosmos/examples/bulk/async клонированного репозитория. Приложение создает документы и выполняет массовые операции создания, обновления и вставки (upsert), замены и удаления элементов в Azure Cosmos DB. В следующих разделах мы рассмотрим код примера приложения.

Выполнение массовых операций в Azure Cosmos DB

  1. Строки подключения Azure Cosmos DB считываются в качестве аргументов и присваиваются переменным, определенным в файле /examples/common/AccountSettings.java. Необходимо задать эти переменные среды:
ACCOUNT_HOST=your account hostname;ACCOUNT_KEY=your account primary key

Чтобы запустить пример с массовыми операциями, укажите его основной класс:

com.azure.cosmos.examples.bulk.async.SampleBulkQuickStartAsync
  1. Объект CosmosAsyncClient инициализируется с помощью следующих операторов:

    client = new CosmosClientBuilder()
        .endpoint(AccountSettings.HOST)
        .key(AccountSettings.MASTER_KEY)
        .preferredRegions(preferredRegions)
        .contentResponseOnWriteEnabled(true)
        .consistencyLevel(ConsistencyLevel.SESSION).buildAsyncClient();
    
    
  2. В этом примере создаются асинхронная база данных и контейнер. Затем создаются несколько документов, в которых будут выполняться массовые операции. Эти документы добавляются в объект реактивного потока Flux<Family>:

    Family andersenFamilyItem = Families.getAndersenFamilyItem();
    Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
    Family johnsonFamilyItem = Families.getJohnsonFamilyItem();
    Family smithFamilyItem = Families.getSmithFamilyItem();
    
    //  Setup family items to create
    Flux<Family> families = Flux.just(andersenFamilyItem, wakefieldFamilyItem, johnsonFamilyItem, smithFamilyItem);
    
  3. Этот пример содержит методы для массовых операций создания, обновления и вставки (upsert), замены и удаления. В каждом методе мы сопоставляем документы семейств в потоке BulkWriter Flux<Family> с несколькими вызовами методов в CosmosBulkOperations. Эти операции добавляются в другой объект реактивного потока Flux<CosmosItemOperation>. Затем поток передается методу executeBulkOperations асинхронного container, который был создан в начале, и выполняются массовые операции. См. приведенный ниже метод массового создания в качестве примера:

    private void bulkCreateItems(Flux<Family> families) {
        Flux<CosmosItemOperation> cosmosItemOperations = families.map(
            family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
        container.executeBulkOperations(cosmosItemOperations).blockLast();
    }
    
  4. Также существует класс BulkWriter.java, находящийся в том же каталоге, что и приложение, используемое в качестве примера. В этом классе показано, как обрабатывать ошибки ограничения скорости (429) и истечения времени ожидания (408), которые могут возникнуть во время выполнения массовых операций, и запускать эти операции повторно в случае ошибок. Он реализуется в приведенных ниже методах, а также показывает, как реализовать локальный и глобальный контроль пропускной способности.

    private void bulkUpsertItemsWithBulkWriterAbstraction() {
        Family andersenFamilyItem = Families.getAndersenFamilyItem();
        Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
        CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getUpsertItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
        CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getUpsertItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
        BulkWriter bulkWriter = new BulkWriter(container);
        bulkWriter.scheduleWrites(andersonItemOperation);
        bulkWriter.scheduleWrites(wakeFieldItemOperation);
        bulkWriter.execute().subscribe();
    }
    
    private void bulkUpsertItemsWithBulkWriterAbstractionAndLocalThroughPutControl() {
        ThroughputControlGroupConfig groupConfig =
                new ThroughputControlGroupConfigBuilder()
                        .setGroupName("group1")
                        .setTargetThroughput(200)
                        .build();
        container.enableLocalThroughputControlGroup(groupConfig);
        Family andersenFamilyItem = Families.getAndersenFamilyItem();
        Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
        CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getUpsertItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
        CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getUpsertItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
        BulkWriter bulkWriter = new BulkWriter(container);
        bulkWriter.scheduleWrites(andersonItemOperation);
        bulkWriter.scheduleWrites(wakeFieldItemOperation);
        bulkWriter.execute().subscribe();
    }
    
    private void bulkCreateItemsWithBulkWriterAbstractionAndGlobalThroughputControl() {
        String controlContainerId = "throughputControlContainer";
        CosmosAsyncContainer controlContainer = database.getContainer(controlContainerId);
        database.createContainerIfNotExists(controlContainer.getId(), "/groupId").block();
    
        ThroughputControlGroupConfig groupConfig =
                new ThroughputControlGroupConfigBuilder()
                        .setGroupName("group-" + UUID.randomUUID())
                        .setTargetThroughput(200)
                        .build();
    
        GlobalThroughputControlConfig globalControlConfig = this.client.createGlobalThroughputControlConfigBuilder(this.database.getId(), controlContainerId)
                .setControlItemRenewInterval(Duration.ofSeconds(5))
                .setControlItemExpireInterval(Duration.ofSeconds(20))
                .build();
    
        container.enableGlobalThroughputControlGroup(groupConfig, globalControlConfig);
        CosmosItemRequestOptions requestOptions = new CosmosItemRequestOptions();
        requestOptions.setThroughputControlGroupName(groupConfig.getGroupName());
        Family andersenFamilyItem = Families.getAndersenFamilyItem();
        Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
        CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getCreateItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
        CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getCreateItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
        BulkWriter bulkWriter = new BulkWriter(container);
        bulkWriter.scheduleWrites(andersonItemOperation);
        bulkWriter.scheduleWrites(wakeFieldItemOperation);
        bulkWriter.execute().subscribe();
    }
    
  5. Кроме того, в примере есть методы массового создания, иллюстрирующие добавление обработки ответов и настройку параметров выполнения:

    private void bulkCreateItemsWithResponseProcessing(Flux<Family> families) {
        Flux<CosmosItemOperation> cosmosItemOperations = families.map(
            family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
        container.executeBulkOperations(cosmosItemOperations).flatMap(cosmosBulkOperationResponse -> {
    
            CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse();
            CosmosItemOperation cosmosItemOperation = cosmosBulkOperationResponse.getOperation();
    
            if (cosmosBulkOperationResponse.getException() != null) {
                logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException());
            } else if (cosmosBulkItemResponse == null ||
                !cosmosBulkOperationResponse.getResponse().isSuccessStatusCode()) {
    
                logger.error(
                    "The operation for Item ID: [{}]  Item PartitionKey Value: [{}] did not complete " +
                        "successfully with " + "a" + " {} response code.",
                    cosmosItemOperation.<Family>getItem().getId(),
                    cosmosItemOperation.<Family>getItem().getLastName(),
                    cosmosBulkItemResponse != null ? cosmosBulkItemResponse.getStatusCode() : "n/a");
            } else {
                logger.info(
                    "Item ID: [{}]  Item PartitionKey Value: [{}]",
                    cosmosItemOperation.<Family>getItem().getId(),
                    cosmosItemOperation.<Family>getItem().getLastName());
                logger.info("Status Code: {}", cosmosBulkItemResponse.getStatusCode());
                logger.info("Request Charge: {}", cosmosBulkItemResponse.getRequestCharge());
            }
            if (cosmosBulkItemResponse == null) {
                return Mono.error(new IllegalStateException("No response retrieved."));
            } else {
                return Mono.just(cosmosBulkItemResponse);
            }
        }).blockLast();
    }
    
    private void bulkCreateItemsWithExecutionOptions(Flux<Family> families) {
        CosmosBulkExecutionOptions bulkExecutionOptions = new CosmosBulkExecutionOptions();
    
        // The default value for maxMicroBatchConcurrency is 1.
        // By increasing it, it means more concurrent requests will be allowed to be sent to the server, which leads to increased RU usage.
        //
        // Before you increase the value, please examine the RU usage of your container - whether it has been saturated or not.
        // When the RU has already been under saturation, increasing the concurrency will not help the situation,
        // rather it may cause more 429 and request timeout.
        bulkExecutionOptions.setMaxMicroBatchConcurrency(2);
        Flux<CosmosItemOperation> cosmosItemOperations = families.map(family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
        container.executeBulkOperations(cosmosItemOperations, bulkExecutionOptions).blockLast();
    }
    

    Советы по производительности

    Учтите следующие моменты для повышения производительности при использовании библиотеки массового исполнителя:

    • Для повышения производительности запустите приложение из виртуальной машины Azure в том же регионе, что и область записи учетной записи Azure Cosmos DB.

    • Чтобы получить более высокую пропускную способность, сделайте следующее:

      • Установите для размера кучи виртуальной машины Java достаточно высокое значение, чтобы избежать проблем с памятью при обработке большого количества документов. Рекомендуется в качестве размера кучи указать большее из следующих значений: 3 ГБ или 3 × общий размер документов, передаваемых в API массового импорта в одном пакете.
      • Имеется время предварительной обработки, благодаря которому вы получаете более высокую пропускную способность при выполнении массовых операций с большим количеством документов. Таким образом, если вы хотите импортировать 10 000 000 документов, лучше всего 10 раз выполнить массовый импорт 10 пакетов документов, размер которых соответствует 1 000 000 документов, чем 100 раз выполнить массовый импорт 100 пакетов документов, размер которых соответствует 100 000 документов.
    • Рекомендуется создать экземпляр одного объекта CosmosAsyncClient для всего приложения на одной виртуальной машине, соответствующей конкретному контейнеру Azure Cosmos DB.

    • Это полезно, так как однократное выполнение API массовой операции потребляет большую часть ресурсов ЦП и операций ввода-вывода сети клиентского компьютера. Это происходит при порождении нескольких задач изнутри. Избегайте появления в процессе приложения нескольких параллельных задач, каждая из которых выполняет вызовы API массовой операции. Если один вызов API массовой операции, который выполняется на одной виртуальной машине, не может использовать всю пропускную способность вашего контейнера (если пропускная способность контейнера > 1 млн ЕЗ/с), предпочтительнее создавать отдельные виртуальные машины для одновременного выполнения вызовов API.

    Следующие шаги