Menjelajahi umpan perubahan di Azure Cosmos DB

Selesai

Mengubah umpan di Azure Cosmos DB adalah catatan perubahan yang terus-menerus ke penampung sesuai urutan terjadinya. Ubah dukungan umpan di Azure Cosmos DB bekerja dengan mendengarkan kontainer Azure Cosmos DB untuk setiap perubahan. Kemudian melakukan output daftar dokumen yang diurutkan, yang diubah dalam urutan modifikasinya. Perubahan yang terjadi dapat diproses secara asinkron dan bertahap, dan output dapat didistribusikan ke satu atau lebih konsumen untuk pemrosesan paralel.

Ubah umpan dan operasi yang berbeda

Hari ini, Anda melihat semua sisipan dan pembaruan di feed perubahan. Anda tidak dapat memfilter umpan perubahan untuk jenis operasi tertentu. Saat ini umpan perubahan tidak mencatat operasi penghapusan. Sebagai solusinya, Anda bisa menambahkan penanda lembut pada item yang sedang dihapus. Misalnya, Anda dapat menambahkan atribut dalam item yang disebut "dihapus," mengatur nilainya ke "true," lalu mengatur nilai time-to-live (TTL) pada item. Mengatur TTL memastikan bahwa item dihapus secara otomatis.

Membaca umpan perubahan Azure Cosmos DB

Anda dapat bekerja dengan umpan perubahan Azure Cosmos DB dengan menggunakan model dorong atau model tarik. Dengan model dorong, prosesor umpan perubahan mendorong pekerjaan ke klien yang memiliki logika bisnis untuk memproses pekerjaan ini. Namun, kompleksitas dalam memeriksa pekerjaan dan menyimpan status untuk pekerjaan yang diproses terakhir ditangani dalam prosesor umpan perubahan.

Dengan model tarik, klien harus menarik pekerjaan dari server. Klien, dalam hal ini, tidak hanya memiliki logika bisnis untuk memproses pekerjaan tetapi juga menyimpan status untuk pekerjaan yang diproses terakhir, menangani penyeimbangan beban di beberapa klien yang memproses pekerjaan secara paralel, dan menangani kesalahan.

Catatan

Disarankan untuk menggunakan model pendorongan karena Anda tidak perlu khawatir tentang polling umpan perubahan untuk perubahan di masa mendatang, menyimpan status untuk perubahan terakhir yang diproses, dan manfaat lainnya.

Sebagian besar skenario yang menggunakan umpan perubahan Azure Cosmos DB menggunakan salah satu opsi model pendorongan. Namun, ada beberapa skenario ketika Anda mungkin menginginkan kontrol tingkat rendah tambahan dari model tarik. Ini termasuk:

  • Membaca perubahan dari kunci partisi tertentu
  • Mengontrol kecepatan saat klien Anda menerima perubahan untuk pemrosesan
  • Melakukan satu kali baca data yang ada di umpan perubahan (misalnya, untuk melakukan migrasi data)

Membaca umpan perubahan dengan model dorong

Ada dua cara untuk membaca dari umpan perubahan dengan model push: Pemicu Azure Functions Azure Cosmos DB, dan pustaka prosesor umpan perubahan. Azure Functions menggunakan prosesor umpan perubahan di belakang layar, jadi ini adalah cara yang sama untuk membaca umpan perubahan. Bayangkan Azure Functions hanya sebagai platform hosting untuk prosesor umpan perubahan, bukan cara yang sama sekali berbeda untuk membaca umpan perubahan. Azure Functions menggunakan prosesor umpan perubahan di belakang layar, secara otomatis menyejajarkan pemrosesan perubahan di seluruh partisi kontainer Anda.

Azure Functions

Anda dapat membuat Azure Functions reaktif kecil yang akan secara otomatis dipicu pada setiap peristiwa baru di umpan perubahan kontainer Azure Cosmos DB Anda. Dengan pemicu Azure Functions untuk Azure Cosmos DB, Anda dapat menggunakan fungsionalitas penskalaan Prosesor Umpan Perubahan dan deteksi peristiwa yang andal tanpa perlu mempertahankan infrastruktur pekerja apa pun.

Diagram showing the change feed triggering Azure Functions for processing.

Prosesor umpan perubahan

Pemroses umpan perubahan adalah bagian dari Azure Cosmos DB .NET V3 dan SDK Java V4. Hal ini menyederhanakan proses membaca feed perubahan dan mendistribusikan pemrosesan peristiwa di beberapa konsumen secara efektif.

Ada empat komponen utama untuk mengimplementasikan prosesor umpan perubahan:

  1. Kontainer terpantau: Kontainer terpantau memiliki data dari asal umpan perubahan dihasilkan. Setiap sisipan dan pembaruan pada kontainer yang dipantau tercermin dalam umpan perubahan kontainer.

  2. Kontainer sewa: Kontainer sewa bertindak sebagai penyimpanan status dan mengoordinasikan pemrosesan umpan perubahan di beberapa pekerja. Kontainer sewa dapat disimpan dalam akun yang sama dengan kontainer yang dipantau atau di akun terpisah.

  3. Instans komputasi: Instans komputasi yang menghosting prosesor umpan perubahan untuk mendengarkan perubahan. Bergantung pada platform, ini dapat diwakili oleh mesin virtual, pod kubernetes, instans Azure App Service, komputer fisik yang sebenarnya. Ini memiliki pengenal unik yang direferensikan sebagai nama instans di seluruh artikel ini.

  4. Delegasi: Delegasi adalah kode yang mendefinisikan apa yang Anda, pengembang, ingin lakukan dengan setiap batch perubahan yang dibaca prosesor umpan perubahan.

Saat menerapkan prosesor umpan perubahan, titik entri selalu merupakan kontainer yang dipantau, dari instans yang Container Anda panggil GetChangeFeedProcessorBuilder:

/// <summary>
/// Start the Change Feed Processor to listen for changes and process them with the HandleChangesAsync implementation.
/// </summary>
private static async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync(
    CosmosClient cosmosClient,
    IConfiguration configuration)
{
    string databaseName = configuration["SourceDatabaseName"];
    string sourceContainerName = configuration["SourceContainerName"];
    string leaseContainerName = configuration["LeasesContainerName"];

    Container leaseContainer = cosmosClient.GetContainer(databaseName, leaseContainerName);
    ChangeFeedProcessor changeFeedProcessor = cosmosClient.GetContainer(databaseName, sourceContainerName)
        .GetChangeFeedProcessorBuilder<ToDoItem>(processorName: "changeFeedSample", onChangesDelegate: HandleChangesAsync)
            .WithInstanceName("consoleHost")
            .WithLeaseContainer(leaseContainer)
            .Build();

    Console.WriteLine("Starting Change Feed Processor...");
    await changeFeedProcessor.StartAsync();
    Console.WriteLine("Change Feed Processor started.");
    return changeFeedProcessor;
}

Di mana parameter pertama adalah nama berbeda yang menjelaskan tujuan prosesor ini dan nama kedua adalah implementasi delegasi yang akan menangani perubahan. Berikut ini adalah contoh delegasi:

/// <summary>
/// The delegate receives batches of changes as they are generated in the change feed and can process them.
/// </summary>
static async Task HandleChangesAsync(
    ChangeFeedProcessorContext context,
    IReadOnlyCollection<ToDoItem> changes,
    CancellationToken cancellationToken)
{
    Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
    Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
    // SessionToken if needed to enforce Session consistency on another client instance
    Console.WriteLine($"SessionToken ${context.Headers.Session}");

    // We may want to track any operation's Diagnostics that took longer than some threshold
    if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
    {
        Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
    }

    foreach (ToDoItem item in changes)
    {
        Console.WriteLine($"Detected operation for item with id {item.id}, created at {item.creationTime}.");
        // Simulate some asynchronous operation
        await Task.Delay(10);
    }

    Console.WriteLine("Finished handling changes.");
}

Setelah itu, Anda menentukan nama instans komputasi atau pengidentifikasi unik dengan WithInstanceName, ini harus unik dan berbeda di setiap instans komputasi yang Anda sebarkan, dan akhirnya, yang merupakan kontainer untuk mempertahankan status sewa dengan WithLeaseContainer.

Memanggil Build memberi Anda instans prosesor yang dapat Anda mulai dengan memanggil StartAsync.

Siklus hidup normal instans host adalah:

  1. Membaca umpan perubahan.
  2. Jika tidak ada perubahan, tidurlah untuk jumlah waktu yang telah ditentukan sebelumnya (dapat disesuaikan dengan WithPollInterval dalam Builder) dan buka #1.
  3. Jika ada perubahan, instans mengirimkannya ke delegasi.
  4. Ketika delegasi selesai memproses perubahan dengan sukses, perbarui toko sewa dengan titik waktu pemrosesan terbaru dan lanjutkan ke #1.