Model vyžádání kanálu změn ve službě Azure Cosmos DB

PLATÍ PRO: NoSQL

Pomocí modelu vyžádání kanálu změn můžete kanál změn služby Azure Cosmos DB využívat vlastním tempem. Podobně jako u procesoru kanálu změn můžete použít model vyžádání kanálu změn k paralelizaci zpracování změn mezi několika příjemci kanálu změn.

Porovnání s procesorem kanálu změn

Mnoho scénářů může kanál změn zpracovat pomocí procesoru kanálu změn nebo modelu vyžádání kanálu změn. Tokeny pokračování modelu vyžádání a kontejner zapůjčení procesoru kanálu změn fungují jako záložky pro poslední zpracovanou položku nebo dávku položek v kanálu změn.

Tokeny pokračování ale nemůžete převést na zapůjčení nebo naopak.

Poznámka

Ve většině případů, když potřebujete číst z kanálu změn, je nejjednodušší možností použít procesor kanálu změn.

V těchto scénářích byste měli zvážit použití modelu vyžádání:

  • Čtení změn z konkrétního klíče oddílu
  • K řízení tempa, s jakým klient přijímá změny ke zpracování.
  • K jednorázovému čtení existujících dat v kanálu změn (například k migraci dat).

Tady jsou některé klíčové rozdíly mezi procesorem kanálu změn a modelem vyžádání kanálu změn:

Funkce Procesor kanálu změn Model vyžádání kanálu změn
Sledování aktuálního bodu při zpracování kanálu změn Zapůjčení (uložené v kontejneru Azure Cosmos DB) Pokračovací token (uložený v paměti nebo ručně trvalý)
Možnost přehrání minulých změn Ano, s modelem push Ano, s modelem pull
Dotazování na budoucí změny Automaticky kontroluje změny na základě hodnoty zadané WithPollInterval uživatelem. Ruční
Chování v případě, že nedojde k žádným novým změnám Automaticky počkejte na hodnotu a WithPollInterval pak znovu zkontrolujte Musí zkontrolovat stav a ručně znovu zkontrolovat.
Zpracování změn z celého kontejneru Ano a automaticky paralelizovány napříč několika vlákny a počítači, které využívají ze stejného kontejneru Ano, a ručně paralelizovat pomocí FeedRange
Zpracování změn pouze z jednoho klíče oddílu Nepodporováno Yes

Poznámka

Při použití modelu vyžádání změn, na rozdíl od čtení pomocí procesoru kanálu změn, musíte explicitně zpracovat případy, kdy nedojde k žádným novým změnám.

Práce s modelem vyžádání

Pokud chcete kanál změn zpracovat pomocí modelu vyžádání, vytvořte instanci objektu FeedIterator. Při počátečním vytvoření FeedIteratormusíte zadat požadovanou ChangeFeedStartFrom hodnotu, která se skládá z počáteční pozice pro čtení změn a hodnoty, kterou chcete použít pro FeedRange. Je FeedRange rozsah hodnot klíče oddílu a určuje položky, které lze číst z kanálu změn pomocí konkrétního FeedIteratorsouboru . Musíte také zadat požadovanou ChangeFeedMode hodnotu pro režim, ve kterém chcete zpracovávat změny: nejnovější verze nebo všechny verze a odstranění. Použijte buď nebo ChangeFeedMode.LatestVersionChangeFeedMode.AllVersionsAndDeletes k určení, který režim chcete použít ke čtení kanálu změn. Pokud používáte všechny verze a režim odstranění, musíte vybrat kanál změn, který začíná hodnotou nebo Now() z konkrétního pokračovacího tokenu.

Volitelně můžete zadat ChangeFeedRequestOptions nastavení PageSizeHint. Pokud je tato vlastnost nastavená, nastaví maximální počet položek přijatých na stránku. Pokud se operace v monitorované kolekci provádějí prostřednictvím uložených procedur, je při čtení položek z kanálu změn zachován obor transakce. V důsledku toho může být počet přijatých položek vyšší než zadaná hodnota, takže položky změněné stejnou transakcí jsou vráceny jako součást jedné atomické dávky.

Tady je příklad, jak získat FeedIterator v režimu nejnovější verze, který vrací objekty entit, v tomto případě User objekt:

FeedIterator<User> InteratorWithPOCOS = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

Tip

Před verzí 3.34.0je možné použít režim nejnovější verze nastavením ChangeFeedMode.Incremental. Režim Incremental nejnovější verze kanálu změn a LatestVersion aplikace, které používají některý z těchto režimů, budou mít stejné chování.

Všechny verze a režim odstranění jsou ve verzi Preview a lze ho použít s verzemi .NET SDK ve verzi >Preview = 3.32.0-preview. Tady je příklad pro získání FeedIterator ve všech verzích a režimu odstranění, který vrací dynamické objekty:

FeedIterator<dynamic> InteratorWithDynamic = container.GetChangeFeedIterator<dynamic>(ChangeFeedStartFrom.Now(), ChangeFeedMode.AllVersionsAndDeletes);

Poznámka

V režimu nejnovější verze obdržíte objekty, které představují položku, která se změnila, s některými dalšími metadaty. Všechny verze a režim odstranění vrátí jiný datový model. Další informace najdete v tématu Analýza objektu odpovědi.

Využívání kanálu změn prostřednictvím datových proudů

FeedIterator pro oba režimy kanálu změn má dvě možnosti. Kromě příkladů, které vrací objekty entit, můžete také získat odpověď s Stream podporou. Streamy umožňují číst data, aniž byste je nejprve deserializovali, takže ušetříte prostředky klienta.

Tady je příklad, jak získat FeedIterator v režimu nejnovější verze, který vrátí Stream:

FeedIterator iteratorWithStreams = container.GetChangeFeedStreamIterator(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

Využití změn pro celý kontejner

Pokud do souboru nezadáte FeedRange parametr FeedIterator, můžete zpracovat celý kanál změn kontejneru vlastním tempem. Tady je příklad, který začne číst všechny změny od aktuálního času pomocí režimu nejnovější verze:

FeedIterator<User> iteratorForTheEntireContainer = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Now(), ChangeFeedMode.LatestVersion);

while (iteratorForTheEntireContainer.HasMoreResults)
{
    FeedResponse<User> response = await iteratorForTheEntireContainer.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else 
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Vzhledem k tomu, že kanál změn je ve skutečnosti nekonečný seznam položek, které zahrnují všechny budoucí zápisy a aktualizace, hodnota HasMoreResults je vždy true. Když se pokusíte přečíst kanál změn a nejsou k dispozici žádné nové změny, obdržíte odpověď se stavem NotModified . V předchozím příkladu se tento problém zpracuje tak, že před opětovnou kontrolou změn počká pět sekund.

Využití změn pro klíč oddílu

V některých případech můžete chtít zpracovat pouze změny pro konkrétní klíč oddílu. Můžete získat FeedIterator pro konkrétní klíč oddílu a zpracovat změny stejným způsobem jako pro celý kontejner.

FeedIterator<User> iteratorForPartitionKey = container.GetChangeFeedIterator<User>(
    ChangeFeedStartFrom.Beginning(FeedRange.FromPartitionKey(new PartitionKey("PartitionKeyValue")), ChangeFeedMode.LatestVersion));

while (iteratorForThePartitionKey.HasMoreResults)
{
    FeedResponse<User> response = await iteratorForThePartitionKey.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Použití FeedRange pro paralelizaci

V procesoru kanálu změn se práce automaticky rozdělí mezi více příjemců. V modelu vyžádání kanálu změn můžete použít FeedRange k paralelizaci zpracování kanálu změn. A FeedRange představuje rozsah hodnot klíče oddílu.

Tady je příklad, který ukazuje, jak získat seznam oblastí pro kontejner:

IReadOnlyList<FeedRange> ranges = await container.GetFeedRangesAsync();

Když získáte seznam FeedRange hodnot pro kontejner, získáte jednu hodnotu FeedRange pro každý fyzický oddíl.

Pomocí nástroje FeedRangemůžete vytvořit FeedIterator paralelizaci zpracování kanálu změn na více počítačích nebo vláknech. Na rozdíl od předchozího příkladu, který ukázal, jak získat FeedIterator klíč pro celý kontejner nebo jeden klíč oddílu, můžete použít FeedRanges k získání více FeedIterators, které mohou zpracovávat kanál změn paralelně.

V případě, že chcete použít FeedRanges, musíte mít proces orchestrátoru, který získá FeedRanges a distribuuje je do těchto počítačů. Toto rozdělení může být:

  • Použití FeedRange.ToJsonString a distribuce této řetězcové hodnoty Příjemci můžou tuto hodnotu použít s FeedRange.FromJsonString.
  • Pokud je distribuce v procesu, předání odkazu na FeedRange objekt.

Tady je ukázka, která ukazuje, jak číst od začátku kanálu změn kontejneru pomocí dvou hypotetických samostatných počítačů, které čtou paralelně:

Stroj 1:

FeedIterator<User> iteratorA = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[0]), ChangeFeedMode.LatestVersion);
while (iteratorA.HasMoreResults)
{
    FeedResponse<User> response = await iteratorA.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Stroj 2:

FeedIterator<User> iteratorB = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[1]), ChangeFeedMode.LatestVersion);
while (iteratorB.HasMoreResults)
{
    FeedResponse<User> response = await iteratorA.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Uložení tokenů pokračování

Pozici můžete uložit získáním pokračovacího FeedIterator tokenu. Pokračovací token je řetězcová hodnota, která sleduje naposledy zpracované změny feedIteratoru a umožňuje FeedIterator pokračovat v tomto okamžiku později. Token pokračování, pokud je zadaný, má přednost před časem zahájení a začíná od počátečních hodnot. Následující kód přečte kanál změn od vytvoření kontejneru. Jakmile nebudou k dispozici žádné další změny, zachová token pro pokračování, aby bylo možné později pokračovat ve využívání kanálu změn.

FeedIterator<User> iterator = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

string continuation = null;

while (iterator.HasMoreResults)
{
    FeedResponse<User> response = await iterator.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        continuation = response.ContinuationToken;
        // Stop the consumption since there are no new changes
        break;
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

// Some time later when I want to check changes again
FeedIterator<User> iteratorThatResumesFromLastPoint = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.ContinuationToken(continuation), ChangeFeedMode.LatestVersion);

Pokud používáte režim nejnovější verze, platnost pokračovacího tokenu nikdy nevyprší, FeedIterator dokud kontejner Azure Cosmos DB stále existuje. Pokud používáte všechny verze a režim odstranění, je token pro pokračování platný, FeedIterator pokud ke změnám došlo v rámci časového období uchovávání pro průběžné zálohování.

Další kroky