Odesílání zpráv do tématu azure Service Bus a příjem zpráv z odběrů do tématu (Java)

V tomto rychlém startu napíšete kód Java pomocí balíčku azure-messaging-servicebus, který bude odesílat zprávy do tématu Azure Service Bus a pak přijímat zprávy z odběrů tohoto tématu.

Poznámka

Tento rychlý start obsahuje podrobné pokyny pro jednoduchý scénář odesílání dávky zpráv do Service Bus tématu a příjem těchto zpráv z odběru tématu. Předdefinované ukázky v Javě pro Azure Service Bus najdete v úložišti Azure SDK pro Javu na GitHub.

Požadavky

Odeslání zprávy do tématu

V této části vytvoříte projekt konzoly Java a přidáte kód pro odesílání zpráv do tématu, které jste vytvořili.

Vytvoření projektu konzoly Java

Vytvořte projekt v Javě pomocí Eclipse nebo nástroje podle vašeho výběru.

Konfigurace aplikace pro použití Service Bus

Přidání odkazů na knihovny Azure Core Service Bus Azure

Pokud používáte Eclipse a vytvořili jste konzolovou aplikaci Java, převeďte projekt Java na Maven: v okně Průzkumník balíčků klikněte pravým tlačítkem na projekt a vyberte Konfigurovat projekt -> Convert to Maven. Pak do těchto dvou knihoven přidejte závislosti, jak je znázorněno v následujícím příkladu.

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.myorg.sbusquickstarts</groupId>
    <artifactId>sbustopicqs</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <build>
        <sourceDirectory>src</sourceDirectory>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <release>15</release>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <dependencies>
        <dependency>
            <groupId>com.azure</groupId>
            <artifactId>azure-core</artifactId>
            <version>1.13.0</version>
        </dependency>
        <dependency>
            <groupId>com.azure</groupId>
            <artifactId>azure-messaging-servicebus</artifactId>
            <version>7.0.2</version>
        </dependency>
    </dependencies>
</project>

Přidání kódu pro odesílání zpráv do tématu

  1. Přidejte import následující příkazy do tématu souboru Java.

    import com.azure.messaging.servicebus.*;
    
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    import java.util.Arrays;
    import java.util.List;
    
  2. Ve třídě definujte proměnné pro připojovací řetězec a název tématu, jak je znázorněno níže:

    static String connectionString = "<NAMESPACE CONNECTION STRING>";
    static String topicName = "<TOPIC NAME>";    
    static String subName = "<SUBSCRIPTION NAME>";
    

    Nahraďte <NAMESPACE CONNECTION STRING> připojovacím řetězcem pro váš Service Bus názvů. A <TOPIC NAME> nahraďte názvem tématu.

  3. Přidejte metodu sendMessage s názvem ve třídě pro odeslání jedné zprávy do tématu.

    static void sendMessage()
    {
        // create a Service Bus Sender client for the queue 
        ServiceBusSenderClient senderClient = new ServiceBusClientBuilder()
                .connectionString(connectionString)
                .sender()
                .topicName(topicName)
                .buildClient();
    
        // send one message to the topic
        senderClient.sendMessage(new ServiceBusMessage("Hello, World!"));
        System.out.println("Sent a single message to the topic: " + topicName);        
    }
    
  4. Přidejte createMessages metodu s názvem ve třídě pro vytvoření seznamu zpráv. Tyto zprávy obvykle získáte z různých částí aplikace. Tady vytvoříme seznam ukázkových zpráv.

    static List<ServiceBusMessage> createMessages()
    {
        // create a list of messages and return it to the caller
        ServiceBusMessage[] messages = {
                new ServiceBusMessage("First message"),
                new ServiceBusMessage("Second message"),
                new ServiceBusMessage("Third message")
        };
        return Arrays.asList(messages);
    }
    
  5. Přidejte metodu sendMessageBatch s názvem method, která bude odesílat zprávy do tématu, které jste vytvořili. Tato metoda vytvoří pro téma , vyvolá metodu pro získání seznamu zpráv, připraví jednu nebo více dávek a odešle dávky ServiceBusSenderClient createMessages do tématu.

    static void sendMessageBatch()
    {
        // create a Service Bus Sender client for the topic 
        ServiceBusSenderClient senderClient = new ServiceBusClientBuilder()
                .connectionString(connectionString)
                .sender()
                .topicName(topicName)
                .buildClient();
    
        // Creates an ServiceBusMessageBatch where the ServiceBus.
        ServiceBusMessageBatch messageBatch = senderClient.createMessageBatch();        
    
        // create a list of messages
        List<ServiceBusMessage> listOfMessages = createMessages();
    
        // We try to add as many messages as a batch can fit based on the maximum size and send to Service Bus when
        // the batch can hold no more messages. Create a new batch for next set of messages and repeat until all
        // messages are sent.        
        for (ServiceBusMessage message : listOfMessages) {
            if (messageBatch.tryAddMessage(message)) {
                continue;
            }
    
            // The batch is full, so we create a new batch and send the batch.
            senderClient.sendMessages(messageBatch);
            System.out.println("Sent a batch of messages to the topic: " + topicName);
    
            // create a new batch
            messageBatch = senderClient.createMessageBatch();
    
            // Add that message that we couldn't before.
            if (!messageBatch.tryAddMessage(message)) {
                System.err.printf("Message is too large for an empty batch. Skipping. Max size: %s.", messageBatch.getMaxSizeInBytes());
            }
        }
    
        if (messageBatch.getCount() > 0) {
            senderClient.sendMessages(messageBatch);
            System.out.println("Sent a batch of messages to the topic: " + topicName);
        }
    
        //close the client
        senderClient.close();
    }
    

Příjem zpráv z odběru

V této části přidáte kód pro načtení zpráv z odběru tématu.

  1. Přidejte metodu s receiveMessages názvem pro příjem zpráv z odběru. Tato metoda vytvoří pro odběr zadáním obslužné rutiny pro zpracování zpráv a další pro ServiceBusProcessorClient zpracování chyb. Potom spustí procesor, několik sekund počká, vytiskne přijaté zprávy a pak zastaví a zavře procesor.

    Důležité

    Nahraďte ServiceBusTopicTest v kódu názvem vaší ServiceBusTopicTest::processMessage třídy.

    // handles received messages
    static void receiveMessages() throws InterruptedException
    {
        CountDownLatch countdownLatch = new CountDownLatch(1);
    
        // Create an instance of the processor through the ServiceBusClientBuilder
        ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
            .connectionString(connectionString)
            .processor()
            .topicName(topicName)
            .subscriptionName(subName)
            .processMessage(ServiceBusTopicTest::processMessage)
            .processError(context -> processError(context, countdownLatch))
            .buildProcessorClient();
    
        System.out.println("Starting the processor");
        processorClient.start();
    
        TimeUnit.SECONDS.sleep(10);
        System.out.println("Stopping and closing the processor");
        processorClient.close();        
    }  
    
  2. Přidejte processMessage metodu pro zpracování zprávy přijaté z Service Bus předplatného.

    private static void processMessage(ServiceBusReceivedMessageContext context) {
        ServiceBusReceivedMessage message = context.getMessage();
        System.out.printf("Processing message. Session: %s, Sequence #: %s. Contents: %s%n", message.getMessageId(),
            message.getSequenceNumber(), message.getBody());
    }    
    
  3. Přidejte processError metodu pro zpracování chybových zpráv.

    private static void processError(ServiceBusErrorContext context, CountDownLatch countdownLatch) {
        System.out.printf("Error when receiving messages from namespace: '%s'. Entity: '%s'%n",
            context.getFullyQualifiedNamespace(), context.getEntityPath());
    
        if (!(context.getException() instanceof ServiceBusException)) {
            System.out.printf("Non-ServiceBusException occurred: %s%n", context.getException());
            return;
        }
    
        ServiceBusException exception = (ServiceBusException) context.getException();
        ServiceBusFailureReason reason = exception.getReason();
    
        if (reason == ServiceBusFailureReason.MESSAGING_ENTITY_DISABLED
            || reason == ServiceBusFailureReason.MESSAGING_ENTITY_NOT_FOUND
            || reason == ServiceBusFailureReason.UNAUTHORIZED) {
            System.out.printf("An unrecoverable error occurred. Stopping processing with reason %s: %s%n",
                reason, exception.getMessage());
    
            countdownLatch.countDown();
        } else if (reason == ServiceBusFailureReason.MESSAGE_LOCK_LOST) {
            System.out.printf("Message lock lost for message: %s%n", context.getException());
        } else if (reason == ServiceBusFailureReason.SERVICE_BUSY) {
            try {
                // Choosing an arbitrary amount of time to wait until trying again.
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                System.err.println("Unable to sleep for period of time");
            }
        } else {
            System.out.printf("Error source %s, reason %s, message: %s%n", context.getErrorSource(),
                reason, context.getException());
        }
    }  
    
  4. Aktualizujte main metodu sendMessage tak, aby vyvolala sendMessageBatch , a metody a receiveMessages vyvolala InterruptedException .

    public static void main(String[] args) throws InterruptedException {        
        sendMessage();
        sendMessageBatch();
        receiveMessages();
    }   
    

Spuštění aplikace

Spuštěním programu zobrazte výstup podobný následujícímu výstupu:

Sent a single message to the topic: mytopic
Sent a batch of messages to the topic: mytopic
Starting the processor
Processing message. Session: e0102f5fbaf646988a2f4b65f7d32385, Sequence #: 1. Contents: Hello, World!
Processing message. Session: 3e991e232ca248f2bc332caa8034bed9, Sequence #: 2. Contents: First message
Processing message. Session: 56d3a9ea7df446f8a2944ee72cca4ea0, Sequence #: 3. Contents: Second message
Processing message. Session: 7bd3bd3e966a40ebbc9b29b082da14bb, Sequence #: 4. Contents: Third message

Na stránce Přehled oboru názvů Service Bus v části Azure Portal uvidíte počet příchozích a odchozích zpráv. Možná budete muset asi minutu počkat a pak stránku aktualizovat, abyste viděli nejnovější hodnoty.

Počet příchozích a odchozích zpráv

V prostředním dolním podokně přepněte na kartu Témata a výběrem tématu zobrazte stránku Service Bus tématu pro vaše téma. Na této stránce by se v grafu Zprávy měly zobrazit čtyři příchozí a čtyři odchozí zprávy.

Příchozí a odchozí zprávy

Pokud zakomentujete volání v metodě a aplikaci znovu spustíte, na stránce Service Bus Topic uvidíte 8 příchozích receiveMessages main zpráv (4 nové), ale čtyři odchozí zprávy.

Aktualizovaná stránka tématu

Pokud na této stránce vyberete předplatné, dostanete se na stránku Service Bus předplatného. Na této stránce se můžete podívat na počet aktivních zpráv, počet zpráv bez zprávy a další informace. V tomto příkladu jsou čtyři aktivní zprávy, které příjemce ještě neobdrží.

Počet aktivních zpráv

Další kroky

Projděte si následující dokumentaci a ukázky: