Wykonywanie operacji zbiorczych na danych usługi Azure Cosmos DB

DOTYCZY: NoSQL

Ten samouczek zawiera instrukcje dotyczące wykonywania operacji zbiorczych w zestawie SDK java usługi Azure Cosmos DB w wersji 4. Ta wersja zestawu SDK jest dostarczana z wbudowaną biblioteką funkcji wykonawczej operacji zbiorczych. Jeśli używasz starszej wersji zestawu Java SDK, zaleca się przeprowadzenie migracji do najnowszej wersji. Zestaw SDK języka Java w wersji 4 usługi Azure Cosmos DB jest bieżącym zalecanym rozwiązaniem do obsługi zbiorczej języka Java.

Obecnie biblioteka funkcji wykonawczej operacji zbiorczych jest obsługiwana tylko przez usługę Azure Cosmos DB dla baz danych NoSQL i interfejsu API dla kont języka Gremlin. Aby dowiedzieć się więcej na temat korzystania z biblioteki funkcji wykonawczej operacji zbiorczych platformy .NET z interfejsem API dla języka Gremlin, zobacz wykonywanie operacji zbiorczych w usłudze Azure Cosmos DB dla języka Gremlin.

Wymagania wstępne

  • Jeśli nie masz subskrypcji platformy Azure, przed rozpoczęciem utwórz bezpłatne konto.

  • Możesz bezpłatnie wypróbować usługę Azure Cosmos DB bez subskrypcji platformy Azure, bezpłatnie i zobowiązań. Możesz też użyć emulatora usługi Azure Cosmos DB z https://localhost:8081 punktem końcowym. Klucz podstawowy został podany w sekcji Uwierzytelnianie żądań.

  • Zestaw Java Development Kit (JDK) 1.8+

    • W systemie Ubuntu uruchom polecenie apt-get install default-jdk, aby zainstalować zestaw JDK.

    • Upewnij się, że zmienna środowiskowa JAVA_HOME wskazuje folder, w którym zainstalowano zestaw JDK.

  • Pobierz i zainstaluj archiwum binarne Maven

    • W systemie Ubuntu możesz uruchomić polecenie apt-get install maven, aby zainstalować narzędzie Maven.
  • Utwórz konto usługi Azure Cosmos DB for NoSQL, wykonując kroki opisane w sekcji Tworzenie konta bazy danych w artykule Szybki start dla języka Java.

Klonowanie przykładowej aplikacji

Teraz przejdźmy do pracy z kodem, pobierając ogólne repozytorium przykładów dla zestawu SDK języka Java w wersji 4 dla usługi Azure Cosmos DB z usługi GitHub. Te przykładowe aplikacje wykonują operacje CRUD i inne typowe operacje w usłudze Azure Cosmos DB. Aby sklonować repozytorium, otwórz wiersz polecenia, przejdź do katalogu, w którym chcesz skopiować aplikację i uruchom następujące polecenie:

 git clone https://github.com/Azure-Samples/azure-cosmos-java-sql-api-samples 

Sklonowane repozytorium zawiera przykład SampleBulkQuickStartAsync.java w folderze /azure-cosmos-java-sql-api-samples/tree/main/src/main/java/com/azure/cosmos/examples/bulk/async . Aplikacja generuje dokumenty i wykonuje operacje zbiorcze tworzenia, upsert, zastępowania i usuwania elementów w usłudze Azure Cosmos DB. W następnych sekcjach przejrzymy kod w przykładowej aplikacji.

Zbiorcze wykonywanie w usłudze Azure Cosmos DB

  1. Parametry połączenia usługi Azure Cosmos DB są odczytywane jako argumenty i przypisywane do zmiennych zdefiniowanych w pliku /examples/common/AccountSettings.java. Te zmienne środowiskowe muszą być ustawione
ACCOUNT_HOST=your account hostname;ACCOUNT_KEY=your account primary key

Aby uruchomić przykład zbiorczy, określ jej klasę główną:

