Как использовать Java Message Service со Служебной шиной Azure и AMQP 1.0

Предупреждение

Эта статья посвящена описанию ограниченной поддержки API 1.1 Java Message Service (JMS) и применима только к уровню "Стандартный" Служебной шины Azure.

Полная поддержка API 2.0 Java Message Service доступна только для уровня "Премиум" Служебной шины Azure в предварительной версии. Рекомендуется использовать этот уровень.

В этой статье описано, как использовать функции обмена сообщениями через Служебную шину из приложений Java, использующих популярный стандарт API JMS. Эти функции обмена сообщениями включают очереди и публикацию разделов или подписку на них. В связанной статье объясняется, как выполнить те же действия, используя API .NET Служебной шины Azure. Вы можете использовать эти две статьи вместе, чтобы узнать об обмене сообщениями между различными платформами с помощью Расширенного протокола управления очередью сообщений (AMQP) 1.0.

AMQP 1.0 — это эффективный и надежный сетевой протокол, который можно использовать для создания надежных межплатформенных приложений для обмена сообщениями.

Поддержка AMQP 1.0 в Служебной шине означает, что с помощью эффективного двоичного протокола можно на различных платформах использовать возможности очередей и обмена сообщениями с публикацией или подпиской через брокер. Кроме того, можно создавать приложения, содержащие компоненты, созданные с использованием разных языков, платформ и операционных систем.

Приступая к работе со служебной шиной

В этой статье предполагается, что вы уже создали пространство имен Служебной шины, содержащее очередь с именем basicqueue. Если это не так, создайте пространство имен и очередь на портале Azure. Дополнительные сведения о создании пространства имен и очередей служебной шины см. в статье Приступая к работе с очередями служебной шины.

Примечание

Секционированные очереди и разделы также поддерживают AMQP. Дополнительные сведения см. в статьях Секционированные сущности обмена сообщениями и Поддержка AMQP 1.0 для секционированных очередей и разделов служебной шины.

Загрузка клиентской библиотеки JMS AMQP 1.0

Сведения о том, где скачать последнюю версию клиентской библиотеки Apache Qpid JMS AMQP 1.0, см. на сайте загрузки Apache Qpid.

При создании и запуске приложений JMS с использованием Служебной шины необходимо добавить следующие JAR-файлы из архива распространения Apache Qpid JMS AMQP 1.0 в переменную среды Java CLASSPATH:

  • geronimo-jms_1.1_spec-1.0.jar
  • qpid-jms-client-[version].jar

Примечание

Имена и версии JMS JAR могли измениться. Дополнительные сведения см. в разделе Qpid JMS AMQP 1.0.

Создание приложений Java

Java Naming and Directory Interface

JMS использует интерфейс JNDI для разделения логических и физических имен. С помощью JNDI разрешаются два типа объектов JMS: ConnectionFactory и Destination. JNDI использует модель поставщика, к которой можно подключить различные службы каталогов для обработки заданий разрешения имен. Библиотека Apache Qpid JMS AMQP 1.0 поставляется с простым файловым поставщиком JNDI, настроенным с помощью файла свойств в следующем формате:

# servicebus.properties - sample JNDI configuration

# Register a ConnectionFactory in JNDI using the form:
# connectionfactory.[jndi_name] = [ConnectionURL]
connectionfactory.SBCF = amqps://[SASPolicyName]:[SASPolicyKey]@[namespace].servicebus.windows.net

# Register some queues in JNDI using the form
# queue.[jndi_name] = [physical_name]
# topic.[jndi_name] = [physical_name]
queue.QUEUE = queue1

Установка контекста JNDI и настройка объекта ConnectionFactory

Указанная строка подключения доступна в политике общего доступа на портале Azure в разделе Основная строка подключения.

// The connection string builder is the only part of the azure-servicebus SDK library
// we use in this JMS sample and for the purpose of robustly parsing the Service Bus 
// connection string. 
ConnectionStringBuilder csb = new ConnectionStringBuilder(connectionString);
        
// Set up JNDI context
Hashtable<String, String> hashtable = new Hashtable<>();
hashtable.put("connectionfactory.SBCF", "amqps://" + csb.getEndpoint().getHost() + "?amqp.idleTimeout=120000&amqp.traceFrames=true");
hashtable.put("queue.QUEUE", "BasicQueue");
hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
Context context = new InitialContext(hashtable);

ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");

// Look up queue
Destination queue = (Destination) context.lookup("QUEUE");

Настройка очереди назначения производителя и потребителя

Эта запись используется для определения назначения в поставщике JNDI файла свойств Qpid в приведенном ниже формате.

Чтобы создать очередь назначения для производителя:

String queueName = "queueName";
Destination queue = (Destination) queueName;

ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
Connection connection - cf.createConnection(csb.getSasKeyName(), csb.getSasKey());

Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

// Create producer
MessageProducer producer = session.createProducer(queue);

Чтобы создать очередь назначения для потребителя:

String queueName = "queueName";
Destination queue = (Destination) queueName;

ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
Connection connection - cf.createConnection(csb.getSasKeyName(), csb.getSasKey());

Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

// Create consumer
MessageConsumer consumer = session.createConsumer(queue);

Написание приложения JMS

Для использования JMS со Служебной шиной не нужны специальные API или параметры. Но существует несколько ограничений, которые будут рассмотрены ниже. Как и для любого приложения JMS, первое, что необходимо, — это конфигурация среды JNDI, позволяющая разрешать объект ConnectionFactory и назначения.

Настройка объекта JNDI InitialContext

Для настройки среды JNDI в конструктор класса javax.naming.InitialContext передается хэш-таблица со сведениями о конфигурации. Два обязательных элемента в хэш-таблице — это имя класса фабрики исходного контекста и URL-адрес поставщика. В следующем примере кода показано, как настроить среду JNDI для использования поставщика JNDI на основе файла свойств Qpid с именем servicebus.properties.

// Set up JNDI context
Hashtable<String, String> hashtable = new Hashtable<>();
hashtable.put("connectionfactory.SBCF", "amqps://" + csb.getEndpoint().getHost() + \
"?amqp.idleTimeout=120000&amqp.traceFrames=true");
hashtable.put("queue.QUEUE", "BasicQueue");
hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
Context context = new InitialContext(hashtable);

Простое приложение JMS, использующее очередь Служебной шины

Следующий пример программы отправляет текстовые сообщения JMS в очередь Служебной шины с логическим именем JNDI QUEUE и получает ответные сообщения.

Вы можете получить доступ ко всем исходным кодам и сведениям о конфигурации в кратком руководстве по очереди JMS примеров Служебной шины Azure.

// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

package com.microsoft.azure.servicebus.samples.jmsqueuequickstart;

import com.azure.core.amqp.implementation.ConnectionStringProperties;
import org.apache.commons.cli.*;
import org.apache.log4j.*;

import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import java.util.Hashtable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/**
 * This sample demonstrates how to send messages from a JMS queue producer into
 * an Azure Service Bus queue and receive them with a JMS message consumer.
 * JMS queue. 
 */
public class JmsQueueQuickstart {

    // Number of messages to send
    private static int totalSend = 10;
    //Tracking counter for how many messages have been received; used as termination condition
    private static AtomicInteger totalReceived = new AtomicInteger(0);
    // log4j logger 
    private static Logger logger = Logger.getRootLogger();

