Berichten verzenden naar een Azure Service Bus-onderwerp en berichten ontvangen van abonnementen op het onderwerp (Java)

In deze quickstart schrijft u Java-code die het pakket azure-messaging-servicebus gebruikt voor het verzenden van berichten naar een Azure Service Bus-onderwerp en ontvangt u vervolgens berichten van abonnementen op dit onderwerp.

Notitie

Deze quickstart bevat stapsgewijs instructies voor een eenvoudig scenario voor het verzenden van een batch berichten naar een Service Bus-onderwerp en het ontvangen van die berichten van een abonnement van het onderwerp. U vindt vooraf gebouwde Java-voorbeelden voor Azure Service Bus in de azure SDK voor Java-opslagplaats op GitHub.

Vereisten

Berichten verzenden naar een onderwerp

In deze sectie maakt u een Java-consoleproject en voegt u code toe om berichten te verzenden naar het onderwerp dat u hebt gemaakt.

Een Java-consoleproject maken

Maak een Java-project met Eclipse of een hulpprogramma van uw keuze.

Uw toepassing configureren voor het gebruik van Service Bus

Voeg verwijzingen toe naar Azure Core- en Azure Service Bus bibliotheken.

Als u Eclipse gebruikt en een Java-consoletoepassing hebt gemaakt, converteert u uw Java-project naar een Maven: klik met de rechtermuisknop op het project in het venster Package Explorer en selecteer Converteren naar -> Maven-project configureren. Voeg vervolgens afhankelijkheden toe aan deze twee bibliotheken, zoals wordt weergegeven in het volgende voorbeeld.

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.myorg.sbusquickstarts</groupId>
    <artifactId>sbustopicqs</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <build>
        <sourceDirectory>src</sourceDirectory>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <release>15</release>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <dependencies>
        <dependency>
            <groupId>com.azure</groupId>
            <artifactId>azure-core</artifactId>
            <version>1.13.0</version>
        </dependency>
        <dependency>
            <groupId>com.azure</groupId>
            <artifactId>azure-messaging-servicebus</artifactId>
            <version>7.0.2</version>
        </dependency>
    </dependencies>
</project>

