Überprüfen mithilfe eines Avro-Schemas beim Streamen von Ereignissen mithilfe von Event Hubs .NET SDKs (AMQP)

In dieser Schnellstartanleitung lernen Sie, wie Sie mithilfe der .NET-Bibliothek Azure.Messaging.EventHubs Ereignisse an einen Event Hub mit Schemaüberprüfung senden bzw. von dort empfangen.

Hinweis

Die Azure-Schemaregistrierungs ist eine Funktion von Event Hubs, die ein zentrales Repository für Schemas für ereignisgesteuerte und nachrichtenzentrierte Anwendungen bereitstellt. Sie bietet Ihren Producer- und Consumeranwendungen die Flexibilität, Daten auszutauschen, ohne das Schema verwalten und gemeinsam nutzen zu müssen. Sie stellt außerdem ein einfaches Governanceframework für wiederverwendbare Schemas bereit und definiert die Beziehung zwischen Schemas über ein Gruppierungskonstrukt (Schemagruppen). Weitere Informationen finden Sie unter Azure-Schema Registry in Azure Event Hubs.

Voraussetzungen

Wenn Sie mit Azure Event Hubs noch nicht vertraut sind, lesen Sie vor dem Durcharbeiten dieser Schnellstartanleitung die Informationen unter Übersicht über Event Hubs.

Zum Durchführen dieser Schnellstartanleitung benötigen Sie Folgendes:

  • Wenn Sie kein Azure-Abonnement besitzen, erstellen Sie ein kostenloses Konto, bevor Sie beginnen.
  • Microsoft Visual Studio 2022. Die Azure Event Hubs-Clientbibliothek nutzt neue Features, die in C# 8.0 eingeführt wurden. Sie können die Bibliothek weiterhin mit früheren C#-Sprachversionen verwenden, aber die neue Syntax ist nicht verfügbar. Wenn Sie die vollständige Syntax nutzen möchten, sollten Sie die Kompilierung mithilfe des .NET Core SDK 3.0 oder höher durchführen und die Sprachversion auf latest festlegen. Bei Verwendung von Visual Studio sind Versionen vor Visual Studio 2019 nicht mit den Tools kompatibel, die zum Erstellen von C# 8.0-Projekten erforderlich sind. Visual Studio 2019 (einschließlich der kostenlosen Community-Edition) kann hier heruntergeladen werden.

Erstellen eines Ereignis-Hubs

Befolgen Sie die Anweisungen aus der Schnellstartanleitung Erstellen eines Event Hubs-Namespaces und eines Event Hubs, um einen Event Hubs-Namespace und einen Event Hub zu erstellen. Befolgen Sie dann die Anweisungen unter Abrufen der Verbindungszeichenfolge, um eine Verbindungszeichenfolge für Ihren Event Hubs-Namespace abzurufen.

Notieren Sie sich die folgenden Einstellungen, die Sie in der aktuellen Schnellstartanleitung verwenden werden:

  • Verbindungszeichenfolge für den Event Hubs-Namespace
  • Name des Event Hubs

Erstellen eines Schemas

Befolgen Sie die Anweisungen unter Erstellen von Schemas mithilfe der Schemaregistrierung , um eine Schemagruppe und ein Schema zu erstellen.

  1. Erstellen Sie im Schemaregistrierungsportal eine Schemagruppe namens contoso-sg. Verwenden Sie Avro als Serialisierungstyp, und geben Sie Keine für den Kompatibilitätsmodus an.

  2. Erstellen Sie in dieser Schemagruppe ein neues Avro-Schema mit dem Schemanamen Microsoft.Azure.Data.SchemaRegistry.example.Order. Verwenden Sie den folgenden Schemainhalt.

    {
      "namespace": "Microsoft.Azure.Data.SchemaRegistry.example",
      "type": "record",
      "name": "Order",
      "fields": [
        {
          "name": "id",
          "type": "string"
        },
        {
          "name": "amount",
          "type": "double"
        },
        {
          "name": "description",
          "type": "string"
        }
      ]
    } 
    

Hinzufügen eines Benutzers zur Rolle „Schemaregistrierungsleser“

Fügen Sie Ihr Benutzerkonto auf Namespaceebene der Rolle Schemaregistrierungsleser hinzu. Sie können auch die Rolle Mitwirkender der Schemaregistrierung verwenden, aber dies ist für diese Schnellstartanleitung nicht erforderlich.

  1. Wählen Sie auf der Seite Event Hubs-Namespace im linken Menü Zugriffssteuerung (IAM) aus.
  2. Wählen Sie auf der Seite Zugriffssteuerung (IAM) die Option + Hinzufügen ->Rollenzuweisung hinzufügen aus.
  3. Wählen Sie auf der Seite Zuweisungstyp die Option Weiter aus.
  4. Wählen Sie auf der Seite Rollen die Option Schemaregistrierungsleser (Vorschau) und dann unten auf der Seite Weiter aus.
  5. Verwenden Sie den Link + Mitglieder auswählen, um Ihr Benutzerkonto zur Rolle hinzuzufügen, und wählen Sie dann Weiter aus.
  6. Wählen Sie auf der Seite Überprüfen + zuweisen die Option Überprüfen + zuweisen aus.

Erstellen von Ereignissen für Event Hubs mit Schemaüberprüfung

Erstellen einer Konsolenanwendung für den Ereignisersteller

  1. Starten Sie Visual Studio 2019.
  2. Wählen Sie Neues Projekt erstellen aus.
  3. Führen Sie im Dialogfeld Neues Projekt erstellen die folgenden Schritte aus: Sollte dieses Dialogfeld nicht angezeigt werden, wählen Sie im Menü die Option Datei und anschließend Neu > Projekt aus.
    1. Wählen Sie die Programmiersprache C# aus.

    2. Wählen Sie den Anwendungstyp Konsole aus.

    3. Wählen Sie in der Ergebnisliste die Option Konsolenanwendung aus.

    4. Klicken Sie anschließend auf Weiter.

      Abbildung: Dialogfeld „Neues Projekt“

  4. Geben Sie OrderProducer als Projektnamen und SRQuickStart als Projektmappennamen ein, und klicken Sie anschließend auf OK, um das Projekt zu erstellen.

Hinzufügen des Event Hubs-NuGet-Pakets

  1. Wählen Sie im Menü Extras>NuGet-Paket-Manager>Paket-Manager-Konsole aus.

  2. Führen Sie den folgenden Befehl aus, um Azure.Messaging.EventHubs und andere NuGet-Pakete zu installieren. Drücken Sie die EINGABETASTE, um den letzten Befehl auszuführen.

    Install-Package Azure.Messaging.EventHubs
    Install-Package Azure.Identity
    Install-Package Microsoft.Azure.Data.SchemaRegistry.ApacheAvro
    Install-Package Azure.ResourceManager.Compute
    
  3. Authentifizieren Sie Erstelleranwendungen, um über Visual Studio eine Verbindung mit Azure herzustellen, wie hier gezeigt.

  4. Melden Sie sich mit dem Benutzerkonto, das Mitglied der Rolle Schema Registry Reader auf Namespaceebene ist, bei Azure an. Informationen zu Schemaregistrierungsrollen finden Sie unter Azure-Schemaregistrierung in Event Hubs.

Codegenerierung mithilfe des Avro-Schemas

  1. Verwenden Sie denselben Inhalt, den Sie zum Erstellen des Schemas verwendet haben, um eine Datei mit dem Namen Order.avsc zu erstellen. Speichern Sie die Datei im Projekt- oder Lösungsordner.
  2. Anschließend können Sie mithilfe dieser Schemadatei Code für .NET generieren. Sie können ein beliebiges externes Codegenerierungstool wie avrogen für die Codegenerierung verwenden. Beispielsweise können Sie avrogen -s .\Order.avsc . ausführen, um Code zu generieren.
  3. Nachdem Sie Code generiert haben, wird die Datei Order.cs im Ordner \Microsoft\Azure\Data\SchemaRegistry\example angezeigt. Für das obige Avro-Schema werden die C#-Typen im Namespace Microsoft.Azure.Data.SchemaRegistry.example generiert.
  4. Fügen Sie die Datei Order.cs zum Projekt OrderProducer hinzu.

Schreiben von Code zum Serialisieren und Senden von Ereignissen an den Event Hub

  1. Fügen Sie der Datei Program.cs den folgenden Code hinzu. Weitere Einzelheiten finden Sie in den Codekommentaren. Allgemeine Schritte im Code:

    1. Erstellen Sie einen Producerclient, mit dem Sie Ereignisse an einen Event Hub senden können.
    2. Erstellen Sie einen Schemaregistrierungsclient, mit dem Sie Daten in einem Order-Objekt serialisieren und überprüfen können.
    3. Erstellen Sie mithilfe des generierten Order-Typs ein neues Order-Objekt.
    4. Verwenden Sie den Schemaregistrierungsclient, um das Order-Objekt in EventData zu serialisieren.
    5. Erstellen Sie einen Batch von Ereignissen.
    6. Fügen Sie die Ereignisdaten dem Ereignisbatch hinzu.
    7. Verwenden Sie den Producerclient, um den Batch von Ereignissen an den Event Hub zu senden.
    using Azure.Data.SchemaRegistry;
    using Azure.Identity;
    using Microsoft.Azure.Data.SchemaRegistry.ApacheAvro;
    using Azure.Messaging.EventHubs;
    using Azure.Messaging.EventHubs.Producer;
    
    using Microsoft.Azure.Data.SchemaRegistry.example;
    
    // connection string to the Event Hubs namespace
    const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING";
    
    // name of the event hub
    const string eventHubName = "EVENTHUBNAME";
    
    // Schema Registry endpoint 
    const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net";
    
    // name of the consumer group   
    const string schemaGroup = "SCHEMAGROUPNAME";
    
    // The Event Hubs client types are safe to cache and use as a singleton for the lifetime
    // of the application, which is best practice when events are being published or read regularly.
    EventHubProducerClient producerClient;
    
    // Create a producer client that you can use to send events to an event hub
    producerClient = new EventHubProducerClient(connectionString, eventHubName);
    
    // Create a schema registry client that you can use to serialize and validate data.  
    var schemaRegistryClient = new SchemaRegistryClient(schemaRegistryEndpoint, new DefaultAzureCredential());
    
    // Create an Avro object serializer using the Schema Registry client object. 
    var serializer = new SchemaRegistryAvroSerializer(schemaRegistryClient, schemaGroup, new SchemaRegistryAvroSerializerOptions { AutoRegisterSchemas = true });
    
    // Create a new order object using the generated type/class 'Order'. 
    var sampleOrder = new Order { id = "1234", amount = 45.29, description = "First sample order." };
    EventData eventData = (EventData)await serializer.SerializeAsync(sampleOrder, messageType: typeof(EventData));
    
    // Create a batch of events 
    using EventDataBatch eventBatch = await producerClient.CreateBatchAsync();
    
    // Add the event data to the event batch. 
    eventBatch.TryAdd(eventData);
    
    // Send the batch of events to the event hub. 
    await producerClient.SendAsync(eventBatch);
    Console.WriteLine("A batch of 1 order has been published.");        
    
  2. Ersetzen Sie die folgenden Platzhalterwerte durch die echten Werte:

    • EVENTHUBSNAMESPACECONNECTIONSTRING: Verbindungszeichenfolge für den Event Hubs-Namespace
    • EVENTHUBNAME: Name des Event Hubs
    • EVENTHUBSNAMESPACENAME: Name des Event Hubs-Namespaces
    • SCHEMAGROUPNAME: Name des zu verwendenden Schemas
    // connection string to the Event Hubs namespace
    const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING";
    
    // name of the event hub
    const string eventHubName = "EVENTHUBNAME";
    
    // Schema Registry endpoint 
    const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net";
    
    // name of the consumer group   
    const string schemaGroup = "SCHEMAGROUPNAME";
    
  3. Erstellen Sie das Projekt, und vergewissern Sie sich, dass keine Fehler vorhanden sind.

  4. Führen Sie das Programm aus, und warten Sie auf die Bestätigungsmeldung.

    A batch of 1 order has been published.
    
  5. Im Azure-Portal können Sie überprüfen, ob der Event Hub die Ereignisse empfangen hat. Wechseln Sie im Abschnitt Metriken zur Ansicht Nachrichten. Aktualisieren Sie die Seite, um das Diagramm zu aktualisieren. Es kann einige Sekunden dauern, bis angezeigt wird, dass die Nachrichten empfangen wurden.

    Abbildung: Seite im Azure-Portal zum Überprüfen, ob der Event Hub die Ereignisse empfangen hat

Nutzen von Ereignissen aus Event Hubs mit Schemaüberprüfung

In diesem Abschnitt erfahren Sie, wie Sie eine .NET Core-Konsolenanwendung schreiben, die Ereignisse von einem Event Hub empfängt, und die Schemaregistrierung zum Deserialisieren von Ereignisdaten verwenden.

Zusätzliche Voraussetzungen

  • Erstellen Sie das Speicherkonto, das vom Ereignisprozessor verwendet werden soll.

Erstellen einer Consumeranwendung

  1. Klicken Sie im Projektmappen-Explorer mit der rechten Maustaste auf die Projektmappe SRQuickStart, zeigen Sie auf Hinzufügen, und wählen Sie anschließend Neues Projekt aus.
  2. Wählen Sie zuerst Konsolenanwendung, dann Weiter aus.
  3. Geben Sie unter Projektname den Namen OrderConsumer ein, and wählen Sie Erstellen aus.
  4. Klicken Sie im Fenster Projektmappen-Explorer mit der rechten Maustaste auf OrderConsumer, und wählen Sie Als Startprojekt festlegen aus.

Hinzufügen des Event Hubs-NuGet-Pakets

  1. Wählen Sie im Menü Extras>NuGet-Paket-Manager>Paket-Manager-Konsole aus.

  2. Bestätigen Sie im Fenster Paket-Manager-Konsole, dass OrderConsumer als Standardprojekt ausgewählt wurde. Ist dies nicht der Fall, wählen Sie OrderConsumer in der Dropdownliste aus.

  3. Führen Sie den folgenden Befehl aus, um die erforderlichen NuGet-Pakete zu installieren. Drücken Sie die EINGABETASTE, um den letzten Befehl auszuführen.

    Install-Package Azure.Messaging.EventHubs
    Install-Package Azure.Messaging.EventHubs.Processor
    Install-Package Azure.Identity
    Install-Package Microsoft.Azure.Data.SchemaRegistry.ApacheAvro
    Install-Package Azure.ResourceManager.Compute
    
  4. Authentifizieren Sie Erstelleranwendungen, um über Visual Studio eine Verbindung mit Azure herzustellen, wie hier gezeigt.

  5. Melden Sie sich mit dem Benutzerkonto, das Mitglied der Rolle Schema Registry Reader auf Namespaceebene ist, bei Azure an. Informationen zu Schemaregistrierungsrollen finden Sie unter Azure-Schemaregistrierung in Event Hubs.

  6. Fügen Sie die Datei Order.cs, die Sie beim Erstellen der Producer-App generiert haben, dem Projekt OrderConsumer hinzu.

  7. Klicken Sie mit der rechten Maustaste auf das Projekt OrderConsumer, und wählen Sie Als Startprojekt festlegen aus.

Schreiben von Code zum Empfangen und Deserialisieren von Ereignissen mithilfe der Schemaregistrierung

  1. Fügen Sie der Datei Program.cs den folgenden Code hinzu. Weitere Einzelheiten finden Sie in den Codekommentaren. Allgemeine Schritte im Code:

    1. Erstellen Sie einen Consumerclient, mit dem Sie Ereignisse an einen Event Hub senden können.
    2. Erstellen Sie einen Blobcontainerclient für den Blobcontainer im Azure Blob Storage.
    3. Erstellen Sie einen Ereignisprozessorclient, und registrieren Sie Ereignis- und Fehlerhandler.
    4. Erstellen Sie im Ereignishandler einen Schemaregistrierungsclient, mit dem Sie Ereignisdaten in einem Order-Objekt deserialisieren können.
    5. Deserialisieren Sie die Ereignisdaten in einem Order-Objekt mithilfe des Serialisierungsmoduls.
    6. Drucken Sie die Informationen zur empfangenen Bestellung aus.
    using Azure.Data.SchemaRegistry;
    using Azure.Identity;
    using Microsoft.Azure.Data.SchemaRegistry.ApacheAvro;
    using Azure.Storage.Blobs;
    using Azure.Messaging.EventHubs;
    using Azure.Messaging.EventHubs.Consumer;
    using Azure.Messaging.EventHubs.Processor;
    
    using Microsoft.Azure.Data.SchemaRegistry.example;
    
    
    // connection string to the Event Hubs namespace
    const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING";
    
    // name of the event hub
    const string eventHubName = "EVENTHUBNAME";
    
    // Schema Registry endpoint 
    const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net";
    
    // name of the consumer group   
    const string schemaGroup = "SCHEMAGROUPNAME";
    
    // connection string for the Azure Storage account
    const string blobStorageConnectionString = "AZURESTORAGECONNECTIONSTRING";
    
    // name of the blob container that will be userd as a checkpoint store
    const string blobContainerName = "BLOBCONTAINERNAME";
    
    // Create a blob container client that the event processor will use 
    BlobContainerClient storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName);
    
    // Create an event processor client to process events in the event hub
    EventProcessorClient processor = new EventProcessorClient(storageClient, EventHubConsumerClient.DefaultConsumerGroupName, connectionString, eventHubName);
    
    // Register handlers for processing events and handling errors
    processor.ProcessEventAsync += ProcessEventHandler;
    processor.ProcessErrorAsync += ProcessErrorHandler;
    
    // Start the processing
    await processor.StartProcessingAsync();
    
    // Wait for 30 seconds for the events to be processed
    await Task.Delay(TimeSpan.FromSeconds(30));
    
    // Stop the processing
    await processor.StopProcessingAsync();
    
    static async Task ProcessEventHandler(ProcessEventArgs eventArgs)
    {
        // Create a schema registry client that you can use to serialize and validate data.  
        var schemaRegistryClient = new SchemaRegistryClient(schemaRegistryEndpoint, new DefaultAzureCredential());
    
        // Create an Avro object serializer using the Schema Registry client object. 
        var serializer = new SchemaRegistryAvroSerializer(schemaRegistryClient, schemaGroup, new SchemaRegistryAvroSerializerOptions { AutoRegisterSchemas = true });
    
        // Deserialized data in the received event using the schema 
        Order sampleOrder = (Order)await serializer.DeserializeAsync(eventArgs.Data, typeof(Order));
    
        // Print the received event
        Console.WriteLine($"Received order with ID: {sampleOrder.id}, amount: {sampleOrder.amount}, description: {sampleOrder.description}");
    
           await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken);
        }
    
        static Task ProcessErrorHandler(ProcessErrorEventArgs eventArgs)
    {
        // Write details about the error to the console window
        Console.WriteLine($"\tPartition '{eventArgs.PartitionId}': an unhandled exception was encountered. This was not expected to happen.");
        Console.WriteLine(eventArgs.Exception.Message);
        return Task.CompletedTask;
    }      
    
  2. Ersetzen Sie die folgenden Platzhalterwerte durch die echten Werte:

    • EVENTHUBSNAMESPACE-CONNECTIONSTRING: Verbindungszeichenfolge für den Event Hubs-Namespace
    • EVENTHUBNAME: Name des Event Hubs
    • EVENTHUBSNAMESPACENAME: Name des Event Hubs-Namespaces
    • SCHEMAGROUPNAME: Name des zu verwendenden Schemas
    • AZURESTORAGECONNECTIONSTRING: Verbindungszeichenfolge für das Azure-Speicherkonto
    • BLOBCONTAINERNAME: Name des Blobcontainers
    // connection string to the Event Hubs namespace
    const string connectionString = "EVENTHUBSNAMESPACE-CONNECTIONSTRING";
    
    // name of the event hub
    const string eventHubName = "EVENTHUBNAME";
    
    // Schema Registry endpoint 
    const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net";
    
    // name of the consumer group   
    const string schemaGroup = "SCHEMAGROUPNAME";
    
    // Azure storage connection string
    const string blobStorageConnectionString = "AZURESTORAGECONNECTIONSTRING";
    
    // Azure blob container name
    const string blobContainerName = "BLOBCONTAINERNAME";
    
  3. Erstellen Sie das Projekt, und vergewissern Sie sich, dass keine Fehler vorhanden sind.

  4. Führen Sie die Empfängeranwendung aus.

  5. Daraufhin sollte eine Nachricht mit dem Hinweis angezeigt werden, dass die Ereignisse empfangen wurden.

    Received order with ID: 1234, amount: 45.29, description: First sample order.
    

    Bei diesen Ereignissen handelt es sich um die drei Ereignisse, die Sie zuvor durch Ausführen des Sendeprogramms an den Event Hub gesendet haben.

Proben

Weitere Informationen finden Sie in der Infodatei in unserem GitHub-Repository.

Bereinigen von Ressourcen

Löschen Sie den Event Hubs-Namespace, oder löschen Sie die Ressourcengruppe, die den Namespace enthält.

Nächste Schritte