Tömeges műveletek végrehajtása az Azure Cosmos DB-adatokon

A KÖVETKEZŐRE VONATKOZIK: NoSQL

Ez az oktatóanyag útmutatást nyújt a tömeges műveletek azure Cosmos DB Java V4 SDK-ban való végrehajtásához. Az SDK ezen verziója beépített tömeges végrehajtói kódtárat tartalmaz. Ha a Java SDK régebbi verzióját használja, javasoljuk, hogy a legújabb verzióra migráljon. Az Azure Cosmos DB Java V4 SDK a Java tömeges támogatásának jelenleg ajánlott megoldása.

A tömeges végrehajtói kódtárat jelenleg csak az Azure Cosmos DB for NoSQL és a Gremlin-fiókokhoz készült API támogatja. A tömeges végrehajtó .NET-kódtár gremlin API-val való használatáról a Gremlinhez készült Azure Cosmos DB tömeges műveleteinek végrehajtásával kapcsolatban olvashat.

Előfeltételek

  • Ha még nincs Azure-előfizetése, kezdés előtt hozzon létre egy ingyenes fiókot.

  • Ingyenesen kipróbálhatja az Azure Cosmos DB-t Azure-előfizetés nélkül, ingyenesen és kötelezettségvállalásokkal. Vagy használhatja az Azure Cosmos DB Emulatort a https://localhost:8081 végponttal. Az elsődleges kulcs a Kérelmek hitelesítése című részben található.

  • Java fejlesztői készlet (JDK) 1.8+

    • Ubuntu rendszeren futtassa az apt-get install default-jdk parancsot a JDK telepítéséhez.

    • Ügyeljen arra, hogy a JAVA_HOME környezeti változó arra a mappára mutasson, ahová a JDK telepítve lett.

  • Maven bináris archívum letöltése és telepítése

    • Ubuntu rendszeren futtathatja az apt-get install maven parancsot a Maven telepítéséhez.
  • Hozzon létre egy Azure Cosmos DB for NoSQL-fiókot a Java rövid útmutatójának adatbázisfiók létrehozása szakaszában leírt lépések végrehajtásával.

A mintaalkalmazás klónozása

Most váltsunk a kód használatára egy általános mintaadattár letöltésével az Azure Cosmos DB-hez készült Java V4 SDK-hoz a GitHubról. Ezek a mintaalkalmazások CRUD-műveleteket és más gyakori műveleteket hajtanak végre az Azure Cosmos DB-ben. Az adattár klónozásához nyisson meg egy parancssort, keresse meg azt a könyvtárat, ahová másolni szeretné az alkalmazást, és futtassa a következő parancsot:

 git clone https://github.com/Azure-Samples/azure-cosmos-java-sql-api-samples 

A klónozott adattár egy mintát SampleBulkQuickStartAsync.java tartalmaz a /azure-cosmos-java-sql-api-samples/tree/main/src/main/java/com/azure/cosmos/examples/bulk/async mappában. Az alkalmazás dokumentumokat hoz létre, és műveleteket hajt végre az Azure Cosmos DB-ben lévő elemek tömeges létrehozásához, frissítéséhez, cseréjéhez és törléséhez. A következő szakaszokban áttekintjük a mintaalkalmazás kódját.

Tömeges végrehajtás az Azure Cosmos DB-ben

  1. Az Azure Cosmos DB kapcsolati sztring argumentumként olvashatók, és a fájlbanexamples/common/AccountSettings.java meghatározott változókhoz vannak rendelve. Ezeket a környezeti változókat be kell állítani
ACCOUNT_HOST=your account hostname;ACCOUNT_KEY=your account primary key

A tömeges minta futtatásához adja meg annak fő osztályát:

com.azure.cosmos.examples.bulk.async.SampleBulkQuickStartAsync
  1. Az CosmosAsyncClient objektum inicializálása a következő utasítások használatával történik:

    client = new CosmosClientBuilder()
        .endpoint(AccountSettings.HOST)
        .key(AccountSettings.MASTER_KEY)
        .preferredRegions(preferredRegions)
        .contentResponseOnWriteEnabled(true)
        .consistencyLevel(ConsistencyLevel.SESSION).buildAsyncClient();
    
    
  2. A minta létrehoz egy aszinkron adatbázist és tárolót. Ezután több dokumentumot hoz létre, amelyeken tömeges műveleteket hajt végre. Ezeket a dokumentumokat hozzáadja egy Flux<Family> reaktív streamobjektumhoz:

    Family andersenFamilyItem = Families.getAndersenFamilyItem();
    Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
    Family johnsonFamilyItem = Families.getJohnsonFamilyItem();
    Family smithFamilyItem = Families.getSmithFamilyItem();
    
    //  Setup family items to create
    Flux<Family> families = Flux.just(andersenFamilyItem, wakefieldFamilyItem, johnsonFamilyItem, smithFamilyItem);
    
  3. A minta metódusokat tartalmaz a tömeges létrehozáshoz, a frissítéshez, a csere és a törléshez. Minden metódusban a BulkWriter Flux<Family> streamben lévő családdokumentumokat több metódushívásra képezzük le CosmosBulkOperations. Ezek a műveletek egy másik reaktív streamobjektumhoz Flux<CosmosItemOperation>lesznek hozzáadva. Ezt követően a rendszer átadja a streamet az executeBulkOperations elején létrehozott aszinkron container metódusnak, és a műveletek tömegesen lesznek végrehajtva. Példaként lásd az alábbi tömeges létrehozási módszert:

    private void bulkCreateItems(Flux<Family> families) {
        Flux<CosmosItemOperation> cosmosItemOperations = families.map(
            family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
        container.executeBulkOperations(cosmosItemOperations).blockLast();
    }
    
  4. Egy osztály BulkWriter.java is ugyanabban a könyvtárban található, mint a mintaalkalmazás. Ez az osztály bemutatja, hogyan kezelhetők a sebességkorlátozási (429) és időtúllépési (408) hibák, amelyek tömeges végrehajtás során fordulhatnak elő, és hogyan lehet hatékonyan újrapróbálkozni. Az alábbi módszerekben implementálva, a helyi és a globális átviteli sebesség szabályozásának implementálását is bemutatja.

    private void bulkUpsertItemsWithBulkWriterAbstraction() {
        Family andersenFamilyItem = Families.getAndersenFamilyItem();
        Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
        CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getUpsertItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
        CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getUpsertItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
        BulkWriter bulkWriter = new BulkWriter(container);
        bulkWriter.scheduleWrites(andersonItemOperation);
        bulkWriter.scheduleWrites(wakeFieldItemOperation);
        bulkWriter.execute().subscribe();
    }
    
    private void bulkUpsertItemsWithBulkWriterAbstractionAndLocalThroughPutControl() {
        ThroughputControlGroupConfig groupConfig =
                new ThroughputControlGroupConfigBuilder()
                        .setGroupName("group1")
                        .setTargetThroughput(200)
                        .build();
        container.enableLocalThroughputControlGroup(groupConfig);
        Family andersenFamilyItem = Families.getAndersenFamilyItem();
        Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
        CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getUpsertItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
        CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getUpsertItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
        BulkWriter bulkWriter = new BulkWriter(container);
        bulkWriter.scheduleWrites(andersonItemOperation);
        bulkWriter.scheduleWrites(wakeFieldItemOperation);
        bulkWriter.execute().subscribe();
    }
    
    private void bulkCreateItemsWithBulkWriterAbstractionAndGlobalThroughputControl() {
        String controlContainerId = "throughputControlContainer";
        CosmosAsyncContainer controlContainer = database.getContainer(controlContainerId);
        database.createContainerIfNotExists(controlContainer.getId(), "/groupId").block();
    
        ThroughputControlGroupConfig groupConfig =
                new ThroughputControlGroupConfigBuilder()
                        .setGroupName("group-" + UUID.randomUUID())
                        .setTargetThroughput(200)
                        .build();
    
        GlobalThroughputControlConfig globalControlConfig = this.client.createGlobalThroughputControlConfigBuilder(this.database.getId(), controlContainerId)
                .setControlItemRenewInterval(Duration.ofSeconds(5))
                .setControlItemExpireInterval(Duration.ofSeconds(20))
                .build();
    
        container.enableGlobalThroughputControlGroup(groupConfig, globalControlConfig);
        CosmosItemRequestOptions requestOptions = new CosmosItemRequestOptions();
        requestOptions.setThroughputControlGroupName(groupConfig.getGroupName());
        Family andersenFamilyItem = Families.getAndersenFamilyItem();
        Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
        CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getCreateItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
        CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getCreateItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
        BulkWriter bulkWriter = new BulkWriter(container);
        bulkWriter.scheduleWrites(andersonItemOperation);
        bulkWriter.scheduleWrites(wakeFieldItemOperation);
        bulkWriter.execute().subscribe();
    }
    
  5. Emellett a mintában tömeges létrehozási módszerek is találhatók, amelyek bemutatják a válaszfeldolgozás hozzáadását és a végrehajtási beállítások beállítását:

    private void bulkCreateItemsWithResponseProcessing(Flux<Family> families) {
        Flux<CosmosItemOperation> cosmosItemOperations = families.map(
            family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
        container.executeBulkOperations(cosmosItemOperations).flatMap(cosmosBulkOperationResponse -> {
    
            CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse();
            CosmosItemOperation cosmosItemOperation = cosmosBulkOperationResponse.getOperation();
    
            if (cosmosBulkOperationResponse.getException() != null) {
                logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException());
            } else if (cosmosBulkItemResponse == null ||
                !cosmosBulkOperationResponse.getResponse().isSuccessStatusCode()) {
    
                logger.error(
                    "The operation for Item ID: [{}]  Item PartitionKey Value: [{}] did not complete " +
                        "successfully with " + "a" + " {} response code.",
                    cosmosItemOperation.<Family>getItem().getId(),
                    cosmosItemOperation.<Family>getItem().getLastName(),
                    cosmosBulkItemResponse != null ? cosmosBulkItemResponse.getStatusCode() : "n/a");
            } else {
                logger.info(
                    "Item ID: [{}]  Item PartitionKey Value: [{}]",
                    cosmosItemOperation.<Family>getItem().getId(),
                    cosmosItemOperation.<Family>getItem().getLastName());
                logger.info("Status Code: {}", cosmosBulkItemResponse.getStatusCode());
                logger.info("Request Charge: {}", cosmosBulkItemResponse.getRequestCharge());
            }
            if (cosmosBulkItemResponse == null) {
                return Mono.error(new IllegalStateException("No response retrieved."));
            } else {
                return Mono.just(cosmosBulkItemResponse);
            }
        }).blockLast();
    }
    
    private void bulkCreateItemsWithExecutionOptions(Flux<Family> families) {
        CosmosBulkExecutionOptions bulkExecutionOptions = new CosmosBulkExecutionOptions();
    
        // The default value for maxMicroBatchConcurrency is 1.
        // By increasing it, it means more concurrent requests will be allowed to be sent to the server, which leads to increased RU usage.
        //
        // Before you increase the value, please examine the RU usage of your container - whether it has been saturated or not.
        // When the RU has already been under saturation, increasing the concurrency will not help the situation,
        // rather it may cause more 429 and request timeout.
        bulkExecutionOptions.setMaxMicroBatchConcurrency(2);
        Flux<CosmosItemOperation> cosmosItemOperations = families.map(family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
        container.executeBulkOperations(cosmosItemOperations, bulkExecutionOptions).blockLast();
    }
    

    Teljesítménnyel kapcsolatos tippek

    Fontolja meg a következő szempontokat a nagyobb teljesítmény érdekében a tömeges végrehajtói kódtár használata esetén:

    • A legjobb teljesítmény érdekében futtassa az alkalmazást egy Azure-beli virtuális gépről ugyanabban a régióban, mint az Azure Cosmos DB-fiók írási régiója.

    • Magasabb átviteli sebesség elérése érdekében:

      • Állítsa a JVM halomméretét elegendő számúra, hogy elkerülje a sok dokumentum kezelése során felmerülő memóriaproblémát. Javasolt halomméret: max(3 GB, 3 * sizeof(az összes dokumentum, amely egy kötegben a tömeges importálási API-nak lett átadva)).
      • Van egy előfeldolgozási idő, amely miatt nagyobb átviteli sebességre lesz szüksége, amikor nagy számú dokumentummal végez tömeges műveleteket. Ha tehát 10 000 000 dokumentumot szeretne importálni, akkor a 10-szeres tömeges importálást 10 alkalommal, 10-szer 100 000 000-es méretben kell futtatni, mint a 100 000-es méretű dokumentumok 100-szeres tömeges importálását.
    • Ajánlott egyetlen CosmosAsyncClient objektumot példányosítani a teljes alkalmazáshoz egyetlen virtuális gépen belül, amely egy adott Azure Cosmos DB-tárolónak felel meg.

    • Mivel egyetlen tömeges műveleti API-végrehajtás az ügyfélszámítógép PROCESSZOR- és hálózati I/O-jának nagy részét használja fel. Ez több feladat belső ívásával történik, így elkerülheti, hogy az alkalmazásfolyamaton belül egyszerre több feladat is létrejönjön, és tömeges műveleti API-hívásokat hajt végre. Ha egyetlen virtuális gépen futó egyetlen tömeges műveleti API-hívás nem tudja felhasználni a teljes tároló átviteli sebességét (ha a tároló átviteli sebessége > 1 millió RU/s), célszerű külön virtuális gépeket létrehozni a tömeges műveleti API-hívások egyidejű végrehajtásához.

    További lépések

    • A tömeges végrehajtói funkciók áttekintését a tömeges végrehajtók áttekintésében tekintheti meg.