Code toevoegen om berichten naar het onderwerp te verzenden

  1. Voeg de volgende import-instructies toe aan het onderwerp van het Java-bestand.

    import com.azure.messaging.servicebus.*;
    
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    import java.util.Arrays;
    import java.util.List;
    
  2. In de klasse definieert u de variabelen die de verbindingstekenreeks en onderwerpnaam bevatten, zoals hieronder wordt weergegeven:

    static String connectionString = "<NAMESPACE CONNECTION STRING>";
    static String topicName = "<TOPIC NAME>";    
    static String subName = "<SUBSCRIPTION NAME>";
    

    Vervang <NAMESPACE CONNECTION STRING> door de verbindingstekenreeks voor uw Service Bus-naamruimte. Vervang <TOPIC NAME> door de naam van het onderwerp.

  3. Voeg in de klasse een methode toe met de naam sendMessage om een bericht naar het onderwerp te verzenden.

    static void sendMessage()
    {
        // create a Service Bus Sender client for the queue 
        ServiceBusSenderClient senderClient = new ServiceBusClientBuilder()
                .connectionString(connectionString)
                .sender()
                .topicName(topicName)
                .buildClient();
    
        // send one message to the topic
        senderClient.sendMessage(new ServiceBusMessage("Hello, World!"));
        System.out.println("Sent a single message to the topic: " + topicName);        
    }
    
  4. Voeg in de klasse een methode toe met de naam createMessages om een lijst met berichten te maken. Normaal gesproken ontvangt u deze berichten van verschillende onderdelen van uw toepassing. Hier maken we een lijst met voorbeeldberichten.

    static List<ServiceBusMessage> createMessages()
    {
        // create a list of messages and return it to the caller
        ServiceBusMessage[] messages = {
                new ServiceBusMessage("First message"),
                new ServiceBusMessage("Second message"),
                new ServiceBusMessage("Third message")
        };
        return Arrays.asList(messages);
    }
    
  5. Voeg een methode toe met de naam sendMessageBatch om berichten te verzenden naar het onderwerp dat u hebt gemaakt. Met deze methode maakt u een ServiceBusSenderClient voor het onderwerp, roept u de methode createMessages aan om de lijst met berichten op te halen, bereidt u een of meer batches voor en stuurt u de batch(es) naar het onderwerp.

    static void sendMessageBatch()
    {
        // create a Service Bus Sender client for the topic 
        ServiceBusSenderClient senderClient = new ServiceBusClientBuilder()
                .connectionString(connectionString)
                .sender()
                .topicName(topicName)
                .buildClient();
    
        // Creates an ServiceBusMessageBatch where the ServiceBus.
        ServiceBusMessageBatch messageBatch = senderClient.createMessageBatch();        
    
        // create a list of messages
        List<ServiceBusMessage> listOfMessages = createMessages();
    
        // We try to add as many messages as a batch can fit based on the maximum size and send to Service Bus when
        // the batch can hold no more messages. Create a new batch for next set of messages and repeat until all
        // messages are sent.        
        for (ServiceBusMessage message : listOfMessages) {
            if (messageBatch.tryAddMessage(message)) {
                continue;
            }
    
            // The batch is full, so we create a new batch and send the batch.
            senderClient.sendMessages(messageBatch);
            System.out.println("Sent a batch of messages to the topic: " + topicName);
    
            // create a new batch
            messageBatch = senderClient.createMessageBatch();
    
            // Add that message that we couldn't before.
            if (!messageBatch.tryAddMessage(message)) {
                System.err.printf("Message is too large for an empty batch. Skipping. Max size: %s.", messageBatch.getMaxSizeInBytes());
            }
        }
    
        if (messageBatch.getCount() > 0) {
            senderClient.sendMessages(messageBatch);
            System.out.println("Sent a batch of messages to the topic: " + topicName);
        }
    
        //close the client
        senderClient.close();
    }
    

Berichten van een abonnement ontvangen

In deze sectie voegt u code toe om berichten uit een abonnement op het onderwerp op te halen.

  1. Voeg een methode toe met de naam receiveMessages om berichten van het abonnement te ontvangen. Met deze methode maakt u een ServiceBusProcessorClient voor het abonnement door een handler op te geven voor de verwerking van berichten en een andere voor het afhandelen van fouten. Vervolgens wordt de verwerker gestart, wordt er een paar seconden gewacht, worden de ontvangen berichten getoond en wordt de verwerker gestopt en gesloten.

    Belangrijk

    Vervang ServiceBusTopicTest in in de code door de naam van uw ServiceBusTopicTest::processMessage klasse.

    // handles received messages
    static void receiveMessages() throws InterruptedException
    {
        CountDownLatch countdownLatch = new CountDownLatch(1);
    
        // Create an instance of the processor through the ServiceBusClientBuilder
        ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
            .connectionString(connectionString)
            .processor()
            .topicName(topicName)
            .subscriptionName(subName)
            .processMessage(ServiceBusTopicTest::processMessage)
            .processError(context -> processError(context, countdownLatch))
            .buildProcessorClient();
    
        System.out.println("Starting the processor");
        processorClient.start();
    
        TimeUnit.SECONDS.sleep(10);
        System.out.println("Stopping and closing the processor");
        processorClient.close();        
    }  
    
  2. Voeg de processMessage methode toe om een bericht te verwerken dat is ontvangen van Service Bus abonnement.

    private static void processMessage(ServiceBusReceivedMessageContext context) {
        ServiceBusReceivedMessage message = context.getMessage();
        System.out.printf("Processing message. Session: %s, Sequence #: %s. Contents: %s%n", message.getMessageId(),
            message.getSequenceNumber(), message.getBody());
    }    
    
  3. Voeg de methode processError toe om foutberichten te verwerken.

    private static void processError(ServiceBusErrorContext context, CountDownLatch countdownLatch) {
        System.out.printf("Error when receiving messages from namespace: '%s'. Entity: '%s'%n",
            context.getFullyQualifiedNamespace(), context.getEntityPath());
    
        if (!(context.getException() instanceof ServiceBusException)) {
            System.out.printf("Non-ServiceBusException occurred: %s%n", context.getException());
            return;
        }
    
        ServiceBusException exception = (ServiceBusException) context.getException();
        ServiceBusFailureReason reason = exception.getReason();
    
        if (reason == ServiceBusFailureReason.MESSAGING_ENTITY_DISABLED
            || reason == ServiceBusFailureReason.MESSAGING_ENTITY_NOT_FOUND
            || reason == ServiceBusFailureReason.UNAUTHORIZED) {
            System.out.printf("An unrecoverable error occurred. Stopping processing with reason %s: %s%n",
                reason, exception.getMessage());
    
            countdownLatch.countDown();
        } else if (reason == ServiceBusFailureReason.MESSAGE_LOCK_LOST) {
            System.out.printf("Message lock lost for message: %s%n", context.getException());
        } else if (reason == ServiceBusFailureReason.SERVICE_BUSY) {
            try {
                // Choosing an arbitrary amount of time to wait until trying again.
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                System.err.println("Unable to sleep for period of time");
            }
        } else {
            System.out.printf("Error source %s, reason %s, message: %s%n", context.getErrorSource(),
                reason, context.getException());
        }
    }  
    
  4. Werk de methode main bij om de methoden sendMessage, sendMessageBatchen receiveMessages aan te roepen en InterruptedException te genereren.

    public static void main(String[] args) throws InterruptedException {        
        sendMessage();
        sendMessageBatch();
        receiveMessages();
    }   
    

De app uitvoeren

Voer het programma uit om te controleren of the uitvoer lijkt op de volgende uitvoer:

Sent a single message to the topic: mytopic
Sent a batch of messages to the topic: mytopic
Starting the processor
Processing message. Session: e0102f5fbaf646988a2f4b65f7d32385, Sequence #: 1. Contents: Hello, World!
Processing message. Session: 3e991e232ca248f2bc332caa8034bed9, Sequence #: 2. Contents: First message
Processing message. Session: 56d3a9ea7df446f8a2944ee72cca4ea0, Sequence #: 3. Contents: Second message
Processing message. Session: 7bd3bd3e966a40ebbc9b29b082da14bb, Sequence #: 4. Contents: Third message

Op de pagina Overzicht voor de Service Bus-naamruimte in Azure Portal ziet u het aantal inkomende en uitgaande berichten. Mogelijk moet u ongeveer een minuut wachten en de pagina vervolgens vernieuwen om de meest recente waarden te zien.

Aantal inkomende en uitgaande berichten

Ga naar het tabblad Onderwerpen in het venster midden-onder en selecteer het onderwerp om de pagina Service Bus-onderwerp te bekijken voor uw onderwerp. Op deze pagina ziet u vier inkomende en vier uitgaande berichten in de grafiek Berichten.

Inkomende en uitgaande berichten

Als u de aanroep receiveMessages met een commentaarteken uitschakelt in de methode main en de app opnieuw uitvoert, ziet u op de pagina Service Bus-onderwerp 8 inkomende berichten (4 nieuwe) maar vier uitgaande berichten.

Bijgewerkte onderwerppagina

Als u op deze pagina een abonnement selecteert, krijgt u toegang tot de pagina Service Bus-abonnement. Op deze pagina ziet u het aantal actieve berichten, het aantal onbestelbare berichten, en meer. In dit voorbeeld zijn er vier actieve berichten die nog niet zijn ontvangen door een ontvanger.

Aantal actieve berichten

Volgende stappen

Raadpleeg de volgende documentatie en voorbeelden: