Mengirim pesan dan menerima pesan dari antrean Azure Service Bus (Go)

Di tutorial ini, Anda akan mempelajari cara mengirim pesan dan menerima pesan dari antrean Azure Service Bus menggunakan bahasa pemrogram Java.

Azure Service Bus adalah perantara pesan perusahaan yang dikelola sepenuhnya dengan antrean pesan dan kemampuan menerbitkan/berlangganan. Azure Service Bus digunakan untuk memisahkan aplikasi dan layanan satu sama lain, menyediakan transportasi pesan terdistribusi, andal, dan berkinerja tinggi.

SDK Azure untuk paket azservicebus memungkinkan Anda mengirim dan menerima pesan dari Azure Service Bus dan menggunakan bahasa pemrograman Go.

Pada akhir tutorial ini, Anda akan dapat: mengirim satu pesan atau batch pesan ke antrean, menerima pesan, dan pesan surat mati yang tidak diproses.

Prasyarat

Membuat sampel aplikasi

Untuk memulai, buat modul Go baru.

  1. Buat direktori baru untuk modul bernama service-bus-go-how-to-use-queues.

  2. Di direktori azservicebus, inisialisasi modul dan pasang paket yang diperlukan.

    go mod init service-bus-go-how-to-use-queues
    
    go get github.com/Azure/azure-sdk-for-go/sdk/azidentity
    
    go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus
    
  3. Buat file baru bernama main.go.

Mengautentikasi dan membuat klien

Dalam file main.go, buat fungsi baru bernama GetClient dan tambahkan kode berikut:

func GetClient() *azservicebus.Client {
	namespace, ok := os.LookupEnv("AZURE_SERVICEBUS_HOSTNAME") //ex: myservicebus.servicebus.windows.net
	if !ok {
		panic("AZURE_SERVICEBUS_HOSTNAME environment variable not found")
	}

	cred, err := azidentity.NewDefaultAzureCredential(nil)
	if err != nil {
		panic(err)
	}

	client, err := azservicebus.NewClient(namespace, cred, nil)
	if err != nil {
		panic(err)
	}
	return client
}

Fungsi GetClient mengembalikan objek azservicebus.Client baru yang dibuat dengan menggunakan namespace layanan Azure Service Bus dan info masuk. Namespace layanan disediakan oleh variabel lingkungan AZURE_SERVICEBUS_HOSTNAME. Dan kredensial dibuat dengan menggunakan fungsi azidentity.NewDefaultAzureCredential.

Untuk pengembangan lokal, DefaultAzureCredential digunakan token akses dari Azure CLI, yang dapat dibuat dengan menjalankan perintah az login untuk mengautentikasi ke Azure.

Tip

Untuk mengautentikasi dengan string koneksi, gunakan fungsi NewClientFromConnectionString.

Mengirim pesan ke antrean

Dalam file main.go, buat fungsi baru bernama SendMessage dan tambahkan kode berikut:

func SendMessage(message string, client *azservicebus.Client) {
	sender, err := client.NewSender("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer sender.Close(context.TODO())

	sbMessage := &azservicebus.Message{
		Body: []byte(message),
	}
	err = sender.SendMessage(context.TODO(), sbMessage, nil)
	if err != nil {
		panic(err)
	}
}

SendMessage mengambil dua parameter: string pesan dan objek azservicebus.Client. Kemudian membuat objek azservicebus.Sender baru dan mengirim pesan ke antrean. Untuk mengirim pesan massal, tambahkan fungsi SendMessageBatch ke file main.go Anda.

func SendMessageBatch(messages []string, client *azservicebus.Client) {
	sender, err := client.NewSender("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer sender.Close(context.TODO())
	
	batch, err := sender.NewMessageBatch(context.TODO(), nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		if err := batch.AddMessage(&azservicebus.Message{Body: []byte(message)}, nil); err != nil {
			panic(err)
		}
	}
	if err := sender.SendMessageBatch(context.TODO(), batch, nil); err != nil {
		panic(err)
	}
}

SendMessageBatch mengambil dua parameter: sebuah pesan dan objek azservicebus.Client. Kemudian membuat objek azservicebus.Sender baru dan mengirim pesan ke antrean.

Menerima pesan dari antrean

Setelah mengirim pesan ke antrean, Anda dapat menerimanya dengan jenis azservicebus.Receiver. Untuk menerima pesan dari antrean, tambahkan fungsi GetMessage ke file main.go Anda.

func GetMessage(count int, client *azservicebus.Client) {
	receiver, err := client.NewReceiverForQueue("myqueue", nil) //Change myqueue to env var
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), count, nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		body := message.Body
		fmt.Printf("%s\n", string(body))

		err = receiver.CompleteMessage(context.TODO(), message, nil)
		if err != nil {
			panic(err)
		}
	}
}

GetMessage mengambil objek azservicebus.Client dan membuat objek azservicebus.Receiver yang baru. Kemudian menerima pesan dari antrean. Fungsi Receiver.ReceiveMessages ini mengambil dua parameter: konteks dan jumlah pesan yang akan diterima. Fungsi Receiver.ReceiveMessages ini mengembalikan potongan objek azservicebus.ReceivedMessage.

Selanjutnya, perulangan for berulang melalui pesan dan mencetak isi pesan. Kemudian fungsi CompleteMessage dipanggil untuk menyelesaikan pesan, menghapusnya dari antrean.

Pesan yang melebihi batas panjang, dikirim ke antrean yang tidak valid, atau tidak berhasil diproses dapat dikirim ke antrean surat gagal. Untuk mengirim pesan ke antrean surat mati, tambahkan fungsi SendDeadLetterMessage ke file main.go Anda.

func DeadLetterMessage(client *azservicebus.Client) {
	deadLetterOptions := &azservicebus.DeadLetterOptions{
		ErrorDescription: to.Ptr("exampleErrorDescription"),
		Reason:           to.Ptr("exampleReason"),
	}

	receiver, err := client.NewReceiverForQueue("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
	if err != nil {
		panic(err)
	}

	if len(messages) == 1 {
		err := receiver.DeadLetterMessage(context.TODO(), messages[0], deadLetterOptions)
		if err != nil {
			panic(err)
		}
	}
}

DeadLetterMessage mengambil objek azservicebus.Client dan objek azservicebus.ReceivedMessage. Kemudian mengirim pesan ke antrean surat gagal. Fungsi ini mengambil dua parameter: konteks dan objek azservicebus.DeadLetterOptions. Fungsi Receiver.DeadLetterMessage mengembalikan kesalahan jika pesan gagal dikirim ke antrean surat gagal.

Untuk menerima pesan dari antrean surat mati, tambahkan fungsi ReceiveDeadLetterMessage file main.go Anda.

func GetDeadLetterMessage(client *azservicebus.Client) {
	receiver, err := client.NewReceiverForQueue(
		"myqueue",
		&azservicebus.ReceiverOptions{
			SubQueue: azservicebus.SubQueueDeadLetter,
		},
	)
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		fmt.Printf("DeadLetter Reason: %s\nDeadLetter Description: %s\n", *message.DeadLetterReason, *message.DeadLetterErrorDescription) //change to struct an unmarshal into it
		err := receiver.CompleteMessage(context.TODO(), message, nil)
		if err != nil {
			panic(err)
		}
	}
}

GetDeadLetterMessage mengambil objek azservicebus.Client dan membuat objek azservicebus.Receiver baru dengan opsi untuk antrean surat mati. Kemudian menerima pesan dari antrean surat gagal. Fungsi kemudian menerima satu pesan dari antrean surat gagal. Kemudian mencetak alasan surat mati dan deskripsi untuk pesan tersebut.

Kode sampel

package main

import (
	"context"
	"errors"
	"fmt"
	"os"

	"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
	"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
)

func GetClient() *azservicebus.Client {
	namespace, ok := os.LookupEnv("AZURE_SERVICEBUS_HOSTNAME") //ex: myservicebus.servicebus.windows.net
	if !ok {
		panic("AZURE_SERVICEBUS_HOSTNAME environment variable not found")
	}

	cred, err := azidentity.NewDefaultAzureCredential(nil)
	if err != nil {
		panic(err)
	}

	client, err := azservicebus.NewClient(namespace, cred, nil)
	if err != nil {
		panic(err)
	}
	return client
}

func SendMessage(message string, client *azservicebus.Client) {
	sender, err := client.NewSender("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer sender.Close(context.TODO())

	sbMessage := &azservicebus.Message{
		Body: []byte(message),
	}
	err = sender.SendMessage(context.TODO(), sbMessage, nil)
	if err != nil {
		panic(err)
	}
}

func SendMessageBatch(messages []string, client *azservicebus.Client) {
	sender, err := client.NewSender("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer sender.Close(context.TODO())

	batch, err := sender.NewMessageBatch(context.TODO(), nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		err := batch.AddMessage(&azservicebus.Message{Body: []byte(message)}, nil)
		if errors.Is(err, azservicebus.ErrMessageTooLarge) {
			fmt.Printf("Message batch is full. We should send it and create a new one.\n")
		}
	}

	if err := sender.SendMessageBatch(context.TODO(), batch, nil); err != nil {
		panic(err)
	}
}

func GetMessage(count int, client *azservicebus.Client) {
	receiver, err := client.NewReceiverForQueue("myqueue", nil) 
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), count, nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		body := message.Body
		fmt.Printf("%s\n", string(body))

		err = receiver.CompleteMessage(context.TODO(), message, nil)
		if err != nil {
			panic(err)
		}
	}
}

func DeadLetterMessage(client *azservicebus.Client) {
	deadLetterOptions := &azservicebus.DeadLetterOptions{
		ErrorDescription: to.Ptr("exampleErrorDescription"),
		Reason:           to.Ptr("exampleReason"),
	}

	receiver, err := client.NewReceiverForQueue("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
	if err != nil {
		panic(err)
	}

	if len(messages) == 1 {
		err := receiver.DeadLetterMessage(context.TODO(), messages[0], deadLetterOptions)
		if err != nil {
			panic(err)
		}
	}
}

func GetDeadLetterMessage(client *azservicebus.Client) {
	receiver, err := client.NewReceiverForQueue(
		"myqueue",
		&azservicebus.ReceiverOptions{
			SubQueue: azservicebus.SubQueueDeadLetter,
		},
	)
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		fmt.Printf("DeadLetter Reason: %s\nDeadLetter Description: %s\n", *message.DeadLetterReason, *message.DeadLetterErrorDescription) 
		err := receiver.CompleteMessage(context.TODO(), message, nil)
		if err != nil {
			panic(err)
		}
	}
}

func main() {
	client := GetClient()

	fmt.Println("send a single message...")
	SendMessage("firstMessage", client)

	fmt.Println("send two messages as a batch...")
	messages := [2]string{"secondMessage", "thirdMessage"}
	SendMessageBatch(messages[:], client)

	fmt.Println("\nget all three messages:")
	GetMessage(3, client)

	fmt.Println("\nsend a message to the Dead Letter Queue:")
	SendMessage("Send message to Dead Letter", client)
	DeadLetterMessage(client)
	GetDeadLetterMessage(client)
}

Jalankan kode

Sebelum Anda menjalankan kode, buat variabel lingkungan bernama AZURE_SERVICEBUS_HOSTNAME. Atur nilai variabel lingkungan ke namespace layanan Azure Service Bus.

export AZURE_SERVICEBUS_HOSTNAME=<YourServiceBusHostName>

Jalankan perintah berikut go run untuk menjalankan aplikasi:

go run main.go

Langkah berikutnya

Untuk mengetahui informasi selengkapnya, lihat link berikut: