Использование Java для отправки и получения событий в Центрах событий Azure

В этом кратком руководстве показано, как отправлять события в концентратор событий и получать события из него с помощью пакета Java azure-messaging-eventhubs.

Совет

Если вы работаете с ресурсами Центры событий Azure в приложении Spring, рекомендуется использовать Spring Cloud Azure в качестве альтернативы. Spring Cloud Azure — это проект с открытым исходным кодом, который обеспечивает простую интеграцию Spring со службами Azure. Дополнительные сведения о Spring Cloud Azure и примерах использования Центров событий см. в статье Spring Cloud Stream с Центры событий Azure.

Необходимые компоненты

Если вы впервые используете Центры событий Azure, ознакомьтесь с общими сведениями, прежде чем приступить к работе с этим руководством.

Для работы с данным руководством необходимо следующее:

  • Подписка Microsoft Azure. Чтобы использовать службы Azure, в том числе Центры событий Azure, потребуется действующая подписка. Если у вас еще нет учетной записи Azure, зарегистрируйтесь для работы с бесплатной пробной версией или активируйте преимущества для подписчиков MSDN при создании учетной записи.
  • Среда разработки Java. В рамках этого краткого руководства используется Eclipse. Необходим комплект SDK Java (JDK) версии 8 или более поздней.
  • Создайте пространство имен Центров событий и концентратор событий. Первым шагом является использование портала Azure для создания пространства имен типа Центров событий и получение учетных данных управления, необходимых приложению для взаимодействия с концентратором событий. Чтобы создать пространство имен и концентратор событий, выполните инструкции из этой статьи. Получите строку подключения для пространства имен Центров событий, следуя инструкциям из статьи Получение строки подключения. Строка подключения понадобится нам позже в рамках этого краткого руководства.

Отправка событий

Из этого раздела вы узнаете, как создать приложение Java, которое отправляет события в концентратор событий.

Добавление ссылки на библиотеку Центров событий Azure

Сначала создайте проект Maven для консольного или оболочкного приложения в любимой среде разработки Java. pom.xml Обновите файл следующим образом. Клиентская библиотека Java для Центров событий доступна в центральном репозитории Maven.

		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-messaging-eventhubs</artifactId>
		    <version>5.18.0</version>
		</dependency>
		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-identity</artifactId>
		    <version>1.11.2</version>
		    <scope>compile</scope>
		</dependency>

Примечание.

Обновите версию до последней, опубликованной в репозитории Maven.

Проверка подлинности приложения в Azure

В этом кратком руководстве показано два способа подключения к Центры событий Azure: без пароля и строка подключения. Первый вариант показывает, как использовать субъект безопасности в идентификаторе Microsoft Entra ID и управлении доступом на основе ролей (RBAC) для подключения к пространству имен Центров событий. Вам не нужно беспокоиться о наличии жестко закодированных строка подключения в коде или в файле конфигурации или в безопасном хранилище, например Azure Key Vault. Второй вариант показывает, как использовать строка подключения для подключения к пространству имен Центров событий. Если вы не знакомы с Azure, вы можете найти вариант строка подключения проще следовать. Мы рекомендуем использовать параметр без пароля в реальных приложениях и рабочих средах. Дополнительные сведения см. в разделе "Проверка подлинности и авторизация". Дополнительные сведения о проверке подлинности без пароля см. на странице обзора.

Назначение ролей пользователю Microsoft Entra

При локальной разработке убедитесь, что учетная запись пользователя, которая подключается к Центры событий Azure имеет правильные разрешения. Для отправки и получения сообщений вам потребуется роль владельца данных Центры событий Azure. Чтобы назначить себе эту роль, вам потребуется роль Администратор istrator пользователя или другую роль, которая включает Microsoft.Authorization/roleAssignments/write действие. Роли Azure RBAC можно назначить пользователю с помощью портала Azure, Azure CLI или Azure PowerShell. Узнайте больше о доступных область для назначений ролей на странице обзора область.

В следующем примере роль назначается Azure Event Hubs Data Owner учетной записи пользователя, которая предоставляет полный доступ к ресурсам Центры событий Azure. В реальном сценарии следуйте принципу наименьших привилегий , чтобы предоставить пользователям только минимальные разрешения, необходимые для более безопасной рабочей среды.

Встроенные роли Azure для Центров событий Azure

Для Центры событий Azure управление пространствами имен и всеми связанными ресурсами с помощью портал Azure и API управления ресурсами Azure уже защищено с помощью модели Azure RBAC. Azure предоставляет следующие встроенные роли Azure для авторизации доступа к пространству имен Центров событий:

Если вы хотите создать пользовательскую роль, см. раздел "Права", необходимые для операций Центров событий.

Внимание

В большинстве случаев для распространения назначения ролей в Azure потребуется несколько минут. В редких случаях может потребоваться до восьми минут. Если при первом запуске кода возникают ошибки аутентификации, подождите несколько минут и повторите попытку.

  1. В портал Azure найдите пространство имен Центров событий с помощью главной панели поиска или навигации слева.

  2. На странице обзора выберите элемент управления доступом (IAM) в меню слева.

  3. На странице Контроль доступа (IAM) откройте вкладку Назначения ролей.

  4. Выберите + Добавить в верхнем меню, а затем выберите Добавить назначение роли в появившемся раскрывающемся меню.

    A screenshot showing how to assign a role.

  5. Используйте поле поиска, чтобы отфильтровать результаты для отображения нужной роли. В этом примере найдите Azure Event Hubs Data Owner и выберите соответствующий результат. Теперь щелкните Далее.

  6. В разделе Назначение доступа для выберите Пользователь, группа или субъект-служба и + Выбрать членов.

  7. В диалоговом окне найдите имя пользователя Microsoft Entra (обычно ваш user@domain адрес электронной почты), а затем выберите в нижней части диалогового окна.

  8. Нажмите кнопку Проверить и назначить, чтобы перейти на последнюю страницу, а затем еще раз Проверить и назначить, чтобы завершить процесс.

Написание кода для отправки сообщений в концентратор событий

Добавьте класс с именем Sender и добавьте в него следующий код.

Внимание

  • Обновите <NAMESPACE NAME> имя пространства имен Центров событий.
  • Обновите <EVENT HUB NAME> имя концентратора событий.
package ehubquickstart;

import com.azure.messaging.eventhubs.*;
import java.util.Arrays;
import java.util.List;

import com.azure.identity.*;

public class SenderAAD {

    // replace <NAMESPACE NAME> with the name of your Event Hubs namespace.
    // Example: private static final String namespaceName = "contosons.servicebus.windows.net";
    private static final String namespaceName = "<NAMESPACE NAME>.servicebus.windows.net";

    // Replace <EVENT HUB NAME> with the name of your event hub. 
    // Example: private static final String eventHubName = "ordersehub";
    private static final String eventHubName = "<EVENT HUB NAME>";

    public static void main(String[] args) {
        publishEvents();
    }
    /**
     * Code sample for publishing events.
     * @throws IllegalArgumentException if the EventData is bigger than the max batch size.
     */
    public static void publishEvents() {
        // create a token using the default Azure credential        
        DefaultAzureCredential credential = new DefaultAzureCredentialBuilder()
                .authorityHost(AzureAuthorityHosts.AZURE_PUBLIC_CLOUD)
                .build();

        // create a producer client        
        EventHubProducerClient producer = new EventHubClientBuilder()        
            .fullyQualifiedNamespace(namespaceName)
            .eventHubName(eventHubName)
            .credential(credential)
            .buildProducerClient();

        // sample events in an array
        List<EventData> allEvents = Arrays.asList(new EventData("Foo"), new EventData("Bar"));

        // create a batch
        EventDataBatch eventDataBatch = producer.createBatch();

        for (EventData eventData : allEvents) {
            // try to add the event from the array to the batch
            if (!eventDataBatch.tryAdd(eventData)) {
                // if the batch is full, send it and then create a new batch
                producer.send(eventDataBatch);
                eventDataBatch = producer.createBatch();

                // Try to add that event that couldn't fit before.
                if (!eventDataBatch.tryAdd(eventData)) {
                    throw new IllegalArgumentException("Event is too large for an empty batch. Max size: "
                        + eventDataBatch.getMaxSizeInBytes());
                }
            }
        }
        // send the last batch of remaining events
        if (eventDataBatch.getCount() > 0) {
            producer.send(eventDataBatch);
        }
        producer.close();
    }   
}

Скомпилируйте программу и убедитесь в отсутствии ошибок. Эту программу вы запустите после запуска программы получателя.

Получение событий

Код в этом руководстве основан на шаблоне EventProcessorClient в GitHub. Изучите его, чтобы получить полное представление о рабочем приложении.

Следуйте этим рекомендациям при использовании Хранилище BLOB-объектов Azure в качестве хранилища проверка point:

  • Используйте отдельный контейнер для каждой группы потребителей. Вы можете использовать одну и ту же учетную запись хранения, но использовать один контейнер для каждой группы.
  • Не используйте контейнер для других компонентов и не используйте учетную запись хранения для других действий.
  • служба хранилища учетная запись должна находиться в том же регионе, в который находится развернутое приложение. Если приложение находится в локальной среде, попробуйте выбрать ближайший регион.

На странице учетной записи служба хранилища в портал Azure в разделе службы BLOB-объектов убедитесь, что следующие параметры отключены.

  • Иерархическое пространство имен
  • Обратимое удаление BLOB-объекта
  • Управление версиями

Создание службы хранилища Azure и контейнера больших двоичных объектов

В этом кратком руководстве показано, как настроить службу хранилища Azure (Хранилище BLOB-объектов) в качестве хранилища контрольных точек. Назначение контрольных точек — это процесс, в ходе которого обработчик событий отмечает или фиксирует положение последнего успешно обработанного события в секции. Контрольная точка обычно отмечается в функции, которая обрабатывает события. См. статью Балансировка нагрузки секций между несколькими экземплярами приложения.

Выполните указанные ниже действия, чтобы создать учетную запись хранения Azure.

  1. Создайте учетную запись хранения Azure
  2. Создание контейнера больших двоичных объектов
  3. Проверка подлинности в контейнере BLOB-объектов

Если вы выполняете разработку локально, убедитесь, что учетная запись пользователя, через которую осуществляется доступ к данным BLOB-объектов, имеет правильные разрешения. Вам потребуется служба хранилища участник данных BLOB-объектов для чтения и записи данных BLOB-объектов. Чтобы назначить себе эту роль, вам потребуется назначить роль Администратор istrator для доступа пользователей или другую роль, включающую действие Microsoft.Authorization/roleAssignments/write. Роли Azure RBAC можно назначить пользователю с помощью портала Azure, Azure CLI или Azure PowerShell. Дополнительные сведения о доступных областях назначения ролей можно узнать на странице обзора области.

В этом сценарии вы назначите разрешения учетной записи пользователя, которая ограничена учетной записью хранения, чтобы обеспечить соблюдение принципа минимальных привилегий. В рамках этой практики пользователям предоставляются только минимальные необходимые разрешения, что позволяет создавать более защищенные рабочие среды.

В следующем примере будет назначена роль участника данных BLOB-объектов служба хранилища учетной записи пользователя, которая предоставляет доступ для чтения и записи к данным BLOB-объектов в вашей учетной записи хранения.

Внимание

В большинстве случаев для распространения назначения ролей в Azure потребуется минута или две, но в редких случаях может потребоваться до восьми минут. Если при первом запуске кода возникают ошибки аутентификации, подождите несколько минут и повторите попытку.

  1. На портале Azure найдите свою учетную запись хранения, воспользовавшись основной панелью поиска или областью навигации слева.

  2. На странице обзора учетной записи хранения выберите Контроль доступа (IAM) в меню слева.

  3. На странице Контроль доступа (IAM) откройте вкладку Назначения ролей.

  4. Выберите + Добавить в верхнем меню, а затем выберите Добавить назначение роли в появившемся раскрывающемся меню.

    A screenshot showing how to assign a storage account role.

  5. Используйте поле поиска, чтобы отфильтровать результаты для отображения нужной роли. В этом примере найдите участника данных BLOB-объектов хранилища и выберите соответствующий результат, а затем нажмите кнопку Далее.

  6. В разделе Назначение доступа для выберите Пользователь, группа или субъект-служба и + Выбрать членов.

  7. В диалоговом окне найдите имя пользователя Microsoft Entra (обычно ваш user@domain адрес электронной почты), а затем выберите в нижней части диалогового окна.

  8. Нажмите кнопку Проверить и назначить, чтобы перейти на последнюю страницу, а затем еще раз Проверить и назначить, чтобы завершить процесс.

Добавление библиотек Центров событий в проект Java

Добавьте следующие зависимости в файл pom.xml.

	<dependencies>
		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-messaging-eventhubs</artifactId>
		    <version>5.15.0</version>
		</dependency>
		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
		    <version>1.16.1</version>
		</dependency>
		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-identity</artifactId>
		    <version>1.8.0</version>
		    <scope>compile</scope>
		</dependency>	
	</dependencies>
  1. Добавьте следующие import инструкции в верхней части файла Java.

    import com.azure.messaging.eventhubs.*;
    import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore;
    import com.azure.messaging.eventhubs.models.*;
    import com.azure.storage.blob.*;
    import java.util.function.Consumer;
    
    import com.azure.identity.*;
    
  2. Создайте класс с именем Receiver и добавьте в него следующие строковые переменные. Замените значения заполнителей своими значениями.

    Внимание

    Замените значения заполнителей своими значениями.

    • <NAMESPACE NAME> замените именем пространства имен для Центров событий;
    • Замените <EVENT HUB NAME> именем пространства имен для концентратора событий.
    private static final String namespaceName = "<NAMESPACE NAME>.servicebus.windows.net";
    private static final String eventHubName = "<EVENT HUB NAME>";
    
  3. Добавьте в класс следующий метод main.

    Внимание

    Замените значения заполнителей своими значениями.

    • <STORAGE ACCOUNT NAME>с именем учетной записи служба хранилища Azure.
    • <CONTAINER NAME> с именем контейнера BLOB-объектов в учетной записи хранения
    // create a token using the default Azure credential
    DefaultAzureCredential credential = new DefaultAzureCredentialBuilder()
            .authorityHost(AzureAuthorityHosts.AZURE_PUBLIC_CLOUD)
            .build();
    
    // Create a blob container client that you use later to build an event processor client to receive and process events
    BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
            .credential(credential)
            .endpoint("https://<STORAGE ACCOUNT NAME>.blob.core.windows.net")
            .containerName("<CONTAINER NAME>")
            .buildAsyncClient();
    
    // Create an event processor client to receive and process events and errors.
    EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
        .fullyQualifiedNamespace(namespaceName)
        .eventHubName(eventHubName)
        .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
        .processEvent(PARTITION_PROCESSOR)
        .processError(ERROR_HANDLER)
        .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient))            
        .credential(credential)
        .buildEventProcessorClient();
    
    System.out.println("Starting event processor");
    eventProcessorClient.start();
    
    System.out.println("Press enter to stop.");
    System.in.read();
    
    System.out.println("Stopping event processor");
    eventProcessorClient.stop();
    System.out.println("Event processor stopped.");
    
    System.out.println("Exiting process");  
    
  1. Добавьте два вспомогательных метода (PARTITION_PROCESSOR и ERROR_HANDLER), которые обрабатывают события и ошибки в классе Receiver.

    public static final Consumer<EventContext> PARTITION_PROCESSOR = eventContext -> {
        PartitionContext partitionContext = eventContext.getPartitionContext();
        EventData eventData = eventContext.getEventData();
    
        System.out.printf("Processing event from partition %s with sequence number %d with body: %s%n",
            partitionContext.getPartitionId(), eventData.getSequenceNumber(), eventData.getBodyAsString());
    
        // Every 10 events received, it will update the checkpoint stored in Azure Blob Storage.
        if (eventData.getSequenceNumber() % 10 == 0) {
            eventContext.updateCheckpoint();
        }
    };
    
    public static final Consumer<ErrorContext> ERROR_HANDLER = errorContext -> {
        System.out.printf("Error occurred in partition processor for partition %s, %s.%n",
            errorContext.getPartitionContext().getPartitionId(),
            errorContext.getThrowable());
    };
    
  2. Скомпилируйте программу и убедитесь в отсутствии ошибок.

Запуск приложений

  1. Сначала запустите приложение получателя.

  2. Затем запустите приложение отправителя.

  3. Убедитесь, что в окне приложения получателя отображаются события, опубликованные приложением отправителя.

    Starting event processor
    Press enter to stop.
    Processing event from partition 0 with sequence number 331 with body: Foo
    Processing event from partition 0 with sequence number 332 with body: Bar
    
  4. Нажмите ENTER в окне приложения получателя, чтобы прервать работу приложения.

    Starting event processor
    Press enter to stop.
    Processing event from partition 0 with sequence number 331 with body: Foo
    Processing event from partition 0 with sequence number 332 with body: Bar
    
    Stopping event processor
    Event processor stopped.
    Exiting process
    

Следующие шаги

См. следующие примеры на сайте GitHub: