Mengirim pesan dan menerima pesan dari antrean Azure Service Bus (Go)
Di tutorial ini, Anda akan mempelajari cara mengirim pesan dan menerima pesan dari antrean Azure Service Bus menggunakan bahasa pemrogram Java.
Azure Service Bus adalah perantara pesan perusahaan yang dikelola sepenuhnya dengan antrean pesan dan kemampuan menerbitkan/berlangganan. Azure Service Bus digunakan untuk memisahkan aplikasi dan layanan satu sama lain, menyediakan transportasi pesan terdistribusi, andal, dan berkinerja tinggi.
SDK Azure untuk paket azservicebus memungkinkan Anda mengirim dan menerima pesan dari Azure Service Bus dan menggunakan bahasa pemrograman Go.
Pada akhir tutorial ini, Anda akan dapat: mengirim satu pesan atau batch pesan ke antrean, menerima pesan, dan pesan surat mati yang tidak diproses.
Prasyarat
- Langganan Azure. Anda dapat mengaktifkan keuntungan pelanggan Visual Studio atau MSDN Anda atau mendaftar akun gratis.
- Jika Anda tidak memiliki antrean untuk dikerjakan, ikuti langkah-langkah di artikel Gunakan portal Microsoft Azure untuk membuat antrean Azure Bus Layanan untuk membuat antrean.
- Go versi 1.18 atau di atasnya
Membuat sampel aplikasi
Untuk memulai, buat modul Go baru.
Buat direktori baru untuk modul bernama
service-bus-go-how-to-use-queues
.Di direktori
azservicebus
, inisialisasi modul dan pasang paket yang diperlukan.go mod init service-bus-go-how-to-use-queues go get github.com/Azure/azure-sdk-for-go/sdk/azidentity go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus
Buat file baru bernama
main.go
.
Mengautentikasi dan membuat klien
Dalam file main.go
, buat fungsi baru bernama GetClient
dan tambahkan kode berikut:
func GetClient() *azservicebus.Client {
namespace, ok := os.LookupEnv("AZURE_SERVICEBUS_HOSTNAME") //ex: myservicebus.servicebus.windows.net
if !ok {
panic("AZURE_SERVICEBUS_HOSTNAME environment variable not found")
}
cred, err := azidentity.NewDefaultAzureCredential(nil)
if err != nil {
panic(err)
}
client, err := azservicebus.NewClient(namespace, cred, nil)
if err != nil {
panic(err)
}
return client
}
Fungsi GetClient
mengembalikan objek azservicebus.Client
baru yang dibuat dengan menggunakan namespace layanan Azure Service Bus dan info masuk. Namespace layanan disediakan oleh variabel lingkungan AZURE_SERVICEBUS_HOSTNAME
. Dan kredensial dibuat dengan menggunakan fungsi azidentity.NewDefaultAzureCredential
.
Untuk pengembangan lokal, DefaultAzureCredential
digunakan token akses dari Azure CLI, yang dapat dibuat dengan menjalankan perintah az login
untuk mengautentikasi ke Azure.
Tip
Untuk mengautentikasi dengan string koneksi, gunakan fungsi NewClientFromConnectionString.
Mengirim pesan ke antrean
Dalam file main.go
, buat fungsi baru bernama SendMessage
dan tambahkan kode berikut:
func SendMessage(message string, client *azservicebus.Client) {
sender, err := client.NewSender("myqueue", nil)
if err != nil {
panic(err)
}
defer sender.Close(context.TODO())
sbMessage := &azservicebus.Message{
Body: []byte(message),
}
err = sender.SendMessage(context.TODO(), sbMessage, nil)
if err != nil {
panic(err)
}
}
SendMessage
mengambil dua parameter: string pesan dan objek azservicebus.Client
. Kemudian membuat objek azservicebus.Sender
baru dan mengirim pesan ke antrean. Untuk mengirim pesan massal, tambahkan fungsi SendMessageBatch
ke file main.go
Anda.
func SendMessageBatch(messages []string, client *azservicebus.Client) {
sender, err := client.NewSender("myqueue", nil)
if err != nil {
panic(err)
}
defer sender.Close(context.TODO())
batch, err := sender.NewMessageBatch(context.TODO(), nil)
if err != nil {
panic(err)
}
for _, message := range messages {
if err := batch.AddMessage(&azservicebus.Message{Body: []byte(message)}, nil); err != nil {
panic(err)
}
}
if err := sender.SendMessageBatch(context.TODO(), batch, nil); err != nil {
panic(err)
}
}
SendMessageBatch
mengambil dua parameter: sebuah pesan dan objek azservicebus.Client
. Kemudian membuat objek azservicebus.Sender
baru dan mengirim pesan ke antrean.
Menerima pesan dari antrean
Setelah mengirim pesan ke antrean, Anda dapat menerimanya dengan jenis azservicebus.Receiver
. Untuk menerima pesan dari antrean, tambahkan fungsi GetMessage
ke file main.go
Anda.
func GetMessage(count int, client *azservicebus.Client) {
receiver, err := client.NewReceiverForQueue("myqueue", nil) //Change myqueue to env var
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), count, nil)
if err != nil {
panic(err)
}
for _, message := range messages {
body := message.Body
fmt.Printf("%s\n", string(body))
err = receiver.CompleteMessage(context.TODO(), message, nil)
if err != nil {
panic(err)
}
}
}
GetMessage
mengambil objek azservicebus.Client
dan membuat objek azservicebus.Receiver
yang baru. Kemudian menerima pesan dari antrean. Fungsi Receiver.ReceiveMessages
ini mengambil dua parameter: konteks dan jumlah pesan yang akan diterima. Fungsi Receiver.ReceiveMessages
ini mengembalikan potongan objek azservicebus.ReceivedMessage
.
Selanjutnya, perulangan for
berulang melalui pesan dan mencetak isi pesan. Kemudian fungsi CompleteMessage
dipanggil untuk menyelesaikan pesan, menghapusnya dari antrean.
Pesan yang melebihi batas panjang, dikirim ke antrean yang tidak valid, atau tidak berhasil diproses dapat dikirim ke antrean surat gagal. Untuk mengirim pesan ke antrean surat mati, tambahkan fungsi SendDeadLetterMessage
ke file main.go
Anda.
func DeadLetterMessage(client *azservicebus.Client) {
deadLetterOptions := &azservicebus.DeadLetterOptions{
ErrorDescription: to.Ptr("exampleErrorDescription"),
Reason: to.Ptr("exampleReason"),
}
receiver, err := client.NewReceiverForQueue("myqueue", nil)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
if err != nil {
panic(err)
}
if len(messages) == 1 {
err := receiver.DeadLetterMessage(context.TODO(), messages[0], deadLetterOptions)
if err != nil {
panic(err)
}
}
}
DeadLetterMessage
mengambil objek azservicebus.Client
dan objek azservicebus.ReceivedMessage
. Kemudian mengirim pesan ke antrean surat gagal. Fungsi ini mengambil dua parameter: konteks dan objek azservicebus.DeadLetterOptions
. Fungsi Receiver.DeadLetterMessage
mengembalikan kesalahan jika pesan gagal dikirim ke antrean surat gagal.
Untuk menerima pesan dari antrean surat mati, tambahkan fungsi ReceiveDeadLetterMessage
file main.go
Anda.
func GetDeadLetterMessage(client *azservicebus.Client) {
receiver, err := client.NewReceiverForQueue(
"myqueue",
&azservicebus.ReceiverOptions{
SubQueue: azservicebus.SubQueueDeadLetter,
},
)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
if err != nil {
panic(err)
}
for _, message := range messages {
fmt.Printf("DeadLetter Reason: %s\nDeadLetter Description: %s\n", *message.DeadLetterReason, *message.DeadLetterErrorDescription) //change to struct an unmarshal into it
err := receiver.CompleteMessage(context.TODO(), message, nil)
if err != nil {
panic(err)
}
}
}
GetDeadLetterMessage
mengambil objek azservicebus.Client
dan membuat objek azservicebus.Receiver
baru dengan opsi untuk antrean surat mati. Kemudian menerima pesan dari antrean surat gagal. Fungsi kemudian menerima satu pesan dari antrean surat gagal. Kemudian mencetak alasan surat mati dan deskripsi untuk pesan tersebut.
Kode sampel
package main
import (
"context"
"errors"
"fmt"
"os"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
)
func GetClient() *azservicebus.Client {
namespace, ok := os.LookupEnv("AZURE_SERVICEBUS_HOSTNAME") //ex: myservicebus.servicebus.windows.net
if !ok {
panic("AZURE_SERVICEBUS_HOSTNAME environment variable not found")
}
cred, err := azidentity.NewDefaultAzureCredential(nil)
if err != nil {
panic(err)
}
client, err := azservicebus.NewClient(namespace, cred, nil)
if err != nil {
panic(err)
}
return client
}
func SendMessage(message string, client *azservicebus.Client) {
sender, err := client.NewSender("myqueue", nil)
if err != nil {
panic(err)
}
defer sender.Close(context.TODO())
sbMessage := &azservicebus.Message{
Body: []byte(message),
}
err = sender.SendMessage(context.TODO(), sbMessage, nil)
if err != nil {
panic(err)
}
}
func SendMessageBatch(messages []string, client *azservicebus.Client) {
sender, err := client.NewSender("myqueue", nil)
if err != nil {
panic(err)
}
defer sender.Close(context.TODO())
batch, err := sender.NewMessageBatch(context.TODO(), nil)
if err != nil {
panic(err)
}
for _, message := range messages {
err := batch.AddMessage(&azservicebus.Message{Body: []byte(message)}, nil)
if errors.Is(err, azservicebus.ErrMessageTooLarge) {
fmt.Printf("Message batch is full. We should send it and create a new one.\n")
}
}
if err := sender.SendMessageBatch(context.TODO(), batch, nil); err != nil {
panic(err)
}
}
func GetMessage(count int, client *azservicebus.Client) {
receiver, err := client.NewReceiverForQueue("myqueue", nil)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), count, nil)
if err != nil {
panic(err)
}
for _, message := range messages {
body := message.Body
fmt.Printf("%s\n", string(body))
err = receiver.CompleteMessage(context.TODO(), message, nil)
if err != nil {
panic(err)
}
}
}
func DeadLetterMessage(client *azservicebus.Client) {
deadLetterOptions := &azservicebus.DeadLetterOptions{
ErrorDescription: to.Ptr("exampleErrorDescription"),
Reason: to.Ptr("exampleReason"),
}
receiver, err := client.NewReceiverForQueue("myqueue", nil)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
if err != nil {
panic(err)
}
if len(messages) == 1 {
err := receiver.DeadLetterMessage(context.TODO(), messages[0], deadLetterOptions)
if err != nil {
panic(err)
}
}
}
func GetDeadLetterMessage(client *azservicebus.Client) {
receiver, err := client.NewReceiverForQueue(
"myqueue",
&azservicebus.ReceiverOptions{
SubQueue: azservicebus.SubQueueDeadLetter,
},
)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
if err != nil {
panic(err)
}
for _, message := range messages {
fmt.Printf("DeadLetter Reason: %s\nDeadLetter Description: %s\n", *message.DeadLetterReason, *message.DeadLetterErrorDescription)
err := receiver.CompleteMessage(context.TODO(), message, nil)
if err != nil {
panic(err)
}
}
}
func main() {
client := GetClient()
fmt.Println("send a single message...")
SendMessage("firstMessage", client)
fmt.Println("send two messages as a batch...")
messages := [2]string{"secondMessage", "thirdMessage"}
SendMessageBatch(messages[:], client)
fmt.Println("\nget all three messages:")
GetMessage(3, client)
fmt.Println("\nsend a message to the Dead Letter Queue:")
SendMessage("Send message to Dead Letter", client)
DeadLetterMessage(client)
GetDeadLetterMessage(client)
}
Jalankan kode
Sebelum Anda menjalankan kode, buat variabel lingkungan bernama AZURE_SERVICEBUS_HOSTNAME
. Atur nilai variabel lingkungan ke namespace layanan Azure Service Bus.
export AZURE_SERVICEBUS_HOSTNAME=<YourServiceBusHostName>
Jalankan perintah berikut go run
untuk menjalankan aplikasi:
go run main.go
Langkah berikutnya
Untuk mengetahui informasi selengkapnya, lihat link berikut: