Odesílání a příjem zpráv z front Service Bus Azure (Java)
V tomto rychlém startu vytvoříte aplikaci v Javě, která bude odesílat zprávy do fronty azure a přijímat je z Service Bus Azure.
Poznámka
Tento rychlý start obsahuje podrobné pokyny pro jednoduchý scénář odesílání zpráv do Service Bus fronty a jejich příjmu. Předdefinované ukázky v Javě pro Azure Service Bus najdete v úložišti Azure SDK pro Javu na GitHub.
Požadavky
- Předplatné Azure. K dokončení tohoto kurzu potřebujete mít účet Azure. Můžete si aktivovat výhody předplatitele MSDN nebo si zaregistrovat bezplatný účet.
- Pokud nemáte frontu, se kterou můžete pracovat, postupujte podle kroků v článku Vytvoření fronty pomocí Azure Portal k Service Bus fronty. Poznamenejte si připojovací řetězec pro váš Service Bus názvů a název fronty, kterou jste vytvořili.
- Nainstalujte sadu Azure SDK pro Javu. Pokud používáte Eclipse, můžete si nainstalovat Azure Toolkit pro Eclipse, která zahrnuje sadu Azure SDK pro Javu. Pak můžete do projektu Microsoft Azure knihovny pro Javu. Pokud používáte IntelliJ, najdete informace v tématu Instalace azure Toolkit pro IntelliJ.
Zasílání zpráv do fronty
V této části vytvoříte projekt konzoly Java a přidáte kód pro odesílání zpráv do fronty, kterou jste vytvořili dříve.
Vytvoření projektu konzoly Java
Vytvořte projekt v Javě pomocí Eclipse nebo nástroje podle vašeho výběru.
Konfigurace aplikace pro použití Service Bus
Přidání odkazů na knihovny Azure Core Service Bus Azure
Pokud používáte Eclipse a vytvořili jste konzolovou aplikaci Java, převeďte projekt Java na Maven: v okně Průzkumník balíčků klikněte pravým tlačítkem na projekt a vyberte Konfigurovat projekt -> Convert to Maven. Pak do těchto dvou knihoven přidejte závislosti, jak je znázorněno v následujícím příkladu.
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.myorg.sbusquickstarts</groupId>
<artifactId>sbustopicqs</artifactId>
<version>0.0.1-SNAPSHOT</version>
<build>
<sourceDirectory>src</sourceDirectory>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<release>15</release>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core</artifactId>
<version>1.13.0</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-servicebus</artifactId>
<version>7.0.2</version>
</dependency>
</dependencies>
</project>
Přidání kódu pro odesílání zpráv do fronty
Přidejte
importnásledující příkazy do tématu souboru Java.import com.azure.messaging.servicebus.*; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.Arrays; import java.util.List;Ve třídě definujte proměnné pro připojovací řetězec a název fronty, jak je znázorněno níže:
static String connectionString = "<NAMESPACE CONNECTION STRING>"; static String queueName = "<QUEUE NAME>";Nahraďte
<NAMESPACE CONNECTION STRING>připojovacím řetězcem pro váš Service Bus názvů. A<QUEUE NAME>nahraďte názvem fronty.Přidejte metodu
sendMessages názvem ve třídě pro odeslání jedné zprávy do fronty.static void sendMessage() { // create a Service Bus Sender client for the queue ServiceBusSenderClient senderClient = new ServiceBusClientBuilder() .connectionString(connectionString) .sender() .queueName(queueName) .buildClient(); // send one message to the queue senderClient.sendMessage(new ServiceBusMessage("Hello, World!")); System.out.println("Sent a single message to the queue: " + queueName); }Přidejte
createMessagesmetodu s názvem ve třídě pro vytvoření seznamu zpráv. Tyto zprávy obvykle získáte z různých částí aplikace. Tady vytvoříme seznam ukázkových zpráv.static List<ServiceBusMessage> createMessages() { // create a list of messages and return it to the caller ServiceBusMessage[] messages = { new ServiceBusMessage("First message"), new ServiceBusMessage("Second message"), new ServiceBusMessage("Third message") }; return Arrays.asList(messages); }Přidejte metodu s názvem
sendMessageBatchmethod, která bude odesílat zprávy do fronty, kterou jste vytvořili. Tato metoda vytvoří pro frontu , vyvolá metodu pro získání seznamu zpráv, připraví jednu nebo více dávek a odešle dávkyServiceBusSenderClientcreateMessagesdo fronty.static void sendMessageBatch() { // create a Service Bus Sender client for the queue ServiceBusSenderClient senderClient = new ServiceBusClientBuilder() .connectionString(connectionString) .sender() .queueName(queueName) .buildClient(); // Creates an ServiceBusMessageBatch where the ServiceBus. ServiceBusMessageBatch messageBatch = senderClient.createMessageBatch(); // create a list of messages List<ServiceBusMessage> listOfMessages = createMessages(); // We try to add as many messages as a batch can fit based on the maximum size and send to Service Bus when // the batch can hold no more messages. Create a new batch for next set of messages and repeat until all // messages are sent. for (ServiceBusMessage message : listOfMessages) { if (messageBatch.tryAddMessage(message)) { continue; } // The batch is full, so we create a new batch and send the batch. senderClient.sendMessages(messageBatch); System.out.println("Sent a batch of messages to the queue: " + queueName); // create a new batch messageBatch = senderClient.createMessageBatch(); // Add that message that we couldn't before. if (!messageBatch.tryAddMessage(message)) { System.err.printf("Message is too large for an empty batch. Skipping. Max size: %s.", messageBatch.getMaxSizeInBytes()); } } if (messageBatch.getCount() > 0) { senderClient.sendMessages(messageBatch); System.out.println("Sent a batch of messages to the queue: " + queueName); } //close the client senderClient.close(); }
Příjem zpráv z fronty
V této části přidáte kód pro načtení zpráv z fronty.
Přidejte metodu s
receiveMessagesnázvem pro příjem zpráv z fronty. Tato metoda vytvoří pro frontu zadáním obslužné rutiny pro zpracování zpráv a další proServiceBusProcessorClientzpracování chyb. Potom spustí procesor, několik sekund počká, vytiskne přijaté zprávy a pak zastaví a zavře procesor.Důležité
Nahraďte
QueueTestv kódu názvem vašíQueueTest::processMessagetřídy.// handles received messages static void receiveMessages() throws InterruptedException { CountDownLatch countdownLatch = new CountDownLatch(1); // Create an instance of the processor through the ServiceBusClientBuilder ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder() .connectionString(connectionString) .processor() .queueName(queueName) .processMessage(QueueTest::processMessage) .processError(context -> processError(context, countdownLatch)) .buildProcessorClient(); System.out.println("Starting the processor"); processorClient.start(); TimeUnit.SECONDS.sleep(10); System.out.println("Stopping and closing the processor"); processorClient.close(); }Přidejte
processMessagemetodu pro zpracování zprávy přijaté z odběru Service Bus předplatného.private static void processMessage(ServiceBusReceivedMessageContext context) { ServiceBusReceivedMessage message = context.getMessage(); System.out.printf("Processing message. Session: %s, Sequence #: %s. Contents: %s%n", message.getMessageId(), message.getSequenceNumber(), message.getBody()); }Přidejte
processErrormetodu pro zpracování chybových zpráv.private static void processError(ServiceBusErrorContext context, CountDownLatch countdownLatch) { System.out.printf("Error when receiving messages from namespace: '%s'. Entity: '%s'%n", context.getFullyQualifiedNamespace(), context.getEntityPath()); if (!(context.getException() instanceof ServiceBusException)) { System.out.printf("Non-ServiceBusException occurred: %s%n", context.getException()); return; } ServiceBusException exception = (ServiceBusException) context.getException(); ServiceBusFailureReason reason = exception.getReason(); if (reason == ServiceBusFailureReason.MESSAGING_ENTITY_DISABLED || reason == ServiceBusFailureReason.MESSAGING_ENTITY_NOT_FOUND || reason == ServiceBusFailureReason.UNAUTHORIZED) { System.out.printf("An unrecoverable error occurred. Stopping processing with reason %s: %s%n", reason, exception.getMessage()); countdownLatch.countDown(); } else if (reason == ServiceBusFailureReason.MESSAGE_LOCK_LOST) { System.out.printf("Message lock lost for message: %s%n", context.getException()); } else if (reason == ServiceBusFailureReason.SERVICE_BUSY) { try { // Choosing an arbitrary amount of time to wait until trying again. TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { System.err.println("Unable to sleep for period of time"); } } else { System.out.printf("Error source %s, reason %s, message: %s%n", context.getErrorSource(), reason, context.getException()); } }Aktualizujte
mainmetodusendMessagetak, aby vyvolalasendMessageBatch, a metody areceiveMessagesvyvolalaInterruptedException.public static void main(String[] args) throws InterruptedException { sendMessage(); sendMessageBatch(); receiveMessages(); }
Spuštění aplikace
Při spuštění aplikace se v okně konzoly zobrazí následující zprávy.
Sent a single message to the queue: myqueue
Sent a batch of messages to the queue: myqueue
Starting the processor
Processing message. Session: 88d961dd801f449e9c3e0f8a5393a527, Sequence #: 1. Contents: Hello, World!
Processing message. Session: e90c8d9039ce403bbe1d0ec7038033a0, Sequence #: 2. Contents: First message
Processing message. Session: 311a216a560c47d184f9831984e6ac1d, Sequence #: 3. Contents: Second message
Processing message. Session: f9a871be07414baf9505f2c3d466c4ab, Sequence #: 4. Contents: Third message
Stopping and closing the processor
Na stránce Přehled oboru názvů Service Bus v části Azure Portal uvidíte počet příchozích a odchozích zpráv. Možná budete muset asi minutu počkat a pak stránku aktualizovat, abyste viděli nejnovější hodnoty.
Výběrem fronty na této stránce Přehled přejděte na Service Bus fronty. Na této stránce se zobrazí také počet příchozích a odchozích zpráv. Zobrazí se také další informace, jako je aktuální velikost fronty, maximální velikost, počet aktivních zpráv atd.
Další kroky
Projděte si následující dokumentaci a ukázky: