Usar a API do Serviço de Mensagens do Java 1.1 com o padrão do Barramento de Serviço do Azure e AMQP 1.0

Aviso

Este artigo atende ao suporte limitado para a API do Serviço de Mensagem do Java (JMS) 1.1 e existe apenas para a camada standard do barramento de serviço do Azure.

O suporte completo para a API 2.0 do Serviço de Mensagens Java está disponível somente na camada premium do Barramento de Serviço do Azure. Recomendamos o uso dessa camada.

Este artigo explica como usar os recursos de sistema de mensagens do Barramento de Serviço de aplicativos Java usando o popular padrão de API do JMS. Esses recursos de mensagens incluem filas e publicação ou assinatura de tópicos. Existe um artigo complementar que explica como fazer o mesmo usando a API do Barramento de Serviço do .NET. Você pode usar esses dois artigos juntos para saber mais sobre mensagens de plataforma cruzada usando o Advanced Message Queuing Protocol (AMQP) 1.0.

O AMQP 1.0 é um protocolo de mensagens eficiente, confiável e em nível de transmissão que pode ser usado para criar aplicativos de mensagens robustos entre plataformas.

O suporte para o AMQP 1.0 no Barramento de Serviço significa que você pode usar o enfileiramento e publicar ou assinar os recursos de mensagens agenciadas a partir de uma variedade de plataformas usando um protocolo binário eficiente. Você também pode criar aplicativos compostos de componentes criados com o uso de uma mistura de linguagens, estruturas e sistemas operacionais.

Introdução ao Barramento de serviço

Este artigo presume que você já tenha um namespace no Barramento de Serviço que contém uma fila denominada basicqueue. Caso contrário, você pode criar o namespace e a fila usando o portal do Azure. Para obter mais informações sobre como criar namespaces e filas do Barramento de Serviço, consulte Introdução às filas do Barramento de Serviço.

Observação

Filas e tópicos particionados também dão suporte ao AMQP. Para saber mais, confira Entidades de mensagens particionadas e Suporte a AMQP 1.0 para filas e tópicos particionados do Barramento de Serviço.

Baixar a biblioteca de cliente do JMS do AMQP 1.0

Para obter informações sobre onde baixar a versão mais recente da biblioteca do cliente Apache Qpid JMS do AMQP 1.0, acesse o site do Apache Qpid.

Você deve adicionar os seguintes arquivos JAR do arquivamento de distribuição do Apache Qpid JMS do AMQP 1.0 à variável de ambiente do CLASSPATH do Java ao criar e executar aplicativos do JMS com o Barramento de Serviço:

  • geronimo-jms_1.1_spec-1.0.jar
  • qpid-jms-client-[version].jar

Observação

Os nomes e versões do JMS JAR podem ter mudado. Para saber mais, confira Qpid JMS do AMQP 1.0.

Codificar aplicativos Java

Nomenclatura e interface de diretório do Java

O JMS usa a Java Naming and Directory Interface (JNDI) para criar uma separação entre nomes lógicos e físicos. Dois tipos de objetos JMS são resolvidos usando a JNDI: ConnectionFactory e Destino. A JNDI usa um modelo de provedor no qual você pode conectar diferentes serviços de diretório para lidar com tarefas de resolução de nome. A biblioteca Apache Qpid JMS do AMQP 1.0 vem com um Provedor JNDI simples baseado em arquivo de propriedades que é configurado usando um arquivo de propriedades no seguinte formato:

# servicebus.properties - sample JNDI configuration

# Register a ConnectionFactory in JNDI using the form:
# connectionfactory.[jndi_name] = [ConnectionURL]
connectionfactory.SBCF = amqps://[SASPolicyName]:[SASPolicyKey]@[namespace].servicebus.windows.net

# Register some queues in JNDI using the form
# queue.[jndi_name] = [physical_name]
# topic.[jndi_name] = [physical_name]
queue.QUEUE = queue1

Configurar o contexto JNDI e configurar o objeto ConnectionFactory

A cadeia de conexão referenciada é a que está nas Políticas de acesso compartilhado no portal do Azure em Cadeia de conexão primária.

// The connection string builder is the only part of the azure-servicebus SDK library
// we use in this JMS sample and for the purpose of robustly parsing the Service Bus 
// connection string. 
ConnectionStringBuilder csb = new ConnectionStringBuilder(connectionString);
        
// Set up JNDI context
Hashtable<String, String> hashtable = new Hashtable<>();
hashtable.put("connectionfactory.SBCF", "amqps://" + csb.getEndpoint().getHost() + "?amqp.idleTimeout=120000&amqp.traceFrames=true");
hashtable.put("queue.QUEUE", "BasicQueue");
hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
Context context = new InitialContext(hashtable);

ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");

// Look up queue
Destination queue = (Destination) context.lookup("QUEUE");

Configurar as filas de destino do produtor e do consumidor

A entrada usada para definir um destino no provedor JNDI do arquivo de propriedades do Qpid tem o seguinte formato.

Para criar uma fila de destino para o produtor:

String queueName = "queueName";
Destination queue = (Destination) queueName;

ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
Connection connection - cf.createConnection(csb.getSasKeyName(), csb.getSasKey());

Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

// Create producer
MessageProducer producer = session.createProducer(queue);

Para criar a fila de destino para o consumidor:

String queueName = "queueName";
Destination queue = (Destination) queueName;

ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
Connection connection - cf.createConnection(csb.getSasKeyName(), csb.getSasKey());

Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

// Create consumer
MessageConsumer consumer = session.createConsumer(queue);

Escrever o aplicativo JMS

Não existem APIs ou opções especiais obrigatórias ao usar o JMS com o Barramento de Serviço. Existem algumas restrições que serão abordadas posteriormente. Da mesma forma que ocorre com qualquer aplicativo JMS, o primeiro item necessário é a configuração do ambiente JNDI, para ser capaz de resolver um objeto ConnectionFactory e destinos.

Configurar o objeto InitialContext do JNDI

O ambiente JNDI é configurado por meio da transmissão de uma tabela de hash com informações de configuração para o construtor da classe javax.naming.InitialContext. Os dois elementos necessários na tabela de hash são o nome da classe da Fábrica de Contexto inicial e a URL do provedor. O código a seguir mostra como configurar o ambiente JNDI para usar o provedor JNDI com base em arquivo de propriedades do Qpid com um arquivo de propriedades chamado servicebus.properties.

// Set up JNDI context
Hashtable<String, String> hashtable = new Hashtable<>();
hashtable.put("connectionfactory.SBCF", "amqps://" + csb.getEndpoint().getHost() + \
"?amqp.idleTimeout=120000&amqp.traceFrames=true");
hashtable.put("queue.QUEUE", "BasicQueue");
hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
Context context = new InitialContext(hashtable);

Um aplicativo JMS simples que usa uma fila do Barramento de Serviço

O programa de exemplo a seguir envia mensagens de texto do JMS para uma fila do Barramento de Serviço com o nome lógico do JNDI da fila e recebe as mensagens de volta.

Você pode acessar todos os as código-fonte e informações de configuração e o Início Rápido de Fila do JMS das amostras do Barramento de Serviço do Microsoft Azure.

// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

package com.microsoft.azure.servicebus.samples.jmsqueuequickstart;

import com.azure.core.amqp.implementation.ConnectionStringProperties;
import org.apache.commons.cli.*;
import org.apache.log4j.*;

import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import java.util.Hashtable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/**
 * This sample demonstrates how to send messages from a JMS queue producer into
 * an Azure Service Bus queue and receive them with a JMS message consumer.
 * JMS queue. 
 */
public class JmsQueueQuickstart {

    // Number of messages to send
    private static int totalSend = 10;
    //Tracking counter for how many messages have been received; used as termination condition
    private static AtomicInteger totalReceived = new AtomicInteger(0);
    // log4j logger 
    private static Logger logger = Logger.getRootLogger();

