Azure Service Bus と AMQP 1.0 で Java Message Service (JMS) を使用するUse the Java Message Service (JMS) with Azure Service Bus and AMQP 1.0

この記事では、一般的な Java Message Service (JMS) API 規格を使用して Java アプリケーションから Azure Service Bus のメッセージング機能 (キューおよびトピック発行/サブスクライブ) を使用する方法について説明します。This article explains how to use Azure Service Bus messaging features (queues and publish/subscribe topics) from Java applications using the popular Java Message Service (JMS) API standard. Azure Service Bus .NET API を使用して同じ作業を実行する方法が説明されている関連記事があります。There is a companion article that explains how to do the same using the Azure Service Bus .NET API. これら 2 種類のガイドを使用して、AMQP 1.0 を使用したクロスプラットフォームのメッセージングについて学習できます。You can use these two guides together to learn about cross-platform messaging using AMQP 1.0.

Advanced Message Queuing Protocol (AMQP) 1.0 は、堅牢なクロスプラットフォーム メッセージング アプリケーションを作成するために使用できる、効率的で信頼性の高い回線レベルのメッセージング プロトコルです。The Advanced Message Queuing Protocol (AMQP) 1.0 is an efficient, reliable, wire-level messaging protocol that you can use to build robust, cross-platform messaging applications.

Azure Service Bus での AMQP 1.0 のサポートにより、ブローカー メッセージング機能 (キューおよびトピック発行/サブスクライブ) をさまざまなプラットフォームから効率的なバイナリ プロトコルを使って利用できます。Support for AMQP 1.0 in Azure Service Bus means that you can use the queuing and publish/subscribe brokered messaging features from a range of platforms using an efficient binary protocol. さらに、さまざまな言語、フレームワーク、およびオペレーティング システムを使って作成されたコンポーネントで構成されたアプリケーションを作成できます。Furthermore, you can build applications comprised of components built using a mix of languages, frameworks, and operating systems.

Service Bus の概要Get started with Service Bus

このガイドは、basicqueue という名前のキューが含まれている Service Bus 名前空間が既にあることを前提としています。This guide assumes that you already have a Service Bus namespace containing a queue named basicqueue. まだない場合は、Azure ポータルを使用して名前空間とキューを作成できます。If you do not, then you can create the namespace and queue using the Azure portal. Service Bus 名前空間とキューの作成方法の詳細については、「Service Bus キューの使用」を参照してください。For more information about how to create Service Bus namespaces and queues, see Get started with Service Bus queues.

注意

パーティション分割されたキューおよびトピックも AMQP をサポートします。Partitioned queues and topics also support AMQP. 詳細については、「パーティション分割されたメッセージング エンティティ」と「パーティション分割された Service Bus のキューおよびトピックでの AMQP 1.0 のサポート」を参照してください。For more information, see Partitioned messaging entities and AMQP 1.0 support for Service Bus partitioned queues and topics.

AMQP 1.0 JMS クライアント ライブラリのダウンロードDownloading the AMQP 1.0 JMS client library

Apache Qpid JMS AMQP 1.0 クライアント ライブラリの最新バージョンをダウンロードする場所については、https://qpid.apache.org/download.html を参照してください。For information about where to download the latest version of the Apache Qpid JMS AMQP 1.0 client library, visit https://qpid.apache.org/download.html.

Service Bus を使用する JMS アプリケーションをビルドおよび実行するときは、次の 4 つの JAR ファイルを Apache Qpid JMS AMQP 1.0 ディストリビューション アーカイブから Java CLASSPATH に追加する必要があります。You must add the following four JAR files from the Apache Qpid JMS AMQP 1.0 distribution archive to the Java CLASSPATH when building and running JMS applications with Service Bus:

  • geronimo-jms_1.1_spec-1.0.jargeronimo-jms_1.1_spec-1.0.jar
  • qpid-jms-client-[version].jarqpid-jms-client-[version].jar

注意

JMS JAR 名およびバージョンが変更されている可能性があります。JMS JAR names and versions may have changed. 詳細については、Qpid JMS - AMQP 1.0 を参照してください。For details, see Qpid JMS - AMQP 1.0.

Java アプリケーションのコーディングCoding Java applications

Java Naming and Directory Interface (JNDI)Java Naming and Directory Interface (JNDI)

JMS では、Java Naming and Directory Interface (JNDI) を使用して論理名と物理名が関連付けられます。JMS uses the Java Naming and Directory Interface (JNDI) to create a separation between logical names and physical names. JNDI を使用した名前解決には 2 種類の JMS オブジェクトが使用されます。ConnectionFactory と Destination です。Two types of JMS objects are resolved using JNDI: ConnectionFactory and Destination. JNDI で使用されるプロバイダー モデルでは、さまざまなディレクトリ サービスに接続して、名前解決のタスクを処理できます。JNDI uses a provider model into which you can plug different directory services to handle name resolution duties. Apache Qpid JMS AMQP 1.0 ライブラリには、次の形式のプロパティ ファイルを使用して構成されるシンプルなプロパティ ファイル ベースの JNDI Provider が付属しています。The Apache Qpid JMS AMQP 1.0 library comes with a simple properties file-based JNDI Provider that is configured using a properties file of the following format:

# servicebus.properties - sample JNDI configuration

# Register a ConnectionFactory in JNDI using the form:
# connectionfactory.[jndi_name] = [ConnectionURL]
connectionfactory.SBCF = amqps://[SASPolicyName]:[SASPolicyKey]@[namespace].servicebus.windows.net

# Register some queues in JNDI using the form
# queue.[jndi_name] = [physical_name]
# topic.[jndi_name] = [physical_name]
queue.QUEUE = queue1

JNDI コンテキストのセットアップと ConnectionFactory の構成Setup JNDI context and Configure the ConnectionFactory

参照されている ConnectionString は、Azure Portal[プライマリ接続文字列] の下の '共有アクセス ポリシー' で使用可能な接続文字列ですThe ConnectionString referenced in the one available in the 'Shared Access Policies' in the Azure Portal under Primary Connection String

// The connection string builder is the only part of the azure-servicebus SDK library
// we use in this JMS sample and for the purpose of robustly parsing the Service Bus 
// connection string. 
ConnectionStringBuilder csb = new ConnectionStringBuilder(connectionString);
        
// set up JNDI context
Hashtable<String, String> hashtable = new Hashtable<>();
hashtable.put("connectionfactory.SBCF", "amqps://" + csb.getEndpoint().getHost() + "?amqp.idleTimeout=120000&amqp.traceFrames=true");
hashtable.put("queue.QUEUE", "BasicQueue");
hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
Context context = new InitialContext(hashtable);

ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");

// Look up queue
Destination queue = (Destination) context.lookup("QUEUE");

プロデューサーとコンシューマーの送信先キューの構成Configure Producer and Consumer Destination Queues

Qpid プロパティ ファイル JNDI プロバイダーで送信先の定義に使用するエントリは、次のような形式になります。The entry used to define a destination in the Qpid properties file JNDI provider is of the following format:

プロデューサーの送信先キューを作成するにはTo create the destination queue for the Producer -

String queueName = "queueName";
Destination queue = (Destination) queueName;

ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
Connection connection - cf.createConnection(csb.getSasKeyName(), csb.getSasKey());

Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

// Create Producer
MessageProducer producer = session.createProducer(queue);

コンシューマーの送信先キューを作成するにはTo create a destination queue for the Consumer -

String queueName = "queueName";
Destination queue = (Destination) queueName;

ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
Connection connection - cf.createConnection(csb.getSasKeyName(), csb.getSasKey());

Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

// Create Consumer
MessageConsumer consumer = session.createConsumer(queue);

JMS アプリケーションの記述Write the JMS application

JMS と Service Bus の使用時に必要になる特殊な API やオプションはありません。There are no special APIs or options required when using JMS with Service Bus. ただし、この後で取り上げているようないくつかの制限があります。However, there are a few restrictions that will be covered later. いずれの JMS アプリケーションでも、まず、ConnectionFactory と送信先の名前解決が可能になるように JNDI 環境を構成する必要があります。As with any JMS application, the first thing required is configuration of the JNDI environment, to be able to resolve a ConnectionFactory and destinations.

JNDI InitialContext の構成Configure the JNDI InitialContext

JNDI 環境を構成するには、構成情報のハッシュ テーブルを javax.naming.InitialContext クラスのコンストラクターに渡します。The JNDI environment is configured by passing a hashtable of configuration information into the constructor of the javax.naming.InitialContext class. ハッシュ テーブル内の 2 つの必須の要素は Initial Context Factory と Provider URL です。The two required elements in the hashtable are the class name of the Initial Context Factory and the Provider URL. 次のコードでは、Qpid Properties File JNDI Provider と、servicebus.properties という名前のプロパティ ファイルを使用して、JNDI 環境を構成する方法を示しています。The following code shows how to configure the JNDI environment to use the Qpid properties file based JNDI Provider with a properties file named servicebus.properties.

// set up JNDI context
Hashtable<String, String> hashtable = new Hashtable<>();
hashtable.put("connectionfactory.SBCF", "amqps://" + csb.getEndpoint().getHost() + \
"?amqp.idleTimeout=120000&amqp.traceFrames=true");
hashtable.put("queue.QUEUE", "BasicQueue");
hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
Context context = new InitialContext(hashtable);

Service Bus キューを使用するシンプルな JMS アプリケーションA simple JMS application using a Service Bus queue

次のサンプル プログラムでは、JNDI 論理名が QUEUE の Service Bus キューに JMS TextMessages を送信し、折り返しそれらのメッセージを受信します。The following example program sends JMS TextMessages to a Service Bus queue with the JNDI logical name of QUEUE, and receives the messages back.

Azure Service Bus サンプル JMS キュー クイック スタートから、すべてのソース コードと構成情報にアクセスできますYou can all access all the source code and configuration information from the Azure Service Bus Samples JMS Queue Quick Start

// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

package com.microsoft.azure.servicebus.samples.jmsqueuequickstart;

import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder;
import org.apache.commons.cli.*;
import org.apache.log4j.*;

import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import java.util.Hashtable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/**
 * This sample demonstrates how to send messages from a JMS Queue producer into
 * an Azure Service Bus Queue, and receive them with a JMS message consumer.
 * JMS Queue. 
 */
public class JmsQueueQuickstart {

    // Number of messages to send
    private static int totalSend = 10;
    //Tracking counter for how many messages have been received; used as termination condition
    private static AtomicInteger totalReceived = new AtomicInteger(0);
    // log4j logger 
    private static Logger logger = Logger.getRootLogger();

    public void run(String connectionString) throws Exception {

        // The connection string builder is the only part of the azure-servicebus SDK library
        // we use in this JMS sample and for the purpose of robustly parsing the Service Bus 
        // connection string. 
        ConnectionStringBuilder csb = new ConnectionStringBuilder(connectionString);
        
        // set up JNDI context
        Hashtable<String, String> hashtable = new Hashtable<>();
        hashtable.put("connectionfactory.SBCF", "amqps://" + csb.getEndpoint().getHost() + "?amqp.idleTimeout=120000&amqp.traceFrames=true");
        hashtable.put("queue.QUEUE", "BasicQueue");
        hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
        Context context = new InitialContext(hashtable);
        ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
        
        // Look up queue
        Destination queue = (Destination) context.lookup("QUEUE");

        // we create a scope here so we can use the same set of local variables cleanly 
        // again to show the receive side separately with minimal clutter
        {
            // Create Connection
            Connection connection = cf.createConnection(csb.getSasKeyName(), csb.getSasKey());
            // Create Session, no transaction, client ack
            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

            // Create producer
            MessageProducer producer = session.createProducer(queue);

            // Send messages
            for (int i = 0; i < totalSend; i++) {
                BytesMessage message = session.createBytesMessage();
                message.writeBytes(String.valueOf(i).getBytes());
                producer.send(message);
                System.out.printf("Sent message %d.\n", i + 1);
            }

            producer.close();
            session.close();
            connection.stop();
            connection.close();
        }

        {
            // Create Connection
            Connection connection = cf.createConnection(csb.getSasKeyName(), csb.getSasKey());
            connection.start();
            // Create Session, no transaction, client ack
            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            // Create consumer
            MessageConsumer consumer = session.createConsumer(queue);
            // create a listener callback to receive the messages
            consumer.setMessageListener(message -> {
                try {
                    // receives message is passed to callback
                    System.out.printf("Received message %d with sq#: %s\n",
                            totalReceived.incrementAndGet(), // increments the tracking counter
                            message.getJMSMessageID());
                    message.acknowledge();
                } catch (Exception e) {
                    logger.error(e);
                }
            });

            // wait on the main thread until all sent messages have been received
            while (totalReceived.get() < totalSend) {
                Thread.sleep(1000);
            }
            consumer.close();
            session.close();
            connection.stop();
            connection.close();
        }

        System.out.printf("Received all messages, exiting the sample.\n");
        System.out.printf("Closing queue client.\n");
    }

    public static void main(String[] args) {

        System.exit(runApp(args, (connectionString) -> {
            JmsQueueQuickstart app = new JmsQueueQuickstart();
            try {
                app.run(connectionString);
                return 0;
            } catch (Exception e) {
                System.out.printf("%s", e.toString());
                return 1;
            }
        }));
    }

    static final String SB_SAMPLES_CONNECTIONSTRING = "SB_SAMPLES_CONNECTIONSTRING";

    public static int runApp(String[] args, Function<String, Integer> run) {
        try {

            String connectionString = null;

            // parse connection string from command line
            Options options = new Options();
            options.addOption(new Option("c", true, "Connection string"));
            CommandLineParser clp = new DefaultParser();
            CommandLine cl = clp.parse(options, args);
            if (cl.getOptionValue("c") != null) {
                connectionString = cl.getOptionValue("c");
            }

            // get overrides from the environment
            String env = System.getenv(SB_SAMPLES_CONNECTIONSTRING);
            if (env != null) {
                connectionString = env;
            }

            if (connectionString == null) {
                HelpFormatter formatter = new HelpFormatter();
                formatter.printHelp("run jar with", "", options, "", true);
                return 2;
            }
            return run.apply(connectionString);
        } catch (Exception e) {
            System.out.printf("%s", e.toString());
            return 3;
        }
    }
}

アプリケーションの実行Run the application

共有アクセス ポリシーから接続文字列を渡してアプリケーションを実行します。Pass the Connection String from the Shared Access Policies to run the application. 以下は、アプリケーションの実行によるフォームの出力です。Below is the output of the form by running the Application:

> mvn clean package
>java -jar ./target/jmsqueuequickstart-1.0.0-jar-with-dependencies.jar -c "<CONNECTION_STRING>"

Sent message 1.
Sent message 2.
Sent message 3.
Sent message 4.
Sent message 5.
Sent message 6.
Sent message 7.
Sent message 8.
Sent message 9.
Sent message 10.
Received message 1 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-1
Received message 2 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-2
Received message 3 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-3
Received message 4 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-4
Received message 5 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-5
Received message 6 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-6
Received message 7 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-7
Received message 8 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-8
Received message 9 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-9
Received message 10 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-10
Received all messages, exiting the sample.
Closing queue client.

AMQP の disposition と Service Bus 操作のマッピングAMQP disposition and Service Bus operation mapping

AMQP の disposition が Service Bus 操作にどのように変換されるかを次に示します。Here is how an AMQP disposition translates to a Service Bus operation:

ACCEPTED = 1; -> Complete()
REJECTED = 2; -> DeadLetter()
RELEASED = 3; (just unlock the message in service bus, will then get redelivered)
MODIFIED_FAILED = 4; -> Abandon() which increases delivery count
MODIFIED_FAILED_UNDELIVERABLE = 5; -> Defer()

JMS トピックとService Bus トピックJMS Topics vs. Service Bus Topics

Java Message Service (JMS) API を通じて Azure Service Bus のトピックとサブスクリプションを使用すると、基本的な送受信機能が提供されます。Using Azure Service Bus topics and subscriptions through the Java Message Service (JMS) API provides basic send and receive capabilities. これは、JMS 準拠の API を使用して他のメッセージ ブローカーからアプリケーションを移植する場合に便利な選択肢となります。ただし、Service Bus トピックは JMS トピックとは異なるため、多少の調整が必要となります。It's a convenient choice when porting applications from other message brokers with JMS compliant APIs, even though Service Bus topics differ from JMS Topics and require a few adjustments.

Azure Service Bus トピックは、Azure Resource Management インターフェイス、Azure コマンド ライン ツール、または Azure portal を通じて管理される、名前の付いた、共有の、永続的サブスクリプションにメッセージをルーティングします。Azure Service Bus topics route messages into named, shared, durable subscriptions that are managed through the Azure Resource Management interface, the Azure command line tools, or through the Azure portal. 各サブスクリプションでは、最大 2000 の選択ルールを使用できます。各ルールではフィルター条件を使用できるほか、SQL フィルターについては、メタデータ変換アクションも使用できます。Each subscription allows for up to 2000 selection rules, each of which may have a filter condition and, for SQL filters, also a metadata transformation action. フィルター条件が一致するごとに、サブスクリプション内にコピーされる入力メッセージが選択されます。Each filter condition match selects the input message to be copied into tehj subscription.

サブスクリプションからのメッセージ受信は、キューからのメッセージ受信と同じです。Receiving messages from subscriptions is identical receiving messages from queues. 各サブスクリプションには配信不能キューが関連付けられ、別のキューやトピックにメッセージを自動的に転送することもできます。Each subscription has an associated dead-letter queue as well as the ability to automatically forward messages to another queue or topics.

JMS トピックでは、非永続的および永続的サブスクライバーをクライアントが動的に作成できます。これらのサブスクライバーでは、メッセージ セレクターを使用してメッセージをフィルター処理することもできます。JMS Topics allow clients to dynamically create nondurable and durable subscribers that optionally allow filtering messages with message selectors. これらの非共有エンティティは、Service Bus ではサポートされていません。These unshared entities are not supported by Service Bus. ただし、Service Bus の SQL フィルター ルール構文は、JMS でサポートされているメッセージ セレクター構文とよく似ています。The SQL filter rule syntax for Service Bus is, however, very similar to the message selector syntax supported by JMS.

JMS トピックのパブリッシャー側は (この例で示すように) Service Bus と互換性がありますが、動的サブスクライバーについては互換性がありません。The JMS Topic publisher side is compatible with Service Bus, as shown in this sample, but dynamic subscribers are not. 次のトポロジ関連 JMS API は、Service Bus ではサポートされていません。The following topology-related JMS APIs are not supported with Service Bus.

サポートされていない機能および制限Unsupported features and restrictions

JMS を AMQP 1.0 と Service Bus で使用する場合は、次の制限があります。The following restrictions exist when using JMS over AMQP 1.0 with Service Bus, namely:

  • Session ごとに作成できる MessageProducer または MessageConsumer は 1 つのみです。Only one MessageProducer or MessageConsumer is allowed per Session. アプリケーションで複数の MessageProducers または MessageConsumers を作成する必要がある場合は、それぞれに専用のセッションを作成してください。If you need to create multiple MessageProducers or MessageConsumers in an application, create a dedicated Session for each of them.
  • 揮発性トピック サブスクリプションは現在サポートされていません。Volatile topic subscriptions are not currently supported.
  • MessageSelectors は現在サポートされていません。MessageSelectors are not currently supported.
  • トランザクション セッションと分散トランザクションはサポートされません。Transacted sessions and distributed transactions are not supported.

さらに、Azure Service Bus では、データ プレーンからコントロール プレーンが分離されるため、JMS の動的トポロジ関数のいくつかがサポートされていません。Additionally, Azure Service Bus splits the control plane from the data plane and therefore does not support several of JMS's dynamic topology functions:

サポートされていないメソッドUnsupported method 置換後の文字列Replace with
createDurableSubscribercreateDurableSubscriber メッセージ セレクタを移植するトピック サブスクリプションを作成しますcreate a Topic subscription porting the message selector
createDurableConsumercreateDurableConsumer メッセージ セレクタを移植するトピック サブスクリプションを作成しますcreate a Topic subscription porting the message selector
createSharedConsumercreateSharedConsumer Service Bus トピックは常に共有可能です。上記を参照してくださいService Bus topics are always shareable, see above
createSharedDurableConsumercreateSharedDurableConsumer Service Bus トピックは常に共有可能です。上記を参照してくださいService Bus topics are always shareable, see above
createTemporaryTopiccreateTemporaryTopic AutoDeleteOnIdle に有効期間を設定して、management API/tools/portal 経由でトピックを作成しますcreate a topic via management API/tools/portal with AutoDeleteOnIdle set to an expiration period
createTopiccreateTopic management API/tools/portal 経由でトピックを作成します。create a topic via management API/tools/portal
unsubscribeunsubscribe トピック management API/tools/portal を削除しますdelete the topic management API/tools/portal
createBrowsercreateBrowser サポートされていません。unsupported. Service Bus API の Peek() 機能を使用してくださいUse the Peek() functionality of the Service Bus API
createQueuecreateQueue management API/tools/portal 経由でキューを作成しますcreate a queue via management API/tools/portal
createTemporaryQueuecreateTemporaryQueue AutoDeleteOnIdle に有効期間を設定して、management API/tools/portal 経由でキューを作成しますcreate a queue via management API/tools/portal with AutoDeleteOnIdle set to an expiration period
receiveNoWaitreceiveNoWait Service Bus SDK によって提供される receive() メソッドを利用し、非常に短いタイムアウトまたはゼロ タイムアウトを指定しますutilize the receive() method provided by the Service Bus SDK and specify a very low or zero timeout

まとめSummary

このガイドでは、一般的な JMS API と AMQP 1.0 を使って Java から Service Bus の仲介型メッセージング機能 (キューおよびトピック発行/サブスクライブ) を使用する方法について説明しました。This how-to guide showed how to use Service Bus brokered messaging features (queues and publish/subscribe topics) from Java using the popular JMS API and AMQP 1.0.

Service Bus AMQP 1.0 のサポートは、.NET、C、Python、PHP など、その他の言語からも使用できます。You can also use Service Bus AMQP 1.0 from other languages, including .NET, C, Python, and PHP. Service Bus で AMQP 1.0 のサポートを使用すると、これらのさまざまな言語を使って作成されたコンポーネントで高い信頼性と十分な忠実度のメッセージ交換が実現されます。Components built using these different languages can exchange messages reliably and at full fidelity using the AMQP 1.0 support in Service Bus.

次の手順Next steps