Model ściągania zestawienia zmian w usłudze Azure Cosmos DB

DOTYCZY: NoSQL

Możesz użyć modelu ściągania zestawienia zmian, aby korzystać ze źródła zmian usługi Azure Cosmos DB we własnym tempie. Podobnie jak w przypadku procesora zestawienia zmian, można użyć modelu ściągania zestawienia zmian w celu równoległego przetwarzania zmian w wielu odbiorcach zestawienia zmian.

Porównanie z procesorem zestawienia zmian

Wiele scenariuszy może przetworzyć zestawienie zmian przy użyciu procesora zestawienia zmian lub modelu ściągania zestawienia zmian. Tokeny kontynuacji modelu ściągania i kontener dzierżawy procesora zestawienia zmian działają jako zakładki dla ostatniego przetworzonego elementu lub partii elementów w kanale zmian.

Nie można jednak przekonwertować tokenów kontynuacji na dzierżawę lub odwrotnie.

Uwaga

W większości przypadków, gdy trzeba odczytać ze zestawienia zmian, najprostszą opcją jest użycie procesora zestawienia zmian.

Należy rozważyć użycie modelu ściągania w następujących scenariuszach:

  • Aby odczytać zmiany z określonego klucza partycji.
  • Aby kontrolować tempo, w którym klient otrzymuje zmiany do przetwarzania.
  • Aby wykonać jednorazowy odczyt istniejących danych w kanale zmian (na przykład w celu przeprowadzenia migracji danych).

Poniżej przedstawiono kilka kluczowych różnic między procesorem zestawienia zmian a modelem ściągania zestawienia zmian:

Cecha Procesor zestawienia zmian Model ściągania zestawienia zmian
Śledzenie bieżącego punktu przetwarzania zestawienia zmian Dzierżawa (przechowywana w kontenerze usługi Azure Cosmos DB) Token kontynuacji (przechowywany w pamięci lub ręcznie utrwalone)
Możliwość odtwarzania poprzednich zmian Tak, z modelem wypychania Tak, z modelem ściągania
Sondowanie przyszłych zmian Automatyczne sprawdzanie zmian na podstawie wartości określonej przez WithPollInterval użytkownika Ręcznie
Zachowanie, w którym nie ma nowych zmian Automatycznie zaczekaj na WithPollInterval wartość, a następnie ponownie sprawdź Musi sprawdzić stan i ręcznie ponownie sprawdzić
Przetwarzanie zmian z całego kontenera Tak, i automatycznie zrównoleglizowane między wieloma wątkami i maszynami, które korzystają z tego samego kontenera Tak i ręcznie równoległe przy użyciu polecenia FeedRange
Przetwarzanie zmian tylko z jednego klucza partycji Nieobsługiwane Tak

Uwaga

W przypadku korzystania z modelu ściągania, w przeciwieństwie do odczytywania przy użyciu procesora zestawienia zmian, należy jawnie obsługiwać przypadki, w których nie ma nowych zmian.

Praca z modelem ściągania

Aby przetworzyć zestawienie zmian przy użyciu modelu ściągania, utwórz wystąpienie klasy FeedIterator. Podczas początkowego tworzenia FeedIteratornależy określić wymaganą ChangeFeedStartFrom wartość, która składa się zarówno z pozycji początkowej do odczytywania zmian, jak i wartości, której chcesz użyć dla FeedRangeelementu . Jest FeedRange to zakres wartości klucza partycji i określa elementy, które mogą być odczytywane z zestawienia zmian przy użyciu tego konkretnego FeedIteratorelementu . Należy również określić wymaganą ChangeFeedMode wartość dla trybu, w którym chcesz przetworzyć zmiany: najnowszą wersję lub wszystkie wersje i usunięcia. Użyj polecenia ChangeFeedMode.LatestVersion lub ChangeFeedMode.AllVersionsAndDeletes , aby wskazać tryb, którego chcesz użyć do odczytania zestawienia zmian. Jeśli używasz wszystkich wersji i trybu usuwania, musisz wybrać źródło zmian rozpoczynające się od wartości lub Now() z określonego tokenu kontynuacji.

Opcjonalnie można określić, ChangeFeedRequestOptions aby ustawić wartość PageSizeHint. Po ustawieniu ta właściwość ustawia maksymalną liczbę odebranych elementów na stronę. Jeśli operacje w monitorowanej kolekcji są wykonywane za pomocą procedur składowanych, zakres transakcji jest zachowywany podczas odczytywania elementów ze źródła zmian. W rezultacie liczba odebranych elementów może być większa niż określona wartość, aby elementy zmienione przez tę samą transakcję zostały zwrócone w ramach jednej partii niepodzielnej.

Oto przykład sposobu uzyskiwania FeedIterator w trybie najnowszej wersji, który zwraca obiekty jednostki, w tym przypadku User obiektu:

FeedIterator<User> InteratorWithPOCOS = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

Porada

Przed wersją 3.34.0można użyć najnowszego trybu wersji przez ustawienie ChangeFeedMode.Incremental. Zarówno, jak Incremental i LatestVersion odwoływać się do najnowszego trybu wersji zestawienia zmian i aplikacji, które korzystają z obu trybów, będą widzieć to samo zachowanie.

Wszystkie wersje i tryb usuwania są dostępne w wersji zapoznawczej i mogą być używane z wersjami >zestawu .NET SDK w wersji zapoznawczej = 3.32.0-preview. Oto przykład uzyskiwania FeedIterator we wszystkich wersjach i usuwania trybu, który zwraca obiekty dynamiczne:

FeedIterator<dynamic> InteratorWithDynamic = container.GetChangeFeedIterator<dynamic>(ChangeFeedStartFrom.Now(), ChangeFeedMode.AllVersionsAndDeletes);

Uwaga

W trybie najnowszej wersji są odbierane obiekty reprezentujące zmieniony element z dodatkowymi metadanymi. Wszystkie wersje i tryb usuwania zwracają inny model danych. Aby uzyskać więcej informacji, zobacz Analizowanie obiektu odpowiedzi.

Korzystanie ze zestawienia zmian za pośrednictwem strumieni

FeedIterator oba tryby zestawienia zmian mają dwie opcje. Oprócz przykładów, które zwracają obiekty jednostki, można również uzyskać odpowiedź z Stream pomocą techniczną. Strumienie umożliwiają odczytywanie danych bez uprzedniego deserializacji, dzięki czemu można zaoszczędzić na zasobach klienta.

Oto przykład sposobu uzyskiwania FeedIterator w trybie najnowszej wersji, który zwraca wartość Stream:

FeedIterator iteratorWithStreams = container.GetChangeFeedStreamIterator(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

Korzystanie ze zmian dla całego kontenera

Jeśli nie podasz parametru FeedRange do FeedIteratormetody , możesz przetworzyć kanał informacyjny zmian całego kontenera we własnym tempie. Oto przykład, który rozpoczyna odczytywanie wszystkich zmian, począwszy od bieżącego czasu przy użyciu najnowszego trybu wersji:

FeedIterator<User> iteratorForTheEntireContainer = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Now(), ChangeFeedMode.LatestVersion);

while (iteratorForTheEntireContainer.HasMoreResults)
{
    FeedResponse<User> response = await iteratorForTheEntireContainer.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else 
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Ponieważ kanał informacyjny zmian jest w rzeczywistości nieskończoną listą elementów, które obejmują wszystkie przyszłe zapisy i aktualizacje, wartość parametru HasMoreResults to zawsze true. Podczas próby odczytania zestawienia zmian i braku dostępnych nowych zmian otrzymasz odpowiedź ze stanem NotModified . W poprzednim przykładzie jest ona obsługiwana przez oczekiwanie pięć sekund przed ponownym sprawdzeniem zmian.

Korzystanie ze zmian klucza partycji

W niektórych przypadkach możesz przetworzyć tylko zmiany dla określonego klucza partycji. Można uzyskać FeedIterator dla określonego klucza partycji i przetworzyć zmiany w taki sam sposób, jak w przypadku całego kontenera.

FeedIterator<User> iteratorForPartitionKey = container.GetChangeFeedIterator<User>(
    ChangeFeedStartFrom.Beginning(FeedRange.FromPartitionKey(new PartitionKey("PartitionKeyValue")), ChangeFeedMode.LatestVersion));

while (iteratorForThePartitionKey.HasMoreResults)
{
    FeedResponse<User> response = await iteratorForThePartitionKey.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Używanie elementu FeedRange do przetwarzania równoległego

W procesorze zestawienia zmian praca jest automatycznie rozłożona na wielu odbiorców. W modelu ściągania zestawienia zmian można użyć elementu FeedRange , aby zrównać przetwarzanie zestawienia zmian. Element FeedRange reprezentuje zakres wartości klucza partycji.

Oto przykład pokazujący, jak uzyskać listę zakresów dla kontenera:

IReadOnlyList<FeedRange> ranges = await container.GetFeedRangesAsync();

Po otrzymaniu FeedRange listy wartości dla kontenera uzyskasz jedną FeedRange na partycję fizyczną.

Za pomocą elementu FeedRangemożna utworzyć element FeedIterator umożliwiający równoległe przetwarzanie zestawienia zmian na wielu maszynach lub wątkach. W przeciwieństwie do poprzedniego przykładu, który pokazał, jak uzyskać element FeedIterator dla całego kontenera lub pojedynczego klucza partycji, można użyć elementu FeedRanges, aby uzyskać wiele kanałów FeedIterator, które mogą przetwarzać kanał informacyjny zmian równolegle.

W przypadku, gdy chcesz użyć elementu FeedRanges, musisz mieć proces orkiestratora, który uzyskuje element FeedRanges i dystrybuuje je na tych maszynach. Ta dystrybucja może być następująca:

  • Używanie FeedRange.ToJsonString i rozpowszechnianie tej wartości ciągu. Użytkownicy mogą używać tej wartości za pomocą FeedRange.FromJsonStringpolecenia .
  • Jeśli dystrybucja jest w trakcie przetwarzania, przekazując odwołanie do FeedRange obiektu.

Oto przykład pokazujący, jak odczytać od początku źródła zmian kontenera przy użyciu dwóch hipotetycznych oddzielnych maszyn, które odczytują równolegle:

Maszyna 1:

FeedIterator<User> iteratorA = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[0]), ChangeFeedMode.LatestVersion);
while (iteratorA.HasMoreResults)
{
    FeedResponse<User> response = await iteratorA.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Maszyna 2:

FeedIterator<User> iteratorB = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[1]), ChangeFeedMode.LatestVersion);
while (iteratorB.HasMoreResults)
{
    FeedResponse<User> response = await iteratorA.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Zapisywanie tokenów kontynuacji

Możesz zapisać pozycję, FeedIterator uzyskując token kontynuacji. Token kontynuacji to wartość ciągu, która śledzi ostatnio przetworzone zmiany w usłudze FeedIterator i umożliwia FeedIterator wznowienie w tym momencie później. Token kontynuacji, jeśli zostanie określony, ma pierwszeństwo przed czasem rozpoczęcia i rozpoczyna się od wartości początkowych. Poniższy kod odczytuje zestawienie zmian od momentu utworzenia kontenera. Po dokonaniu kolejnych zmian będzie on utrwalał token kontynuacji, aby można było później wznowić użycie zestawienia zmian.

FeedIterator<User> iterator = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

string continuation = null;

while (iterator.HasMoreResults)
{
    FeedResponse<User> response = await iterator.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        continuation = response.ContinuationToken;
        // Stop the consumption since there are no new changes
        break;
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

// Some time later when I want to check changes again
FeedIterator<User> iteratorThatResumesFromLastPoint = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.ContinuationToken(continuation), ChangeFeedMode.LatestVersion);

W przypadku korzystania z trybu najnowszej wersji token kontynuacji nigdy nie wygasa tak długo, FeedIterator jak kontener usługi Azure Cosmos DB nadal istnieje. Jeśli używasz wszystkich wersji i trybu usuwania, FeedIterator token kontynuacji jest ważny, o ile zmiany wystąpiły w oknie przechowywania dla ciągłych kopii zapasowych.

Następne kroki