Azure Event Hubs との間でイベントを送受信する - .NET (Azure.Messaging.EventHubs)

このクイックスタートでは、Azure.Messaging.EventHubs .NET ライブラリを使用して、イベント ハブとの間でイベントを送受信する方法について説明します。

重要

このクイックスタートでは、新しい Azure.Messaging.EventHubs ライブラリを使用します。 以前の Microsoft.Azure.EventHubs ライブラリを使用するクイックスタートについては、Microsoft.Azure.EventHubs ライブラリを使用したイベントの送受信に関するページを参照してください。

前提条件

Azure Event Hubs を初めて使用する場合は、このクイックスタートを行う前に Event Hubs の概要を参照してください。

このクイック スタートを完了するには、次の前提条件を用意しておく必要があります。

  • Microsoft Azure サブスクリプション。 Azure Event Hubs を含む Azure サービスを使用するには、サブスクリプションが必要です。 既存の Microsoft Azure アカウントをお持ちでない場合は、アカウントを作成する際に、無料試用版にサインアップするか、MSDN サブスクライバー特典を利用できます。
  • Microsoft Visual Studio 2019。 Azure Event Hubs クライアント ライブラリでは、C# 8.0 で導入された新機能を利用しています。 以前のバージョンの C# 言語でライブラリを使用することもできますが、新しい構文は使用できません。 完全な構文を使用するには、.NET Core SDK 3.0 以降および言語バージョンlatest に設定して、コンパイルすることをお勧めします。 Visual Studio を使用している場合、Visual Studio 2019 より前のバージョンには、C# 8.0 プロジェクトをビルドするために必要なツールとの互換性がありません。 無料の Community エディションを含む Visual Studio 2019 は、こちらからダウンロードできます。
  • Event Hubs 名前空間とイベント ハブを作成する。 最初の手順では、Azure Portal を使用して Event Hubs 型の名前空間を作成し、アプリケーションがイベント ハブと通信するために必要な管理資格情報を取得します。 名前空間とイベント ハブを作成するには、こちらの記事の手順に従います。 その後、次の記事の手順に従って、Event Hubs 名前空間用の接続文字列 を取得します: 接続文字列を取得する。 この接続文字列は、このクイックスタートの後の手順で必要になります。

送信イベント

このセクションでは、イベント ハブにイベントを送信する .NET Core コンソール アプリケーションの作成方法を説明します。

コンソール アプリケーションの作成

  1. Visual Studio 2019 を起動します。

  2. [新しいプロジェクトの作成] を選択します。

  3. [新しいプロジェクトの作成] ダイアログ ボックスで、次の手順に従います。このダイアログ ボックスが表示されない場合は、メニューで [ファイル][新規][プロジェクト] の順に選択します。

    1. プログラミング言語として [C#] を選択します。

    2. アプリケーションの種類として [コンソール] を選択します。

    3. 結果の一覧から [コンソール アプリ (.NET Core)] を選択します。

    4. 次に、 [次へ] を選択します。

      [新しいプロジェクト] ダイアログ ボックス

  4. プロジェクト名として「EventHubsSender」、ソリューション名として「EventHubsQuickStart」と入力し、 [OK] を選択してプロジェクトを作成します。

    [C#] > [コンソール] アプリ

Event Hubs NuGet パッケージの追加

  1. メニューから [ツール] > [NuGet パッケージ マネージャー] > [パッケージ マネージャー コンソール] の順に選択します。

  2. 次のコマンドを実行して、Azure.Messaging.EventHubs NuGet パッケージをインストールします。

    Install-Package Azure.Messaging.EventHubs
    

イベント ハブにメッセージを送信するコードの記述

  1. Program.cs ファイルの先頭に次の using ステートメントを追加します。

    using System;
    using System.Text;
    using System.Threading.Tasks;
    using Azure.Messaging.EventHubs;
    using Azure.Messaging.EventHubs.Producer;
    
  2. Event Hubs 接続文字列およびイベント ハブ名を表す定数を、Program クラスに追加します。 山かっこ内のプレースホルダーを、イベント ハブの作成時に取得した適切な値で置き換えます。 {Event Hubs namespace connection string} がイベント ハブの文字列ではなく名前空間レベルの接続文字列であることを確認します。

    private const string connectionString = "<EVENT HUBS NAMESPACE - CONNECTION STRING>";
    private const string eventHubName = "<EVENT HUB NAME>";
    
  3. Main メソッドを次の async Main メソッドで置き換えます。 詳細については、コードのコメントを参照してください。

        static async Task Main()
        {
            // Create a producer client that you can use to send events to an event hub
            await using (var producerClient = new EventHubProducerClient(connectionString, eventHubName))
            {
                // Create a batch of events 
                using EventDataBatch eventBatch = await producerClient.CreateBatchAsync();
    
                // Add events to the batch. An event is a represented by a collection of bytes and metadata. 
                eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("First event")));
                eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("Second event")));
                eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("Third event")));
    
                // Use the producer client to send the batch of events to the event hub
                await producerClient.SendAsync(eventBatch);
                Console.WriteLine("A batch of 3 events has been published.");
            }
        }
    
  4. プロジェクトをビルドし、エラーがないことを確認します。

  5. プログラムを実行し、確認メッセージが表示されるまで待ちます。

  6. Azure portal で、イベント ハブがメッセージを受信したことを確認できます。 [メトリック] セクションで [メッセージ] ビューに切り替えます。 ページを最新の情報に更新して、グラフを更新します。 メッセージが受信されたことが示されるまでに数秒かかることがあります。

    イベント ハブがメッセージを受信したことを確認する

    注意

    より多くの情報を含むコメント付きの完全なソース コードについては、GitHub 上のこちらのファイルを参照してください

