Share via


Échanger des messages avec des files d’attente Azure Service Bus (Go)

Dans ce tutoriel, vous apprendrez comment envoyer et recevoir des messages de files d’attente Azure Service Bus en utilisant le langage de programmation Go.

Azure Service Bus est un répartiteur de messages d’entreprise complètement managé, avec des files d’attente de messages et des fonctionnalités de publication/abonnement. Service Bus est utilisé pour dissocier les applications et les services les uns des autres, en fournissant un moyen de transport distribué, fiable et haute performance pour les messages.

Le package azservicebus d’Azure SDK pour Go vous permet d’envoyer et de recevoir des messages à partir d’Azure Service Bus et à l’aide du langage de programmation Go.

À la fin de ce tutoriel, vous serez en mesure d’envoyer un message unique ou un lot de messages à une file d’attente, ainsi que de recevoir des messages et des messages restés lettres mortes qui n’ont pas été traités.

Prérequis

Créer un exemple d'application

Commencez par créer un nouveau module Go.

  1. Créez un répertoire pour le module nommé service-bus-go-how-to-use-queues.

  2. Dans le répertoire azservicebus, initialisez le module et installez les packages requis.

    go mod init service-bus-go-how-to-use-queues
    
    go get github.com/Azure/azure-sdk-for-go/sdk/azidentity
    
    go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus
    
  3. Créez un nouveau fichier appelé main.go.

Authentifier et créer un client

Dans le fichier main.go, créez une fonction nommée GetClient et ajoutez le code suivant :

func GetClient() *azservicebus.Client {
	namespace, ok := os.LookupEnv("AZURE_SERVICEBUS_HOSTNAME") //ex: myservicebus.servicebus.windows.net
	if !ok {
		panic("AZURE_SERVICEBUS_HOSTNAME environment variable not found")
	}

	cred, err := azidentity.NewDefaultAzureCredential(nil)
	if err != nil {
		panic(err)
	}

	client, err := azservicebus.NewClient(namespace, cred, nil)
	if err != nil {
		panic(err)
	}
	return client
}

La fonction GetClient retourne un nouvel objet azservicebus.Client créé en utilisant des informations d’identification et un espace de noms Azure Service Bus. L’espace de noms est fourni par la variable d’environnement AZURE_SERVICEBUS_HOSTNAME. Quant aux informations d’identification, elles sont créées à l’aide de la fonction azidentity.NewDefaultAzureCredential.

Pour le développement local, le DefaultAzureCredential a utilisé le jeton d’accès d’Azure CLI, qui peut être créé en exécutant la commande az login pour s’authentifier auprès d’Azure.

Conseil

Pour vous authentifier à l’aide d’une chaîne de connexion, utilisez la fonction NewClientFromConnectionString.

Envoi de messages à une file d'attente

Dans le fichier main.go, créez une fonction nommée SendMessage et ajoutez le code suivant :

func SendMessage(message string, client *azservicebus.Client) {
	sender, err := client.NewSender("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer sender.Close(context.TODO())

	sbMessage := &azservicebus.Message{
		Body: []byte(message),
	}
	err = sender.SendMessage(context.TODO(), sbMessage, nil)
	if err != nil {
		panic(err)
	}
}

SendMessage accepte deux paramètres : une chaîne de message et un objet azservicebus.Client. Il crée ensuite un objet azservicebus.Sender et envoie le message à la file d’attente. Pour envoyer des messages en bloc, ajoutez la fonction SendMessageBatch à votre fichier main.go.

func SendMessageBatch(messages []string, client *azservicebus.Client) {
	sender, err := client.NewSender("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer sender.Close(context.TODO())
	
	batch, err := sender.NewMessageBatch(context.TODO(), nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		if err := batch.AddMessage(&azservicebus.Message{Body: []byte(message)}, nil); err != nil {
			panic(err)
		}
	}
	if err := sender.SendMessageBatch(context.TODO(), batch, nil); err != nil {
		panic(err)
	}
}

SendMessageBatch accepte deux paramètres : une tranche de messages et un objet azservicebus.Client. Il crée ensuite un objet azservicebus.Sender et envoie les messages à la file d’attente.

Réception des messages d'une file d'attente

Une fois que vous avez envoyé des messages à la file d’attente, vous pouvez les recevoir avec le type azservicebus.Receiver. Pour recevoir les messages d’une file d’attente, ajoutez la fonction GetMessage à votre fichier main.go.

func GetMessage(count int, client *azservicebus.Client) {
	receiver, err := client.NewReceiverForQueue("myqueue", nil) //Change myqueue to env var
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), count, nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		body := message.Body
		fmt.Printf("%s\n", string(body))

		err = receiver.CompleteMessage(context.TODO(), message, nil)
		if err != nil {
			panic(err)
		}
	}
}

GetMessage accepte un objet azservicebus.Client et crée un objet azservicebus.Receiver. Il reçoit ensuite les messages de la file d’attente. La fonction Receiver.ReceiveMessages accepte deux paramètres : un contexte et le nombre de messages à recevoir. La fonction Receiver.ReceiveMessages retourne une tranche d’objets azservicebus.ReceivedMessage.

Ensuite, une boucle for effectue une itération dans les messages et imprime le corps du message. Après quoi, la fonction CompleteMessage est appelée pour terminer le traitement du message et le supprimer de la file d’attente.

Les messages qui dépassent les limites de longueur sont envoyés à une file d’attente des messages non valides et ceux qui ne sont pas traités correctement peuvent être envoyés à la file d’attente de lettres mortes. Pour envoyer des messages à la file d’attente de lettres mortes, ajoutez la fonction SendDeadLetterMessage à votre fichier main.go.

func DeadLetterMessage(client *azservicebus.Client) {
	deadLetterOptions := &azservicebus.DeadLetterOptions{
		ErrorDescription: to.Ptr("exampleErrorDescription"),
		Reason:           to.Ptr("exampleReason"),
	}

	receiver, err := client.NewReceiverForQueue("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
	if err != nil {
		panic(err)
	}

	if len(messages) == 1 {
		err := receiver.DeadLetterMessage(context.TODO(), messages[0], deadLetterOptions)
		if err != nil {
			panic(err)
		}
	}
}

DeadLetterMessage accepte un objet azservicebus.Client et un objet azservicebus.ReceivedMessage. Il envoie ensuite le message à la file d’attente de lettres mortes. La fonction accepte deux paramètres : un contexte et un objet azservicebus.DeadLetterOptions. La fonction Receiver.DeadLetterMessage retourne une erreur si l’envoi du message vers la file d’attente de lettres mortes échoue.

Pour recevoir des messages de la file d’attente de lettres mortes, ajoutez la fonction ReceiveDeadLetterMessage à votre fichier main.go.

func GetDeadLetterMessage(client *azservicebus.Client) {
	receiver, err := client.NewReceiverForQueue(
		"myqueue",
		&azservicebus.ReceiverOptions{
			SubQueue: azservicebus.SubQueueDeadLetter,
		},
	)
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		fmt.Printf("DeadLetter Reason: %s\nDeadLetter Description: %s\n", *message.DeadLetterReason, *message.DeadLetterErrorDescription) //change to struct an unmarshal into it
		err := receiver.CompleteMessage(context.TODO(), message, nil)
		if err != nil {
			panic(err)
		}
	}
}

GetDeadLetterMessage accepte un objet azservicebus.Client et crée un objet azservicebus.Receiver avec des options pour la file d’attente de lettres mortes. Il reçoit ensuite les messages de la file d’attente de lettres mortes. La fonction reçoit ensuite un message de la file d’attente de lettres mortes. Ensuite, elle imprime le motif de lettre morte du message et sa description.

Exemple de code

package main

import (
	"context"
	"errors"
	"fmt"
	"os"

	"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
	"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
)

func GetClient() *azservicebus.Client {
	namespace, ok := os.LookupEnv("AZURE_SERVICEBUS_HOSTNAME") //ex: myservicebus.servicebus.windows.net
	if !ok {
		panic("AZURE_SERVICEBUS_HOSTNAME environment variable not found")
	}

	cred, err := azidentity.NewDefaultAzureCredential(nil)
	if err != nil {
		panic(err)
	}

	client, err := azservicebus.NewClient(namespace, cred, nil)
	if err != nil {
		panic(err)
	}
	return client
}

func SendMessage(message string, client *azservicebus.Client) {
	sender, err := client.NewSender("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer sender.Close(context.TODO())

	sbMessage := &azservicebus.Message{
		Body: []byte(message),
	}
	err = sender.SendMessage(context.TODO(), sbMessage, nil)
	if err != nil {
		panic(err)
	}
}

func SendMessageBatch(messages []string, client *azservicebus.Client) {
	sender, err := client.NewSender("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer sender.Close(context.TODO())

	batch, err := sender.NewMessageBatch(context.TODO(), nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		err := batch.AddMessage(&azservicebus.Message{Body: []byte(message)}, nil)
		if errors.Is(err, azservicebus.ErrMessageTooLarge) {
			fmt.Printf("Message batch is full. We should send it and create a new one.\n")
		}
	}

	if err := sender.SendMessageBatch(context.TODO(), batch, nil); err != nil {
		panic(err)
	}
}

func GetMessage(count int, client *azservicebus.Client) {
	receiver, err := client.NewReceiverForQueue("myqueue", nil) 
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), count, nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		body := message.Body
		fmt.Printf("%s\n", string(body))

		err = receiver.CompleteMessage(context.TODO(), message, nil)
		if err != nil {
			panic(err)
		}
	}
}

func DeadLetterMessage(client *azservicebus.Client) {
	deadLetterOptions := &azservicebus.DeadLetterOptions{
		ErrorDescription: to.Ptr("exampleErrorDescription"),
		Reason:           to.Ptr("exampleReason"),
	}

	receiver, err := client.NewReceiverForQueue("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
	if err != nil {
		panic(err)
	}

	if len(messages) == 1 {
		err := receiver.DeadLetterMessage(context.TODO(), messages[0], deadLetterOptions)
		if err != nil {
			panic(err)
		}
	}
}

func GetDeadLetterMessage(client *azservicebus.Client) {
	receiver, err := client.NewReceiverForQueue(
		"myqueue",
		&azservicebus.ReceiverOptions{
			SubQueue: azservicebus.SubQueueDeadLetter,
		},
	)
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		fmt.Printf("DeadLetter Reason: %s\nDeadLetter Description: %s\n", *message.DeadLetterReason, *message.DeadLetterErrorDescription) 
		err := receiver.CompleteMessage(context.TODO(), message, nil)
		if err != nil {
			panic(err)
		}
	}
}

func main() {
	client := GetClient()

	fmt.Println("send a single message...")
	SendMessage("firstMessage", client)

	fmt.Println("send two messages as a batch...")
	messages := [2]string{"secondMessage", "thirdMessage"}
	SendMessageBatch(messages[:], client)

	fmt.Println("\nget all three messages:")
	GetMessage(3, client)

	fmt.Println("\nsend a message to the Dead Letter Queue:")
	SendMessage("Send message to Dead Letter", client)
	DeadLetterMessage(client)
	GetDeadLetterMessage(client)
}

Exécuter le code

Avant d’exécuter le code, créez une variable d’environnement nommée AZURE_SERVICEBUS_HOSTNAME. Définissez la valeur de la variable d’environnement sur l’espace de noms Service Bus.

export AZURE_SERVICEBUS_HOSTNAME=<YourServiceBusHostName>

Ensuite, exécutez la commande go run suivante pour exécuter l’application :

go run main.go

Étapes suivantes

Pour plus d’informations, reportez-vous aux liens suivants :