Azure Service Bus konu başlığına ileti gönderme ve aboneliklerden konuya ileti alma (Java)
Bu hızlı başlangıçta, bir Azure Service Bus konu başlığına ileti göndermek için azure-messaging-servicebus paketini kullanarak Java kodu yazacak ve ardından aboneliklerden bu konuya ileti alacaksınız.
Not
Bu hızlı başlangıç, bir konu başlığına toplu ileti gönderme ve bu iletileri konu aboneliğinden Service Bus basit bir senaryo için adım adım yönergeler sağlar. Azure sdk'sı için önceden Service Bus Javaörneklerini GitHub.
Önkoşullar
- Azure aboneliği. Bu öğreticiyi tamamlamak için bir Azure hesabınızın olması gerekir. Aboneliğinizi veya MSDN Visual Studio avantajlarını etkinleştirebilirsiniz veya ücretsiz bir hesap için kaydolabilirsiniz.
- Hızlı Başlangıç: Konusuyla ilgili Azure portal ve abonelikler Service Bus için hızlı başlangıç adımlarını izleyin. Bağlantı dizesini, konu adını ve abonelik adını not edin. Bu hızlı başlangıç için yalnızca bir abonelik kullanasınız.
- Java için Azure SDK'yı yükleyin. Eclipse kullanıyorsanız, Java için Azure SDK'Azure Toolkit for Eclipse yükleme yüklemelerini yükleyebilirsiniz. Ardından Java için Microsoft Azure Kitaplıkları'nın projenize eklerini eklediniz. IntelliJ kullanıyorsanız bkz. Azure Toolkit for IntelliJ.
Konu başlığına ileti gönderme
Bu bölümde, bir Java konsol projesi oluşturacak ve oluşturduğunuz konuya ileti göndermek için kod eksersiniz.
Java konsol projesi oluşturma
Eclipse'i veya kendi tercihi olan bir aracı kullanarak bir Java projesi oluşturun.
Service Bus'yi kullanmak için Service Bus
Azure Core ve Azure depolama kitaplıklarına Service Bus ekleyin.
Eclipse kullanıyorsanız ve bir Java konsol uygulaması oluşturduysanız, Java projenizi Maven'a dönüştür: Paket Gezgini penceresinde projeye sağ tıklayın ve Maven projesine -> Dönüştür'leri Yapılandır'ı seçin. Ardından, aşağıdaki örnekte gösterildiği gibi bu iki kitaplıka bağımlılıklar ekleyin.
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.myorg.sbusquickstarts</groupId>
<artifactId>sbustopicqs</artifactId>
<version>0.0.1-SNAPSHOT</version>
<build>
<sourceDirectory>src</sourceDirectory>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<release>15</release>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core</artifactId>
<version>1.13.0</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-servicebus</artifactId>
<version>7.0.2</version>
</dependency>
</dependencies>
</project>
Konuya ileti göndermek için kod ekleme
Java dosyasının
importkonu başlığına aşağıdaki deyimlerini ekleyin.import com.azure.messaging.servicebus.*; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.Arrays; import java.util.List;sınıfında, aşağıda gösterildiği gibi bağlantı dizesini ve konu adını tutmak için değişkenleri tanımlayın:
static String connectionString = "<NAMESPACE CONNECTION STRING>"; static String topicName = "<TOPIC NAME>"; static String subName = "<SUBSCRIPTION NAME>";yerine
<NAMESPACE CONNECTION STRING>ad alanınıza bağlantı dizesini Service Bus değiştirin. ve yerine<TOPIC NAME>konunun adını yazın.Konuya bir ileti
sendMessagegöndermek için sınıfında adlı bir yöntem ekleyin.static void sendMessage() { // create a Service Bus Sender client for the queue ServiceBusSenderClient senderClient = new ServiceBusClientBuilder() .connectionString(connectionString) .sender() .topicName(topicName) .buildClient(); // send one message to the topic senderClient.sendMessage(new ServiceBusMessage("Hello, World!")); System.out.println("Sent a single message to the topic: " + topicName); }İleti listesi oluşturmak
createMessagesiçin sınıfında adlı bir yöntem ekleyin. Genellikle, bu iletileri uygulamanın farklı kısımlarından alırsınız. Burada, örnek iletilerin bir listesini oluşturuz.static List<ServiceBusMessage> createMessages() { // create a list of messages and return it to the caller ServiceBusMessage[] messages = { new ServiceBusMessage("First message"), new ServiceBusMessage("Second message"), new ServiceBusMessage("Third message") }; return Arrays.asList(messages); }Oluşturduğunuz konuya ileti
sendMessageBatchgöndermek için method adlı bir yöntem ekleyin. Bu yöntem konu için bir oluşturur, ileti listesini almak için yöntemini çağırır, bir veya daha fazla toplu iş hazırlar ve topluServiceBusSenderClientcreateMessagesişleri konuya gönderir.static void sendMessageBatch() { // create a Service Bus Sender client for the topic ServiceBusSenderClient senderClient = new ServiceBusClientBuilder() .connectionString(connectionString) .sender() .topicName(topicName) .buildClient(); // Creates an ServiceBusMessageBatch where the ServiceBus. ServiceBusMessageBatch messageBatch = senderClient.createMessageBatch(); // create a list of messages List<ServiceBusMessage> listOfMessages = createMessages(); // We try to add as many messages as a batch can fit based on the maximum size and send to Service Bus when // the batch can hold no more messages. Create a new batch for next set of messages and repeat until all // messages are sent. for (ServiceBusMessage message : listOfMessages) { if (messageBatch.tryAddMessage(message)) { continue; } // The batch is full, so we create a new batch and send the batch. senderClient.sendMessages(messageBatch); System.out.println("Sent a batch of messages to the topic: " + topicName); // create a new batch messageBatch = senderClient.createMessageBatch(); // Add that message that we couldn't before. if (!messageBatch.tryAddMessage(message)) { System.err.printf("Message is too large for an empty batch. Skipping. Max size: %s.", messageBatch.getMaxSizeInBytes()); } } if (messageBatch.getCount() > 0) { senderClient.sendMessages(messageBatch); System.out.println("Sent a batch of messages to the topic: " + topicName); } //close the client senderClient.close(); }
Abonelikten ileti alma
Bu bölümde, bir abonelikten konuya ileti almak için kod eksersiniz.
Abonelikten ileti almak
receiveMessagesiçin adlı bir yöntem ekleyin. Bu yöntem, iletilerin işlenmesi için bir işleyici ve hataların işlenmesi için başka bir işleyiciServiceBusProcessorClientbelirterek abonelik için bir oluşturur. Ardından işlemciyi başlatır, birkaç saniye bekler, alınan iletileri yazdırır, ardından işlemciyi durdurur ve kapatır.Önemli
kodundaki
ServiceBusTopicTestServiceBusTopicTest::processMessageyerine sınıfını yazın.// handles received messages static void receiveMessages() throws InterruptedException { CountDownLatch countdownLatch = new CountDownLatch(1); // Create an instance of the processor through the ServiceBusClientBuilder ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder() .connectionString(connectionString) .processor() .topicName(topicName) .subscriptionName(subName) .processMessage(ServiceBusTopicTest::processMessage) .processError(context -> processError(context, countdownLatch)) .buildProcessorClient(); System.out.println("Starting the processor"); processorClient.start(); TimeUnit.SECONDS.sleep(10); System.out.println("Stopping and closing the processor"); processorClient.close(); }Service Bus
processMessageaboneliğinden alınan iletiyi işleme yöntemini ekleyin.private static void processMessage(ServiceBusReceivedMessageContext context) { ServiceBusReceivedMessage message = context.getMessage(); System.out.printf("Processing message. Session: %s, Sequence #: %s. Contents: %s%n", message.getMessageId(), message.getSequenceNumber(), message.getBody()); }Hata iletilerini
processErrorişlemek için yöntemini ekleyin.private static void processError(ServiceBusErrorContext context, CountDownLatch countdownLatch) { System.out.printf("Error when receiving messages from namespace: '%s'. Entity: '%s'%n", context.getFullyQualifiedNamespace(), context.getEntityPath()); if (!(context.getException() instanceof ServiceBusException)) { System.out.printf("Non-ServiceBusException occurred: %s%n", context.getException()); return; } ServiceBusException exception = (ServiceBusException) context.getException(); ServiceBusFailureReason reason = exception.getReason(); if (reason == ServiceBusFailureReason.MESSAGING_ENTITY_DISABLED || reason == ServiceBusFailureReason.MESSAGING_ENTITY_NOT_FOUND || reason == ServiceBusFailureReason.UNAUTHORIZED) { System.out.printf("An unrecoverable error occurred. Stopping processing with reason %s: %s%n", reason, exception.getMessage()); countdownLatch.countDown(); } else if (reason == ServiceBusFailureReason.MESSAGE_LOCK_LOST) { System.out.printf("Message lock lost for message: %s%n", context.getException()); } else if (reason == ServiceBusFailureReason.SERVICE_BUSY) { try { // Choosing an arbitrary amount of time to wait until trying again. TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { System.err.println("Unable to sleep for period of time"); } } else { System.out.printf("Error source %s, reason %s, message: %s%n", context.getErrorSource(), reason, context.getException()); } }, ve
mainmetotlarınısendMessageçağırmak için yönteminisendMessageBatchreceiveMessagesgüncelleştirin ve 'i atacak şekildeInterruptedExceptiongüncelleştirin.public static void main(String[] args) throws InterruptedException { sendMessage(); sendMessageBatch(); receiveMessages(); }
Uygulamayı çalıştırma
Aşağıdaki çıkışa benzer bir çıktıyı görmek için programı çalıştırın:
Sent a single message to the topic: mytopic
Sent a batch of messages to the topic: mytopic
Starting the processor
Processing message. Session: e0102f5fbaf646988a2f4b65f7d32385, Sequence #: 1. Contents: Hello, World!
Processing message. Session: 3e991e232ca248f2bc332caa8034bed9, Sequence #: 2. Contents: First message
Processing message. Session: 56d3a9ea7df446f8a2944ee72cca4ea0, Sequence #: 3. Contents: Second message
Processing message. Session: 7bd3bd3e966a40ebbc9b29b082da14bb, Sequence #: 4. Contents: Third message
Service Bus ad Azure portal genel bakış sayfasında gelen ve giden ileti sayısını görebilirsiniz. En son değerleri görmek için bir dakika kadar beklemeniz ve sayfayı yenilemeniz gerekir.
Orta alt bölmede Konular sekmesine geçiş yapın ve konu başlığınıza ilişkin Service Bus sayfasını görmek için konuyu seçin. Bu sayfada, İletiler grafiğinde dört gelen ve dört giden ileti görüyorsanız.
yönteminde çağrısına açıklama ekleme ve uygulamayı yeniden çalıştırma, Service Bus Konu sayfasında 8 gelen ileti receiveMessages main (4 yeni) ama dört giden ileti olduğunu görebilirsiniz.
Bu sayfada, bir abonelik seçersiniz, Abonelik sayfasında Service Bus edinebilirsiniz. Bu sayfada etkin ileti sayısını, ileti sayısını ve daha fazlasını görebilirsiniz. Bu örnekte, henüz bir alıcı tarafından alınmış dört etkin ileti vardır.
Sonraki adımlar
Aşağıdaki belgelere ve örneklere bakın: