A Java Message Service 1.1 használata az Azure Service Bus standard és az AMQP 1.0 használatával

Figyelmeztetés:

Ez a cikk a Java Message Service (JMS) 1.1 API korlátozott támogatását nyújtja, és csak az Azure Service Bus standard szinthez érhető el.

A Java Message Service 2.0 API teljes támogatása csak az Azure Service Bus prémium szintű verziójában érhető el. Javasoljuk, hogy ezt a szintet használja.

Ez a cikk bemutatja, hogyan használhatja a Service Bus üzenetkezelési funkcióit Java-alkalmazásokból a népszerű JMS API-szabvány használatával. Ezek az üzenetkezelési funkciók közé tartoznak az üzenetsorok, valamint a témakörökre való közzététel vagy feliratkozás. Egy társcikk bemutatja, hogyan teheti ugyanezt az Azure Service Bus .NET API használatával. A két cikk együttes használatával megismerheti a platformfüggetlen üzenetküldést az Advanced Message Queuing Protocol (AMQP) 1.0 használatával.

Az AMQP 1.0 egy hatékony, megbízható, vezetékes szintű üzenetkezelési protokoll, amellyel robusztus, platformfüggetlen üzenetkezelési alkalmazásokat hozhat létre.

Az AMQP 1.0 Service Busban való támogatása azt jelenti, hogy egy hatékony bináris protokoll használatával számos platformról használhatja a sorba állítási és közzétételi és előfizetési közvetítői üzenetkezelési funkciókat. Nyelvek, keretrendszerek és operációs rendszerek kombinációjával készült összetevőkből álló alkalmazásokat is létrehozhat.

A Service Bus használatának első lépései

Ez a cikk feltételezi, hogy már rendelkezik Service Bus-névtérrel, amely egy elnevezett basicqueueüzenetsort tartalmaz. Ha nem, létrehozhatja a névteret és az üzenetsort az Azure Portal használatával. A Service Bus-névterek és üzenetsorok létrehozásáról további információt a Service Bus-üzenetsorok használatának első lépései című témakörben talál.

Megjegyzés:

A particionált üzenetsorok és témakörök az AMQP-t is támogatják. További információ: Particionált üzenetküldési entitások és AMQP 1.0-támogatás a Service Bus particionált üzenetsoraihoz és témaköreihez.

Az AMQP 1.0 JMS-ügyfélkódtár letöltése

Az Apache Qpid JMS AMQP 1.0 ügyfélkódtár legújabb verziójának letöltéséről az Apache Qpid letöltési webhelyén tájékozódhat.

A Következő JAR-fájlokat kell hozzáadnia az Apache Qpid JMS AMQP 1.0 terjesztési archívumából a Java CLASSPATH környezeti változóhoz, amikor JMS-alkalmazásokat hoz létre és futtat a Service Bus használatával:

  • geronimo-jms_1.1_spec-1.0.jar
  • qpid-jms-client-[version].jar

Megjegyzés:

Előfordulhat, hogy a JMS JAR neve és verziója módosult. További információ: Qpid JMS AMQP 1.0.

Java-alkalmazások kódolása

Java-elnevezési és címtár-kezelőfelület

A JMS a Java Naming and Directory Interface (JNDI) használatával hozza létre a logikai nevek és a fizikai nevek elkülönítését. A JMS-objektumok két típusa oldható fel A JNDI használatával: Csatlakozás ionFactory és Destination. A JNDI egy szolgáltatói modellt használ, amelyhez különböző címtárszolgáltatásokat csatlakoztathat a névfeloldási feladatok kezeléséhez. Az Apache Qpid JMS AMQP 1.0 kódtár egy egyszerű tulajdonságfájl-alapú JNDI-szolgáltatóval rendelkezik, amely a következő formátumú tulajdonságfájl használatával van konfigurálva:

# servicebus.properties - sample JNDI configuration

# Register a ConnectionFactory in JNDI using the form:
# connectionfactory.[jndi_name] = [ConnectionURL]
connectionfactory.SBCF = amqps://[SASPolicyName]:[SASPolicyKey]@[namespace].servicebus.windows.net

# Register some queues in JNDI using the form
# queue.[jndi_name] = [physical_name]
# topic.[jndi_name] = [physical_name]
queue.QUEUE = queue1

JNDI-környezet beállítása és a Csatlakozás ionFactory objektum konfigurálása

A hivatkozott kapcsolati sztring az Elsődleges Csatlakozás ion-sztring alatt az Azure Portal megosztott hozzáférési szabályzataiban érhető el.

// The connection string builder is the only part of the azure-servicebus SDK library
// we use in this JMS sample and for the purpose of robustly parsing the Service Bus 
// connection string. 
ConnectionStringBuilder csb = new ConnectionStringBuilder(connectionString);
        
// Set up JNDI context
Hashtable<String, String> hashtable = new Hashtable<>();
hashtable.put("connectionfactory.SBCF", "amqps://" + csb.getEndpoint().getHost() + "?amqp.idleTimeout=120000&amqp.traceFrames=true");
hashtable.put("queue.QUEUE", "BasicQueue");
hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
Context context = new InitialContext(hashtable);

ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");

// Look up queue
Destination queue = (Destination) context.lookup("QUEUE");

Gyártói és fogyasztói célsorok konfigurálása

A Qpid tulajdonságfájl JNDI-szolgáltatójának célhelyének meghatározásához használt bejegyzés formátuma a következő.

Célsor létrehozása a gyártó számára:

String queueName = "queueName";
Destination queue = (Destination) queueName;

ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
Connection connection - cf.createConnection(csb.getSasKeyName(), csb.getSasKey());

Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

// Create producer
MessageProducer producer = session.createProducer(queue);

Célsor létrehozása a fogyasztó számára:

String queueName = "queueName";
Destination queue = (Destination) queueName;

ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
Connection connection - cf.createConnection(csb.getSasKeyName(), csb.getSasKey());

Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

// Create consumer
MessageConsumer consumer = session.createConsumer(queue);

A JMS-alkalmazás írása

A JMS Service Bus szolgáltatással való használatakor nincs szükség speciális API-kra vagy beállításokra. A későbbiekben néhány korlátozásra is sor kerül. Mint minden JMS-alkalmazás esetén, a JNDI-környezet konfigurálása szükséges ahhoz, hogy fel lehessen oldani a Csatlakozás ionFactory objektumokat és célhelyeket.

A JNDI InitialContext objektum konfigurálása

A JNDI-környezet úgy van konfigurálva, hogy átad egy konfigurációs információ kivonattábláját a javax.naming.InitialContext osztály konstruktorának. A kivonattáblában a két szükséges elem az Initial Context Factory osztályneve és a szolgáltató URL-címe. Az alábbi kód bemutatja, hogyan konfigurálhatja a JNDI-környezetet a Qpid tulajdonságok fájlalapú JNDI-szolgáltatójának servicebus.properties nevű tulajdonságfájllal való használatára.

// Set up JNDI context
Hashtable<String, String> hashtable = new Hashtable<>();
hashtable.put("connectionfactory.SBCF", "amqps://" + csb.getEndpoint().getHost() + \
"?amqp.idleTimeout=120000&amqp.traceFrames=true");
hashtable.put("queue.QUEUE", "BasicQueue");
hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
Context context = new InitialContext(hashtable);

Egyszerű JMS-alkalmazás, amely Service Bus-üzenetsort használ

Az alábbi példaprogram JMS szöveges üzeneteket küld egy Service Bus-üzenetsorba a QUEUE JNDI logikai nevével, és visszaveheti az üzeneteket.

Az Azure Service Bus-minták JMS-üzenetsorának rövid útmutatójából elérheti az összes forráskódot és konfigurációs információt.

// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

package com.microsoft.azure.servicebus.samples.jmsqueuequickstart;

import com.azure.core.amqp.implementation.ConnectionStringProperties;
import org.apache.commons.cli.*;
import org.apache.log4j.*;

import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import java.util.Hashtable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/**
 * This sample demonstrates how to send messages from a JMS queue producer into
 * an Azure Service Bus queue and receive them with a JMS message consumer.
 * JMS queue. 
 */
public class JmsQueueQuickstart {

    // Number of messages to send
    private static int totalSend = 10;
    //Tracking counter for how many messages have been received; used as termination condition
    private static AtomicInteger totalReceived = new AtomicInteger(0);
    // log4j logger 
    private static Logger logger = Logger.getRootLogger();

    public void run(String connectionString) throws Exception {

        // The connection string properties is the only part of the azure-servicebus SDK library
        // we use in this JMS sample and for the purpose of robustly parsing the Service Bus 
        // connection string. 
        ConnectionStringProperties csb = new ConnectionStringProperties(connectionString);
        
        // Set up JNDI context
        Hashtable<String, String> hashtable = new Hashtable<>();
        hashtable.put("connectionfactory.SBCF", "amqps://" + csb.getEndpoint().getHost() + "?amqp.idleTimeout=120000&amqp.traceFrames=true");
        hashtable.put("queue.QUEUE", "BasicQueue");
        hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
        Context context = new InitialContext(hashtable);
        ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
        
        // Look up queue
        Destination queue = (Destination) context.lookup("QUEUE");

        // We create a scope here so we can use the same set of local variables cleanly 
        // again to show the receive side separately with minimal clutter.
        {
            // Create connection
            Connection connection = cf.createConnection(csb.getSharedAccessKeyName(), csb.getSharedAccessKey());
            // Create session, no transaction, client ack
            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

            // Create producer
            MessageProducer producer = session.createProducer(queue);

            // Send messages
            for (int i = 0; i < totalSend; i++) {
                BytesMessage message = session.createBytesMessage();
                message.writeBytes(String.valueOf(i).getBytes());
                producer.send(message);
                System.out.printf("Sent message %d.\n", i + 1);
            }

            producer.close();
            session.close();
            connection.stop();
            connection.close();
        }

        {
            // Create connection
            Connection connection = cf.createConnection(csb.getSharedAccessKeyName(), csb.getSharedAccessKey());
            connection.start();
            // Create session, no transaction, client ack
            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            // Create consumer
            MessageConsumer consumer = session.createConsumer(queue);
            // Create a listener callback to receive the messages
            consumer.setMessageListener(message -> {
                try {
                    // Received message is passed to callback
                    System.out.printf("Received message %d with sq#: %s\n",
                            totalReceived.incrementAndGet(), // increments the tracking counter
                            message.getJMSMessageID());
                    message.acknowledge();
                } catch (Exception e) {
                    logger.error(e);
                }
            });

            // Wait on the main thread until all sent messages have been received
            while (totalReceived.get() < totalSend) {
                Thread.sleep(1000);
            }
            consumer.close();
            session.close();
            connection.stop();
            connection.close();
        }

        System.out.printf("Received all messages, exiting the sample.\n");
        System.out.printf("Closing queue client.\n");
    }

    public static void main(String[] args) {

        System.exit(runApp(args, (connectionString) -> {
            JmsQueueQuickstart app = new JmsQueueQuickstart();
            try {
                app.run(connectionString);
                return 0;
            } catch (Exception e) {
                System.out.printf("%s", e.toString());
                return 1;
            }
        }));
    }

    static final String SB_SAMPLES_CONNECTIONSTRING = "SB_SAMPLES_CONNECTIONSTRING";

    public static int runApp(String[] args, Function<String, Integer> run) {
        try {

            String connectionString = null;

            // Parse connection string from command line
            Options options = new Options();
            options.addOption(new Option("c", true, "Connection string"));
            CommandLineParser clp = new DefaultParser();
            CommandLine cl = clp.parse(options, args);
            if (cl.getOptionValue("c") != null) {
                connectionString = cl.getOptionValue("c");
            }

            // Get overrides from the environment
            String env = System.getenv(SB_SAMPLES_CONNECTIONSTRING);
            if (env != null) {
                connectionString = env;
            }

            if (connectionString == null) {
                HelpFormatter formatter = new HelpFormatter();
                formatter.printHelp("run jar with", "", options, "", true);
                return 2;
            }
            return run.apply(connectionString);
        } catch (Exception e) {
            System.out.printf("%s", e.toString());
            return 3;
        }
    }
}

Az alkalmazás futtatása

Adja át a Csatlakozás ion sztringet a megosztott hozzáférési szabályzatokból az alkalmazás futtatásához. Az alkalmazást futtató űrlap következő kimenete:

> mvn clean package
>java -jar ./target/jmsqueuequickstart-1.0.0-jar-with-dependencies.jar -c "<CONNECTION_STRING>"

Sent message 1.
Sent message 2.
Sent message 3.
Sent message 4.
Sent message 5.
Sent message 6.
Sent message 7.
Sent message 8.
Sent message 9.
Sent message 10.
Received message 1 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-1
Received message 2 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-2
Received message 3 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-3
Received message 4 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-4
Received message 5 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-5
Received message 6 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-6
Received message 7 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-7
Received message 8 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-8
Received message 9 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-9
Received message 10 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-10
Received all messages, exiting the sample.
Closing queue client.

AMQP-diszpozíció és Service Bus-műveletleképezés

Az AMQP-diszpozíció a következőképpen fordítható le Service Bus-műveletre:

ACCEPTED = 1; -> Complete()
REJECTED = 2; -> DeadLetter()
RELEASED = 3; (just unlock the message in service bus, will then get redelivered)
MODIFIED_FAILED = 4; -> Abandon() which increases delivery count
MODIFIED_FAILED_UNDELIVERABLE = 5; -> Defer()

JMS-témakörök és Service Bus-témakörök

A Service Bus-témakörök és -előfizetések használata a JMS API-val alapvető küldési és fogadási képességeket biztosít. Kényelmes választás, ha más üzenetközvetítők alkalmazásait JMS-kompatibilis API-kkal portozza, annak ellenére, hogy a Service Bus-témakörök eltérnek a JMS-témaköröktől, és néhány módosítást igényelnek.

