Używanie usługi Java Message Service 1.1 z usługą Azure Service Bus w warstwie Standardowa i AMQP 1.0

Ostrzeżenie

Ten artykuł obsługuje ograniczoną obsługę interfejsu API usługi Java Message Service (JMS) 1.1 i istnieje tylko dla warstwy Standardowa usługi Azure Service Bus.

Pełna obsługa interfejsu API usługi Java Message Service 2.0 jest dostępna tylko w warstwie Premium usługi Azure Service Bus. Zalecamy użycie tej warstwy.

W tym artykule wyjaśniono, jak używać funkcji obsługi komunikatów usługi Service Bus z aplikacji Java przy użyciu popularnego standardu interfejsu API JMS. Te funkcje obsługi komunikatów obejmują kolejki i publikowanie lub subskrybowanie tematów. W artykule towarzyszącym wyjaśniono, jak to zrobić przy użyciu interfejsu API .NET usługi Azure Service Bus. Te dwa artykuły umożliwiają zapoznanie się z obsługą komunikatów międzyplatformowych przy użyciu protokołu Advanced Message Queuing Protocol (AMQP) 1.0.

AmQP 1.0 to wydajny, niezawodny protokół obsługi komunikatów na poziomie przewodu, którego można użyć do tworzenia niezawodnych aplikacji do obsługi komunikatów międzyplatformowych.

Obsługa protokołu AMQP 1.0 w usłudze Service Bus oznacza, że można używać funkcji kolejkowania i publikowania lub subskrybowania komunikatów obsługiwanych przez brokera z wielu platform przy użyciu wydajnego protokołu binarnego. Można również tworzyć aplikacje składające się ze składników utworzonych przy użyciu różnych języków, struktur i systemów operacyjnych.

Rozpoczynanie pracy z usługą Service Bus

W tym artykule założono, że masz już przestrzeń nazw usługi Service Bus zawierającą kolejkę o nazwie basicqueue. Jeśli tego nie zrobisz, możesz utworzyć przestrzeń nazw i kolejkę przy użyciu witryny Azure Portal. Aby uzyskać więcej informacji na temat tworzenia przestrzeni nazw i kolejek usługi Service Bus, zobacz Wprowadzenie do kolejek usługi Service Bus.

Uwaga

Partycjonowane kolejki i tematy obsługują również protokół AMQP. Aby uzyskać więcej informacji, zobacz Partitioned messaging entities and AMQP 1.0 support for Service Bus partitioned queues and topics (Partycjonowane jednostki komunikatów i obsługa protokołu AMQP 1.0 dla partycjonowanych kolejek i tematów usługi Service Bus).

Pobieranie biblioteki klienta amQP 1.0 JMS

Aby uzyskać informacje o tym, gdzie pobrać najnowszą wersję biblioteki klienta apache Qpid JMS AMQP 1.0, zobacz witrynę pobierania apache Qpid.

Podczas kompilowania i uruchamiania aplikacji JMS za pomocą usługi Service Bus należy dodać następujące pliki JAR z archiwum dystrybucji JMS AMQP 1.0 języka Java do zmiennej środowiskowej JAVA CLASSPATH:

  • geronimo-jms_1.1_spec-1.0.jar
  • qpid-jms-client-[version].jar

Uwaga

Nazwy jar i wersje pakietu JMS mogły ulec zmianie. Aby uzyskać więcej informacji, zobacz Qpid JMS AMQP 1.0.

Kod aplikacji Java

Interfejs nazewnictwa języka Java i katalogu

Program JMS używa interfejsu Java Naming i Directory Interface (JNDI), aby utworzyć separację między nazwami logicznymi i nazwami fizycznymi. Dwa typy obiektów JMS są rozpoznawane przy użyciu JNDI: Połączenie ionFactory i Destination. JNDI używa modelu dostawcy, w którym można podłączyć różne usługi katalogowe do obsługi zadań rozpoznawania nazw. Biblioteka JMS AMQP 1.0 systemu Apache Qpid jest dostarczana z prostym dostawcą JNDI opartym na plikach właściwości skonfigurowanym przy użyciu pliku właściwości w następującym formacie:

# servicebus.properties - sample JNDI configuration

# Register a ConnectionFactory in JNDI using the form:
# connectionfactory.[jndi_name] = [ConnectionURL]
connectionfactory.SBCF = amqps://[SASPolicyName]:[SASPolicyKey]@[namespace].servicebus.windows.net

# Register some queues in JNDI using the form
# queue.[jndi_name] = [physical_name]
# topic.[jndi_name] = [physical_name]
queue.QUEUE = queue1

Konfigurowanie kontekstu JNDI i konfigurowanie obiektu Połączenie ionFactory

Odwołanie parametry połączenia jest dostępne w zasadach dostępu współdzielonego w witrynie Azure Portal w obszarze Podstawowy ciąg Połączenie ion.

// The connection string builder is the only part of the azure-servicebus SDK library
// we use in this JMS sample and for the purpose of robustly parsing the Service Bus 
// connection string. 
ConnectionStringBuilder csb = new ConnectionStringBuilder(connectionString);
        
// Set up JNDI context
Hashtable<String, String> hashtable = new Hashtable<>();
hashtable.put("connectionfactory.SBCF", "amqps://" + csb.getEndpoint().getHost() + "?amqp.idleTimeout=120000&amqp.traceFrames=true");
hashtable.put("queue.QUEUE", "BasicQueue");
hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
Context context = new InitialContext(hashtable);

ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");

// Look up queue
Destination queue = (Destination) context.lookup("QUEUE");

Konfigurowanie kolejek docelowych producentów i konsumentów

Wpis używany do definiowania miejsca docelowego w pliku właściwości Qpid dostawcy JNDI ma następujący format.

Aby utworzyć kolejkę docelową dla producenta:

String queueName = "queueName";
Destination queue = (Destination) queueName;

ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
Connection connection - cf.createConnection(csb.getSasKeyName(), csb.getSasKey());

Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

// Create producer
MessageProducer producer = session.createProducer(queue);

Aby utworzyć kolejkę docelową dla użytkownika:

String queueName = "queueName";
Destination queue = (Destination) queueName;

ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
Connection connection - cf.createConnection(csb.getSasKeyName(), csb.getSasKey());

Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

// Create consumer
MessageConsumer consumer = session.createConsumer(queue);

Pisanie aplikacji JMS

W przypadku korzystania z programu JMS z usługą Service Bus nie są wymagane żadne specjalne interfejsy API ani opcje. Istnieje kilka ograniczeń, które zostaną omówione później. Podobnie jak w przypadku dowolnej aplikacji JMS, pierwszą rzeczą wymaganą jest konfiguracja środowiska JNDI, aby móc rozpoznać obiekt i miejsca docelowe Połączenie ionFactory.

Konfigurowanie obiektu InitialContext JNDI

Środowisko JNDI jest konfigurowane przez przekazanie tabeli skrótów informacji o konfiguracji do konstruktora klasy javax.naming.InitialContext. Dwa wymagane elementy w tabeli skrótów to nazwa klasy początkowej fabryki kontekstu i adres URL dostawcy. Poniższy kod pokazuje, jak skonfigurować środowisko JNDI do używania dostawcy JNDI opartego na plikach właściwości Qpid z plikiem właściwości o nazwie servicebus.properties.

// Set up JNDI context
Hashtable<String, String> hashtable = new Hashtable<>();
hashtable.put("connectionfactory.SBCF", "amqps://" + csb.getEndpoint().getHost() + \
"?amqp.idleTimeout=120000&amqp.traceFrames=true");
hashtable.put("queue.QUEUE", "BasicQueue");
hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
Context context = new InitialContext(hashtable);

Prosta aplikacja JMS korzystająca z kolejki usługi Service Bus

Poniższy przykładowy program wysyła komunikaty tekstowe JMS do kolejki usługi Service Bus z logiczną nazwą kolejki JNDI i odbiera komunikaty z powrotem.

Dostęp do wszystkich informacji o kodzie źródłowym i konfiguracji można uzyskać w przewodniku Szybki start z przykładami usługi Azure Service Bus kolejki JMS.

// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

package com.microsoft.azure.servicebus.samples.jmsqueuequickstart;

import com.azure.core.amqp.implementation.ConnectionStringProperties;
import org.apache.commons.cli.*;
import org.apache.log4j.*;

import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import java.util.Hashtable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/**
 * This sample demonstrates how to send messages from a JMS queue producer into
 * an Azure Service Bus queue and receive them with a JMS message consumer.
 * JMS queue. 
 */
public class JmsQueueQuickstart {

    // Number of messages to send
    private static int totalSend = 10;
    //Tracking counter for how many messages have been received; used as termination condition
    private static AtomicInteger totalReceived = new AtomicInteger(0);
    // log4j logger 
    private static Logger logger = Logger.getRootLogger();

    public void run(String connectionString) throws Exception {

        // The connection string properties is the only part of the azure-servicebus SDK library
        // we use in this JMS sample and for the purpose of robustly parsing the Service Bus 
        // connection string. 
        ConnectionStringProperties csb = new ConnectionStringProperties(connectionString);
        
        // Set up JNDI context
        Hashtable<String, String> hashtable = new Hashtable<>();
        hashtable.put("connectionfactory.SBCF", "amqps://" + csb.getEndpoint().getHost() + "?amqp.idleTimeout=120000&amqp.traceFrames=true");
        hashtable.put("queue.QUEUE", "BasicQueue");
        hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
        Context context = new InitialContext(hashtable);
        ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
        
        // Look up queue
        Destination queue = (Destination) context.lookup("QUEUE");

        // We create a scope here so we can use the same set of local variables cleanly 
        // again to show the receive side separately with minimal clutter.
        {
            // Create connection
            Connection connection = cf.createConnection(csb.getSharedAccessKeyName(), csb.getSharedAccessKey());
            // Create session, no transaction, client ack
            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

            // Create producer
            MessageProducer producer = session.createProducer(queue);

            // Send messages
            for (int i = 0; i < totalSend; i++) {
                BytesMessage message = session.createBytesMessage();
                message.writeBytes(String.valueOf(i).getBytes());
                producer.send(message);
                System.out.printf("Sent message %d.\n", i + 1);
            }

            producer.close();
            session.close();
            connection.stop();
            connection.close();
        }

        {
            // Create connection
            Connection connection = cf.createConnection(csb.getSharedAccessKeyName(), csb.getSharedAccessKey());
            connection.start();
            // Create session, no transaction, client ack
            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            // Create consumer
            MessageConsumer consumer = session.createConsumer(queue);
            // Create a listener callback to receive the messages
            consumer.setMessageListener(message -> {
                try {
                    // Received message is passed to callback
                    System.out.printf("Received message %d with sq#: %s\n",
                            totalReceived.incrementAndGet(), // increments the tracking counter
                            message.getJMSMessageID());
                    message.acknowledge();
                } catch (Exception e) {
                    logger.error(e);
                }
            });

            // Wait on the main thread until all sent messages have been received
            while (totalReceived.get() < totalSend) {
                Thread.sleep(1000);
            }
            consumer.close();
            session.close();
            connection.stop();
            connection.close();
        }

        System.out.printf("Received all messages, exiting the sample.\n");
        System.out.printf("Closing queue client.\n");
    }

    public static void main(String[] args) {

        System.exit(runApp(args, (connectionString) -> {
            JmsQueueQuickstart app = new JmsQueueQuickstart();
            try {
                app.run(connectionString);
                return 0;
            } catch (Exception e) {
                System.out.printf("%s", e.toString());
                return 1;
            }
        }));
    }

    static final String SB_SAMPLES_CONNECTIONSTRING = "SB_SAMPLES_CONNECTIONSTRING";

    public static int runApp(String[] args, Function<String, Integer> run) {
        try {

            String connectionString = null;

            // Parse connection string from command line
            Options options = new Options();
            options.addOption(new Option("c", true, "Connection string"));
            CommandLineParser clp = new DefaultParser();
            CommandLine cl = clp.parse(options, args);
            if (cl.getOptionValue("c") != null) {
                connectionString = cl.getOptionValue("c");
            }

            // Get overrides from the environment
            String env = System.getenv(SB_SAMPLES_CONNECTIONSTRING);
            if (env != null) {
                connectionString = env;
            }

            if (connectionString == null) {
                HelpFormatter formatter = new HelpFormatter();
                formatter.printHelp("run jar with", "", options, "", true);
                return 2;
            }
            return run.apply(connectionString);
        } catch (Exception e) {
            System.out.printf("%s", e.toString());
            return 3;
        }
    }
}

Uruchamianie aplikacji

Przekaż ciąg Połączenie ion z zasad dostępu współdzielonego, aby uruchomić aplikację. Następujące dane wyjściowe to formularz z uruchomioną aplikacją:

> mvn clean package
>java -jar ./target/jmsqueuequickstart-1.0.0-jar-with-dependencies.jar -c "<CONNECTION_STRING>"

Sent message 1.
Sent message 2.
Sent message 3.
Sent message 4.
Sent message 5.
Sent message 6.
Sent message 7.
Sent message 8.
Sent message 9.
Sent message 10.
Received message 1 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-1
Received message 2 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-2
Received message 3 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-3
Received message 4 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-4
Received message 5 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-5
Received message 6 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-6
Received message 7 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-7
Received message 8 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-8
Received message 9 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-9
Received message 10 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-10
Received all messages, exiting the sample.
Closing queue client.

Mapowanie dyspozycji protokołu AMQP i operacji usługi Service Bus

Oto jak dyspozycja protokołu AMQP przekłada się na operację usługi Service Bus:

ACCEPTED = 1; -> Complete()
REJECTED = 2; -> DeadLetter()
RELEASED = 3; (just unlock the message in service bus, will then get redelivered)
MODIFIED_FAILED = 4; -> Abandon() which increases delivery count
MODIFIED_FAILED_UNDELIVERABLE = 5; -> Defer()

Tematy JMS a tematy usługi Service Bus

Korzystanie z tematów i subskrypcji usługi Service Bus za pośrednictwem interfejsu API JMS zapewnia podstawowe możliwości wysyłania i odbierania. Jest to wygodny wybór w przypadku przenoszenia aplikacji z innych brokerów komunikatów z interfejsami API zgodnymi ze standardem JMS, mimo że tematy usługi Service Bus różnią się od tematów JMS i wymagają kilku korekt.

Tematy usługi Service Bus kierują komunikaty do nazwanych, udostępnionych i trwałych subskrypcji zarządzanych za pomocą interfejsu zarządzania zasobami platformy Azure, narzędzi wiersza polecenia platformy Azure lub witryny Azure Portal. Każda subskrypcja umożliwia korzystanie z maksymalnie 2000 reguł wyboru, z których każdy może mieć warunek filtru, a w przypadku filtrów SQL również akcję przekształcania metadanych. Każdy warunek filtru jest zgodny z wybranym komunikatem wejściowym, który ma zostać skopiowany do subskrypcji.

Odbieranie komunikatów z subskrypcji jest identyczne z odbieraniem komunikatów z kolejek. Każda subskrypcja ma skojarzona kolejka utraconych komunikatów i możliwość automatycznego przekazywania komunikatów do innej kolejki lub tematów.

Tematy JMS umożliwiają klientom dynamiczne tworzenie nieururowalnych i trwałych subskrybentów, którzy opcjonalnie zezwalają na filtrowanie komunikatów za pomocą selektorów komunikatów. Te nieudostępne jednostki nie są obsługiwane przez usługę Service Bus. Składnia reguły filtru SQL dla usługi Service Bus jest podobna do składni selektora komunikatów obsługiwanej przez program JMS.

Strona wydawcy tematu JMS jest zgodna z usługą Service Bus, jak pokazano w tym przykładzie, ale dynamiczni subskrybenci nie są. Następujące interfejsy API JMS związane z topologią nie są obsługiwane w usłudze Service Bus.

Nieobsługiwane funkcje i ograniczenia

W przypadku korzystania z programu JMS za pośrednictwem protokołu AMQP 1.0 z usługą Service Bus istnieją następujące ograniczenia:

  • Na sesję dozwolony jest tylko jeden obiekt MessageProducer lub MessageConsumer . Jeśli musisz utworzyć wiele obiektów MessageProducer lub MessageConsumer w aplikacji, utwórz dedykowaną sesję dla każdego z nich.
  • Subskrypcje nietrwałych tematów nie są obecnie obsługiwane.
  • Obiekty MessageSelector nie są obecnie obsługiwane.
  • Transakcje rozproszone nie są obsługiwane, ale obsługiwane są sesje transakcyjne.

Usługa Service Bus dzieli płaszczyznę sterowania z płaszczyzny danych, więc nie obsługuje kilku dynamicznych funkcji topologii pakietu JMS.

Nieobsługiwana metoda Replace with
createDurableSubscriber Utwórz subskrypcję tematu, która portuje selektor komunikatów.
createDurableConsumer Utwórz subskrypcję tematu, która portuje selektor komunikatów.
createSharedConsumer Tematy usługi Service Bus są zawsze udostępniane. Zobacz sekcję "Tematy JMS a tematy usługi Service Bus".
createSharedDurableConsumer Tematy usługi Service Bus są zawsze udostępniane. Zobacz sekcję "Tematy JMS a tematy usługi Service Bus".
createTemporaryTopic Utwórz temat za pomocą interfejsu API zarządzania, narzędzi lub portalu z ustawieniem AutoDeleteOnIdle na okres wygaśnięcia.
createTopic Utwórz temat za pomocą interfejsu API zarządzania, narzędzi lub portalu.
Zrezygnować Usuń interfejs API zarządzania tematami, narzędzia lub portal.
createBrowser Nieobsługiwane. Użyj funkcji Peek() interfejsu API usługi Service Bus.
createQueue Utwórz kolejkę za pośrednictwem interfejsu API zarządzania, narzędzi lub portalu.
createTemporaryQueue Utwórz kolejkę za pośrednictwem interfejsu API zarządzania, narzędzi lub portalu z ustawieniem AutoDeleteOnIdle na okres wygaśnięcia.
receiveNoWait Użyj metody receive() dostarczonej przez zestaw SDK usługi Service Bus i określ bardzo niski lub zerowy limit czasu.

Podsumowanie

W tym artykule pokazano, jak używać funkcji obsługi komunikatów obsługiwanych przez brokera usługi Service Bus, takich jak kolejki i publikowanie lub subskrybowanie tematów, z języka Java przy użyciu popularnego interfejsu API JMS i protokołu AMQP 1.0.

Możesz również użyć protokołu AMQP usługi Service Bus 1.0 z innych języków, takich jak .NET, C, Python i PHP. Składniki utworzone przy użyciu tych różnych języków mogą niezawodnie wymieniać komunikaty i w pełni wierność dzięki obsłudze protokołu AMQP 1.0 w usłudze Service Bus.

Następne kroki