Model vyžádání kanálu změn ve službě Azure Cosmos DB
PLATÍ PRO: NoSQL
Pomocí modelu vyžádání kanálu změn můžete kanál změn služby Azure Cosmos DB využívat vlastním tempem. Podobně jako u procesoru kanálu změn můžete použít model vyžádání kanálu změn k paralelizaci zpracování změn mezi několika příjemci kanálu změn.
Porovnání s procesorem kanálu změn
Mnoho scénářů může kanál změn zpracovat pomocí procesoru kanálu změn nebo modelu vyžádání kanálu změn. Tokeny pokračování modelu vyžádání a kontejner zapůjčení procesoru kanálu změn fungují jako záložky pro poslední zpracovanou položku nebo dávku položek v kanálu změn.
Tokeny pokračování ale nemůžete převést na zapůjčení nebo naopak.
Poznámka
Ve většině případů, když potřebujete číst z kanálu změn, je nejjednodušší možností použít procesor kanálu změn.
V těchto scénářích byste měli zvážit použití modelu vyžádání:
- Čtení změn z konkrétního klíče oddílu
- K řízení tempa, s jakým klient přijímá změny ke zpracování.
- K jednorázovému čtení existujících dat v kanálu změn (například k migraci dat).
Tady jsou některé klíčové rozdíly mezi procesorem kanálu změn a modelem vyžádání kanálu změn:
Funkce | Procesor kanálu změn | Model vyžádání kanálu změn |
---|---|---|
Sledování aktuálního bodu při zpracování kanálu změn | Zapůjčení (uložené v kontejneru Azure Cosmos DB) | Pokračovací token (uložený v paměti nebo ručně trvalý) |
Možnost přehrání minulých změn | Ano, s modelem push | Ano, s modelem pull |
Dotazování na budoucí změny | Automaticky kontroluje změny na základě hodnoty zadané WithPollInterval uživatelem. |
Ruční |
Chování v případě, že nedojde k žádným novým změnám | Automaticky počkejte na hodnotu a WithPollInterval pak znovu zkontrolujte |
Musí zkontrolovat stav a ručně znovu zkontrolovat. |
Zpracování změn z celého kontejneru | Ano a automaticky paralelizovány napříč několika vlákny a počítači, které využívají ze stejného kontejneru | Ano, a ručně paralelizovat pomocí FeedRange |
Zpracování změn pouze z jednoho klíče oddílu | Nepodporováno | Yes |
Poznámka
Při použití modelu vyžádání změn, na rozdíl od čtení pomocí procesoru kanálu změn, musíte explicitně zpracovat případy, kdy nedojde k žádným novým změnám.
Práce s modelem vyžádání
Pokud chcete kanál změn zpracovat pomocí modelu vyžádání, vytvořte instanci objektu FeedIterator
. Při počátečním vytvoření FeedIterator
musíte zadat požadovanou ChangeFeedStartFrom
hodnotu, která se skládá z počáteční pozice pro čtení změn a hodnoty, kterou chcete použít pro FeedRange
. Je FeedRange
rozsah hodnot klíče oddílu a určuje položky, které lze číst z kanálu změn pomocí konkrétního FeedIterator
souboru . Musíte také zadat požadovanou ChangeFeedMode
hodnotu pro režim, ve kterém chcete zpracovávat změny: nejnovější verze nebo všechny verze a odstranění. Použijte buď nebo ChangeFeedMode.LatestVersion
ChangeFeedMode.AllVersionsAndDeletes
k určení, který režim chcete použít ke čtení kanálu změn. Pokud používáte všechny verze a režim odstranění, musíte vybrat kanál změn, který začíná hodnotou nebo Now()
z konkrétního pokračovacího tokenu.
Volitelně můžete zadat ChangeFeedRequestOptions
nastavení PageSizeHint
. Pokud je tato vlastnost nastavená, nastaví maximální počet položek přijatých na stránku. Pokud se operace v monitorované kolekci provádějí prostřednictvím uložených procedur, je při čtení položek z kanálu změn zachován obor transakce. V důsledku toho může být počet přijatých položek vyšší než zadaná hodnota, takže položky změněné stejnou transakcí jsou vráceny jako součást jedné atomické dávky.
Tady je příklad, jak získat FeedIterator
v režimu nejnovější verze, který vrací objekty entit, v tomto případě User
objekt:
FeedIterator<User> InteratorWithPOCOS = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
Tip
Před verzí 3.34.0
je možné použít režim nejnovější verze nastavením ChangeFeedMode.Incremental
. Režim Incremental
nejnovější verze kanálu změn a LatestVersion
aplikace, které používají některý z těchto režimů, budou mít stejné chování.
Všechny verze a režim odstranění jsou ve verzi Preview a lze ho použít s verzemi .NET SDK ve verzi >Preview = 3.32.0-preview
. Tady je příklad pro získání FeedIterator
ve všech verzích a režimu odstranění, který vrací dynamické objekty:
FeedIterator<dynamic> InteratorWithDynamic = container.GetChangeFeedIterator<dynamic>(ChangeFeedStartFrom.Now(), ChangeFeedMode.AllVersionsAndDeletes);
Poznámka
V režimu nejnovější verze obdržíte objekty, které představují položku, která se změnila, s některými dalšími metadaty. Všechny verze a režim odstranění vrátí jiný datový model. Další informace najdete v tématu Analýza objektu odpovědi.
Využívání kanálu změn prostřednictvím datových proudů
FeedIterator
pro oba režimy kanálu změn má dvě možnosti. Kromě příkladů, které vrací objekty entit, můžete také získat odpověď s Stream
podporou. Streamy umožňují číst data, aniž byste je nejprve deserializovali, takže ušetříte prostředky klienta.
Tady je příklad, jak získat FeedIterator
v režimu nejnovější verze, který vrátí Stream
:
FeedIterator iteratorWithStreams = container.GetChangeFeedStreamIterator(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
Využití změn pro celý kontejner
Pokud do souboru nezadáte FeedRange
parametr FeedIterator
, můžete zpracovat celý kanál změn kontejneru vlastním tempem. Tady je příklad, který začne číst všechny změny od aktuálního času pomocí režimu nejnovější verze:
FeedIterator<User> iteratorForTheEntireContainer = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Now(), ChangeFeedMode.LatestVersion);
while (iteratorForTheEntireContainer.HasMoreResults)
{
FeedResponse<User> response = await iteratorForTheEntireContainer.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Vzhledem k tomu, že kanál změn je ve skutečnosti nekonečný seznam položek, které zahrnují všechny budoucí zápisy a aktualizace, hodnota HasMoreResults
je vždy true
. Když se pokusíte přečíst kanál změn a nejsou k dispozici žádné nové změny, obdržíte odpověď se stavem NotModified
. V předchozím příkladu se tento problém zpracuje tak, že před opětovnou kontrolou změn počká pět sekund.
Využití změn pro klíč oddílu
V některých případech můžete chtít zpracovat pouze změny pro konkrétní klíč oddílu. Můžete získat FeedIterator
pro konkrétní klíč oddílu a zpracovat změny stejným způsobem jako pro celý kontejner.
FeedIterator<User> iteratorForPartitionKey = container.GetChangeFeedIterator<User>(
ChangeFeedStartFrom.Beginning(FeedRange.FromPartitionKey(new PartitionKey("PartitionKeyValue")), ChangeFeedMode.LatestVersion));
while (iteratorForThePartitionKey.HasMoreResults)
{
FeedResponse<User> response = await iteratorForThePartitionKey.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Použití FeedRange pro paralelizaci
V procesoru kanálu změn se práce automaticky rozdělí mezi více příjemců. V modelu vyžádání kanálu změn můžete použít FeedRange
k paralelizaci zpracování kanálu změn. A FeedRange
představuje rozsah hodnot klíče oddílu.
Tady je příklad, který ukazuje, jak získat seznam oblastí pro kontejner:
IReadOnlyList<FeedRange> ranges = await container.GetFeedRangesAsync();
Když získáte seznam FeedRange
hodnot pro kontejner, získáte jednu hodnotu FeedRange
pro každý fyzický oddíl.
Pomocí nástroje FeedRange
můžete vytvořit FeedIterator
paralelizaci zpracování kanálu změn na více počítačích nebo vláknech. Na rozdíl od předchozího příkladu, který ukázal, jak získat FeedIterator
klíč pro celý kontejner nebo jeden klíč oddílu, můžete použít FeedRanges k získání více FeedIterators, které mohou zpracovávat kanál změn paralelně.
V případě, že chcete použít FeedRanges, musíte mít proces orchestrátoru, který získá FeedRanges a distribuuje je do těchto počítačů. Toto rozdělení může být:
- Použití
FeedRange.ToJsonString
a distribuce této řetězcové hodnoty Příjemci můžou tuto hodnotu použít sFeedRange.FromJsonString
. - Pokud je distribuce v procesu, předání odkazu na
FeedRange
objekt.
Tady je ukázka, která ukazuje, jak číst od začátku kanálu změn kontejneru pomocí dvou hypotetických samostatných počítačů, které čtou paralelně:
Stroj 1:
FeedIterator<User> iteratorA = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[0]), ChangeFeedMode.LatestVersion);
while (iteratorA.HasMoreResults)
{
FeedResponse<User> response = await iteratorA.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Stroj 2:
FeedIterator<User> iteratorB = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[1]), ChangeFeedMode.LatestVersion);
while (iteratorB.HasMoreResults)
{
FeedResponse<User> response = await iteratorA.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Uložení tokenů pokračování
Pozici můžete uložit získáním pokračovacího FeedIterator
tokenu. Pokračovací token je řetězcová hodnota, která sleduje naposledy zpracované změny feedIteratoru a umožňuje FeedIterator
pokračovat v tomto okamžiku později. Token pokračování, pokud je zadaný, má přednost před časem zahájení a začíná od počátečních hodnot. Následující kód přečte kanál změn od vytvoření kontejneru. Jakmile nebudou k dispozici žádné další změny, zachová token pro pokračování, aby bylo možné později pokračovat ve využívání kanálu změn.
FeedIterator<User> iterator = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
string continuation = null;
while (iterator.HasMoreResults)
{
FeedResponse<User> response = await iterator.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
continuation = response.ContinuationToken;
// Stop the consumption since there are no new changes
break;
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
// Some time later when I want to check changes again
FeedIterator<User> iteratorThatResumesFromLastPoint = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.ContinuationToken(continuation), ChangeFeedMode.LatestVersion);
Pokud používáte režim nejnovější verze, platnost pokračovacího tokenu nikdy nevyprší, FeedIterator
dokud kontejner Azure Cosmos DB stále existuje. Pokud používáte všechny verze a režim odstranění, je token pro pokračování platný, FeedIterator
pokud ke změnám došlo v rámci časového období uchovávání pro průběžné zálohování.