Share via


Enviar e receber mensagens em filas do Barramento de Serviço do Azure (Go)

Neste tutorial, você aprenderá como enviar e receber mensagens em filas do Barramento de Serviço do Azure com a linguagem de programação Go.

O Barramento de Serviço do Azure é um agente de mensagens empresarial totalmente gerenciado com filas de mensagens e tópicos de publicação/assinatura. O Barramento de Serviço é usado para desacoplar aplicativos e serviços uns dos outros, fornecendo um transporte de mensagens distribuído, confiável e de alto desempenho.

O pacote azservicebus do SDK do Azure para linguagem Go permite que você envie e receba mensagens de Barramento de Serviço do Azure e usa a linguagem de programação Go.

Ao final deste tutorial, você conseguirá: enviar uma única mensagem ou lote de mensagens para uma fila, receber mensagens e mensagens de mensagens mortas que não são processadas.

Pré-requisitos

Criar o aplicativo de exemplo

Para começar, crie um novo módulo Go.

  1. Crie um novo diretório para o módulo chamado service-bus-go-how-to-use-queues.

  2. No diretório azservicebus, inicialize o módulo e instale os pacotes necessários.

    go mod init service-bus-go-how-to-use-queues
    
    go get github.com/Azure/azure-sdk-for-go/sdk/azidentity
    
    go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus
    
  3. Crie um arquivo chamado main.go.

Autenticar e criar um cliente

No arquivomain.go, crie uma nova função nomeada GetClient e adicione o seguinte código:

func GetClient() *azservicebus.Client {
	namespace, ok := os.LookupEnv("AZURE_SERVICEBUS_HOSTNAME") //ex: myservicebus.servicebus.windows.net
	if !ok {
		panic("AZURE_SERVICEBUS_HOSTNAME environment variable not found")
	}

	cred, err := azidentity.NewDefaultAzureCredential(nil)
	if err != nil {
		panic(err)
	}

	client, err := azservicebus.NewClient(namespace, cred, nil)
	if err != nil {
		panic(err)
	}
	return client
}

A função GetClient retorna um novo objeto azservicebus.Client criado com um namespace do Barramento de Serviço do Azure e uma credencial. O namespace é fornecido pela variável de ambiente AZURE_SERVICEBUS_HOSTNAME. E a credencial é criada pela função azidentity.NewDefaultAzureCredential.

Para desenvolvimento local, o DefaultAzureCredential usou o token de acesso da CLI do Azure, que pode ser criado executando o comando az login para autenticação no Azure.

Dica

Para autenticar com uma cadeia de conexão, use a função NewClientFromConnectionString.

Enviar mensagens a uma fila

No arquivomain.go, crie uma nova função nomeada SendMessage e adicione o seguinte código:

func SendMessage(message string, client *azservicebus.Client) {
	sender, err := client.NewSender("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer sender.Close(context.TODO())

	sbMessage := &azservicebus.Message{
		Body: []byte(message),
	}
	err = sender.SendMessage(context.TODO(), sbMessage, nil)
	if err != nil {
		panic(err)
	}
}

SendMessage usa dois parâmetros: uma cadeia de caracteres de mensagem e um objeto azservicebus.Client. Em seguida, ele cria um novo objeto azservicebus.Sender e envia a mensagem para a fila. Para enviar mensagens em massa, adicione a SendMessageBatch função ao arquivo main.go.

func SendMessageBatch(messages []string, client *azservicebus.Client) {
	sender, err := client.NewSender("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer sender.Close(context.TODO())
	
	batch, err := sender.NewMessageBatch(context.TODO(), nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		if err := batch.AddMessage(&azservicebus.Message{Body: []byte(message)}, nil); err != nil {
			panic(err)
		}
	}
	if err := sender.SendMessageBatch(context.TODO(), batch, nil); err != nil {
		panic(err)
	}
}

SendMessageBatchusa dois parâmetros: uma fatia de mensagens e um objeto azservicebus.Client. Em seguida, ele cria um novo objeto azservicebus.Sender e envia as mensagens para a fila.

Receber mensagens de uma fila

Depois de enviar mensagens para a fila, você pode recebê-las com o tipo azservicebus.Receiver. Para receber mensagens de uma fila, adicione a função GetMessage ao arquivo main.go.

func GetMessage(count int, client *azservicebus.Client) {
	receiver, err := client.NewReceiverForQueue("myqueue", nil) //Change myqueue to env var
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), count, nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		body := message.Body
		fmt.Printf("%s\n", string(body))

		err = receiver.CompleteMessage(context.TODO(), message, nil)
		if err != nil {
			panic(err)
		}
	}
}

GetMessage usa um objeto azservicebus.Client e cria um novo objeto azservicebus.Receiver. Em seguida, ela recebe as mensagens da fila. A função Receiver.ReceiveMessages usa dois parâmetros: um contexto e o número de mensagens a serem recebidas. A função Receiver.ReceiveMessages retorna uma fatia de objetos azservicebus.ReceivedMessage.

Em seguida, um loop for itera pelas mensagens e imprime o corpo da mensagem. Em seguida, a função CompleteMessage é chamada para concluir a mensagem, removendo-a da fila.

Mensagens que excedem os limites de comprimento são enviadas para uma fila inválida ou as que não são processadas com êxito podem ser enviadas para a fila de mensagens mortas. Para enviar mensagens para a fila de mensagens mortas, adicione a função SendDeadLetterMessage ao arquivo main.go.

func DeadLetterMessage(client *azservicebus.Client) {
	deadLetterOptions := &azservicebus.DeadLetterOptions{
		ErrorDescription: to.Ptr("exampleErrorDescription"),
		Reason:           to.Ptr("exampleReason"),
	}

	receiver, err := client.NewReceiverForQueue("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
	if err != nil {
		panic(err)
	}

	if len(messages) == 1 {
		err := receiver.DeadLetterMessage(context.TODO(), messages[0], deadLetterOptions)
		if err != nil {
			panic(err)
		}
	}
}

DeadLetterMessage usa um objeto azservicebus.Client e um objeto azservicebus.ReceivedMessage. Em seguida, ela envia a mensagem para a fila de mensagens mortas. A função usa dois parâmetros: um contexto e um objeto azservicebus.DeadLetterOptions. A função Receiver.DeadLetterMessage retornará um erro se a mensagem não for enviada para a fila de mensagens mortas.

Para enviar mensagens para a fila de mensagens mortas, adicione a função ReceiveDeadLetterMessage ao arquivo main.go.

func GetDeadLetterMessage(client *azservicebus.Client) {
	receiver, err := client.NewReceiverForQueue(
		"myqueue",
		&azservicebus.ReceiverOptions{
			SubQueue: azservicebus.SubQueueDeadLetter,
		},
	)
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		fmt.Printf("DeadLetter Reason: %s\nDeadLetter Description: %s\n", *message.DeadLetterReason, *message.DeadLetterErrorDescription) //change to struct an unmarshal into it
		err := receiver.CompleteMessage(context.TODO(), message, nil)
		if err != nil {
			panic(err)
		}
	}
}

GetDeadLetterMessage usa um objeto azservicebus.Client e cria um novo objeto azservicebus.Receiver com opções para a fila de mensagens mortas. Em seguida, ela recebe as mensagens da fila de mensagens mortas. Em seguida, a função recebe uma mensagem da fila de mensagens mortas. Em seguida, ela imprime o motivo da mensagem morta e a descrição dessa mensagem.

Código de exemplo

package main

import (
	"context"
	"errors"
	"fmt"
	"os"

	"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
	"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
)

func GetClient() *azservicebus.Client {
	namespace, ok := os.LookupEnv("AZURE_SERVICEBUS_HOSTNAME") //ex: myservicebus.servicebus.windows.net
	if !ok {
		panic("AZURE_SERVICEBUS_HOSTNAME environment variable not found")
	}

	cred, err := azidentity.NewDefaultAzureCredential(nil)
	if err != nil {
		panic(err)
	}

	client, err := azservicebus.NewClient(namespace, cred, nil)
	if err != nil {
		panic(err)
	}
	return client
}

func SendMessage(message string, client *azservicebus.Client) {
	sender, err := client.NewSender("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer sender.Close(context.TODO())

	sbMessage := &azservicebus.Message{
		Body: []byte(message),
	}
	err = sender.SendMessage(context.TODO(), sbMessage, nil)
	if err != nil {
		panic(err)
	}
}

func SendMessageBatch(messages []string, client *azservicebus.Client) {
	sender, err := client.NewSender("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer sender.Close(context.TODO())

	batch, err := sender.NewMessageBatch(context.TODO(), nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		err := batch.AddMessage(&azservicebus.Message{Body: []byte(message)}, nil)
		if errors.Is(err, azservicebus.ErrMessageTooLarge) {
			fmt.Printf("Message batch is full. We should send it and create a new one.\n")
		}
	}

	if err := sender.SendMessageBatch(context.TODO(), batch, nil); err != nil {
		panic(err)
	}
}

func GetMessage(count int, client *azservicebus.Client) {
	receiver, err := client.NewReceiverForQueue("myqueue", nil) 
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), count, nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		body := message.Body
		fmt.Printf("%s\n", string(body))

		err = receiver.CompleteMessage(context.TODO(), message, nil)
		if err != nil {
			panic(err)
		}
	}
}

func DeadLetterMessage(client *azservicebus.Client) {
	deadLetterOptions := &azservicebus.DeadLetterOptions{
		ErrorDescription: to.Ptr("exampleErrorDescription"),
		Reason:           to.Ptr("exampleReason"),
	}

	receiver, err := client.NewReceiverForQueue("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
	if err != nil {
		panic(err)
	}

	if len(messages) == 1 {
		err := receiver.DeadLetterMessage(context.TODO(), messages[0], deadLetterOptions)
		if err != nil {
			panic(err)
		}
	}
}

func GetDeadLetterMessage(client *azservicebus.Client) {
	receiver, err := client.NewReceiverForQueue(
		"myqueue",
		&azservicebus.ReceiverOptions{
			SubQueue: azservicebus.SubQueueDeadLetter,
		},
	)
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		fmt.Printf("DeadLetter Reason: %s\nDeadLetter Description: %s\n", *message.DeadLetterReason, *message.DeadLetterErrorDescription) 
		err := receiver.CompleteMessage(context.TODO(), message, nil)
		if err != nil {
			panic(err)
		}
	}
}

func main() {
	client := GetClient()

	fmt.Println("send a single message...")
	SendMessage("firstMessage", client)

	fmt.Println("send two messages as a batch...")
	messages := [2]string{"secondMessage", "thirdMessage"}
	SendMessageBatch(messages[:], client)

	fmt.Println("\nget all three messages:")
	GetMessage(3, client)

	fmt.Println("\nsend a message to the Dead Letter Queue:")
	SendMessage("Send message to Dead Letter", client)
	DeadLetterMessage(client)
	GetDeadLetterMessage(client)
}

Executar o código

Antes de executar o código, crie uma variável de ambiente chamada AZURE_SERVICEBUS_HOSTNAME. Defina o valor da variável de ambiente como o namespace do Barramento de Serviço.

export AZURE_SERVICEBUS_HOSTNAME=<YourServiceBusHostName>

Em seguida, execute o seguinte comando go run para executar o aplicativo:

go run main.go

Próximas etapas

Para obter mais informações, confira estes links: