Validación mediante un esquema de Avro al transmitir eventos mediante los SDK de .NET de Event Hubs (AMQP)

En este inicio rápido, aprenderá cómo enviar eventos a un centro de eventos, y recibirlos de él, con la validación de esquemas mediante la biblioteca de .NET Azure.Messaging.EventHubs.

Nota:

El registro de esquema de Azure es una característica de Event Hubs, que proporciona un repositorio central para los esquemas de aplicaciones controladas por eventos y basadas en mensajería. Ofrece la flexibilidad para que las aplicaciones de productor y consumidor intercambien datos sin tener que administrar y compartir el esquema. También proporciona un marco de gobierno sencillo para los esquemas reutilizables y define la relación entre los esquemas a través de una construcción de agrupación (grupos de esquemas). Para más información, consulte Registro de esquema de Azure en Event Hubs.

Prerrequisitos

Si es la primera vez que usa Azure Event Hubs, consulte la información general de Event Hubs antes de continuar con este inicio rápido.

Para completar este tutorial de inicio rápido, debe cumplir los siguientes requisitos previos:

  • Si no tiene una suscripción a Azure, cree una cuenta gratuita antes de empezar.
  • Microsoft Visual Studio 2022. La biblioteca cliente de Azure Event Hubs utiliza las nuevas características que se introdujeron en C# 8.0. Aún puede seguir usando la biblioteca con versiones anteriores de lenguaje C#, pero la nueva sintaxis no está disponible. Para usar la sintaxis completa, se recomienda realizar la compilación con el SDK para .NET Core 3.0 o posterior y la versión de lenguaje establecida en latest. Si usa Visual Studio, las versiones anteriores a Visual Studio 2019 no son compatibles con las herramientas necesarias para compilar proyectos de C# 8.0. Visual Studio 2019, incluida la edición gratuita Community, se puede descargar aquí.

Creación de un centro de eventos

Siga las instrucciones del inicio rápido Creación de un espacio de nombres y un centro de eventos de Event Hubs para crear un espacio de nombres de Event Hubs y un centro de eventos. A continuación, para obtener una cadena de conexión al espacio de nombres de Event Hubs, siga las instrucciones que se indican en Obtención de una cadena de conexión.

Anote la siguiente configuración que usará en el inicio rápido actual:

  • Cadena de conexión del espacio de nombres de Event Hubs
  • Nombre del centro de eventos

Creación de un esquema

Siga las instrucciones que se indican en Creación de esquemas mediante el registro de esquema para crear un grupo de esquemas y un esquema.

  1. Cree un grupo de esquemas denominado contoso-sg mediante el portal del registro de esquema. Use Avro como tipo de serialización y Ninguno como modo de compatibilidad.

  2. En ese grupo de esquemas, cree un esquema de Avro con el nombre del esquema Microsoft.Azure.Data.SchemaRegistry.example.Order mediante el siguiente contenido de esquema.

    {
      "namespace": "Microsoft.Azure.Data.SchemaRegistry.example",
      "type": "record",
      "name": "Order",
      "fields": [
        {
          "name": "id",
          "type": "string"
        },
        {
          "name": "amount",
          "type": "double"
        },
        {
          "name": "description",
          "type": "string"
        }
      ]
    } 
    

Agregar usuario al rol Lector del registro de esquema

Agregue la cuenta de usuario al rol Lector del registro de esquema en el nivel de espacio de nombres. También puede usar el rol Colaborador del registro de esquema, pero no es necesario para este inicio rápido.

  1. En la página Espacio de nombres de Event Hubs, seleccione Control de acceso (IAM) en el menú de la izquierda.
  2. En la página Control de acceso (IAM), seleccione + Agregar ->Agregar asignación de roles en el menú.
  3. En la página Tipo de asignación, seleccione Siguiente.
  4. En la página Roles, seleccione Lector del registro de esquema (versión preliminar) y, a continuación, seleccione Siguiente en la parte inferior de la página.
  5. Use el vínculo + Seleccionar miembros para agregar la cuenta de usuario al rol y, a continuación, seleccione Siguiente.
  6. En la página Revisar + asignar, seleccione Revisar + asignar.

Producción de eventos en Event Hubs con validación de esquema

Creación de una aplicación de consola para el productor de eventos

  1. Inicie Visual Studio 2019.
  2. Seleccione Crear un proyecto.
  3. En el cuadro de diálogo Crear un nuevo proyecto, siga estos pasos: Si no ve este cuadro de diálogo, seleccione Archivo en el menú, seleccione Nuevo y, después, seleccione Proyecto.
    1. Seleccione C# como lenguaje de programación.

    2. Seleccione Consola como tipo de aplicación.

    3. Seleccione Aplicación de consola en la lista de resultados.

    4. Después, seleccione Siguiente.

      Imagen que muestra el cuadro de diálogo Nuevo proyecto.

  4. Escriba OrderProducer como nombre del proyecto, SRQuickStart como nombre de la solución y, después, seleccione Aceptar para crear el proyecto.

Incorporación del paquete NuGet de Event Hubs

  1. Seleccione Herramientas>Administrador de paquetes NuGet>Consola del Administrador de paquetes en el menú.

  2. Ejecute los comandos siguientes para instalar Azure.Messaging.EventHubs y otros paquetes de NuGet. Presione ENTRAR para ejecutar el último comando.

    Install-Package Azure.Messaging.EventHubs
    Install-Package Azure.Identity
    Install-Package Microsoft.Azure.Data.SchemaRegistry.ApacheAvro
    Install-Package Azure.ResourceManager.Compute
    
  3. Autentique las aplicaciones de productor para conectarse a Azure mediante Visual Studio, como se muestra aquí.

  4. Inicie sesión en Azure utilizando la cuenta de usuario que sea miembro del rol Schema Registry Reader a nivel de espacio de nombres. Para obtener información sobre los roles de registro de esquema, consulte Registro de esquema de Azure en Event Hubs.

Generación de código mediante el esquema de Avro

  1. Use el mismo contenido que usó para crear el esquema para crear un archivo denominado Order.avsc. Guarde el archivo en la carpeta del proyecto o la solución.
  2. Luego, puede usar este archivo de esquema para generar código para .NET. Puede usar cualquier herramienta externa de generación de código como avrogen para la generación de código. Por ejemplo, puede ejecutar avrogen -s .\Order.avsc . para generar código.
  3. Una vez generado el código, verá el archivo denominado Order.cs en la carpeta \Microsoft\Azure\Data\SchemaRegistry\example. Para el esquema de Avro anterior, se generan los tipos de C# en el espacio de nombres Microsoft.Azure.Data.SchemaRegistry.example.
  4. Añada el archivo Order.cs al proyecto OrderProducer.

Escritura de código para serializar y enviar eventos al centro de eventos

  1. Agregue el siguiente código al archivo Program.cs. Vea los comentarios de código para obtener más detalles. Los pasos de alto nivel del código son:

    1. Cree un cliente productor que pueda usar para enviar eventos a un centro de eventos.
    2. Cree un cliente de registro de esquema que pueda usar para serializar y validar datos en un objeto Order.
    3. Cree un nuevo objeto Order con el tipo generado Order.
    4. Use el cliente de registro de esquema para serializar el objeto Order en EventData.
    5. Cree un lote de eventos.
    6. Agregue los datos del evento al lote de eventos.
    7. Use el cliente productor para enviar el lote de eventos al centro de eventos.
    using Azure.Data.SchemaRegistry;
    using Azure.Identity;
    using Microsoft.Azure.Data.SchemaRegistry.ApacheAvro;
    using Azure.Messaging.EventHubs;
    using Azure.Messaging.EventHubs.Producer;
    
    using Microsoft.Azure.Data.SchemaRegistry.example;
    
    // connection string to the Event Hubs namespace
    const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING";
    
    // name of the event hub
    const string eventHubName = "EVENTHUBNAME";
    
    // Schema Registry endpoint 
    const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net";
    
    // name of the consumer group   
    const string schemaGroup = "SCHEMAGROUPNAME";
    
    // The Event Hubs client types are safe to cache and use as a singleton for the lifetime
    // of the application, which is best practice when events are being published or read regularly.
    EventHubProducerClient producerClient;
    
    // Create a producer client that you can use to send events to an event hub
    producerClient = new EventHubProducerClient(connectionString, eventHubName);
    
    // Create a schema registry client that you can use to serialize and validate data.  
    var schemaRegistryClient = new SchemaRegistryClient(schemaRegistryEndpoint, new DefaultAzureCredential());
    
    // Create an Avro object serializer using the Schema Registry client object. 
    var serializer = new SchemaRegistryAvroSerializer(schemaRegistryClient, schemaGroup, new SchemaRegistryAvroSerializerOptions { AutoRegisterSchemas = true });
    
    // Create a new order object using the generated type/class 'Order'. 
    var sampleOrder = new Order { id = "1234", amount = 45.29, description = "First sample order." };
    EventData eventData = (EventData)await serializer.SerializeAsync(sampleOrder, messageType: typeof(EventData));
    
    // Create a batch of events 
    using EventDataBatch eventBatch = await producerClient.CreateBatchAsync();
    
    // Add the event data to the event batch. 
    eventBatch.TryAdd(eventData);
    
    // Send the batch of events to the event hub. 
    await producerClient.SendAsync(eventBatch);
    Console.WriteLine("A batch of 1 order has been published.");        
    
  2. Reemplace los siguientes valores de marcador de posición por los valores reales.

    • EVENTHUBSNAMESPACECONNECTIONSTRING: cadena de conexión del espacio de nombres de Event Hubs
    • EVENTHUBNAME: nombre del centro de eventos
    • EVENTHUBSNAMESPACENAME: nombre del espacio de nombres de Event Hubs
    • SCHEMAGROUPNAME: nombre del grupo de esquema
    // connection string to the Event Hubs namespace
    const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING";
    
    // name of the event hub
    const string eventHubName = "EVENTHUBNAME";
    
    // Schema Registry endpoint 
    const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net";
    
    // name of the consumer group   
    const string schemaGroup = "SCHEMAGROUPNAME";
    
  3. Compile el proyecto y asegúrese de que no hay errores.

  4. Ejecute el programa y espere el mensaje de confirmación.

    A batch of 1 order has been published.
    
  5. En Azure Portal, puede comprobar que el centro de eventos ha recibido los eventos. Cambie a la vista Mensajes en la sección Métricas. Actualice la página para actualizar el gráfico. Puede tardar unos segundos en mostrar que los mensajes se han recibido.

    Imagen de la página de Azure Portal para comprobar que el centro de eventos recibió los eventos.

Consumo de eventos de centros de eventos con validación de esquemas

En esta sección se muestra cómo escribir una aplicación de consola de .NET Core que recibe eventos de un centro de eventos y cómo usar el registro de esquema para deserializar los datos de eventos.

Requisitos previos adicionales

  • Cree la cuenta de almacenamiento para usar el procesador de evento.

Creación de una aplicación de consumidor

  1. En la ventana del Explorador de soluciones, haga clic con el botón derecho en la solución SRQuickStart, apunte a Agregar y seleccione Nuevo proyecto.
  2. Seleccione Aplicación de consola y seleccione Siguiente.
  3. Escriba OrderConsumer en Nombre de proyecto y seleccione Crear.
  4. En la ventana del Explorador de soluciones, haga clic con el botón derecho en OrderConsumer y seleccione Set as a Startup Project (Establecer como proyecto de inicio).

Incorporación del paquete NuGet de Event Hubs

  1. Seleccione Herramientas>Administrador de paquetes NuGet>Consola del Administrador de paquetes en el menú.

  2. En la ventana de la Consola del Administrador de paquetes, confirme que OrderConsumer está seleccionado como Proyecto predeterminado. Si no es así, use la lista desplegable para seleccionar OrderConsumer.

  3. Ejecute el siguiente comando para instalar los paquetes NuGet necesarios. Presione ENTRAR para ejecutar el último comando.

    Install-Package Azure.Messaging.EventHubs
    Install-Package Azure.Messaging.EventHubs.Processor
    Install-Package Azure.Identity
    Install-Package Microsoft.Azure.Data.SchemaRegistry.ApacheAvro
    Install-Package Azure.ResourceManager.Compute
    
  4. Autentique las aplicaciones de productor para conectarse a Azure mediante Visual Studio, como se muestra aquí.

  5. Inicie sesión en Azure utilizando la cuenta de usuario que sea miembro del rol Schema Registry Reader a nivel de espacio de nombres. Para obtener información sobre los roles de registro de esquema, consulte Registro de esquema de Azure en Event Hubs.

  6. Agregue el archivo Order.cs que generó como parte de la creación de la aplicación de productor al proyecto OrderConsumer.

  7. Haga clic con el botón derecho en el proyecto OrderConsumer y seleccione Establecer como proyecto de inicio.

Escritura de código para recibir eventos y deserializarlos mediante el registro de esquema

  1. Agregue el siguiente código al archivo Program.cs. Vea los comentarios de código para obtener más detalles. Los pasos de alto nivel del código son:

    1. Cree un cliente de consumidor que pueda usar para enviar eventos a un centro de eventos.
    2. Cree un cliente de contenedor de blobs para el contenedor de blobs en el almacenamiento de blobs de Azure.
    3. Cree un cliente de procesador de evento y registre controladores de eventos y errores.
    4. En el controlador de eventos, cree un cliente de registro de esquema que puede usar para deserializar los datos de eventos en un objeto Order.
    5. Deserializar los datos del evento en un objeto Order mediante el serializador.
    6. Imprima la información sobre la orden recibida.
    using Azure.Data.SchemaRegistry;
    using Azure.Identity;
    using Microsoft.Azure.Data.SchemaRegistry.ApacheAvro;
    using Azure.Storage.Blobs;
    using Azure.Messaging.EventHubs;
    using Azure.Messaging.EventHubs.Consumer;
    using Azure.Messaging.EventHubs.Processor;
    
    using Microsoft.Azure.Data.SchemaRegistry.example;
    
    
    // connection string to the Event Hubs namespace
    const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING";
    
    // name of the event hub
    const string eventHubName = "EVENTHUBNAME";
    
    // Schema Registry endpoint 
    const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net";
    
    // name of the consumer group   
    const string schemaGroup = "SCHEMAGROUPNAME";
    
    // connection string for the Azure Storage account
    const string blobStorageConnectionString = "AZURESTORAGECONNECTIONSTRING";
    
    // name of the blob container that will be userd as a checkpoint store
    const string blobContainerName = "BLOBCONTAINERNAME";
    
    // Create a blob container client that the event processor will use 
    BlobContainerClient storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName);
    
    // Create an event processor client to process events in the event hub
    EventProcessorClient processor = new EventProcessorClient(storageClient, EventHubConsumerClient.DefaultConsumerGroupName, connectionString, eventHubName);
    
    // Register handlers for processing events and handling errors
    processor.ProcessEventAsync += ProcessEventHandler;
    processor.ProcessErrorAsync += ProcessErrorHandler;
    
    // Start the processing
    await processor.StartProcessingAsync();
    
    // Wait for 30 seconds for the events to be processed
    await Task.Delay(TimeSpan.FromSeconds(30));
    
    // Stop the processing
    await processor.StopProcessingAsync();
    
    static async Task ProcessEventHandler(ProcessEventArgs eventArgs)
    {
        // Create a schema registry client that you can use to serialize and validate data.  
        var schemaRegistryClient = new SchemaRegistryClient(schemaRegistryEndpoint, new DefaultAzureCredential());
    
        // Create an Avro object serializer using the Schema Registry client object. 
        var serializer = new SchemaRegistryAvroSerializer(schemaRegistryClient, schemaGroup, new SchemaRegistryAvroSerializerOptions { AutoRegisterSchemas = true });
    
        // Deserialized data in the received event using the schema 
        Order sampleOrder = (Order)await serializer.DeserializeAsync(eventArgs.Data, typeof(Order));
    
        // Print the received event
        Console.WriteLine($"Received order with ID: {sampleOrder.id}, amount: {sampleOrder.amount}, description: {sampleOrder.description}");
    
           await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken);
        }
    
        static Task ProcessErrorHandler(ProcessErrorEventArgs eventArgs)
    {
        // Write details about the error to the console window
        Console.WriteLine($"\tPartition '{eventArgs.PartitionId}': an unhandled exception was encountered. This was not expected to happen.");
        Console.WriteLine(eventArgs.Exception.Message);
        return Task.CompletedTask;
    }      
    
  2. Reemplace los siguientes valores de marcador de posición por los valores reales.

    • EVENTHUBSNAMESPACE-CONNECTIONSTRING: cadena de conexión del espacio de nombres de Event Hubs
    • EVENTHUBNAME: nombre del centro de eventos
    • EVENTHUBSNAMESPACENAME: nombre del espacio de nombres de Event Hubs
    • SCHEMAGROUPNAME: nombre del grupo de esquema
    • AZURESTORAGECONNECTIONSTRING : cadena de conexión para la cuenta de almacenamiento de Azure
    • BLOBCONTAINERNAME : nombre del contenedor de blobs
    // connection string to the Event Hubs namespace
    const string connectionString = "EVENTHUBSNAMESPACE-CONNECTIONSTRING";
    
    // name of the event hub
    const string eventHubName = "EVENTHUBNAME";
    
    // Schema Registry endpoint 
    const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net";
    
    // name of the consumer group   
    const string schemaGroup = "SCHEMAGROUPNAME";
    
    // Azure storage connection string
    const string blobStorageConnectionString = "AZURESTORAGECONNECTIONSTRING";
    
    // Azure blob container name
    const string blobContainerName = "BLOBCONTAINERNAME";
    
  3. Compile el proyecto y asegúrese de que no hay errores.

  4. Ejecute la aplicación del destinatario.

  5. Debería ver un mensaje que indica que se han recibido los eventos.

    Received order with ID: 1234, amount: 45.29, description: First sample order.
    

    Estos eventos son los tres que envió al centro de eventos anteriormente mediante la ejecución del programa del emisor.

Ejemplos

Consulte el artículo Léame en nuestro repositorio de GitHub.

Limpieza de recursos

Elimine el espacio de nombres de Event Hubs o elimine el grupo de recursos que contiene el espacio de nombres.

Pasos siguientes