    public void run(String connectionString) throws Exception {

        // The connection string properties is the only part of the azure-servicebus SDK library
        // we use in this JMS sample and for the purpose of robustly parsing the Service Bus 
        // connection string. 
        ConnectionStringProperties csb = new ConnectionStringProperties(connectionString);
        
        // Set up JNDI context
        Hashtable<String, String> hashtable = new Hashtable<>();
        hashtable.put("connectionfactory.SBCF", "amqps://" + csb.getEndpoint().getHost() + "?amqp.idleTimeout=120000&amqp.traceFrames=true");
        hashtable.put("queue.QUEUE", "BasicQueue");
        hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
        Context context = new InitialContext(hashtable);
        ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
        
        // Look up queue
        Destination queue = (Destination) context.lookup("QUEUE");

        // We create a scope here so we can use the same set of local variables cleanly 
        // again to show the receive side separately with minimal clutter.
        {
            // Create connection
            Connection connection = cf.createConnection(csb.getSharedAccessKeyName(), csb.getSharedAccessKey());
            // Create session, no transaction, client ack
            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

            // Create producer
            MessageProducer producer = session.createProducer(queue);

            // Send messages
            for (int i = 0; i < totalSend; i++) {
                BytesMessage message = session.createBytesMessage();
                message.writeBytes(String.valueOf(i).getBytes());
                producer.send(message);
                System.out.printf("Sent message %d.\n", i + 1);
            }

            producer.close();
            session.close();
            connection.stop();
            connection.close();
        }

        {
            // Create connection
            Connection connection = cf.createConnection(csb.getSharedAccessKeyName(), csb.getSharedAccessKey());
            connection.start();
            // Create session, no transaction, client ack
            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            // Create consumer
            MessageConsumer consumer = session.createConsumer(queue);
            // Create a listener callback to receive the messages
            consumer.setMessageListener(message -> {
                try {
                    // Received message is passed to callback
                    System.out.printf("Received message %d with sq#: %s\n",
                            totalReceived.incrementAndGet(), // increments the tracking counter
                            message.getJMSMessageID());
                    message.acknowledge();
                } catch (Exception e) {
                    logger.error(e);
                }
            });

            // Wait on the main thread until all sent messages have been received
            while (totalReceived.get() < totalSend) {
                Thread.sleep(1000);
            }
            consumer.close();
            session.close();
            connection.stop();
            connection.close();
        }

        System.out.printf("Received all messages, exiting the sample.\n");
        System.out.printf("Closing queue client.\n");
    }

    public static void main(String[] args) {

        System.exit(runApp(args, (connectionString) -> {
            JmsQueueQuickstart app = new JmsQueueQuickstart();
            try {
                app.run(connectionString);
                return 0;
            } catch (Exception e) {
                System.out.printf("%s", e.toString());
                return 1;
            }
        }));
    }

    static final String SB_SAMPLES_CONNECTIONSTRING = "SB_SAMPLES_CONNECTIONSTRING";

    public static int runApp(String[] args, Function<String, Integer> run) {
        try {

            String connectionString = null;

            // Parse connection string from command line
            Options options = new Options();
            options.addOption(new Option("c", true, "Connection string"));
            CommandLineParser clp = new DefaultParser();
            CommandLine cl = clp.parse(options, args);
            if (cl.getOptionValue("c") != null) {
                connectionString = cl.getOptionValue("c");
            }

            // Get overrides from the environment
            String env = System.getenv(SB_SAMPLES_CONNECTIONSTRING);
            if (env != null) {
                connectionString = env;
            }

            if (connectionString == null) {
                HelpFormatter formatter = new HelpFormatter();
                formatter.printHelp("run jar with", "", options, "", true);
                return 2;
            }
            return run.apply(connectionString);
        } catch (Exception e) {
            System.out.printf("%s", e.toString());
            return 3;
        }
    }
}

Выполнение приложения

Передайте Строку подключения из политики совместного доступа для запуска приложения. В результате запуска приложения выводятся следующие данные:

> mvn clean package
>java -jar ./target/jmsqueuequickstart-1.0.0-jar-with-dependencies.jar -c "<CONNECTION_STRING>"

Sent message 1.
Sent message 2.
Sent message 3.
Sent message 4.
Sent message 5.
Sent message 6.
Sent message 7.
Sent message 8.
Sent message 9.
Sent message 10.
Received message 1 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-1
Received message 2 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-2
Received message 3 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-3
Received message 4 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-4
Received message 5 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-5
Received message 6 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-6
Received message 7 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-7
Received message 8 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-8
Received message 9 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-9
Received message 10 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-10
Received all messages, exiting the sample.
Closing queue client.

Расположение AMQP и сопоставление операции служебной шины

Вот как расположение AMQP преобразовывается в операцию Служебной шины:

ACCEPTED = 1; -> Complete()
REJECTED = 2; -> DeadLetter()
RELEASED = 3; (just unlock the message in service bus, will then get redelivered)
MODIFIED_FAILED = 4; -> Abandon() which increases delivery count
MODIFIED_FAILED_UNDELIVERABLE = 5; -> Defer()

Разделы JMS и разделы Служебной шины

Использование разделов и подписок Служебной шины с помощью API JMS обеспечивает базовые возможности отправки и получения. Это удобный вариант при переносе приложений из других брокеров сообщений в API, совместимые с JMS, даже если разделы Служебной шины отличаются от разделов JMS и требует нескольких корректировок.

Разделы Служебной шины направляют сообщения в именованные, общие и устойчивые подписки, управляемые через интерфейс управления ресурсами Azure, программы командной строки Azure или портал Azure. Каждая подписка позволяет использовать до 2000 правил выбора, каждое из которых может иметь условие фильтра, а для фильтров SQL — также действие преобразования метаданных. Для каждого соответствия условию фильтра выбирается входное сообщение, которое нужно скопировать в подписку.

Получение сообщений из подписок идентично приему сообщений из очередей. C каждой подпиской связана очередь недоставленных сообщений и возможность автоматически пересылать сообщения в другую очередь или раздел.

Разделы JMS позволяют клиентам динамически создавать неустойчивые и устойчивые подписчики, которые при необходимости позволяют фильтровать сообщения с помощью селекторов сообщений. Служебная шина не поддерживает эти сущности без общего доступа. Синтаксис правил фильтрации SQL для Служебной шины аналогичен синтаксису селектора сообщений, поддерживаемому JMS.

Сторона издателя раздела JMS совместима со Служебной шиной, как показано в этом примере, но динамические подписчики — нет. В Служебной шине не поддерживаются следующие API JMS, связанные с топологией.

Неподдерживаемые возможности и ограничения

При использовании JMS по протоколу AMQP 1.0 с Сервисной шиной действуют следующие ограничения:

  • Для одного сеанса допускается только один объект MessageProducer или MessageConsumer. Если нужно создать несколько объектов MessageProducer или MessageConsumer в приложении, создайте отдельный сеанс для каждого из них.
  • Временные подписки раздела сейчас не поддерживаются.
  • Объекты MessageSelector сейчас не поддерживаются.
  • Распределенные транзакции не поддерживаются, но поддерживаются сеансы транзакций.

Служебная шина разделяет плоскость управления от плоскости данных, поэтому не поддерживает несколько функций динамической топологии JMS.

Неподдерживаемый метод Заменить на
createDurableSubscriber Создание подписки на раздел для переноса селектора сообщений.
createDurableConsumer Создание подписки на раздел для переноса селектора сообщений.
createSharedConsumer Разделами Служебной шины всегда можно поделиться. См. раздел "Разделы JMS и разделы Служебной шины".
createSharedDurableConsumer Разделами Служебной шины всегда можно поделиться. См. раздел "Разделы JMS и разделы Служебной шины".
createTemporaryTopic Создание раздела с помощью API управления, инструментов или портала с параметром AutoDeleteOnIdle, установленным на срок действия.
createTopic Создание раздела с помощью API управления, инструментов или портала.
unsubscribe Удаление раздела API управления, инструментов или портала.
createBrowser Не поддерживается. Используйте функции Peek() API Служебной шины.
createQueue Создание очереди с помощью API управления, инструментов или портала.
createTemporaryQueue Создание очереди с помощью API управления, инструментов или портала с параметром AutoDeleteOnIdle, установленным на срок действия.
receiveNoWait Используйте метод receive(), предоставленный пакетом SDK Служебной шины, и укажите очень низкое или нулевое значение времени ожидания.

Сводка

В этой статье показано использование функций обмена сообщениями, выполняемых посредством Служебной шины, например очередей и разделов публикации или подписки, из Java с использованием популярного API JMS и протокола AMQP 1.0.

Протокол AMQP 1.0 Служебной шины можно также использовать из других языков, таких как .NET, C, Python и PHP. Компоненты, созданные с помощью этих языков, могут надежно и точно обмениваться сообщениями, используя AMQP 1.0 в Служебной шине.

Дальнейшие действия