Azure Cosmos DB での Transactional Outbox パターン

Azure Cosmos DB
Azure Service Bus
Azure Functions

分散システムに信頼性の高いメッセージングを実装することは困難な場合があります。 この記事では、Transactional Outbox パターンを使用して、信頼できるメッセージングとイベントの確実な配信を実現する方法について説明します。これは、べき等メッセージ処理のサポートにおける重要な部分です。 これを実現するには、Azure Cosmos DB トランザクション バッチと変更フィードを Azure Service Bus と組み合わせて使用します。

概要

マイクロサービス アーキテクチャの人気はますます高まっており、特に大規模なアプリケーションで、スケーラビリティ、保守容易性、機敏性などの問題を解決することが期待されています。 ただし、このアーキテクチャ パターンにより、データ処理に関する課題ももたらされます。 分散アプリケーションでは、各サービスは、動作するために必要なデータをサービスが所有する専用のデータストアで個別に保持します。 このようなシナリオをサポートするため、通常は 1 つのサービスからメッセージング バスを介してアプリケーションの他のサービスにデータ (イベント) を配信するメッセージング ソリューション (RabbitMQ、Kafka、Azure Service Bus など) を使用します。 内部または外部のコンシューマーは、これらのメッセージをサブスクライブして、データが操作されるとすぐに変更の通知を受け取ることができます。

その分野でよく知られている例が、注文システムです。ユーザーが注文を作成すると、Ordering サービスは REST エンドポイントを介してクライアント アプリケーションからデータを受信します。 データを検証するために、ペイロードを Order オブジェクトの内部表現にマップします。 データベースへのコミットが正常に行われると、メッセージ バスに OrderCreated イベントが発行されます。 新しい注文に関心のある他のサービス (InventoryInvoicing サービスなど) は、OrderCreated メッセージをサブスクライブして処理し、独自のデータベースに保存します。

次の擬似コードは、このプロセスが通常、Ordering サービスの観点からどのように見えるかを示しています。

CreateNewOrder(CreateOrderDto order){
  // Validate the incoming data.
  ...
  // Apply business logic.
  ...
  // Save the object to the database.
  var result = _orderRespository.Create(order);

  // Publish the respective event.
  _messagingService.Publish(new OrderCreatedEvent(result));

  return Ok();
}

この方法は、order オブジェクトの保存と対応するイベントの発行の間にエラーが発生するまで正常に機能します。 次のようなさまざまな理由により、この時点でイベントの送信が失敗する可能性があります。

  • ネットワーク エラー
  • メッセージ サービスの停止
  • ホスト エラー

エラーが何であれ、結果として、OrderCreated イベントをメッセージ バスに発行できません。 他のサービスには、注文が作成されたことは通知されません。 Ordering サービスは今や、実際のビジネス プロセスとは関係のないさまざまな処理を行う必要があります。 オンラインに戻ったらすぐにメッセージ バスに配置する必要があるイベントを追跡する必要があります。 イベントを失うことで、アプリケーションでデータの不整合が発生する最悪の事態が発生する可能性があります。

Diagram that shows event handling without the Transactional Outbox pattern.

解決策

これらの状況を回避するのに役立つ Transactional Outbox と呼ばれるよく知られたパターンがあります。 これにより、イベントが最終的にメッセージ ブローカーにプッシュされる前に、イベントがデータストア (通常はデータベースの Outbox テーブル) に保存されるようになります。 ビジネス オブジェクトとそれに対応するイベントが同じデータベース トランザクション内に保存されている場合は、データが失われないことが保証されます。 すべてがコミットされるか、エラーが発生した場合はすべてがロールバックされます。 最終的にイベントを発行するために、別のサービスまたはワーカー プロセスにより、Outbox テーブルに未処理のエントリが照会され、イベントが発行され、それらが処理済みとしてマークされます。 このパターンにより、ビジネス オブジェクトの作成または変更後にイベントが失われることはありません。

Diagram that shows event handling with the Transactional Outbox pattern and a relay service for publishing events to the message broker.

このアーキテクチャの Visio ファイルをダウンロードします。

リレーショナル データベースでは、パターンの実装は簡単です。 たとえば、サービスで Entity Framework Core が使用されている場合、Entity Framework コンテキストを使用してデータベース トランザクションが作成され、ビジネス オブジェクトとイベントが保存され、トランザクションのコミットまたはロールバックが実行されます。 イベントを処理するワーカー サービスも簡単に実装できます。これにより Outbox テーブルに対して新しいエントリが定期的に照会され、新しく挿入されたイベントがメッセージ バスに発行され、最後にこれらのエントリが処理済みとしてマークされます。

実際には、最初に見たときほど簡単ではありません。 最も重要なことは、OrderUpdated イベントが OrderCreated イベントの前に発行されないように、イベントの順序が保持されていることを確認する必要がある点です。

Azure Cosmos DB での実装

このセクションでは、Azure Cosmos DB に Transactional Outbox パターンを実装して、Azure Cosmos DB の変更フィードと Service Bus を活用して、さまざまなサービス間で信頼性の高い順序どおりのメッセージングを実現する方法について説明します。 Contact オブジェクト (FirstNameLastNameEmailCompany 情報など) を管理するサンプル サービスを示します。 これはコマンド クエリ責務分離 (CQRS) パターンを使用し、ドメイン駆動設計の基本的な概念に従っています。 実装のサンプル コードは、GitHub にあります。

サンプル サービスの Contact オブジェクトの構造は次のとおりです。

{
    "name": {
        "firstName": "John",
        "lastName": "Doe"
    },
    "description": "This is a contact",
    "email": "johndoe@contoso.com",
    "company": {
        "companyName": "Contoso",
        "street": "Street",
        "houseNumber": "1a",
        "postalCode": "092821",
        "city": "Palo Alto",
        "country": "US"
    },
    "createdAt": "2021-09-22T11:07:37.3022907+02:00",
    "deleted": false
}

Contact が作成または更新されるとすぐに、現在の変更に関する情報を含むイベントが発行されます。 特に、ドメイン イベントは次になる場合があります。

  • ContactCreated。 連絡先が追加されたときに発生します。
  • ContactNameUpdatedFirstName または LastName が変更されたときに発生します。
  • ContactEmailUpdated。 メール アドレスが更新されたときに発生します。
  • ContactCompanyUpdated。 会社のプロパティのいずれかが変更されたときに発生します。

トランザクション バッチ

このパターンを実装するには、Contact ビジネス オブジェクトと対応するイベントが同じデータベース トランザクションに保存されるようにする必要があります。 Azure Cosmos DB では、トランザクションはリレーショナル データベース システムとは異なる方法で動作します。 "トランザクション バッチ"と呼ばれる Azure Cosmos DB トランザクションは、ACID (Atomicity (不可分性)、Consistency (一貫性)、Isolation (独立性)、Durability (永続性)) プロパティを保証するために、単一の論理パーティションで動作します。 トランザクション バッチ操作で 2 つのドキュメントを異なるコンテナーまたは論理パーティションに保存することはできません。 サンプル サービスの場合、これはビジネス オブジェクトと 1 つまたは複数のイベントの両方が同じコンテナーと論理パーティションに配置されることを意味します。

コンテキスト、リポジトリ、UnitOfWork

サンプル実装の中核となるのは、同じトランザクション バッチに保存されるオブジェクトを追跡する "コンテナー コンテキスト"です。 これは作成および変更されたオブジェクトのリストを保持し、単一の Azure Cosmos DB コンテナーで動作します。 インターフェイスは次のようになります。

public interface IContainerContext
{
    public Container Container { get; }
    public List<IDataObject<Entity>> DataObjects { get; }
    public void Add(IDataObject<Entity> entity);
    public Task<List<IDataObject<Entity>>> SaveChangesAsync(CancellationToken cancellationToken = default);
    public void Reset();
}

コンテナー コンテキスト コンポーネントのリストは、ContactDomainEvent のオブジェクトを追跡します。 どちらも同じコンテナーに配置されます。 つまり、複数の種類のオブジェクトが同じ Azure Cosmos DB コンテナーに格納され、Type プロパティを使用してビジネス オブジェクトとイベントを区別します。

種類ごとに、データ アクセスを定義して実装する専用リポジトリがあります。 Contact リポジトリ インターフェイスには、次のメソッドが用意されています。

public interface IContactsRepository
{
    public void Create(Contact contact);
    public Task<(Contact, string)> ReadAsync(Guid id, string etag);
    public Task DeleteAsync(Guid id, string etag);
    public Task<(List<(Contact, string)>, bool, string)> ReadAllAsync(int pageSize, string continuationToken);
    public void Update(Contact contact, string etag);
}

Event リポジトリは似ていますが、ストアに新しいイベントを作成するメソッドが 1 つしかない点が異なります。

public interface IEventRepository
{
    public void Create(ContactDomainEvent e);
}

両方のリポジトリ インターフェイスを実装すると、単一の IContainerContext インスタンスへの依存関係の挿入を介して参照が取得され、両方が同じ Azure Cosmos DB コンテキストで動作することが保証されます。

最後のコンポーネントは UnitOfWork です。これにより、IContainerContext インスタンスに保持されている変更が Azure Cosmos DB にコミットされます。

public class UnitOfWork : IUnitOfWork
{
    private readonly IContainerContext _context;
    public IContactRepository ContactsRepo { get; }

    public UnitOfWork(IContainerContext ctx, IContactRepository cRepo)
    {
        _context = ctx;
        ContactsRepo = cRepo;
    }

    public Task<List<IDataObject<Entity>>> CommitAsync(CancellationToken cancellationToken = default)
    {
        return _context.SaveChangesAsync(cancellationToken);
    }
}

イベント処理: 作成と発行

Contact オブジェクトが作成、変更、または (論理的に) 削除されるたび、サービスによって対応するイベントが発生されます。 提供されるソリューションの中核となるのは、ドメイン駆動設計 (DDD) と、Jimmy Bogard によって提案されたメディエーター パターンの組み合わせです。 彼は、ドメイン オブジェクトの変更によって発生したイベントのリストを保持し、実際のオブジェクトをデータベースに保存する前に、これらのイベントを発行することを提案しています。

変更のリストはドメイン オブジェクト自体に保持されるため、他のコンポーネントがイベントのチェーンを変更することはできません。 ドメイン オブジェクト内のイベント (IEvent インスタンス) を保持する動作は、インターフェイス IEventEmitter<IEvent> を介して定義され、抽象 DomainEntity クラスに実装されます。

public abstract class DomainEntity : Entity, IEventEmitter<IEvent>
{
[...]
[...]
    private readonly List<IEvent> _events = new();

    [JsonIgnore] public IReadOnlyList<IEvent> DomainEvents => _events.AsReadOnly();

    public virtual void AddEvent(IEvent domainEvent)
    {
        var i = _events.FindIndex(0, e => e.Action == domainEvent.Action);
        if (i < 0)
        {
            _events.Add(domainEvent);
        }
        else
        {
            _events.RemoveAt(i);
            _events.Insert(i, domainEvent);
        }
    }
[...]
[...]
}

Contact オブジェクトによってドメイン イベントが発生します。 Contact エンティティは DDD の基本的な概念に従い、ドメイン プロパティのセッターをプライベートとして構成します。 このクラスにはパブリック セッターは存在しません。 代わりに、内部状態を操作するメソッドが用意されています。 これらのメソッドでは、特定の変更 (ContactNameUpdated または ContactEmailUpdated など) に対して適切なイベントを発生させることができます。

連絡先の名前の更新の例を次に示します。 (イベントはメソッドの最後に発生します。)

public void SetName(string firstName, string lastName)
{
    if (string.IsNullOrWhiteSpace(firstName) ||
        string.IsNullOrWhiteSpace(lastName))
    {
        throw new ArgumentException("FirstName or LastName cannot be empty");
    }

    Name = new Name(firstName, lastName);

    if (IsNew) return;

    AddEvent(new ContactNameUpdatedEvent(Id, Name));
    ModifiedAt = DateTimeOffset.UtcNow;
}

変更を追跡する対応する ContactNameUpdatedEvent は、次のようになります。

public class ContactNameUpdatedEvent : ContactDomainEvent
{
    public Name Name { get; }

    public ContactNameUpdatedEvent(Guid contactId, Name contactName) : 
        base(Guid.NewGuid(), contactId, nameof(ContactNameUpdatedEvent))
    {
        Name = contactName;
    }
}

これまで、イベントはドメイン オブジェクトにログされるだけで、データベースに保存されることも、メッセージ ブローカーに発行されることもありません。 推奨事項に従って、イベントのリストは、ビジネス オブジェクトがデータストアに保存される直前に処理されます。 この場合、プライベート RaiseDomainEvents メソッドで実装される、IContainerContext インスタンスの SaveChangesAsync メソッドで発生します。 (dObjs は、コンテナー コンテキストの追跡対象エンティティのリストです。)

private void RaiseDomainEvents(List<IDataObject<Entity>> dObjs)
{
    var eventEmitters = new List<IEventEmitter<IEvent>>();

    // Get all EventEmitters.
    foreach (var o in dObjs)
        if (o.Data is IEventEmitter<IEvent> ee)
            eventEmitters.Add(ee);

    // Raise events.
    if (eventEmitters.Count <= 0) return;
    foreach (var evt in eventEmitters.SelectMany(eventEmitter => eventEmitter.DomainEvents))
        _mediator.Publish(evt);
}

最後の行では、アプリケーション内でイベントを発行するために、MediatR パッケージ (C# のメディエーター パターンの実装) が使用されています。 ContactNameUpdatedEvent のようなすべてのイベントが MediatR パッケージの INotification インターフェイスを実装しているため、そうすることが可能になっています。

これらのイベントは、対応するハンドラーによって処理する必要があります。 ここでは、IEventsRepository の実装が機能します。 NameUpdated イベント ハンドラーのサンプルを次に示します。

public class ContactNameUpdatedHandler :
    INotificationHandler<ContactNameUpdatedEvent>
{
    private IEventRepository EventRepository { get; }

    public ContactNameUpdatedHandler(IEventRepository eventRepo)
    {
        EventRepository = eventRepo;
    }

    public Task Handle(ContactNameUpdatedEvent notification,
        CancellationToken cancellationToken)
    {
        EventRepository.Create(notification);
        return Task.CompletedTask;
    }
}

IEventRepository インスタンスは、コンストラクターを介してハンドラー クラスに挿入されます。 サービスで ContactNameUpdatedEvent が発行されるとすぐに、Handle メソッドが呼び出され、イベント リポジトリ インスタンスを使用して通知オブジェクトが作成されます。 次に、その通知オブジェクトが、IContainerContext オブジェクトの追跡対象オブジェクトのリストに挿入され、同じトランザクション バッチに保存されているオブジェクトが Azure Cosmos DB に結合されます。

ここまでで、コンテナー コンテキストで処理するオブジェクトが認識されています。 追跡対象オブジェクトを最終的に Azure Cosmos DB に保持するために、IContainerContext 実装によってトランザクション バッチが作成され、関連するすべてのオブジェクトが追加され、データベースに対して操作が実行されます。 説明されているプロセスは、SaveChangesAsync メソッドによって呼び出される SaveInTransactionalBatchAsync メソッドで処理されます。

トランザクション バッチを作成して実行するために必要な実装の重要な部分を次に示します。

private async Task<List<IDataObject<Entity>>> SaveInTransactionalBatchAsync(
    CancellationToken cancellationToken)
{
    if (DataObjects.Count > 0)
    {
        var pk = new PartitionKey(DataObjects[0].PartitionKey);
        var tb = Container.CreateTransactionalBatch(pk);
        DataObjects.ForEach(o =>
        {
            TransactionalBatchItemRequestOptions tro = null;

            if (!string.IsNullOrWhiteSpace(o.Etag))
                tro = new TransactionalBatchItemRequestOptions { IfMatchEtag = o.Etag };

            switch (o.State)
            {
                case EntityState.Created:
                    tb.CreateItem(o);
                    break;
                case EntityState.Updated or EntityState.Deleted:
                    tb.ReplaceItem(o.Id, o, tro);
                    break;
            }
        });

        var tbResult = await tb.ExecuteAsync(cancellationToken);
...
[Check for return codes, etc.]
...
    }

    // Return copy of current list as result.
    var result = new List<IDataObject<Entity>>(DataObjects); 

    // Work has been successfully done. Reset DataObjects list.
    DataObjects.Clear();
    return result;
}

(連絡先オブジェクトの名前を更新するための) ここまでのプロセスの概要を次に示します。

  1. クライアントは連絡先の名前を更新する必要があります。 SetName メソッド が連絡先オブジェクトで呼び出され、プロパティが更新されます。
  2. ContactNameUpdated イベントが、ドメイン オブジェクト内のイベントのリストに追加されます。
  3. 連絡先リポジトリの Update メソッドが呼び出されます。これにより、ドメイン オブジェクトがコンテナー コンテキストに追加されます。 これで、このオブジェクトが追跡されるようになります。
  4. CommitAsyncUnitOfWork インスタンスで呼び出され、これにより SaveChangesAsync がコンテナー コンテキストで呼び出されます。
  5. SaveChangesAsync 内で、ドメイン オブジェクトのリスト内のすべてのイベントは、MediatR インスタンスによって発行され、イベント リポジトリを介して "同じコンテナー コンテキスト"に追加されます。
  6. SaveChangesAsyncTransactionalBatch が作成されます。 連絡先オブジェクトとイベントの両方が保持されます。
  7. TransactionalBatch が実行され、データが Azure Cosmos DB にコミットされます。
  8. SaveChangesAsyncCommitAsync が正常に返されます。

永続性

上記のコード スニペットでわかるように、Azure Cosmos DB に保存されたすべてのオブジェクトは、DataObject インスタンスにラップされます。 このオブジェクトにより、次の一般的なプロパティが提供されます。

  • ID
  • PartitionKey
  • Type
  • StateCreated と同様に、Updated はAzure Cosmos DB に保持されません。
  • Etagオプティミスティック ロック用。
  • TTLTime To Live プロパティ。前のドキュメントを自動クリーンアップします。
  • Data。 汎用データ オブジェクト。

これらのプロパティは、IDataObject と呼ばれる汎用インターフェイスで定義され、リポジトリとコンテナー コンテキストで使用されます。


public interface IDataObject<out T> where T : Entity
{
    string Id { get; }
    string PartitionKey { get; }
    string Type { get; }
    T Data { get; }
    string Etag { get; set; }
    int Ttl { get; }
    EntityState State { get; set; }
}

DataObject インスタンスにラップされデータベースに保存されたオブジェクトは、次のサンプル (ContactContactNameUpdatedEvent) のようになります。

// Contact document/object. After creation.
{
    "id": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
    "partitionKey": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
    "type": "contact",
    "data": {
        "name": {
            "firstName": "John",
            "lastName": "Doe"
        },
        "description": "This is a contact",
        "email": "johndoe@contoso.com",
        "company": {
            "companyName": "Contoso",
            "street": "Street",
            "houseNumber": "1a",
            "postalCode": "092821",
            "city": "Palo Alto",
            "country": "US"
        },
        "createdAt": "2021-09-22T11:07:37.3022907+02:00",
        "deleted": false,
        "id": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2"
    },
    "ttl": -1,
    "_etag": "\"180014cc-0000-1500-0000-614455330000\"",
    "_ts": 1632301657
}

// After setting a new name, this is how an event document looks.
{
    "id": "d6a5f4b2-84c3-4ac7-ae22-6f4025ba9ca0",
    "partitionKey": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
    "type": "domainEvent",
    "data": {
        "name": {
            "firstName": "Jane",
            "lastName": "Doe"
        },
        "contactId": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
        "action": "ContactNameUpdatedEvent",
        "id": "d6a5f4b2-84c3-4ac7-ae22-6f4025ba9ca0",
        "createdAt": "2021-09-22T11:37:37.3022907+02:00"
    },
    "ttl": 120,
    "_etag": "\"18005bce-0000-1500-0000-614456b80000\"",
    "_ts": 1632303457
}

ContactContactNameUpdatedEvent (domainEvent 型) のドキュメントが同じパーティション キーを持ち、両方のドキュメントが同じ論理パーティションに保持されることがわかります。

変更フィードの処理

イベントのストリームを読み取り、それらをメッセージ ブローカーに送信するために、サービスでは Azure Cosmos DB 変更フィードが使用されます。

変更フィードは、コンテナー内の変更の永続的なログです。 バックグラウンドで動作し、変更を追跡します。 1 つの論理パーティション内では、変更の順序が保証されます。 変更フィードを読み取る最も便利な方法は、Azure Cosmos DB トリガーで Azure 関数を使用することです。 もう 1 つのオプションは、変更フィード プロセッサ ライブラリを使用することです。 これにより、(IHostedService インターフェイスを介して) 変更フィード処理をバックグラウンド サービスとして Web API に統合できます。 このサンプルでは、抽象クラス BackgroundService を実装する単純なコンソール アプリケーションを使用して、.NET Core アプリケーションで実行時間の長いバックグラウンド タスクをホストします。

Azure Cosmos DB 変更フィードから変更を受け取るには、ChangeFeedProcessor オブジェクトをインスタンス化し、メッセージ処理用のハンドラー メソッドを登録して、変更のリッスンを開始する必要があります。

private async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync()
{
    var changeFeedProcessor = _container
        .GetChangeFeedProcessorBuilder<ExpandoObject>(
            _configuration.GetSection("Cosmos")["ProcessorName"],
            HandleChangesAsync)
        .WithInstanceName(Environment.MachineName)
        .WithLeaseContainer(_leaseContainer)
        .WithMaxItems(25)
        .WithStartTime(new DateTime(2000, 1, 1, 0, 0, 0, DateTimeKind.Utc))
        .WithPollInterval(TimeSpan.FromSeconds(3))
        .Build();

    _logger.LogInformation("Starting Change Feed Processor...");
    await changeFeedProcessor.StartAsync();
    _logger.LogInformation("Change Feed Processor started. Waiting for new messages to arrive.");
    return changeFeedProcessor;
}

次に、ハンドラー メソッド (この場合は HandleChangesAsync) によってメッセージが処理されます。 このサンプルでは、スケーラビリティのためにパーティション化され、重複除去機能が有効になっている Service Bus トピックにイベントが発行されます。 Contact オブジェクトの変更に関心があるサービスは、その Service Bus ト ピックをサブスクライブして受信し、独自のコンテキスト向けに変更を処理できます。

生成される Service Bus メッセージには SessionId プロパティがあります。 Service Bus でセッションを使用すると、メッセージの順序が保持されます (FIFO)。 このユース ケースでは、順序を維持する必要があります。

変更フィードからのメッセージを処理するスニペットを次に示します。

private async Task HandleChangesAsync(IReadOnlyCollection<ExpandoObject> changes, CancellationToken cancellationToken)
{
    _logger.LogInformation($"Received {changes.Count} document(s).");
    var eventsCount = 0;

    Dictionary<string, List<ServiceBusMessage>> partitionedMessages = new();

    foreach (var document in changes as dynamic)
    {
        if (!((IDictionary<string, object>)document).ContainsKey("type") ||
            !((IDictionary<string, object>)document).ContainsKey("data")) continue; // Unknown document type.

        if (document.type == EVENT_TYPE) // domainEvent.
        {
            string json = JsonConvert.SerializeObject(document.data);
            var sbMessage = new ServiceBusMessage(json)
            {
                ContentType = "application/json",
                Subject = document.data.action,
                MessageId = document.id,
                PartitionKey = document.partitionKey,
                SessionId = document.partitionKey
            };

            // Create message batch per partitionKey.
            if (partitionedMessages.ContainsKey(document.partitionKey))
            {
                partitionedMessages[sbMessage.PartitionKey].Add(sbMessage);
            }
            else
            {
                partitionedMessages[sbMessage.PartitionKey] = new List<ServiceBusMessage> { sbMessage };
            }

            eventsCount++;
        }
    }

    if (partitionedMessages.Count > 0)
    {
        _logger.LogInformation($"Processing {eventsCount} event(s) in {partitionedMessages.Count} partition(s).");

        // Loop over each partition.
        foreach (var partition in partitionedMessages)
        {
            // Create batch for partition.
            using var messageBatch =
                await _topicSender.CreateMessageBatchAsync(cancellationToken);
            foreach (var msg in partition.Value)
                if (!messageBatch.TryAddMessage(msg))
                    throw new Exception();

            _logger.LogInformation(
                $"Sending {messageBatch.Count} event(s) to Service Bus. PartitionId: {partition.Key}");

            try
            {
                await _topicSender.SendMessagesAsync(messageBatch, cancellationToken);
            }
            catch (Exception e)
            {
                _logger.LogError(e.Message);
                throw;
            }
        }
    }
    else
    {
        _logger.LogInformation("No event documents in change feed batch. Waiting for new messages to arrive.");
    }
}

エラー処理

変更の処理中にエラーが発生した場合、変更フィード ライブラリは、最後のバッチが正常に処理された位置でメッセージの読み取りを再開します。 たとえば、アプリケーションが 10,000 件のメッセージを正常に処理し、バッチ 10,001 から 10,025 の作業中にエラーが発生した場合は、再起動して 10,001 の位置で作業を開始できます。 このライブラリにより、Azure Cosmos DB の Leases コンテナーに保存された情報を介して処理されたものが自動的に追跡されます。

サービスによって、再処理されたメッセージの一部が既に Service Bus に送信されている可能性があります。 通常、そのシナリオでは、メッセージ処理が重複します。 前に説明したように、Service Bus には重複メッセージ検出の機能があり、こうしたシナリオで有効にする必要があります。 アプリケーションによって制御されるメッセージの MessageId プロパティに基づいて、メッセージが既に Service Bus トピック (またはキュー) に追加されているかどうかがサービスによってチェックされます。 そのプロパティは、イベント ドキュメントの ID に設定されます。 同じメッセージが Service Bus に再度送信されると、サービスではそれは無視されドロップされます。

ハウスキープ処理

一般的な Transactional Outbox の実装では、サービスにより処理されたイベントが更新され、Processed プロパティがtrue に設定され、メッセージが正常に発行されたことが示されます。 この動作は、ハンドラー メソッドで手動で実装できます。 現在のシナリオでは、このようなプロセスは必要ありません。 Azure Cosmos DB は、(Leases コンテナーと組み合わせて) 変更フィードを使用して処理されたイベントを追跡します。

最後の手順として、最新のレコードまたはドキュメントのみを保持するように、時々コンテナーからイベントを削除する必要があります。 定期的にクリーンアップを行うために、実装で Azure Cosmos DB の別の機能である Time To Live (TTL) がドキュメントに適用されます。 Azure Cosmos DB は、ドキュメントに追加できる TTL プロパティ (秒単位の期間) に基づいて、ドキュメントを自動的に削除できます。 このサービスにより、TTL プロパティを持つドキュメントのコンテナーが常にチェックされます。 ドキュメントの有効期限が切れるとすぐに、Azure Cosmos DB によってそれがデータベースから削除されます。

すべてのコンポーネントが期待した通りに動作すると、イベントが処理され、数秒で迅速に発行されます。 Azure Cosmos DB でエラーが発生すると、イベントがメッセージ バスに送信されません。これはビジネス オブジェクトと対応するイベントの両方をデータベースに保存できないためです。 考慮すべき唯一の点は、バックグラウンド ワーカー (変更フィード プロセッサ) または Service Bus が使用できない場合に、DomainEvent ドキュメントに適切な TTL 値を設定することです。 実稼働環境では、複数日の期間を選択することをお勧めします。 たとえば、10 日です。 こうすると、関連するすべてのコンポーネントが、アプリケーション内で変更を処理または発行するのに十分な時間を持つことができます。

まとめ

Transactional Outbox パターンにより、分散システムでドメイン イベントを確実に発行する問題が解決されます。 ビジネス オブジェクトの状態とそのイベントを同じトランザクション バッチでコミットし、バックグラウンド プロセッサをメッセージ リレーとして使用することで、他のサービス (内部または外部) が最終的に依存する情報を確実に受信できるようになります。 このサンプルは、Transactional Outbox パターンの従来の実装ではありません。 Azure Cosmos DB 変更フィードや Time To Live などの機能を使用して、物事をシンプルかつクリーンに保ちます。

このシナリオで使用されている Azure コンポーネントの概要を次に示します。

Diagram that shows the Azure components to implement Transactional Outbox with Azure Cosmos DB and Azure Service Bus.

このアーキテクチャの Visio ファイルをダウンロードします。

このソリューションの利点は次のとおりです。

  • 信頼性の高いメッセージングとイベントの確実な配信。
  • Service Bus を使用したイベントの保存された順序とメッセージの重複除去。
  • イベント ドキュメントの処理が正常に行われたことを示す追加の Processed プロパティを維持する必要がない。
  • TTL を使用した Azure Cosmos DB からのイベントの削除。 このプロセスでは、ユーザーまたはアプリケーションの要求を処理するために必要な要求ユニットは使用されません。 代わりに、バックグラウンド タスクで "残りの" 要求ユニットが使用されます。
  • ChangeFeedProcessor (または Azure 関数) を使用したメッセージのエラー防止処理。
  • 省略可能: それぞれが変更フィードに独自のポインターを保持する複数の変更フィード プロセッサ。

考慮事項

この記事で説明したサンプル アプリケーションは、Azure Cosmos DB と Service Bus を使用して Azure に Transactional Outbox パターンを実装する方法を示しています。 NoSQL データベースを使用する他の方法もあります。 ビジネス オブジェクトとイベントがデータベースに確実に保存されることを保証するために、イベントのリストをビジネス オブジェクト ドキュメントに埋め込むことができます。 この方法の欠点は、クリーンアップ プロセスでイベントを含む各ドキュメントを更新する必要がある点です。 これは、TTL を使用する場合と比較して、特に要求ユニットのコストの観点からは理想的ではありません。

ここで提供されているサンプル コードを、運用環境に対応したコードと見なすべきではありません。 マルチスレッドに関していくつかの制限があり、とりわけ、DomainEntity クラスでのイベントの処理方法と、CosmosContainerContext 実装でのオブジェクトの追跡方法に制限があります。 これは独自の実装の開始点として使用してください。 または、NServiceBusMassTransit などのこの機能が既に組み込まれている既存のライブラリを使用することを検討してください。

このシナリオのデプロイ

このシナリオをテストするためのソース コード、展開ファイル、手順は、GitHub (https://github.com/mspnp/transactional-outbox-pattern) にあります。

共同作成者

この記事は、Microsoft によって保守されています。 当初の寄稿者は以下のとおりです。

プリンシパル作成者:

パブリックでない LinkedIn プロファイルを表示するには、LinkedIn にサインインします。

次のステップ

詳細については、次の記事を参照してください。