A változáscsatorna feldolgozója az Azure Cosmos DB-ben

A KÖVETKEZŐKRE VONATKOZIK: SQL API

A változáscsatorna feldolgozója az Azure Cosmos DB SDK V3 része. Leegyszerűsíti a változáscsatorna olvasásának folyamatát, és hatékonyan osztja el az eseményfeldolgozást több fogyasztó között.

A változáscsatorna feldolgozói kódtárának fő előnye a hibatűrő viselkedése, amely biztosítja a változáscsatorna összes eseményének legalább egyszeri kézbesítését.

A változáscsatorna feldolgozójának összetevői

A változáscsatorna-feldolgozó implementálásának négy fő összetevője van:

  1. A monitorozott tároló: A monitorozott tároló tartalmazza azokat az adatokat, amelyekből létrejön a változáscsatorna. A monitorozott tárolóba való beszúrások és a tároló frissítései megjelennek a tároló változáscsatornájában.

  2. A bérlettároló: A bérlettároló állapottárolóként működik, és több feldolgozó között koordinálja a változáscsatorna feldolgozását. A bérlettároló tárolható ugyanabban a fiókban, mint a monitorozott tároló, de akár külön fiókban is.

  3. A számítási példány: Egy számítási példány üzemelteti a változáscsatorna feldolgozóját a változások figyeléséhez. A platformtól függően egy virtuális gép, egy Kubernetes-pod, egy Azure App Service-példány és egy tényleges fizikai gép képviselheti. A jelen cikkben a példány neveként hivatkozott egyedi azonosítóval rendelkezik.

  4. A delegált: A delegált az a kód, amely meghatározza, hogy Ön, mint fejlesztő mit szeretne tenni azon egyes módosításkötegekkel, amelyeket a változáscsatorna feldolgozója beolvas.

A változáscsatorna-feldolgozó ezen négy elemének együttes működésének további megismeréséhez tekintsünk meg egy példát az alábbi ábrán. A figyelt tároló tárolja a dokumentumokat, és a "City" partíciókulcsot használja. Láthatjuk, hogy a partíciókulcs értékei elemeket tartalmazó tartományokban vannak elosztva (minden egyes fizikai partíciót képviselő tartomány). Két számítási példány van, és a változáscsatorna feldolgozója különböző tartományokat rendel mindegyik példányhoz a számítási elosztás maximalizálása érdekében. Minden példány egyedi és eltérő névvel rendelkezik. A rendszer minden tartományt párhuzamosan olvas be, és a folyamat előrehaladása a bérlettároló többi tartományától elkülönítve marad egy bérletdokumentumon keresztül. A bérletek kombinációja a változáscsatorna feldolgozójának aktuális állapotát jelöli.

Change feed processor example

A változáscsatorna feldolgozójának implementálása

A belépési pont mindig a figyelt tároló egy ön által hívott GetChangeFeedProcessorBuilderpéldánybólContainer:

/// <summary>
/// Start the Change Feed Processor to listen for changes and process them with the HandleChangesAsync implementation.
/// </summary>
private static async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync(
    CosmosClient cosmosClient,
    IConfiguration configuration)
{
    string databaseName = configuration["SourceDatabaseName"];
    string sourceContainerName = configuration["SourceContainerName"];
    string leaseContainerName = configuration["LeasesContainerName"];

    Container leaseContainer = cosmosClient.GetContainer(databaseName, leaseContainerName);
    ChangeFeedProcessor changeFeedProcessor = cosmosClient.GetContainer(databaseName, sourceContainerName)
        .GetChangeFeedProcessorBuilder<ToDoItem>(processorName: "changeFeedSample", onChangesDelegate: HandleChangesAsync)
            .WithInstanceName("consoleHost")
            .WithLeaseContainer(leaseContainer)
            .Build();

    Console.WriteLine("Starting Change Feed Processor...");
    await changeFeedProcessor.StartAsync();
    Console.WriteLine("Change Feed Processor started.");
    return changeFeedProcessor;
}

Ahol az első paraméter egy különálló név, amely leírja a processzor célját, a második pedig a módosításokat kezelő delegált implementáció.

A meghatalmazottak például a következőek lehetnek:

/// <summary>
/// The delegate receives batches of changes as they are generated in the change feed and can process them.
/// </summary>
static async Task HandleChangesAsync(
    ChangeFeedProcessorContext context,
    IReadOnlyCollection<ToDoItem> changes,
    CancellationToken cancellationToken)
{
    Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
    Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
    // SessionToken if needed to enforce Session consistency on another client instance
    Console.WriteLine($"SessionToken ${context.Headers.Session}");

    // We may want to track any operation's Diagnostics that took longer than some threshold
    if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
    {
        Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
    }

    foreach (ToDoItem item in changes)
    {
        Console.WriteLine($"Detected operation for item with id {item.id}, created at {item.creationTime}.");
        // Simulate some asynchronous operation
        await Task.Delay(10);
    }

    Console.WriteLine("Finished handling changes.");
}

Ezt követően a számítási példány nevét vagy egyedi azonosítót a következővel WithInstanceNamehatározhatja meg: ennek egyedinek és eltérőnek kell lennie minden üzembe helyezett számítási példányban, és végül ez lesz az a tároló, amellyel a bérletállapot WithLeaseContainertartható fenn.

A hívással Build megadhatja a processzorpéldányt, amelyet a hívással StartAsyncindíthat.

Feldolgozási életciklus

A gazdagéppéldány normál életciklusa:

  1. A változáscsatorna olvasása.
  2. Ha nincsenek módosítások, alvó állapot előre meghatározott időtartamra (a WithPollInterval használatával szabható testre a Builderben), majd ugrás az 1. lépésre.
  3. Ha vannak módosítások, küldje el őket a meghatalmazottnak.
  4. Amikor a meghatalmazott sikeresen befejezte a módosítások feldolgozását, frissítse a bérlettárolót a legújabb feldolgozott időponttal, és lépjen az 1. pontra.

Hibakezelés

A változáscsatorna feldolgozója rugalmas a felhasználói kódhibákkal szemben. Ez azt jelenti, hogy ha a delegált implementáció nem kezelt kivétellel (4. lépés) rendelkezik, a rendszer leállítja az adott módosításköteg szálfeldolgozását, és létrehoz egy új szálat. Az új szál ellenőrzi, hogy a bérlettároló melyik időpontban rendelkezik a legújabb időponttal a partíciókulcs-értékek tartományához, majd onnan újraindul, és lényegében ugyanazt a módosításköteget küldi el a delegáltnak. Ez a viselkedés addig folytatódik, amíg a meghatalmazott megfelelően nem dolgozza fel a módosításokat, és ez az oka annak, hogy a változáscsatorna feldolgozója "legalább egyszer" garanciával rendelkezik.

Megjegyzés

Csak egy olyan forgatókönyv létezik, amelyben a rendszer nem próbálkozik újra a módosítások kötegével. Ha a hiba az első delegált végrehajtáskor fordul elő, a bérlettároló nem rendelkezik korábbi mentett állapottal az újrapróbálkozási művelethez. Ezekben az esetekben az újrapróbálkozás a kezdeti indítási konfigurációt használja, amely lehet, hogy az utolsó köteget tartalmazza, vagy nem.

Ha meg szeretné akadályozni, hogy a változáscsatorna feldolgozója folyamatosan újrapróbálkozzon ugyanazzal a módosításkötegtel, vegye fel a logikát a delegált kódba, hogy kivétel nélkül dokumentumokat írjon egy kézbesíthetetlen levelek üzenetsorába. Ez a kialakítás biztosítja, hogy nyomon tudja követni a feldolgozatlan módosításokat, miközben továbbra is folytathatja a jövőbeli módosítások feldolgozását. A kézbesíthetetlen levelek várólistája lehet egy másik Cosmos-tároló. A pontos adattár nem számít, egyszerűen azt, hogy a feldolgozatlan módosítások megmaradnak.

Emellett a változáscsatorna-becslővel monitorozhatja a változáscsatorna feldolgozópéldányainak előrehaladását a változáscsatorna olvasása közben, vagy az életciklus-értesítések használatával észlelheti a mögöttes hibákat.

Életciklus-értesítések

A változáscsatorna feldolgozója lehetővé teszi, hogy az életciklusa releváns eseményeihez csatlakozva értesítést kapjon egy vagy mindegyiknek. Javasoljuk, hogy legalább regisztrálja a hibaértesítést:

  • Regisztráljon egy kezelőt WithLeaseAcquireNotification , hogy értesítést kapjon, amikor az aktuális gazdagép bérletet szerez be a feldolgozás megkezdéséhez.
  • Regisztráljon egy kezelőt WithLeaseReleaseNotification , hogy értesítést kapjon, amikor a jelenlegi gazdagép felszabadít egy bérletet, és leállítja annak feldolgozását.
  • Regisztráljon egy kezelőt WithErrorNotification , hogy értesítést kapjon arról, ha az aktuális gazdagép kivételt észlel a feldolgozás során, és meg tudja különböztetni, hogy a forrás a felhasználó delegáltja (nem kezelt kivétel), vagy egy olyan hiba, amely miatt a feldolgozó megpróbál hozzáférni a figyelt tárolóhoz (például hálózati problémák).
Container.ChangeFeedMonitorLeaseAcquireDelegate onLeaseAcquiredAsync = (string leaseToken) =>
{
    Console.WriteLine($"Lease {leaseToken} is acquired and will start processing");
    return Task.CompletedTask;
};

Container.ChangeFeedMonitorLeaseReleaseDelegate onLeaseReleaseAsync = (string leaseToken) =>
{
    Console.WriteLine($"Lease {leaseToken} is released and processing is stopped");
    return Task.CompletedTask;
};

Container.ChangeFeedMonitorErrorDelegate onErrorAsync = (string LeaseToken, Exception exception) =>
{
    if (exception is ChangeFeedProcessorUserException userException)
    {
        Console.WriteLine($"Lease {LeaseToken} processing failed with unhandled exception from user delegate {userException.InnerException}");
    }
    else
    {
        Console.WriteLine($"Lease {LeaseToken} failed with {exception}");
    }

    return Task.CompletedTask;
};

ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedNotifications", handleChanges)
        .WithLeaseAcquireNotification(onLeaseAcquiredAsync)
        .WithLeaseReleaseNotification(onLeaseReleaseAsync)
        .WithErrorNotification(onErrorAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .Build();

Üzembe helyezési egység

Egyetlen változáscsatorna-feldolgozó üzembehelyezési egység egy vagy több számítási példányból áll, ugyanazzal processorName a tárolókonfigurációval, de mindegyiknek eltérő a példányneve. Több üzembehelyezési egység is lehet, amelyek mindegyike eltérő üzleti folyamattal rendelkezik a módosításokhoz, és mindegyik üzembehelyezési egység egy vagy több példányból áll.

Előfordulhat például, hogy egy üzembe helyezési egység aktivál egy külső API-t, amikor változás történik a tárolóban. Előfordulhat, hogy egy másik üzembe helyezési egység valós időben helyezi át az adatokat minden alkalommal, amikor változás történik. Ha változás történik a figyelt tárolóban, az összes üzembehelyezési egység értesítést kap.

Dinamikus méretezés

Ahogy korábban említettük, egy üzembe helyezési egységen belül egy vagy több számítási példány is lehet. Az üzembehelyezési egységen belüli számítási elosztás előnyeinek kihasználásához csak a következőkre van szükség:

  1. Minden példánynak ugyanazzal a bérlettároló-konfigurációval kell rendelkeznie.
  2. Minden példánynak ugyanazzal processorNamea példánnyal kell rendelkeznie.
  3. Minden példánynak különböző példánynévvel kell rendelkeznie (WithInstanceName).

Ha ez a három feltétel teljesül, akkor a változáscsatorna feldolgozója egyenlő elosztási algoritmussal elosztja a bérlettárolóban lévő összes bérletet az adott üzembe helyezési egység összes futó példánya között, és párhuzamosítja a számítást. Egy bérletet egyszerre csak egy példány birtokolhat, így a példányok maximális száma megegyezik a bérletek számával.

A példányok száma növekedhet és csökkenhet, és a változáscsatorna feldolgozója dinamikusan módosítja a terhelést ennek megfelelően újraelosztással.

Emellett a változáscsatorna feldolgozója dinamikusan alkalmazkodhat a tárolók skálázásához az átviteli sebesség vagy a tárterület növekedése miatt. A tároló növekedésével a változáscsatorna feldolgozója transzparens módon kezeli ezeket a forgatókönyveket a bérletek dinamikus növelésével és az új bérletek meglévő példányok közötti elosztásával.

Változáscsatorna és kiosztott átviteli sebesség

A figyelt tároló változáscsatorna-olvasási műveletei kérelemegységeket használnak fel. Győződjön meg arról, hogy a monitorozott tároló nem tapasztal szabályozást, ellenkező esetben késések tapasztalhatók a változáscsatorna eseményeinek fogadása során a processzorokon.

A bérlettárolón végzett műveletek (az állapot frissítése és karbantartása) kérelemegységeket használnak fel. Minél több példány használja ugyanazt a bérlettárolót, annál nagyobb lesz a lehetséges kérelemegység-felhasználás. Győződjön meg arról, hogy a bérlettároló nem tapasztal szabályozást, ellenkező esetben késések tapasztalhatók a változáscsatorna-események fogadásában a processzorokon, bizonyos esetekben, amikor a szabályozás magas, a processzorok feldolgozása teljesen leállhat.

Kezdés időpontja

Alapértelmezés szerint amikor egy változáscsatorna feldolgozója először indul el, inicializálja a bérlettárolót, és elindítja a feldolgozási életciklusát. A figyelt tárolóban a változáscsatorna feldolgozójának első inicializálása előtt történt módosítások nem lesznek észlelhetők.

Beolvasás egy korábbi dátumból és időpontból

A változáscsatorna feldolgozóját inicializálhatja úgy, hogy egy adott dátumtól és időponttól kezdve olvassa a módosításokat, ha átad egy példányt DateTime a WithStartTime szerkesztőbővítménynek:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedTime", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .WithStartTime(particularPointInTime)
        .Build();

A változáscsatorna feldolgozója inicializálva lesz az adott dátumhoz és időponthoz, és elkezdi olvasni az utána történt módosításokat.

Megjegyzés

A változáscsatorna feldolgozójának adott időpontban és időpontban történő elindítása nem támogatott a többrégiós írási fiókokban.

Olvasás az elejétől

Más forgatókönyvekben, például az adatmigrálásokban vagy egy tároló teljes előzményeinek elemzésében be kell olvasnunk a változáscsatornát a tároló élettartamának elejétől kezdve. Ehhez használhatjuk WithStartTime a szerkesztőbővítményt, de a továbbítás DateTime.MinValue.ToUniversalTime(), amely a minimális DateTime érték UTC-ábrázolását hozza létre, az alábbihoz hasonlóan:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedBeginning", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .WithStartTime(DateTime.MinValue.ToUniversalTime())
        .Build();

A változáscsatorna feldolgozója inicializálva lesz, és a tároló élettartamának kezdetétől kezdve megkezdi a módosítások beolvasását.

Megjegyzés

Ezek a testreszabási lehetőségek csak a változáscsatorna feldolgozójának kezdőpontjának beállítására szolgálnak. A bérlettároló első inicializálása után a módosításuknak nincs hatása.

A bérlettároló megosztása

A bérlettárolót több üzembehelyezési egység között is megoszthatja, mindegyik üzembe helyezési egység egy másik figyelt tárolót figyel, vagy más processorNametárolóval rendelkezik. Ezzel a konfigurációval minden üzembe helyezési egység független állapotot tart fenn a bérlettárolón. Tekintse át a kérelemegység-felhasználást a bérlettárolón , és győződjön meg arról, hogy a kiosztott átviteli sebesség elegendő az összes üzembehelyezési egység számára.

A változáscsatorna feldolgozójának helye

A változáscsatorna feldolgozója bármilyen platformon üzemeltethető, amely támogatja a hosszú ideig futó folyamatokat vagy feladatokat:

Bár a változáscsatorna feldolgozója rövid élettartamú környezetekben is futtatható, mivel a bérlettároló fenntartja az állapotot, a környezetek indítási ciklusa késlelteti az értesítések fogadását (mivel a processzor indítása a környezet minden indításakor többletterhelést jelent).

További források

Következő lépések

A változáscsatorna feldolgozójáról az alábbi cikkekben olvashat bővebben: