إجراء عمليات مجمعة على بيانات Azure Cosmos DB
ينطبق على:
واجهة برمجة تطبيقات SQL
يقدم هذا البرنامج التعليمي إرشادات حول إجراء عمليات مجمعة في Azure Cosmos DB Java V4 SDK. يأتي هذا الإصدار من SDK مزوداً بمكتبة المنفذ المجمعة المضمنة. إذا كنت تستخدم إصداراً أقدم من Java SDK، فمن المستحسن الانتقال إلى أحدث إصدار. Azure Cosmos DB Java V4 SDK هو الحل الحالي الموصى به لدعم Java بالجملة.
حالياً، يتم دعم مكتبة مُنفذ المجموعة بواسطة حسابات واجهة برمجة التطبيقاتAzure Cosmos DB SQL، وحسابات واجهة برمجة التطبيقات Gremlin فقط. للتعرف على استخدام مكتبة NET. للمنفذ المجمع مع حسابات واجهة برمجة التطبيقات Gremlin، راجع إجراء عمليات مجمعة في واجهة برمجة التطبيقات Azure Cosmos DB Gremlin.
المتطلبات الأساسية
إذا لم يكن لديك اشتراك Azure، فأنشئ حساباً مجانياً قبل أن تبدأ.
بدلاً من ذلك، يمكنك تجربة Azure Cosmos DB مجاناً دون اشتراك Azure، مجاناً دون التزامات. أو يمكنك استخدام Azure Cosmos DB Emulator مع
https://localhost:8081نقطة النهاية. يتم توفير المفتاح الأساسي في طلبات المصادقة.-
فيما يتعلق بـ Ubuntu، قم بتشغيل
apt-get install default-jdkلتثبيت JDK.تأكد من تعيين متغير بيئة JAVA_HOME؛ للإشارة إلى المجلد المثبت عليه JDK.
قم بتنزيل، ثم تثبيت أرشيف Maven ثنائي
- على Ubuntu، يمكنك تشغيل
apt-get install maven لتثبيت Maven.
- على Ubuntu، يمكنك تشغيل
أنشئ حساب واجهة برمجة التطبيقات Azure Cosmos DB SQL باستخدام الخطوات الموضحة في قسم إنشاء حساب قاعدة بيانات في مقالة Java quickstart.
استنساخ نموذج التطبيق
الآن دعنا ننتقل إلى العمل باستخدام التعليمات البرمجية عن طريق تنزيل مستودع عينات عام لـ Java V4 SDK لـ Azure Cosmos DB من GitHub. تؤدي نماذج التطبيقات هذه عمليات CRUD وعمليات أخرى شائعة على Azure Cosmos DB. لاستنساخ المستودع، افتح موجه الأوامر، وانتقل إلى الدليل حيث تريد نسخ التطبيق وتشغيل الأمر التالي:
git clone https://github.com/Azure-Samples/azure-cosmos-java-sql-api-samples
يحتوي المستودع المستنسخ على نموذج SampleBulkQuickStartAsync.java في المجلد /azure-cosmos-java-sql-api-samples/tree/main/src/main/java/com/azure/cosmos/examples/bulk/async. ينشئ التطبيق المستندات وينفذ العمليات لإنشاء العناصر بشكل مجمّع، ورفعها، واستبدالها، وحذفها في Azure Cosmos DB. في الأقسام التالية، سنراجع التعليمة البرمجية في نموذج التطبيق.
التنفيذ المجمع في Azure Cosmos DB
- تتم قراءة سلاسل اتصال Azure Cosmos DB كوسائط، ويتم تخصيصها للمتغيرات المحددة في ملف /
examples/common/AccountSettings.java. يجب تعيين متغيرات البيئة هذه
ACCOUNT_HOST=your account hostname;ACCOUNT_KEY=your account primary key
لتشغيل العينة المجمعة، حدد فئتها الرئيسية:
com.azure.cosmos.examples.bulk.async.SampleBulkQuickStartAsync
تتم تهيئة العنصر
CosmosAsyncClientباستخدام العبارات التالية:client = new CosmosClientBuilder().endpoint(AccountSettings.HOST).key(AccountSettings.MASTER_KEY) .preferredRegions(preferredRegions).contentResponseOnWriteEnabled(true) .consistencyLevel(ConsistencyLevel.SESSION).buildAsyncClient();يقوم النموذج بإنشاء قاعدة بيانات وحاوية غير متزامنة. ثم يقوم بإنشاء مستندات متعددة سيتم تنفيذ عمليات مجمعة عليها. يضيف هذا المستندات إلى عنصر دفق تفاعلي
Flux<Family>:createDatabaseIfNotExists(); createContainerIfNotExists(); Family andersenFamilyItem = Families.getAndersenFamilyItem(); Family wakefieldFamilyItem = Families.getWakefieldFamilyItem(); Family johnsonFamilyItem = Families.getJohnsonFamilyItem(); Family smithFamilyItem = Families.getSmithFamilyItem(); // Setup family items to create Flux<Family> families = Flux.just(andersenFamilyItem, wakefieldFamilyItem, johnsonFamilyItem, smithFamilyItem);يحتوي النموذج على طرق للإنشاء المجمع، والرفع، والاستبدال، والحذف. في كل طريقة، نقوم بتعيين مستندات العائلات في دفق BulkWriter
Flux<Family>لاستدعاءات طريقة متعددة فيCosmosBulkOperations. تتم إضافة هذه العمليات إلى عنصر دفق تفاعلي آخرFlux<CosmosItemOperation>. ثم يتم تمرير الدفق إلىexecuteBulkOperationsطريقة غير المتزامنcontainerالتي أنشأناها في البداية، ويتم تنفيذ العمليات بشكل مجمّع. راجع الطريقةbulkCreateItemsأدناه كمثال:private void bulkCreateItems(Flux<Family> families) { Flux<CosmosItemOperation> cosmosItemOperations = families.map(family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName()))); container.executeBulkOperations(cosmosItemOperations).blockLast(); }توجد أيضاً فئة
BulkWriter.javaفي نفس الدليل مثل التطبيق النموذجي. توضح هذه الفئة كيفية التعامل مع تحديد المعدل (429) وأخطاء المهلة (408) التي قد تحدث أثناء التنفيذ المجمع، وإعادة محاولة هذه العمليات بشكل فعال. يتم تنفيذه في طريقةbulkCreateItemsSimple()في التطبيق.private void bulkCreateItemsSimple() { Family andersenFamilyItem = Families.getAndersenFamilyItem(); Family wakefieldFamilyItem = Families.getWakefieldFamilyItem(); CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getCreateItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName())); CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getCreateItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName())); BulkWriter bulkWriter = new BulkWriter(container); bulkWriter.scheduleWrites(andersonItemOperation); bulkWriter.scheduleWrites(wakeFieldItemOperation); bulkWriter.execute().blockLast(); }وبالإضافة إلى ذلك، توجد طرق إنشاء مجمعة في العينة توضح كيفية إضافة معالجة الاستجابة وتعيين خيارات التنفيذ:
private void bulkCreateItemsWithResponseProcessing(Flux<Family> families) { Flux<CosmosItemOperation> cosmosItemOperations = families.map(family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName()))); container.executeBulkOperations(cosmosItemOperations).flatMap(cosmosBulkOperationResponse -> { CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse(); CosmosItemOperation cosmosItemOperation = cosmosBulkOperationResponse.getOperation(); if (cosmosBulkOperationResponse.getException() != null) { logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException()); } else if (cosmosBulkOperationResponse.getResponse() == null || !cosmosBulkOperationResponse.getResponse().isSuccessStatusCode()) { logger.error("The operation for Item ID: [{}] Item PartitionKey Value: [{}] did not complete successfully with " + "a" + " {} response code.", cosmosItemOperation.<Family>getItem().getId(), cosmosItemOperation.<Family>getItem().getLastName(), cosmosBulkItemResponse.getStatusCode()); } else { logger.info("Item ID: [{}] Item PartitionKey Value: [{}]", cosmosItemOperation.<Family>getItem().getId(), cosmosItemOperation.<Family>getItem().getLastName()); logger.info("Status Code: {}", String.valueOf(cosmosBulkItemResponse.getStatusCode())); logger.info("Request Charge: {}", String.valueOf(cosmosBulkItemResponse.getRequestCharge())); } return Mono.just(cosmosBulkItemResponse); }).blockLast(); } private void bulkCreateItemsWithExecutionOptions(Flux<Family> families) { CosmosBulkExecutionOptions bulkExecutionOptions = new CosmosBulkExecutionOptions(); ImplementationBridgeHelpers .CosmosBulkExecutionOptionsHelper .getCosmosBulkExecutionOptionsAccessor() .setMaxMicroBatchSize(bulkExecutionOptions, 10); Flux<CosmosItemOperation> cosmosItemOperations = families.map(family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName()))); container.executeBulkOperations(cosmosItemOperations, bulkExecutionOptions).blockLast(); }نصائح حول الأداء
ضع في اعتبارك النقاط التالية؛ للحصول على أداء أفضل عند استخدام مكتبة المنفذ المجمع:
للحصول على أفضل أداء، قم بتشغيل التطبيق من جهاز ظاهري من Azure في نفس المنطقة التي توجد بها منطقة كتابة حساب Cosmos DB الخاص بك.
لتحقيق إنتاجية أعلى:
- قم بتعيين حجم كومة الذاكرة المؤقتة لـ JVM إلى عدد كبير بما فيه الكفاية؛ لتجنب أية مشكلة في الذاكرة في معالجة عدد كبير من المستندات. حجم الكومة المقترح: max(3 GB, 3 * sizeof(all documents passed to bulk import API in one batch)).
- يوجد وقت معالجة مسبقة، والذي ستتمتع بسببه بسعة معالجة أعلى عند تنفيذ عمليات مجمعة بعدد كبير من المستندات. لذا، إذا كنت تريد استيراد 10,000,000 مستند، فإن تشغيل استيراد كميات كبيرة تبلغ 10 مرات على 10 مجموعات من المستندات، فإن كل مستند من الحجم 1,000,000 أفضل من تشغيل استيراد كميات كبيرة تبلغ 100 مرة على 100 مجموعة من المستندات لكل منها بحجم 100,000 مستند.
يوصى بإنشاء مثيل لعنصر CosmosAsyncClient واحد للتطبيق بأكمله داخل جهاز ظاهري واحد يتوافق مع حاوية Azure Cosmos محددة.
ونظراً لأن تنفيذ واجهة برمجة التطبيقات (API) لعملية واحدة غير مرغوب فيها يستهلك مجموعة كبيرة من وحدة المعالجة المركزية (CPU) الخاصة بجهاز العميل ووحدة الإدخال/ الإخراج المتصلة بالشبكة. يحدث ذلك من خلال إنتاج عدة مهام داخلياً، وتجنب إنتاج عدة مهام متزامنة داخل عملية التطبيق الخاصة بك لكل عملية تنفيذ استدعاءات API للعملية المجمعة. إذا كانت استدعاءات واجهة برمجة التطبيقات لعملية مجمعة واحدة تعمل على جهاز ظاهري واحد غير قادرة على استهلاك سعة الحاوية بالكامل (إذا كان معدل نقل الحاوية > مليون وحدة طلب/ثانية)، فمن الأفضل إنشاء أجهزة ظاهرية منفصلة لتنفيذ عمليات مجمعة في نفس وقت استدعاءات واجهة برمجة التطبيقات.
الخطوات التالية
- لمعرفة المزيد حول تفاصيل حزمة maven، وملاحظات الإصدار الخاصة بمكتبة Java القابلة للتنفيذ المجمع، راجع تفاصيل مجموعة برامج تطوير البرامج القابلة للتنفيذ.