Azure Blob Storage の変更フィードを処理するProcess change feed in Azure Blob Storage

変更フィードでは、ストレージ アカウント内の BLOB と BLOB メタデータに対して行われるすべての変更のトランザクション ログが提供されます。Change feed provides transaction logs of all the changes that occur to the blobs and the blob metadata in your storage account. この記事では、BLOB 変更フィード プロセッサ ライブラリを使用して、変更フィード レコードを読み取る方法を示します。This article shows you how to read change feed records by using the blob change feed processor library.

変更フィードの詳細については、Azure Blob Storage の変更フィードに関するページを参照してください。To learn more about the change feed, see Change feed in Azure Blob Storage.

BLOB 変更フィード プロセッサ ライブラリを取得するGet the blob change feed processor library

  1. コマンド ウィンドウを開きます (例: Windows PowerShell)。Open a command window (For example: Windows PowerShell).
  2. ご利用のプロジェクト ディレクトリから、NuGet パッケージ Azure.Storage.Blobs.Changefeed をインストールします。From your project directory, install the Azure.Storage.Blobs.Changefeed NuGet package.
dotnet add package Azure.Storage.Blobs --version 12.5.1
dotnet add package Azure.Storage.Blobs.ChangeFeed --version 12.0.0-preview.4

レコードの読み取りRead records

注意

変更フィードは、ストレージ アカウントの不変および読み取り専用のエンティティです。The change feed is an immutable and read-only entity in your storage account. 任意の数のアプリケーションで、変更フィードの読み取りと処理を都合のよいときに同時に個別に行うことができます。Any number of applications can read and process the change feed simultaneously and independently at their own convenience. アプリケーションでの読み取り時に、変更フィードからレコードが削除されることはありません。Records aren't removed from the change feed when an application reads them. 使用する各リーダーの読み取りまたは反復処理の状態は独立しており、アプリケーションによってのみ維持されます。The read or iteration state of each consuming reader is independent and maintained by your application only.

この例では、変更フィードですべてのレコードを反復処理し、それらをリストに追加してから、そのリストを呼び出し元に返します。This example iterates through all records in the change feed, adds them to a list, and then returns that list to the caller.

public async Task<List<BlobChangeFeedEvent>> ChangeFeedAsync(string connectionString)
{
    // Get a new blob service client.
    BlobServiceClient blobServiceClient = new BlobServiceClient(connectionString);

    // Get a new change feed client.
    BlobChangeFeedClient changeFeedClient = blobServiceClient.GetChangeFeedClient();

    List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();

    // Get all the events in the change feed. 
    await foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChangesAsync())
    {
        changeFeedEvents.Add(changeFeedEvent);
    }

    return changeFeedEvents;
}

この例では、リスト内の各レコードからのいくつかの値をコンソールに出力します。This example prints to the console a few values from each record in the list.

public void showEventData(List<BlobChangeFeedEvent> changeFeedEvents)
{
    foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedEvents)
    {
        string subject = changeFeedEvent.Subject;
        string eventType = changeFeedEvent.EventType.ToString();
        string api = changeFeedEvent.EventData.Api;

        Console.WriteLine("Subject: " + subject + "\n" +
        "Event Type: " + eventType + "\n" +
        "Api: " + api);
    }
}

保存位置からレコードの読み取りを再開するResume reading records from a saved position

変更フィードに自分の読み取り位置を保存してから、後でレコードの反復処理を再開するように選択できます。You can choose to save your read position in the change feed, and then resume iterating through the records at a future time. 読み取り位置を保存するには、変更フィード カーソルを取得します。You can save the read position by getting the change feed cursor. カーソルは 文字列 であり、アプリケーションでは、アプリケーションの設計に適した方法でその文字列を保存できます (たとえば、ファイルまたはデータベースへ)。The cursor is a string and your application can save that string in any way that makes sense for your application's design (For example: to a file, or database).

この例では、変更フィード内のすべてのレコードを反復処理し、それらをリストに追加し、カーソルを保存します。This example iterates through all records in the change feed, adds them to a list, and saves the cursor. リストとカーソルは呼び出し元に返されます。The list and the cursor are returned to the caller.

public async Task<(string, List<BlobChangeFeedEvent>)> ChangeFeedResumeWithCursorAsync
    (string connectionString,  string cursor)
{
    // Get a new blob service client.
    BlobServiceClient blobServiceClient = new BlobServiceClient(connectionString);

    // Get a new change feed client.
    BlobChangeFeedClient changeFeedClient = blobServiceClient.GetChangeFeedClient();
    List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();

    IAsyncEnumerator<Page<BlobChangeFeedEvent>> enumerator = changeFeedClient
        .GetChangesAsync(continuation: cursor)
        .AsPages(pageSizeHint: 10)
        .GetAsyncEnumerator();

    await enumerator.MoveNextAsync();

    foreach (BlobChangeFeedEvent changeFeedEvent in enumerator.Current.Values)
    {
    
        changeFeedEvents.Add(changeFeedEvent);             
    }
    
    // Update the change feed cursor.  The cursor is not required to get each page of events,
    // it is intended to be saved and used to resume iterating at a later date.
    cursor = enumerator.Current.ContinuationToken;
    return (cursor, changeFeedEvents);
}

レコードのストリーム処理Stream processing of records

変更フィード レコードが変更フィードにコミットされたときにそれらを処理するように選択できます。You can choose to process change feed records as they are committed to the change feed. 仕様」を参照してください。See Specifications. 変更イベントは、平均 60 秒の期間に変更フィードに発行されます。The change events are published to the change feed at a period of 60 seconds on average. ポーリング間隔を指定する場合は、この期間を考慮して新しい変更をポーリングすることをお勧めします。We recommend that you poll for new changes with this period in mind when specifying your poll interval.

この例では、変更を定期的にポーリングします。This example periodically polls for changes. 変更レコードが存在する場合、このコードではそれらのレコードが処理され、変更フィード カーソルが保存されます。If change records exist, this code processes those records and saves change feed cursor. このようにすると、プロセスがいったん停止されてから再び開始される場合、アプリケーションでカーソルが使用され、最後に中断した箇所のレコードの処理が再開されます。That way if the process is stopped and then started again, the application can use the cursor to resume processing records where it last left off. この例では、ローカル アプリケーション構成ファイルにカーソルが保存されますが、自分のアプリケーションでは、自分のシナリオに最も適した形式で保存することができます。This example saves the cursor to a local application configuration file, but your application can save it in any form that makes the most sense for your scenario.

public async Task ChangeFeedStreamAsync
    (string connectionString, int waitTimeMs, string cursor)
{
    // Get a new blob service client.
    BlobServiceClient blobServiceClient = new BlobServiceClient(connectionString);

    // Get a new change feed client.
    BlobChangeFeedClient changeFeedClient = blobServiceClient.GetChangeFeedClient();

    while (true)
    {
        IAsyncEnumerator<Page<BlobChangeFeedEvent>> enumerator = changeFeedClient
        .GetChangesAsync(continuation: cursor).AsPages().GetAsyncEnumerator();

        while (true) 
        {
            var result = await enumerator.MoveNextAsync();

            if (result)
            {
                foreach (BlobChangeFeedEvent changeFeedEvent in enumerator.Current.Values)
                {
                    string subject = changeFeedEvent.Subject;
                    string eventType = changeFeedEvent.EventType.ToString();
                    string api = changeFeedEvent.EventData.Api;

                    Console.WriteLine("Subject: " + subject + "\n" +
                        "Event Type: " + eventType + "\n" +
                        "Api: " + api);
                }
            
                // helper method to save cursor. 
                SaveCursor(enumerator.Current.ContinuationToken);
            }
            else
            {
                break;
            }

        }
        await Task.Delay(waitTimeMs);
    }

}

public void SaveCursor(string cursor)
{
    System.Configuration.Configuration config = 
        ConfigurationManager.OpenExeConfiguration
        (ConfigurationUserLevel.None);

    config.AppSettings.Settings.Clear();
    config.AppSettings.Settings.Add("Cursor", cursor);
    config.Save(ConfigurationSaveMode.Modified);
}

時間範囲内のレコードの読み取りReading records within a time range

特定の時間範囲内のレコードを読み取ることができます。You can read records that fall within a specific time range. この例では、変更フィード内の 2020 年 3 月 2 日午後 3:00 時から 2020 年 8 月 7 日午前 2:00 時までのすべてのレコードを反復処理し、それらをリストに追加してから、そのリストを呼び出し元に返します。This example iterates through all records in the change feed that fall between 3:00 PM on March 2 2020 and 2:00 AM on August 7 2020, adds them to a list, and then returns that list to the caller.

時間範囲のセグメントの選択Selecting segments for a time range

public async Task<List<BlobChangeFeedEvent>> ChangeFeedBetweenDatesAsync(string connectionString)
{
    // Get a new blob service client.
    BlobServiceClient blobServiceClient = new BlobServiceClient(connectionString);

    // Get a new change feed client.
    BlobChangeFeedClient changeFeedClient = blobServiceClient.GetChangeFeedClient();
    List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();

    // Create the start and end time.  The change feed client will round start time down to
    // the nearest hour, and round endTime up to the next hour if you provide DateTimeOffsets
    // with minutes and seconds.
    DateTimeOffset startTime = new DateTimeOffset(2020, 3, 2, 15, 0, 0, TimeSpan.Zero);
    DateTimeOffset endTime = new DateTimeOffset(2020, 8, 7, 2, 0, 0, TimeSpan.Zero);

    // You can also provide just a start or end time.
    await foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChangesAsync(
        start: startTime,
        end: endTime))
    {
        changeFeedEvents.Add(changeFeedEvent);
    }

    return changeFeedEvents;
}

指定した開始時刻は、最も近い時間に切り捨てられ、終了時刻は最も近い時間に切り上げられます。The start time that you provide is rounded down to the nearest hour and the end time is rounded up to the nearest hour. 開始時刻の前と終了時刻の後に発生したイベントがユーザーに表示される可能性があります。It's possible that users might see events that occurred before the start time and after the end time. また、開始時刻と終了時刻の間に発生する一部のイベントが表示されない場合もあります。It's also possible that some events that occur between the start and end time won't appear. これは、イベントが開始時刻の前の時間中または終了時刻の後の時間中に記録される可能性があるためです。That's because events might be recorded during the hour previous to the start time or during the hour after the end time.

次のステップNext steps

変更フィード ログについて、さらに学習します。Learn more about change feed logs. Azure Blob Storage の変更フィード に関するページを参照してください。See Change feed in Azure Blob Storage