Отправка событий и получение событий из концентраторов событий Azure с помощью JavaUse Java to send events to or receive events from Azure Event Hubs

В этом руководстве показано, как создавать приложения Java для отправки событий в концентраторы событий Azure и получения событий.This tutorial shows how to create Java applications to send events to or receive events from Azure Event Hubs.

Центры событий Azure — это платформа потоковой передачи больших данных и служба приема событий, принимающая и обрабатывающая миллионы событий в секунду.Azure Event Hubs is a Big Data streaming platform and event ingestion service, capable of receiving and processing millions of events per second. Служба "Центры событий" может обрабатывать и сохранять события и данные, в том числе телеметрические, созданные распределенным программным обеспечением и устройствами.Event Hubs can process and store events, data, or telemetry produced by distributed software and devices. Данные, отправляемые в концентратор событий, можно преобразовывать и сохранять с помощью любого поставщика аналитики в режиме реального времени, а также с помощью адаптеров пакетной обработки или хранения.Data sent to an event hub can be transformed and stored using any real-time analytics provider or batching/storage adapters. Подробные сведения о концентраторах событий см. в статье Общие сведения о концентраторах событий и функциях концентраторов событий.For detailed overview of Event Hubs, see Event Hubs overview and Event Hubs features.

Примечание

Вы можете скачать это краткое руководство в качестве примера с сайта GitHub, заменить строки EventHubConnectionString и EventHubName значениями для своего концентратора событий и выполнить этот пример.You can download this quickstart as a sample from the GitHub, replace EventHubConnectionString and EventHubName strings with your event hub values, and run it. Или следуйте инструкциям из этого руководства, чтобы создать собственное решение.Alternatively, you can follow the steps in this tutorial to create your own.

Технические условияPrerequisites

Для работы с данным руководством вам потребуется:To complete this tutorial, you need the following prerequisites:

  • Активная учетная запись Azure.An active Azure account. Если у вас еще нет подписки Azure, создайте бесплатная учетная запись, прежде чем начинать работу.If you do not have an Azure subscription, create a free account before you begin.
  • Среда разработки Java.A Java development environment. В этом руководстве используется среда Eclipse.This tutorial uses Eclipse.
  • Создайте пространство имен концентраторов событий и концентратор событий.Create an Event Hubs namespace and an event hub. Первым шагом является использование портала Azure для создания пространства имен типа Центров событий и получение учетных данных управления, необходимых приложению для взаимодействия с концентратором событий.The first step is to use the Azure portal to create a namespace of type Event Hubs, and obtain the management credentials your application needs to communicate with the event hub. Чтобы создать пространство имен и концентратор событий, выполните инструкции из этой статьи.To create a namespace and an event hub, follow the procedure in this article. Затем получите значение ключа доступа для концентратора событий, выполнив инструкции из статьи " Получение строки подключения".Then, get the value of access key for the event hub by following instructions from the article: Get connection string. Используйте ключ доступа в коде, который вы напишете далее в рамках этого руководства.You use the access key in the code you write later in this tutorial. Имя ключа по умолчанию: RootManageSharedAccessKey.The default key name is: RootManageSharedAccessKey.

Отправка событийSend events

В этом разделе показано, как создать приложение Java для отправки событий в концентратор событий.This section shows you how to create a Java application to send events an event hub.

Добавление ссылки на библиотеку Центров событий AzureAdd reference to Azure Event Hubs library

Клиентскую библиотеку Java для Центров событий можно использовать в проектах Maven из центрального репозитория Maven.The Java client library for Event Hubs is available for use in Maven projects from the Maven Central Repository. Чтобы сослаться на эту библиотеку, используйте следующее объявление зависимостей в файле проекта Maven:You can reference this library using the following dependency declaration inside your Maven project file:

<dependency>
    <groupId>com.microsoft.azure</groupId>
    <artifactId>azure-eventhubs</artifactId>
    <version>2.2.0</version>
</dependency>

Для различных типов сред сборки можно явно получить последние выпущенные JAR-файлы из центрального репозитория Maven.For different types of build environments, you can explicitly obtain the latest released JAR files from the Maven Central Repository.

Если используется простой издатель событий, импортируйте пакет com.microsoft.azure.eventhubs для клиентских классов Центров событий и пакет com.microsoft.azure.servicebus для служебных классов, таких как общие исключения, которые используются совместно с клиентом обмена сообщениями служебной шины Azure.For a simple event publisher, import the com.microsoft.azure.eventhubs package for the Event Hubs client classes and the com.microsoft.azure.servicebus package for utility classes such as common exceptions that are shared with the Azure Service Bus messaging client.

Написание кода для отправки сообщений в концентратор событийWrite code to send messages to the event hub

Следующий пример сначала создает новый проект Maven для приложения консоли или оболочки в избранной среде разработки Java.For the following sample, first create a new Maven project for a console/shell application in your favorite Java development environment. Добавьте класс с именем SimpleSend и добавьте в него следующий код.Add a class named SimpleSend, and add the following code to the class:

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.EventHubException;

import java.io.IOException;
import java.nio.charset.Charset;
import java.time.Instant;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

public class SimpleSend {

    public static void main(String[] args)
            throws EventHubException, ExecutionException, InterruptedException, IOException {
            
            
    }
 }

Создание строки подключенияConstruct connection string

Чтобы создать значение строки подключения, используйте класс ConnectionStringBuilder. Затем передайте это значение в экземпляр клиента службы "Центры событий".Use the ConnectionStringBuilder class to construct a connection string value to pass to the Event Hubs client instance. Замените заполнители значениями, которые получены при создании пространства имен и концентратора событий:Replace the placeholders with the values you obtained when you created the namespace and event hub:

        final ConnectionStringBuilder connStr = new ConnectionStringBuilder()
                .setNamespaceName("<EVENTHUB NAMESPACE") 
                .setEventHubName("EVENT HUB")
                .setSasKeyName("RootManageSharedAccessKey")
                .setSasKey("SHARED ACCESS KEY");

Написание кода для отправки событийWrite code to send events

Создайте одиночное событие, преобразовав строку в байтовую кодировку UTF-8.Create a singular event by transforming a string into its UTF-8 byte encoding. Затем создайте экземпляр клиента Центров событий из строки подключения и отправьте сообщение.Then, create a new Event Hubs client instance from the connection string and send the message:

        final Gson gson = new GsonBuilder().create();

        // The Executor handles all asynchronous tasks and this is passed to the EventHubClient instance.
        // This enables the user to segregate their thread pool based on the work load.
        // This pool can then be shared across multiple EventHubClient instances.
        // The following sample uses a single thread executor, as there is only one EventHubClient instance,
        // handling different flavors of ingestion to Event Hubs here.
        final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(4);

        // Each EventHubClient instance spins up a new TCP/SSL connection, which is expensive.
        // It is always a best practice to reuse these instances. The following sample shows this.
        final EventHubClient ehClient = EventHubClient.createSync(connStr.toString(), executorService);


        try {
            for (int i = 0; i < 10; i++) {

                String payload = "Message " + Integer.toString(i);
                byte[] payloadBytes = gson.toJson(payload).getBytes(Charset.defaultCharset());
                EventData sendEvent = EventData.create(payloadBytes);

                // Send - not tied to any partition
                // Event Hubs service will round-robin the events across all Event Hubs partitions.
                // This is the recommended & most reliable way to send to Event Hubs.
                ehClient.sendSync(sendEvent);
            }

            System.out.println(Instant.now() + ": Send Complete...");
            System.out.println("Press Enter to stop.");
            System.in.read();
        } finally {
            ehClient.closeSync();
            executorService.shutdown();
        }

Скомпилируйте и запустите программу и убедитесь в отсутствии ошибок.Build and run the program, and ensure that there are no errors.

Поздравляем!Congratulations! Теперь вы можете отправлять сообщения в концентратор событий.You have now sent messages to an event hub.

Дополнение. Способ маршрутизации сообщений к разделам концентратора событийAppendix: How messages are routed to EventHub partitions

Прежде чем сообщения будут получены потребителями, они должны быть опубликованы издателями в разделах.Before messages are retrieved by consumers, they have to be published to the partitions first by the publishers. При синхронной публикации сообщения в концентраторе событий с помощью метода sendSync() в объекте com.microsoft.azure.eventhubs.EventHubClient сообщение может быть отправлено в определенную секцию или распределиться по всем доступным разделам циклически в зависимости от того, указан ли ключ раздела или нет.When messages are published to event hub synchronously using the sendSync() method on the com.microsoft.azure.eventhubs.EventHubClient object, the message could be sent to a specific partition or distributed to all available partitions in a round-robin manner depending on whether the partition key is specified or not.

Когда указана строка, представляющая ключ раздела, ключ будет хэширован, чтобы определить, в какой раздел нужно отправить событие.When a string representing the partition key is specified, the key will be hashed to determine which partition to send the event to.

Если ключ раздела не задан, сообщения будут циклически отправляться во все доступные разделы.When the partition key is not set, then messages will round-robined to all available partitions

// Serialize the event into bytes
byte[] payloadBytes = gson.toJson(messagePayload).getBytes(Charset.defaultCharset());

// Use the bytes to construct an {@link EventData} object
EventData sendEvent = EventData.create(payloadBytes);

// Transmits the event to event hub without a partition key
// If a partition key is not set, then we will round-robin to all topic partitions
eventHubClient.sendSync(sendEvent);

//  the partitionKey will be hash'ed to determine the partitionId to send the eventData to.
eventHubClient.sendSync(sendEvent, partitionKey);

// close the client at the end of your program
eventHubClient.closeSync();

Получение событийReceive events

Код в этом руководстве основан на коде EventProcessorSample в GitHub. Изучите его, чтобы получить полное представление о рабочем приложении.The code in this tutorial is based on the EventProcessorSample code on GitHub, which you can examine to see the full working application.

Прием сообщений через EventProcessorHost в JavaReceive messages with EventProcessorHost in Java

EventProcessorHost — это класс Java, который упрощает прием событий от концентраторов событий путем управления постоянными контрольными точками и параллельно принимает сообщения от этих концентраторов событий.EventProcessorHost is a Java class that simplifies receiving events from Event Hubs by managing persistent checkpoints and parallel receives from those Event Hubs. С помощью класса EventProcessorHost можно разделить события между несколькими получателями, даже если они размещены на разных узлах.Using EventProcessorHost, you can split events across multiple receivers, even when hosted in different nodes. В этом примере показано, как использовать EventProcessorHost для одного получателя.This example shows how to use EventProcessorHost for a single receiver.

Создание учетной записи хранилищаCreate a storage account

Чтобы использовать EventProcessorHost, необходимо [учетная запись хранения Azure] [учетная запись хранения Azure]:To use EventProcessorHost, you must have an [Azure Storage account][Azure Storage account]:

  1. Войдите в портал Azureи выберите создать ресурс в левой части экрана.Sign in the Azure portal, and select Create a resource on the left-hand side of the screen.

  2. Выберите хранилище, а затем выберите учетная запись хранения.Select Storage, then select Storage account. В окне Создание учетной записи хранения введите имя учетной записи хранения.In the Create storage account window, type a name for the storage account. Заполните остальные поля, выберите нужный регион и нажмите кнопку создать.Complete the rest of the fields, select your desired region, and then select Create.

    Создание учетной записи хранения в портал Azure

  3. Выберите только что созданную учетную запись хранения, а затем выберите ключи доступа:Select the newly created storage account, and then select Access Keys:

    Получение ключей доступа в портал Azure

    Скопируйте значение key1 во временное расположение.Copy the key1 value to a temporary location. Оно будет использоваться далее в этом руководстве.You use it later in this tutorial.

Создание проекта Java с помощью EventProcessorHostCreate a Java project using the EventProcessor Host

Клиентская библиотека Java для Центров событий доступна для использования в проектах из центрального репозитория Maven. Ссылаться на нее можно, используя следующее объявление зависимости в файле проекта Maven:The Java client library for Event Hubs is available for use in Maven projects from the Maven Central Repository, and can be referenced using the following dependency declaration inside your Maven project file:

<dependency>
    <groupId>com.microsoft.azure</groupId>
    <artifactId>azure-eventhubs</artifactId>
    <version>2.2.0</version>
</dependency>
<dependency>
    <groupId>com.microsoft.azure</groupId>
    <artifactId>azure-eventhubs-eph</artifactId>
    <version>2.4.0</version>
</dependency>

Для различных типов сред сборки можно явно получить последние выпущенные JAR-файлы из центрального репозитория Maven.For different types of build environments, you can explicitly obtain the latest released JAR files from the Maven Central Repository.

  1. Следующий пример сначала создает новый проект Maven для приложения консоли или оболочки в избранной среде разработки Java.For the following sample, first create a new Maven project for a console/shell application in your favorite Java development environment. Класс называется ErrorNotificationHandler.The class is called ErrorNotificationHandler.

    import java.util.function.Consumer;
    import com.microsoft.azure.eventprocessorhost.ExceptionReceivedEventArgs;
    
    public class ErrorNotificationHandler implements Consumer<ExceptionReceivedEventArgs>
    {
        @Override
        public void accept(ExceptionReceivedEventArgs t)
        {
            System.out.println("SAMPLE: Host " + t.getHostname() + " received general error notification during " + t.getAction() + ": " + t.getException().toString());
        }
    }
    
  2. Создайте новый класс с именем EventProcessorSampleс помощью следующего кода:Use the following code to create a new class called EventProcessorSample. Замените значения заполнителей значениями, которые использовались при создании концентратора событий и учетной записи хранения:Replace the placeholders with the values used when you created the event hub and storage account:

    package com.microsoft.azure.eventhubs.samples.eventprocessorsample;
    
    import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
    import com.microsoft.azure.eventhubs.EventData;
    import com.microsoft.azure.eventprocessorhost.CloseReason;
    import com.microsoft.azure.eventprocessorhost.EventProcessorHost;
    import com.microsoft.azure.eventprocessorhost.EventProcessorOptions;
    import com.microsoft.azure.eventprocessorhost.ExceptionReceivedEventArgs;
    import com.microsoft.azure.eventprocessorhost.IEventProcessor;
    import com.microsoft.azure.eventprocessorhost.PartitionContext;
    
    import java.util.concurrent.ExecutionException;
    import java.util.function.Consumer;
    
    public class EventProcessorSample
    {
        public static void main(String args[]) throws InterruptedException, ExecutionException
        {
            String consumerGroupName = "$Default";
            String namespaceName = "----NamespaceName----";
            String eventHubName = "----EventHubName----";
            String sasKeyName = "----SharedAccessSignatureKeyName----";
            String sasKey = "----SharedAccessSignatureKey----";
            String storageConnectionString = "----AzureStorageConnectionString----";
            String storageContainerName = "----StorageContainerName----";
            String hostNamePrefix = "----HostNamePrefix----";
    
            ConnectionStringBuilder eventHubConnectionString = new ConnectionStringBuilder()
                 .setNamespaceName(namespaceName)
                 .setEventHubName(eventHubName)
                 .setSasKeyName(sasKeyName)
                 .setSasKey(sasKey);
    
            EventProcessorHost host = new EventProcessorHost(
                 EventProcessorHost.createHostName(hostNamePrefix),
                 eventHubName,
                 consumerGroupName,
                 eventHubConnectionString.toString(),
                 storageConnectionString,
                 storageContainerName);
    
            System.out.println("Registering host named " + host.getHostName());
            EventProcessorOptions options = new EventProcessorOptions();
            options.setExceptionNotification(new ErrorNotificationHandler());
    
            host.registerEventProcessor(EventProcessor.class, options)
            .whenComplete((unused, e) ->
            {
                if (e != null)
                {
                    System.out.println("Failure while registering: " + e.toString());
                    if (e.getCause() != null)
                    {
                        System.out.println("Inner exception: " + e.getCause().toString());
                    }
                }
            })
            .thenAccept((unused) ->
            {
                System.out.println("Press enter to stop.");
                try 
                {
                    System.in.read();
                }
                catch (Exception e)
                {
                    System.out.println("Keyboard read failed: " + e.toString());
                }
            })
            .thenCompose((unused) ->
            {
                return host.unregisterEventProcessor();
            })
            .exceptionally((e) ->
            {
                System.out.println("Failure while unregistering: " + e.toString());
                if (e.getCause() != null)
                {
                    System.out.println("Inner exception: " + e.getCause().toString());
                }
                return null;
            })
            .get(); // Wait for everything to finish before exiting main!
    
            System.out.println("End of sample");
        }
    
  3. Создайте еще один класс с именем EventProcessor при помощи следующего кода:Create one more class called EventProcessor, using the following code:

    public static class EventProcessor implements IEventProcessor
    {
        private int checkpointBatchingCount = 0;
    
        // OnOpen is called when a new event processor instance is created by the host. 
        @Override
        public void onOpen(PartitionContext context) throws Exception
        {
            System.out.println("SAMPLE: Partition " + context.getPartitionId() + " is opening");
        }
    
        // OnClose is called when an event processor instance is being shut down. 
        @Override
        public void onClose(PartitionContext context, CloseReason reason) throws Exception
        {
            System.out.println("SAMPLE: Partition " + context.getPartitionId() + " is closing for reason " + reason.toString());
        }
    
        // onError is called when an error occurs in EventProcessorHost code that is tied to this partition, such as a receiver failure.
        @Override
        public void onError(PartitionContext context, Throwable error)
        {
            System.out.println("SAMPLE: Partition " + context.getPartitionId() + " onError: " + error.toString());
        }
    
        // onEvents is called when events are received on this partition of the Event Hub. 
        @Override
        public void onEvents(PartitionContext context, Iterable<EventData> events) throws Exception
        {
            System.out.println("SAMPLE: Partition " + context.getPartitionId() + " got event batch");
            int eventCount = 0;
            for (EventData data : events)
            {
                try
                {
                    System.out.println("SAMPLE (" + context.getPartitionId() + "," + data.getSystemProperties().getOffset() + "," +
                            data.getSystemProperties().getSequenceNumber() + "): " + new String(data.getBytes(), "UTF8"));
                    eventCount++;
    
                    // Checkpointing persists the current position in the event stream for this partition and means that the next
                    // time any host opens an event processor on this event hub+consumer group+partition combination, it will start
                    // receiving at the event after this one. 
                    this.checkpointBatchingCount++;
                    if ((checkpointBatchingCount % 5) == 0)
                    {
                        System.out.println("SAMPLE: Partition " + context.getPartitionId() + " checkpointing at " +
                            data.getSystemProperties().getOffset() + "," + data.getSystemProperties().getSequenceNumber());
                        // Checkpoints are created asynchronously. It is important to wait for the result of checkpointing
                        // before exiting onEvents or before creating the next checkpoint, to detect errors and to ensure proper ordering.
                        context.checkpoint(data).get();
                    }
                }
                catch (Exception e)
                {
                    System.out.println("Processing failed for an event: " + e.toString());
                }
            }
            System.out.println("SAMPLE: Partition " + context.getPartitionId() + " batch size was " + eventCount + " for host " + context.getOwner());
        }
    }
    

В данном учебнике используется один экземпляр EventProcessorHost.This tutorial uses a single instance of EventProcessorHost. Чтобы увеличить пропускную способность, можно запустить несколько экземпляров EventProcessorHost, желательно на разных компьютерах.To increase throughput, we recommend that you run multiple instances of EventProcessorHost, preferably on separate machines. Кроме того, такое решение обеспечит избыточность.It provides redundancy as well. В этом случае различные экземпляры автоматически координируются друг с другом для распределения нагрузки полученных событий.In those cases, the various instances automatically coordinate with each other in order to load balance the received events. Если каждый из нескольких получателей должен обрабатывать все события, то необходимо использовать понятие ConsumerGroup .If you want multiple receivers to each process all the events, you must use the ConsumerGroup concept. При получении события от разных компьютеров может оказаться полезным указать имена экземпляров EventProcessorHost в компьютерах (или ролях), где они развернуты.When receiving events from different machines, it might be useful to specify names for EventProcessorHost instances based on the machines (or roles) in which they are deployed.

Публикация сообщений в EventHubPublishing Messages to EventHub

Прежде чем сообщения будут получены потребителями, они должны быть опубликованы издателями в разделах.Before messages are retrieved by consumers, they have to be published to the partitions first by the publishers. Следует отметить, что при синхронной публикации сообщения в концентраторе событий с помощью метода sendSync() в объекте com.microsoft.azure.eventhubs.EventHubClient сообщение может быть отправлено в определенный раздел или распределиться по всем доступным разделам циклически, в зависимости от того, указан ли ключ раздела.It is worth noting that when messages are published to event hub synchronously using the sendSync() method on the com.microsoft.azure.eventhubs.EventHubClient object, the message could be sent to a specific partition or distributed to all available partitions in a round-robin manner depending on whether the partition key is specified or not.

Если указана строка, представляющая ключ раздела, раздел для отправки события определяется по хэшу ключа.When a string representing the partition key is specified, the key is hashed to determine which partition to send the event to.

Если ключ раздела не задан, сообщения будут отправляются поочередно во все доступные разделы.When the partition key is not set, then messages are round-robined to all available partitions

// Serialize the event into bytes
byte[] payloadBytes = gson.toJson(messagePayload).getBytes(Charset.defaultCharset());

// Use the bytes to construct an {@link EventData} object
EventData sendEvent = EventData.create(payloadBytes);

// Transmits the event to event hub without a partition key
// If a partition key is not set, then we will round-robin to all topic partitions
eventHubClient.sendSync(sendEvent);

//  the partitionKey will be hash'ed to determine the partitionId to send the eventData to.
eventHubClient.sendSync(sendEvent, partitionKey);

Реализация пользовательского CheckpointManager для EventProcessorHost (EPH)Implementing a Custom CheckpointManager for EventProcessorHost (EPH)

API предоставляет механизм реализации пользовательского диспетчера контрольных точек для сценариев, где реализация по умолчанию несовместима с вашим вариантом использования.The API provides a mechanism to implement your custom checkpoint manager for scenarios where the default implementation is not compatible with your use case.

Диспетчер контрольных точек по умолчанию использует хранилище BLOB-объектов, но вы можете переопределить диспетчер контрольных точек собственной реализацией и использовать любое хранилище для резервного копирования в этой реализации диспетчера контрольных точек.The default checkpoint manager uses blob storage but if you override the checkpoint manager used by EPH with your own implementation, you can use any store you want to back your checkpoint manager implementation.

Создайте класс, который реализует интерфейс com.microsoft.azure.eventprocessorhost.ICheckpointManager.Create a class that implements the interface com.microsoft.azure.eventprocessorhost.ICheckpointManager

Используйте свою реализацию пользовательского диспетчера контрольных точек (com.microsoft.azure.eventprocessorhost.ICheckpointManager).Use your custom implementation of the checkpoint manager (com.microsoft.azure.eventprocessorhost.ICheckpointManager)

В рамках своей реализации можно переопределить механизм контрольных точек по умолчанию и реализовать собственные контрольные точки на основе собственного хранилища данных (например, SQL Server, CosmosDB и кэша Azure для Redis).Within your implementation, you can override the default checkpointing mechanism and implement our own checkpoints based on your own data store (like SQL Server, CosmosDB, and Azure Cache for Redis). Мы рекомендуем, чтобы хранилище, используемое для резервного копирования в вашей реализации диспетчера контрольных точек, было доступно всем экземплярам EPH, которые обрабатывают события для группы потребителей.We recommend that the store used to back your checkpoint manager implementation is accessible to all EPH instances that are processing events for the consumer group.

Вы можете использовать любое хранилище данных, которое доступно в вашей среде.You can use any datastore that is available in your environment.

Класс com.microsoft.azure.eventprocessorhost.EventProcessorHost предоставляет два конструктора, которые позволяют переопределить диспетчер контрольных точек для EventProcessorHost.The com.microsoft.azure.eventprocessorhost.EventProcessorHost class provides you with two constructors that allow you to override the checkpoint manager for your EventProcessorHost.

Дальнейшие действияNext steps

Ознакомьтесь со следующими статьями:Read the following articles: