Azure Event Hubs との間でイベントを送受信する - .NET (Azure.Messaging.EventHubs)

このクイックスタートでは、Azure.Messaging.EventHubs .NET ライブラリを使用して、イベント ハブとの間でイベントを送受信する方法について説明します。

前提条件

Azure Event Hubs を初めて使用する場合は、このクイックスタートを行う前に Event Hubs の概要を参照してください。

このクイック スタートを完了するには、次の前提条件を用意しておく必要があります。

  • Microsoft Azure サブスクリプション。 Azure Event Hubs を含む Azure サービスを使用するには、サブスクリプションが必要です。 既存の Microsoft Azure アカウントをお持ちでない場合は、アカウントを作成する際に、無料試用版にサインアップするか、MSDN サブスクライバー特典を利用できます。
  • Microsoft Visual Studio 2019。 Azure Event Hubs クライアント ライブラリでは、C# 8.0 で導入された新機能を利用しています。 以前のバージョンの C# 言語でもライブラリを使用できますが、新しい構文は利用できません。 完全な構文を使用するには、[.NET Core SDK](https://dotnet.microsoft.com/download) 3.0 以上で、[言語バージョン](/dotnet/csharp/language-reference/configure-language-version#override-a-default)`latest` に設定してコンパイルすることをお勧めします。 Visual Studio を使用している場合、Visual Studio 2019 より前のバージョンには、C# 8.0 プロジェクトをビルドするために必要なツールとの互換性がありません。 無料の Community エディションを含む Visual Studio 2019 は、こちらからダウンロードできます。
  • Event Hubs 名前空間とイベント ハブを作成する。 最初の手順では、Azure Portal を使用して Event Hubs 型の名前空間を作成し、アプリケーションがイベント ハブと通信するために必要な管理資格情報を取得します。 名前空間とイベント ハブを作成するには、こちらの記事の手順に従います。 その後、次の記事の手順に従って、Event Hubs 名前空間用の接続文字列を取得します: 接続文字列を取得する。 この接続文字列は、このクイックスタートの後の手順で必要になります。

送信イベント

このセクションでは、イベント ハブにイベントを送信する .NET Core コンソール アプリケーションの作成方法を説明します。

コンソール アプリケーションの作成

  1. Visual Studio 2019 を起動します。

  2. [新しいプロジェクトの作成] を選択します。

  3. **[新しいプロジェクトの作成]** ダイアログ ボックスで、次の手順に従います。このダイアログ ボックスが表示されない場合は、メニューで **[ファイル]****[新規]****[プロジェクト]** の順に選択します。

    1. プログラミング言語として **[C#]** を選択します。

    2. アプリケーションの種類として [コンソール] を選択します。

    3. 結果リストから **[コンソール アプリケーション]** を選択します。

    4. 次に、 [次へ] を選択します。

      Image showing the New Project dialog box

  4. プロジェクト名として「EventHubsSender」、ソリューション名として「EventHubsQuickStart」と入力し、 [OK] を選択してプロジェクトを作成します。

    Image showing the page where you enter solution and project names

Event Hubs NuGet パッケージの追加

  1. メニューから [ツール]>[NuGet パッケージ マネージャー]>[パッケージ マネージャー コンソール] の順に選択します。

  2. 次のコマンドを実行して、Azure.Messaging.EventHubs NuGet パッケージをインストールします。

    Install-Package Azure.Messaging.EventHubs
    

イベント ハブにイベントを送信するコードの記述

  1. Program.cs ファイルの先頭に次の using ステートメントを追加します。

    using System;
    using System.Text;
    using System.Threading.Tasks;
    using Azure.Messaging.EventHubs;
    using Azure.Messaging.EventHubs.Producer;
    
  2. Event Hubs 接続文字列およびイベント ハブ名を表す定数を、Program クラスに追加します。

        // connection string to the Event Hubs namespace
        private const string connectionString = "<EVENT HUBS NAMESPACE - CONNECTION STRING>";
    
        // name of the event hub
        private const string eventHubName = "<EVENT HUB NAME>";
    
        // number of events to be sent to the event hub
        private const int numOfEvents = 3;
    

    Note

    プレースホルダーの値を、名前空間への接続文字列とイベント ハブの名前に置き換えます。 接続文字列が名前空間レベルの接続文字列であることを確認します。

  3. Program クラスに次の静的プロパティを追加します。 コードのコメントを参照してください。

        // The Event Hubs client types are safe to cache and use as a singleton for the lifetime
        // of the application, which is best practice when events are being published or read regularly.
        static EventHubProducerClient producerClient;    
    
  4. Main メソッドを次の async Main メソッドで置き換えます。 詳細については、コードのコメントを参照してください。 コードの重要な手順を次に示します。

    1. 名前空間へのプライマリ接続文字列とイベント ハブの名前を使って、EventHubProducerClient オブジェクトを作成します。
    2. EventHubProducerClient オブジェクトで CreateBatchAsync メソッドを呼び出して、EventDataBatch オブジェクトを作成します。
    3. EventDataBatch.TryAdd メソッドを使って、バッチにイベントを追加します。
    4. EventHubProducerClient.SendAsync メソッドを使って、メッセージのバッチをイベント ハブに送信します。
        static async Task Main()
        {
            // Create a producer client that you can use to send events to an event hub
            producerClient = new EventHubProducerClient(connectionString, eventHubName);
    
            // Create a batch of events 
            using EventDataBatch eventBatch = await producerClient.CreateBatchAsync();
    
            for (int i = 1; i <= numOfEvents; i++)
            {
                if (! eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes($"Event {i}"))))
                {
                    // if it is too large for the batch
                    throw new Exception($"Event {i} is too large for the batch and cannot be sent.");
                }
            }
    
            try
            {
                // Use the producer client to send the batch of events to the event hub
                await producerClient.SendAsync(eventBatch);
                Console.WriteLine($"A batch of {numOfEvents} events has been published.");
            }
            finally
            {
                await producerClient.DisposeAsync();
            }
        }
    
  5. プロジェクトをビルドし、エラーがないことを確認します。

  6. プログラムを実行し、確認メッセージが表示されるまで待ちます。

    A batch of 3 events has been published.
    
  7. Azure portal で、イベント ハブがイベントを受信したことを確認できます。 [メトリック] セクションで [メッセージ] ビューに切り替えます。 ページを最新の情報に更新して、グラフを更新します。 メッセージが受信されたことが示されるまでに数秒かかることがあります。

    Image of the Azure portal page to verify that the event hub received the events

    注意

    より多くの情報を含むコメント付きの完全なソース コードについては、GitHub 上のこちらのファイルを参照してください

受信イベント

このセクションでは、イベント プロセッサを使用してイベント ハブからイベントを受信する .NET Core コンソール アプリケーションを作成する方法について説明します。 イベント プロセッサは、永続的なチェックポイントとそれらのイベント ハブからの並列受信を管理することによって、イベント ハブからのイベントの受信を簡素化します。 イベント プロセッサは、特定のイベントハブとコンシューマー グループに関連付けられています。 イベント ハブ内の複数のパーティションからイベントを受け取り、指定されたコードを使用して処理できるようにハンドラー デリゲートに渡します。

警告

このコードを Azure Stack Hub で実行すると、特定の Storage API バージョンを対象としている場合を除き、実行時エラーが発生します。 これは、Event Hubs SDK では、Azure で利用できる最新の Azure Storage API が使用されますが、Azure Stack Hub プラットフォームではこれを利用できない可能性があるためです。 Azure Stack Hub でサポートされる Storage Blob SDK のバージョンは、Azure で一般的に利用できるバージョンと異なる場合があります。 チェックポイント ストアとして Azure Blob Storage を使用している場合は、Azure Stack Hub ビルドでサポートされている Azure Storage API バージョンを確認し、コード内でそのバージョンを対象にします。

たとえば、Azure Stack Hub バージョン 2005 上で実行している場合、Storage サービスで利用できる最も高いバージョンは 2019-02-02 となります。 既定では、Event Hubs SDK クライアント ライブラリでは、Azure で利用できる最も高いバージョン (SDK のリリース時点では 2019-07-07) が使用されます。 この場合は、このセクションの手順に従うことに加え、Storage サービス API バージョン 2019-02-02 を対象とするコードを追加する必要もあります。 特定の Storage API バージョンを対象にする方法の例については、GitHub のサンプルを参照してください。

Azure Storage と BLOB コンテナーを作成する

このクイックスタートでは、チェックポイント ストアとして Azure Storage を使用します。 Azure ストレージ アカウントを作成するには、次の手順に従います。

  1. Azure ストレージ アカウントの作成

  2. BLOB コンテナーを作成する

  3. ストレージ アカウントへの接続文字列を取得する

    接続文字列とコンテナー名を書き留めておきます。 これらは、受信コードで使用します。

受信側のプロジェクトを作成する

  1. ソリューション エクスプローラー ウィンドウで、 [EventHubQuickStart] ソリューションを右クリックし、 [追加] をポイントして、 [新しいプロジェクト] を選択します。
  2. [コンソール アプリケーション] を選択し、 [次へ] を選択します。
  3. [プロジェクト名] に「EventHubsReceiver」と入力し、 [作成] を選択します。
  4. ソリューション エクスプローラー ウィンドウで、 [EventHubsReceiver] を右クリックし、 [Set as a Startup Project](スタートアップ プロジェクトとして設定) を選択します。

Event Hubs NuGet パッケージの追加

  1. メニューから [ツール]>[NuGet パッケージ マネージャー]>[パッケージ マネージャー コンソール] の順に選択します。

  2. [パッケージ マネージャー コンソール] ウィンドウで、 [既定のプロジェクト][EventHubsReceiver] が選択されていることを確認します。 そうでない場合は、ドロップダウン リストを使用して [EventHubsReceiver] を選択します。

  3. 次のコマンドを実行して、Azure.Messaging.EventHubs NuGet パッケージをインストールします。

    Install-Package Azure.Messaging.EventHubs
    
  4. 次のコマンドを実行して、Azure.Messaging.EventHubs.Processor NuGet パッケージをインストールします。

    Install-Package Azure.Messaging.EventHubs.Processor
    

Main メソッドを更新する

  1. Program.cs ファイルの先頭に次の using ステートメントを追加します。

    using System;
    using System.Text;
    using System.Threading.Tasks;
    using Azure.Storage.Blobs;
    using Azure.Messaging.EventHubs;
    using Azure.Messaging.EventHubs.Consumer;
    using Azure.Messaging.EventHubs.Processor;
    
  2. Event Hubs 接続文字列およびイベント ハブ名を表す定数を、Program クラスに追加します。 山かっこ内のプレースホルダーを、イベント ハブの作成時に取得した適切な値で置き換えます。 山かっこ内のプレースホルダーを、イベント ハブとストレージ アカウントの作成時に取得した適切な値 (アクセス キーとプライマリ接続文字列) で置き換えます。 {Event Hubs namespace connection string} がイベント ハブの文字列ではなく名前空間レベルの接続文字列であることを確認します。

        private const string ehubNamespaceConnectionString = "<EVENT HUBS NAMESPACE - CONNECTION STRING>";
        private const string eventHubName = "<EVENT HUB NAME>";
        private const string blobStorageConnectionString = "<AZURE STORAGE CONNECTION STRING>";
        private const string blobContainerName = "<BLOB CONTAINER NAME>";
    
  3. Program クラスに次の静的プロパティを追加します。

        static BlobContainerClient storageClient;
    
        // The Event Hubs client types are safe to cache and use as a singleton for the lifetime
        // of the application, which is best practice when events are being published or read regularly.        
        static EventProcessorClient processor;    
    
  4. Main メソッドを次の async Main メソッドで置き換えます。 詳細については、コードのコメントを参照してください。 コードの重要な手順を次に示します。

    1. 名前空間へのプライマリ接続文字列とイベント ハブを使って、EventProcessorClient オブジェクトを作成します。 前に作成した Azure Storage でコンテナー用の BlobContainerClient オブジェクトを作成する必要があります。
    2. Eventprocessorclient オブジェクトの ProcessEventAsync および ProcessErrorAsync イベント用のハンドラーを指定します。
    3. EventProcessorClient オブジェクトで StartProcessingAsync を呼び出して、イベントの処理を開始します。
    4. ユーザーが任意のキーを押して処理を終了すると、EventProcessorClient オブジェクトの StopProcessingAsync が呼び出されます。
        static async Task Main()
        {
            // Read from the default consumer group: $Default
            string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;
    
            // Create a blob container client that the event processor will use 
            storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName);
    
            // Create an event processor client to process events in the event hub
            processor = new EventProcessorClient(storageClient, consumerGroup, ehubNamespaceConnectionString, eventHubName);
    
            // Register handlers for processing events and handling errors
            processor.ProcessEventAsync += ProcessEventHandler;
            processor.ProcessErrorAsync += ProcessErrorHandler;
    
            // Start the processing
            await processor.StartProcessingAsync();
    
            // Wait for 30 seconds for the events to be processed
            await Task.Delay(TimeSpan.FromSeconds(30));
    
            // Stop the processing
            await processor.StopProcessingAsync();
        }
    
  5. ここで、次のイベントおよびエラー ハンドラー メソッドをクラスに追加します。

        static async Task ProcessEventHandler(ProcessEventArgs eventArgs)
        {
            // Write the body of the event to the console window
            Console.WriteLine("\tReceived event: {0}", Encoding.UTF8.GetString(eventArgs.Data.Body.ToArray()));
    
            // Update checkpoint in the blob storage so that the app receives only new events the next time it's run
            await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken);
        }
    
        static Task ProcessErrorHandler(ProcessErrorEventArgs eventArgs)
        {
            // Write details about the error to the console window
            Console.WriteLine($"\tPartition '{ eventArgs.PartitionId}': an unhandled exception was encountered. This was not expected to happen.");
            Console.WriteLine(eventArgs.Exception.Message);
            return Task.CompletedTask;
        }    
    
  6. プロジェクトをビルドし、エラーがないことを確認します。

    注意

    より多くの情報を含むコメント付きの完全なソース コードについては、GitHub 上のこちらのファイルを参照してください。

  7. 受信側アプリを実行します。

  8. イベントが受信されたことを示すメッセージが表示されます。

    Received event: Event 1
    Received event: Event 2
    Received event: Event 3    
    

    これらのイベントは、前に送信側プログラムを実行してイベント ハブに送信した 3 つのイベントです。

  9. Azure portal では、3 つの送信メッセージがあることを確認できます。 これらは、Event Hubs が受信アプリケーションに送信したメッセージです。 ページを最新の情報に更新して、グラフを更新します。 メッセージが受信されたことが示されるまでに数秒かかることがあります。

    Image of the Azure portal page to verify that the event hub sent events to the receiving app

次の手順

このクイックスタートでは、イベントのバッチをイベント ハブに送信し、それらを受信するという簡単なシナリオを実装するステップバイステップの手順を説明します。 他のシナリオや高度なシナリオに関する他のサンプルについては、GitHub で次のサンプルを参照してください。