قم بتغيير معالج موجز التغيير في Azure Cosmos DB

ينطبق على: واجهة برمجة تطبيقات SQL

يعد معالج موجز التغيير جزءاً من Azure Cosmos DB SDK V3. إنه يبسط عملية قراءة موجز التغيير وتوزيع معالجة الحدث عبر مستهلكين متعددين بشكل فعال.

تتمثل الفائدة الرئيسية لمكتبة معالج موجز التغيير في سلوكها المتسامح مع الأخطاء الذي يضمن تسليم جميع الأحداث في موجز التغيير "مرة واحدة على الأقل".

مكونات معالج موجز التغيير

هناك أربعة مكونات رئيسية لتنفيذ معالج موجز التغيير:

  1. الحاوية المراقبة: تحتوي الحاوية المراقبة على البيانات التي تم إنشاء موجز التغيير منها. تنعكس أي إدخالات وتحديثات للحاوية المراقبة في موجز التغيير الخاص بالحاوية.

  2. حاوية الإيجار: تعمل حاوية الإيجار كمخزن للدولة وتنسق معالجة موجز التغيير عبر العديد من العمال. يمكن تخزين حاوية الإيجار في نفس الحساب مثل الحاوية المراقبة أو في حساب منفصل.

  3. مثيل الحساب : يستضيف مثيل الحساب معالج خلاصة التغيير للاستماع إلى التغييرات. اعتمادًا على المنصة، يمكن تمثيلها بواسطة VM، و kubernetes pod، و Azure App Service، وآلة مادية فعلية. يحتوي على معرف فريد يشار إليه باسم اسم الحالة في جميع أنحاء هذه المقالة.

  4. المفوض: المفوض هو الكود الذي يحدد ما تريد، بصفتك المطور، أن تفعله مع كل دفعة من التغييرات التي يقرأها معالج موجز التغيير.

لفهم كيفية عمل هذه العناصر الأربعة لمعالج موجز التغيير معاً، دعنا نلقي نظرة على مثال في الرسم التخطيطي التالي. تخزن الحاوية المراقبة المستندات وتستخدم "المدينة" كمفتاح القسم. نرى أن قيم مفتاح التقسيم موزعة في نطاقات (كل نطاق يمثل قسمًا ماديًا ) يحتوي على عناصر. هناك حالتين حسابيتين ويقوم معالج الخلاصة بالتغيير بتعيين نطاقات مختلفة لكل حالة لزيادة توزيع الحوسبة، ويكون لكل حالة اسم مميز ومختلف. تتم قراءة كل نطاق بالتوازي ويتم الحفاظ على تقدمه بشكل منفصل عن النطاقات الأخرى في حاوية الإيجار. تمثل عقود الإيجار الحالة الحالية لمعالج التغذية بالتغيير.

Change feed processor example

تنفيذ معالج موجز التغيير

دائماً ما تكون نقطة الدخول هي الحاوية المراقبة، من Containerمثيل تستدعيهGetChangeFeedProcessorBuilder:

/// <summary>
/// Start the Change Feed Processor to listen for changes and process them with the HandleChangesAsync implementation.
/// </summary>
private static async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync(
    CosmosClient cosmosClient,
    IConfiguration configuration)
{
    string databaseName = configuration["SourceDatabaseName"];
    string sourceContainerName = configuration["SourceContainerName"];
    string leaseContainerName = configuration["LeasesContainerName"];

    Container leaseContainer = cosmosClient.GetContainer(databaseName, leaseContainerName);
    ChangeFeedProcessor changeFeedProcessor = cosmosClient.GetContainer(databaseName, sourceContainerName)
        .GetChangeFeedProcessorBuilder<ToDoItem>(processorName: "changeFeedSample", onChangesDelegate: HandleChangesAsync)
            .WithInstanceName("consoleHost")
            .WithLeaseContainer(leaseContainer)
            .Build();

    Console.WriteLine("Starting Change Feed Processor...");
    await changeFeedProcessor.StartAsync();
    Console.WriteLine("Change Feed Processor started.");
    return changeFeedProcessor;
}

حيث إن المعلمة الأولى هي اسم مميز يصف الهدف من هذا المعالج والاسم الثاني هو تنفيذ المفوض الذي سيتعامل مع التغييرات.

مثال على المندوب سيكون:

