Azure Cosmos DB の変更フィード プロセッサChange feed processor in Azure Cosmos DB

適用対象: SQL API

変更フィード プロセッサは Azure Cosmos DB SDK V3 の一部です。The change feed processor is part of the Azure Cosmos DB SDK V3. それにより、変更フィードを読み取り、イベント処理を複数のコンシューマーに効率的に分散させるプロセスが簡単になります。It simplifies the process of reading the change feed and distribute the event processing across multiple consumers effectively.

変更フィード プロセッサ ライブラリの主な利点は、変更フィード内のすべてのイベントが確実に "少なくとも 1 回" は配信されるフォールト トレラントな動作です。The main benefit of change feed processor library is its fault-tolerant behavior that assures an "at-least-once" delivery of all the events in the change feed.

変更フィード プロセッサのコンポーネントComponents of the change feed processor

変更フィード プロセッサの実装には、4 つの主要なコンポーネントがあります。There are four main components of implementing the change feed processor:

  1. 監視対象コンテナー: 監視対象コンテナーには、変更フィードの生成元となるデータが含まれています。The monitored container: The monitored container has the data from which the change feed is generated. 監視対象コンテナーに対する挿入と更新が、コンテナーの変更フィードに反映されます。Any inserts and updates to the monitored container are reflected in the change feed of the container.

  2. リース コンテナー: リース コンテナーは、状態ストレージとして機能し、複数の worker 間での変更フィードの処理を調整します。The lease container: The lease container acts as a state storage and coordinates processing the change feed across multiple workers. リース コンテナーは、監視対象コンテナーと同じアカウントまたは別のアカウントに格納できます。The lease container can be stored in the same account as the monitored container or in a separate account.

  3. ホスト: ホストは、変更フィード プロセッサを使って変更をリッスンするアプリケーション インスタンスです。The host: A host is an application instance that uses the change feed processor to listen for changes. 同じリース構成の複数のインスタンスを並列に実行できますが、 インスタンス名 はインスタンスごとに異なっている必要があります。Multiple instances with the same lease configuration can run in parallel, but each instance should have a different instance name.

  4. デリゲート: デリゲートは、変更フィード プロセッサによって読み取られる変更の各バッチについて、開発者が行いたいことが定義されているコードです。The delegate: The delegate is the code that defines what you, the developer, want to do with each batch of changes that the change feed processor reads.

変更フィード プロセッサのこれら 4 要素の連携のしくみについて理解を深めるために、次の図の例を見てみましょう。To further understand how these four elements of change feed processor work together, let's look at an example in the following diagram. 監視対象コンテナーでは、ドキュメントが保存され、パーティション キーとして "City" が使われます。The monitored container stores documents and uses 'City' as the partition key. パーティション キーの値は項目を含む範囲内に分散されていることがわかります。We see that the partition key values are distributed in ranges that contain items. 2 つのホスト インスタンスがあり、変更フィード プロセッサでは、コンピューティングの分散を最大にするため、各インスタンスに異なる範囲のパーティション キー値が割り当てられます。There are two host instances and the change feed processor is assigning different ranges of partition key values to each instance to maximize compute distribution. 各範囲は並列に読み取られ、その進行状況はリース コンテナー内に他の範囲とは区別して保持されます。Each range is being read in parallel and its progress is maintained separately from other ranges in the lease container.

変更フィード プロセッサの例

変更フィード プロセッサの実装Implementing the change feed processor

エントリ ポイントは常に監視対象コンテナーであり、Container インスタンスから GetChangeFeedProcessorBuilder を呼び出します。The point of entry is always the monitored container, from a Container instance you call GetChangeFeedProcessorBuilder:

/// <summary>
/// Start the Change Feed Processor to listen for changes and process them with the HandleChangesAsync implementation.
/// </summary>
private static async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync(
    CosmosClient cosmosClient,
    IConfiguration configuration)
{
    string databaseName = configuration["SourceDatabaseName"];
    string sourceContainerName = configuration["SourceContainerName"];
    string leaseContainerName = configuration["LeasesContainerName"];

    Container leaseContainer = cosmosClient.GetContainer(databaseName, leaseContainerName);
    ChangeFeedProcessor changeFeedProcessor = cosmosClient.GetContainer(databaseName, sourceContainerName)
        .GetChangeFeedProcessorBuilder<ToDoItem>(processorName: "changeFeedSample", HandleChangesAsync)
            .WithInstanceName("consoleHost")
            .WithLeaseContainer(leaseContainer)
            .Build();

    Console.WriteLine("Starting Change Feed Processor...");
    await changeFeedProcessor.StartAsync();
    Console.WriteLine("Change Feed Processor started.");
    return changeFeedProcessor;
}

1 番目のパラメーターは、このプロセッサの目的を説明する一意の名前です。2番目の名前は、変更を処理するデリゲートの実装です。Where the first parameter is a distinct name that describes the goal of this processor and the second name is the delegate implementation that will handle changes.

デリゲートの例を次に示します。An example of a delegate would be:

/// <summary>
/// The delegate receives batches of changes as they are generated in the change feed and can process them.
/// </summary>
static async Task HandleChangesAsync(IReadOnlyCollection<ToDoItem> changes, CancellationToken cancellationToken)
{
    Console.WriteLine("Started handling changes...");
    foreach (ToDoItem item in changes)
    {
        Console.WriteLine($"Detected operation for item with id {item.id}, created at {item.creationTime}.");
        // Simulate some asynchronous operation
        await Task.Delay(10);
    }

    Console.WriteLine("Finished handling changes.");
}

最後に、このプロセッサ インスタンスの名前を WithInstanceName で定義します。これは、WithLeaseContainer でリース状態を保持するコンテナーです。Finally you define a name for this processor instance with WithInstanceName and which is the container to maintain the lease state with WithLeaseContainer.

Build を呼び出すとプロセッサ インスタンスが提供され、StartAsync を呼び出すことによってそれを開始できます。Calling Build will give you the processor instance that you can start by calling StartAsync.

処理のライフ サイクルProcessing life cycle

ホスト インスタンスの通常のライフ サイクルは次のとおりです。The normal life cycle of a host instance is:

  1. 変更フィードを読み取ります。Read the change feed.
  2. 変更がない場合は、事前に定義された時間 (ビルダーの WithPollInterval でカスタマイズ可能) だけスリープし、#1 に移ります。If there are no changes, sleep for a predefined amount of time (customizable with WithPollInterval in the Builder) and go to #1.
  3. 変更がある場合は、それらを デリゲート に送信します。If there are changes, send them to the delegate.
  4. デリゲートによる変更の処理が 正常に 完了すると、最後に処理された時点でリース ストアを更新し、#1 に移ります。When the delegate finishes processing the changes successfully , update the lease store with the latest processed point in time and go to #1.

エラー処理Error handling

変更フィード プロセッサには、ユーザー コード エラーに対する回復性があります。The change feed processor is resilient to user code errors. つまり、デリゲートの実装にハンドルされない例外がある場合 (ステップ #4)、その特定の変更バッチを処理しているスレッドは停止され、新しいスレッドが作成されます。That means that if your delegate implementation has an unhandled exception (step #4), the thread processing that particular batch of changes will be stopped, and a new thread will be created. 新しいスレッドでは、そのパーティション キー値範囲に対してリース ストアで保持されている最新時点が確認され、そこから処理が再開されて、同じ変更バッチがデリゲートに効率的に送信されます。The new thread will check which was the latest point in time the lease store has for that range of partition key values, and restart from there, effectively sending the same batch of changes to the delegate. デリゲートによって変更が正しく処理されるまで、この動作が続けられます。また、デリゲートのコードで例外がスローされると、そのバッチが再試行されるため、変更フィード プロセッサでは "少なくとも 1 回" が保証されます。This behavior will continue until your delegate processes the changes correctly and it's the reason the change feed processor has an "at least once" guarantee, because if the delegate code throws an exception, it will retry that batch.

変更フィード プロセッサが同じバッチの変更を継続的に再試行して "行き詰まる" ことがないように、例外が発生した場合に、ドキュメントに書き込むためのデリゲート コードのロジックを配信不能キューに追加する必要があります。To prevent your change feed processor from getting "stuck" continuously retrying the same batch of changes, you should add logic in your delegate code to write documents, upon exception, to a dead-letter queue. このように設計することで、今後の変更の処理を継続しながら、未処理の変更を追跡できます。This design ensures that you can keep track of unprocessed changes while still being able to continue to process future changes. 配信不能キューは、別の Cosmos コンテナーである可能性があります。The dead-letter queue might be another Cosmos container. 問題になるのは、データ ストア自体ではなく、ただ未処理の変更が永続化されることだけです。The exact data store does not matter, simply that the unprocessed changes are persisted.

さらに、変更フィード推定機能を使用して、変更フィードを読み取る変更フィード プロセッサ インスタンスの進行状況を監視できます。In addition, you can use the change feed estimator to monitor the progress of your change feed processor instances as they read the change feed. この推定を使用すると、CPU、メモリ、ネットワーク帯域幅などの使用可能なリソースによって、変更フィード プロセッサが "停止している" あるいは遅れているかどうかを把握できます。You can use this estimation to understand if your change feed processor is "stuck" or lagging behind due to available resources like CPU, memory, and network bandwidth.

展開単位Deployment unit

変更フィード プロセッサの 1 つの展開単位は、同じ processorName とリース コンテナーの構成を備えた 1 つまたは複数のインスタンスで成り立っています。A single change feed processor deployment unit consists of one or more instances with the same processorName and lease container configuration. 変更に対してそれぞれに異なるビジネス フローを備えた展開単位を多数保持することができ、各展開単位は 1 つまたは複数のインスタンスで構成されています。You can have many deployment units where each one has a different business flow for the changes and each deployment unit consisting of one or more instances.

たとえば、コンテナーに変更が発生するたびに、外部 API をトリガーする 1 つの展開単位があるとします。For example, you might have one deployment unit that triggers an external API anytime there is a change in your container. 別の展開単位では、変更が発生するたびに、リアルタイムでデータを移動することができます。Another deployment unit might move data, in real time, each time there is a change. 監視対象のコンテナーで変更が発生すると、すべての展開単位に対して通知が行われます。When a change happens in your monitored container, all your deployment units will get notified.

動的スケーリングDynamic scaling

前述したように、1 つの展開単位内には、1 つまたは複数のインスタンスを保持できます。As mentioned before, within a deployment unit you can have one or more instances. 展開単位内でコンピューティングの分散を利用するには、次の要件のみが重要になります。To take advantage of the compute distribution within the deployment unit, the only key requirements are:

  1. すべてのインスタンスのリース コンテナーの構成が同じである必要があります。All instances should have the same lease container configuration.
  2. すべてのインスタンスが、同じ processorName を保持している必要があります。All instances should have the same processorName.
  3. 各インスタンスには、異なるインスタンス名が設定されている必要があります (WithInstanceName)。Each instance needs to have a different instance name (WithInstanceName).

これら 3 つの条件が適用されると、変更フィード プロセッサでは均等分散アルゴリズムを使って、リース コンテナー内のすべてのリースが該当の展開単位に含まれる実行中の全インスタンスに分散されて、コンピューティングが並列化されます。If these three conditions apply, then the change feed processor will, using an equal distribution algorithm, distribute all the leases in the lease container across all running instances of that deployment unit and parallelize compute. 1 つのリースは一度に 1 つのインスタンスによってのみ所有されるため、インスタンスの最大数はリースの数と同じになります。One lease can only be owned by one instance at a given time, so the maximum number of instances equals to the number of leases.

インスタンス数は増減する可能性があり、変更フィード プロセッサではそれに従って再配布することで負荷が動的に調整されます。The number of instances can grow and shrink, and the change feed processor will dynamically adjust the load by redistributing accordingly.

さらに、スループットまたはストレージの増加に応じて、変更フィード プロセッサでコンテナーを動的に調整できます。Moreover, the change feed processor can dynamically adjust to containers scale due to throughput or storage increases. コンテナーが拡張されると、変更フィード プロセッサでは、リースを動的に増やし、既存のインスタンス間で新しいリースを分散することによって、これらのシナリオが透過的に処理されます。When your container grows, the change feed processor transparently handles these scenarios by dynamically increasing the leases and distributing the new leases among existing instances.

変更フィードとプロビジョニング済みスループットChange feed and provisioned throughput

監視対象コンテナーで変更フィード読み取り操作を行うと、RU が使用されます。Change feed read operations on the monitored container will consume RUs.

リース コンテナーに対する操作では、RU が使用されます。Operations on the lease container consume RUs. 同じリース コンテナーを使用しているインスタンスの数が多いほど、潜在的な RU の消費量が多くなります。The higher the number of instances using the same lease container, the higher the potential RU consumption will be. インスタンスの数をスケール調整および増分する場合は、リース コンテナーでの RU の消費量を必ず監視してください。Remember to monitor your RU consumption on the leases container if you decide to scale and increment the number of instances.

開始時刻Starting time

既定では、変更フィード プロセッサは、最初に開始したときに、リース コンテナーを初期化し、その処理のライフ サイクルを開始します。By default, when a change feed processor starts the first time, it will initialize the leases container, and start its processing life cycle. 変更フィード プロセッサが初めて初期化される前に監視対象コンテナー内で発生した変更が検出されることはありません。Any changes that happened in the monitored container before the change feed processor was initialized for the first time won't be detected.

以前の日時からの読み取りReading from a previous date and time

DateTime のインスタンスを WithStartTime ビルダー拡張機能に渡すことで、 特定の日時 以降の変更を読み取るよう変更フィード プロセッサを初期化することができます。It's possible to initialize the change feed processor to read changes starting at a specific date and time , by passing an instance of a DateTime to the WithStartTime builder extension:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedTime", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .WithStartTime(particularPointInTime)
        .Build();

変更フィード プロセッサは、その特定の日時に対して初期化され、それ以降に発生した変更の読み取りを開始します。The change feed processor will be initialized for that specific date and time and start reading the changes that happened after.

最初からの読み取りReading from the beginning

データの移行やコンテナーの履歴全体の分析など、他のシナリオでは、 そのコンテナーの有効期間の最初 から変更フィードを読み取る必要があります。In other scenarios like data migrations or analyzing the entire history of a container, we need to read the change feed from the beginning of that container's lifetime. これを行うために、ビルダー拡張機能で WithStartTime を使用できますが、DateTime.MinValue.ToUniversalTime() を渡すと、次のように DateTime の最小値の UTC 表現が生成されます。To do that, we can use WithStartTime on the builder extension, but passing DateTime.MinValue.ToUniversalTime(), which would generate the UTC representation of the minimum DateTime value, like so:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedBeginning", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .WithStartTime(DateTime.MinValue.ToUniversalTime())
        .Build();

この変更フィード プロセッサは初期化され、コンテナーの有効期間の最初から変更の読み取りを開始します。The change feed processor will be initialized and start reading changes from the beginning of the lifetime of the container.

注意

これらのカスタマイズ オプションは、変更フィード プロセッサの開始時点を設定するためだけに機能します。These customization options only work to setup the starting point in time of the change feed processor. リース コンテナーが初めて初期化された後、それらを変更しても影響はありません。Once the leases container is initialized for the first time, changing them has no effect.

変更フィード プロセッサをホストする場所Where to host the change feed processor

変更フィード プロセッサは、長時間実行されるプロセスまたはタスクをサポートする任意のプラットフォームでホストできます。The change feed processor can be hosted in any platform that supports long running processes or tasks:

変更フィード プロセッサは有効期間が短い環境で実行できますが、その状態がリース コンテナーによって維持されるため、これらの環境の開始および停止サイクルでは、通知の受信に遅延が加えられます (環境が開始されるたびにプロセッサを開始するオーバーヘッドが生じるため)。While change feed processor can run in short lived environments, because the lease container maintains the state, the startup cycle of these environments will add delay to receiving the notifications (due to the overhead of starting the processor every time the environment is started).

その他のリソースAdditional resources

次のステップNext steps

以下の記事で、変更フィード プロセッサに関してさらに詳しく知ることができます。You can now proceed to learn more about change feed processor in the following articles: