Migrera från ändringsflödesprocessorbiblioteket till Azure Cosmos DB .NET V3 SDK

GÄLLER FÖR: NoSQL

Den här artikeln beskriver de steg som krävs för att migrera ett befintligt programs kod som använder ändringsflödesprocessorbiblioteket till ändringsflödesfunktionen i den senaste versionen av .NET SDK (kallas även .NET V3 SDK).

Nödvändiga kodändringar

.NET V3 SDK har flera icke-bakåtkompatibla ändringar. Följande är de viktigaste stegen för att migrera ditt program:

  1. DocumentCollectionInfo Konvertera instanserna till Container referenser för de övervakade containrarna och lånar containrar.
  2. Anpassningar som används WithProcessorOptions bör uppdateras för användning WithLeaseConfiguration och WithPollInterval för intervall, WithStartTimeför starttid och WithMaxItems för att definiera det maximala antalet objekt.
  3. Ange på processorNameGetChangeFeedProcessorBuilder för att matcha det värde som konfigurerats för ChangeFeedProcessorOptions.LeasePrefix, eller använd string.Empty på annat sätt.
  4. Ändringarna levereras inte längre som en IReadOnlyList<Document>, i stället är det en IReadOnlyCollection<T> var T är en typ som du behöver definiera, det finns ingen basobjektklass längre.
  5. För att hantera ändringarna behöver du inte längre en implementering av IChangeFeedObserveri stället för att definiera ett ombud. Ombudet kan vara en statisk funktion, eller så kan du skapa en egen klass och skicka en instansmetod som ombud om du behöver underhålla tillståndet mellan körningarna.

Om till exempel den ursprungliga koden för att skapa ändringsflödesprocessorn ser ut så här:

ChangeFeedProcessorLibrary.DocumentCollectionInfo monitoredCollectionInfo = new ChangeFeedProcessorLibrary.DocumentCollectionInfo()
{
    DatabaseName = databaseId,
    CollectionName = Program.monitoredContainer,
    Uri = new Uri(configuration["EndPointUrl"]),
    MasterKey = configuration["AuthorizationKey"]
};

ChangeFeedProcessorLibrary.DocumentCollectionInfo leaseCollectionInfo = new ChangeFeedProcessorLibrary.DocumentCollectionInfo()
{
    DatabaseName = databaseId,
    CollectionName = Program.leasesContainer,
    Uri = new Uri(configuration["EndPointUrl"]),
    MasterKey = configuration["AuthorizationKey"]
};

ChangeFeedProcessorLibrary.ChangeFeedProcessorBuilder builder = new ChangeFeedProcessorLibrary.ChangeFeedProcessorBuilder();
var oldChangeFeedProcessor = await builder
    .WithHostName("consoleHost")
    .WithProcessorOptions(new ChangeFeedProcessorLibrary.ChangeFeedProcessorOptions
    {
        StartFromBeginning = true,
        LeasePrefix = "MyLeasePrefix",
        MaxItemCount = 10,
        FeedPollDelay = TimeSpan.FromSeconds(1)
    })
    .WithFeedCollection(monitoredCollectionInfo)
    .WithLeaseCollection(leaseCollectionInfo)
    .WithObserver<ChangeFeedObserver>()
    .BuildAsync();

Den migrerade koden ser ut så här:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("MyLeasePrefix", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .WithMaxItems(10)
        .WithPollInterval(TimeSpan.FromSeconds(1))
        .WithStartTime(DateTime.MinValue.ToUniversalTime())
        .Build();

För ombudet kan du ha en statisk metod för att ta emot händelserna. Om du använder information från IChangeFeedObserverContext kan du migrera för att använda ChangeFeedProcessorContext:

  • ChangeFeedProcessorContext.LeaseToken kan användas i stället för IChangeFeedObserverContext.PartitionKeyRangeId
  • ChangeFeedProcessorContext.Headers kan användas i stället för IChangeFeedObserverContext.FeedResponse
  • ChangeFeedProcessorContext.Diagnostics innehåller detaljerad information om svarstid för begäranden för felsökning
static async Task HandleChangesAsync(ChangeFeedProcessorContext context, IReadOnlyCollection<ToDoItem> changes, CancellationToken cancellationToken)
{
    Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
    Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
    // SessionToken if needed to enforce Session consistency on another client instance
    Console.WriteLine($"SessionToken ${context.Headers.Session}");

    // We may want to track any operation's Diagnostics that took longer than some threshold
    if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
    {
        Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
    }

    foreach (ToDoItem item in changes)
    {
        Console.WriteLine($"\tDetected operation for item with id {item.id}, created at {item.creationTime}.");
        // Simulate work
        await Task.Delay(1);
    }
}

Hälsohändelser och observerbarhet

Om du tidigare använde IHealthMonitor eller använde IChangeFeedObserver.OpenAsync och IChangeFeedObserver.CloseAsyncanvänder du API:et Meddelanden.

  • IChangeFeedObserver.OpenAsync kan ersättas med WithLeaseAcquireNotification.
  • IChangeFeedObserver.CloseAsync kan ersättas med WithLeaseReleaseNotification.
  • IHealthMonitor.InspectAsync kan ersättas med WithErrorNotification.

Tillstånd och lånecontainer

På samma sätt som ändringsflödesprocessorbiblioteket använder funktionen för ändringsflöde i .NET V3 SDK en lånecontainer för att lagra tillståndet. Schemana skiljer sig dock åt.

SDK V3-ändringsflödesprocessorn identifierar alla gamla bibliotekstillstånd och migrerar det automatiskt till det nya schemat vid den första körningen av den migrerade programkoden.

Du kan på ett säkert sätt stoppa programmet med den gamla koden, migrera koden till den nya versionen, starta det migrerade programmet och eventuella ändringar som inträffade när programmet stoppades hämtas och bearbetas av den nya versionen.

Ytterligare resurser

Nästa steg

Du kan nu fortsätta med att lära dig mer om ändringsflödesprocessorn i följande artiklar: