Utforska ändringsflöde i Azure Cosmos DB

Slutförd

Ändringsflöde i Azure Cosmos DB är en beständig post med ändringar i en container i den ordning de sker. Stödet för ändringsflöde i Azure Cosmos DB fungerar genom att lyssna efter ändringar på en Azure Cosmos DB-container. Funktionen returnerar sedan den sorterade listan över dokument som ändrats i den ordning de ändrades. De beständiga ändringarna kan bearbetas asynkront och inkrementellt, och utdata kan distribueras över en eller flera konsumenter för parallell bearbetning.

Ändringsflöde och olika åtgärder

I dag visas alla infogningar och uppdateringar i ändringsflödet. Du kan inte filtrera ändringsflödet för en viss typ av åtgärd. Ändringsflödet loggar för närvarande inte borttagningsåtgärder. Som en lösning kan du lägga till en mjuk markör för de objekt som tas bort. Du kan till exempel lägga till ett attribut i objektet med namnet "deleted", ange värdet "true" och sedan ange ett TTL-värde (time-to-live) för objektet. Om du anger TTL ser du till att objektet tas bort automatiskt.

Läsa ändringsflödet i Azure Cosmos DB

Du kan arbeta med Azure Cosmos DB-ändringsflödet med hjälp av antingen en push-modell eller en pull-modell. Med en push-modell skickar ändringsflödesprocessorn arbete till en klient som har affärslogik för bearbetning av det här arbetet. Komplexiteten i att söka efter arbete och lagra tillstånd för det senast bearbetade arbetet hanteras dock i ändringsflödesprocessorn.

Med en pull-modell måste klienten hämta arbetet från servern. Klienten har i det här fallet inte bara affärslogik för bearbetning av arbete utan även lagringstillstånd för det senast bearbetade arbetet, hantering av belastningsutjämning över flera klienter som bearbetar arbete parallellt och hanteringsfel.

Kommentar

Vi rekommenderar att du använder push-modellen eftersom du inte behöver oroa dig för att avsöka ändringsflödet för framtida ändringar, lagra tillstånd för den senast bearbetade ändringen och andra fördelar.

De flesta scenarier som använder Azure Cosmos DB-ändringsflödet använder något av push-modellalternativen. Det finns dock vissa scenarier där du kanske vill ha ytterligare kontroll på låg nivå av pull-modellen. Dessa kan vara:

  • Läsa ändringar från en viss partitionsnyckel
  • Kontrollera i vilken takt klienten får ändringar för bearbetning
  • Göra en engångsläsning av befintliga data i ändringsflödet (till exempel för att utföra en datamigrering)

Läsa ändringsflöde med en push-modell

Det finns två sätt att läsa från ändringsflödet med en push-modell: Azure Functions Azure Cosmos DB-utlösare och ändringsflödesprocessorbiblioteket. Azure Functions använder ändringsflödesprocessorn i bakgrunden, så det här är båda liknande sätt att läsa ändringsflödet. Tänk på Azure Functions som en värdplattform för ändringsflödesprocessorn, inte ett helt annat sätt att läsa ändringsflödet. Azure Functions använder ändringsflödesprocessorn i bakgrunden. Den parallelliserar automatiskt ändringsbearbetningen mellan containerns partitioner.

Azure Functions

Du kan skapa små reaktiva Azure Functions som automatiskt utlöses på varje ny händelse i din Azure Cosmos DB-containers ändringsflöde. Med Azure Functions-utlösaren för Azure Cosmos DB kan du använda ändringsflödesprocessorns skalning och tillförlitliga funktioner för händelseidentifiering utan att behöva underhålla någon arbetsinfrastruktur.

Diagram showing the change feed triggering Azure Functions for processing.

Ändringsflödesprocessor

Ändringsflödesprocessorn är en del av Azure Cosmos DB .NET V3 och Java V4 SDK:er. Det förenklar processen för att läsa ändringsflödet och distribuerar händelsebearbetningen över flera konsumenter effektivt.

