Pull-modell för ändringsflöde i Azure Cosmos DB

GÄLLER FÖR: NoSQL

Du kan använda pull-modellen för ändringsflöde för att använda Azure Cosmos DB-ändringsflödet i din egen takt. Precis som med ändringsflödesprocessorn kan du använda pull-modellen för ändringsflöde för att parallellisera bearbetningen av ändringar mellan flera ändringsflödeskonsumenter.

Jämför med ändringsflödesprocessorn

Många scenarier kan bearbeta ändringsflödet med antingen ändringsflödesprocessorn eller pull-modellen för ändringsflöde. Pull-modellens fortsättningstoken och ändringsflödesprocessorns lånecontainer fungerar båda som bokmärken för det senast bearbetade objektet eller batchen med objekt i ändringsflödet.

Du kan dock inte konvertera fortsättningstoken till ett lån eller tvärtom.

Anteckning

När du i de flesta fall behöver läsa från ändringsflödet är det enklaste alternativet att använda ändringsflödesprocessorn.

Du bör överväga att använda pull-modellen i följande scenarier:

  • Läsa ändringar från en specifik partitionsnyckel.
  • För att kontrollera i vilken takt klienten får ändringar för bearbetning.
  • Utföra en engångsläsning av befintliga data i ändringsflödet (till exempel för att utföra en datamigrering).

Här är några viktiga skillnader mellan ändringsflödesprocessorn och pull-modellen för ändringsflöde:

Funktion Ändringsflödesprocessor Hämtningsmodell för ändringsflöde
Hålla reda på den aktuella punkten vid bearbetning av ändringsflödet Lån (lagras i en Azure Cosmos DB-container) Fortsättningstoken (lagras i minnet eller sparas manuellt)
Möjlighet att spela upp tidigare ändringar Ja, med push-modell Ja, med pull-modell
Avsökning för framtida ändringar Söker automatiskt efter ändringar baserat på användarangivet WithPollInterval värde Manuell
Beteende där det inte finns några nya ändringar Vänta automatiskt på värdet WithPollInterval och kontrollera sedan igen Måste kontrollera status och manuellt kontrollera igen
Bearbeta ändringar från en hel container Ja, och parallelliseras automatiskt över flera trådar och datorer som förbrukar från samma container Ja, och parallelliseras manuellt med hjälp av FeedRange
Bearbeta ändringar från endast en enda partitionsnyckel Stöds inte Yes

Anteckning

När du använder pull-modellen, till skillnad från när du läser med hjälp av ändringsflödesprocessorn, måste du uttryckligen hantera fall där det inte finns några nya ändringar.

Arbeta med pull-modellen

Om du vill bearbeta ändringsflödet med hjälp av pull-modellen skapar du en instans av FeedIterator. När du först skapar FeedIteratormåste du ange ett obligatoriskt ChangeFeedStartFrom värde, som består av både startpositionen för att läsa ändringar och det värde som du vill använda för FeedRange. FeedRange är ett intervall med partitionsnyckelvärden och anger de objekt som kan läsas från ändringsflödet med hjälp av den specifika FeedIterator. Du måste också ange ett obligatoriskt ChangeFeedMode värde för det läge där du vill bearbeta ändringar: den senaste versionen eller alla versioner och borttagningar. Använd antingen ChangeFeedMode.LatestVersion eller ChangeFeedMode.AllVersionsAndDeletes för att ange vilket läge du vill använda för att läsa ändringsflödet. När du använder alla versioner och tar bort läge måste du välja en ändringsflödesstart från värdet för antingen Now() eller från en specifik fortsättningstoken.

Du kan också ange ChangeFeedRequestOptions om du vill ange en PageSizeHint. När den här egenskapen anges anger den maximalt antal objekt som tas emot per sida. Om åtgärder i den övervakade samlingen utförs via lagrade procedurer bevaras transaktionsomfånget vid läsning av objekt från ändringsflödet. Därför kan antalet mottagna objekt vara högre än det angivna värdet så att objekten som ändras av samma transaktion returneras som en del av en atomisk batch.

Här är ett exempel på hur du hämtar FeedIterator i senaste versionsläge som returnerar entitetsobjekt, i det här fallet ett User objekt:

FeedIterator<User> InteratorWithPOCOS = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

Tips

Före version 3.34.0kan det senaste versionsläget användas genom att ange ChangeFeedMode.Incremental. Både Incremental och LatestVersion refererar till det senaste versionsläget för ändringsflödet och program som använder något av lägena ser samma beteende.

Alla versioner och borttagningar är i förhandsversion och kan användas med förhandsversionen av .NET SDK-versioner >= 3.32.0-preview. Här är ett exempel på hur du hämtar FeedIterator i alla versioner och tar bort läge som returnerar dynamiska objekt:

FeedIterator<dynamic> InteratorWithDynamic = container.GetChangeFeedIterator<dynamic>(ChangeFeedStartFrom.Now(), ChangeFeedMode.AllVersionsAndDeletes);

Anteckning

I det senaste versionsläget får du objekt som representerar objektet som har ändrats, med några extra metadata. Alla versioner och borttagningsläge returnerar en annan datamodell. Mer information finns i Parsa svarsobjektet.

Använda ändringsflödet via strömmar

FeedIterator för båda ändringsflödeslägena har två alternativ. Förutom de exempel som returnerar entitetsobjekt kan du även hämta svaret med Stream stöd. Med strömmar kan du läsa data utan att först deserialisera dem, så att du sparar på klientresurser.

Här är ett exempel på hur du hämtar FeedIterator i det senaste versionsläget som returnerar Stream:

FeedIterator iteratorWithStreams = container.GetChangeFeedStreamIterator(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

Använda ändringarna för en hel container

Om du inte anger en FeedRange parameter till FeedIteratorkan du bearbeta en hel containers ändringsflöde i din egen takt. Här är ett exempel som börjar läsa alla ändringar, med början vid den aktuella tidpunkten med hjälp av det senaste versionsläget:

FeedIterator<User> iteratorForTheEntireContainer = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Now(), ChangeFeedMode.LatestVersion);

while (iteratorForTheEntireContainer.HasMoreResults)
{
    FeedResponse<User> response = await iteratorForTheEntireContainer.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else 
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Eftersom ändringsflödet i praktiken är en oändlig lista över objekt som omfattar alla framtida skrivningar och uppdateringar är värdet HasMoreResults för alltid true. När du försöker läsa ändringsflödet och det inte finns några nya ändringar tillgängliga får du ett svar med NotModified status. I föregående exempel hanteras det genom att vänta fem sekunder innan du söker efter ändringar igen.

Använda ändringarna för en partitionsnyckel

I vissa fall kanske du bara vill bearbeta ändringarna för en specifik partitionsnyckel. Du kan hämta FeedIterator för en specifik partitionsnyckel och bearbeta ändringarna på samma sätt som för en hel container.

FeedIterator<User> iteratorForPartitionKey = container.GetChangeFeedIterator<User>(
    ChangeFeedStartFrom.Beginning(FeedRange.FromPartitionKey(new PartitionKey("PartitionKeyValue")), ChangeFeedMode.LatestVersion));

while (iteratorForThePartitionKey.HasMoreResults)
{
    FeedResponse<User> response = await iteratorForThePartitionKey.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Använda FeedRange för parallellisering

I ändringsflödesprocessorn sprids arbetet automatiskt över flera konsumenter. I pull-modellen för ändringsflöde kan du använda FeedRange för att parallellisera bearbetningen av ändringsflödet. A FeedRange representerar ett intervall med partitionsnyckelvärden.

Här är ett exempel som visar hur du hämtar en lista över intervall för din container:

IReadOnlyList<FeedRange> ranges = await container.GetFeedRangesAsync();

När du får en lista med FeedRange värden för containern får du en FeedRange per fysisk partition.

Med hjälp av en FeedRangekan du skapa en FeedIterator för att parallellisera bearbetningen av ändringsflödet mellan flera datorer eller trådar. Till skillnad från föregående exempel som visade hur du hämtar en FeedIterator för hela containern eller en enda partitionsnyckel kan du använda FeedRanges för att hämta flera FeedIterators, som kan bearbeta ändringsflödet parallellt.

Om du vill använda FeedRanges måste du ha en orkestreringsprocess som hämtar FeedRanges och distribuerar dem till dessa datorer. Den här fördelningen kan vara:

  • Använda FeedRange.ToJsonString och distribuera det här strängvärdet. Konsumenterna kan använda det här värdet med FeedRange.FromJsonString.
  • Om fördelningen pågår skickar du objektreferensen FeedRange .

Här är ett exempel som visar hur du läser från början av containerns ändringsflöde med hjälp av två hypotetiska separata datorer som läser parallellt:

Dator 1:

FeedIterator<User> iteratorA = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[0]), ChangeFeedMode.LatestVersion);
while (iteratorA.HasMoreResults)
{
    FeedResponse<User> response = await iteratorA.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Dator 2:

FeedIterator<User> iteratorB = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[1]), ChangeFeedMode.LatestVersion);
while (iteratorB.HasMoreResults)
{
    FeedResponse<User> response = await iteratorA.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Spara fortsättningstoken

Du kan spara positionen för din FeedIterator genom att hämta fortsättningstoken. En fortsättningstoken är ett strängvärde som håller reda på feediteratorns senast bearbetade ändringar och gör att den FeedIterator kan återupptas senare. Fortsättningstoken, om den anges, har företräde framför starttiden och börjar från början värden. Följande kod läser igenom ändringsflödet sedan containern skapades. När inga fler ändringar är tillgängliga bevaras en fortsättningstoken så att ändringsflödesförbrukningen kan återupptas senare.

FeedIterator<User> iterator = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

string continuation = null;

while (iterator.HasMoreResults)
{
    FeedResponse<User> response = await iterator.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        continuation = response.ContinuationToken;
        // Stop the consumption since there are no new changes
        break;
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

// Some time later when I want to check changes again
FeedIterator<User> iteratorThatResumesFromLastPoint = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.ContinuationToken(continuation), ChangeFeedMode.LatestVersion);

När du använder det senaste versionsläget FeedIterator upphör fortsättningstoken aldrig att gälla så länge Azure Cosmos DB-containern fortfarande finns. När du använder alla versioner och tar bort läge FeedIterator är fortsättningstoken giltig så länge ändringarna har gjorts i kvarhållningsfönstret för kontinuerliga säkerhetskopieringar.

Nästa steg