    public void run(String connectionString) throws Exception {

        // The connection string properties is the only part of the azure-servicebus SDK library
        // we use in this JMS sample and for the purpose of robustly parsing the Service Bus 
        // connection string. 
        ConnectionStringProperties csb = new ConnectionStringProperties(connectionString);
        
        // Set up JNDI context
        Hashtable<String, String> hashtable = new Hashtable<>();
        hashtable.put("connectionfactory.SBCF", "amqps://" + csb.getEndpoint().getHost() + "?amqp.idleTimeout=120000&amqp.traceFrames=true");
        hashtable.put("queue.QUEUE", "BasicQueue");
        hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
        Context context = new InitialContext(hashtable);
        ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
        
        // Look up queue
        Destination queue = (Destination) context.lookup("QUEUE");

        // We create a scope here so we can use the same set of local variables cleanly 
        // again to show the receive side separately with minimal clutter.
        {
            // Create connection
            Connection connection = cf.createConnection(csb.getSharedAccessKeyName(), csb.getSharedAccessKey());
            // Create session, no transaction, client ack
            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

            // Create producer
            MessageProducer producer = session.createProducer(queue);

            // Send messages
            for (int i = 0; i < totalSend; i++) {
                BytesMessage message = session.createBytesMessage();
                message.writeBytes(String.valueOf(i).getBytes());
                producer.send(message);
                System.out.printf("Sent message %d.\n", i + 1);
            }

            producer.close();
            session.close();
            connection.stop();
            connection.close();
        }

        {
            // Create connection
            Connection connection = cf.createConnection(csb.getSharedAccessKeyName(), csb.getSharedAccessKey());
            connection.start();
            // Create session, no transaction, client ack
            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            // Create consumer
            MessageConsumer consumer = session.createConsumer(queue);
            // Create a listener callback to receive the messages
            consumer.setMessageListener(message -> {
                try {
                    // Received message is passed to callback
                    System.out.printf("Received message %d with sq#: %s\n",
                            totalReceived.incrementAndGet(), // increments the tracking counter
                            message.getJMSMessageID());
                    message.acknowledge();
                } catch (Exception e) {
                    logger.error(e);
                }
            });

            // Wait on the main thread until all sent messages have been received
            while (totalReceived.get() < totalSend) {
                Thread.sleep(1000);
            }
            consumer.close();
            session.close();
            connection.stop();
            connection.close();
        }

        System.out.printf("Received all messages, exiting the sample.\n");
        System.out.printf("Closing queue client.\n");
    }

    public static void main(String[] args) {

        System.exit(runApp(args, (connectionString) -> {
            JmsQueueQuickstart app = new JmsQueueQuickstart();
            try {
                app.run(connectionString);
                return 0;
            } catch (Exception e) {
                System.out.printf("%s", e.toString());
                return 1;
            }
        }));
    }

    static final String SB_SAMPLES_CONNECTIONSTRING = "SB_SAMPLES_CONNECTIONSTRING";

    public static int runApp(String[] args, Function<String, Integer> run) {
        try {

            String connectionString = null;

            // Parse connection string from command line
            Options options = new Options();
            options.addOption(new Option("c", true, "Connection string"));
            CommandLineParser clp = new DefaultParser();
            CommandLine cl = clp.parse(options, args);
            if (cl.getOptionValue("c") != null) {
                connectionString = cl.getOptionValue("c");
            }

            // Get overrides from the environment
            String env = System.getenv(SB_SAMPLES_CONNECTIONSTRING);
            if (env != null) {
                connectionString = env;
            }

            if (connectionString == null) {
                HelpFormatter formatter = new HelpFormatter();
                formatter.printHelp("run jar with", "", options, "", true);
                return 2;
            }
            return run.apply(connectionString);
        } catch (Exception e) {
            System.out.printf("%s", e.toString());
            return 3;
        }
    }
}

Executar o aplicativo

Passe a Cadeia de Conexão das políticas de acesso compartilhado para executar o aplicativo. A saída a seguir é do formulário que executa o aplicativo:

> mvn clean package
>java -jar ./target/jmsqueuequickstart-1.0.0-jar-with-dependencies.jar -c "<CONNECTION_STRING>"

Sent message 1.
Sent message 2.
Sent message 3.
Sent message 4.
Sent message 5.
Sent message 6.
Sent message 7.
Sent message 8.
Sent message 9.
Sent message 10.
Received message 1 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-1
Received message 2 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-2
Received message 3 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-3
Received message 4 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-4
Received message 5 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-5
Received message 6 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-6
Received message 7 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-7
Received message 8 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-8
Received message 9 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-9
Received message 10 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-10
Received all messages, exiting the sample.
Closing queue client.

Disposição do Advanced Message Queuing Protocol e mapeamento da operação de Barramento de Serviço

Veja como uma disposição AMQP é convertida em uma operação do Barramento de Serviço:

ACCEPTED = 1; -> Complete()
REJECTED = 2; -> DeadLetter()
RELEASED = 3; (just unlock the message in service bus, will then get redelivered)
MODIFIED_FAILED = 4; -> Abandon() which increases delivery count
MODIFIED_FAILED_UNDELIVERABLE = 5; -> Defer()

Tópicos sobre JMS vs. tópicos sobre Barramento de Serviço

O uso de tópicos e assinaturas do Barramento de Serviço por meio da API do JMS fornece recursos básicos de envio e recebimento. É uma opção conveniente quando você porta aplicativos de outros agentes de mensagens com APIs compatíveis com JMS, mesmo que os tópicos do Barramento de Serviço sejam diferentes dos tópicos do JMS e exijam alguns ajustes.

Os tópicos do Barramento de Serviço roteiam mensagens para assinaturas nomeadas, compartilhadas e duráveis que são gerenciadas por meio da interface de gerenciamento de recursos do Azure, das ferramentas de linha de comando do Azure ou da portal do Azure. Cada assinatura permite até 2 mil regras de seleção. Cada uma delas pode ter uma condição de filtro e, para filtros SQL, também uma ação de transformação de metadados. Cada correspondência de condição de filtro seleciona a mensagem de entrada a ser copiada na assinatura.

O recebimento de mensagens de assinaturas é idêntico ao recebimento de mensagens de filas. Cada assinatura tem uma fila de mensagens mortas associada e a capacidade de encaminhar mensagens automaticamente para outra fila ou tópicos.

Os tópicos do JMS permitem que os clientes criem de maneira dinâmica assinantes não duráveis e duráveis, que também permitem filtrar mensagens com seletores de mensagem. Essas entidades não compartilhadas não têm suporte do Barramento de Serviço. A sintaxe da regra de filtro SQL para o Barramento de Serviço é semelhante à sintaxe do seletor de mensagem compatível com JMS.

O lado do publicador do tópico JMS é compatível com o Barramento de Serviço, conforme mostrado neste exemplo, mas os assinantes dinâmicos não são. Não há suporte para as seguintes APIs do JMS relacionadas à topologia com o Barramento de Serviço.

Restrições e recursos não suportados

As restrições a seguir ocorrem durante o uso do JMS no AMQP 1.0 com o Barramento de Serviço, como:

  • Apenas um MessageProducer ou um MessageConsumer é permitido por sessão. Se precisar criar vários objetos MessageProducers ou MessageConsumers em um aplicativo, crie uma sessão dedicada para cada um deles.
  • Assinaturas de tópico voláteis não são suportadas no momento.
  • Atualmente, não há suporte para objetos MessageSelector.
  • As transações distribuídas não têm suporte, mas há suporte para sessões transacionadas.

O Barramento de Serviço divide o plano de controle do plano de dados. Então, ele não é compatível com várias funções de topologia dinâmica do JMS.

Método sem suporte Substitua por
createDurableSubscriber Crie uma assinatura de tópico que porta o seletor de mensagem.
createDurableConsumer Crie uma assinatura de tópico que porta o seletor de mensagem.
createSharedConsumer Os tópicos do Barramento de Serviço sempre podem ser compartilhados. Consulte a seção "Tópicos de JMS vs. Tópicos do Barramento de Serviço".
createSharedDurableConsumer Os tópicos do Barramento de Serviço sempre podem ser compartilhados. Consulte a seção "Tópicos de JMS vs. Tópicos do Barramento de Serviço".
createTemporaryTopic Crie um tópico por meio do gerenciamento da API, das ferramentas ou do portal com AutoDeleteOnIdle definido como um período de expiração.
createTopic Crie um tópico por meio da API de gerenciamento, das ferramentas ou do Portal.
cancelar assinatura Exclua a API, as ferramentas ou o portal de gerenciamento de tópico.
createBrowser Sem suporte: Usar a funcionalidade de Peek() da API do Barramento de Serviço.
createQueue Crie um tópico por meio da API de gerenciamento, das ferramentas ou do Portal.
createTemporaryQueue Crie uma fila por meio do gerenciamento da API, das ferramentas ou do portal com AutoDeleteOnIdle definido como um período de expiração.
receiveNoWait Use o método Receive() fornecido pelo SDK do Barramento de Serviço e especifique um tempo limite muito baixo ou zero.

Resumo

Este artigo mostrou como usar os recursos de mensagens orientadas do Barramento de Serviço, como filas e tópicos de publicação ou assinatura, do Java usando a API do JMS popular e o AMQP 1.0.

Você também pode usar o AMQP 1.0 do Barramento de Serviço de outras linguagens, incluindo .NET, C, Python e PHP. Os componentes criados com essas diferentes linguagens podem trocar mensagens de forma confiável e com total fidelidade usando o suporte do AMQP 1.0 no Barramento de Serviço.

Próximas etapas