Use the Java Message Service (JMS) with Azure Service Bus and AMQP 1.0
Warning
The below guide caters to limited support for Java Message Service (JMS) 1.1 API and exists for Azure Service Bus standard tier only.
Full support for the Java Message Service (JMS) 2.0 API is available only on the Azure Service Bus Premium tier in preview, which is highly recommended.
This article explains how to use Azure Service Bus messaging features (queues and publish/subscribe topics) from Java applications using the popular Java Message Service (JMS) API standard. There is a companion article that explains how to do the same using the Azure Service Bus .NET API. You can use these two guides together to learn about cross-platform messaging using AMQP 1.0.
The Advanced Message Queuing Protocol (AMQP) 1.0 is an efficient, reliable, wire-level messaging protocol that you can use to build robust, cross-platform messaging applications.
Support for AMQP 1.0 in Azure Service Bus means that you can use the queuing and publish/subscribe brokered messaging features from a range of platforms using an efficient binary protocol. Furthermore, you can build applications comprised of components built using a mix of languages, frameworks, and operating systems.
Get started with Service Bus
This guide assumes that you already have a Service Bus namespace containing a queue named basicqueue. If you don't, then you can create the namespace and queue using the Azure portal. For more information about how to create Service Bus namespaces and queues, see Get started with Service Bus queues.
Note
Partitioned queues and topics also support AMQP. For more information, see Partitioned messaging entities and AMQP 1.0 support for Service Bus partitioned queues and topics.
Downloading the AMQP 1.0 JMS client library
For information about where to download the latest version of the Apache Qpid JMS AMQP 1.0 client library, visit https://qpid.apache.org/download.html.
You must add the following JAR files from the Apache Qpid JMS AMQP 1.0 distribution archive to the Java CLASSPATH when building and running JMS applications with Service Bus:
- geronimo-jms_1.1_spec-1.0.jar
- qpid-jms-client-[version].jar
Note
JMS JAR names and versions may have changed. For details, see Qpid JMS - AMQP 1.0.
Coding Java applications
Java Naming and Directory Interface (JNDI)
JMS uses the Java Naming and Directory Interface (JNDI) to create a separation between logical names and physical names. Two types of JMS objects are resolved using JNDI: ConnectionFactory and Destination. JNDI uses a provider model into which you can plug different directory services to handle name resolution duties. The Apache Qpid JMS AMQP 1.0 library comes with a simple property file-based JNDI Provider that is configured using a properties file of the following format:
# servicebus.properties - sample JNDI configuration
# Register a ConnectionFactory in JNDI using the form:
# connectionfactory.[jndi_name] = [ConnectionURL]
connectionfactory.SBCF = amqps://[SASPolicyName]:[SASPolicyKey]@[namespace].servicebus.windows.net
# Register some queues in JNDI using the form
# queue.[jndi_name] = [physical_name]
# topic.[jndi_name] = [physical_name]
queue.QUEUE = queue1
Setup JNDI context and Configure the ConnectionFactory
The ConnectionString referenced in the one available in the 'Shared Access Policies' in the Azure portal under Primary Connection String
// The connection string builder is the only part of the azure-servicebus SDK library
// we use in this JMS sample and for the purpose of robustly parsing the Service Bus
// connection string.
ConnectionStringBuilder csb = new ConnectionStringBuilder(connectionString);
// set up JNDI context
Hashtable<String, String> hashtable = new Hashtable<>();
hashtable.put("connectionfactory.SBCF", "amqps://" + csb.getEndpoint().getHost() + "?amqp.idleTimeout=120000&amqp.traceFrames=true");
hashtable.put("queue.QUEUE", "BasicQueue");
hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
Context context = new InitialContext(hashtable);
ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
// Look up queue
Destination queue = (Destination) context.lookup("QUEUE");
Configure Producer and Consumer Destination Queues
The entry used to define a destination in the Qpid properties file JNDI provider is of the following format:
To create the destination queue for the Producer -
String queueName = "queueName";
Destination queue = (Destination) queueName;
ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
Connection connection - cf.createConnection(csb.getSasKeyName(), csb.getSasKey());
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
// Create Producer
MessageProducer producer = session.createProducer(queue);
To create a destination queue for the Consumer -
String queueName = "queueName";
Destination queue = (Destination) queueName;
ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
Connection connection - cf.createConnection(csb.getSasKeyName(), csb.getSasKey());
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
// Create Consumer
MessageConsumer consumer = session.createConsumer(queue);
Write the JMS application
There are no special APIs or options required when using JMS with Service Bus. However, there are a few restrictions that will be covered later. As with any JMS application, the first thing required is configuration of the JNDI environment, to be able to resolve a ConnectionFactory and destinations.
Configure the JNDI InitialContext
The JNDI environment is configured by passing a hashtable of configuration information into the constructor of the javax.naming.InitialContext class. The two required elements in the hashtable are the class name of the Initial Context Factory and the Provider URL. The following code shows how to configure the JNDI environment to use the Qpid properties file based JNDI Provider with a properties file named servicebus.properties.
// set up JNDI context
Hashtable<String, String> hashtable = new Hashtable<>();
hashtable.put("connectionfactory.SBCF", "amqps://" + csb.getEndpoint().getHost() + \
"?amqp.idleTimeout=120000&amqp.traceFrames=true");
hashtable.put("queue.QUEUE", "BasicQueue");
hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
Context context = new InitialContext(hashtable);
A simple JMS application using a Service Bus queue
The following example program sends JMS TextMessages to a Service Bus queue with the JNDI logical name of QUEUE, and receives the messages back.
You can all access all the source code and configuration information from the Azure Service Bus Samples JMS Queue quickstart
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
package com.microsoft.azure.servicebus.samples.jmsqueuequickstart;
import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder;
import org.apache.commons.cli.*;
import org.apache.log4j.*;
import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import java.util.Hashtable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
/**
* This sample demonstrates how to send messages from a JMS Queue producer into
* an Azure Service Bus Queue, and receive them with a JMS message consumer.
* JMS Queue.
*/
public class JmsQueueQuickstart {
// Number of messages to send
private static int totalSend = 10;
//Tracking counter for how many messages have been received; used as termination condition
private static AtomicInteger totalReceived = new AtomicInteger(0);
// log4j logger
private static Logger logger = Logger.getRootLogger();
public void run(String connectionString) throws Exception {
// The connection string builder is the only part of the azure-servicebus SDK library
// we use in this JMS sample and for the purpose of robustly parsing the Service Bus
// connection string.
ConnectionStringBuilder csb = new ConnectionStringBuilder(connectionString);
// set up JNDI context
Hashtable<String, String> hashtable = new Hashtable<>();
hashtable.put("connectionfactory.SBCF", "amqps://" + csb.getEndpoint().getHost() + "?amqp.idleTimeout=120000&amqp.traceFrames=true");
hashtable.put("queue.QUEUE", "BasicQueue");
hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
Context context = new InitialContext(hashtable);
ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
// Look up queue
Destination queue = (Destination) context.lookup("QUEUE");
// we create a scope here so we can use the same set of local variables cleanly
// again to show the receive side separately with minimal clutter
{
// Create Connection
Connection connection = cf.createConnection(csb.getSasKeyName(), csb.getSasKey());
// Create Session, no transaction, client ack
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
// Create producer
MessageProducer producer = session.createProducer(queue);
// Send messages
for (int i = 0; i < totalSend; i++) {
BytesMessage message = session.createBytesMessage();
message.writeBytes(String.valueOf(i).getBytes());
producer.send(message);
System.out.printf("Sent message %d.\n", i + 1);
}
producer.close();
session.close();
connection.stop();
connection.close();
}
{
// Create Connection
Connection connection = cf.createConnection(csb.getSasKeyName(), csb.getSasKey());
connection.start();
// Create Session, no transaction, client ack
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
// Create consumer
MessageConsumer consumer = session.createConsumer(queue);
// create a listener callback to receive the messages
consumer.setMessageListener(message -> {
try {
// receives message is passed to callback
System.out.printf("Received message %d with sq#: %s\n",
totalReceived.incrementAndGet(), // increments the tracking counter
message.getJMSMessageID());
message.acknowledge();
} catch (Exception e) {
logger.error(e);
}
});
// wait on the main thread until all sent messages have been received
while (totalReceived.get() < totalSend) {
Thread.sleep(1000);
}
consumer.close();
session.close();
connection.stop();
connection.close();
}
System.out.printf("Received all messages, exiting the sample.\n");
System.out.printf("Closing queue client.\n");
}
public static void main(String[] args) {
System.exit(runApp(args, (connectionString) -> {
JmsQueueQuickstart app = new JmsQueueQuickstart();
try {
app.run(connectionString);
return 0;
} catch (Exception e) {
System.out.printf("%s", e.toString());
return 1;
}
}));
}
static final String SB_SAMPLES_CONNECTIONSTRING = "SB_SAMPLES_CONNECTIONSTRING";
public static int runApp(String[] args, Function<String, Integer> run) {
try {
String connectionString = null;
// parse connection string from command line
Options options = new Options();
options.addOption(new Option("c", true, "Connection string"));
CommandLineParser clp = new DefaultParser();
CommandLine cl = clp.parse(options, args);
if (cl.getOptionValue("c") != null) {
connectionString = cl.getOptionValue("c");
}
// get overrides from the environment
String env = System.getenv(SB_SAMPLES_CONNECTIONSTRING);
if (env != null) {
connectionString = env;
}
if (connectionString == null) {
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp("run jar with", "", options, "", true);
return 2;
}
return run.apply(connectionString);
} catch (Exception e) {
System.out.printf("%s", e.toString());
return 3;
}
}
}
Run the application
Pass the Connection String from the Shared Access Policies to run the application. Below is the output of the form by running the Application:
> mvn clean package
>java -jar ./target/jmsqueuequickstart-1.0.0-jar-with-dependencies.jar -c "<CONNECTION_STRING>"
Sent message 1.
Sent message 2.
Sent message 3.
Sent message 4.
Sent message 5.
Sent message 6.
Sent message 7.
Sent message 8.
Sent message 9.
Sent message 10.
Received message 1 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-1
Received message 2 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-2
Received message 3 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-3
Received message 4 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-4
Received message 5 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-5
Received message 6 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-6
Received message 7 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-7
Received message 8 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-8
Received message 9 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-9
Received message 10 with sq#: ID:7f6a7659-bcdf-4af6-afc1-4011e2ddcb3c:1:1:1-10
Received all messages, exiting the sample.
Closing queue client.
AMQP disposition and Service Bus operation mapping
Here is how an AMQP disposition translates to a Service Bus operation:
ACCEPTED = 1; -> Complete()
REJECTED = 2; -> DeadLetter()
RELEASED = 3; (just unlock the message in service bus, will then get redelivered)
MODIFIED_FAILED = 4; -> Abandon() which increases delivery count
MODIFIED_FAILED_UNDELIVERABLE = 5; -> Defer()
JMS Topics vs. Service Bus Topics
Using Azure Service Bus topics and subscriptions through the Java Message Service (JMS) API provides basic send and receive capabilities. It's a convenient choice when porting applications from other message brokers with JMS-compliant APIs, even though Service Bus topics differ from JMS Topics and require a few adjustments.
Azure Service Bus topics route messages into named, shared, durable subscriptions that are managed through the Azure Resource Management interface, the Azure command-line tools, or through the Azure portal. Each subscription allows for up to 2000 selection rules, each of which may have a filter condition and, for SQL filters, also a metadata transformation action. Each filter condition match selects the input message to be copied into the subscription.
Receiving messages from subscriptions is identical receiving messages from queues. Each subscription has an associated dead-letter queue and the ability to automatically forward messages to another queue or topics.
JMS Topics allow clients to dynamically create nondurable and durable subscribers that optionally allow filtering messages with message selectors. These unshared entities aren't supported by Service Bus. The SQL filter rule syntax for Service Bus is, however, similar to the message selector syntax supported by JMS.
The JMS Topic publisher side is compatible with Service Bus, as shown in this sample, but dynamic subscribers aren't. The following topology-related JMS APIs aren't supported with Service Bus.
Unsupported features and restrictions
The following restrictions exist when using JMS over AMQP 1.0 with Service Bus, namely:
- Only one MessageProducer or MessageConsumer is allowed per Session. If you need to create multiple MessageProducers or MessageConsumers in an application, create a dedicated Session for each of them.
- Volatile topic subscriptions aren't currently supported.
- MessageSelectors aren't currently supported.
- Distributed transactions aren't supported (but transacted sessions are supported).
Additionally, Azure Service Bus splits the control plane from the data plane and therefore does not support several of JMS's dynamic topology functions:
| Unsupported method | Replace with |
|---|---|
| createDurableSubscriber | create a Topic subscription porting the message selector |
| createDurableConsumer | create a Topic subscription porting the message selector |
| createSharedConsumer | Service Bus topics are always shareable, see above |
| createSharedDurableConsumer | Service Bus topics are always shareable, see above |
| createTemporaryTopic | create a topic via management API/tools/portal with AutoDeleteOnIdle set to an expiration period |
| createTopic | create a topic via management API/tools/portal |
| unsubscribe | delete the topic management API/tools/portal |
| createBrowser | unsupported. Use the Peek() functionality of the Service Bus API |
| createQueue | create a queue via management API/tools/portal |
| createTemporaryQueue | create a queue via management API/tools/portal with AutoDeleteOnIdle set to an expiration period |
| receiveNoWait | use the receive() method provided by the Service Bus SDK and specify a very low or zero timeout |
Summary
This how-to guide showed how to use Service Bus brokered messaging features (queues and publish/subscribe topics) from Java using the popular JMS API and AMQP 1.0.
You can also use Service Bus AMQP 1.0 from other languages, including .NET, C, Python, and PHP. Components built using these different languages can exchange messages reliably and at full fidelity using the AMQP 1.0 support in Service Bus.




