question

AnirudThapliyal-9057 asked PramodValavala-MSFT answered

Not able to receive messages in sequential order from concurrent session in Azure Service Bus

I am new to Azure Service Bus and am trying to understand how a session-enabled queue works.

I have 3 console applications: 1 Message Sender app, which sends messages to the queue, and 2 Message Receiver apps.
The Azure Service Bus queue is session-enabled and I am trying to implement concurrent sessions.

Each user who runs the message sender console application sends a message to the queue, and this message has to be received by both receiver console applications.
The receiver applications must receive/process each message in sequential order: Receiver1 first and then Receiver2. To do that, I tag each message with the receiver application name when sending, and the receiver code checks the tag to decide whether the message is meant for that receiver application.

I am not receiving messages in sequential order within a session id.

Below is the code I am using for sending and receiving message.

Message Sender code:

// queueClient and userList are static fields initialized elsewhere.
public static void Main(string[] args)
{
    MainAsync().GetAwaiter().GetResult();
}

public static async Task MainAsync()
{
    await SendSessionMessagesAsync();
}

public static async Task SendSessionMessagesAsync()
{
    // userList comes from the database.
    // The receiver names are read from appsettings.json in the real application.
    var receiverApplicationNames = new[] { "Receiver1", "Receiver2" };

    foreach (var user in userList)
    {
        // Build one entry per receiver application for this user.
        var receiverApplicationNameList = new List<ReceiverApplicationName>();
        foreach (var item in receiverApplicationNames)
        {
            receiverApplicationNameList.Add(new ReceiverApplicationName
            {
                UserId = user.Id,
                ApplicationName = item
            });
        }

        // Send 2 messages (Receiver1 and Receiver2) per user, all in that user's session.
        foreach (var queueMessageItem in receiverApplicationNameList)
        {
            var messageBody = JsonConvert.SerializeObject(queueMessageItem);
            var message = new Message(Encoding.UTF8.GetBytes(messageBody));
            message.SessionId = "Session " + queueMessageItem.UserId;
            Console.WriteLine($"Sending message for UserId: {queueMessageItem.UserId}, SessionId: {message.SessionId}, Message: {messageBody}");
            await queueClient.SendAsync(message);
        }
    }
}


Message Receiver code(Receiver1):

// queueClient and _logger are static fields initialized elsewhere.
public static void Main(string[] args)
{
    MainAsync().GetAwaiter().GetResult();
}

public static async Task MainAsync()
{
    RegisterOnSessionHandlerAndReceiveSessionMessages();

    // Keep the process alive while the session handler receives messages.
    Console.ReadKey();
    await queueClient.CloseAsync();
}

public static void RegisterOnSessionHandlerAndReceiveSessionMessages()
{
    var sessionHandlerOptions = new SessionHandlerOptions(ExceptionReceivedHandler)
    {
        MaxConcurrentSessions = 2,
        MessageWaitTimeout = TimeSpan.FromSeconds(5),
        AutoComplete = false,
    };
    queueClient.RegisterSessionHandler(ProcessSessionMessagesAsync, sessionHandlerOptions);
}

public static Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
{
    _logger.LogError(exceptionReceivedEventArgs.Exception, "Message handler encountered an exception");
    return Task.CompletedTask;
}

public static async Task ProcessSessionMessagesAsync(IMessageSession session, Message message, CancellationToken token)
{
    var body = Encoding.UTF8.GetString(message.Body);
    var result = JsonConvert.DeserializeObject<ReceiverApplicationName>(body);

    // Handle only the messages tagged for this receiver. With AutoComplete = false,
    // messages tagged for the other receiver are not completed here and stay in the queue for redelivery.
    if (result.ApplicationName == "Receiver1")
    {
        Console.WriteLine($"Received message for UserId: {result.UserId}, Session: {session.SessionId}, SequenceNumber: {message.SystemProperties.SequenceNumber}, Body: {body}, at: {DateTime.Now}");
        await session.CompleteAsync(message.SystemProperties.LockToken);
    }
}

Message Receiver code(Receiver2):

// queueClient and _logger are static fields initialized elsewhere.
public static void Main(string[] args)
{
    MainAsync().GetAwaiter().GetResult();
}

public static async Task MainAsync()
{
    RegisterOnSessionHandlerAndReceiveSessionMessages();

    // Keep the process alive while the session handler receives messages.
    Console.ReadKey();
    await queueClient.CloseAsync();
}

public static void RegisterOnSessionHandlerAndReceiveSessionMessages()
{
    var sessionHandlerOptions = new SessionHandlerOptions(ExceptionReceivedHandler)
    {
        MaxConcurrentSessions = 2,
        MessageWaitTimeout = TimeSpan.FromSeconds(5),
        AutoComplete = false,
    };
    queueClient.RegisterSessionHandler(ProcessSessionMessagesAsync, sessionHandlerOptions);
}

public static Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
{
    _logger.LogError(exceptionReceivedEventArgs.Exception, "Message handler encountered an exception");
    return Task.CompletedTask;
}

public static async Task ProcessSessionMessagesAsync(IMessageSession session, Message message, CancellationToken token)
{
    var body = Encoding.UTF8.GetString(message.Body);
    var result = JsonConvert.DeserializeObject<ReceiverApplicationName>(body);

    // Handle only the messages tagged for this receiver. With AutoComplete = false,
    // messages tagged for the other receiver are not completed here and stay in the queue for redelivery.
    if (result.ApplicationName == "Receiver2")
    {
        Console.WriteLine($"Received message for UserId: {result.UserId}, Session: {session.SessionId}, SequenceNumber: {message.SystemProperties.SequenceNumber}, Body: {body}, at: {DateTime.Now}");
        await session.CompleteAsync(message.SystemProperties.LockToken);
    }
}

Results:

The timestamps of the messages received by Receiver1 and Receiver2 are not in sequence. For example, for UserId = 2, Receiver1 should receive its message before Receiver2 receives its own. Or maybe I am doing something incorrect here.

Message sender:
114223-image.png


Receiver1:
114165-image.png


Receiver2:
114202-image.png



dotnet-csharp azure-service-bus

Bruce-SqlWork answered AnirudThapliyal-9057 edited

Each receiver receives messages for a given session in order. The order of sessions is not defined. As a receiver will lock a message in a session, another receiver waiting on the lock will process a different session.

If you want processing to go to receiver 1 first and then to receiver 2, receiver 1 should queue the request for receiver 2 after it has processed it.

Even if you used one session, while both receivers would see the messages in order, the messages would not be processed in receiver order. Receiver 2 could process several of its messages before receiver 1 processed any.


Is there any way to receive the messages within a session in the order in which they were added, for example by the SequenceNumber property? That way I could make sure that Receiver1 gets the message before Receiver2 does. Or is there a better way to handle this scenario?

PramodValavala-MSFT answered

@AnirudThapliyal-9057 Message sessions are useful when particular messages need to be handled sequentially even though multiple receivers are listening for messages on the same queue. If you had only one receiver per queue, you could simply process messages in order by fetching one message at a time.

Based on the scenario you described, it would be best to have two separate queues, one for Receiver 1 and one for Receiver 2. The sender would send messages into Queue 1. Receiver 1 would process each message and then send the same message to Queue 2 for Receiver 2 to process.
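A minimal sketch of the Receiver 1 side of this two-queue flow, assuming the Microsoft.Azure.ServiceBus package used in the question and two queues without sessions. The queue names queue1 and queue2, the connection string placeholder, and the class and method names are hypothetical.

```csharp
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;

public static class Receiver1
{
    // Hypothetical values: replace with your own connection string and queue names.
    private const string ConnectionString = "<service-bus-connection-string>";
    private static readonly QueueClient inputQueue = new QueueClient(ConnectionString, "queue1");
    private static readonly QueueClient outputQueue = new QueueClient(ConnectionString, "queue2");

    public static async Task Main()
    {
        var options = new MessageHandlerOptions(ExceptionReceivedHandler)
        {
            MaxConcurrentCalls = 1, // handle one message at a time
            AutoComplete = false    // complete only after the forward has succeeded
        };
        inputQueue.RegisterMessageHandler(ProcessAndForwardAsync, options);

        Console.WriteLine("Receiver1 is listening. Press any key to exit.");
        Console.ReadKey();
        await inputQueue.CloseAsync();
        await outputQueue.CloseAsync();
    }

    private static async Task ProcessAndForwardAsync(Message message, CancellationToken token)
    {
        Console.WriteLine($"Receiver1 processed: {Encoding.UTF8.GetString(message.Body)}");

        // A received message cannot be sent again as is, so forward a clone to Queue 2.
        await outputQueue.SendAsync(message.Clone());
        await inputQueue.CompleteAsync(message.SystemProperties.LockToken);
    }

    private static Task ExceptionReceivedHandler(ExceptionReceivedEventArgs args)
    {
        Console.WriteLine($"Message handler error: {args.Exception.Message}");
        return Task.CompletedTask;
    }
}
```

Receiver 2 would register the same kind of handler on queue2 without the forwarding step. Because the message is completed only after the send, a crash between the two calls can lead to the same message being forwarded twice, so Receiver 2 should tolerate duplicates.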

That being said, you could still achieve the above with a single queue and sessions by changing the Session ID of your messages to Receiver 1 and Receiver 2. Instead of fetching whichever sessions are available, each receiver would specifically process its one defined session using a session processor.

After Receiver 1 processes a message, it would send the same message back to the queue, but with the Session ID for Receiver 2.
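A rough sketch of the single-queue variant for Receiver 1. The session processor mentioned above belongs to the newer Azure.Messaging.ServiceBus package. To stay with the Microsoft.Azure.ServiceBus package from the question, this sketch instead accepts the named session explicitly through SessionClient. The session IDs "Receiver1" and "Receiver2", the queue name, and the connection string placeholder are assumptions, and the sender is assumed to set SessionId = "Receiver1" on every message it sends.

```csharp
using System;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;

public static class Receiver1
{
    // Hypothetical values: replace with your own connection string and queue name.
    private const string ConnectionString = "<service-bus-connection-string>";
    private const string QueueName = "<session-enabled-queue>";

    public static async Task Main()
    {
        var sessionClient = new SessionClient(ConnectionString, QueueName);
        var sender = new QueueClient(ConnectionString, QueueName);

        // Lock the one session this receiver owns instead of the next available session.
        IMessageSession session = await sessionClient.AcceptMessageSessionAsync("Receiver1");

        while (true)
        {
            // ReceiveAsync returns null when no message arrives within the timeout.
            Message message = await session.ReceiveAsync(TimeSpan.FromSeconds(5));
            if (message == null)
            {
                break;
            }

            Console.WriteLine($"Receiver1 processed: {Encoding.UTF8.GetString(message.Body)}");

            // Hand the message over to Receiver 2 by re-sending a clone under its session ID.
            Message forward = message.Clone();
            forward.SessionId = "Receiver2";
            await sender.SendAsync(forward);

            await session.CompleteAsync(message.SystemProperties.LockToken);
        }

        await session.CloseAsync();
        await sessionClient.CloseAsync();
        await sender.CloseAsync();
    }
}
```

Receiver 2 would accept the "Receiver2" session in the same way and skip the forwarding step. With this layout a message can only reach Receiver 2 after Receiver 1 has handled it, which is the ordering the question asks for. A long-running receiver would also need to renew the session lock, which this sketch leaves out.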

