Berichten verzenden naar en berichten ontvangen van Azure Service Bus wachtrijen (Go)

In deze zelfstudie leert u hoe u berichten kunt verzenden naar en ontvangen van Azure Service Bus wachtrijen met behulp van de programmeertaal Go.

Azure Service Bus is een volledig beheerde zakelijke berichtenbroker met berichtenwachtrijen en mogelijkheden voor publiceren/abonneren. Service Bus wordt gebruikt om toepassingen en services van elkaar te ontkoppelen, waardoor een gedistribueerd, betrouwbaar en hoogwaardig berichtentransport mogelijk is.

Met het pakket azservicebus van de Azure SDK voor Go kunt u berichten verzenden en ontvangen van Azure Service Bus en de programmeertaal Go gebruiken.

Aan het einde van deze zelfstudie kunt u het volgende doen: één bericht of batch berichten verzenden naar een wachtrij, berichten ontvangen en berichten in onbestelbare berichten die niet worden verwerkt.

Vereisten

De voorbeeld-app maken

Maak eerst een nieuwe Go-module.

  1. Maak een nieuwe map voor de module met de naam service-bus-go-how-to-use-queues.

  2. Initialiseer de module in de azservicebus map en installeer de vereiste pakketten.

    go mod init service-bus-go-how-to-use-queues
    
    go get github.com/Azure/azure-sdk-for-go/sdk/azidentity
    
    go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus
    
  3. Maak een nieuw bestand met de naam main.go.

Een client verifiëren en maken

Maak in het main.go bestand een nieuwe functie met de naam GetClient en voeg de volgende code toe:

func GetClient() *azservicebus.Client {
	namespace, ok := os.LookupEnv("AZURE_SERVICEBUS_HOSTNAME") //ex: myservicebus.servicebus.windows.net
	if !ok {
		panic("AZURE_SERVICEBUS_HOSTNAME environment variable not found")
	}

	cred, err := azidentity.NewDefaultAzureCredential(nil)
	if err != nil {
		panic(err)
	}

	client, err := azservicebus.NewClient(namespace, cred, nil)
	if err != nil {
		panic(err)
	}
	return client
}

De GetClient functie retourneert een nieuw azservicebus.Client object dat is gemaakt met behulp van een Azure Service Bus naamruimte en een referentie. De naamruimte wordt geleverd door de omgevingsvariabele AZURE_SERVICEBUS_HOSTNAME . En de referentie wordt gemaakt met behulp van de azidentity.NewDefaultAzureCredential functie.

Voor lokale ontwikkeling heeft de DefaultAzureCredential het toegangstoken van Azure CLI gebruikt, dat kan worden gemaakt door de az login opdracht uit te voeren om te verifiëren bij Azure.

Tip

Als u zich wilt verifiëren met een connection string gebruikt u de functie NewClientFromConnectionString.

Berichten verzenden naar een wachtrij

Maak in het main.go bestand een nieuwe functie met de naam SendMessage en voeg de volgende code toe:

func SendMessage(message string, client *azservicebus.Client) {
	sender, err := client.NewSender("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer sender.Close(context.TODO())

	sbMessage := &azservicebus.Message{
		Body: []byte(message),
	}
	err = sender.SendMessage(context.TODO(), sbMessage, nil)
	if err != nil {
		panic(err)
	}
}

SendMessage heeft twee parameters: een berichttekenreeks en een azservicebus.Client -object. Vervolgens wordt er een nieuw azservicebus.Sender object gemaakt en wordt het bericht naar de wachtrij verzonden. Als u bulkberichten wilt verzenden, voegt u de SendMessageBatch functie toe aan uw main.go bestand.

func SendMessageBatch(messages []string, client *azservicebus.Client) {
	sender, err := client.NewSender("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer sender.Close(context.TODO())
	
	batch, err := sender.NewMessageBatch(context.TODO(), nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		if err := batch.AddMessage(&azservicebus.Message{Body: []byte(message)}, nil); err != nil {
			panic(err)
		}
	}
	if err := sender.SendMessageBatch(context.TODO(), batch, nil); err != nil {
		panic(err)
	}
}

SendMessageBatch heeft twee parameters: een segment berichten en een azservicebus.Client -object. Vervolgens wordt er een nieuw azservicebus.Sender object gemaakt en worden de berichten naar de wachtrij verzonden.

Berichten van een wachtrij ontvangen

Nadat u berichten naar de wachtrij hebt verzonden, kunt u deze ontvangen met het azservicebus.Receiver type. Als u berichten van een wachtrij wilt ontvangen, voegt u de GetMessage functie toe aan uw main.go bestand.

func GetMessage(count int, client *azservicebus.Client) {
	receiver, err := client.NewReceiverForQueue("myqueue", nil) //Change myqueue to env var
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), count, nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		body := message.Body
		fmt.Printf("%s\n", string(body))

		err = receiver.CompleteMessage(context.TODO(), message, nil)
		if err != nil {
			panic(err)
		}
	}
}

GetMessage neemt een azservicebus.Client object en maakt een nieuw azservicebus.Receiver object. Vervolgens worden de berichten van de wachtrij ontvangen. De Receiver.ReceiveMessages functie heeft twee parameters: een context en het aantal berichten dat moet worden ontvangen. De Receiver.ReceiveMessages functie retourneert een segment van azservicebus.ReceivedMessage objecten.

Vervolgens doorloopt een for lus de berichten en wordt de hoofdtekst van het bericht afgedrukt. Vervolgens wordt de CompleteMessage functie aangeroepen om het bericht te voltooien en uit de wachtrij te verwijderen.

Berichten die de lengtelimieten overschrijden, naar een ongeldige wachtrij worden verzonden of die niet goed worden verwerkt, kunnen worden verzonden naar de wachtrij met onbestelbare berichten. Als u berichten wilt verzenden naar de wachtrij met onbestelbare berichten, voegt u de SendDeadLetterMessage functie toe aan uw main.go bestand.

func DeadLetterMessage(client *azservicebus.Client) {
	deadLetterOptions := &azservicebus.DeadLetterOptions{
		ErrorDescription: to.Ptr("exampleErrorDescription"),
		Reason:           to.Ptr("exampleReason"),
	}

	receiver, err := client.NewReceiverForQueue("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
	if err != nil {
		panic(err)
	}

	if len(messages) == 1 {
		err := receiver.DeadLetterMessage(context.TODO(), messages[0], deadLetterOptions)
		if err != nil {
			panic(err)
		}
	}
}

DeadLetterMessage gebruikt een azservicebus.Client object en een azservicebus.ReceivedMessage object. Vervolgens wordt het bericht naar de wachtrij met onbestelbare berichten verzonden. De functie heeft twee parameters: een context en een azservicebus.DeadLetterOptions object. De Receiver.DeadLetterMessage functie retourneert een fout als het bericht niet naar de wachtrij met onbestelbare berichten kan worden verzonden.

Als u berichten uit de wachtrij met onbestelbare berichten wilt ontvangen, voegt u de ReceiveDeadLetterMessage functie toe aan uw main.go bestand.

func GetDeadLetterMessage(client *azservicebus.Client) {
	receiver, err := client.NewReceiverForQueue(
		"myqueue",
		&azservicebus.ReceiverOptions{
			SubQueue: azservicebus.SubQueueDeadLetter,
		},
	)
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		fmt.Printf("DeadLetter Reason: %s\nDeadLetter Description: %s\n", *message.DeadLetterReason, *message.DeadLetterErrorDescription) //change to struct an unmarshal into it
		err := receiver.CompleteMessage(context.TODO(), message, nil)
		if err != nil {
			panic(err)
		}
	}
}

GetDeadLetterMessage neemt een azservicebus.Client object en maakt een nieuw azservicebus.Receiver object met opties voor de wachtrij met onbestelbare berichten. Vervolgens worden de berichten van de wachtrij met onbestelbare berichten ontvangen. De functie ontvangt vervolgens één bericht uit de wachtrij met onbestelbare berichten. Vervolgens worden de reden en beschrijving van de dode letter voor dat bericht afgedrukt.

Voorbeeldcode

package main

import (
	"context"
	"errors"
	"fmt"
	"os"

	"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
	"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
)

func GetClient() *azservicebus.Client {
	namespace, ok := os.LookupEnv("AZURE_SERVICEBUS_HOSTNAME") //ex: myservicebus.servicebus.windows.net
	if !ok {
		panic("AZURE_SERVICEBUS_HOSTNAME environment variable not found")
	}

	cred, err := azidentity.NewDefaultAzureCredential(nil)
	if err != nil {
		panic(err)
	}

	client, err := azservicebus.NewClient(namespace, cred, nil)
	if err != nil {
		panic(err)
	}
	return client
}

func SendMessage(message string, client *azservicebus.Client) {
	sender, err := client.NewSender("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer sender.Close(context.TODO())

	sbMessage := &azservicebus.Message{
		Body: []byte(message),
	}
	err = sender.SendMessage(context.TODO(), sbMessage, nil)
	if err != nil {
		panic(err)
	}
}

func SendMessageBatch(messages []string, client *azservicebus.Client) {
	sender, err := client.NewSender("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer sender.Close(context.TODO())

	batch, err := sender.NewMessageBatch(context.TODO(), nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		err := batch.AddMessage(&azservicebus.Message{Body: []byte(message)}, nil)
		if errors.Is(err, azservicebus.ErrMessageTooLarge) {
			fmt.Printf("Message batch is full. We should send it and create a new one.\n")
		}
	}

	if err := sender.SendMessageBatch(context.TODO(), batch, nil); err != nil {
		panic(err)
	}
}

func GetMessage(count int, client *azservicebus.Client) {
	receiver, err := client.NewReceiverForQueue("myqueue", nil) 
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), count, nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		body := message.Body
		fmt.Printf("%s\n", string(body))

		err = receiver.CompleteMessage(context.TODO(), message, nil)
		if err != nil {
			panic(err)
		}
	}
}

func DeadLetterMessage(client *azservicebus.Client) {
	deadLetterOptions := &azservicebus.DeadLetterOptions{
		ErrorDescription: to.Ptr("exampleErrorDescription"),
		Reason:           to.Ptr("exampleReason"),
	}

	receiver, err := client.NewReceiverForQueue("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
	if err != nil {
		panic(err)
	}

	if len(messages) == 1 {
		err := receiver.DeadLetterMessage(context.TODO(), messages[0], deadLetterOptions)
		if err != nil {
			panic(err)
		}
	}
}

func GetDeadLetterMessage(client *azservicebus.Client) {
	receiver, err := client.NewReceiverForQueue(
		"myqueue",
		&azservicebus.ReceiverOptions{
			SubQueue: azservicebus.SubQueueDeadLetter,
		},
	)
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		fmt.Printf("DeadLetter Reason: %s\nDeadLetter Description: %s\n", *message.DeadLetterReason, *message.DeadLetterErrorDescription) 
		err := receiver.CompleteMessage(context.TODO(), message, nil)
		if err != nil {
			panic(err)
		}
	}
}

func main() {
	client := GetClient()

	fmt.Println("send a single message...")
	SendMessage("firstMessage", client)

	fmt.Println("send two messages as a batch...")
	messages := [2]string{"secondMessage", "thirdMessage"}
	SendMessageBatch(messages[:], client)

	fmt.Println("\nget all three messages:")
	GetMessage(3, client)

	fmt.Println("\nsend a message to the Dead Letter Queue:")
	SendMessage("Send message to Dead Letter", client)
	DeadLetterMessage(client)
	GetDeadLetterMessage(client)
}

De code uitvoeren

Voordat u de code uitvoert, maakt u een omgevingsvariabele met de naam AZURE_SERVICEBUS_HOSTNAME. Stel de waarde van de omgevingsvariabele in op de Service Bus-naamruimte.

export AZURE_SERVICEBUS_HOSTNAME=<YourServiceBusHostName>

Voer vervolgens de volgende go run opdracht uit om de app uit te voeren:

go run main.go

Volgende stappen

Bekijk de volgende koppelingen voor meer informatie: