Azure Cosmos DB のデータに対して一括操作を実行する

適用対象: NoSQL

このチュートリアルでは、Azure Cosmos DB Java V4 SDK で一括操作を実行する手順について説明します。 このバージョンの SDK には、バルク エグゼキューター ライブラリが組み込まれています。 以前のバージョンの Java SDK を使用している場合は、最新バージョンへの移行をお勧めします。 Java の一括サポートに対して現在推奨されるソリューションは、Azure Cosmos DB Java V4 SDK です。

現在、バルク エグゼキューター ライブラリは、Azure Cosmos DB for NoSQL および Gremlin 用 API アカウントによってのみサポートされています。 Gremlin 用 API でのバルク エグゼキューター .NET ライブラリの使用について詳しくは、Azure Cosmos DB for Gremlin での一括操作の実行に関するページを参照してください。

前提条件

サンプル アプリケーションの複製

次に、Java V4 SDK for Azure Cosmos DB 用の汎用サンプル リポジトリを GitHub からダウンロードして、コードの操作に切り替えましょう。 これらのサンプル アプリケーションでは、Azure Cosmos DB に対する CRUD 操作や他の一般的な操作が実行されます。 リポジトリをクローンするために、コマンド プロンプトを開き、コピー先のディレクトリに移動し、次のコマンドを実行します。

 git clone https://github.com/Azure-Samples/azure-cosmos-java-sql-api-samples 

クローンされたリポジトリの /azure-cosmos-java-sql-api-samples/tree/main/src/main/java/com/azure/cosmos/examples/bulk/async フォルダーにはサンプル SampleBulkQuickStartAsync.java が含まれています。 アプリケーションによってドキュメントが生成され、Azure Cosmos DB 内のアイテムの作成、アップサート、置換、削除を一括して行う操作が実行されます。 以下のセクションでは、サンプル アプリのコードを確認します。

Azure Cosmos DB での一括実行

  1. Azure Cosmos DB の接続文字列は、引数として読み取られ、/examples/common/AccountSettings.java ファイルで定義されている変数に代入されます。 次の環境変数を、
ACCOUNT_HOST=your account hostname;ACCOUNT_KEY=your account primary key

一括サンプルを実行するには、メイン クラスを指定します。

com.azure.cosmos.examples.bulk.async.SampleBulkQuickStartAsync
  1. CosmosAsyncClient オブジェクトは、以下のステートメントを使って初期化されます。

    client = new CosmosClientBuilder()
        .endpoint(AccountSettings.HOST)
        .key(AccountSettings.MASTER_KEY)
        .preferredRegions(preferredRegions)
        .contentResponseOnWriteEnabled(true)
        .consistencyLevel(ConsistencyLevel.SESSION).buildAsyncClient();
    
    
  2. このサンプルでは、非同期データベースとコンテナーを作成します。 その後、一括操作が実行される複数のドキュメントを作成します。 これらのドキュメントは、Flux<Family> リアクティブ ストリーム オブジェクトに追加されます。

    Family andersenFamilyItem = Families.getAndersenFamilyItem();
    Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
    Family johnsonFamilyItem = Families.getJohnsonFamilyItem();
    Family smithFamilyItem = Families.getSmithFamilyItem();
    
    //  Setup family items to create
    Flux<Family> families = Flux.just(andersenFamilyItem, wakefieldFamilyItem, johnsonFamilyItem, smithFamilyItem);
    
  3. サンプルには、一括作成、アップサート、置換、削除のためのメソッドが含まれます。 各メソッドでは、BulkWriter Flux<Family> ストリーム内のファミリ ドキュメントを、CosmosBulkOperations の複数のメソッド呼び出しにマップします。 これらの操作は、別のリアクティブ ストリーム オブジェクト Flux<CosmosItemOperation> に追加されます。 その後、ストリームは最初に作成した非同期 containerexecuteBulkOperations メソッドに渡され、操作が一括で実行されます。 例として、以下の bulk create メソッドを参照してください。

    private void bulkCreateItems(Flux<Family> families) {
        Flux<CosmosItemOperation> cosmosItemOperations = families.map(
            family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
        container.executeBulkOperations(cosmosItemOperations).blockLast();
    }
    
  4. サンプル アプリケーションと同じディレクトリに BulkWriter.java クラスもあります。 このクラスでは、一括実行中に発生する可能性があるレート制限 (429) エラーとタイムアウト (408) エラーを処理し、それらの操作を効果的に再試行する方法が示されています。 下のメソッドに実装されていますが、ローカルとグローバルのスループット コントロールを実装する方法もわかります。

    private void bulkUpsertItemsWithBulkWriterAbstraction() {
        Family andersenFamilyItem = Families.getAndersenFamilyItem();
        Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
        CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getUpsertItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
        CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getUpsertItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
        BulkWriter bulkWriter = new BulkWriter(container);
        bulkWriter.scheduleWrites(andersonItemOperation);
        bulkWriter.scheduleWrites(wakeFieldItemOperation);
        bulkWriter.execute().subscribe();
    }
    
    private void bulkUpsertItemsWithBulkWriterAbstractionAndLocalThroughPutControl() {
        ThroughputControlGroupConfig groupConfig =
                new ThroughputControlGroupConfigBuilder()
                        .setGroupName("group1")
                        .setTargetThroughput(200)
                        .build();
        container.enableLocalThroughputControlGroup(groupConfig);
        Family andersenFamilyItem = Families.getAndersenFamilyItem();
        Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
        CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getUpsertItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
        CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getUpsertItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
        BulkWriter bulkWriter = new BulkWriter(container);
        bulkWriter.scheduleWrites(andersonItemOperation);
        bulkWriter.scheduleWrites(wakeFieldItemOperation);
        bulkWriter.execute().subscribe();
    }
    
    private void bulkCreateItemsWithBulkWriterAbstractionAndGlobalThroughputControl() {
        String controlContainerId = "throughputControlContainer";
        CosmosAsyncContainer controlContainer = database.getContainer(controlContainerId);
        database.createContainerIfNotExists(controlContainer.getId(), "/groupId").block();
    
        ThroughputControlGroupConfig groupConfig =
                new ThroughputControlGroupConfigBuilder()
                        .setGroupName("group-" + UUID.randomUUID())
                        .setTargetThroughput(200)
                        .build();
    
        GlobalThroughputControlConfig globalControlConfig = this.client.createGlobalThroughputControlConfigBuilder(this.database.getId(), controlContainerId)
                .setControlItemRenewInterval(Duration.ofSeconds(5))
                .setControlItemExpireInterval(Duration.ofSeconds(20))
                .build();
    
        container.enableGlobalThroughputControlGroup(groupConfig, globalControlConfig);
        CosmosItemRequestOptions requestOptions = new CosmosItemRequestOptions();
        requestOptions.setThroughputControlGroupName(groupConfig.getGroupName());
        Family andersenFamilyItem = Families.getAndersenFamilyItem();
        Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
        CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getCreateItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
        CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getCreateItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
        BulkWriter bulkWriter = new BulkWriter(container);
        bulkWriter.scheduleWrites(andersonItemOperation);
        bulkWriter.scheduleWrites(wakeFieldItemOperation);
        bulkWriter.execute().subscribe();
    }
    
  5. さらに、サンプルには、応答処理を追加し、実行オプションを設定する方法を示す一括作成メソッドがあります。

    private void bulkCreateItemsWithResponseProcessing(Flux<Family> families) {
        Flux<CosmosItemOperation> cosmosItemOperations = families.map(
            family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
        container.executeBulkOperations(cosmosItemOperations).flatMap(cosmosBulkOperationResponse -> {
    
            CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse();
            CosmosItemOperation cosmosItemOperation = cosmosBulkOperationResponse.getOperation();
    
            if (cosmosBulkOperationResponse.getException() != null) {
                logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException());
            } else if (cosmosBulkItemResponse == null ||
                !cosmosBulkOperationResponse.getResponse().isSuccessStatusCode()) {
    
                logger.error(
                    "The operation for Item ID: [{}]  Item PartitionKey Value: [{}] did not complete " +
                        "successfully with " + "a" + " {} response code.",
                    cosmosItemOperation.<Family>getItem().getId(),
                    cosmosItemOperation.<Family>getItem().getLastName(),
                    cosmosBulkItemResponse != null ? cosmosBulkItemResponse.getStatusCode() : "n/a");
            } else {
                logger.info(
                    "Item ID: [{}]  Item PartitionKey Value: [{}]",
                    cosmosItemOperation.<Family>getItem().getId(),
                    cosmosItemOperation.<Family>getItem().getLastName());
                logger.info("Status Code: {}", cosmosBulkItemResponse.getStatusCode());
                logger.info("Request Charge: {}", cosmosBulkItemResponse.getRequestCharge());
            }
            if (cosmosBulkItemResponse == null) {
                return Mono.error(new IllegalStateException("No response retrieved."));
            } else {
                return Mono.just(cosmosBulkItemResponse);
            }
        }).blockLast();
    }
    
    private void bulkCreateItemsWithExecutionOptions(Flux<Family> families) {
        CosmosBulkExecutionOptions bulkExecutionOptions = new CosmosBulkExecutionOptions();
    
        // The default value for maxMicroBatchConcurrency is 1.
        // By increasing it, it means more concurrent requests will be allowed to be sent to the server, which leads to increased RU usage.
        //
        // Before you increase the value, please examine the RU usage of your container - whether it has been saturated or not.
        // When the RU has already been under saturation, increasing the concurrency will not help the situation,
        // rather it may cause more 429 and request timeout.
        bulkExecutionOptions.setMaxMicroBatchConcurrency(2);
        Flux<CosmosItemOperation> cosmosItemOperations = families.map(family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
        container.executeBulkOperations(cosmosItemOperations, bulkExecutionOptions).blockLast();
    }
    

    パフォーマンスに関するヒント

    Bulk Executor ライブラリを使用する場合は、パフォーマンスを向上させるために、次の点を考慮してください。

    • パフォーマンスを最大限に高めるには、Azure Cosmos DB アカウントの書き込みリージョンと同じリージョンにある Azure VM からアプリケーションを実行します。

    • 高いスループットを実現するには、次のようにします。

      • 多数のドキュメントを処理するときのメモリの問題を回避するため、JVM のヒープ サイズを十分に大きい値に設定します。 推奨されるヒープ サイズ: max(3 GB, 3 * sizeof(1 回のバッチで一括インポート API に渡されるすべてのドキュメント))。
      • 前処理にかかる時間があるため、多数のドキュメントで一括操作を実行すると、スループットが高くなります。 そのため、10,000, 000 個のドキュメントをインポートする場合、100,000 ドキュメントずつ 100 回の一括インポートを実行するより、1,000,000 ドキュメントずつ 10 回の一括インポートを実行する方が効率的です。
    • 特定の Azure Cosmos DB コンテナーに対応する単一の仮想マシン内でアプリケーション全体に対して 1 つの CosmosAsyncClient オブジェクトをインスタンス化することをお勧めします。

    • 1 つの一括操作 API 実行でクライアント マシンの CPU とネットワーク IO が大量に消費されます。 これは、内部的に複数のタスクを生成することで、一括操作 API 呼び出しを実行するたびにアプリケーション プロセス内で複数の同時実行タスクが生成されないようにするためです。 単一の仮想マシンで実行される 1 つの一括操作 API 呼び出しでコンテナー全体のスループットを消費できない場合 (コンテナーのスループットが 100 万 RU/秒を超える場合)、別個の仮想マシンを作成して、一括操作 API 呼び出しを同時に実行することをお勧めします。

    次のステップ