com.azure.cosmos.examples.bulk.async.SampleBulkQuickStartAsync
  1. Obiekt CosmosAsyncClient jest inicjowany przy użyciu następujących instrukcji:

    client = new CosmosClientBuilder()
        .endpoint(AccountSettings.HOST)
        .key(AccountSettings.MASTER_KEY)
        .preferredRegions(preferredRegions)
        .contentResponseOnWriteEnabled(true)
        .consistencyLevel(ConsistencyLevel.SESSION).buildAsyncClient();
    
    
  2. Przykład tworzy asynchroniową bazę danych i kontener. Następnie tworzy wiele dokumentów, na których będą wykonywane operacje zbiorcze. Dodaje te dokumenty do obiektu reaktywnego strumienia Flux<Family> :

    Family andersenFamilyItem = Families.getAndersenFamilyItem();
    Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
    Family johnsonFamilyItem = Families.getJohnsonFamilyItem();
    Family smithFamilyItem = Families.getSmithFamilyItem();
    
    //  Setup family items to create
    Flux<Family> families = Flux.just(andersenFamilyItem, wakefieldFamilyItem, johnsonFamilyItem, smithFamilyItem);
    
  3. Przykład zawiera metody tworzenia zbiorczego, upsert, zastępowania i usuwania. W każdej metodzie mapujemy dokumenty rodzin w strumieniu BulkWriter Flux<Family> na wiele wywołań metody w pliku CosmosBulkOperations. Te operacje są dodawane do innego obiektu Flux<CosmosItemOperation>reaktywnego strumienia . Strumień jest następnie przekazywany do executeBulkOperations metody asynchronicznej container utworzonej na początku, a operacje są wykonywane zbiorczo. Zobacz metodę tworzenia zbiorczego poniżej jako przykład:

    private void bulkCreateItems(Flux<Family> families) {
        Flux<CosmosItemOperation> cosmosItemOperations = families.map(
            family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
        container.executeBulkOperations(cosmosItemOperations).blockLast();
    }
    
  4. Istnieje również klasa BulkWriter.java w tym samym katalogu co przykładowa aplikacja. Ta klasa pokazuje, jak obsługiwać błędy ograniczania szybkości (429) i limitu czasu (408), które mogą wystąpić podczas wykonywania zbiorczego i skutecznie ponawiać te operacje. Jest on implementowany w poniższych metodach, pokazując również, jak zaimplementować lokalną i globalną kontrolę przepływności.

    private void bulkUpsertItemsWithBulkWriterAbstraction() {
        Family andersenFamilyItem = Families.getAndersenFamilyItem();
        Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
        CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getUpsertItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
        CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getUpsertItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
        BulkWriter bulkWriter = new BulkWriter(container);
        bulkWriter.scheduleWrites(andersonItemOperation);
        bulkWriter.scheduleWrites(wakeFieldItemOperation);
        bulkWriter.execute().subscribe();
    }
    
    private void bulkUpsertItemsWithBulkWriterAbstractionAndLocalThroughPutControl() {
        ThroughputControlGroupConfig groupConfig =
                new ThroughputControlGroupConfigBuilder()
                        .setGroupName("group1")
                        .setTargetThroughput(200)
                        .build();
        container.enableLocalThroughputControlGroup(groupConfig);
        Family andersenFamilyItem = Families.getAndersenFamilyItem();
        Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
        CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getUpsertItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
        CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getUpsertItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
        BulkWriter bulkWriter = new BulkWriter(container);
        bulkWriter.scheduleWrites(andersonItemOperation);
        bulkWriter.scheduleWrites(wakeFieldItemOperation);
        bulkWriter.execute().subscribe();
    }
    
    private void bulkCreateItemsWithBulkWriterAbstractionAndGlobalThroughputControl() {
        String controlContainerId = "throughputControlContainer";
        CosmosAsyncContainer controlContainer = database.getContainer(controlContainerId);
        database.createContainerIfNotExists(controlContainer.getId(), "/groupId").block();
    
        ThroughputControlGroupConfig groupConfig =
                new ThroughputControlGroupConfigBuilder()
                        .setGroupName("group-" + UUID.randomUUID())
                        .setTargetThroughput(200)
                        .build();
    
        GlobalThroughputControlConfig globalControlConfig = this.client.createGlobalThroughputControlConfigBuilder(this.database.getId(), controlContainerId)
                .setControlItemRenewInterval(Duration.ofSeconds(5))
                .setControlItemExpireInterval(Duration.ofSeconds(20))
                .build();
    
        container.enableGlobalThroughputControlGroup(groupConfig, globalControlConfig);
        CosmosItemRequestOptions requestOptions = new CosmosItemRequestOptions();
        requestOptions.setThroughputControlGroupName(groupConfig.getGroupName());
        Family andersenFamilyItem = Families.getAndersenFamilyItem();
        Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
        CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getCreateItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
        CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getCreateItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
        BulkWriter bulkWriter = new BulkWriter(container);
        bulkWriter.scheduleWrites(andersonItemOperation);
        bulkWriter.scheduleWrites(wakeFieldItemOperation);
        bulkWriter.execute().subscribe();
    }
    
  5. Ponadto w przykładzie istnieją metody tworzenia zbiorczego, które ilustrują sposób dodawania przetwarzania odpowiedzi i ustawiania opcji wykonywania:

    private void bulkCreateItemsWithResponseProcessing(Flux<Family> families) {
        Flux<CosmosItemOperation> cosmosItemOperations = families.map(
            family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
        container.executeBulkOperations(cosmosItemOperations).flatMap(cosmosBulkOperationResponse -> {
    
            CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse();
            CosmosItemOperation cosmosItemOperation = cosmosBulkOperationResponse.getOperation();
    
            if (cosmosBulkOperationResponse.getException() != null) {
                logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException());
            } else if (cosmosBulkItemResponse == null ||
                !cosmosBulkOperationResponse.getResponse().isSuccessStatusCode()) {
    
                logger.error(
                    "The operation for Item ID: [{}]  Item PartitionKey Value: [{}] did not complete " +
                        "successfully with " + "a" + " {} response code.",
                    cosmosItemOperation.<Family>getItem().getId(),
                    cosmosItemOperation.<Family>getItem().getLastName(),
                    cosmosBulkItemResponse != null ? cosmosBulkItemResponse.getStatusCode() : "n/a");
            } else {
                logger.info(
                    "Item ID: [{}]  Item PartitionKey Value: [{}]",
                    cosmosItemOperation.<Family>getItem().getId(),
                    cosmosItemOperation.<Family>getItem().getLastName());
                logger.info("Status Code: {}", cosmosBulkItemResponse.getStatusCode());
                logger.info("Request Charge: {}", cosmosBulkItemResponse.getRequestCharge());
            }
            if (cosmosBulkItemResponse == null) {
                return Mono.error(new IllegalStateException("No response retrieved."));
            } else {
                return Mono.just(cosmosBulkItemResponse);
            }
        }).blockLast();
    }
    
    private void bulkCreateItemsWithExecutionOptions(Flux<Family> families) {
        CosmosBulkExecutionOptions bulkExecutionOptions = new CosmosBulkExecutionOptions();
    
        // The default value for maxMicroBatchConcurrency is 1.
        // By increasing it, it means more concurrent requests will be allowed to be sent to the server, which leads to increased RU usage.
        //
        // Before you increase the value, please examine the RU usage of your container - whether it has been saturated or not.
        // When the RU has already been under saturation, increasing the concurrency will not help the situation,
        // rather it may cause more 429 and request timeout.
        bulkExecutionOptions.setMaxMicroBatchConcurrency(2);
        Flux<CosmosItemOperation> cosmosItemOperations = families.map(family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
        container.executeBulkOperations(cosmosItemOperations, bulkExecutionOptions).blockLast();
    }
    

    Wskazówki dotyczące wydajności

    Podczas korzystania z biblioteki funkcji wykonawczej operacji zbiorczych należy wziąć pod uwagę następujące kwestie:

    • Aby uzyskać najlepszą wydajność, uruchom aplikację z maszyny wirtualnej platformy Azure w tym samym regionie co region zapisu konta usługi Azure Cosmos DB.

    • Aby uzyskać większą przepływność:

      • Ustaw rozmiar sterty JVM na wystarczającą liczbę, aby uniknąć problemów z pamięcią w obsłudze dużej liczby dokumentów. Sugerowany rozmiar sterty: max(3 GB, 3 * sizeof(wszystkie dokumenty przekazane do interfejsu API importu zbiorczego w jednej partii)).
      • Istnieje czas przetwarzania wstępnego, z powodu którego uzyskasz większą przepływność podczas wykonywania operacji zbiorczych z dużą liczbą dokumentów. Dlatego jeśli chcesz zaimportować 10 000 000 dokumentów, uruchomienie importu zbiorczego 10 razy na 10 zbiorczych dokumentach o rozmiarze 1000 000 jest preferowane niż uruchamianie importu zbiorczego 100 razy na 100 zbiorczych dokumentów o każdym rozmiarze 100 000 dokumentów.
    • Zaleca się utworzenie wystąpienia pojedynczego obiektu CosmosAsyncClient dla całej aplikacji w ramach jednej maszyny wirtualnej, która odpowiada określonemu kontenerowi usługi Azure Cosmos DB.

    • Ponieważ pojedyncze wykonanie interfejsu API operacji zbiorczej zużywa duży fragment procesora CPU i operacji we/wy sieci komputera klienckiego. Dzieje się tak, tworząc wewnętrznie wiele zadań, unikając zduplikowania wielu współbieżnych zadań w ramach procesu aplikacji, z których każde wykonuje wywołania interfejsu API operacji zbiorczej. Jeśli pojedynczy interfejs API operacji zbiorczej wywołuje uruchomione na jednej maszynie wirtualnej, nie może korzystać z przepływności całego kontenera (jeśli przepływność > kontenera wynosi 1 milion RU/s), zaleca się utworzenie oddzielnych maszyn wirtualnych w celu współbieżnego wykonywania wywołań interfejsu API operacji zbiorczej.

    Następne kroki

    • Aby zapoznać się z omówieniem funkcji funkcji wykonawczej operacji zbiorczych, zobacz Omówienie funkcji wykonawczej operacji zbiorczych.