Model ściągania zestawienia zmian w usłudze Azure Cosmos DB
DOTYCZY: NoSQL
Możesz użyć modelu ściągania zestawienia zmian, aby korzystać ze źródła zmian usługi Azure Cosmos DB we własnym tempie. Podobnie jak w przypadku procesora zestawienia zmian, można użyć modelu ściągania zestawienia zmian w celu równoległego przetwarzania zmian w wielu odbiorcach zestawienia zmian.
Porównanie z procesorem zestawienia zmian
Wiele scenariuszy może przetworzyć zestawienie zmian przy użyciu procesora zestawienia zmian lub modelu ściągania zestawienia zmian. Tokeny kontynuacji modelu ściągania i kontener dzierżawy procesora zestawienia zmian działają jako zakładki dla ostatniego przetworzonego elementu lub partii elementów w kanale zmian.
Nie można jednak przekonwertować tokenów kontynuacji na dzierżawę lub odwrotnie.
Uwaga
W większości przypadków, gdy trzeba odczytać ze zestawienia zmian, najprostszą opcją jest użycie procesora zestawienia zmian.
Należy rozważyć użycie modelu ściągania w następujących scenariuszach:
- Aby odczytać zmiany z określonego klucza partycji.
- Aby kontrolować tempo, w którym klient otrzymuje zmiany do przetwarzania.
- Aby wykonać jednorazowy odczyt istniejących danych w kanale zmian (na przykład w celu przeprowadzenia migracji danych).
Poniżej przedstawiono kilka kluczowych różnic między procesorem zestawienia zmian a modelem ściągania zestawienia zmian:
Cecha | Procesor zestawienia zmian | Model ściągania zestawienia zmian |
---|---|---|
Śledzenie bieżącego punktu przetwarzania zestawienia zmian | Dzierżawa (przechowywana w kontenerze usługi Azure Cosmos DB) | Token kontynuacji (przechowywany w pamięci lub ręcznie utrwalone) |
Możliwość odtwarzania poprzednich zmian | Tak, z modelem wypychania | Tak, z modelem ściągania |
Sondowanie przyszłych zmian | Automatyczne sprawdzanie zmian na podstawie wartości określonej przez WithPollInterval użytkownika |
Ręcznie |
Zachowanie, w którym nie ma nowych zmian | Automatycznie zaczekaj na WithPollInterval wartość, a następnie ponownie sprawdź |
Musi sprawdzić stan i ręcznie ponownie sprawdzić |
Przetwarzanie zmian z całego kontenera | Tak, i automatycznie zrównoleglizowane między wieloma wątkami i maszynami, które korzystają z tego samego kontenera | Tak i ręcznie równoległe przy użyciu polecenia FeedRange |
Przetwarzanie zmian tylko z jednego klucza partycji | Nieobsługiwane | Tak |
Uwaga
W przypadku korzystania z modelu ściągania, w przeciwieństwie do odczytywania przy użyciu procesora zestawienia zmian, należy jawnie obsługiwać przypadki, w których nie ma nowych zmian.
Praca z modelem ściągania
Aby przetworzyć zestawienie zmian przy użyciu modelu ściągania, utwórz wystąpienie klasy FeedIterator
. Podczas początkowego tworzenia FeedIterator
należy określić wymaganą ChangeFeedStartFrom
wartość, która składa się zarówno z pozycji początkowej do odczytywania zmian, jak i wartości, której chcesz użyć dla FeedRange
elementu . Jest FeedRange
to zakres wartości klucza partycji i określa elementy, które mogą być odczytywane z zestawienia zmian przy użyciu tego konkretnego FeedIterator
elementu . Należy również określić wymaganą ChangeFeedMode
wartość dla trybu, w którym chcesz przetworzyć zmiany: najnowszą wersję lub wszystkie wersje i usunięcia. Użyj polecenia ChangeFeedMode.LatestVersion
lub ChangeFeedMode.AllVersionsAndDeletes
, aby wskazać tryb, którego chcesz użyć do odczytania zestawienia zmian. Jeśli używasz wszystkich wersji i trybu usuwania, musisz wybrać źródło zmian rozpoczynające się od wartości lub Now()
z określonego tokenu kontynuacji.
Opcjonalnie można określić, ChangeFeedRequestOptions
aby ustawić wartość PageSizeHint
. Po ustawieniu ta właściwość ustawia maksymalną liczbę odebranych elementów na stronę. Jeśli operacje w monitorowanej kolekcji są wykonywane za pomocą procedur składowanych, zakres transakcji jest zachowywany podczas odczytywania elementów ze źródła zmian. W rezultacie liczba odebranych elementów może być większa niż określona wartość, aby elementy zmienione przez tę samą transakcję zostały zwrócone w ramach jednej partii niepodzielnej.
Oto przykład sposobu uzyskiwania FeedIterator
w trybie najnowszej wersji, który zwraca obiekty jednostki, w tym przypadku User
obiektu:
FeedIterator<User> InteratorWithPOCOS = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
Porada
Przed wersją 3.34.0
można użyć najnowszego trybu wersji przez ustawienie ChangeFeedMode.Incremental
. Zarówno, jak Incremental
i LatestVersion
odwoływać się do najnowszego trybu wersji zestawienia zmian i aplikacji, które korzystają z obu trybów, będą widzieć to samo zachowanie.
Wszystkie wersje i tryb usuwania są dostępne w wersji zapoznawczej i mogą być używane z wersjami >zestawu .NET SDK w wersji zapoznawczej = 3.32.0-preview
. Oto przykład uzyskiwania FeedIterator
we wszystkich wersjach i usuwania trybu, który zwraca obiekty dynamiczne:
FeedIterator<dynamic> InteratorWithDynamic = container.GetChangeFeedIterator<dynamic>(ChangeFeedStartFrom.Now(), ChangeFeedMode.AllVersionsAndDeletes);
Uwaga
W trybie najnowszej wersji są odbierane obiekty reprezentujące zmieniony element z dodatkowymi metadanymi. Wszystkie wersje i tryb usuwania zwracają inny model danych. Aby uzyskać więcej informacji, zobacz Analizowanie obiektu odpowiedzi.
Korzystanie ze zestawienia zmian za pośrednictwem strumieni
FeedIterator
oba tryby zestawienia zmian mają dwie opcje. Oprócz przykładów, które zwracają obiekty jednostki, można również uzyskać odpowiedź z Stream
pomocą techniczną. Strumienie umożliwiają odczytywanie danych bez uprzedniego deserializacji, dzięki czemu można zaoszczędzić na zasobach klienta.
Oto przykład sposobu uzyskiwania FeedIterator
w trybie najnowszej wersji, który zwraca wartość Stream
:
FeedIterator iteratorWithStreams = container.GetChangeFeedStreamIterator(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
Korzystanie ze zmian dla całego kontenera
Jeśli nie podasz parametru FeedRange
do FeedIterator
metody , możesz przetworzyć kanał informacyjny zmian całego kontenera we własnym tempie. Oto przykład, który rozpoczyna odczytywanie wszystkich zmian, począwszy od bieżącego czasu przy użyciu najnowszego trybu wersji:
FeedIterator<User> iteratorForTheEntireContainer = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Now(), ChangeFeedMode.LatestVersion);
while (iteratorForTheEntireContainer.HasMoreResults)
{
FeedResponse<User> response = await iteratorForTheEntireContainer.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Ponieważ kanał informacyjny zmian jest w rzeczywistości nieskończoną listą elementów, które obejmują wszystkie przyszłe zapisy i aktualizacje, wartość parametru HasMoreResults
to zawsze true
. Podczas próby odczytania zestawienia zmian i braku dostępnych nowych zmian otrzymasz odpowiedź ze stanem NotModified
. W poprzednim przykładzie jest ona obsługiwana przez oczekiwanie pięć sekund przed ponownym sprawdzeniem zmian.
Korzystanie ze zmian klucza partycji
W niektórych przypadkach możesz przetworzyć tylko zmiany dla określonego klucza partycji. Można uzyskać FeedIterator
dla określonego klucza partycji i przetworzyć zmiany w taki sam sposób, jak w przypadku całego kontenera.
FeedIterator<User> iteratorForPartitionKey = container.GetChangeFeedIterator<User>(
ChangeFeedStartFrom.Beginning(FeedRange.FromPartitionKey(new PartitionKey("PartitionKeyValue")), ChangeFeedMode.LatestVersion));
while (iteratorForThePartitionKey.HasMoreResults)
{
FeedResponse<User> response = await iteratorForThePartitionKey.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Używanie elementu FeedRange do przetwarzania równoległego
W procesorze zestawienia zmian praca jest automatycznie rozłożona na wielu odbiorców. W modelu ściągania zestawienia zmian można użyć elementu FeedRange
, aby zrównać przetwarzanie zestawienia zmian. Element FeedRange
reprezentuje zakres wartości klucza partycji.
Oto przykład pokazujący, jak uzyskać listę zakresów dla kontenera:
IReadOnlyList<FeedRange> ranges = await container.GetFeedRangesAsync();
Po otrzymaniu FeedRange
listy wartości dla kontenera uzyskasz jedną FeedRange
na partycję fizyczną.
Za pomocą elementu FeedRange
można utworzyć element FeedIterator
umożliwiający równoległe przetwarzanie zestawienia zmian na wielu maszynach lub wątkach. W przeciwieństwie do poprzedniego przykładu, który pokazał, jak uzyskać element FeedIterator
dla całego kontenera lub pojedynczego klucza partycji, można użyć elementu FeedRanges, aby uzyskać wiele kanałów FeedIterator, które mogą przetwarzać kanał informacyjny zmian równolegle.
W przypadku, gdy chcesz użyć elementu FeedRanges, musisz mieć proces orkiestratora, który uzyskuje element FeedRanges i dystrybuuje je na tych maszynach. Ta dystrybucja może być następująca:
- Używanie
FeedRange.ToJsonString
i rozpowszechnianie tej wartości ciągu. Użytkownicy mogą używać tej wartości za pomocąFeedRange.FromJsonString
polecenia . - Jeśli dystrybucja jest w trakcie przetwarzania, przekazując odwołanie do
FeedRange
obiektu.
Oto przykład pokazujący, jak odczytać od początku źródła zmian kontenera przy użyciu dwóch hipotetycznych oddzielnych maszyn, które odczytują równolegle:
Maszyna 1:
FeedIterator<User> iteratorA = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[0]), ChangeFeedMode.LatestVersion);
while (iteratorA.HasMoreResults)
{
FeedResponse<User> response = await iteratorA.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Maszyna 2:
FeedIterator<User> iteratorB = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[1]), ChangeFeedMode.LatestVersion);
while (iteratorB.HasMoreResults)
{
FeedResponse<User> response = await iteratorA.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Zapisywanie tokenów kontynuacji
Możesz zapisać pozycję, FeedIterator
uzyskując token kontynuacji. Token kontynuacji to wartość ciągu, która śledzi ostatnio przetworzone zmiany w usłudze FeedIterator i umożliwia FeedIterator
wznowienie w tym momencie później. Token kontynuacji, jeśli zostanie określony, ma pierwszeństwo przed czasem rozpoczęcia i rozpoczyna się od wartości początkowych. Poniższy kod odczytuje zestawienie zmian od momentu utworzenia kontenera. Po dokonaniu kolejnych zmian będzie on utrwalał token kontynuacji, aby można było później wznowić użycie zestawienia zmian.
FeedIterator<User> iterator = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
string continuation = null;
while (iterator.HasMoreResults)
{
FeedResponse<User> response = await iterator.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
continuation = response.ContinuationToken;
// Stop the consumption since there are no new changes
break;
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
// Some time later when I want to check changes again
FeedIterator<User> iteratorThatResumesFromLastPoint = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.ContinuationToken(continuation), ChangeFeedMode.LatestVersion);
W przypadku korzystania z trybu najnowszej wersji token kontynuacji nigdy nie wygasa tak długo, FeedIterator
jak kontener usługi Azure Cosmos DB nadal istnieje. Jeśli używasz wszystkich wersji i trybu usuwania, FeedIterator
token kontynuacji jest ważny, o ile zmiany wystąpiły w oknie przechowywania dla ciągłych kopii zapasowych.