Обработка канала изменений в Хранилище BLOB-объектов Azure

Канал изменений предназначен для предоставления журналов транзакций всех изменений, происходящих в больших двоичных объектах и метаданных больших двоичных объектов в вашей учетной записи хранения. В этой статье описано, как считывать записи канала изменений с помощью библиотеки обработчика канала изменений больших двоичных объектов.

Дополнительные сведения см. в статье о канале изменений в Хранилище BLOB-объектов Azure.

Получение библиотеки обработчика канала изменений больших двоичных объектов

  1. Откройте командное окно (например, Windows PowerShell).
  2. Из каталога проекта установите пакет NuGet Azure.Storage.Blobs.Changefeed.
dotnet add package Azure.Storage.Blobs --version 12.5.1
dotnet add package Azure.Storage.Blobs.ChangeFeed --version 12.0.0-preview.4

Считывание записей

Примечание

Канал изменений является неизменяемой и доступной только для чтения сущностью в вашей учетной записи хранения. Любое количество приложений может одновременно и независимо друг от друга считывать и обрабатывать данные канала изменений. Записи не удаляются из канала изменений, когда приложение считывает их. Состояние чтения или итерации для каждого активного читателя является независимым и поддерживается только приложением.

В указанном ниже примере выполняется итерация всех записей в канале изменений. Записи добавляются в список, а затем этот список возвращается вызывающему объекту.

public async Task<List<BlobChangeFeedEvent>> ChangeFeedAsync(string connectionString)
{
    // Get a new blob service client.
    BlobServiceClient blobServiceClient = new BlobServiceClient(connectionString);

    // Get a new change feed client.
    BlobChangeFeedClient changeFeedClient = blobServiceClient.GetChangeFeedClient();

    List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();

    // Get all the events in the change feed. 
    await foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChangesAsync())
    {
        changeFeedEvents.Add(changeFeedEvent);
    }

    return changeFeedEvents;
}

В указанном ниже примере на консоль выводятся несколько значений из каждой записи в списке.

public void showEventData(List<BlobChangeFeedEvent> changeFeedEvents)
{
    foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedEvents)
    {
        string subject = changeFeedEvent.Subject;
        string eventType = changeFeedEvent.EventType.ToString();
        string api = changeFeedEvent.EventData.Api;

        Console.WriteLine("Subject: " + subject + "\n" +
        "Event Type: " + eventType + "\n" +
        "Api: " + api);
    }
}

Возобновление чтения записей из сохраненного расположения

Вы можете сохранить свое расположение чтения в канале изменений, а затем позже возобновить итерацию записей. Чтобы сохранить расположение чтения, используйте курсор канала изменений. Курсор является строкой, и приложение может сохранить эту строку любым способом, подходящим для дизайна приложения (например, в файл или базу данных).

В указанном ниже примере выполняется итерация всех записей в канале изменений, записи добавляются в список и сохраняется курсор. Список и курсор возвращаются вызывающему объекту.

public async Task<(string, List<BlobChangeFeedEvent>)> ChangeFeedResumeWithCursorAsync
    (string connectionString,  string cursor)
{
    // Get a new blob service client.
    BlobServiceClient blobServiceClient = new BlobServiceClient(connectionString);

    // Get a new change feed client.
    BlobChangeFeedClient changeFeedClient = blobServiceClient.GetChangeFeedClient();
    List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();

    IAsyncEnumerator<Page<BlobChangeFeedEvent>> enumerator = changeFeedClient
        .GetChangesAsync(continuation: cursor)
        .AsPages(pageSizeHint: 10)
        .GetAsyncEnumerator();

    await enumerator.MoveNextAsync();

    foreach (BlobChangeFeedEvent changeFeedEvent in enumerator.Current.Values)
    {

        changeFeedEvents.Add(changeFeedEvent);             
    }

    // Update the change feed cursor.  The cursor is not required to get each page of events,
    // it is intended to be saved and used to resume iterating at a later date.
    cursor = enumerator.Current.ContinuationToken;
    return (cursor, changeFeedEvents);
}

Потоковая обработка записей