Det finns fyra huvudkomponenter i implementeringen av ändringsflödesprocessorn:

  1. Den övervakade containern: Den övervakade containern har de data som ändringsflödet genereras från. Infogningar och uppdateringar till den övervakade containern visas i containerns ändringsflöde.

  2. Lånecontainern: Lånecontainern fungerar som en tillståndslagring och samordnar bearbetningen av ändringsflödet mellan flera arbetare. Lånecontainern kan lagras i samma konto som den övervakade containern eller i ett separat konto.

  3. Beräkningsinstansen: En beräkningsinstans är värd för ändringsflödesprocessorn för att lyssna efter ändringar. Beroende på plattform kan den representeras av en virtuell dator, en kubernetes-podd, en Azure App Service-instans, en verklig fysisk dator. Den har en unik identifierare som refereras till som instansnamn i hela den här artikeln.

  4. Ombudet: Ombudet är den kod som definierar vad du, utvecklaren, vill göra med varje batch med ändringar som ändringsflödesprocessorn läser.

När du implementerar ändringsflödesprocessorn är startpunkten alltid den övervakade containern, från en Container instans som du anropar GetChangeFeedProcessorBuilder:

/// <summary>
/// Start the Change Feed Processor to listen for changes and process them with the HandleChangesAsync implementation.
/// </summary>
private static async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync(
    CosmosClient cosmosClient,
    IConfiguration configuration)
{
    string databaseName = configuration["SourceDatabaseName"];
    string sourceContainerName = configuration["SourceContainerName"];
    string leaseContainerName = configuration["LeasesContainerName"];

    Container leaseContainer = cosmosClient.GetContainer(databaseName, leaseContainerName);
    ChangeFeedProcessor changeFeedProcessor = cosmosClient.GetContainer(databaseName, sourceContainerName)
        .GetChangeFeedProcessorBuilder<ToDoItem>(processorName: "changeFeedSample", onChangesDelegate: HandleChangesAsync)
            .WithInstanceName("consoleHost")
            .WithLeaseContainer(leaseContainer)
            .Build();

    Console.WriteLine("Starting Change Feed Processor...");
    await changeFeedProcessor.StartAsync();
    Console.WriteLine("Change Feed Processor started.");
    return changeFeedProcessor;
}

Där den första parametern är ett distinkt namn som beskriver målet med den här processorn och det andra namnet är den delegatimplementering som hanterar ändringar. Följande är ett exempel på ett ombud:

/// <summary>
/// The delegate receives batches of changes as they are generated in the change feed and can process them.
/// </summary>
static async Task HandleChangesAsync(
    ChangeFeedProcessorContext context,
    IReadOnlyCollection<ToDoItem> changes,
    CancellationToken cancellationToken)
{
    Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
    Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
    // SessionToken if needed to enforce Session consistency on another client instance
    Console.WriteLine($"SessionToken ${context.Headers.Session}");

    // We may want to track any operation's Diagnostics that took longer than some threshold
    if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
    {
        Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
    }

    foreach (ToDoItem item in changes)
    {
        Console.WriteLine($"Detected operation for item with id {item.id}, created at {item.creationTime}.");
        // Simulate some asynchronous operation
        await Task.Delay(10);
    }

    Console.WriteLine("Finished handling changes.");
}

Efteråt definierar du namnet på beräkningsinstansen eller den unika identifieraren med WithInstanceName, detta bör vara unikt och olika i varje beräkningsinstans som du distribuerar, och slutligen, vilket är containern för att underhålla lånetillståndet med WithLeaseContainer.

Samtal Build ger dig den processorinstans som du kan starta genom att anropa StartAsync.

Den normala livscykeln för en värdinstans är:

  1. Läs ändringsflödet.
  2. Om det inte finns några ändringar kan du viloläge under en fördefinierad tid (anpassningsbart med WithPollInterval i Builder) och gå till #1.
  3. Om det finns ändringar kan du skicka dem till ombudet.
  4. När ombudet har slutfört bearbetningen av ändringarna uppdaterar du lånelagringsplatsen med den senaste bearbetade tidpunkten och går till #1.