Usar o Java Message Service 1.1 com o Azure Service Bus Standard e AMQP 1.0

Aviso

Este artigo atende ao suporte limitado para a API do Java Message Service (JMS) 1.1 e existe apenas para a camada padrão do Barramento de Serviço do Azure.

O suporte completo para a API do Java Message Service 2.0 está disponível apenas na camada premium do Azure Service Bus. Recomendamos que você use essa camada.

Este artigo explica como usar os recursos de mensagens do Service Bus de aplicativos Java usando o popular padrão de API JMS. Esses recursos de mensagens incluem filas e publicação ou assinatura de tópicos. Um artigo complementar explica como fazer o mesmo usando a API .NET do Barramento de Serviço do Azure. Você pode usar esses dois artigos juntos para aprender sobre mensagens entre plataformas usando o Advanced Message Queuing Protocol (AMQP) 1.0.

O AMQP 1.0 é um protocolo de mensagens eficiente, confiável e de nível de cabo que você pode usar para criar aplicativos de mensagens robustos e multiplataforma.

O suporte para AMQP 1.0 no Service Bus significa que você pode usar o enfileiramento e publicar ou assinar recursos de mensagens intermediadas de uma variedade de plataformas usando um protocolo binário eficiente. Você também pode criar aplicativos compostos por componentes criados usando uma combinação de linguagens, estruturas e sistemas operacionais.

Introdução ao Service Bus

Este artigo pressupõe que você já tenha um namespace do Service Bus que contém uma fila chamada basicqueue. Caso contrário, você pode criar o namespace e a fila usando o portal do Azure. Para obter mais informações sobre como criar namespaces e filas do Service Bus, consulte Introdução às filas do Service Bus.

Nota

Filas particionadas e tópicos também suportam AMQP. Para obter mais informações, consulte Entidades de mensagens particionadas e Suporte AMQP 1.0 para filas e tópicos particionados do Service Bus.

Baixe a biblioteca de cliente AMQP 1.0 JMS

Para obter informações sobre onde baixar a versão mais recente da biblioteca de cliente Apache Qpid JMS AMQP 1.0, consulte o site de download do Apache Qpid.

Você deve adicionar os seguintes arquivos JAR do arquivo de distribuição Apache Qpid JMS AMQP 1.0 à variável de ambiente Java CLASSPATH ao criar e executar aplicativos JMS com o Service Bus:

  • geronimo-jms_1.1_spec-1.0.jar
  • qpid-jms-client-[versão].jar

Nota

Os nomes e versões JMS JAR podem ter sido alterados. Para obter mais informações, consulte Qpid JMS AMQP 1.0.

Aplicações Java de código

Interface de nomenclatura e diretório Java

O JMS usa o Java Naming and Directory Interface (JNDI) para criar uma separação entre nomes lógicos e nomes físicos. Dois tipos de objetos JMS são resolvidos usando JNDI: ConnectionFactory e Destination. O JNDI usa um modelo de provedor no qual você pode conectar diferentes serviços de diretório para lidar com tarefas de resolução de nomes. A biblioteca Apache Qpid JMS AMQP 1.0 vem com um provedor JNDI baseado em arquivo de propriedade simples que é configurado usando um arquivo de propriedades do seguinte formato:

# servicebus.properties - sample JNDI configuration

# Register a ConnectionFactory in JNDI using the form:
# connectionfactory.[jndi_name] = [ConnectionURL]
connectionfactory.SBCF = amqps://[SASPolicyName]:[SASPolicyKey]@[namespace].servicebus.windows.net

# Register some queues in JNDI using the form
# queue.[jndi_name] = [physical_name]
# topic.[jndi_name] = [physical_name]
queue.QUEUE = queue1

Configurar o contexto JNDI e configurar o objeto ConnectionFactory

A cadeia de conexão referenciada é a disponível nas Políticas de Acesso Compartilhado no portal do Azure em Cadeia de Conexão Primária.

// The connection string builder is the only part of the azure-servicebus SDK library
// we use in this JMS sample and for the purpose of robustly parsing the Service Bus 
// connection string. 
ConnectionStringBuilder csb = new ConnectionStringBuilder(connectionString);
        
// Set up JNDI context
Hashtable<String, String> hashtable = new Hashtable<>();
hashtable.put("connectionfactory.SBCF", "amqps://" + csb.getEndpoint().getHost() + "?amqp.idleTimeout=120000&amqp.traceFrames=true");
hashtable.put("queue.QUEUE", "BasicQueue");
hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
Context context = new InitialContext(hashtable);

ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");

// Look up queue
Destination queue = (Destination) context.lookup("QUEUE");

Configurar filas de destino de produtor e consumidor

A entrada usada para definir um destino no provedor JNDI do arquivo de propriedades Qpid é do seguinte formato.

Para criar uma fila de destino para o produtor:

String queueName = "queueName";
Destination queue = (Destination) queueName;

ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
Connection connection - cf.createConnection(csb.getSasKeyName(), csb.getSasKey());

Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

// Create producer
MessageProducer producer = session.createProducer(queue);

Para criar uma fila de destino para o consumidor:

String queueName = "queueName";
Destination queue = (Destination) queueName;

ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
Connection connection - cf.createConnection(csb.getSasKeyName(), csb.getSasKey());

Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

// Create consumer
MessageConsumer consumer = session.createConsumer(queue);

Escrever o aplicativo JMS

Nenhuma API ou opção especial é necessária quando você usa JMS com Service Bus. Existem algumas restrições que serão abordadas mais tarde. Como em qualquer aplicativo JMS, a primeira coisa necessária é a configuração do ambiente JNDI para poder resolver um objeto e destinos ConnectionFactory .

Configurar o objeto JNDI InitialContext

O ambiente JNDI é configurado ao transmitir uma tabela hash de informações de configuração para o construtor da classe javax.naming.InitialContext. Os dois elementos necessários na tabela hash são o nome da classe da Fábrica de Contexto Inicial e o URL do fornecedor. O código a seguir mostra como configurar o ambiente JNDI para usar o provedor JNDI baseado em arquivo de propriedades Qpid com um arquivo de propriedades chamado servicebus.properties.

// Set up JNDI context
Hashtable<String, String> hashtable = new Hashtable<>();
hashtable.put("connectionfactory.SBCF", "amqps://" + csb.getEndpoint().getHost() + \
"?amqp.idleTimeout=120000&amqp.traceFrames=true");
hashtable.put("queue.QUEUE", "BasicQueue");
hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
Context context = new InitialContext(hashtable);

Um aplicativo JMS simples que usa uma fila do Service Bus

O programa de exemplo a seguir envia mensagens de texto JMS para uma fila do Service Bus com o nome lógico JNDI de QUEUE e recebe as mensagens de volta.

Você pode acessar todas as informações de código-fonte e configuração do início rápido da fila JMS de exemplos do Barramento de Serviço do Azure.

// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

package com.microsoft.azure.servicebus.samples.jmsqueuequickstart;

import com.azure.core.amqp.implementation.ConnectionStringProperties;
import org.apache.commons.cli.*;
import org.apache.log4j.*;

import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import java.util.Hashtable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/**
 * This sample demonstrates how to send messages from a JMS queue producer into
 * an Azure Service Bus queue and receive them with a JMS message consumer.
 * JMS queue. 
 */
public class JmsQueueQuickstart {

    // Number of messages to send
    private static int totalSend = 10;
    //Tracking counter for how many messages have been received; used as termination condition
    private static AtomicInteger totalReceived = new AtomicInteger(0);
    // log4j logger 
    private static Logger logger = Logger.getRootLogger();

    public void run(String connectionString) throws Exception {

        // The connection string properties is the only part of the azure-servicebus SDK library
        // we use in this JMS sample and for the purpose of robustly parsing the Service Bus 
        // connection string. 
        ConnectionStringProperties csb = new ConnectionStringProperties(connectionString);
        
        // Set up JNDI context
        Hashtable<String, String> hashtable = new Hashtable<>();
        hashtable.put("connectionfactory.SBCF", "amqps://" + csb.getEndpoint().getHost() + "?amqp.idleTimeout=120000&amqp.traceFrames=true");
        hashtable.put("queue.QUEUE", "BasicQueue");
        hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
        Context context = new InitialContext(hashtable);
        ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
        
        // Look up queue
        Destination queue = (Destination) context.lookup("QUEUE");

        // We create a scope here so we can use the same set of local variables cleanly 
        // again to show the receive side separately with minimal clutter.
        {
            // Create connection
            Connection connection = cf.createConnection(csb.getSharedAccessKeyName(), csb.getSharedAccessKey());
            // Create session, no transaction, client ack
            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

            // Create producer
            MessageProducer producer = session.createProducer(queue);

            // Send messages
            for (int i = 0; i < totalSend; i++) {
                BytesMessage message = session.createBytesMessage();
                message.writeBytes(String.valueOf(i).getBytes());
                producer.send(message);
                System.out.printf("Sent message %d.\n", i + 1);
            }

            producer.close();
            session.close();
            connection.stop();
            connection.close();
        }

        {
            // Create connection
            Connection connection = cf.createConnection(csb.getSharedAccessKeyName(), csb.getSharedAccessKey());
            connection.start();
            // Create session, no transaction, client ack
            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            // Create consumer
            MessageConsumer consumer = session.createConsumer(queue);
            // Create a listener callback to receive the messages
            consumer.setMessageListener(message -> {
                try {
                    // Received message is passed to callback
                    System.out.printf("Received message %d with sq#: %s\n",
                            totalReceived.incrementAndGet(), // increments the tracking counter
                            message.getJMSMessageID());
                    message.acknowledge();
                } catch (Exception e) {
                    logger.error(e);
                }
            });

            // Wait on the main thread until all sent messages have been received
            while (totalReceived.get() < totalSend) {
                Thread.sleep(1000);
            }
            consumer.close();
            session.close();
            connection.stop();
            connection.close();
        }

        System.out.printf("Received all messages, exiting the sample.\n");
        System.out.printf("Closing queue client.\n");
    }

    public static void main(String[] args) {

        System.exit(runApp(args, (connectionString) -> {
            JmsQueueQuickstart app = new JmsQueueQuickstart();
            try {
                app.run(connectionString);
                return 0;
            } catch (Exception e) {
                System.out.printf("%s", e.toString());
                return 1;
            }
        }));
    }

    static final String SB_SAMPLES_CONNECTIONSTRING = "SB_SAMPLES_CONNECTIONSTRING";

    public static int runApp(String[] args, Function<String, Integer> run) {
        try {

            String connectionString = null;

            // Parse connection string from command line
            Options options = new Options();
            options.addOption(new Option("c", true, "Connection string"));
            CommandLineParser clp = new DefaultParser();
            CommandLine cl = clp.parse(options, args);
            if (cl.getOptionValue("c") != null) {
                connectionString = cl.getOptionValue("c");
            }

            // Get overrides from the environment
            String env = System.getenv(SB_SAMPLES_CONNECTIONSTRING);
            if (env != null) {
                connectionString = env;
            }

            if (connectionString == null) {
                HelpFormatter formatter = new HelpFormatter();
                formatter.printHelp("run jar with", "", options, "", true);
                return 2;
            }
            return run.apply(connectionString);
        } catch (Exception e) {
            System.out.printf("%s", e.toString());
            return 3;
        }
    }
}

Executar a aplicação

Passe a Cadeia de Conexão das Políticas de Acesso Compartilhado para executar o aplicativo. A saída a seguir é do formulário que executa o aplicativo:

> mvn clean package
>java -jar ./target/jmsqueuequickstart-1.0.0-jar-with-dependencies.jar -c "<CONNECTION_STRING>"

Sent message 1.
Sent message 2.
Sent message 3.
Sent message 4.
Sent message 5.
Sent message 6.
Sent message 7.
Sent message 8.
Sent message 9.
Sent message 10.
Received message 1 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-1
Received message 2 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-2
Received message 3 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-3
Received message 4 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-4
Received message 5 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-5
Received message 6 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-6
Received message 7 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-7
Received message 8 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-8
Received message 9 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-9
Received message 10 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-10
Received all messages, exiting the sample.
Closing queue client.

Disposição AMQP e mapeamento da operação do Service Bus

Veja como uma disposição AMQP se traduz em uma operação do Service Bus:

ACCEPTED = 1; -> Complete()
REJECTED = 2; -> DeadLetter()
RELEASED = 3; (just unlock the message in service bus, will then get redelivered)
MODIFIED_FAILED = 4; -> Abandon() which increases delivery count
MODIFIED_FAILED_UNDELIVERABLE = 5; -> Defer()

Tópicos JMS vs. tópicos do Service Bus

O uso de tópicos e assinaturas do Service Bus por meio da API JMS fornece recursos básicos de envio e recebimento. É uma escolha conveniente quando você faz a portabilidade de aplicativos de outros agentes de mensagens com APIs compatíveis com JMS, mesmo que os tópicos do Service Bus sejam diferentes dos tópicos do JMS e exijam alguns ajustes.

Os tópicos do Barramento de Serviço encaminham mensagens para assinaturas nomeadas, compartilhadas e duráveis que são gerenciadas por meio da interface de Gerenciamento de Recursos do Azure, das ferramentas de linha de comando do Azure ou do portal do Azure. Cada assinatura permite até 2.000 regras de seleção, cada uma das quais pode ter uma condição de filtro e, para filtros SQL, também uma ação de transformação de metadados. Cada correspondência de condição de filtro seleciona a mensagem de entrada a ser copiada para a assinatura.

Receber mensagens de assinaturas é idêntico a receber mensagens de filas. Cada assinatura tem uma fila de mensagens mortas associada e a capacidade de encaminhar mensagens automaticamente para outra fila ou tópicos.

Os tópicos JMS permitem que os clientes criem dinamicamente assinantes não duráveis e duráveis que, opcionalmente, permitem filtrar mensagens com seletores de mensagens. Essas entidades não compartilhadas não são suportadas pelo Service Bus. A sintaxe da regra de filtro SQL para o Service Bus é semelhante à sintaxe do seletor de mensagens suportada pelo JMS.

O lado do editor de tópicos JMS é compatível com o Service Bus, conforme mostrado neste exemplo, mas os assinantes dinâmicos não são. As APIs JMS relacionadas à topologia a seguir não são suportadas com o Service Bus.

Recursos e restrições não suportados

As seguintes restrições existem quando você usa JMS sobre AMQP 1.0 com Service Bus, a saber:

  • Apenas um objeto MessageProducer ou MessageConsumer é permitido por sessão. Se você precisar criar vários objetos MessageProducer ou MessageConsumer em um aplicativo, crie uma sessão dedicada para cada um deles.
  • Atualmente, não há suporte para assinaturas de tópicos voláteis.
  • Atualmente, não há suporte para objetos MessageSelector .
  • Não há suporte para transações distribuídas, mas há suporte para sessões transacionadas.

O Service Bus divide o plano de controle do plano de dados, portanto, ele não suporta várias das funções de topologia dinâmica do JMS.

Método não suportado Replace with
createDurableSubscriber Crie uma assinatura de tópico que porte o seletor de mensagens.
criarDurableConsumer Crie uma assinatura de tópico que porte o seletor de mensagens.
criarSharedConsumer Os tópicos do Service Bus são sempre compartilháveis. Consulte a seção "Tópicos JMS vs. tópicos do Service Bus".
createSharedDurableConsumer Os tópicos do Service Bus são sempre compartilháveis. Consulte a seção "Tópicos JMS vs. tópicos do Service Bus".
createTemporaryTopic Crie um tópico por meio da API de gerenciamento, das ferramentas ou do portal com AutoDeleteOnIdle definido para um período de expiração.
createTopic Crie um tópico por meio da API de gerenciamento, das ferramentas ou do portal.
Cancelar inscrição Exclua a API, as ferramentas ou o portal de gerenciamento de tópicos.
createBrowser Não suportado. Use a funcionalidade Peek() da API do Service Bus.
createQueue Crie uma fila por meio da API de gerenciamento, das ferramentas ou do portal.
createTemporaryQueue Crie uma fila por meio da API de gerenciamento, das ferramentas ou do portal com AutoDeleteOnIdle definido para um período de expiração.
receberNoWait Use o método receive() fornecido pelo SDK do Service Bus e especifique um tempo limite muito baixo ou zero.

Resumo

Este artigo mostrou como usar os recursos de mensagens intermediadas pelo Service Bus, como filas e tópicos de publicação ou assinatura, do Java usando a popular API JMS e o AMQP 1.0.

Você também pode usar o Service Bus AMQP 1.0 de outras linguagens, como .NET, C, Python e PHP. Os componentes criados usando esses idiomas diferentes podem trocar mensagens de forma confiável e com total fidelidade usando o suporte AMQP 1.0 no Service Bus.

Próximos passos