Вы также можете выполнить обработку записей канала изменений, так как они зафиксированы в канале изменений. Дополнительные сведения см. в разделе о спецификациях. События изменения публикуются в канале изменений в среднем с интервалом 60 секунд. Рекомендуется выполнять опрос на наличие изменений с учетом этого периода при указании интервала опроса.

В указанном ниже примере периодически выполняется опрос на наличие изменений. При наличии записей изменений данный код обрабатывает их и сохраняет курсор канала изменений. Таким образом, если процесс остановлен, а затем снова запущен, приложение может использовать курсор, в котором он остался последний раз, для возобновления обработки записей. В указанном ниже примере курсор сохраняется в локальном файле конфигурации приложения, но приложение может сохранить его в любой другой форме, наиболее подходящей для вашего сценария.

public async Task ChangeFeedStreamAsync
    (string connectionString, int waitTimeMs, string cursor)
{
    // Get a new blob service client.
    BlobServiceClient blobServiceClient = new BlobServiceClient(connectionString);

    // Get a new change feed client.
    BlobChangeFeedClient changeFeedClient = blobServiceClient.GetChangeFeedClient();

    while (true)
    {
        IAsyncEnumerator<Page<BlobChangeFeedEvent>> enumerator = changeFeedClient
        .GetChangesAsync(continuation: cursor).AsPages().GetAsyncEnumerator();

        while (true) 
        {
            var result = await enumerator.MoveNextAsync();

            if (result)
            {
                foreach (BlobChangeFeedEvent changeFeedEvent in enumerator.Current.Values)
                {
                    string subject = changeFeedEvent.Subject;
                    string eventType = changeFeedEvent.EventType.ToString();
                    string api = changeFeedEvent.EventData.Api;

                    Console.WriteLine("Subject: " + subject + "\n" +
                        "Event Type: " + eventType + "\n" +
                        "Api: " + api);
                }

                // helper method to save cursor. 
                SaveCursor(enumerator.Current.ContinuationToken);
            }
            else
            {
                break;
            }

        }
        await Task.Delay(waitTimeMs);
    }

}

public void SaveCursor(string cursor)
{
    System.Configuration.Configuration config = 
        ConfigurationManager.OpenExeConfiguration
        (ConfigurationUserLevel.None);

    config.AppSettings.Settings.Clear();
    config.AppSettings.Settings.Add("Cursor", cursor);
    config.Save(ConfigurationSaveMode.Modified);
}

Чтение записей в диапазоне времени

Вы можете выполнять чтение записей в рамках определенного диапазона времени. В указанном ниже примере выполняется итерация всех записей в канале изменений в рамках диапазона времени с 15:00 2 марта 2020 г. до 2:00 7 августа 2020 г. Записи добавляются в список, а затем этот список возвращается вызывающему объекту.

Выбор сегментов для диапазона времени

public async Task<List<BlobChangeFeedEvent>> ChangeFeedBetweenDatesAsync(string connectionString)
{
    // Get a new blob service client.
    BlobServiceClient blobServiceClient = new BlobServiceClient(connectionString);

    // Get a new change feed client.
    BlobChangeFeedClient changeFeedClient = blobServiceClient.GetChangeFeedClient();
    List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();

    // Create the start and end time.  The change feed client will round start time down to
    // the nearest hour, and round endTime up to the next hour if you provide DateTimeOffsets
    // with minutes and seconds.
    DateTimeOffset startTime = new DateTimeOffset(2020, 3, 2, 15, 0, 0, TimeSpan.Zero);
    DateTimeOffset endTime = new DateTimeOffset(2020, 8, 7, 2, 0, 0, TimeSpan.Zero);

    // You can also provide just a start or end time.
    await foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChangesAsync(
        start: startTime,
        end: endTime))
    {
        changeFeedEvents.Add(changeFeedEvent);
    }

    return changeFeedEvents;
}

Указанное время начала округляется в меньшую сторону до ближайшего часа, а время окончания округляется в большую сторону до ближайшего часа. Пользователям могут отображаться события, произошедшие до времени начала и после времени окончания. Кроме того, возможно, что некоторые события, происходящие в период между временем начала и окончания, не отобразятся. Это связано с тем, что события могут записываться в течение часа, предшествующего времени начала, или в течение часа после времени окончания.

Дальнейшие действия