A Service Bus-témakörök elnevezett, megosztott és tartós előfizetésekbe irányítják az üzeneteket, amelyeket az Azure Resource Management felületén, az Azure parancssori eszközein vagy az Azure Portalon kezelnek. Minden előfizetés legfeljebb 2000 kiválasztási szabályt tesz lehetővé, amelyek mindegyike rendelkezhet szűrési feltétellel, és SQL-szűrők esetén metaadat-átalakítási művelet is lehet. Minden szűrési feltétel megfelel az előfizetésbe másolandó bemeneti üzenetnek.

Az előfizetésekből érkező üzenetek fogadása megegyezik az üzenetsorokból érkező üzenetekkel. Minden előfizetéshez tartozik egy kézbesítetlen levelek üzenetsora, és képes automatikusan továbbítani az üzeneteket egy másik üzenetsorba vagy témakörökbe.

A JMS-témakörök lehetővé teszik az ügyfelek számára, hogy dinamikusan hozzanak létre nem használható és tartós előfizetőket, amelyek opcionálisan lehetővé teszik az üzenetek szűrését üzenetválasztókkal. Ezeket a nem tagolt entitásokat a Service Bus nem támogatja. A Service Bus SQL-szűrőszabály-szintaxisa hasonló a JMS által támogatott üzenetválasztó szintaxishoz.

A JMS-témakör közzétevői oldala kompatibilis a Service Bus szolgáltatással, ahogyan az ebben a példában is látható, de a dinamikus előfizetők nem. A Service Bus nem támogatja a következő topológiával kapcsolatos JMS API-kat.

Nem támogatott funkciók és korlátozások

A JMS amqp 1.0-s és Service Bus-alapú használata esetén a következő korlátozások vonatkoznak:

  • Munkamenetenként csak egy MessageProducer vagy MessageConsumer objektum engedélyezett. Ha több MessageProducer - vagy MessageConsumer-objektumot kell létrehoznia egy alkalmazásban, hozzon létre egy dedikált munkamenetet mindegyikhez.
  • A témakör-előfizetések jelenleg nem támogatottak.
  • A MessageSelector-objektumok jelenleg nem támogatottak.
  • Az elosztott tranzakciók nem támogatottak, de a tranzakciós munkamenetek támogatottak.

A Service Bus felosztja a vezérlősíkot az adatsíkról, így nem támogatja a JMS több dinamikus topológiafüggvényét.

Nem támogatott metódus Csere erre
createDurableSubscriber Hozzon létre egy témakör-előfizetést, amely az üzenetválasztót portozza.
createDurableConsumer Hozzon létre egy témakör-előfizetést, amely az üzenetválasztót portozza.
createSharedConsumer A Service Bus-témakörök mindig megoszthatóak. Lásd a "JMS-témakörök és Service Bus-témakörök" című szakaszt.
createSharedDurableConsumer A Service Bus-témakörök mindig megoszthatóak. Lásd a "JMS-témakörök és Service Bus-témakörök" című szakaszt.
createTemporaryTopic Hozzon létre egy témakört a felügyeleti API-val, az eszközökkel vagy a portállal, és az AutoDeleteOnIdle lejárati időre van beállítva.
createTopic Hozzon létre egy témakört a felügyeleti API-val, az eszközökkel vagy a portállal.
Leiratkozás Törölje a témakörkezelési API-t, az eszközöket vagy a portált.
createBrowser Nem támogatott. Használja a Service Bus API Peek() funkcióját.
createQueue Hozzon létre egy üzenetsort a felügyeleti API-val, az eszközökkel vagy a portállal.
createTemporaryQueue Hozzon létre egy üzenetsort a felügyeleti API-val, az eszközökkel vagy a portálon az AutoDeleteOnIdle lejárati időszakra van állítva.
receiveNoWait Használja a Service Bus SDK által biztosított fogadási() metódust, és adjon meg nagyon alacsony vagy nulla időtúllépést.

Összesítés

Ez a cikk bemutatja, hogyan használhatja a Service Bus által közvetített üzenetkezelési funkciókat, például az üzenetsorokat, valamint hogyan tehet közzé vagy iratkozhat fel témaköröket Java-ból a népszerű JMS API és az AMQP 1.0 használatával.

A Service Bus AMQP 1.0-t más nyelvekről is használhatja, például .NET, C, Python és PHP nyelven. A különböző nyelvek használatával létrehozott összetevők megbízhatóan és teljes hűség mellett cserélhetnek üzeneteket a Service Bus AMQP 1.0-támogatásával.

Következő lépések