Event Hubs .NET SDK (AMQP) を使用してイベントのストリーミング時に Avro スキーマを使用して検証する

このクイックスタートでは、Azure.Messaging.EventHubs .NET ライブラリを使用したスキーマ検証を使用して、イベント ハブとの間でイベントを送受信する方法について説明します。

Note

Azure スキーマ レジストリは Event Hubs の機能で、イベント駆動型およびメッセージング中心のアプリケーション用のスキーマの中央リポジトリとして機能します。 プロデューサーとコンシューマー アプリケーションに対し、スキーマを管理して共有することなく、データを交換できる柔軟性を提供します。 また、再利用可能なスキーマのための単純なガバナンス フレームワークが用意されており、グループ化構成体 (スキーマ グループ) を使用してスキーマ間のリレーションシップを定義します。 詳細については、Event Hubs の Azure スキーマ レジストリ に関するページを参照してください。

前提条件

Azure Event Hubs を初めて使用する場合は、このクイックスタートを行う前に Event Hubs の概要を参照してください。

このクイック スタートを完了するには、次の前提条件を用意しておく必要があります。

  • Azure サブスクリプションをお持ちでない場合は、開始する前に無料アカウントを作成してください。
  • Microsoft Visual Studio 2022。 Azure Event Hubs クライアント ライブラリでは、C# 8.0 で導入された新機能を利用しています。 以前のバージョンの C# 言語でライブラリを使うこともできますが、新しい構文は使用できません。 完全な構文を使用するには、.NET Core SDK 3.0 以上で、言語バージョンlatest に設定してコンパイルすることをお勧めします。 Visual Studio を使用している場合、Visual Studio 2019 より前のバージョンには、C# 8.0 プロジェクトをビルドするために必要なツールとの互換性がありません。 無料の Community エディションを含む Visual Studio 2019 は、[こちら](https://visualstudio.microsoft.com/vs/)からダウンロードできます。

イベント ハブの作成

Event Hubs 名前空間とイベント ハブを作成する」のクイックスタートの手順に従って、Event Hubs 名前空間とイベント ハブを作成します。 次に、接続文字列を取得する方法に関するページの手順に従って、使用している Event Hubs 名前空間への接続文字列を取得します。

現在のクイック スタートで使用する次の設定をメモします。

  • Event Hubs 名前空間の接続文字列
  • イベント ハブの名前

スキーマの作成

スキーマ レジストリを使用してスキーマを作成する方法に関するページの手順に従って、スキーマ グループとスキーマを作成します。

  1. スキーマ レジストリ ポータルを使用して、contoso-sg という名前のスキーマ グループを作成します。 シリアル化の種類として Avro を使用し、互換性モードに [なし] を使用します。

  2. そのスキーマ グループで、次のスキーマ コンテンツを使用して、スキーマ名 Microsoft.Azure.Data.SchemaRegistry.example.Order を使用して新しい Avro スキーマを作成します。

    {
      "namespace": "Microsoft.Azure.Data.SchemaRegistry.example",
      "type": "record",
      "name": "Order",
      "fields": [
        {
          "name": "id",
          "type": "string"
        },
        {
          "name": "amount",
          "type": "double"
        },
        {
          "name": "description",
          "type": "string"
        }
      ]
    } 
    

スキーマ レジストリ閲覧者ロールにユーザーを追加する

名前空間レベルでスキーマ レジストリ閲覧者ロールにユーザー アカウントを追加します。 スキーマ レジストリ共同作成者ロールを使用することもできますが、このクイックスタートでは必要ありません。

  1. [Event Hubs 名前空間] ページで、左側のメニューの [アクセス制御 (IAM)] を選択します。
  2. [アクセス制御 (IAM)] ページのメニューで、[+ 追加] ->[ロールの割り当てを追加] を選択します。
  3. [割り当ての種類] ページで、[次へ] を選択します。
  4. [ロール] ページで、[スキーマ レジストリ閲覧者 (プレビュー)] を選択し、ページの下部にある [次へ] を選択します。
  5. [+ メンバーの選択] リンクを使用してユーザー アカウントをロールに追加し、[次へ] を選択します。
  6. [確認と割り当て] ページで、[確認と割り当て] を選択します。

スキーマ検証を使用してイベント ハブにイベントを生成する

イベント プロデューサー用のコンソール アプリケーションを作成する

  1. Visual Studio 2019 を起動します。
  2. [新しいプロジェクトの作成] を選択します。
  3. **[新しいプロジェクトの作成]** ダイアログ ボックスで、次の手順に従います。このダイアログ ボックスが表示されない場合は、メニューで **[ファイル]****[新規]****[プロジェクト]** の順に選択します。
    1. プログラミング言語として **[C#]** を選択します。

    2. アプリケーションの種類として [コンソール] を選択します。

    3. 結果リストから **[コンソール アプリケーション]** を選択します。

    4. 次に、 [次へ] を選択します。

      [新しいプロジェクト] ダイアログ ボックスを示す画像。

  4. プロジェクト名として「OrderProducer」、ソリューション名として「SRQuickStart」と入力し、 [OK] を選択してプロジェクトを作成します。

Event Hubs NuGet パッケージの追加

  1. メニューから [ツール]>[NuGet パッケージ マネージャー]>[パッケージ マネージャー コンソール] の順に選択します。

  2. 次のコマンドを実行して Azure.Messaging.EventHubs と他の NuGet パッケージをインストールします。 Enter キーを押して最後のコマンドを実行します。

    Install-Package Azure.Messaging.EventHubs
    Install-Package Azure.Identity
    Install-Package Microsoft.Azure.Data.SchemaRegistry.ApacheAvro
    Install-Package Azure.ResourceManager.Compute
    
  3. 次に示すように、プロデューサー アプリケーションを認証して、Visual Studio Azure に接続します。

  4. 名前空間レベルで Schema Registry Reader ロールのメンバーであるユーザー アカウントを使用して Azure にサインインします。 スキーマ レジストリ ロールの詳細については、「Event Hubs の Azure スキーマ レジストリ」をご覧ください。

Avro スキーマを使用したコード生成

  1. スキーマの作成に使用したのと同じコンテンツを使用して、Order.avsc という名前のファイルを作成します。 プロジェクトまたはソリューション フォルダーにファイルを保存します。
  2. その後、このスキーマ ファイルを使用して .NET のコードを生成できます。 コード生成には、avrogen などの外部コード生成ツールを使用できます。 たとえば、 avrogen -s .\Order.avsc . を実行してコードを生成できます。
  3. コードを生成すると、\Microsoft\Azure\Data\SchemaRegistry\example フォルダーに Order.cs という名前のファイルが表示されます。 上記の Avro スキーマでは、Microsoft.Azure.Data.SchemaRegistry.example 名前空間に C# 型が生成されます。
  4. OrderProducer プロジェクトに Order.cs ファイルを追加します。

イベント ハブにイベントをシリアル化して送信するコードを記述する

  1. 次のコードを Program.cs ファイルに追加します。 詳細については、コードのコメントを参照してください。 コードの大まかな手順は次のとおりです。

    1. イベント ハブにイベントを送信するために使用できるプロデューサー クライアントを作成します。
    2. Order オブジェクト内のデータをシリアル化および検証するために使用できるスキーマ レジストリ クライアントを作成します。
    3. 生成された Order 型を使用して新しい Order オブジェクトを作成します。
    4. スキーマ レジストリ クライアントを使用して、Order オブジェクトを EventData にシリアル化します。
    5. イベントのバッチを作成します。
    6. イベント データをイベント バッチに追加します。
    7. プロデューサー クライアントを使用して、イベントのバッチをイベント ハブに送信します。
    using Azure.Data.SchemaRegistry;
    using Azure.Identity;
    using Microsoft.Azure.Data.SchemaRegistry.ApacheAvro;
    using Azure.Messaging.EventHubs;
    using Azure.Messaging.EventHubs.Producer;
    
    using Microsoft.Azure.Data.SchemaRegistry.example;
    
    // connection string to the Event Hubs namespace
    const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING";
    
    // name of the event hub
    const string eventHubName = "EVENTHUBNAME";
    
    // Schema Registry endpoint 
    const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net";
    
    // name of the consumer group   
    const string schemaGroup = "SCHEMAGROUPNAME";
    
    // The Event Hubs client types are safe to cache and use as a singleton for the lifetime
    // of the application, which is best practice when events are being published or read regularly.
    EventHubProducerClient producerClient;
    
    // Create a producer client that you can use to send events to an event hub
    producerClient = new EventHubProducerClient(connectionString, eventHubName);
    
    // Create a schema registry client that you can use to serialize and validate data.  
    var schemaRegistryClient = new SchemaRegistryClient(schemaRegistryEndpoint, new DefaultAzureCredential());
    
    // Create an Avro object serializer using the Schema Registry client object. 
    var serializer = new SchemaRegistryAvroSerializer(schemaRegistryClient, schemaGroup, new SchemaRegistryAvroSerializerOptions { AutoRegisterSchemas = true });
    
    // Create a new order object using the generated type/class 'Order'. 
    var sampleOrder = new Order { id = "1234", amount = 45.29, description = "First sample order." };
    EventData eventData = (EventData)await serializer.SerializeAsync(sampleOrder, messageType: typeof(EventData));
    
    // Create a batch of events 
    using EventDataBatch eventBatch = await producerClient.CreateBatchAsync();
    
    // Add the event data to the event batch. 
    eventBatch.TryAdd(eventData);
    
    // Send the batch of events to the event hub. 
    await producerClient.SendAsync(eventBatch);
    Console.WriteLine("A batch of 1 order has been published.");        
    
  2. 次のプレースホルダーの値を実際の値に置き換えます。

    • EVENTHUBSNAMESPACECONNECTIONSTRING - Event Hubs 名前空間の接続文字列
    • EVENTHUBNAME - イベント ハブの名前
    • EVENTHUBSNAMESPACENAME - Event Hubs 名前空間の名前
    • SCHEMAGROUPNAME - スキーマ グループの名前
    // connection string to the Event Hubs namespace
    const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING";
    
    // name of the event hub
    const string eventHubName = "EVENTHUBNAME";
    
    // Schema Registry endpoint 
    const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net";
    
    // name of the consumer group   
    const string schemaGroup = "SCHEMAGROUPNAME";
    
  3. プロジェクトをビルドし、エラーがないことを確認します。

  4. プログラムを実行し、確認メッセージが表示されるまで待ちます。

    A batch of 1 order has been published.
    
  5. Azure portal で、イベント ハブがイベントを受信したことを確認できます。 [メトリック] セクションで [メッセージ] ビューに切り替えます。 ページを最新の情報に更新して、グラフを更新します。 メッセージが受信されたことが示されるまでに数秒かかることがあります。

    イベント ハブでイベントが受信されたことを確認する Azure portal ページの画像。

スキーマ検証を使用してイベント ハブからイベントを使用する

このセクションでは、イベント ハブからイベントを受信し、スキーマ レジストリを使用してイベント データを逆シリアル化する .NET Core コンソール アプリケーションを作成する方法について説明します。

追加の前提条件

  • イベント プロセッサで使用するストレージ アカウントを作成します。

コンシューマー アプリケーションを作成する

  1. ソリューション エクスプローラー ウィンドウで、 [SRQuickStart] ソリューションを右クリックし、 [追加] をポイントして、 [新しいプロジェクト] を選択します。
  2. **[コンソール アプリケーション]** を選択し、 **[次へ]** を選択します。
  3. [プロジェクト名] に「OrderConsumer」と入力し、 [作成] を選択します。
  4. ソリューション エクスプローラー ウィンドウで、 [OrderConsumer] を右クリックし、 [スタートアップ プロジェクトとして設定] を選択します。

Event Hubs NuGet パッケージの追加

  1. メニューから [ツール]>[NuGet パッケージ マネージャー]>[パッケージ マネージャー コンソール] の順に選択します。

  2. [パッケージ マネージャー コンソール] ウィンドウで、 [既定のプロジェクト]OrderConsumer が選択されていることを確認します。 選択されていない場合は、ドロップダウン リストを使用して OrderConsumer を選択します。

  3. 次のコマンドを実行して、必要な NuGet パッケージをインストールします。 Enter キーを押して最後のコマンドを実行します。

    Install-Package Azure.Messaging.EventHubs
    Install-Package Azure.Messaging.EventHubs.Processor
    Install-Package Azure.Identity
    Install-Package Microsoft.Azure.Data.SchemaRegistry.ApacheAvro
    Install-Package Azure.ResourceManager.Compute
    
  4. 次に示すように、プロデューサー アプリケーションを認証して、Visual Studio Azure に接続します。

  5. 名前空間レベルで Schema Registry Reader ロールのメンバーであるユーザー アカウントを使用して Azure にサインインします。 スキーマ レジストリ ロールの詳細については、「Event Hubs の Azure スキーマ レジストリ」をご覧ください。

  6. プロデューサー アプリの作成の一環として生成した Order.cs ファイルを OrderConsumer プロジェクトに追加します。

  7. OrderConsumer プロジェクトを右クリックし、[スタートアップ プロジェクトに設定] を選択します。

スキーマ レジストリを使用し、イベントを受信して逆シリアル化するコードを記述する

  1. 次のコードを Program.cs ファイルに追加します。 詳細については、コードのコメントを参照してください。 コードの大まかな手順は次のとおりです。

    1. イベント ハブにイベントを送信するために使用できるコンシューマー クライアントを作成します。
    2. Azure BLOB ストレージ内に BLOB コンテナー用の BLOB コンテナー クライアントを作成します。
    3. イベント プロセッサ クライアントを作成し、イベント ハンドラーとエラー ハンドラーを登録します。
    4. イベント ハンドラーで、イベント データを Order オブジェクトに逆シリアル化するために使用できるスキーマ レジストリ クライアントを作成します。
    5. シリアライザーを使用して、イベント データを Order オブジェクトに逆シリアル化します。
    6. 受け取った注文に関する情報を印刷します。
    using Azure.Data.SchemaRegistry;
    using Azure.Identity;
    using Microsoft.Azure.Data.SchemaRegistry.ApacheAvro;
    using Azure.Storage.Blobs;
    using Azure.Messaging.EventHubs;
    using Azure.Messaging.EventHubs.Consumer;
    using Azure.Messaging.EventHubs.Processor;
    
    using Microsoft.Azure.Data.SchemaRegistry.example;
    
    
    // connection string to the Event Hubs namespace
    const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING";
    
    // name of the event hub
    const string eventHubName = "EVENTHUBNAME";
    
    // Schema Registry endpoint 
    const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net";
    
    // name of the consumer group   
    const string schemaGroup = "SCHEMAGROUPNAME";
    
    // connection string for the Azure Storage account
    const string blobStorageConnectionString = "AZURESTORAGECONNECTIONSTRING";
    
    // name of the blob container that will be userd as a checkpoint store
    const string blobContainerName = "BLOBCONTAINERNAME";
    
    // Create a blob container client that the event processor will use 
    BlobContainerClient storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName);
    
    // Create an event processor client to process events in the event hub
    EventProcessorClient processor = new EventProcessorClient(storageClient, EventHubConsumerClient.DefaultConsumerGroupName, connectionString, eventHubName);
    
    // Register handlers for processing events and handling errors
    processor.ProcessEventAsync += ProcessEventHandler;
    processor.ProcessErrorAsync += ProcessErrorHandler;
    
    // Start the processing
    await processor.StartProcessingAsync();
    
    // Wait for 30 seconds for the events to be processed
    await Task.Delay(TimeSpan.FromSeconds(30));
    
    // Stop the processing
    await processor.StopProcessingAsync();
    
    static async Task ProcessEventHandler(ProcessEventArgs eventArgs)
    {
        // Create a schema registry client that you can use to serialize and validate data.  
        var schemaRegistryClient = new SchemaRegistryClient(schemaRegistryEndpoint, new DefaultAzureCredential());
    
        // Create an Avro object serializer using the Schema Registry client object. 
        var serializer = new SchemaRegistryAvroSerializer(schemaRegistryClient, schemaGroup, new SchemaRegistryAvroSerializerOptions { AutoRegisterSchemas = true });
    
        // Deserialized data in the received event using the schema 
        Order sampleOrder = (Order)await serializer.DeserializeAsync(eventArgs.Data, typeof(Order));
    
        // Print the received event
        Console.WriteLine($"Received order with ID: {sampleOrder.id}, amount: {sampleOrder.amount}, description: {sampleOrder.description}");
    
           await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken);
        }
    
        static Task ProcessErrorHandler(ProcessErrorEventArgs eventArgs)
    {
        // Write details about the error to the console window
        Console.WriteLine($"\tPartition '{eventArgs.PartitionId}': an unhandled exception was encountered. This was not expected to happen.");
        Console.WriteLine(eventArgs.Exception.Message);
        return Task.CompletedTask;
    }      
    
  2. 次のプレースホルダーの値を実際の値に置き換えます。

    • EVENTHUBSNAMESPACE-CONNECTIONSTRING - Event Hubs 名前空間の接続文字列
    • EVENTHUBNAME - イベント ハブの名前
    • EVENTHUBSNAMESPACENAME - Event Hubs 名前空間の名前
    • SCHEMAGROUPNAME - スキーマ グループの名前
    • AZURESTORAGECONNECTIONSTRING - Azure ストレージ アカウントの接続文字列
    • BLOBCONTAINERNAME - BLOB コンテナーの名前
    // connection string to the Event Hubs namespace
    const string connectionString = "EVENTHUBSNAMESPACE-CONNECTIONSTRING";
    
    // name of the event hub
    const string eventHubName = "EVENTHUBNAME";
    
    // Schema Registry endpoint 
    const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net";
    
    // name of the consumer group   
    const string schemaGroup = "SCHEMAGROUPNAME";
    
    // Azure storage connection string
    const string blobStorageConnectionString = "AZURESTORAGECONNECTIONSTRING";
    
    // Azure blob container name
    const string blobContainerName = "BLOBCONTAINERNAME";
    
  3. プロジェクトをビルドし、エラーがないことを確認します。

  4. 受信側アプリを実行します。

  5. イベントが受信されたことを示すメッセージが表示されます。

    Received order with ID: 1234, amount: 45.29, description: First sample order.
    

    これらのイベントは、前に送信側プログラムを実行してイベント ハブに送信した 3 つのイベントです。

サンプル

GitHub リポジトリの Readme に関する記事をご覧ください。

リソースをクリーンアップする

Event Hubs 名前空間を削除するか、名前空間を含むリソース グループを削除します。

次のステップ