Share via


Migrer de la bibliothèque du processeur de flux de modification vers le kit de développement logiciel (SDK) Azure Cosmos DB .NET V3

S’APPLIQUE À : NoSQL

Cet article décrit la procédure de migration d'un code d'application existante utilisant la bibliothèque du processeur de flux de modification vers la fonctionnalité de flux de modification de la dernière version du kit de développement logiciel (SDK) .NET (également appelé kit de développement logiciel (SDK) .NET V3).

Modifications de code requises

Le kit de développement logiciel (SDK) .NET V3 présente plusieurs changements cassants et vous trouverez ci-dessous les principales étapes pour migrer votre application :

  1. Convertissez les instances DocumentCollectionInfo en références Container pour les conteneurs surveillés et de baux.
  2. Les personnalisations utilisant WithProcessorOptions doivent être mises à jour pour utiliser WithLeaseConfiguration et WithPollInterval pour les intervalles, WithStartTimepour l'heure de début et WithMaxItems pour définir le nombre maximal d’éléments.
  3. Définissez processorName sur GetChangeFeedProcessorBuilder à des fins de correspondance avec la valeur configurée sur ChangeFeedProcessorOptions.LeasePrefix ou utilisez string.Empty.
  4. Les modifications ne sont plus fournies sous forme de IReadOnlyList<Document>, mais de IReadOnlyCollection<T>T correspond à un type que vous devez définir. Il n'existe plus de classe d'élément de base.
  5. Pour gérer les modifications, vous n’avez plus besoin d’une implémentation de IChangeFeedObserver , mais vous devez définir un délégué. Le délégué peut être une fonction statique ou, s'il vous faut maintenir l'état entre les exécutions vous pouvez créer votre propre classe et transmettre une méthode d’instance en tant que délégué.

Par exemple, si le code d’origine pour créer le processeur de flux de modification se présente comme suit :

ChangeFeedProcessorLibrary.DocumentCollectionInfo monitoredCollectionInfo = new ChangeFeedProcessorLibrary.DocumentCollectionInfo()
{
    DatabaseName = databaseId,
    CollectionName = Program.monitoredContainer,
    Uri = new Uri(configuration["EndPointUrl"]),
    MasterKey = configuration["AuthorizationKey"]
};

ChangeFeedProcessorLibrary.DocumentCollectionInfo leaseCollectionInfo = new ChangeFeedProcessorLibrary.DocumentCollectionInfo()
{
    DatabaseName = databaseId,
    CollectionName = Program.leasesContainer,
    Uri = new Uri(configuration["EndPointUrl"]),
    MasterKey = configuration["AuthorizationKey"]
};

ChangeFeedProcessorLibrary.ChangeFeedProcessorBuilder builder = new ChangeFeedProcessorLibrary.ChangeFeedProcessorBuilder();
var oldChangeFeedProcessor = await builder
    .WithHostName("consoleHost")
    .WithProcessorOptions(new ChangeFeedProcessorLibrary.ChangeFeedProcessorOptions
    {
        StartFromBeginning = true,
        LeasePrefix = "MyLeasePrefix",
        MaxItemCount = 10,
        FeedPollDelay = TimeSpan.FromSeconds(1)
    })
    .WithFeedCollection(monitoredCollectionInfo)
    .WithLeaseCollection(leaseCollectionInfo)
    .WithObserver<ChangeFeedObserver>()
    .BuildAsync();

Le code migré se présentera comme suit :

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("MyLeasePrefix", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .WithMaxItems(10)
        .WithPollInterval(TimeSpan.FromSeconds(1))
        .WithStartTime(DateTime.MinValue.ToUniversalTime())
        .Build();

Pour le délégué, vous pouvez avoir une méthode statique pour recevoir les événements. Si vous avez consommé des informations à partir du IChangeFeedObserverContext vous pouvez migrer pour utiliser le ChangeFeedProcessorContext :

  • ChangeFeedProcessorContext.LeaseToken peut être utilisé à la place de IChangeFeedObserverContext.PartitionKeyRangeId
  • ChangeFeedProcessorContext.Headers peut être utilisé à la place de IChangeFeedObserverContext.FeedResponse
  • ChangeFeedProcessorContext.Diagnostics contient des informations détaillées sur la latence des demandes de résolution des problèmes
static async Task HandleChangesAsync(ChangeFeedProcessorContext context, IReadOnlyCollection<ToDoItem> changes, CancellationToken cancellationToken)
{
    Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
    Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
    // SessionToken if needed to enforce Session consistency on another client instance
    Console.WriteLine($"SessionToken ${context.Headers.Session}");

    // We may want to track any operation's Diagnostics that took longer than some threshold
    if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
    {
        Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
    }

    foreach (ToDoItem item in changes)
    {
        Console.WriteLine($"\tDetected operation for item with id {item.id}, created at {item.creationTime}.");
        // Simulate work
        await Task.Delay(1);
    }
}

Événements d’intégrité et observabilité

Si vous utilisiez IHealthMonitor précédemment ou si vous utilisiez IChangeFeedObserver.OpenAsync et IChangeFeedObserver.CloseAsync, utilisez l’API Notifications.

  • IChangeFeedObserver.OpenAsync peut être remplacé par WithLeaseAcquireNotification.
  • IChangeFeedObserver.CloseAsync peut être remplacé par WithLeaseReleaseNotification.
  • IHealthMonitor.InspectAsync peut être remplacé par WithErrorNotification.

État et conteneur de baux

À l’instar de la bibliothèque du processeur de flux de modification, la fonctionnalité de flux de modification du kit de développement logiciel (SDK) .NET V3 utilise un conteneur de baux pour stocker l’état. Toutefois, les schémas sont différents.

Le processeur de flux de modification du kit de développement logiciel (SDK) V3 détecte tout état de bibliothèque antérieur et le migre automatiquement vers le nouveau schéma lors de la première exécution du code de l’application migrée.

Vous pouvez arrêter en toute sécurité l’application à l’aide de l’ancien code, migrer le code vers la nouvelle version, démarrer l’application migrée, après quoi toutes les modifications apportées pendant l’arrêt de l’application sont récupérées et traitées par la nouvelle version.

Ressources supplémentaires

Étapes suivantes

Pour plus d’informations sur le processeur de flux de modification, consultez les articles suivants :