受信イベント

このセクションでは、イベント プロセッサを使用してイベント ハブからメッセージを受信する .NET Core コンソール アプリケーションを作成する方法について説明します。 イベント プロセッサは、永続的なチェックポイントとそれらのイベント ハブからの並列受信を管理することによって、イベント ハブからのイベントの受信を簡素化します。 イベント プロセッサは、特定のイベントハブとコンシューマー グループに関連付けられています。 イベント ハブ内の複数のパーティションからイベントを受け取り、指定されたコードを使用して処理できるようにハンドラー デリゲートに渡します。

警告

このコードを Azure Stack Hub で実行すると、特定の Storage API バージョンを対象としている場合を除き、実行時エラーが発生します。 これは、Event Hubs SDK では、Azure で利用できる最新の Azure Storage API が使用されますが、Azure Stack Hub プラットフォームではこれを利用できない可能性があるためです。 Azure Stack Hub でサポートされる Storage Blob SDK のバージョンは、Azure で一般的に利用できるバージョンと異なる場合があります。 チェックポイント ストアとして Azure Blob Storage を使用している場合は、Azure Stack Hub ビルドでサポートされている Azure Storage API バージョンを確認し、コード内でそのバージョンを対象にします。

たとえば、Azure Stack Hub バージョン 2005 上で実行している場合、Storage サービスで利用できる最も高いバージョンは 2019-02-02 となります。 既定では、Event Hubs SDK クライアント ライブラリでは、Azure で利用できる最も高いバージョン (SDK のリリース時点では 2019-07-07) が使用されます。 この場合は、このセクションの手順に従うことに加え、Storage サービス API バージョン 2019-02-02 を対象とするコードを追加する必要もあります。 特定の Storage API バージョンを対象にする方法の例については、GitHub のサンプルを参照してください。

Azure Storage と BLOB コンテナーを作成する

このクイックスタートでは、チェックポイント ストアとして Azure Storage を使用します。 Azure ストレージ アカウントを作成するには、次の手順に従います。

  1. Azure ストレージ アカウントの作成

  2. BLOB コンテナーを作成する

  3. ストレージ アカウントへの接続文字列を取得する

    接続文字列とコンテナー名を書き留めておきます。 これらは、受信コードで使用します。

受信側のプロジェクトを作成する

  1. ソリューション エクスプローラー ウィンドウで、 [EventHubQuickStart] ソリューションを右クリックし、 [追加] をポイントして、 [新しいプロジェクト] を選択します。
  2. [コンソール アプリ (.NET Core)] を選択し、 [次へ] を選択します。
  3. [プロジェクト名] に「EventHubsReceiver」と入力し、 [作成] を選択します。

Event Hubs NuGet パッケージの追加

  1. メニューから [ツール] > [NuGet パッケージ マネージャー] > [パッケージ マネージャー コンソール] の順に選択します。

  2. 次のコマンドを実行して、Azure.Messaging.EventHubs NuGet パッケージをインストールします。

    Install-Package Azure.Messaging.EventHubs
    
  3. 次のコマンドを実行して、Azure.Messaging.EventHubs.Processor NuGet パッケージをインストールします。

    Install-Package Azure.Messaging.EventHubs.Processor
    

Main メソッドを更新する

  1. Program.cs ファイルの先頭に次の using ステートメントを追加します。

    using System;
    using System.Text;
    using System.Threading.Tasks;
    using Azure.Storage.Blobs;
    using Azure.Messaging.EventHubs;
    using Azure.Messaging.EventHubs.Consumer;
    using Azure.Messaging.EventHubs.Processor;
    
  2. Event Hubs 接続文字列およびイベント ハブ名を表す定数を、Program クラスに追加します。 山かっこ内のプレースホルダーを、イベント ハブの作成時に取得した適切な値で置き換えます。 山かっこ内のプレースホルダーを、イベント ハブとストレージ アカウントの作成時に取得した適切な値 (アクセス キーとプライマリ接続文字列) で置き換えます。 {Event Hubs namespace connection string} がイベント ハブの文字列ではなく名前空間レベルの接続文字列であることを確認します。

        private const string ehubNamespaceConnectionString = "<EVENT HUBS NAMESPACE - CONNECTION STRING>";
        private const string eventHubName = "<EVENT HUB NAME>";
        private const string blobStorageConnectionString = "<AZURE STORAGE CONNECTION STRING>";
        private const string blobContainerName = "<BLOB CONTAINER NAME>";
    
  3. Main メソッドを次の async Main メソッドで置き換えます。 詳細については、コードのコメントを参照してください。

        static async Task Main()
        {
            // Read from the default consumer group: $Default
            string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;
    
            // Create a blob container client that the event processor will use 
            BlobContainerClient storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName);
    
            // Create an event processor client to process events in the event hub
            EventProcessorClient processor = new EventProcessorClient(storageClient, consumerGroup, ehubNamespaceConnectionString, eventHubName);
    
            // Register handlers for processing events and handling errors
            processor.ProcessEventAsync += ProcessEventHandler;
            processor.ProcessErrorAsync += ProcessErrorHandler;
    
            // Start the processing
            await processor.StartProcessingAsync();
    
            // Wait for 30 seconds for the events to be processed
            await Task.Delay(TimeSpan.FromSeconds(30));
    
            // Stop the processing
            await processor.StopProcessingAsync();
        }    
    
  4. ここで、次のイベントおよびエラー ハンドラー メソッドをクラスに追加します。

        static async Task ProcessEventHandler(ProcessEventArgs eventArgs)
        {
            // Write the body of the event to the console window
            Console.WriteLine("\tReceived event: {0}", Encoding.UTF8.GetString(eventArgs.Data.Body.ToArray()));
    
            // Update checkpoint in the blob storage so that the app receives only new events the next time it's run
            await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken);
        }
    
        static Task ProcessErrorHandler(ProcessErrorEventArgs eventArgs)
        {
            // Write details about the error to the console window
            Console.WriteLine($"\tPartition '{ eventArgs.PartitionId}': an unhandled exception was encountered. This was not expected to happen.");
            Console.WriteLine(eventArgs.Exception.Message);
            return Task.CompletedTask;
        }    
    
  5. プロジェクトをビルドし、エラーがないことを確認します。

    注意

    より多くの情報を含むコメント付きの完全なソース コードについては、GitHub 上のこちらのファイルを参照してください。

  6. 受信側アプリを実行します。

  7. イベントが受信されたことを示すメッセージが表示されます。

    受信されたイベント

    これらのイベントは、前に送信側プログラムを実行してイベント ハブに送信した 3 つのイベントです。

次のステップ

GitHub で次のサンプルを確認します。