Azure Service Bus konu başlığına ileti gönderme ve aboneliklerden konuya ileti alma (Java)

Bu hızlı başlangıçta, bir Azure Service Bus konu başlığına ileti göndermek için azure-messaging-servicebus paketini kullanarak Java kodu yazacak ve ardından aboneliklerden bu konuya ileti alacaksınız.

Not

Bu hızlı başlangıç, bir konu başlığına toplu ileti gönderme ve bu iletileri konu aboneliğinden Service Bus basit bir senaryo için adım adım yönergeler sağlar. Azure sdk'sı için önceden Service Bus Javaörneklerini GitHub.

Önkoşullar

Konu başlığına ileti gönderme

Bu bölümde, bir Java konsol projesi oluşturacak ve oluşturduğunuz konuya ileti göndermek için kod eksersiniz.

Java konsol projesi oluşturma

Eclipse'i veya kendi tercihi olan bir aracı kullanarak bir Java projesi oluşturun.

Service Bus'yi kullanmak için Service Bus

Azure Core ve Azure depolama kitaplıklarına Service Bus ekleyin.

Eclipse kullanıyorsanız ve bir Java konsol uygulaması oluşturduysanız, Java projenizi Maven'a dönüştür: Paket Gezgini penceresinde projeye sağ tıklayın ve Maven projesine -> Dönüştür'leri Yapılandır'ı seçin. Ardından, aşağıdaki örnekte gösterildiği gibi bu iki kitaplıka bağımlılıklar ekleyin.

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.myorg.sbusquickstarts</groupId>
    <artifactId>sbustopicqs</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <build>
        <sourceDirectory>src</sourceDirectory>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <release>15</release>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <dependencies>
        <dependency>
            <groupId>com.azure</groupId>
            <artifactId>azure-core</artifactId>
            <version>1.13.0</version>
        </dependency>
        <dependency>
            <groupId>com.azure</groupId>
            <artifactId>azure-messaging-servicebus</artifactId>
            <version>7.0.2</version>
        </dependency>
    </dependencies>
</project>

Konuya ileti göndermek için kod ekleme

  1. Java dosyasının import konu başlığına aşağıdaki deyimlerini ekleyin.

    import com.azure.messaging.servicebus.*;
    
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    import java.util.Arrays;
    import java.util.List;
    
  2. sınıfında, aşağıda gösterildiği gibi bağlantı dizesini ve konu adını tutmak için değişkenleri tanımlayın:

    static String connectionString = "<NAMESPACE CONNECTION STRING>";
    static String topicName = "<TOPIC NAME>";    
    static String subName = "<SUBSCRIPTION NAME>";
    

    yerine <NAMESPACE CONNECTION STRING> ad alanınıza bağlantı dizesini Service Bus değiştirin. ve yerine <TOPIC NAME> konunun adını yazın.

  3. Konuya bir ileti sendMessage göndermek için sınıfında adlı bir yöntem ekleyin.

    static void sendMessage()
    {
        // create a Service Bus Sender client for the queue 
        ServiceBusSenderClient senderClient = new ServiceBusClientBuilder()
                .connectionString(connectionString)
                .sender()
                .topicName(topicName)
                .buildClient();
    
        // send one message to the topic
        senderClient.sendMessage(new ServiceBusMessage("Hello, World!"));
        System.out.println("Sent a single message to the topic: " + topicName);        
    }
    
  4. İleti listesi oluşturmak createMessages için sınıfında adlı bir yöntem ekleyin. Genellikle, bu iletileri uygulamanın farklı kısımlarından alırsınız. Burada, örnek iletilerin bir listesini oluşturuz.

    static List<ServiceBusMessage> createMessages()
    {
        // create a list of messages and return it to the caller
        ServiceBusMessage[] messages = {
                new ServiceBusMessage("First message"),
                new ServiceBusMessage("Second message"),
                new ServiceBusMessage("Third message")
        };
        return Arrays.asList(messages);
    }
    
  5. Oluşturduğunuz konuya ileti sendMessageBatch göndermek için method adlı bir yöntem ekleyin. Bu yöntem konu için bir oluşturur, ileti listesini almak için yöntemini çağırır, bir veya daha fazla toplu iş hazırlar ve toplu ServiceBusSenderClient createMessages işleri konuya gönderir.

    static void sendMessageBatch()
    {
        // create a Service Bus Sender client for the topic 
        ServiceBusSenderClient senderClient = new ServiceBusClientBuilder()
                .connectionString(connectionString)
                .sender()
                .topicName(topicName)
                .buildClient();
    
        // Creates an ServiceBusMessageBatch where the ServiceBus.
        ServiceBusMessageBatch messageBatch = senderClient.createMessageBatch();        
    
        // create a list of messages
        List<ServiceBusMessage> listOfMessages = createMessages();
    
        // We try to add as many messages as a batch can fit based on the maximum size and send to Service Bus when
        // the batch can hold no more messages. Create a new batch for next set of messages and repeat until all
        // messages are sent.        
        for (ServiceBusMessage message : listOfMessages) {
            if (messageBatch.tryAddMessage(message)) {
                continue;
            }
    
            // The batch is full, so we create a new batch and send the batch.
            senderClient.sendMessages(messageBatch);
            System.out.println("Sent a batch of messages to the topic: " + topicName);
    
            // create a new batch
            messageBatch = senderClient.createMessageBatch();
    
            // Add that message that we couldn't before.
            if (!messageBatch.tryAddMessage(message)) {
                System.err.printf("Message is too large for an empty batch. Skipping. Max size: %s.", messageBatch.getMaxSizeInBytes());
            }
        }
    
        if (messageBatch.getCount() > 0) {
            senderClient.sendMessages(messageBatch);
            System.out.println("Sent a batch of messages to the topic: " + topicName);
        }
    
        //close the client
        senderClient.close();
    }
    

Abonelikten ileti alma

Bu bölümde, bir abonelikten konuya ileti almak için kod eksersiniz.

  1. Abonelikten ileti almak receiveMessages için adlı bir yöntem ekleyin. Bu yöntem, iletilerin işlenmesi için bir işleyici ve hataların işlenmesi için başka bir işleyici ServiceBusProcessorClient belirterek abonelik için bir oluşturur. Ardından işlemciyi başlatır, birkaç saniye bekler, alınan iletileri yazdırır, ardından işlemciyi durdurur ve kapatır.

    Önemli

    kodundaki ServiceBusTopicTest ServiceBusTopicTest::processMessage yerine sınıfını yazın.

    // handles received messages
    static void receiveMessages() throws InterruptedException
    {
        CountDownLatch countdownLatch = new CountDownLatch(1);
    
        // Create an instance of the processor through the ServiceBusClientBuilder
        ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
            .connectionString(connectionString)
            .processor()
            .topicName(topicName)
            .subscriptionName(subName)
            .processMessage(ServiceBusTopicTest::processMessage)
            .processError(context -> processError(context, countdownLatch))
            .buildProcessorClient();
    
        System.out.println("Starting the processor");
        processorClient.start();
    
        TimeUnit.SECONDS.sleep(10);
        System.out.println("Stopping and closing the processor");
        processorClient.close();        
    }  
    
  2. Service Bus processMessage aboneliğinden alınan iletiyi işleme yöntemini ekleyin.

    private static void processMessage(ServiceBusReceivedMessageContext context) {
        ServiceBusReceivedMessage message = context.getMessage();
        System.out.printf("Processing message. Session: %s, Sequence #: %s. Contents: %s%n", message.getMessageId(),
            message.getSequenceNumber(), message.getBody());
    }    
    
  3. Hata iletilerini processError işlemek için yöntemini ekleyin.

    private static void processError(ServiceBusErrorContext context, CountDownLatch countdownLatch) {
        System.out.printf("Error when receiving messages from namespace: '%s'. Entity: '%s'%n",
            context.getFullyQualifiedNamespace(), context.getEntityPath());
    
        if (!(context.getException() instanceof ServiceBusException)) {
            System.out.printf("Non-ServiceBusException occurred: %s%n", context.getException());
            return;
        }
    
        ServiceBusException exception = (ServiceBusException) context.getException();
        ServiceBusFailureReason reason = exception.getReason();
    
        if (reason == ServiceBusFailureReason.MESSAGING_ENTITY_DISABLED
            || reason == ServiceBusFailureReason.MESSAGING_ENTITY_NOT_FOUND
            || reason == ServiceBusFailureReason.UNAUTHORIZED) {
            System.out.printf("An unrecoverable error occurred. Stopping processing with reason %s: %s%n",
                reason, exception.getMessage());
    
            countdownLatch.countDown();
        } else if (reason == ServiceBusFailureReason.MESSAGE_LOCK_LOST) {
            System.out.printf("Message lock lost for message: %s%n", context.getException());
        } else if (reason == ServiceBusFailureReason.SERVICE_BUSY) {
            try {
                // Choosing an arbitrary amount of time to wait until trying again.
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                System.err.println("Unable to sleep for period of time");
            }
        } else {
            System.out.printf("Error source %s, reason %s, message: %s%n", context.getErrorSource(),
                reason, context.getException());
        }
    }  
    
  4. , ve main metotlarını sendMessage çağırmak için yöntemini sendMessageBatch receiveMessages güncelleştirin ve 'i atacak şekilde InterruptedException güncelleştirin.

    public static void main(String[] args) throws InterruptedException {        
        sendMessage();
        sendMessageBatch();
        receiveMessages();
    }   
    

Uygulamayı çalıştırma

Aşağıdaki çıkışa benzer bir çıktıyı görmek için programı çalıştırın:

Sent a single message to the topic: mytopic
Sent a batch of messages to the topic: mytopic
Starting the processor
Processing message. Session: e0102f5fbaf646988a2f4b65f7d32385, Sequence #: 1. Contents: Hello, World!
Processing message. Session: 3e991e232ca248f2bc332caa8034bed9, Sequence #: 2. Contents: First message
Processing message. Session: 56d3a9ea7df446f8a2944ee72cca4ea0, Sequence #: 3. Contents: Second message
Processing message. Session: 7bd3bd3e966a40ebbc9b29b082da14bb, Sequence #: 4. Contents: Third message

Service Bus ad Azure portal genel bakış sayfasında gelen ve giden ileti sayısını görebilirsiniz. En son değerleri görmek için bir dakika kadar beklemeniz ve sayfayı yenilemeniz gerekir.

Gelen ve giden ileti sayısı

Orta alt bölmede Konular sekmesine geçiş yapın ve konu başlığınıza ilişkin Service Bus sayfasını görmek için konuyu seçin. Bu sayfada, İletiler grafiğinde dört gelen ve dört giden ileti görüyorsanız.

Gelen ve giden iletiler

yönteminde çağrısına açıklama ekleme ve uygulamayı yeniden çalıştırma, Service Bus Konu sayfasında 8 gelen ileti receiveMessages main (4 yeni) ama dört giden ileti olduğunu görebilirsiniz.

Güncelleştirilmiş konu sayfası

Bu sayfada, bir abonelik seçersiniz, Abonelik sayfasında Service Bus edinebilirsiniz. Bu sayfada etkin ileti sayısını, ileti sayısını ve daha fazlasını görebilirsiniz. Bu örnekte, henüz bir alıcı tarafından alınmış dört etkin ileti vardır.

Etkin ileti sayısı

Sonraki adımlar

Aşağıdaki belgelere ve örneklere bakın: