Migrace z knihovny procesoru kanálu změn do sady .NET SDK služby Azure Cosmos DB V3

PLATÍ PRO: NoSQL

Tento článek popisuje požadované kroky k migraci kódu existující aplikace, který používá knihovnu procesoru kanálu změn do funkce kanálu změn v nejnovější verzi sady .NET SDK (označované také jako sada .NET SDK V3).

Požadované změny kódu

Sada .NET V3 SDK obsahuje několik zásadních změn, následující kroky jsou klíčovými kroky pro migraci aplikace:

  1. DocumentCollectionInfo Převeďte instance na Container odkazy pro monitorované kontejnery a kontejnery zapůjčení.
  2. Vlastní nastavení, která se používají WithProcessorOptions , by se měla aktualizovat tak, aby používala WithLeaseConfiguration intervaly WithPollInterval , WithStartTimepro počáteční čas a WithMaxItems definovala maximální počet položek.
  3. processorName Nastavte zapnutou GetChangeFeedProcessorBuilder hodnotu tak, aby odpovídala hodnotě nakonfigurované ChangeFeedProcessorOptions.LeasePrefix, nebo použijte string.Empty jinak.
  4. Změny se už nedoručují jako typIReadOnlyList<Document>, který T je třeba definovat, IReadOnlyCollection<T> ale už neexistuje žádná třída základní položky.
  5. Ke zpracování změn už nepotřebujete implementaci IChangeFeedObserver, místo toho musíte definovat delegáta. Delegátem může být statická funkce, nebo pokud potřebujete zachovat stav napříč spuštěními, můžete vytvořit vlastní třídu a předat metodu instance jako delegát.

Pokud například původní kód pro sestavení procesoru kanálu změn vypadá takto:

ChangeFeedProcessorLibrary.DocumentCollectionInfo monitoredCollectionInfo = new ChangeFeedProcessorLibrary.DocumentCollectionInfo()
{
    DatabaseName = databaseId,
    CollectionName = Program.monitoredContainer,
    Uri = new Uri(configuration["EndPointUrl"]),
    MasterKey = configuration["AuthorizationKey"]
};

ChangeFeedProcessorLibrary.DocumentCollectionInfo leaseCollectionInfo = new ChangeFeedProcessorLibrary.DocumentCollectionInfo()
{
    DatabaseName = databaseId,
    CollectionName = Program.leasesContainer,
    Uri = new Uri(configuration["EndPointUrl"]),
    MasterKey = configuration["AuthorizationKey"]
};

ChangeFeedProcessorLibrary.ChangeFeedProcessorBuilder builder = new ChangeFeedProcessorLibrary.ChangeFeedProcessorBuilder();
var oldChangeFeedProcessor = await builder
    .WithHostName("consoleHost")
    .WithProcessorOptions(new ChangeFeedProcessorLibrary.ChangeFeedProcessorOptions
    {
        StartFromBeginning = true,
        LeasePrefix = "MyLeasePrefix",
        MaxItemCount = 10,
        FeedPollDelay = TimeSpan.FromSeconds(1)
    })
    .WithFeedCollection(monitoredCollectionInfo)
    .WithLeaseCollection(leaseCollectionInfo)
    .WithObserver<ChangeFeedObserver>()
    .BuildAsync();

Migrovaný kód bude vypadat takto:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("MyLeasePrefix", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .WithMaxItems(10)
        .WithPollInterval(TimeSpan.FromSeconds(1))
        .WithStartTime(DateTime.MinValue.ToUniversalTime())
        .Build();

Pro delegáta můžete mít statickou metodu pro příjem událostí. Pokud jste používali informace z IChangeFeedObserverContext aplikace, můžete migrovat a použít:ChangeFeedProcessorContext

  • ChangeFeedProcessorContext.LeaseToken lze použít místo IChangeFeedObserverContext.PartitionKeyRangeId
  • ChangeFeedProcessorContext.Headers lze použít místo IChangeFeedObserverContext.FeedResponse
  • ChangeFeedProcessorContext.Diagnostics obsahuje podrobné informace o latenci požadavků pro řešení potíží.
static async Task HandleChangesAsync(ChangeFeedProcessorContext context, IReadOnlyCollection<ToDoItem> changes, CancellationToken cancellationToken)
{
    Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
    Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
    // SessionToken if needed to enforce Session consistency on another client instance
    Console.WriteLine($"SessionToken ${context.Headers.Session}");

    // We may want to track any operation's Diagnostics that took longer than some threshold
    if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
    {
        Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
    }

    foreach (ToDoItem item in changes)
    {
        Console.WriteLine($"\tDetected operation for item with id {item.id}, created at {item.creationTime}.");
        // Simulate work
        await Task.Delay(1);
    }
}

Události stavu a pozorovatelnost

Pokud jste dříve používali IHealthMonitor nebo jste používali IChangeFeedObserver.OpenAsync , a IChangeFeedObserver.CloseAsyncpoužijte rozhraní API pro oznámení.

  • IChangeFeedObserver.OpenAsync lze nahradit znakem WithLeaseAcquireNotification.
  • IChangeFeedObserver.CloseAsync lze nahradit znakem WithLeaseReleaseNotification.
  • IHealthMonitor.InspectAsync lze nahradit znakem WithErrorNotification.

Kontejner stavu a zapůjčení

Podobně jako knihovna procesoru kanálu změn používá funkce kanálu změn v sadě .NET V3 SDK kontejner zapůjčení k uložení stavu. Schémata se ale liší.

Procesor kanálu změn SADY SDK V3 rozpozná jakýkoli starý stav knihovny a automaticky ho migruje do nového schématu při prvním spuštění migrovaného kódu aplikace.

Aplikaci můžete bezpečně zastavit pomocí starého kódu, migrovat kód do nové verze, spustit migrovanou aplikaci a všechny změny, ke kterým došlo v době, kdy byla aplikace zastavena, budou vyzvednuty a zpracovány novou verzí.

Další materiály

Další kroky

Teď můžete pokračovat a získat další informace o procesoru kanálu změn v následujících článcích: