Migrera från biblioteket för ändringsflödesprocessorn till Azure Cosmos DB .NET V3 SDK

GÄLLER FÖR: SQL API

Den här artikeln beskriver de steg som krävs för att migrera ett befintligt programs kod som använder ändringsflödesprocessorbiblioteket till ändringsflödesfunktionen i den senaste versionen av .NET SDK (kallas även .NET V3 SDK).

Nödvändiga kodändringar

.NET V3 SDK har flera icke-bakåtkompatibla ändringar. Följande är de viktigaste stegen för att migrera ditt program:

  1. DocumentCollectionInfo Konvertera instanserna till Container referenser för containrarna övervakas och lånar ut.
  2. Anpassningar som använder WithProcessorOptions ska uppdateras för att använda WithLeaseConfiguration och WithPollInterval för intervall, WithStartTimeför starttid och WithMaxItems för att definiera det maximala antalet objekt.
  3. Ange på processorNameGetChangeFeedProcessorBuilder för att matcha värdet som konfigurerats på ChangeFeedProcessorOptions.LeasePrefix, eller använd string.Empty på annat sätt.
  4. Ändringarna levereras inte längre som en IReadOnlyList<Document>, i stället är det en IReadOnlyCollection<T> där T är en typ som du behöver definiera, det finns ingen basobjektklass längre.
  5. För att hantera ändringarna behöver du inte längre en implementering av IChangeFeedObserveri stället för att definiera ett ombud. Ombudet kan vara en statisk funktion, eller så kan du skapa din egen klass och skicka en instansmetod som ombud om du behöver upprätthålla tillståndet mellan körningarna.

Om till exempel den ursprungliga koden för att skapa ändringsflödesprocessorn ser ut så här:

ChangeFeedProcessorLibrary.DocumentCollectionInfo monitoredCollectionInfo = new ChangeFeedProcessorLibrary.DocumentCollectionInfo()
{
    DatabaseName = databaseId,
    CollectionName = Program.monitoredContainer,
    Uri = new Uri(configuration["EndPointUrl"]),
    MasterKey = configuration["AuthorizationKey"]
};

ChangeFeedProcessorLibrary.DocumentCollectionInfo leaseCollectionInfo = new ChangeFeedProcessorLibrary.DocumentCollectionInfo()
{
    DatabaseName = databaseId,
    CollectionName = Program.leasesContainer,
    Uri = new Uri(configuration["EndPointUrl"]),
    MasterKey = configuration["AuthorizationKey"]
};

ChangeFeedProcessorLibrary.ChangeFeedProcessorBuilder builder = new ChangeFeedProcessorLibrary.ChangeFeedProcessorBuilder();
var oldChangeFeedProcessor = await builder
    .WithHostName("consoleHost")
    .WithProcessorOptions(new ChangeFeedProcessorLibrary.ChangeFeedProcessorOptions
    {
        StartFromBeginning = true,
        LeasePrefix = "MyLeasePrefix",
        MaxItemCount = 10,
        FeedPollDelay = TimeSpan.FromSeconds(1)
    })
    .WithFeedCollection(monitoredCollectionInfo)
    .WithLeaseCollection(leaseCollectionInfo)
    .WithObserver<ChangeFeedObserver>()
    .BuildAsync();

Den migrerade koden ser ut så här:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("MyLeasePrefix", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .WithMaxItems(10)
        .WithPollInterval(TimeSpan.FromSeconds(1))
        .WithStartTime(DateTime.MinValue.ToUniversalTime())
        .Build();

För ombudet kan du ha en statisk metod för att ta emot händelserna. Om du använder information från IChangeFeedObserverContext kan du migrera för att använda ChangeFeedProcessorContext:

  • ChangeFeedProcessorContext.LeaseToken kan användas i stället för IChangeFeedObserverContext.PartitionKeyRangeId
  • ChangeFeedProcessorContext.Headers kan användas i stället för IChangeFeedObserverContext.FeedResponse
  • ChangeFeedProcessorContext.Diagnostics innehåller detaljerad information om svarstid för begäran för felsökning
static async Task HandleChangesAsync(ChangeFeedProcessorContext context, IReadOnlyCollection<ToDoItem> changes, CancellationToken cancellationToken)
{
    Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
    Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
    // SessionToken if needed to enforce Session consistency on another client instance
    Console.WriteLine($"SessionToken ${context.Headers.Session}");

    // We may want to track any operation's Diagnostics that took longer than some threshold
    if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
    {
        Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
    }

    foreach (ToDoItem item in changes)
    {
        Console.WriteLine($"\tDetected operation for item with id {item.id}, created at {item.creationTime}.");
        // Simulate work
        await Task.Delay(1);
    }
}

Tillstånd och lånecontainer

Precis som i biblioteket för ändringsflödesprocessorn använder funktionen för ändringsflöde i .NET V3 SDK en lånecontainer för att lagra tillståndet. Schemana skiljer sig dock åt.

SDK V3-ändringsflödesprocessorn identifierar alla gamla bibliotekstillstånd och migrerar det till det nya schemat automatiskt vid den första körningen av den migrerade programkoden.

Du kan på ett säkert sätt stoppa programmet med den gamla koden, migrera koden till den nya versionen, starta det migrerade programmet och eventuella ändringar som gjordes när programmet stoppades hämtas och bearbetas av den nya versionen.

Ytterligare resurser

Nästa steg

Du kan nu fortsätta att lära dig mer om ändringsflödesprocessorn i följande artiklar: