Memproses umpan perubahan di Azure Blob Storage

Umpan perubahan menyediakan log transaksi dari semua perubahan yang terjadi pada blob dan metadata blob di akun penyimpanan Anda. Artikel ini memperlihatkan kepada Anda cara membaca rekaman umpan perubahan dengan pustaka prosesor umpan perubahan blob.

Untuk mempelajari selengkapnya tentang umpan perubahan, lihat Umpan perubahan di Azure Blob Storage.

Dapatkan pustaka prosesor umpan perubahan blob

  1. Buka jendela perintah (Misalnya: Windows PowerShell).
  2. Dari direktori proyek Anda, instal paket Azure.Storage.Blobs.Changefeed NuGet.
dotnet add package Azure.Storage.Blobs --version 12.5.1
dotnet add package Azure.Storage.Blobs.ChangeFeed --version 12.0.0-preview.4

Baca rekaman

Catatan

Umpan perubahan adalah entitas imutabel hanya-baca di akun penyimpanan Anda. Sejumlah aplikasi dapat membaca dan memproses umpan perubahan secara bersamaan dan independen kapan saja. Rekaman tidak dihapus dari umpan perubahan saat aplikasi membacanya. Status baca atau iterasi dari setiap pembaca yang mengonsumsi bersifat independen dan dikelola hanya oleh aplikasi Anda.

Contoh ini akan beriterasi ke semua rekaman di umpan perubahan, menambahkannya ke daftar, lalu mengembalikan daftar itu ke pemanggil.

public async Task<List<BlobChangeFeedEvent>> ChangeFeedAsync(string connectionString)
{
    // Get a new blob service client.
    BlobServiceClient blobServiceClient = new BlobServiceClient(connectionString);

    // Get a new change feed client.
    BlobChangeFeedClient changeFeedClient = blobServiceClient.GetChangeFeedClient();

    List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();

    // Get all the events in the change feed. 
    await foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChangesAsync())
    {
        changeFeedEvents.Add(changeFeedEvent);
    }

    return changeFeedEvents;
}

Contoh ini mencetak beberapa nilai ke konsol dari setiap rekaman dalam daftar.

public void showEventData(List<BlobChangeFeedEvent> changeFeedEvents)
{
    foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedEvents)
    {
        string subject = changeFeedEvent.Subject;
        string eventType = changeFeedEvent.EventType.ToString();
        string api = changeFeedEvent.EventData.Api;

        Console.WriteLine("Subject: " + subject + "\n" +
        "Event Type: " + eventType + "\n" +
        "Api: " + api);
    }
}

Lanjut membaca rekaman dari posisi tersimpan

Anda bisa memilih untuk menyimpan posisi baca Anda di umpan perubahan, lalu melanjutkan iterasi melalui rekaman di waktu mendatang. Anda bisa menyimpan posisi baca dengan mendapatkan kursor umpan perubahan. Kursor merupakan string dan aplikasi Anda dapat menyimpan string itu dengan cara apa pun yang masuk akal untuk desain aplikasi Anda (Misalnya: ke file, atau database).

Contoh ini beriterasi melalui semua rekaman di umpan perubahan, menambahkannya ke daftar, dan menyimpan kursor. Daftar dan kursor dikembalikan ke pemanggil.

public async Task<(string, List<BlobChangeFeedEvent>)> ChangeFeedResumeWithCursorAsync
    (string connectionString,  string cursor)
{
    // Get a new blob service client.
    BlobServiceClient blobServiceClient = new BlobServiceClient(connectionString);

    // Get a new change feed client.
    BlobChangeFeedClient changeFeedClient = blobServiceClient.GetChangeFeedClient();
    List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();

    IAsyncEnumerator<Page<BlobChangeFeedEvent>> enumerator = changeFeedClient
        .GetChangesAsync(continuation: cursor)
        .AsPages(pageSizeHint: 10)
        .GetAsyncEnumerator();

    await enumerator.MoveNextAsync();

    foreach (BlobChangeFeedEvent changeFeedEvent in enumerator.Current.Values)
    {

        changeFeedEvents.Add(changeFeedEvent);             
    }

    // Update the change feed cursor.  The cursor is not required to get each page of events,
    // it is intended to be saved and used to resume iterating at a later date.
    cursor = enumerator.Current.ContinuationToken;
    return (cursor, changeFeedEvents);
}

Pemrosesan stream rekaman

Anda bisa memilih untuk memproses rekaman umpan perubahan selagi diterapkan ke umpan perubahan. Lihat Spesifikasi. Peristiwa perubahan diterbitkan ke umpan perubahan pada periode rata-rata 60 detik. Kami menyarankan agar Anda melakukan polling perubahan baru periode ini saat menentukan interval polling Anda.

Contoh ini secara berkala melakukan polling untuk perubahan. Jika ada perubahan rekaman, kode ini memproses rekaman tersebut dan menyimpan kursor umpan perubahan. Dengan begitu jika proses dihentikan lalu dimulai lagi, aplikasi dapat menggunakan kursor untuk melanjutkan pemrosesan rekaman tempatnya terakhir kali ditinggal. Contoh ini menyimpan kursor ke file konfigurasi aplikasi lokal, tetapi aplikasi Anda dapat menyimpannya dalam bentuk apa pun yang paling masuk akal untuk skenario Anda.

public async Task ChangeFeedStreamAsync
    (string connectionString, int waitTimeMs, string cursor)
{
    // Get a new blob service client.
    BlobServiceClient blobServiceClient = new BlobServiceClient(connectionString);

    // Get a new change feed client.
    BlobChangeFeedClient changeFeedClient = blobServiceClient.GetChangeFeedClient();

    while (true)
    {
        IAsyncEnumerator<Page<BlobChangeFeedEvent>> enumerator = changeFeedClient
        .GetChangesAsync(continuation: cursor).AsPages().GetAsyncEnumerator();

        while (true) 
        {
            var result = await enumerator.MoveNextAsync();

            if (result)
            {
                foreach (BlobChangeFeedEvent changeFeedEvent in enumerator.Current.Values)
                {
                    string subject = changeFeedEvent.Subject;
                    string eventType = changeFeedEvent.EventType.ToString();
                    string api = changeFeedEvent.EventData.Api;

                    Console.WriteLine("Subject: " + subject + "\n" +
                        "Event Type: " + eventType + "\n" +
                        "Api: " + api);
                }

                // helper method to save cursor. 
                SaveCursor(enumerator.Current.ContinuationToken);
            }
            else
            {
                break;
            }

        }
        await Task.Delay(waitTimeMs);
    }

}

public void SaveCursor(string cursor)
{
    System.Configuration.Configuration config = 
        ConfigurationManager.OpenExeConfiguration
        (ConfigurationUserLevel.None);

    config.AppSettings.Settings.Clear();
    config.AppSettings.Settings.Add("Cursor", cursor);
    config.Save(ConfigurationSaveMode.Modified);
}

Membaca rekaman dalam rentang waktu

Anda dapat membaca rekaman yang berada di rentang waktu tertentu. Contoh ini beriterasi melalui semua rekaman di umpan perubahan yang jatuh antara pukul 15:00 pada 2 Maret 2020 dan 02:00 pada 7 Agustus 2020, menambahkannya ke daftar, lalu mengembalikan daftar itu ke pemanggil.

Memilih segmen untuk rentang waktu

public async Task<List<BlobChangeFeedEvent>> ChangeFeedBetweenDatesAsync(string connectionString)
{
    // Get a new blob service client.
    BlobServiceClient blobServiceClient = new BlobServiceClient(connectionString);

    // Get a new change feed client.
    BlobChangeFeedClient changeFeedClient = blobServiceClient.GetChangeFeedClient();
    List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();

    // Create the start and end time.  The change feed client will round start time down to
    // the nearest hour, and round endTime up to the next hour if you provide DateTimeOffsets
    // with minutes and seconds.
    DateTimeOffset startTime = new DateTimeOffset(2020, 3, 2, 15, 0, 0, TimeSpan.Zero);
    DateTimeOffset endTime = new DateTimeOffset(2020, 8, 7, 2, 0, 0, TimeSpan.Zero);

    // You can also provide just a start or end time.
    await foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChangesAsync(
        start: startTime,
        end: endTime))
    {
        changeFeedEvents.Add(changeFeedEvent);
    }

    return changeFeedEvents;
}

Waktu mulai yang Anda berikan dibulatkan ke bawah ke jam terdekat dan waktu akhir dibulatkan hingga jam terdekat. Ada kemungkinan bahwa pengguna mungkin melihat kejadian sebelum waktu mulai dan setelah waktu akhir. Ada kemungkinan juga bahwa beberapa kejadian yang di antara waktu mulai dan berakhir tidak akan muncul. Itu karena kejadian mungkin direkam selama jam sebelumnya ke waktu mulai atau selama jam setelah waktu akhir.

Langkah berikutnya