/// <summary>
/// The delegate receives batches of changes as they are generated in the change feed and can process them.
/// </summary>
static async Task HandleChangesAsync(
    ChangeFeedProcessorContext context,
    IReadOnlyCollection<ToDoItem> changes,
    CancellationToken cancellationToken)
{
    Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
    Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
    // SessionToken if needed to enforce Session consistency on another client instance
    Console.WriteLine($"SessionToken ${context.Headers.Session}");

    // We may want to track any operation's Diagnostics that took longer than some threshold
    if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
    {
        Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
    }

    foreach (ToDoItem item in changes)
    {
        Console.WriteLine($"Detected operation for item with id {item.id}, created at {item.creationTime}.");
        // Simulate some asynchronous operation
        await Task.Delay(10);
    }

    Console.WriteLine("Finished handling changes.");
}

بعد ذلك، تقوم بتعريف اسم مثيل الحساب أو المعرف المميز باستخدام WithInstanceName، يجب أن يكون هذا مميزاً ومختلفًا في كل مثيل حساب تقوم بنشره، وأخيرًا الحاوية التي تحافظ على حالة الإيجار مع WithLeaseContainer.

سيمنحك الاتصال بـ Buildمثيل المعالج الذي يمكنك البدء من خلال استدعاءStartAsync.

دورة حياة المعالجة

دورة الحياة العادية لمثيل المضيف هي:

  1. اقرأ موجز التغيير.
  2. إذا لم تكن هناك تغييرات، فاستمر في النوم لفترة محددة مسبقاً (قابلة للتخصيص باستخدام WithPollIntervalفي Builder) وانتقل إلى رقم 1.
  3. إذا كانت هناك تغييرات، فأرسلها إلى المفوض.
  4. عندما ينتهي المفوض من معالجة التغييرات بنجاح، قم بتحديث مخزن الإيجار بآخر نقطة تمت معالجتها في الوقت المناسب وانتقل إلى رقم 1.

معالجة الأخطاء

معالج موجز التغيير قابل للتعامل مع أخطاء كود المستخدم. هذا يعني أنه إذا كان تنفيذ المفوض الخاص بك يحتوي على استثناء غير معالج (الخطوة رقم 4)، فسيتم إيقاف معالجة مؤشر الترابط لتلك المجموعة المعينة من التغييرات، وسيتم إنشاء مؤشر ترابط جديد. سيتحقق مؤشر الترابط الجديد من أحدث نقطة في الوقت الذي يمتلكه متجر التأجير لهذا النطاق من قيم مفتاح القسم، ثم إعادة التشغيل من هناك، وإرسال مجموعة التغييرات نفسها إلى المفوض بشكل فعال. سيستمر هذا السلوك حتى يقوم مندوبك بمعالجة التغييرات بشكل صحيح وهذا هو السبب في أن معالج خلاصة التغيير لديه ضمان "مرة واحدة على الأقل ".

ملاحظة

يوجد سيناريو واحد فقط حيث لن تتم إعادة محاولة مجموعة من التغييرات. إذا حدث الفشل في أول تنفيذ للمفوض على الإطلاق، فلن يكون لمخزن التأجير حالة محفوظة سابقة لاستخدامها في إعادة المحاولة. في هذه الحالات، قد تستخدم إعادة المحاولة تهيئة البداية الأولية، والتي قد تتضمن أو لا تتضمن الدفعة الأخيرة.

لمنع معالج موجز التغيير من "التعطل" باستمرار إعادة محاولة نفس مجموعة التغييرات، يجب إضافة منطق في رمز المفوض لكتابة المستندات، عند الاستثناء، إلى قائمة انتظار الرسائل المهملة. يضمن هذا التصميم أنه يمكنك تتبع التغييرات غير المعالجة بينما تظل قادراً على الاستمرار في معالجة التغييرات المستقبلية. قد تكون قائمة انتظار الرسائل المهملة حاوية أخرى من طراز Cosmos. لا يهم مخزن البيانات الدقيقة، ببساطة أن يتم استمرار التغييرات غير المجهزة.

بالإضافة إلى ذلك، يمكنك استخدام مقدر خلاصة التغيير لرصد التقدم المحرز في حالات معالج خلاصة التغيير أثناء قراءتهم خلاصة التغيير أو استخدام إشعارات دورة الحياة للكشف عن الأعطال الأساسية.

إخطارات دورة الحياة

يتيح لك معالج تغذية التغيير ربطك بالأحداث ذات الصلة في دورة حياتها ، ويمكنك اختيار أن يتم إخطارك بواحد منها أو جميعها. التوصية هي تسجيل إشعار الخطأ على الأقل:

  • قم بتسجيل معالج WithLeaseAcquireNotification ليتم إخطاره عندما يحصل المضيف الحالي على عقد إيجار لبدء معالجته.
  • قم بتسجيل معالج WithLeaseReleaseNotification ليتم إخطاره عندما يصدر المضيف الحالي عقد إيجار ويتوقف عن معالجته.
  • سجل معالجًا WithErrorNotification ليتم إخطاره عندما يواجه المضيف الحالي استثناءً أثناء المعالجة، أو يكون قادرًا على التمييز بين ما إذا كان المصدر هو مندوب المستخدم (استثناء غير مُعَالج) أو خطأ يواجهه المعالج في محاولة الوصول إلى الحاوية المراقبة (على سبيل المثال، مشكلات الشبكة).
Container.ChangeFeedMonitorLeaseAcquireDelegate onLeaseAcquiredAsync = (string leaseToken) =>
{
    Console.WriteLine($"Lease {leaseToken} is acquired and will start processing");
    return Task.CompletedTask;
};

Container.ChangeFeedMonitorLeaseReleaseDelegate onLeaseReleaseAsync = (string leaseToken) =>
{
    Console.WriteLine($"Lease {leaseToken} is released and processing is stopped");
    return Task.CompletedTask;
};

Container.ChangeFeedMonitorErrorDelegate onErrorAsync = (string LeaseToken, Exception exception) =>
{
    if (exception is ChangeFeedProcessorUserException userException)
    {
        Console.WriteLine($"Lease {LeaseToken} processing failed with unhandled exception from user delegate {userException.InnerException}");
    }
    else
    {
        Console.WriteLine($"Lease {LeaseToken} failed with {exception}");
    }

    return Task.CompletedTask;
};

ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedNotifications", handleChanges)
        .WithLeaseAcquireNotification(onLeaseAcquiredAsync)
        .WithLeaseReleaseNotification(onLeaseReleaseAsync)
        .WithErrorNotification(onErrorAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .Build();

وحدة الانتشار

تتكون وحدة نشر معالج موجز التغيير الفردي من مثيلات حساب واحد أو أكثر بنفس processorNameوتكوين حاوية الإيجار. يمكن أن يكون لديك العديد من وحدات النشر حيث يكون لكل منها تدفق أعمال مختلف للتغييرات وتتكون كل وحدة نشر من مثيل واحد أو أكثر.

على سبيل المثال، قد يكون لديك وحدة نشر واحدة تقوم بتشغيل واجهة برمجة تطبيقات خارجية في أي وقت يحدث تغيير في الحاوية الخاصة بك. قد تقوم وحدة نشر أخرى بنقل البيانات، في الوقت الفعلي، في كل مرة يحدث فيها تغيير. عندما يحدث تغيير في الحاوية المراقبة الخاصة بك، سيتم إعلام جميع وحدات النشر الخاصة بك.

تحجيم ديناميكي

كما ذكرنا من قبل، داخل وحدة النشر، يمكن أن يكون لديك واحد أو أكثر من حالات الحساب. للاستفادة من توزيع الحوسبة داخل وحدة النشر، فإن المتطلبات الأساسية الوحيدة هي:

  1. يجب أن تحتوي جميع المثيلات على نفس تكوين حاوية الإيجار.
  2. يجب أن يكون لجميع المثيلات نفس processorName.
  3. يجب أن يكون لكل مثيل اسم مثيل مختلف (WithInstanceName).

إذا تم تطبيق هذه الشروط الثلاثة، فسيقوم معالج موجز التغيير، باستخدام خوارزمية توزيع متساوٍ، بتوزيع جميع الإيجارات في حاوية الإيجار عبر جميع مثيلات التشغيل لوحدة النشر تلك وموازنة الحساب. لا يمكن امتلاك عقد إيجار واحد إلا من خلال مثيل واحد في وقت معين، وبالتالي فإن الحد الأقصى لعدد الحالات يساوي عدد الإيجارات.

يمكن أن ينمو عدد المثيلات ويتقلص، وسيقوم معالج موجز التغيير بضبط الحمل ديناميكياً عن طريق إعادة التوزيع وفقاً لذلك.

علاوة على ذلك، يمكن أن يتكيف معالج موجز التغيير ديناميكياً مع مقياس الحاويات بسبب زيادة الإنتاجية أو التخزين. عندما تنمو الحاوية الخاصة بك، يتعامل معالج موجز التغيير بشفافية مع هذه السيناريوهات عن طريق زيادة الإيجارات ديناميكياً وتوزيع الإيجارات الجديدة بين المثيلات الحالية.

موجز التغيير والإنتاجية المقدمة

سيستهلك تغيير عمليات قراءة الخلاصة على الحاوية المراقبة وحدات الطلب . تأكد من أن الحاوية التي تتم مراقبتها لا تعاني من التضييق، وإلا ستواجه تأخيرات في تلقي أحداث خلاصة التغيير على المعالجات خاصتك.

تستهلك العمليات على حاوية الإيجار (تحديث وصيانة الحالة) وحدات الطلب . وكلما ارتفع عدد الحالات التي تستخدم نفس حاوية الإيجار، زاد استهلاك وحدات الطلب المحتملة. تأكد من أن حاوية الإيجار خاصتك لا تعاني من التضييق، وإلا ستواجه تأخيرات في تلقي أحداث خلاصة التغيير على المعالجات خاصتك، وفي بعض الحالات التي يكون فيها الاختناق مرتفعًا، قد تتوقف المعالجات عن المعالجة تمامًا.

وقت البدء

بشكل افتراضي، عندما يبدأ معالج موجز التغيير في المرة الأولى، فإنه سيهيئ حاوية الإيجار، ويبدأ دورة حياة المعالجة. لن يتم اكتشاف أي تغييرات حدثت في الحاوية المراقبة قبل تهيئة معالج موجز التغيير للمرة الأولى.

القراءة من تاريخ ووقت سابقين

من الممكن تهيئة معالج موجز التغيير لقراءة التغييرات التي تبدأ في تاريخ ووقت محددين، عن طريق تمرير مثيل DateTimeإلى WithStartTimeامتداد المنشئ:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedTime", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .WithStartTime(particularPointInTime)
        .Build();

ستتم تهيئة معالج موجز التغيير لهذا التاريخ والوقت المحددين والبدء في قراءة التغييرات التي حدثت بعد ذلك.

ملاحظة

بدء تشغيل معالج موجز التغيير في تاريخ ووقت محددين غير مدعوم في حسابات الكتابة متعددة المناطق.

القراءة من البداية

في سيناريوهات أخرى مثل عمليات ترحيل البيانات أو تحليل السجل الكامل للحاوية، نحتاج إلى قراءة موجز التغيير من بداية عمر الحاوية. للقيام بذلك، يمكننا استخدام WithStartTimeعلى امتداد Builder، ولكن تمريرDateTime.MinValue.ToUniversalTime()، والذي سينشئ تمثيل UTC للحد الأدنى من القيمة DateTime، مثل:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedBeginning", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .WithStartTime(DateTime.MinValue.ToUniversalTime())
        .Build();

ستتم تهيئة معالج موجز التغيير وبدء قراءة التغييرات من بداية عمر الحاوية.

ملاحظة

تعمل خيارات التخصيص هذه فقط لإعداد نقطة البداية في وقت معالج موجز التغيير. بمجرد تهيئة حاوية عقود الإيجار لأول مرة، لن يكون لتغييرها أي تأثير.

مشاركة حاوية الإيجار

يمكنك مشاركة حاوية الإيجار عبر وحدات نشر متعددة، حيث تستمع كل وحدة نشر إلى حاوية مراقبة مختلفة أو لديها processorName مختلفة. وبهذا التشكيل، ستحتفظ كل وحدة من وحدات النشر بحالة مستقلة على حاوية الإيجار. راجع طلب استهلاك الوحدة على حاوية الإيجار للتأكد من أن معدل النقل المقدم كافي لجميع وحدات النشر.

أين تستضيف معالج موجز التغيير

يمكن استضافة معالج موجز التغيير في أي نظام أساسي يدعم العمليات أو المهام طويلة الأمد:

بينما يمكن تشغيل معالج موجز التغيير في بيئات قصيرة العمر، نظراً لأن حاوية التأجير تحافظ على الحالة، فإن دورة بدء تشغيل هذه البيئات ستضيف تأخيراً لتلقي الإخطارات (بسبب الحمل الزائد لبدء المعالج في كل مرة يتم فيها بدء تشغيل البيئة).

الموارد الإضافية

الخطوات التالية

يمكنك الآن المتابعة لمعرفة المزيد حول معالج موجز التغيير في المقالات التالية: