Distributed tracing and correlation through Service Bus messaging
One of the common problems in microservices development is the ability to trace an operation from a client through all the services that are involved in processing it. Tracing is useful for debugging, performance analysis, A/B testing, and other typical diagnostics scenarios. One part of this problem is tracking logical pieces of work, which includes message processing results and latency as well as external dependency calls. Another part is correlating these diagnostics events across process boundaries.
When a producer sends a message through a queue, it typically happens in the scope of some other logical operation, initiated by some other client or service. The same operation is continued by the consumer once it receives the message. Both producer and consumer (and other services that process the operation) presumably emit telemetry events to trace the operation flow and result. To correlate such events and trace the operation end-to-end, each service that reports telemetry has to stamp every event with a trace context.
Microsoft Azure Service Bus messaging has defined payload properties that producers and consumers should use to pass such trace context. The protocol is based on the HTTP Correlation protocol.
Property Name | Description |
---|---|
Diagnostic-Id | Unique identifier of an external call from producer to the queue. Refer to Request-Id in HTTP protocol for the rationale, considerations, and format |
Correlation-Context | Operation context, which is propagated across all services involved in operation processing. For more information, see Correlation-Context in HTTP protocol |
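As an illustration of how a consumer might read these properties, here is a minimal sketch. It assumes the trace context arrives in the message's user (application) properties dictionary under the two names in the table, and that Correlation-Context uses the comma-separated key=value format from the HTTP Correlation protocol. The TraceContextReader class and its methods are hypothetical names introduced for this example; they are not part of the Service Bus client.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, not part of the Service Bus client library.
public static class TraceContextReader
{
    // Returns the Diagnostic-Id value, or null when the property is absent or not a string.
    public static string GetDiagnosticId(IDictionary<string, object> userProperties)
    {
        return userProperties.TryGetValue("Diagnostic-Id", out object value)
            ? value as string
            : null;
    }

    // Parses Correlation-Context, assumed to be formatted as "key1=value1, key2=value2".
    public static IDictionary<string, string> GetCorrelationContext(IDictionary<string, object> userProperties)
    {
        var result = new Dictionary<string, string>();
        if (userProperties.TryGetValue("Correlation-Context", out object value) && value is string raw)
        {
            foreach (string pair in raw.Split(new[] { ',' }, StringSplitOptions.RemoveEmptyEntries))
            {
                // Split on the first '=' only, so values may themselves contain '='.
                string[] parts = pair.Split(new[] { '=' }, 2);
                if (parts.Length == 2)
                {
                    result[parts[0].Trim()] = parts[1].Trim();
                }
            }
        }
        return result;
    }
}
```

With the .NET client, the dictionary passed in would typically be message.UserProperties. Malformed pairs are skipped rather than reported, which is a simplification chosen for this sketch.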
Service Bus .NET Client autotracing
Starting with version 3.0.0, the Microsoft Azure Service Bus client for .NET provides tracing instrumentation points that can be hooked by tracing systems or by client code. The instrumentation allows tracking all calls to the Service Bus messaging service from the client side. If message processing is done with the message handler pattern, message processing is also instrumented.
Tracking with Azure Application Insights
Microsoft Application Insights provides rich performance monitoring capabilities, including automatic request and dependency tracking.
Depending on your project type, install Application Insights SDK:
- ASP.NET - install version 2.5-beta2 or higher
- ASP.NET Core - install version 2.2.0-beta2 or higher.
These links provide details on installing the SDK, creating resources, and configuring the SDK (if needed). For non-ASP.NET applications, refer to the Azure Application Insights for Console Applications article.
If you use the message handler pattern to process messages, you are done: all Service Bus calls made by your service are automatically tracked and correlated with other telemetry items. Otherwise, refer to the following example for manual message processing tracking.
Trace message processing
private readonly TelemetryClient telemetryClient;

async Task ProcessAsync(Message message)
{
    var activity = message.ExtractActivity();

    // If you are using Microsoft.ApplicationInsights package version 2.6-beta or higher, you should call StartOperation<RequestTelemetry>(activity) instead
    using (var operation = telemetryClient.StartOperation<RequestTelemetry>("Process", activity.RootId, activity.ParentId))
    {
        telemetryClient.TrackTrace("Received message");
        try
        {
            // process message
        }
        catch (Exception ex)
        {
            telemetryClient.TrackException(ex);
            operation.Telemetry.Success = false;
            throw;
        }

        telemetryClient.TrackTrace("Done");
    }
}
In this example, RequestTelemetry is reported for each processed message, with a timestamp, duration, and result (success). The telemetry also has a set of correlation properties.
Nested traces and exceptions reported during message processing are also stamped with correlation properties that represent them as 'children' of the RequestTelemetry.
In case you make calls to supported external components during message processing, they are also automatically tracked and correlated. Refer to Track custom operations with Application Insights .NET SDK for manual tracking and correlation.
If you are running any external code in addition to the Application Insights SDK, expect to see longer duration when viewing Application Insights logs.
It doesn't mean that there was a delay in receiving the message. In this scenario, the message has already been received since the message is passed in as a parameter to the SDK code. And, the name tag in the App Insights logs (Process) indicates that the message is now being processed by your external event processing code. This issue is not Azure-related. Instead, these metrics refer to the efficiency of your external code given that the message has already been received from Service Bus. See this file on GitHub to see where the Process tag is generated and assigned once the message has been received from Service Bus.
Tracking without tracing system
If your tracing system does not support automatic tracking of Service Bus calls, you may want to add such support to the tracing system or to your application. This section describes the diagnostics events sent by the Service Bus .NET client.
Service Bus .NET Client is instrumented using the .NET tracing primitives System.Diagnostics.Activity and System.Diagnostics.DiagnosticSource.
Activity serves as a trace context, while DiagnosticSource is a notification mechanism.
If there is no listener for the DiagnosticSource events, instrumentation is off, keeping zero instrumentation costs. DiagnosticSource gives all control to the listener:
- listener controls which sources and events to listen to
- listener controls event rate and sampling
- events are sent with a payload that provides full context so you can access and modify Message object during the event
Familiarize yourself with DiagnosticSource User Guide before proceeding with implementation.
Let's create a listener for Service Bus events in an ASP.NET Core app that writes logs with Microsoft.Extensions.Logging. It uses the System.Reactive.Core library to subscribe to DiagnosticSource (it's also easy to subscribe to DiagnosticSource without it).
public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory factory, IApplicationLifetime applicationLifetime)
{
    // configuration...

    var serviceBusLogger = factory.CreateLogger("Microsoft.Azure.ServiceBus");

    IDisposable innerSubscription = null;
    IDisposable outerSubscription = DiagnosticListener.AllListeners.Subscribe(delegate (DiagnosticListener listener)
    {
        // subscribe to the Service Bus DiagnosticSource
        if (listener.Name == "Microsoft.Azure.ServiceBus")
        {
            // receive event from Service Bus DiagnosticSource
            innerSubscription = listener.Subscribe(delegate (KeyValuePair<string, object> evnt)
            {
                // Log operation details once it's done
                if (evnt.Key.EndsWith("Stop"))
                {
                    Activity currentActivity = Activity.Current;
                    // GetProperty is not a built-in member of object; it is assumed to be
                    // a reflection-based extension method defined elsewhere in the app
                    TaskStatus status = (TaskStatus)evnt.Value.GetProperty("Status");
                    serviceBusLogger.LogInformation($"Operation {currentActivity.OperationName} is finished, Duration={currentActivity.Duration}, Status={status}, Id={currentActivity.Id}, StartTime={currentActivity.StartTimeUtc}");
                }
            });
        }
    });

    // release both subscriptions when the application shuts down
    applicationLifetime.ApplicationStopping.Register(() =>
    {
        outerSubscription?.Dispose();
        innerSubscription?.Dispose();
    });
}
In this example, the listener logs the duration, result, unique identifier, and start time for each Service Bus operation.
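The listener above reads the Status value from the event payload by name. Because the payload is delivered as a plain object, a small reflection helper is one way to do that. The following is a minimal sketch under that assumption; the PayloadExtensions class is a hypothetical name introduced here and is not part of the Service Bus client.

```csharp
using System.Reflection;

// Hypothetical helper, not part of the Service Bus client library.
public static class PayloadExtensions
{
    // Reads a property declared on the payload's runtime type by name.
    // Returns null when the payload is null or the property does not exist.
    public static object GetProperty(this object source, string propertyName)
    {
        return source?.GetType()
                      .GetTypeInfo()
                      .GetDeclaredProperty(propertyName)
                      ?.GetValue(source);
    }
}
```

Because the helper returns null for a missing property, a direct cast to a value type such as TaskStatus throws when the property is absent. If you read properties that exist only on some events, check the result for null before casting.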
Events
For every operation, two events are sent: 'Start' and 'Stop'. Most probably, you are only interested in 'Stop' events. They provide the result of the operation, as well as its start time and duration as Activity properties.
The event payload provides a listener with the context of the operation; it replicates the API's incoming parameters and return value. The 'Stop' event payload has all the properties of the 'Start' event payload, so you can ignore the 'Start' event completely.
All events also have 'Entity' and 'Endpoint' properties; they are omitted from the table below:
- string Entity - Name of the entity (queue, topic, etc.)
- Uri Endpoint - Service Bus endpoint URL
Each 'Stop' event has a Status property that holds the TaskStatus the async operation completed with. It is also omitted from the following table for simplicity.
Here is the full list of instrumented operations:
Operation Name | Tracked API | Specific Payload Properties |
---|---|---|
Microsoft.Azure.ServiceBus.Send | MessageSender.SendAsync | IList<Message> Messages - List of messages being sent |
Microsoft.Azure.ServiceBus.ScheduleMessage | MessageSender.ScheduleMessageAsync | Message Message - Message being processed; DateTimeOffset ScheduleEnqueueTimeUtc - Scheduled message offset; long SequenceNumber - Sequence number of scheduled message ('Stop' event payload) |
Microsoft.Azure.ServiceBus.Cancel | MessageSender.CancelScheduledMessageAsync | long SequenceNumber - Sequence number of the message to be canceled |
Microsoft.Azure.ServiceBus.Receive | MessageReceiver.ReceiveAsync | int RequestedMessageCount - The maximum number of messages that could be received; IList<Message> Messages - List of received messages ('Stop' event payload) |
Microsoft.Azure.ServiceBus.Peek | MessageReceiver.PeekAsync | int FromSequenceNumber - The starting point from which to browse a batch of messages; int RequestedMessageCount - The number of messages to retrieve; IList<Message> Messages - List of received messages ('Stop' event payload) |
Microsoft.Azure.ServiceBus.ReceiveDeferred | MessageReceiver.ReceiveDeferredMessageAsync | IEnumerable<long> SequenceNumbers - The list containing the sequence numbers to receive; IList<Message> Messages - List of received messages ('Stop' event payload) |
Microsoft.Azure.ServiceBus.Complete | MessageReceiver.CompleteAsync | IList<string> LockTokens - The list containing the lock tokens of the corresponding messages to complete. |
Microsoft.Azure.ServiceBus.Abandon | MessageReceiver.AbandonAsync | string LockToken - The lock token of the corresponding message to abandon. |
Microsoft.Azure.ServiceBus.Defer | MessageReceiver.DeferAsync | string LockToken - The lock token of the corresponding message to defer. |
Microsoft.Azure.ServiceBus.DeadLetter | MessageReceiver.DeadLetterAsync | string LockToken - The lock token of the corresponding message to dead letter. |
Microsoft.Azure.ServiceBus.RenewLock | MessageReceiver.RenewLockAsync | string LockToken - The lock token of the corresponding message to renew lock on; DateTime LockedUntilUtc - New lock token expiry date and time in UTC format ('Stop' event payload) |
Microsoft.Azure.ServiceBus.Process | Message Handler lambda function provided in IReceiverClient.RegisterMessageHandler | Message Message - Message being processed. |
Microsoft.Azure.ServiceBus.ProcessSession | Message Session Handler lambda function provided in IQueueClient.RegisterSessionHandler | Message Message - Message being processed; IMessageSession Session - Session being processed |
Microsoft.Azure.ServiceBus.AddRule | SubscriptionClient.AddRuleAsync | RuleDescription Rule - The rule description that provides the rule to add. |
Microsoft.Azure.ServiceBus.RemoveRule | SubscriptionClient.RemoveRuleAsync | string RuleName - Name of the rule to remove. |
Microsoft.Azure.ServiceBus.GetRules | SubscriptionClient.GetRulesAsync | IEnumerable<RuleDescription> Rules - All rules associated with the subscription. ('Stop' payload only) |
Microsoft.Azure.ServiceBus.AcceptMessageSession | ISessionClient.AcceptMessageSessionAsync | string SessionId - The sessionId present in the messages. |
Microsoft.Azure.ServiceBus.GetSessionState | IMessageSession.GetStateAsync | string SessionId - The sessionId present in the messages; byte[] State - Session state ('Stop' event payload) |
Microsoft.Azure.ServiceBus.SetSessionState | IMessageSession.SetStateAsync | string SessionId - The sessionId present in the messages; byte[] State - Session state |
Microsoft.Azure.ServiceBus.RenewSessionLock | IMessageSession.RenewSessionLockAsync | string SessionId - The sessionId present in the messages. |
Microsoft.Azure.ServiceBus.Exception | any instrumented API | Exception Exception - Exception instance |
In every event, you can access Activity.Current, which holds the current operation context.
Logging additional properties
Activity.Current provides detailed context of the current operation and its parents. For more information, see the Activity documentation.
Service Bus instrumentation provides additional information in Activity.Current.Tags: they hold MessageId and SessionId whenever they are available.
Activities that track 'Receive', 'Peek', and 'ReceiveDeferred' events may also have a RelatedTo tag. It holds a distinct list of the Diagnostic-Id values of the messages that were received as a result.
Such an operation may result in several unrelated messages being received. Also, the Diagnostic-Id is not known when the operation starts, so 'Receive' operations can be correlated to 'Process' operations using this tag only. It's useful when analyzing performance issues to check how long it took to receive the message.
An efficient way to log Tags is to iterate over them, so adding Tags to the preceding example looks like this:
Activity currentActivity = Activity.Current;
TaskStatus status = (TaskStatus)evnt.Value.GetProperty("Status");

var tagsList = new StringBuilder();
foreach (var tags in currentActivity.Tags)
{
    tagsList.Append($", {tags.Key}={tags.Value}");
}

serviceBusLogger.LogInformation($"{currentActivity.OperationName} is finished, Duration={currentActivity.Duration}, Status={status}, Id={currentActivity.Id}, StartTime={currentActivity.StartTimeUtc}{tagsList}");
Filtering and sampling
In some cases, it's desirable to log only part of the events to reduce performance overhead or storage consumption. You could log 'Stop' events only (as in the preceding example) or sample a percentage of the events.
DiagnosticSource provides a way to achieve this with the IsEnabled predicate. For more information, see Context-Based Filtering in DiagnosticSource.
IsEnabled may be called multiple times for a single operation to minimize performance impact.
IsEnabled is called in the following sequence:
- IsEnabled(<OperationName>, string entity, null), for example, IsEnabled("Microsoft.Azure.ServiceBus.Send", "MyQueue1"). Note there is no 'Start' or 'Stop' at the end. Use it to filter out particular operations or queues. If the callback returns false, events for the operation are not sent.
  - For the 'Process' and 'ProcessSession' operations, you also receive the IsEnabled(<OperationName>, string entity, Activity activity) callback. Use it to filter events based on activity.Id or Tags properties.
- IsEnabled(<OperationName>.Start), for example, IsEnabled("Microsoft.Azure.ServiceBus.Send.Start"). Checks whether the 'Start' event should be fired. The result only affects the 'Start' event; further instrumentation does not depend on it.
There is no IsEnabled for the 'Stop' event.
If an operation results in an exception, IsEnabled("Microsoft.Azure.ServiceBus.Exception") is called. You could subscribe only to 'Exception' events and prevent the rest of the instrumentation. In this case, you still have to handle such exceptions. Since other instrumentation is disabled, you should not expect trace context to flow with the messages from consumer to producer.
You can also use IsEnabled to implement sampling strategies. Sampling based on Activity.Id or Activity.RootId ensures consistent sampling across all tiers (as long as it is propagated by the tracing system or by your own code).
When there are multiple DiagnosticSource listeners for the same source, it's enough for just one listener to accept the event, so IsEnabled is not guaranteed to be called.
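To make the filtering and sampling rules above concrete, here is a minimal sketch of an IsEnabled predicate. The ServiceBusEventFilter class, the "NoisyQueue" entity name, and the 1-in-10 sampling rate are all hypothetical choices made for this example. The sketch always allows 'Exception' events, suppresses 'Start' events, filters out one queue, and samples by Activity.RootId whenever an Activity is supplied.

```csharp
using System;
using System.Diagnostics;

// Hypothetical filter, not part of the Service Bus client library.
public static class ServiceBusEventFilter
{
    // Hypothetical sampling rate: keep roughly 1 in 10 traces.
    private const uint SampleOneIn = 10;

    public static bool IsEnabled(string name, object arg1, object arg2)
    {
        // Always let exception events through.
        if (name == "Microsoft.Azure.ServiceBus.Exception") return true;

        // Suppress 'Start' events; 'Stop' payloads carry the same properties.
        if (name.EndsWith(".Start", StringComparison.Ordinal)) return false;

        // Operation-level check: arg1 is the entity name. "NoisyQueue" is a made-up example.
        if (arg1 is string entity && entity == "NoisyQueue") return false;

        // When an Activity is supplied, sample consistently by its root ID.
        if (arg2 is Activity activity && activity.RootId != null)
        {
            return StableHash(activity.RootId) % SampleOneIn == 0;
        }

        return true;
    }

    // FNV-1a hash. string.GetHashCode can differ between processes,
    // so it would not give the same sampling decision on every tier.
    private static uint StableHash(string value)
    {
        uint hash = 2166136261;
        foreach (char c in value)
        {
            hash ^= c;
            hash *= 16777619;
        }
        return hash;
    }
}
```

A predicate like this can be passed as the second argument to the DiagnosticListener.Subscribe overload that takes an IObserver<KeyValuePair<string, object>> and a Func<string, object, object, bool>. In this sketch, sampling applies only to operations whose IsEnabled call includes an Activity; other operations are filtered by name and entity alone.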
Next steps
- Application Insights Correlation
- Application Insights Monitor Dependencies to see if REST, SQL, or other external resources are slowing you down.
- Track custom operations with Application Insights .NET SDK