Use Java para enviar eventos ou receber eventos de Azure Event Hubs (azure-messaging-eventhubs)

Este quickstart mostra como enviar eventos e receber eventos de um centro de eventos usando o pacote java de mensagens azure-eventhubs.

Importante

Este quickstart usa o novo pacote azure-messaging-eventhubs. Para um arranque rápido que utiliza os antigos pacotes azure-eventhubs-eph, consulte Enviar e receber eventos usando azure-eventhubs e azure-eventhubs-eph.

Pré-requisitos

Se você é novo em Azure Event Hubs, consulte o Event Hubs antes de fazer este quickstart.

Para completar este arranque rápido, precisa dos seguintes pré-requisitos:

  • Subscrição do Microsoft Azure. Para utilizar os serviços Azure, incluindo os Azure Event Hubs, precisa de uma subscrição. Se não tiver uma conta Azure existente, pode inscrever-se para um teste gratuito ou utilizar os benefícios do seu assinante MSDN quando criar uma conta.
  • Um ambiente de desenvolvimento de Java. Este quickstart usa Eclipse. É necessário o Kit de Desenvolvimento de Java (JDK) com a versão 8 ou superior.
  • Crie um espaço de nomes de Centros de Eventos e um centro de eventos. O primeiro passo consiste em utilizar o portal do Azure para criar um espaço de nomes do tipo Hubs de Eventos e obter as credenciais de gestão de que a sua aplicação precisa para comunicar com o hub de eventos. Para criar um espaço de nome e um centro de eventos, siga o procedimento neste artigo. Em seguida, obtenha a cadeia de ligação para o espaço de nomes Do Event Hubs seguindo as instruções do artigo: Obter a cadeia de ligação. Utilize a cadeia de ligação mais tarde neste arranque rápido.

Enviar eventos

Esta secção mostra como criar uma aplicação Java para enviar eventos um centro de eventos.

Adicionar referência à biblioteca Azure Event Hubs

A biblioteca de clientes Java para Centros de Eventos está disponível no Repositório Central maven. Pode fazer referência a esta biblioteca utilizando a seguinte declaração de dependência dentro do seu ficheiro de projeto Maven:

<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-messaging-eventhubs</artifactId>
    <version>5.7.0</version>
</dependency>

Nota

Atualize a versão para a versão mais recente publicada no repositório Maven.

Escrever códigos para enviar mensagens ao hub de eventos

Para o exemplo que se segue, comece por criar um novo projeto Maven para uma consola/aplicação shell no seu ambiente de desenvolvimento Java favorito. Adicione uma classe chamada Sender , e adicione o seguinte código à classe:

Importante

Atualize <Event Hubs namespace connection string> com o fio de ligação ao seu espaço de nomes Desempaco. Atualize <Event hub name> com o nome do seu centro de eventos no espaço de nomes.

import com.azure.messaging.eventhubs.*;
import java.util.Arrays;
import java.util.List;

public class Sender {
    private static final String connectionString = "<Event Hubs namespace connection string>";
    private static final String eventHubName = "<Event hub name>";

    public static void main(String[] args) {
        publishEvents();
    }
}

Adicione código para publicar eventos no centro de eventos

Adicione um método nomeado publishEvents à Sender classe:

    /**
     * Code sample for publishing events.
     * @throws IllegalArgumentException if the EventData is bigger than the max batch size.
     */
    public static void publishEvents() {
        // create a producer client
        EventHubProducerClient producer = new EventHubClientBuilder()
            .connectionString(connectionString, eventHubName)
            .buildProducerClient();

        // sample events in an array
        List<EventData> allEvents = Arrays.asList(new EventData("Foo"), new EventData("Bar"));

        // create a batch
        EventDataBatch eventDataBatch = producer.createBatch();

        for (EventData eventData : allEvents) {
            // try to add the event from the array to the batch
            if (!eventDataBatch.tryAdd(eventData)) {
                // if the batch is full, send it and then create a new batch
                producer.send(eventDataBatch);
                eventDataBatch = producer.createBatch();

                // Try to add that event that couldn't fit before.
                if (!eventDataBatch.tryAdd(eventData)) {
                    throw new IllegalArgumentException("Event is too large for an empty batch. Max size: "
                        + eventDataBatch.getMaxSizeInBytes());
                }
            }
        }
        // send the last batch of remaining events
        if (eventDataBatch.getCount() > 0) {
            producer.send(eventDataBatch);
        }
        producer.close();
    }

Construa o programa e certifique-se de que não há erros. Executará este programa depois de executar o programa recetor.

Receber eventos

O código deste tutorial baseia-se na amostra EventProcessorClient no GitHub,que pode examinar para ver a aplicação completa de trabalho.

Aviso

Se executar este código no Azure Stack Hub, sofrerá erros de tempo de execução a menos que tenha como alvo uma versão API de armazenamento específica. Isto porque o Event Hubs SDK utiliza a mais recente API de Armazenamento Azure disponível disponível no Azure que pode não estar disponível na sua plataforma Azure Stack Hub. O Azure Stack Hub pode suportar uma versão diferente do Azure Blob Storage SDK do que os normalmente disponíveis no Azure. Se estiver a utilizar o Azure Blob Storage como uma loja de ponto de verificação, verifique a versão API suportada para o seu Azure Stack Hub e direcione essa versão no seu código.

Por exemplo, Se estiver a executar a versão Azure Stack Hub 2005, a versão mais alta disponível para o serviço de armazenamento é a versão 2019-02-02. Por padrão, a biblioteca de clientes Event Hubs SDK utiliza a versão mais alta disponível no Azure (2019-07-07 no momento do lançamento do SDK). Neste caso, além de seguir os passos nesta secção, também terá de adicionar código para direcionar o serviço de armazenamento API versão 2019-02-02. Para um exemplo sobre como direcionar uma versão específica da API de armazenamento, consulte esta amostra no GitHub.

Criar um armazenamento Azure e um recipiente blob

Neste arranque rápido, utilize o Azure Storage (especificamente, Blob Storage) como loja de pontos de verificação. O checkpointing é um processo pelo qual um processador de eventos marca ou compromete a posição do último evento processado com sucesso dentro de uma partição. A marcação de um ponto de verificação é normalmente feita dentro da função que processa os eventos. Para saber mais sobre o checkpoint, consulte o processador Event.

Siga estes passos para criar uma conta de Armazenamento Azure.

  1. Criar uma conta de Armazenamento do Azure

  2. Criar um contentor de blobs

  3. Obtenha o fio de ligação para a conta de armazenamento

    Observe o fio de ligação e o nome do recipiente. Vai usá-los no código de receção.

Adicione bibliotecas de Centros de Eventos ao seu projeto Java

Adicione as seguintes dependências no ficheiro pom.xml.

<dependencies>
    <dependency>
        <groupId>com.azure</groupId>
        <artifactId>azure-messaging-eventhubs</artifactId>
        <version>5.7.0</version>
    </dependency>
    <dependency>
        <groupId>com.azure</groupId>
        <artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
        <version>1.6.0</version>
    </dependency>
</dependencies>
  1. Adicione as seguintes declarações de importação no topo do ficheiro Java.

    import com.azure.messaging.eventhubs.*;
    import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore;
    import com.azure.messaging.eventhubs.models.*;
    import com.azure.storage.blob.*;
    import java.util.function.Consumer;
    
  2. Crie uma classe chamada Receiver , e adicione as seguintes variáveis de corda à classe. Substitua os espaços reservados pelos valores corretos.

    Importante

    Substitua os espaços reservados pelos valores corretos.

    • <Event Hubs namespace connection string> com a cadeia de ligação ao seu espaço de nomes Event Hubs. Atualizar
    • <Event hub name> com o nome do seu centro de eventos no espaço de nomes.
    • <Storage connection string> com o fio de ligação à sua conta de armazenamento Azure.
    • <Storage container name> com o nome do seu recipiente no seu armazém Azure Blob.
    private static final String connectionString = "<Event Hubs namespace connection string>";
    private static final String eventHubName = "<Event hub name>";
    private static final String storageConnectionString = "<Storage connection string>";
    private static final String storageContainerName = "<Storage container name>";
    
  3. Adicione o seguinte main método à classe.

    public static void main(String[] args) throws Exception {
        // Create a blob container client that you use later to build an event processor client to receive and process events
        BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
            .connectionString(storageConnectionString)
            .containerName(storageContainerName)
            .buildAsyncClient();
    
        // Create a builder object that you will use later to build an event processor client to receive and process events and errors.
        EventProcessorClientBuilder eventProcessorClientBuilder = new EventProcessorClientBuilder()
            .connectionString(connectionString, eventHubName)
            .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
            .processEvent(PARTITION_PROCESSOR)
            .processError(ERROR_HANDLER)
            .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient));
    
        // Use the builder object to create an event processor client
        EventProcessorClient eventProcessorClient = eventProcessorClientBuilder.buildEventProcessorClient();
    
        System.out.println("Starting event processor");
        eventProcessorClient.start();
    
        System.out.println("Press enter to stop.");
        System.in.read();
    
        System.out.println("Stopping event processor");
        eventProcessorClient.stop();
        System.out.println("Event processor stopped.");
    
        System.out.println("Exiting process");
    }
    
  4. Adicione os dois métodos de ajuda PARTITION_PROCESSOR ERROR_HANDLER (e) que processam eventos e erros à Receiver classe.

    public static final Consumer<EventContext> PARTITION_PROCESSOR = eventContext -> {
        PartitionContext partitionContext = eventContext.getPartitionContext();
        EventData eventData = eventContext.getEventData();
    
        System.out.printf("Processing event from partition %s with sequence number %d with body: %s%n",
            partitionContext.getPartitionId(), eventData.getSequenceNumber(), eventData.getBodyAsString());
    
        // Every 10 events received, it will update the checkpoint stored in Azure Blob Storage.
        if (eventData.getSequenceNumber() % 10 == 0) {
            eventContext.updateCheckpoint();
        }
    };
    
    public static final Consumer<ErrorContext> ERROR_HANDLER = errorContext -> {
        System.out.printf("Error occurred in partition processor for partition %s, %s.%n",
            errorContext.getPartitionContext().getPartitionId(),
            errorContext.getThrowable());
    };
    
  5. Construa o programa e certifique-se de que não há erros.

Executar as aplicações

  1. Executar primeiro a aplicação recetora.

  2. Em seguida, executar o pedido de Remetente.

  3. Na janela de aplicação do Recetor, confirme que vê os eventos que foram publicados pela aplicação Sender.

    Starting event processor
    Press enter to stop.
    Processing event from partition 0 with sequence number 331 with body: Foo
    Processing event from partition 0 with sequence number 332 with body: Bar
    
  4. Prima ENTER na janela de aplicação do recetor para parar a aplicação.

    Starting event processor
    Press enter to stop.
    Processing event from partition 0 with sequence number 331 with body: Foo
    Processing event from partition 0 with sequence number 332 with body: Bar
    
    Stopping event processor
    Event processor stopped.
    Exiting process
    

Passos seguintes

Veja as seguintes amostras no GitHub: