Menggunakan Java Message Service 1.1 dengan standar Azure Service Bus dan AMQP 1.0

Peringatan

Artikel ini melayani dukungan terbatas untuk API Layanan Pesan Java (JMS) 1.1 dan hanya ada untuk tingkat standar Azure Service Bus.

Dukungan penuh untuk JAVA Message Service 2.0 API hanya tersedia di tingkat premium Azure Bus Layanan. Kami menyarankan agar Anda menggunakan tingkatan ini.

Artikel ini menjelaskan cara menggunakan fitur pesan Service Bus dari aplikasi Java dengan menggunakan standar JMS API yang populer. Fitur perpesanan ini mencakup antrean dan penerbitan atau berlangganan topik. Artikel pendamping menjelaskan cara melakukan hal yang sama dengan menggunakan Azure Service Bus .NET API. Anda dapat menggunakan kedua artikel ini bersama-sama untuk mempelajari tentang pesan lintas platform menggunakan Advanced Message Queuing Protocol (AMQP) 1.0.

AMQP 1.0 adalah protokol perpesanan tingkat kawat yang efisien, andal, yang dapat Anda gunakan untuk membangun aplikasi perpesanan lintas platform yang kuat.

Dukungan untuk AMQP 1.0 di Service Bus berarti Anda dapat menggunakan fitur antrean dan publikasikan atau berlangganan pesan broker dari berbagai platform dengan menggunakan protokol biner yang efisien. Anda juga dapat membuat aplikasi yang terdiri dari komponen yang dibangun dengan menggunakan campuran bahasa, kerangka kerja, dan sistem operasi.

Memulai Service Bus

Artikel ini mengasumsikan bahwa Anda sudah memiliki namespace Service Bus yang berisi antrean bernama basicqueue. Jika tidak, Anda bisa membuat namespace dan antrean dengan menggunakan portal Microsoft Azure. Untuk informasi selengkapnya tentang cara membuat namespace dan antrean Service Bus, lihat Mulai dengan antrean Service Bus.

Catatan

Antrean dan topik yang dipartisi juga mendukung AMQP. Untuk informasi selengkapnya, lihat Entitas pesan partisi dan dukungan AMQP 1.0 untuk antrean dan topik partisi Service Bus.

Unduh pustaka klien AMQP 1.0 JMS

Untuk informasi tentang di mana mengunduh versi terbaru dari pustaka klien Apache Qpid JMS AMQP 1.0, lihat situs unduhan Apache Qpid.

Anda harus menambahkan file JAR berikut dari arsip distribusi Apache Qpid JMS AMQP 1.0 ke variabel lingkungan Java CLASSPATH saat Anda membangun dan menjalankan aplikasi JMS dengan Service Bus:

  • geronimo-jms_1.1_spec-1.0.jar
  • qpid-jms-client-[version].jar

Catatan

Nama dan versi JMS JAR mungkin telah berubah. Untuk informasi selengkapnya, lihat Qpid JMS AMQP 1.0.

Aplikasi Code Java

Java Naming and Directory Interface

JMS menggunakan Java Naming and Directory Interface (JNDI) untuk membuat pemisahan antara nama logika dan nama fisik. Dua jenis objek JMS diselesaikan dengan menggunakan JNDI: ConnectionFactory dan Destination. JNDI menggunakan model penyedia tempat Anda dapat menyambungkan layanan direktori yang berbeda untuk menangani tugas resolusi nama. Pustaka Apache Qpid JMS AMQP 1.0 dilengkapi dengan penyedia JNDI berbasis file properti sederhana yang dikonfigurasi dengan menggunakan file properti dengan format berikut:

# servicebus.properties - sample JNDI configuration

# Register a ConnectionFactory in JNDI using the form:
# connectionfactory.[jndi_name] = [ConnectionURL]
connectionfactory.SBCF = amqps://[SASPolicyName]:[SASPolicyKey]@[namespace].servicebus.windows.net

# Register some queues in JNDI using the form
# queue.[jndi_name] = [physical_name]
# topic.[jndi_name] = [physical_name]
queue.QUEUE = queue1

Menyiapkan konteks JNDI dan mengonfigurasi objek ConnectionFactory

String koneksi yang direferensikan adalah yang tersedia dalam Kebijakan Akses Bersama di portal Microsoft Azure di bawah String Koneksi Utama.

// The connection string builder is the only part of the azure-servicebus SDK library
// we use in this JMS sample and for the purpose of robustly parsing the Service Bus 
// connection string. 
ConnectionStringBuilder csb = new ConnectionStringBuilder(connectionString);
        
// Set up JNDI context
Hashtable<String, String> hashtable = new Hashtable<>();
hashtable.put("connectionfactory.SBCF", "amqps://" + csb.getEndpoint().getHost() + "?amqp.idleTimeout=120000&amqp.traceFrames=true");
hashtable.put("queue.QUEUE", "BasicQueue");
hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
Context context = new InitialContext(hashtable);

ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");

// Look up queue
Destination queue = (Destination) context.lookup("QUEUE");

Mengonfigurasi antrean produsen dan tujuan konsumen

Entri yang digunakan untuk menentukan tujuan dalam file properti Qpid penyedia JNDI adalah format berikut.

Untuk membuat antrean tujuan bagi produsen:

String queueName = "queueName";
Destination queue = (Destination) queueName;

ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
Connection connection - cf.createConnection(csb.getSasKeyName(), csb.getSasKey());

Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

// Create producer
MessageProducer producer = session.createProducer(queue);

Untuk membuat antrean tujuan bagi konsumen:

String queueName = "queueName";
Destination queue = (Destination) queueName;

ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
Connection connection - cf.createConnection(csb.getSasKeyName(), csb.getSasKey());

Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

// Create consumer
MessageConsumer consumer = session.createConsumer(queue);

Menulis aplikasi JMS

Tidak ada API atau opsi khusus yang diperlukan saat Anda menggunakan JMS dengan Service Bus. Ada beberapa batasan yang akan ditanggung nantinya. Seperti halnya aplikasi JMS, hal pertama yang diperlukan adalah konfigurasi lingkungan JNDI untuk dapat menyelesaikan objek dan tujuan ConnectionFactory.

Mengonfigurasi objek JNDI InitialContext

Lingkungan JNDI dikonfigurasi dengan meneruskan tabel hash informasi konfigurasi ke dalam konstruktor javax.naming.InitialContext. Dua elemen yang diperlukan dalam tabel hash adalah nama kelas Initial Context Factory dan URL penyedia. Kode berikut menunjukkan cara mengonfigurasi lingkungan JNDI untuk menggunakan penyedia JNDI berbasis file properti Qpid dengan file properti bernama servicebus.properties.

// Set up JNDI context
Hashtable<String, String> hashtable = new Hashtable<>();
hashtable.put("connectionfactory.SBCF", "amqps://" + csb.getEndpoint().getHost() + \
"?amqp.idleTimeout=120000&amqp.traceFrames=true");
hashtable.put("queue.QUEUE", "BasicQueue");
hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
Context context = new InitialContext(hashtable);

Aplikasi JMS sederhana yang menggunakan antrean Service Bus

Contoh program berikut mengirim pesan teks JMS ke antrean Service Bus dengan nama logika JNDI QUEUE dan menerima pesan kembali.

Anda dapat mengakses semua kode sumber dan informasi konfigurasi dari Azure Service Bus sampel cepat antrean JMS.

// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

package com.microsoft.azure.servicebus.samples.jmsqueuequickstart;

import com.azure.core.amqp.implementation.ConnectionStringProperties;
import org.apache.commons.cli.*;
import org.apache.log4j.*;

import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import java.util.Hashtable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/**
 * This sample demonstrates how to send messages from a JMS queue producer into
 * an Azure Service Bus queue and receive them with a JMS message consumer.
 * JMS queue. 
 */
public class JmsQueueQuickstart {

    // Number of messages to send
    private static int totalSend = 10;
    //Tracking counter for how many messages have been received; used as termination condition
    private static AtomicInteger totalReceived = new AtomicInteger(0);
    // log4j logger 
    private static Logger logger = Logger.getRootLogger();

    public void run(String connectionString) throws Exception {

        // The connection string properties is the only part of the azure-servicebus SDK library
        // we use in this JMS sample and for the purpose of robustly parsing the Service Bus 
        // connection string. 
        ConnectionStringProperties csb = new ConnectionStringProperties(connectionString);
        
        // Set up JNDI context
        Hashtable<String, String> hashtable = new Hashtable<>();
        hashtable.put("connectionfactory.SBCF", "amqps://" + csb.getEndpoint().getHost() + "?amqp.idleTimeout=120000&amqp.traceFrames=true");
        hashtable.put("queue.QUEUE", "BasicQueue");
        hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
        Context context = new InitialContext(hashtable);
        ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
        
        // Look up queue
        Destination queue = (Destination) context.lookup("QUEUE");

        // We create a scope here so we can use the same set of local variables cleanly 
        // again to show the receive side separately with minimal clutter.
        {
            // Create connection
            Connection connection = cf.createConnection(csb.getSharedAccessKeyName(), csb.getSharedAccessKey());
            // Create session, no transaction, client ack
            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

            // Create producer
            MessageProducer producer = session.createProducer(queue);

            // Send messages
            for (int i = 0; i < totalSend; i++) {
                BytesMessage message = session.createBytesMessage();
                message.writeBytes(String.valueOf(i).getBytes());
                producer.send(message);
                System.out.printf("Sent message %d.\n", i + 1);
            }

            producer.close();
            session.close();
            connection.stop();
            connection.close();
        }

        {
            // Create connection
            Connection connection = cf.createConnection(csb.getSharedAccessKeyName(), csb.getSharedAccessKey());
            connection.start();
            // Create session, no transaction, client ack
            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            // Create consumer
            MessageConsumer consumer = session.createConsumer(queue);
            // Create a listener callback to receive the messages
            consumer.setMessageListener(message -> {
                try {
                    // Received message is passed to callback
                    System.out.printf("Received message %d with sq#: %s\n",
                            totalReceived.incrementAndGet(), // increments the tracking counter
                            message.getJMSMessageID());
                    message.acknowledge();
                } catch (Exception e) {
                    logger.error(e);
                }
            });

            // Wait on the main thread until all sent messages have been received
            while (totalReceived.get() < totalSend) {
                Thread.sleep(1000);
            }
            consumer.close();
            session.close();
            connection.stop();
            connection.close();
        }

        System.out.printf("Received all messages, exiting the sample.\n");
        System.out.printf("Closing queue client.\n");
    }

    public static void main(String[] args) {

        System.exit(runApp(args, (connectionString) -> {
            JmsQueueQuickstart app = new JmsQueueQuickstart();
            try {
                app.run(connectionString);
                return 0;
            } catch (Exception e) {
                System.out.printf("%s", e.toString());
                return 1;
            }
        }));
    }

    static final String SB_SAMPLES_CONNECTIONSTRING = "SB_SAMPLES_CONNECTIONSTRING";

    public static int runApp(String[] args, Function<String, Integer> run) {
        try {

            String connectionString = null;

            // Parse connection string from command line
            Options options = new Options();
            options.addOption(new Option("c", true, "Connection string"));
            CommandLineParser clp = new DefaultParser();
            CommandLine cl = clp.parse(options, args);
            if (cl.getOptionValue("c") != null) {
                connectionString = cl.getOptionValue("c");
            }

            // Get overrides from the environment
            String env = System.getenv(SB_SAMPLES_CONNECTIONSTRING);
            if (env != null) {
                connectionString = env;
            }

            if (connectionString == null) {
                HelpFormatter formatter = new HelpFormatter();
                formatter.printHelp("run jar with", "", options, "", true);
                return 2;
            }
            return run.apply(connectionString);
        } catch (Exception e) {
            System.out.printf("%s", e.toString());
            return 3;
        }
    }
}

Jalankan aplikasi

Berikan String Koneksi dari Kebijakan Akses Bersama untuk menjalankan aplikasi. Output berikut adalah formulir yang menjalankan aplikasi:

> mvn clean package
>java -jar ./target/jmsqueuequickstart-1.0.0-jar-with-dependencies.jar -c "<CONNECTION_STRING>"

Sent message 1.
Sent message 2.
Sent message 3.
Sent message 4.
Sent message 5.
Sent message 6.
Sent message 7.
Sent message 8.
Sent message 9.
Sent message 10.
Received message 1 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-1
Received message 2 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-2
Received message 3 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-3
Received message 4 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-4
Received message 5 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-5
Received message 6 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-6
Received message 7 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-7
Received message 8 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-8
Received message 9 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-9
Received message 10 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-10
Received all messages, exiting the sample.
Closing queue client.

Disposisi AMQP dan pemetaan operasi Service Bus

Inilah cara disposisi AMQP diterjemahkan menjadi operasi Service Bus:

ACCEPTED = 1; -> Complete()
REJECTED = 2; -> DeadLetter()
RELEASED = 3; (just unlock the message in service bus, will then get redelivered)
MODIFIED_FAILED = 4; -> Abandon() which increases delivery count
MODIFIED_FAILED_UNDELIVERABLE = 5; -> Defer()

Topik JMS vs. Topik Service Bus

Menggunakan topik dan langganan Service Bus melalui JMS API menyediakan kemampuan pengiriman dan terima dasar. Ini adalah pilihan yang nyaman ketika Anda mentransfer aplikasi dari broker pesan lain dengan API yang mematuhi JMS, meskipun topik Service Bus berbeda dari topik JMS dan memerlukan beberapa penyesuaian.

Topik Service Bus merutekan pesan ke langganan bernama, dibagikan, dan tahan lama yang dikelola melalui antarmuka Azure Resource Management, alat baris perintah Azure, atau portal Microsoft Azure. Setiap langganan memungkinkan hingga 2.000 aturan pemilihan, yang masing-masing mungkin memiliki kondisi filter dan, untuk filter SQL, juga tindakan transformasi metadata. Setiap kondisi filter yang cocok memilih pesan input yang akan disalin ke dalam langganan.

Menerima pesan dari langganan identik dengan menerima pesan dari antrean. Setiap langganan memiliki antrean huruf mati terkait dan kemampuan untuk meneruskan pesan secara otomatis ke antrean atau topik lain.

Topik JMS memungkinkan klien untuk secara dinamis membuat pelanggan yang tidak dapat diasuransikan dan tahan lama yang secara opsional memungkinkan pemfilteran pesan dengan pemilih pesan. Entitas yang tidak dibagikan ini tidak didukung oleh Service Bus. Sintaks aturan filter SQL untuk Service Bus mirip dengan sintaks pemilih pesan yang didukung oleh JMS.

Sisi penerbit topik JMS kompatibel dengan Service Bus, seperti yang ditunjukkan dalam sampel ini, tetapi pelanggan dinamis tidak. API JMS terkait topologi berikut ini tidak didukung dengan Service Bus.

Fitur dan batasan yang tidak didukung

Pembatasan berikut ada saat Anda menggunakan JMS melalui AMQP 1.0 dengan Service Bus, yaitu:

  • Hanya satu objek MessageProducer atau MessageConsumer yang diperbolehkan per sesi. Jika Anda perlu membuat beberapa objek MessageProducer atau MessageConsumer dalam aplikasi, buat sesi khusus untuk masing-masing objek.
  • Langganan topik yang mudah menguap saat ini tidak didukung.
  • Objek MessageSelector saat ini tidak didukung.
  • Transaksi terdistribusi tidak didukung, tetapi sesi yang ditransaksikan didukung.

Service Bus membagi control plane dari data plane, sehingga tidak mendukung beberapa fungsi topologi dinamis JMS.

Metode yang tidak didukung Ganti dengan
createDurableSubscriber Membuat langganan topik yang memindahkan pemilih pesan.
createDurableConsumer Membuat langganan topik yang memindahkan pemilih pesan.
createSharedConsumer Topik Service Bus selalu dapat dibagikan. Lihat bagian "Topik JMS vs. Topik Service Bus."
createSharedDurableConsumer Topik Service Bus selalu dapat dibagikan. Lihat bagian "Topik JMS vs. Topik Service Bus."
createTemporaryTopic Buat topik melalui API manajemen, alat, atau portal dengan AutoDeleteOnIdle diatur ke periode kedaluwarsa.
createTopic Buat topik melalui API manajemen, alat, atau portal.
berhenti berlangganan Hapus API, alat, atau portal manajemen topik.
createBrowser Tidak didukung. Gunakan fungsi Peek() dari Service Bus API.
createQueue Buat antrean melalui API manajemen, alat, atau portal.
createTemporaryQueue Buat antrean melalui API manajemen, alat, atau portal dengan AutoDeleteOnIdle diatur ke periode kedaluwarsa.
receiveNoWait Gunakan metode receive() yang disediakan oleh Service Bus SDK dan tentukan batas waktu yang sangat rendah atau nol.

Ringkasan

Artikel ini menunjukkan kepada Anda cara menggunakan fitur pesan broker Service Bus, seperti antrean dan menerbitkan atau berlangganan topik, dari Java dengan menggunakan JMS API dan AMQP 1.0 yang populer.

Anda juga dapat menggunakan Service Bus AMQP 1.0 dari bahasa lain, seperti .NET, C, Python, dan PHP. Komponen yang dibangun dengan menggunakan bahasa yang berbeda ini dapat bertukar pesan dengan andal dan dengan kesetiaan penuh dengan menggunakan dukungan AMQP 1.0 di Service Bus.

Langkah berikutnya