Использование очередей Служебной шины Azure с Java для отправки и получения сообщений (старый пакет)

В этой статье показано, как создавать приложения Java для отправки и получения сообщений через очередь Служебной шины Azure.

Предупреждение

В этой статье используются старые пакеты azure-servicebus. Сведения об использовании нового пакета azure-messaging-servicebus см. в статье Отправка и получение сообщений с помощью пакетаazure-messaging-servicebus.

Предварительные требования

  1. Подписка Azure. Для выполнения инструкций из этой статьи требуется учетная запись Azure. Вы можете активировать преимущества подписчика MSDN или зарегистрироваться для получения бесплатной учетной записи.
  2. Если у вас нет подходящей очереди служебной шины, создайте ее с помощью портала Azure.
    1. Ознакомьтесь с общими сведениями об очередях Служебной шины.
    2. Создайте пространство имен Служебной шины.
    3. Получите строку подключения.
    4. Создайте очередь Служебной шины.
  3. Установите пакет Azure SDK дляJava.

Настройка приложения для использования служебной шины

Перед созданием этого образца убедитесь, что вы установили пакет Azure SDK для Java.

При использовании Eclipse можно установить набор средств Azure для Eclipse, включающий в себя пакет Azure SDK для Java. Затем можно добавить библиотеки Microsoft Azure для Java в проект. Если вы используете IntelliJ, см. статью Установка набора средств Azure для IntelliJ.

Добавление библиотек Microsoft Azure для Java в проект Eclipse

Добавьте в начало Java-файла следующие инструкции import:

// Include the following imports to use Service Bus APIs
import com.google.gson.reflect.TypeToken;
import com.microsoft.azure.servicebus.*;
import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder;
import com.google.gson.Gson;

import static java.nio.charset.StandardCharsets.*;

import java.time.Duration;
import java.util.*;
import java.util.concurrent.*;

import org.apache.commons.cli.*;

Отправка сообщений в очередь

Для отправки сообщений в очередь служебной шины приложение создает объект QueueClient и асинхронно отправляет сообщения. Ниже показан код по отправке сообщения в очередь, созданную на портале.

public void run() throws Exception {
    // Create a QueueClient instance and then asynchronously send messages.
    // Close the sender once the send operation is complete.
    QueueClient sendClient = new QueueClient(new ConnectionStringBuilder(ConnectionString, QueueName), ReceiveMode.PEEKLOCK);
    this.sendMessageAsync(sendClient).thenRunAsync(() -> sendClient.closeAsync());

    sendClient.close();
}

    CompletableFuture<Void> sendMessagesAsync(QueueClient sendClient) {
        List<HashMap<String, String>> data =
                GSON.fromJson(
                        "[" +
                                "{'name' = 'Einstein', 'firstName' = 'Albert'}," +
                                "{'name' = 'Heisenberg', 'firstName' = 'Werner'}," +
                                "{'name' = 'Curie', 'firstName' = 'Marie'}," +
                                "{'name' = 'Hawking', 'firstName' = 'Steven'}," +
                                "{'name' = 'Newton', 'firstName' = 'Isaac'}," +
                                "{'name' = 'Bohr', 'firstName' = 'Niels'}," +
                                "{'name' = 'Faraday', 'firstName' = 'Michael'}," +
                                "{'name' = 'Galilei', 'firstName' = 'Galileo'}," +
                                "{'name' = 'Kepler', 'firstName' = 'Johannes'}," +
                                "{'name' = 'Kopernikus', 'firstName' = 'Nikolaus'}" +
                                "]",
                        new TypeToken<List<HashMap<String, String>>>() {}.getType());

        List<CompletableFuture> tasks = new ArrayList<>();
        for (int i = 0; i < data.size(); i++) {
            final String messageId = Integer.toString(i);
            Message message = new Message(GSON.toJson(data.get(i), Map.class).getBytes(UTF_8));
            message.setContentType("application/json");
            message.setLabel("Scientist");
            message.setMessageId(messageId);
            message.setTimeToLive(Duration.ofMinutes(2));
            System.out.printf("\nMessage sending: Id = %s", message.getMessageId());
            tasks.add(
                    sendClient.sendAsync(message).thenRunAsync(() -> {
                        System.out.printf("\n\tMessage acknowledged: Id = %s", message.getMessageId());
                    }));
        }
        return CompletableFuture.allOf(tasks.toArray(new CompletableFuture<?>[tasks.size()]));
    }

Сообщения, отправляемые в очереди служебной шины и получаемые из них, представляют собой экземпляры класса Message. Объекты Message обладают набором стандартных свойств (таких как Label и TimeToLive), словарем, в котором хранятся зависящие от приложения пользовательские свойства, и основным набором произвольных данных приложения. Приложение может задать текст сообщения, передав любой сериализуемый объект конструктору объекта Message, после чего для сериализации объекта будет использоваться соответствующий сериализатор. Вы также можете указать объект java.IO.InputStream.

служебная шина очереди поддерживают максимальный размер сообщения 256 кб на уровне Standard и 100 мб на уровне Premium. Максимальный размер заголовка, который содержит стандартные и настраиваемые свойства приложения, — 64 КБ. Ограничения на количество сообщений в очереди нет, но есть максимальный общий размер сообщений, содержащихся в очереди. Этот размер очереди, определяемый в момент ее создания, не должен превышать 5 ГБ.

Получение сообщений из очереди

Самым простым способом получения сообщений из очереди является использование объекта ServiceBusContract. Полученные сообщения могут работать в двух различных режимах: ReceiveAndDelete и PeekLock.

В режиме ReceiveAndDelete получение является одиночной операцией, т. е. когда служебная шина получает запрос на чтение для сообщения в очереди, сообщение помечается как использованное и возвращается в приложение. Режим ReceiveAndDelete (режим по умолчанию) представляет собой самую простую модель, которая лучше всего работает для сценариев, в которых приложение может не обрабатывать сообщение в случае сбоя. Чтобы это понять, рассмотрим сценарий, в котором объект-получатель выдает запрос на получение и выходит из строя до его обработки. Так как служебная шина отмечает сообщение как использованное, перезапущенное приложение, начав снова использовать сообщения, пропустит сообщение, использованное до аварийного завершения работы.

В режиме PeekLock процесс получения становится двухэтапной операцией, что позволяет поддерживать приложения, неустойчивые к пропуску сообщений. Получив запрос, служебная шина находит следующее сообщение, блокирует его, чтобы предотвратить его получение другими получателями, и возвращает его приложению. Когда приложение завершает обработку сообщения (или надежно сохраняет его для последующей обработки), оно завершает второй этап процесса получения, вызывая метод complete() для полученного сообщения. Когда служебная шина обнаруживает вызов метода complete() , она помечает сообщение как использованное и удаляет его из очереди.

В следующем примере показано, как получать и обрабатывать сообщения с помощью режима PeekLock (не используется по умолчанию). В приведенном ниже примере используется модель обратного вызова с зарегистрированным обработчиком сообщений и показан процесс обработки сообщений по мере их поступления в TestQueue. Этот режим автоматически вызывает метод complete() , если обратный вызов возвращается без ошибок, или метод abandon() , если обратный вызов возвращает исключение.

    public void run() throws Exception {
        // Create a QueueClient instance for receiving using the connection string builder
        // We set the receive mode to "PeekLock", meaning the message is delivered
        // under a lock and must be acknowledged ("completed") to be removed from the queue
        QueueClient receiveClient = new QueueClient(new ConnectionStringBuilder(ConnectionString, QueueName), ReceiveMode.PEEKLOCK);
        this.registerReceiver(receiveClient);

        // shut down receiver to close the receive loop
        receiveClient.close();
    }
    void registerReceiver(QueueClient queueClient) throws Exception {
        // register the RegisterMessageHandler callback
        queueClient.registerMessageHandler(new IMessageHandler() {
            // callback invoked when the message handler loop has obtained a message
            public CompletableFuture<Void> onMessageAsync(IMessage message) {
            // receives message is passed to callback
                if (message.getLabel() != null &&
                    message.getContentType() != null &&
                    message.getLabel().contentEquals("Scientist") &&
                    message.getContentType().contentEquals("application/json")) {

                        byte[] body = message.getBody();
                        Map scientist = GSON.fromJson(new String(body, UTF_8), Map.class);

                        System.out.printf(
                            "\n\t\t\t\tMessage received: \n\t\t\t\t\t\tMessageId = %s, \n\t\t\t\t\t\tSequenceNumber = %s, \n\t\t\t\t\t\tEnqueuedTimeUtc = %s," +
                            "\n\t\t\t\t\t\tExpiresAtUtc = %s, \n\t\t\t\t\t\tContentType = \"%s\",  \n\t\t\t\t\t\tContent: [ firstName = %s, name = %s ]\n",
                            message.getMessageId(),
                            message.getSequenceNumber(),
                            message.getEnqueuedTimeUtc(),
                            message.getExpiresAtUtc(),
                            message.getContentType(),
                            scientist != null ? scientist.get("firstName") : "",
                            scientist != null ? scientist.get("name") : "");
                    }
                    return CompletableFuture.completedFuture(null);
                }

                // callback invoked when the message handler has an exception to report
                public void notifyException(Throwable throwable, ExceptionPhase exceptionPhase) {
                    System.out.printf(exceptionPhase + "-" + throwable.getMessage());
                }
        },
        // 1 concurrent call, messages are auto-completed, auto-renew duration
        new MessageHandlerOptions(1, true, Duration.ofMinutes(1)));
    }

Как обрабатывать сбои приложения и нечитаемые сообщения

служебная шина предоставляет функции, помогающие корректно выполнить восстановление после ошибок в приложении или трудностей, возникших при обработке сообщения. Если приложение получателя не может обработать сообщение по какой-либо причине, оно может вызвать метод abandon() в объекте клиента с маркером блокировки сообщения, полученным с помощью getLockToken() . После этого служебная шина разблокирует сообщение в очереди и сделает его доступным для приема тем же или другим приложением-пользователем.

Кроме того, с сообщением, блокированным в очереди, связано время ожидания. Если приложение не сможет обработать сообщение в течение времени ожидания (например, при сбое приложения), служебная шина автоматически разблокирует сообщение и снова сделает его доступным для получения.

Если сбой приложения происходит после обработки сообщения, но перед отправкой запроса complete() , такое сообщение будет повторно доставлено в приложение после перезапуска. Часто такой подход называют Обработать хотя бы один раз, т. е. каждое сообщение будет обрабатываться по крайней мере один раз, но в некоторых случаях это же сообщение может быть доставлено повторно. Если повторная обработка недопустима, разработчики приложения должны добавить дополнительную логику для обработки повторной доставки сообщений. Часто это достигается с помощью метода getMessageId сообщения, которое остается постоянным для различных попыток доставки.

Примечание

Вы можете управлять ресурсами служебной шины с помощью обозревателя служебной шины. Обозреватель служебной шины позволяет без труда подключаться к пространству имен служебной шины и управлять сущностями обмена сообщениями. Средство предоставляет дополнительные возможности, например функции импорта и экспорта или возможность проверять разделы, очереди, подписки, службы ретрансляции, центры уведомлений и концентраторы событий.

Next Steps

Примеры для Java можно найти на сайте GitHub в репозитории azure-service-bus.