Uso de Java para enviar o recibir eventos en Azure Event Hubs

En este inicio rápido se muestra cómo enviar y recibir eventos desde un centro de eventos mediante el paquete de Java azure-messaging-eventhubs.

Sugerencia

Si trabaja con recursos de Azure Event Hubs en una aplicación de Spring, se recomienda considerar el uso de Spring Cloud Azure como alternativa. Spring Cloud Azure es un proyecto de código abierto que proporciona una integración perfecta de Spring con los servicios de Azure. Para más información sobre Spring Cloud Azure y para ver un ejemplo con Event Hubs, consulte Spring Cloud Stream con Azure Event Hubs.

Requisitos previos

Si es la primera vez que usa Azure Event Hubs, consulte la información general de Event Hubs antes de continuar con este inicio rápido.

Para completar este tutorial de inicio rápido, debe cumplir los siguientes requisitos previos:

  • Una suscripción a Microsoft Azure. Para usar los servicios de Azure, entre los que se incluye Azure Event Hubs, se necesita una suscripción. Si no se dispone de una cuenta de Azure, es posible registrarse para obtener una evaluación gratuita, o bien usar las ventajas que disfrutan los suscriptores MSDN al crear una cuenta.
  • Un entorno de desarrollo de Java. Este inicio rápido utiliza Eclipse. Se requiere el kit de desarrollo de Java (JDK), versión 8 o posteriores.
  • Creación de un espacio de nombres de Event Hubs y un centro de eventos. El primer paso consiste en usar Azure Portal para crear un espacio de nombres de tipo Event Hubs y obtener las credenciales de administración que la aplicación necesita para comunicarse con el centro de eventos. Para crear un espacio de nombres y un centro de eventos, siga el procedimiento que se indica en este artículo. Después, obtenga la cadena de conexión para el espacio de nombres de Event Hubs. Para ello, siga las instrucciones del artículo: Obtenga la cadena de conexión. La utilizará más adelante en este inicio rápido.

Envío de eventos

En esta sección se muestra cómo crear una aplicación de Java para enviar eventos a un centro de eventos.

Incorporación de una referencia a la biblioteca de Azure Event Hubs

primero, cree un nuevo proyecto de Maven para una aplicación de consola o shell en su entorno de desarrollo Java favorito. Actualice el archivo pom.xml de la forma siguiente. La biblioteca de cliente de Java para Event Hubs está disponible en repositorio central de Maven.

		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-messaging-eventhubs</artifactId>
		    <version>5.18.0</version>
		</dependency>
		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-identity</artifactId>
		    <version>1.11.2</version>
		    <scope>compile</scope>
		</dependency>

Nota:

Actualice la versión a la más reciente publicada en el repositorio Maven.

Autenticación de la aplicación en Azure

En este inicio rápido se muestran dos maneras de conectarse a Azure Event Hubs: sin contraseña y con la cadena de conexión. La primera opción muestra cómo usar la entidad de seguridad de Microsoft Entra ID y el control de acceso basado en roles (RBAC) para conectarse a un espacio de nombres de Event Hubs. No es necesario preocuparse por tener cadenas de conexión codificadas de forma rígida en el código, en un archivo de configuración o en un almacenamiento seguro, como Azure Key Vault. La segunda opción muestra cómo usar una cadena de conexión para conectarse a un espacio de nombres de Event Hubs. Si no está familiarizado con Azure, puede que le resulte más fácil la opción de la cadena de conexión. Se recomienda usar la opción sin contraseña en aplicaciones reales y entornos de producción. Para más información, consulte Autenticación y autorización. También puede obtener más información sobre la autenticación sin contraseña en la página de información general.

Asignación de roles al usuario de Microsoft Entra

Al desarrollar localmente, asegúrese de que la cuenta de usuario que se conecta a Azure Event Hubs tenga los permisos correctos. Necesitará el rol propietario de datos Azure Event Hubs para enviar y recibir mensajes. Para asignarse este rol, necesitará el rol Administrador de accesos de usuarios, u otro que incluya la acción Microsoft.Authorization/roleAssignments/write. Puede asignar roles RBAC de Azure a un usuario mediante Azure Portal, la CLI de Azure o Azure PowerShell. Puede obtener más información sobre los ámbitos disponibles para las asignaciones de roles en la página de información general del ámbito.

En el ejemplo siguiente se asigna el rol Azure Event Hubs Data Owner a la cuenta de usuario, que proporciona un acceso completo a los recursos de Azure Event Hubs. En un escenario real, siga el Principio de privilegios mínimos para conceder a los usuarios solo los permisos mínimos necesarios para un entorno de producción más seguro.

Roles integrados de Azure para Azure Event Hubs

En el caso de Azure Event Hubs, la administración de los espacios de nombres y de todos los recursos relacionados mediante Azure Portal y la API de administración de recursos de Azure, ya se ha protegido mediante el modelo de Azure RBAC. Azure proporciona los siguientes roles integrados de Azure para autorizar el acceso a un espacio de nombres de Event Hubs:

Si desea crear un rol personalizado, consulte Derechos necesarios para las operaciones de Event Hubs.

Importante

En la mayoría de los casos, la asignación de roles tardará un minuto o dos en propagarse en Azure. En ocasiones excepcionales, puede tardar hasta ocho minutos. Si recibe errores de autenticación al ejecutar por primera vez el código, espere unos instantes e inténtelo de nuevo.

  1. En Azure Portal, localice el espacio de nombres de Event Hubs mediante la barra de búsqueda principal o el panel de navegación de la izquierda.

  2. En la página de información general, seleccione Control de acceso (IAM) en el menú de la izquierda.

  3. En la página Control de acceso (IAM), seleccione la pestaña Asignación de roles.

  4. Seleccione + Agregar en el menú superior y, a continuación, Agregar asignación de roles en el menú desplegable resultante.

    A screenshot showing how to assign a role.

  5. Puede usar el cuadro de búsqueda para filtrar los resultados por el rol deseado. En este ejemplo, busque Azure Event Hubs Data Owner y seleccione el resultado coincidente. Después, haga clic en Siguiente.

  6. En la pestaña Asignar acceso a, seleccione Usuario, grupo o entidad de servicio y, a continuación, elija + Seleccionar miembros.

  7. En el cuadro de diálogo, busque el nombre de usuario de Microsoft Entra (normalmente su dirección de correo electrónico de user@domain) y, a continuación, elija Seleccionar en la parte inferior del cuadro de diálogo.

  8. Seleccione Revisar y asignar para ir a la página final y, a continuación, de nuevo Revisar y asignar para completar el proceso.

Escritura de código para enviar mensajes al centro de eventos

Agregue una clase denominada Sender y agréguele el código siguiente:

Importante

  • Actualice <NAMESPACE NAME> con el nombre del espacio de nombres de Event Hubs.
  • Actualice <EVENT HUB NAME> con el nombre del centro de eventos.
package ehubquickstart;

import com.azure.messaging.eventhubs.*;
import java.util.Arrays;
import java.util.List;

import com.azure.identity.*;

public class SenderAAD {

    // replace <NAMESPACE NAME> with the name of your Event Hubs namespace.
    // Example: private static final String namespaceName = "contosons.servicebus.windows.net";
    private static final String namespaceName = "<NAMESPACE NAME>.servicebus.windows.net";

    // Replace <EVENT HUB NAME> with the name of your event hub. 
    // Example: private static final String eventHubName = "ordersehub";
    private static final String eventHubName = "<EVENT HUB NAME>";

    public static void main(String[] args) {
        publishEvents();
    }
    /**
     * Code sample for publishing events.
     * @throws IllegalArgumentException if the EventData is bigger than the max batch size.
     */
    public static void publishEvents() {
        // create a token using the default Azure credential        
        DefaultAzureCredential credential = new DefaultAzureCredentialBuilder()
                .authorityHost(AzureAuthorityHosts.AZURE_PUBLIC_CLOUD)
                .build();

        // create a producer client        
        EventHubProducerClient producer = new EventHubClientBuilder()        
            .fullyQualifiedNamespace(namespaceName)
            .eventHubName(eventHubName)
            .credential(credential)
            .buildProducerClient();

        // sample events in an array
        List<EventData> allEvents = Arrays.asList(new EventData("Foo"), new EventData("Bar"));

        // create a batch
        EventDataBatch eventDataBatch = producer.createBatch();

        for (EventData eventData : allEvents) {
            // try to add the event from the array to the batch
            if (!eventDataBatch.tryAdd(eventData)) {
                // if the batch is full, send it and then create a new batch
                producer.send(eventDataBatch);
                eventDataBatch = producer.createBatch();

                // Try to add that event that couldn't fit before.
                if (!eventDataBatch.tryAdd(eventData)) {
                    throw new IllegalArgumentException("Event is too large for an empty batch. Max size: "
                        + eventDataBatch.getMaxSizeInBytes());
                }
            }
        }
        // send the last batch of remaining events
        if (eventDataBatch.getCount() > 0) {
            producer.send(eventDataBatch);
        }
        producer.close();
    }   
}

Compile el programa y asegúrese de que no hay errores. Este programa se ejecutará después de ejecutar el programa del destinatario.

Recepción de eventos

El código de este tutorial se basa en el ejemplo EventProcessorClient de GitHub, que se puede examinar para ver toda la aplicación en funcionamiento.

Siga estas recomendaciones al usar Azure Blob Storage como almacén de puntos de control:

  • Use un contenedor independiente para cada grupo de consumidores. Puede usar la misma cuenta de almacenamiento, pero usar un contenedor por cada grupo.
  • No use el contenedor ni la cuenta de almacenamiento para otras actividades.
  • La cuenta de almacenamiento debe estar en la misma región en la que se encuentra la aplicación implementada. Si la aplicación es local, intente elegir la región más cercana posible.

En la página Cuenta de almacenamiento de Azure Portal, en la sección Blob service, asegúrese de que la siguiente configuración está deshabilitada.

  • Espacio de nombres jerárquico
  • Eliminación temporal de blobs
  • Control de versiones

Creación de una instancia de Azure Storage y un contenedor de blobs de Azure Storage

En este inicio rápido, se usa Azure Storage (concretamente Blob Storage) como almacén de puntos de control. La creación de puntos de control es un proceso por el que un procesador de eventos marca o confirma la posición del último evento procesado correctamente en una partición. Normalmente, el marcado de un punto de control se realiza en la función que procesa los eventos. Para obtener más información acerca de la creación de puntos de control, consulte la sección sobre el procesador de eventos.

Siga estos pasos para crear una cuenta de Azure Storage.

  1. Creación de una cuenta de Azure Storage
  2. Creación de un contenedor de blobs
  3. Autenticación en el contenedor de blobs

Al desarrollar localmente, asegúrese de que la cuenta de usuario que accede a los datos de blob tenga los permisos correctos. Necesitará el Colaborador de datos de blobs de almacenamiento para leer y escribir datos de blob. Para asignarse este rol a sí mismo, necesitará que se le asigne el rol Administrador de acceso de usuario u otro rol que incluya la acción Microsoft.Authorization/roleAssignments/write. Puede asignar roles RBAC de Azure a un usuario mediante Azure Portal, la CLI de Azure o Azure PowerShell. Puede obtener más información sobre los ámbitos disponibles para las asignaciones de roles en la página de información general del ámbito.

En este escenario, asignará permisos a la cuenta de usuario, cuyo ámbito es la cuenta de almacenamiento, a fin de seguir el principio de privilegios mínimos. Esta práctica solo proporciona a los usuarios los permisos mínimos necesarios y crea entornos de producción más seguros.

En el ejemplo siguiente se asignará el rol Colaborador de datos de blobs de almacenamiento a la cuenta de usuario, que proporciona acceso de lectura y escritura a los datos de blobs de la cuenta de almacenamiento.

Importante

En la mayoría de los casos, la asignación de roles tardará un minuto o dos en propagarse en Azure, pero en casos excepcionales puede tardar hasta ocho minutos. Si recibe errores de autenticación al ejecutar por primera vez el código, espere unos instantes e inténtelo de nuevo.

  1. En Azure Portal, busque la cuenta de almacenamiento mediante la barra de búsqueda principal o el panel de navegación de la izquierda.

  2. En la página de información general de la cuenta de almacenamiento, seleccione Control de acceso (IAM) en el menú de la izquierda.

  3. En la página Control de acceso (IAM), seleccione la pestaña Asignación de roles.

  4. Seleccione + Agregar en el menú superior y, a continuación, Agregar asignación de roles en el menú desplegable resultante.

    A screenshot showing how to assign a storage account role.

  5. Puede usar el cuadro de búsqueda para filtrar los resultados por el rol deseado. En este ejemplo, busque Colaborador de datos de blobs de almacenamiento y seleccione el resultado coincidente y, a continuación, elija Siguiente.

  6. En la pestaña Asignar acceso a, seleccione Usuario, grupo o entidad de servicio y, a continuación, elija + Seleccionar miembros.

  7. En el cuadro de diálogo, busque el nombre de usuario de Microsoft Entra (normalmente su dirección de correo electrónico de user@domain) y, a continuación, elija Seleccionar en la parte inferior del cuadro de diálogo.

  8. Seleccione Revisar y asignar para ir a la página final y, a continuación, de nuevo Revisar y asignar para completar el proceso.

Incorporación de bibliotecas de Event Hubs al proyecto de Java

Agregue las siguientes dependencias en el archivo pom.xml.

	<dependencies>
		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-messaging-eventhubs</artifactId>
		    <version>5.15.0</version>
		</dependency>
		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
		    <version>1.16.1</version>
		</dependency>
		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-identity</artifactId>
		    <version>1.8.0</version>
		    <scope>compile</scope>
		</dependency>	
	</dependencies>
  1. Agregue las siguientes instrucciones import en la parte superior del archivo Java.

    import com.azure.messaging.eventhubs.*;
    import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore;
    import com.azure.messaging.eventhubs.models.*;
    import com.azure.storage.blob.*;
    import java.util.function.Consumer;
    
    import com.azure.identity.*;
    
  2. Cree una clase denominada Receiver y agréguele las siguientes variables de cadena. Reemplace los marcadores de posición por los valores correctos.

    Importante

    Reemplace los marcadores de posición por los valores correctos.

    • <NAMESPACE NAME> por el nombre del espacio de nombres de Event Hubs.
    • <EVENT HUB NAME> con el nombre del centro de eventos en el espacio de nombres.
    private static final String namespaceName = "<NAMESPACE NAME>.servicebus.windows.net";
    private static final String eventHubName = "<EVENT HUB NAME>";
    
  3. Agregue el siguiente método main a la clase.

    Importante

    Reemplace los marcadores de posición por los valores correctos.

    • <STORAGE ACCOUNT NAME> con el nombre de la cuenta de Azure Storage.
    • <CONTAINER NAME> con el nombre del contenedor de blobs de la cuenta de almacenamiento.
    // create a token using the default Azure credential
    DefaultAzureCredential credential = new DefaultAzureCredentialBuilder()
            .authorityHost(AzureAuthorityHosts.AZURE_PUBLIC_CLOUD)
            .build();
    
    // Create a blob container client that you use later to build an event processor client to receive and process events
    BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
            .credential(credential)
            .endpoint("https://<STORAGE ACCOUNT NAME>.blob.core.windows.net")
            .containerName("<CONTAINER NAME>")
            .buildAsyncClient();
    
    // Create an event processor client to receive and process events and errors.
    EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
        .fullyQualifiedNamespace(namespaceName)
        .eventHubName(eventHubName)
        .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
        .processEvent(PARTITION_PROCESSOR)
        .processError(ERROR_HANDLER)
        .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient))            
        .credential(credential)
        .buildEventProcessorClient();
    
    System.out.println("Starting event processor");
    eventProcessorClient.start();
    
    System.out.println("Press enter to stop.");
    System.in.read();
    
    System.out.println("Stopping event processor");
    eventProcessorClient.stop();
    System.out.println("Event processor stopped.");
    
    System.out.println("Exiting process");  
    
  1. Agregue los dos métodos auxiliares (PARTITION_PROCESSOR y ERROR_HANDLER) que procesan eventos y errores a la clase Receiver.

    public static final Consumer<EventContext> PARTITION_PROCESSOR = eventContext -> {
        PartitionContext partitionContext = eventContext.getPartitionContext();
        EventData eventData = eventContext.getEventData();
    
        System.out.printf("Processing event from partition %s with sequence number %d with body: %s%n",
            partitionContext.getPartitionId(), eventData.getSequenceNumber(), eventData.getBodyAsString());
    
        // Every 10 events received, it will update the checkpoint stored in Azure Blob Storage.
        if (eventData.getSequenceNumber() % 10 == 0) {
            eventContext.updateCheckpoint();
        }
    };
    
    public static final Consumer<ErrorContext> ERROR_HANDLER = errorContext -> {
        System.out.printf("Error occurred in partition processor for partition %s, %s.%n",
            errorContext.getPartitionContext().getPartitionId(),
            errorContext.getThrowable());
    };
    
  2. Compile el programa y asegúrese de que no hay errores.

Ejecución de las aplicaciones

  1. Ejecute primero la aplicación del destinatario.

  2. Luego, ejecute la aplicación del remitente.

  3. En la ventana de la aplicación del destinatario, confirme que ve los eventos publicados por la aplicación del remitente.

    Starting event processor
    Press enter to stop.
    Processing event from partition 0 with sequence number 331 with body: Foo
    Processing event from partition 0 with sequence number 332 with body: Bar
    
  4. Presione ENTRAR en la ventana de la aplicación del destinatario para detener la aplicación.

    Starting event processor
    Press enter to stop.
    Processing event from partition 0 with sequence number 331 with body: Foo
    Processing event from partition 0 with sequence number 332 with body: Bar
    
    Stopping event processor
    Event processor stopped.
    Exiting process
    

Pasos siguientes

Consulte los siguientes ejemplos en GitHub: