Azure depolama kuyruklarında (Java) azure Service Bus gönderme ve ileti alma
Bu hızlı başlangıçta, azure depolama kuyruğuna ileti göndermek ve bu kuyruktan ileti almak için bir Java Service Bus oluşturabilirsiniz.
Not
Bu hızlı başlangıç, basit bir kuyruğa ileti gönderme ve bunları alma senaryosu için Service Bus adım yönergeler sağlar. Azure sdk'sı için önceden Service Bus Javaörneklerini GitHub.
Önkoşullar
- Azure aboneliği. Bu öğreticiyi tamamlamak için bir Azure hesabınızın olması gerekir. MSDN abone avantajlarınızı etkinleştirebilirsiniz veya ücretsiz bir hesap için kaydolabilirsiniz.
- Üzerinde çalışılacak bir kuyruğuz yoksa, kuyruk oluşturmak için Azure portal kuyruğu oluşturmak için Service Bus kullanma makalesinde yer alan adımları izleyin. Oluşturduğunuz kuyruğun adını ve Service Bus için bağlantı dizesini not edin.
- Java için Azure SDK'yı yükleyin. Eclipse kullanıyorsanız, Java için Azure SDK'Azure Toolkit for Eclipse azure SDK'sı içeren uygulamaları yükleyebilirsiniz. Ardından Projenize Java için Microsoft Azure Kitaplıkları'nın ekini eklediniz. IntelliJ kullanıyorsanız bkz. Azure Toolkit for IntelliJ.
Kuyruğa ileti gönderme
Bu bölümde bir Java konsol projesi oluşturacak ve daha önce oluşturduğunuz kuyruğa ileti göndermek için kod eksersiniz.
Java konsol projesi oluşturma
Eclipse'i veya kendi tercihi olan bir aracı kullanarak bir Java projesi oluşturun.
Service Bus'yi kullanmak için Service Bus
Azure Core ve Azure depolama kitaplıklarına Service Bus ekleyin.
Eclipse kullanıyorsanız ve bir Java konsol uygulaması oluşturduysanız, Java projenizi Maven'a dönüştür: Paket Gezgini penceresinde projeye sağ tıklayın ve Maven projesine -> Dönüştür'leri Yapılandır'ı seçin. Ardından, aşağıdaki örnekte gösterildiği gibi bu iki kitaplıka bağımlılıklar ekleyin.
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.myorg.sbusquickstarts</groupId>
<artifactId>sbustopicqs</artifactId>
<version>0.0.1-SNAPSHOT</version>
<build>
<sourceDirectory>src</sourceDirectory>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<release>15</release>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core</artifactId>
<version>1.13.0</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-servicebus</artifactId>
<version>7.0.2</version>
</dependency>
</dependencies>
</project>
Kuyruğa ileti göndermek için kod ekleme
Java dosyasının
importkonu başlığına aşağıdaki deyimlerini ekleyin.import com.azure.messaging.servicebus.*; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.Arrays; import java.util.List;sınıfında, aşağıda gösterildiği gibi bağlantı dizesini ve kuyruk adını tutmak için değişkenleri tanımlayın:
static String connectionString = "<NAMESPACE CONNECTION STRING>"; static String queueName = "<QUEUE NAME>";yerine
<NAMESPACE CONNECTION STRING>ad alanınıza bağlantı dizesini Service Bus değiştirin. ve yerine<QUEUE NAME>kuyruğun adını yazın.Kuyruğa
sendMessagebir ileti göndermek için sınıfında adlı bir yöntem ekleyin.static void sendMessage() { // create a Service Bus Sender client for the queue ServiceBusSenderClient senderClient = new ServiceBusClientBuilder() .connectionString(connectionString) .sender() .queueName(queueName) .buildClient(); // send one message to the queue senderClient.sendMessage(new ServiceBusMessage("Hello, World!")); System.out.println("Sent a single message to the queue: " + queueName); }İleti listesi oluşturmak
createMessagesiçin sınıfında adlı bir yöntem ekleyin. Genellikle, bu iletileri uygulamanın farklı kısımlarından alırsınız. Burada, örnek iletilerin bir listesini oluşturuz.static List<ServiceBusMessage> createMessages() { // create a list of messages and return it to the caller ServiceBusMessage[] messages = { new ServiceBusMessage("First message"), new ServiceBusMessage("Second message"), new ServiceBusMessage("Third message") }; return Arrays.asList(messages); }Oluşturduğunuz kuyruğa
sendMessageBatchileti göndermek için method adlı bir yöntem ekleyin. Bu yöntem kuyruk için bir oluşturur, ileti listesini almak için yöntemini çağırır, bir veya daha fazla toplu iş hazırlar ve topluServiceBusSenderClientcreateMessagesişleri kuyruğa gönderir.static void sendMessageBatch() { // create a Service Bus Sender client for the queue ServiceBusSenderClient senderClient = new ServiceBusClientBuilder() .connectionString(connectionString) .sender() .queueName(queueName) .buildClient(); // Creates an ServiceBusMessageBatch where the ServiceBus. ServiceBusMessageBatch messageBatch = senderClient.createMessageBatch(); // create a list of messages List<ServiceBusMessage> listOfMessages = createMessages(); // We try to add as many messages as a batch can fit based on the maximum size and send to Service Bus when // the batch can hold no more messages. Create a new batch for next set of messages and repeat until all // messages are sent. for (ServiceBusMessage message : listOfMessages) { if (messageBatch.tryAddMessage(message)) { continue; } // The batch is full, so we create a new batch and send the batch. senderClient.sendMessages(messageBatch); System.out.println("Sent a batch of messages to the queue: " + queueName); // create a new batch messageBatch = senderClient.createMessageBatch(); // Add that message that we couldn't before. if (!messageBatch.tryAddMessage(message)) { System.err.printf("Message is too large for an empty batch. Skipping. Max size: %s.", messageBatch.getMaxSizeInBytes()); } } if (messageBatch.getCount() > 0) { senderClient.sendMessages(messageBatch); System.out.println("Sent a batch of messages to the queue: " + queueName); } //close the client senderClient.close(); }
Kuyruktan ileti alma
Bu bölümde kuyruktan iletileri almak için kod eksersiniz.
Kuyruktan iletileri almak
receiveMessagesiçin adlı bir yöntem ekleyin. Bu yöntem, iletilerin işlenmesi için bir işleyici ve hataların işlenmesi için başka bir işleyiciServiceBusProcessorClientbelirterek kuyruk için bir oluşturur. Ardından işlemciyi başlatır, birkaç saniye bekler, alınan iletileri yazdırır, ardından işlemciyi durdurur ve kapatır.Önemli
kodundaki
QueueTestQueueTest::processMessageyerine sınıfını yazın.// handles received messages static void receiveMessages() throws InterruptedException { CountDownLatch countdownLatch = new CountDownLatch(1); // Create an instance of the processor through the ServiceBusClientBuilder ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder() .connectionString(connectionString) .processor() .queueName(queueName) .processMessage(QueueTest::processMessage) .processError(context -> processError(context, countdownLatch)) .buildProcessorClient(); System.out.println("Starting the processor"); processorClient.start(); TimeUnit.SECONDS.sleep(10); System.out.println("Stopping and closing the processor"); processorClient.close(); }Service Bus
processMessageaboneliğinden alınan iletiyi işleme yöntemini ekleyin.private static void processMessage(ServiceBusReceivedMessageContext context) { ServiceBusReceivedMessage message = context.getMessage(); System.out.printf("Processing message. Session: %s, Sequence #: %s. Contents: %s%n", message.getMessageId(), message.getSequenceNumber(), message.getBody()); }Hata iletilerini
processErrorişlemek için yöntemini ekleyin.private static void processError(ServiceBusErrorContext context, CountDownLatch countdownLatch) { System.out.printf("Error when receiving messages from namespace: '%s'. Entity: '%s'%n", context.getFullyQualifiedNamespace(), context.getEntityPath()); if (!(context.getException() instanceof ServiceBusException)) { System.out.printf("Non-ServiceBusException occurred: %s%n", context.getException()); return; } ServiceBusException exception = (ServiceBusException) context.getException(); ServiceBusFailureReason reason = exception.getReason(); if (reason == ServiceBusFailureReason.MESSAGING_ENTITY_DISABLED || reason == ServiceBusFailureReason.MESSAGING_ENTITY_NOT_FOUND || reason == ServiceBusFailureReason.UNAUTHORIZED) { System.out.printf("An unrecoverable error occurred. Stopping processing with reason %s: %s%n", reason, exception.getMessage()); countdownLatch.countDown(); } else if (reason == ServiceBusFailureReason.MESSAGE_LOCK_LOST) { System.out.printf("Message lock lost for message: %s%n", context.getException()); } else if (reason == ServiceBusFailureReason.SERVICE_BUSY) { try { // Choosing an arbitrary amount of time to wait until trying again. TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { System.err.println("Unable to sleep for period of time"); } } else { System.out.printf("Error source %s, reason %s, message: %s%n", context.getErrorSource(), reason, context.getException()); } }, ve
mainmetotlarınısendMessageçağırmak için yönteminisendMessageBatchreceiveMessagesgüncelleştirin ve 'i atacak şekildeInterruptedExceptiongüncelleştirin.public static void main(String[] args) throws InterruptedException { sendMessage(); sendMessageBatch(); receiveMessages(); }
Uygulamayı çalıştırma
Uygulamayı çalıştırarak konsol penceresinde aşağıdaki iletileri görebilirsiniz.
Sent a single message to the queue: myqueue
Sent a batch of messages to the queue: myqueue
Starting the processor
Processing message. Session: 88d961dd801f449e9c3e0f8a5393a527, Sequence #: 1. Contents: Hello, World!
Processing message. Session: e90c8d9039ce403bbe1d0ec7038033a0, Sequence #: 2. Contents: First message
Processing message. Session: 311a216a560c47d184f9831984e6ac1d, Sequence #: 3. Contents: Second message
Processing message. Session: f9a871be07414baf9505f2c3d466c4ab, Sequence #: 4. Contents: Third message
Stopping and closing the processor
Service Bus ad Azure portal genel bakış sayfasında gelen ve giden ileti sayısını görebilirsiniz. En son değerleri görmek için bir dakika kadar beklemeniz ve sayfayı yenilemeniz gerekir.
Bu Genel Bakış sayfasında kuyruğu seçerek Kuyruk Service Bus gidin. Bu sayfada gelen ve giden ileti sayısını da görebilirsiniz. Ayrıca kuyruğun geçerli boyutu, maksimum boyut, etkin ileti sayısı gibi diğer bilgileri de görüyorsunuz.
Sonraki Adımlar
Aşağıdaki belgelere ve örneklere bakın: