إجراء عمليات مجمعة على بيانات Azure Cosmos DB

ينطبق على: واجهة برمجة تطبيقات SQL

يقدم هذا البرنامج التعليمي إرشادات حول إجراء عمليات مجمعة في Azure Cosmos DB Java V4 SDK. يأتي هذا الإصدار من SDK مزوداً بمكتبة المنفذ المجمعة المضمنة. إذا كنت تستخدم إصداراً أقدم من Java SDK، فمن المستحسن الانتقال إلى أحدث إصدار. Azure Cosmos DB Java V4 SDK هو الحل الحالي الموصى به لدعم Java بالجملة.

حالياً، يتم دعم مكتبة مُنفذ المجموعة بواسطة حسابات واجهة برمجة التطبيقاتAzure Cosmos DB SQL، وحسابات واجهة برمجة التطبيقات Gremlin فقط. للتعرف على استخدام مكتبة NET. للمنفذ المجمع مع حسابات واجهة برمجة التطبيقات Gremlin، راجع ⁧⁩إجراء عمليات مجمعة في واجهة برمجة التطبيقات Azure Cosmos DB Gremlin⁧⁩.

المتطلبات الأساسية

استنساخ نموذج التطبيق

الآن دعنا ننتقل إلى العمل باستخدام التعليمات البرمجية عن طريق تنزيل مستودع عينات عام لـ Java V4 SDK لـ Azure Cosmos DB من GitHub. تؤدي نماذج التطبيقات هذه عمليات CRUD وعمليات أخرى شائعة على Azure Cosmos DB. لاستنساخ المستودع، افتح موجه الأوامر، وانتقل إلى الدليل حيث تريد نسخ التطبيق وتشغيل الأمر التالي:

 git clone https://github.com/Azure-Samples/azure-cosmos-java-sql-api-samples 

يحتوي المستودع المستنسخ على نموذج SampleBulkQuickStartAsync.java في المجلد /azure-cosmos-java-sql-api-samples/tree/main/src/main/java/com/azure/cosmos/examples/bulk/async. ينشئ التطبيق المستندات وينفذ العمليات لإنشاء العناصر بشكل مجمّع، ورفعها، واستبدالها، وحذفها في Azure Cosmos DB. في الأقسام التالية، سنراجع التعليمة البرمجية في نموذج التطبيق.

التنفيذ المجمع في Azure Cosmos DB

  1. تتم قراءة سلاسل اتصال Azure Cosmos DB كوسائط، ويتم تخصيصها للمتغيرات المحددة في ملف /examples/common/AccountSettings.java. يجب تعيين متغيرات البيئة هذه
ACCOUNT_HOST=your account hostname;ACCOUNT_KEY=your account primary key

لتشغيل العينة المجمعة، حدد فئتها الرئيسية:

com.azure.cosmos.examples.bulk.async.SampleBulkQuickStartAsync
  1. تتم تهيئة العنصر CosmosAsyncClient باستخدام العبارات التالية:

         client = new CosmosClientBuilder().endpoint(AccountSettings.HOST).key(AccountSettings.MASTER_KEY)
                 .preferredRegions(preferredRegions).contentResponseOnWriteEnabled(true)
                 .consistencyLevel(ConsistencyLevel.SESSION).buildAsyncClient();
    
  2. يقوم النموذج بإنشاء قاعدة بيانات وحاوية غير متزامنة. ثم يقوم بإنشاء مستندات متعددة سيتم تنفيذ عمليات مجمعة عليها. يضيف هذا المستندات إلى عنصر دفق تفاعلي Flux<Family>:

        createDatabaseIfNotExists();
        createContainerIfNotExists();
    
        Family andersenFamilyItem = Families.getAndersenFamilyItem();
        Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
        Family johnsonFamilyItem = Families.getJohnsonFamilyItem();
        Family smithFamilyItem = Families.getSmithFamilyItem();
    
        //  Setup family items to create
        Flux<Family> families = Flux.just(andersenFamilyItem, wakefieldFamilyItem, johnsonFamilyItem, smithFamilyItem);
    
  3. يحتوي النموذج على طرق للإنشاء المجمع، والرفع، والاستبدال، والحذف. في كل طريقة، نقوم بتعيين مستندات العائلات في دفق BulkWriter Flux<Family> لاستدعاءات طريقة متعددة في CosmosBulkOperations. تتم إضافة هذه العمليات إلى عنصر دفق تفاعلي آخر Flux<CosmosItemOperation>. ثم يتم تمرير الدفق إلى executeBulkOperations طريقة غير المتزامن container التي أنشأناها في البداية، ويتم تنفيذ العمليات بشكل مجمّع. راجع الطريقة bulkCreateItems أدناه كمثال:

     private void bulkCreateItems(Flux<Family> families) {
         Flux<CosmosItemOperation> cosmosItemOperations =
                 families.map(family -> CosmosBulkOperations.getCreateItemOperation(family,
                         new PartitionKey(family.getLastName())));
         container.executeBulkOperations(cosmosItemOperations).blockLast();
     }
    
  4. توجد أيضاً فئة BulkWriter.java في نفس الدليل مثل التطبيق النموذجي. توضح هذه الفئة كيفية التعامل مع تحديد المعدل (429) وأخطاء المهلة (408) التي قد تحدث أثناء التنفيذ المجمع، وإعادة محاولة هذه العمليات بشكل فعال. يتم تنفيذه في طريقة bulkCreateItemsSimple() في التطبيق.

        private void bulkCreateItemsSimple() {
            Family andersenFamilyItem = Families.getAndersenFamilyItem();
            Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
            CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getCreateItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
            CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getCreateItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
            BulkWriter bulkWriter = new BulkWriter(container);
            bulkWriter.scheduleWrites(andersonItemOperation);
            bulkWriter.scheduleWrites(wakeFieldItemOperation);
            bulkWriter.execute().blockLast();
        }
    
  5. وبالإضافة إلى ذلك، توجد طرق إنشاء مجمعة في العينة توضح كيفية إضافة معالجة الاستجابة وتعيين خيارات التنفيذ:

    private void bulkCreateItemsWithResponseProcessing(Flux<Family> families) {
        Flux<CosmosItemOperation> cosmosItemOperations =
                families.map(family -> CosmosBulkOperations.getCreateItemOperation(family,
                        new PartitionKey(family.getLastName())));
        container.executeBulkOperations(cosmosItemOperations).flatMap(cosmosBulkOperationResponse -> {
            CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse();
            CosmosItemOperation cosmosItemOperation = cosmosBulkOperationResponse.getOperation();
    
            if (cosmosBulkOperationResponse.getException() != null) {
                logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException());
            } else if (cosmosBulkOperationResponse.getResponse() == null || !cosmosBulkOperationResponse.getResponse().isSuccessStatusCode()) {
                logger.error("The operation for Item ID: [{}]  Item PartitionKey Value: [{}] did not complete successfully with " +
                                "a" + " {} response code.", cosmosItemOperation.<Family>getItem().getId(),
                        cosmosItemOperation.<Family>getItem().getLastName(), cosmosBulkItemResponse.getStatusCode());
            } else {
                logger.info("Item ID: [{}]  Item PartitionKey Value: [{}]", cosmosItemOperation.<Family>getItem().getId(),
                        cosmosItemOperation.<Family>getItem().getLastName());
                logger.info("Status Code: {}", String.valueOf(cosmosBulkItemResponse.getStatusCode()));
                logger.info("Request Charge: {}", String.valueOf(cosmosBulkItemResponse.getRequestCharge()));
            }
            return Mono.just(cosmosBulkItemResponse);
        }).blockLast();
    }
    
    private void bulkCreateItemsWithExecutionOptions(Flux<Family> families) {
        CosmosBulkExecutionOptions bulkExecutionOptions = new CosmosBulkExecutionOptions();
        ImplementationBridgeHelpers
                .CosmosBulkExecutionOptionsHelper
                .getCosmosBulkExecutionOptionsAccessor()
                .setMaxMicroBatchSize(bulkExecutionOptions, 10);
        Flux<CosmosItemOperation> cosmosItemOperations =
                families.map(family -> CosmosBulkOperations.getCreateItemOperation(family,
                        new PartitionKey(family.getLastName())));
        container.executeBulkOperations(cosmosItemOperations, bulkExecutionOptions).blockLast();
    }
    

    نصائح حول الأداء

    ضع في اعتبارك النقاط التالية؛ للحصول على أداء أفضل عند استخدام مكتبة المنفذ المجمع:

    • للحصول على أفضل أداء، قم بتشغيل التطبيق من جهاز ظاهري من Azure في نفس المنطقة التي توجد بها منطقة كتابة حساب Cosmos DB الخاص بك.

    • لتحقيق إنتاجية أعلى:

      • قم بتعيين حجم كومة الذاكرة المؤقتة لـ JVM إلى عدد كبير بما فيه الكفاية؛ لتجنب أية مشكلة في الذاكرة في معالجة عدد كبير من المستندات. حجم الكومة المقترح: max(3 GB, 3 * sizeof(all documents passed to bulk import API in one batch)).
      • يوجد وقت معالجة مسبقة، والذي ستتمتع بسببه بسعة معالجة أعلى عند تنفيذ عمليات مجمعة بعدد كبير من المستندات. لذا، إذا كنت تريد استيراد 10,000,000 مستند، فإن تشغيل استيراد كميات كبيرة تبلغ 10 مرات على 10 مجموعات من المستندات، فإن كل مستند من الحجم 1,000,000 أفضل من تشغيل استيراد كميات كبيرة تبلغ 100 مرة على 100 مجموعة من المستندات لكل منها بحجم 100,000 مستند.
    • يوصى بإنشاء مثيل لعنصر CosmosAsyncClient واحد للتطبيق بأكمله داخل جهاز ظاهري واحد يتوافق مع حاوية Azure Cosmos محددة.

    • ونظراً لأن تنفيذ واجهة برمجة التطبيقات (API) لعملية واحدة غير مرغوب فيها يستهلك مجموعة كبيرة من وحدة المعالجة المركزية (CPU) الخاصة بجهاز العميل ووحدة الإدخال/ الإخراج المتصلة بالشبكة. يحدث ذلك من خلال إنتاج عدة مهام داخلياً، وتجنب إنتاج عدة مهام متزامنة داخل عملية التطبيق الخاصة بك لكل عملية تنفيذ استدعاءات API للعملية المجمعة. إذا كانت استدعاءات واجهة برمجة التطبيقات لعملية مجمعة واحدة تعمل على جهاز ظاهري واحد غير قادرة على استهلاك سعة الحاوية بالكامل (إذا كان معدل نقل الحاوية > مليون وحدة طلب/ثانية)، فمن الأفضل إنشاء أجهزة ظاهرية منفصلة لتنفيذ عمليات مجمعة في نفس وقت استدعاءات واجهة برمجة التطبيقات.

    الخطوات التالية