Azure depolama kuyruklarında (Java) azure Service Bus gönderme ve ileti alma

Bu hızlı başlangıçta, azure depolama kuyruğuna ileti göndermek ve bu kuyruktan ileti almak için bir Java Service Bus oluşturabilirsiniz.

Not

Bu hızlı başlangıç, basit bir kuyruğa ileti gönderme ve bunları alma senaryosu için Service Bus adım yönergeler sağlar. Azure sdk'sı için önceden Service Bus Javaörneklerini GitHub.

Önkoşullar

Kuyruğa ileti gönderme

Bu bölümde bir Java konsol projesi oluşturacak ve daha önce oluşturduğunuz kuyruğa ileti göndermek için kod eksersiniz.

Java konsol projesi oluşturma

Eclipse'i veya kendi tercihi olan bir aracı kullanarak bir Java projesi oluşturun.

Service Bus'yi kullanmak için Service Bus

Azure Core ve Azure depolama kitaplıklarına Service Bus ekleyin.

Eclipse kullanıyorsanız ve bir Java konsol uygulaması oluşturduysanız, Java projenizi Maven'a dönüştür: Paket Gezgini penceresinde projeye sağ tıklayın ve Maven projesine -> Dönüştür'leri Yapılandır'ı seçin. Ardından, aşağıdaki örnekte gösterildiği gibi bu iki kitaplıka bağımlılıklar ekleyin.

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.myorg.sbusquickstarts</groupId>
    <artifactId>sbustopicqs</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <build>
        <sourceDirectory>src</sourceDirectory>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <release>15</release>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <dependencies>
        <dependency>
            <groupId>com.azure</groupId>
            <artifactId>azure-core</artifactId>
            <version>1.13.0</version>
        </dependency>
        <dependency>
            <groupId>com.azure</groupId>
            <artifactId>azure-messaging-servicebus</artifactId>
            <version>7.0.2</version>
        </dependency>
    </dependencies>
</project>

Kuyruğa ileti göndermek için kod ekleme

  1. Java dosyasının import konu başlığına aşağıdaki deyimlerini ekleyin.

    import com.azure.messaging.servicebus.*;
    
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    import java.util.Arrays;
    import java.util.List;
    
  2. sınıfında, aşağıda gösterildiği gibi bağlantı dizesini ve kuyruk adını tutmak için değişkenleri tanımlayın:

    static String connectionString = "<NAMESPACE CONNECTION STRING>";
    static String queueName = "<QUEUE NAME>";    
    

    yerine <NAMESPACE CONNECTION STRING> ad alanınıza bağlantı dizesini Service Bus değiştirin. ve yerine <QUEUE NAME> kuyruğun adını yazın.

  3. Kuyruğa sendMessage bir ileti göndermek için sınıfında adlı bir yöntem ekleyin.

    static void sendMessage()
    {
        // create a Service Bus Sender client for the queue 
        ServiceBusSenderClient senderClient = new ServiceBusClientBuilder()
                .connectionString(connectionString)
                .sender()
                .queueName(queueName)
                .buildClient();
    
        // send one message to the queue
        senderClient.sendMessage(new ServiceBusMessage("Hello, World!"));
        System.out.println("Sent a single message to the queue: " + queueName);        
    }
    
  4. İleti listesi oluşturmak createMessages için sınıfında adlı bir yöntem ekleyin. Genellikle, bu iletileri uygulamanın farklı kısımlarından alırsınız. Burada, örnek iletilerin bir listesini oluşturuz.

    static List<ServiceBusMessage> createMessages()
    {
        // create a list of messages and return it to the caller
        ServiceBusMessage[] messages = {
                new ServiceBusMessage("First message"),
                new ServiceBusMessage("Second message"),
                new ServiceBusMessage("Third message")
        };
        return Arrays.asList(messages);
    }
    
  5. Oluşturduğunuz kuyruğa sendMessageBatch ileti göndermek için method adlı bir yöntem ekleyin. Bu yöntem kuyruk için bir oluşturur, ileti listesini almak için yöntemini çağırır, bir veya daha fazla toplu iş hazırlar ve toplu ServiceBusSenderClient createMessages işleri kuyruğa gönderir.

    static void sendMessageBatch()
    {
        // create a Service Bus Sender client for the queue 
        ServiceBusSenderClient senderClient = new ServiceBusClientBuilder()
                .connectionString(connectionString)
                .sender()
                .queueName(queueName)
                .buildClient();
    
        // Creates an ServiceBusMessageBatch where the ServiceBus.
        ServiceBusMessageBatch messageBatch = senderClient.createMessageBatch();        
    
        // create a list of messages
        List<ServiceBusMessage> listOfMessages = createMessages();
    
        // We try to add as many messages as a batch can fit based on the maximum size and send to Service Bus when
        // the batch can hold no more messages. Create a new batch for next set of messages and repeat until all
        // messages are sent.        
        for (ServiceBusMessage message : listOfMessages) {
            if (messageBatch.tryAddMessage(message)) {
                continue;
            }
    
            // The batch is full, so we create a new batch and send the batch.
            senderClient.sendMessages(messageBatch);
            System.out.println("Sent a batch of messages to the queue: " + queueName);
    
            // create a new batch
            messageBatch = senderClient.createMessageBatch();
    
            // Add that message that we couldn't before.
            if (!messageBatch.tryAddMessage(message)) {
                System.err.printf("Message is too large for an empty batch. Skipping. Max size: %s.", messageBatch.getMaxSizeInBytes());
            }
        }
    
        if (messageBatch.getCount() > 0) {
            senderClient.sendMessages(messageBatch);
            System.out.println("Sent a batch of messages to the queue: " + queueName);
        }
    
        //close the client
        senderClient.close();
    }
    

Kuyruktan ileti alma

Bu bölümde kuyruktan iletileri almak için kod eksersiniz.

  1. Kuyruktan iletileri almak receiveMessages için adlı bir yöntem ekleyin. Bu yöntem, iletilerin işlenmesi için bir işleyici ve hataların işlenmesi için başka bir işleyici ServiceBusProcessorClient belirterek kuyruk için bir oluşturur. Ardından işlemciyi başlatır, birkaç saniye bekler, alınan iletileri yazdırır, ardından işlemciyi durdurur ve kapatır.

    Önemli

    kodundaki QueueTest QueueTest::processMessage yerine sınıfını yazın.

    // handles received messages
    static void receiveMessages() throws InterruptedException
    {
        CountDownLatch countdownLatch = new CountDownLatch(1);
    
        // Create an instance of the processor through the ServiceBusClientBuilder
        ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
            .connectionString(connectionString)
            .processor()
            .queueName(queueName)
            .processMessage(QueueTest::processMessage)
            .processError(context -> processError(context, countdownLatch))
            .buildProcessorClient();
    
        System.out.println("Starting the processor");
        processorClient.start();
    
        TimeUnit.SECONDS.sleep(10);
        System.out.println("Stopping and closing the processor");
        processorClient.close();        
    }   
    
  2. Service Bus processMessage aboneliğinden alınan iletiyi işleme yöntemini ekleyin.

    private static void processMessage(ServiceBusReceivedMessageContext context) {
        ServiceBusReceivedMessage message = context.getMessage();
        System.out.printf("Processing message. Session: %s, Sequence #: %s. Contents: %s%n", message.getMessageId(),
            message.getSequenceNumber(), message.getBody());
    }    
    
  3. Hata iletilerini processError işlemek için yöntemini ekleyin.

    private static void processError(ServiceBusErrorContext context, CountDownLatch countdownLatch) {
        System.out.printf("Error when receiving messages from namespace: '%s'. Entity: '%s'%n",
            context.getFullyQualifiedNamespace(), context.getEntityPath());
    
        if (!(context.getException() instanceof ServiceBusException)) {
            System.out.printf("Non-ServiceBusException occurred: %s%n", context.getException());
            return;
        }
    
        ServiceBusException exception = (ServiceBusException) context.getException();
        ServiceBusFailureReason reason = exception.getReason();
    
        if (reason == ServiceBusFailureReason.MESSAGING_ENTITY_DISABLED
            || reason == ServiceBusFailureReason.MESSAGING_ENTITY_NOT_FOUND
            || reason == ServiceBusFailureReason.UNAUTHORIZED) {
            System.out.printf("An unrecoverable error occurred. Stopping processing with reason %s: %s%n",
                reason, exception.getMessage());
    
            countdownLatch.countDown();
        } else if (reason == ServiceBusFailureReason.MESSAGE_LOCK_LOST) {
            System.out.printf("Message lock lost for message: %s%n", context.getException());
        } else if (reason == ServiceBusFailureReason.SERVICE_BUSY) {
            try {
                // Choosing an arbitrary amount of time to wait until trying again.
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                System.err.println("Unable to sleep for period of time");
            }
        } else {
            System.out.printf("Error source %s, reason %s, message: %s%n", context.getErrorSource(),
                reason, context.getException());
        }
    }  
    
  4. , ve main metotlarını sendMessage çağırmak için yöntemini sendMessageBatch receiveMessages güncelleştirin ve 'i atacak şekilde InterruptedException güncelleştirin.

    public static void main(String[] args) throws InterruptedException {        
        sendMessage();
        sendMessageBatch();
        receiveMessages();
    }   
    

Uygulamayı çalıştırma

Uygulamayı çalıştırarak konsol penceresinde aşağıdaki iletileri görebilirsiniz.

Sent a single message to the queue: myqueue
Sent a batch of messages to the queue: myqueue
Starting the processor
Processing message. Session: 88d961dd801f449e9c3e0f8a5393a527, Sequence #: 1. Contents: Hello, World!
Processing message. Session: e90c8d9039ce403bbe1d0ec7038033a0, Sequence #: 2. Contents: First message
Processing message. Session: 311a216a560c47d184f9831984e6ac1d, Sequence #: 3. Contents: Second message
Processing message. Session: f9a871be07414baf9505f2c3d466c4ab, Sequence #: 4. Contents: Third message
Stopping and closing the processor

Service Bus ad Azure portal genel bakış sayfasında gelen ve giden ileti sayısını görebilirsiniz. En son değerleri görmek için bir dakika kadar beklemeniz ve sayfayı yenilemeniz gerekir.

Gelen ve giden ileti sayısı

Bu Genel Bakış sayfasında kuyruğu seçerek Kuyruk Service Bus gidin. Bu sayfada gelen ve giden ileti sayısını da görebilirsiniz. Ayrıca kuyruğun geçerli boyutu, maksimum boyut, etkin ileti sayısı gibi diğer bilgileri de görüyorsunuz.

Kuyruk ayrıntıları

Sonraki Adımlar

Aşağıdaki belgelere ve örneklere bakın: