Explorer le flux de modification dans Azure Cosmos DB

Effectué

Le flux de modification dans Azure Cosmos DB est un enregistrement persistant des modifications apportées à un conteneur dans l’ordre dans lequel elles se produisent. Dans Azure Cosmos DB, le flux de modification écoute les modifications apportées à un conteneur Azure Cosmos DB. Il renvoie ensuite la liste chronologique de documents qui ont été modifiés, dans l’ordre dans lequel ils ont été modifiés. Les modifications conservées peuvent être traitées de manière asynchrone et incrémentielle, puis réparties sur un ou plusieurs consommateurs pour un traitement en parallèle.

Flux de modification et différentes opérations

Aujourd’hui, vous voyez toutes les insertions et mises à jour dans le flux de modification. Vous ne pouvez pas filtrer le flux de modification pour un type d’opération spécifique. Le flux de modification ne consigne pas les opérations de suppression. Pour contourner ce problème, vous pouvez ajouter un marqueur réversible sur les éléments en cours de suppression. Par exemple, vous pouvez ajouter un attribut dans l’élément appelé « supprimé », définir sa valeur sur « true », puis définir une valeur de durée de vie (TTL) sur l’élément. La définition de la durée de vie garantit que l’élément est automatiquement supprimé.

Lire le flux de modification Azure Cosmos DB

Vous pouvez utiliser le flux de modifications Azure Cosmos DB à l’aide d’un modèle d’envoi (push) ou de tirage (pull). Avec un modèle d’envoi (push), le processeur de flux de modification envoie (push) le travail à un client qui dispose d’une logique métier pour traiter ce travail. Toutefois, la complexité liée à la vérification du travail et au stockage de l’état du dernier travail traité est gérée sur le processeur de flux de modification.

Avec un modèle d’extraction (pull), le client doit extraire le travail du serveur. Dans ce cas, le client dispose d’une logique métier pour non seulement traiter le travail, mais également stocker l’état du dernier travail traité, gérer l’équilibrage de charge entre plusieurs clients traitant un travail en parallèle et gérer les erreurs.

Notes

Il est recommandé d’utiliser le modèle push, car vous n’aurez pas à vous soucier de l’interrogation du flux de modification pour les modifications futures, du stockage de l’état pour la dernière modification traitée, entre autres avantages.

La plupart des scénarios utilisant le flux de modification Azure Cosmos DB a recours à l’une des options du modèle push. Toutefois, dans certains scénarios, vous souhaiterez peut-être le contrôle de bas niveau supplémentaire du modèle pull, notamment :

  • Lecture des modifications à partir d’une clé de partition particulière
  • Contrôle de la vitesse à laquelle votre client reçoit les modifications à traiter
  • Lecture unique des données existantes dans le flux de modification (par exemple, pour effectuer une migration de données)

Lecture du flux de modification avec un modèle push

Deux méthodes s’offrent à vous pour lire le flux de modification avec un modèle push : les déclencheurs Azure Cosmos DB d’Azure Functions et la bibliothèque de processeur de flux de modification. Azure Functions utilise le processeur de flux de modification en arrière-plan. Ces deux méthodes sont donc similaires pour lire le flux de modifications. Imaginez Azure Functions comme une simple plateforme d’hébergement pour le processeur de flux de modification, et non comme une méthode entièrement séparée de lecture du flux de modification. Azure Functions utilise le processeur de flux de modification en arrière-plan ; il parallélise automatiquement le traitement des modifications entre les partitions de votre conteneur.

Azure Functions

Vous pouvez créer de petites fonctions Azure Functions réactives qui se déclencheront automatiquement sur chaque nouvel événement dans le flux de modification de votre conteneur Azure Cosmos DB. Avec le déclencheur Azure Functions pour Azure Cosmos DB, vous pouvez utiliser la mise à l’échelle du processeur de flux de modification et la fonctionnalité fiable de détection d’événement sans conserver d’infrastructure Worker.

Diagram showing the change feed triggering Azure Functions for processing.

Processeur de flux de modification

Le processeur de flux de modification fait partie des kits de développement logiciel Azure Cosmos DB .NET V3 et Java V4. Il simplifie le processus de lecture du flux de modification et répartit efficacement le traitement des événements sur plusieurs consommateurs.

Il existe quatre composants principaux dans l’implémentation du processeur de flux de modification :

  1. Conteneur supervisé : le conteneur supervisé est constitué des données à partir desquelles le flux de modification est généré. Toutes les insertions et les mises à jour apportées au conteneur supervisé sont répercutées dans le flux de modification du conteneur.

  2. Conteneur de baux : Le conteneur de baux fait office d’entrepôt d’état et coordonne le traitement du flux de modification entre plusieurs rôles de travail. Le conteneur de baux peut être stocké dans le même compte que le conteneur surveillé ou dans un compte distinct.

  3. L’instance de calcul : une instance de calcul héberge le processeur de flux de modification pour repérer les modifications. Selon la plateforme, elle peut être représentée par une machine virtuelle, un pod Kubernetes, une instance Azure App Service ou une machine physique réelle. Elle possède un identificateur unique référencé comme nom de l’instance tout au long de cet article.

  4. Délégué : Le délégué est le code qui définit ce que vous, le développeur, souhaitez faire avec chaque lot de modifications que le processeur de flux de modification lit.

Lors de l’implémentation du processeur de flux de modification, le point d’entrée est toujours le conteneur supervisé, à partir d’une instance Container vous appelez GetChangeFeedProcessorBuilder :

/// <summary>
/// Start the Change Feed Processor to listen for changes and process them with the HandleChangesAsync implementation.
/// </summary>
private static async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync(
    CosmosClient cosmosClient,
    IConfiguration configuration)
{
    string databaseName = configuration["SourceDatabaseName"];
    string sourceContainerName = configuration["SourceContainerName"];
    string leaseContainerName = configuration["LeasesContainerName"];

    Container leaseContainer = cosmosClient.GetContainer(databaseName, leaseContainerName);
    ChangeFeedProcessor changeFeedProcessor = cosmosClient.GetContainer(databaseName, sourceContainerName)
        .GetChangeFeedProcessorBuilder<ToDoItem>(processorName: "changeFeedSample", onChangesDelegate: HandleChangesAsync)
            .WithInstanceName("consoleHost")
            .WithLeaseContainer(leaseContainer)
            .Build();

    Console.WriteLine("Starting Change Feed Processor...");
    await changeFeedProcessor.StartAsync();
    Console.WriteLine("Change Feed Processor started.");
    return changeFeedProcessor;
}

Le premier paramètre est un nom distinct qui décrit l’objectif de ce processeur et le deuxième nom est l’implémentation de délégué qui gérera les modifications. Voici un exemple de délégué :

/// <summary>
/// The delegate receives batches of changes as they are generated in the change feed and can process them.
/// </summary>
static async Task HandleChangesAsync(
    ChangeFeedProcessorContext context,
    IReadOnlyCollection<ToDoItem> changes,
    CancellationToken cancellationToken)
{
    Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
    Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
    // SessionToken if needed to enforce Session consistency on another client instance
    Console.WriteLine($"SessionToken ${context.Headers.Session}");

    // We may want to track any operation's Diagnostics that took longer than some threshold
    if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
    {
        Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
    }

    foreach (ToDoItem item in changes)
    {
        Console.WriteLine($"Detected operation for item with id {item.id}, created at {item.creationTime}.");
        // Simulate some asynchronous operation
        await Task.Delay(10);
    }

    Console.WriteLine("Finished handling changes.");
}

Ensuite, vous définissez le nom de l’instance de calcul ou l’identificateur unique avec WithInstanceName, qui doit être unique et différent pour chaque instance de calcul que vous déployez et, enfin, qui est le conteneur avec lequel gérer l’état du bail avec WithLeaseContainer.

Le fait d’appeler Build vous donne l’instance de processeur que vous pouvez démarrer en appelant StartAsync.

Le cycle de vie normal d’une instance d’hôte est le suivant :

  1. Lire le flux de modification.
  2. Si aucune modification n’est apportée, veillez pendant un intervalle de temps prédéfini (personnalisable avec WithPollInterval dans le Builder) et accédez à #1.
  3. En cas de modifications, envoyez-les au délégué.
  4. Une fois les changements correctement traités par le délégué, mise à jour du magasin de baux par rapport au dernier point dans le temps traité, puis retour à l’étape 1.