Message Queuing: A Scalable, Highly Available Load-Balancing Solution


Christopher Baldwin
Carl Nolan
Microsoft Corporation

July 2000

Summary: Presents a framework for designing and deploying a scalable, highly available load-balancing solution infrastructure, incorporating the drainer and processor components of the Message Queuing (MSMQ) architecture. (35 printed pages)

Download Msmq.exe.


Contents

Introduction
The MSMQ Solution
Windows ATL Services
Testing the Solution
Conclusion
References


The ability to build scalable, highly available applications is increasingly necessary for companies seeking to become functional e-businesses. To this end, a recent Microsoft article introduced a workflow architecture based on Microsoft® Windows® DNA. Designed to scale across multiple, highly available servers, with easy scale-out capabilities, this architecture can readily help with designing such applications. See Architectural Design: A Scalable, Highly Available Business Object Architecture.

The discussion that follows employs that architecture to present a framework for designing and deploying a scalable, highly available load-balancing solution infrastructure, incorporating the drainer and processor components of the Message Queuing (MSMQ) architecture. An understanding of the workflow architecture is therefore fundamental for working with the code contained herein.

The MSMQ Solution

In order to deploy the workflow architecture, two processes are introduced for dealing with MSMQ messages. These processes allow messages from various sources, such as an Internet application, to be placed into a general dispatching queue, from where they are routed to a processing queue. Once the messages are in the queue, a COM component is invoked and performs the required business logic. Processes must be implemented to handle both the smart message routing and the instancing of the COM component for the received message.

This article outlines how the architecture is implemented using Windows ATL services for the drainer and processor services. The code provided is not a final solution (although it can be implemented with minimal modifications) but rather a framework for the reader's own implementation. Two important parts of this framework are the administrative inclusions (such as performance monitor counters) and the testing framework, both of which are discussed.

Before we embark on the discussion of the MSMQ solution, a brief mention of MSMQ, Transactions, and COM+ Component Services is warranted.


Certain features of MSMQ are used within the MSMQ solution, including the properties of an MSMQ message and MSMQ installation options; the installation options influence the availability of the deployed solution. In addition, the MSMQ solution can benefit from COM+ Component Services, specifically MTS transactions, for support of transactional queues. MSMQ has the ability to use MTS transactions in addition to its own internal transactions.

MSMQ and transactions

Two separate tasks are required to support transactional messages in MSMQ. When an MSMQ queue is defined, either through the GUI or programmatically, you should specify that the queue is transactional. To facilitate a Send or Receive to a transactional queue, you must also enlist the services of a transaction. This transaction can be obtained externally or you can enlist an internal MSMQ transaction.

To enlist an external transaction, you can use the currently available MTS transaction. In this case, MSMQ decides whether the call will truly be part of a transaction. When sending messages within an MTS transaction, the sending application must make sure the current MTS context is transactional. The IsInTransaction method provided by Component Services can be used to verify that the MTS context is participating in a transaction. Similarly, you can verify the transactional nature of the queue under which the message operation is performed by using the queue property IsTransactional.

When transactions are used to send messages, the messages can be sent to any transactional queue regardless of where the queue is located. In this case, nontransactional queues cannot be used. When using a transaction to retrieve messages, the queue must be local and transactional. A message cannot be retrieved from a remote transactional queue, or from a local nontransactional queue within a transaction.
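The rules above can be summarized as a small decision function. The following is a minimal, portable sketch rather than MSMQ code: the names QueueOp, QueueTraits, and allowedInTransaction are hypothetical and exist only to restate the rules in executable form.

```cpp
#include <cassert>

// Hypothetical names for illustration; none of these are part of the MSMQ API.
enum class QueueOp { Send, Receive };

struct QueueTraits {
    bool isLocal;          // the queue resides on this machine
    bool isTransactional;  // the queue was created as transactional
};

// Returns true when the operation may take part in a transaction,
// following the rules stated in the text.
bool allowedInTransaction(QueueOp op, const QueueTraits &q)
{
    if (!q.isTransactional)
        return false;   // nontransactional queues cannot be used
    if (op == QueueOp::Send)
        return true;    // sends may target local or remote queues
    return q.isLocal;   // receives require a local queue
}
```

A transactional receive from a remote queue is the one combination that a transactional queue rules out, which is the restriction that motivates the separate drainer service.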

Following these rules, a generic Send process must use a transaction when the queue is transactional, enlisting the current MTS transaction if one is present or performing a single-message transaction if not. No transaction is needed if the queue is not transactional:

Dim oc As ObjectContext
Dim qiDispatcher As MSMQQueueInfo
Dim qDispatcher As MSMQQueue
Dim msg As MSMQMessage
Set oc = GetObjectContext()
Set qiDispatcher = New MSMQQueueInfo
Set msg = New MSMQMessage
' Code to open the queues and define the message
If (qiDispatcher.IsTransactional) Then
    If (oc.IsInTransaction) Then
        ' Transactional queue inside an MTS transaction
        Call msg.Send(qDispatcher, MQTRANSACTION.MQ_MTS_TRANSACTION)
    Else
        ' Transactional queue with no MTS transaction available
        Call msg.Send(qDispatcher, MQTRANSACTION.MQ_SINGLE_MESSAGE)
    End If
Else
    ' Nontransactional queue
    Call msg.Send(qDispatcher, MQTRANSACTION.MQ_NO_TRANSACTION)
End If

Note   MSMQ does not support remote transactional read operations, which is why the MSMQ architecture consists of both a drainer and a processor service. Without transactions, the processor could simply read messages from remote queues; that approach breaks as soon as transactions are required. The drainer process therefore encapsulates the decision of where (to which remote queue) to forward each message, so that all read operations are performed locally and can participate in transactions.

MSMQ message properties

Several properties are key to MSMQ messages: the message label and body, and an integer type application-specific property.

The message Label (property PROPID_M_LABEL) is used as a message identifier, being sufficiently unique to determine the origin of a message, allowing all necessary information for processing the message to be collected from a persistent data store such as a relational database. For example, in processing an order this identifier would be the order number.

The message Body (property PROPID_M_BODY) is the main container for the message contents. Strictly speaking, the solution does not require anything to reside in the body. If the label is used as the unique identifier for the message (such as the order number) nothing needs to be sent in the body.

If the processing of a message requires that XML be sent to an order fulfillment center as part of the order processing, it may be beneficial to send the required XML in the body. This way the processing component would have all the required information for dealing with the message. Either way the body section should be kept small.

The last property, AppSpecific (property PROPID_M_APPSPECIFIC), is a long integer designed for application usage. It can be used for saving characteristics of a message (such as the shipper number of an order) to determine how the message should be processed. As we will see later, this property can be used to determine the processing server where a message should be dispatched.
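As an illustration of dispatching on AppSpecific, the sketch below looks up a destination queue in a routing table. It assumes a simple table keyed by the AppSpecific value with a default queue as fallback; the function selectQueue, the table, and the queue path names are hypothetical and are not taken from the sample code.

```cpp
#include <cassert>
#include <map>
#include <string>

// Hypothetical routing table: AppSpecific value -> processing queue path name.
// Returns the default queue when no route is registered for the value.
std::string selectQueue(const std::map<long, std::string> &routes,
                        long appSpecific,
                        const std::string &defaultQueue)
{
    auto it = routes.find(appSpecific);
    return it != routes.end() ? it->second : defaultQueue;
}
```

Keeping the routing decision in a table rather than in code means that a new processing server could be added through configuration alone.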

MSMQ installation

The decisions made when installing MSMQ form one important aspect of designing a highly available MSMQ solution infrastructure. Here is a brief outline of some of those considerations. The specific option chosen for this solution will be covered in the discussion of testing at the end of this article.

In the initial installation configuration of MSMQ, the type of installation must be defined. The options here are to install as an MSMQ Server with support for public or private queues or to install as a dependent client. In an MSMQ Server installation, the services that handle a message store are installed. This is not true in the case of a dependent client, which relies on the availability of another server for message storing and processing.

An MSMQ Server installation also provides the ability to access Active Directory (AD); MSMQ uses AD for management of public queues. The other option is support for private queues with direct connections. The AD installation has many benefits over private queues, including integrated security and, more importantly for highly available applications, support for outgoing queues.

When installed using AD, each MSMQ server has a local outgoing queue where messages are temporarily stored. When the required public queue is available, the messages are routed to that queue. This means that the MSMQ application tolerates the temporary unavailability of the server on which the public queue resides. This concept is important in designing a highly available system.

The MSMQ Solution

Now we'll present a framework solution for designing and deploying one's own MSMQ architecture. The framework consists of the following components: a Dispatcher service used to receive messages and forward them on to a processing server, a type library that a COM component must implement to be called by a processing service, and a Processor service that receives incoming messages, each of which is processed with the invocation of a COM component.

These Dispatcher and Processor services relate directly to the drainer and processor components of the MSMQ architecture. Both of these services have a similar structure; they read a message from a queue and perform a process with the message, either writing to another queue or calling a COM component.

Figure 1 is an outline of the base architecture that utilizes multiple MSMQ message stores and corresponding Dispatcher and Processor services.

Figure 1. MSMQ Architecture

To assist in administration, both the Dispatcher and Processor services include performance monitor counters for tracking total messages processed and messages processed per second. The services also make use of an application dead-message queue where messages that cannot be successfully processed are sent.

In addition, the Processor service has to invoke a COM object for each message received. To facilitate this invocation, an interface must be defined for the COM component to implement, allowing the Processor service to call a known method on a defined interface. This interface is called IWebMessage and provides a mechanism for giving feedback to the service on the success of its operations. It has three statuses: successful, bad message, and abort. When a bad message status is returned, the service is designed to write the message into a dead-message queue. The abort status tells the service to shut down processing and can be used for situations where failure to connect to a database or other dependent resource means that all future message processing will fail.
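The way the service reacts to the three statuses can be sketched as follows. The enumeration and structure names are hypothetical, because the actual IWebMessage definition is not reproduced here. The text also does not say whether an aborted message is written to the dead-message queue; this sketch assumes it is not.

```cpp
#include <cassert>

// Hypothetical names; the real IWebMessage status values are not shown in the text.
enum class MessageStatus { Successful, BadMessage, Abort };

struct Reaction {
    bool writeToDeadQueue;  // copy the message to the application dead-message queue
    bool keepProcessing;    // continue reading messages from the input queue
};

// Maps the status returned by the COM component to the service's next step.
Reaction reactTo(MessageStatus status)
{
    switch (status) {
    case MessageStatus::Successful: return {false, true};
    case MessageStatus::BadMessage: return {true, true};
    case MessageStatus::Abort:      return {false, false};  // assumption: not dead-lettered
    }
    return {false, false};  // unknown status: stop, to be safe
}
```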

Application dead-message queue

As already stated, both services rely on an application-defined dead-message queue. When a message is read from the initial queue, it is used to either create a message in a remote queue or invoke a COM component. If the message processing fails, the message is placed into this dead-message queue to ensure that it is not lost. The use of such a dead-message queue is similar in concept to the dead queue that is used when a COM+ application is marked as queued.

Once a message is placed into a dead-message queue, it still needs processing. To do so administratively, you may need to place the original message back into the processing queue. In the samples provided, a Windows script file is used for this purpose.

System dead-message queues

The solution's use of a dead-message queue differs from the use of the system queues: the dead-letter message queue and the transactional dead-letter message queue. These queues hold undelivered messages, provided the message sent has a journaling property of MQMSG_DEADLETTER.

When the journaling property of a message is set to MQMSG_DEADLETTER, the source queue manager sends a copy of the message to the dead-letter message queue, if the source queue manager does not receive a confirmation from the target Queue Manager indicating that the message was successfully retrieved from the target queue. This confirmation is not returned if the message is not delivered or is not received (after a timeout period), or in the case of transactional queues, if a receiving transaction is aborted.

Windows ATL Services

The core of the MSMQ architecture is the Dispatcher and Processor services. The purpose of these services is to receive incoming messages from local public queues and perform a defined process for that message. These services are implemented using an ATL service, modified from the template generated by the ATL COM Application wizard.

Before talking about the specifics of each of the Dispatcher and Processor services, it is worthwhile to talk about their common structure. Using registry settings, the services obtain a list of queues to process. For each queue an object is constructed that administers the processing of that queue. This object also manages a collection of associated thread objects in addition to the performance monitor counter logic.

The corresponding thread object performs the work of reading the queues. The number of thread objects (and hence, the number of threads) reading the queue is based on a registry setting for each queue. Other than the code contained within the thread object, the code for both services is very similar.

Base Service Functionality

The core of the ATL service template is the CServiceModule class, which contains all the code for handling the service functions. Modifications are made to this class to include the actual processing that is defined against each service function, namely starting, stopping, pausing, and continuing service activity. To facilitate this processing, several global procedures have been defined, one for each of the corresponding service functions. The CServiceModule class then uses these functions.

Global procedures

A series of queues is processed within each service, defined by a registry entry. The service then creates an object that manages each one of these queues; the class in these examples is called CWorker. The purpose of the global functions is to manage the creation and status notification of these objects. These global functions, declared in the serviceRoutines header file and defined in its implementation file, are as follows:

void initService(CServiceModule *serviceModule);
void stopAllProcessors();
void pauseAllProcessors();
void continueAllProcessors();

Each of these functions operates on a list of object pointers, with an object managed for each queue being processed. The object, to be described later, will have corresponding methods for service management: start, stop, pause, and resume.

The list of input queues is defined in the registry as a multi-string value. For example, the code for initService, which calls each object's start method, and stopAllProcessors, which calls the stop method, is as follows:

DWORD        g_cntWorkers;
CWorker      *g_paWorkers;

void initService(CServiceModule *serviceModule)
{
    USES_CONVERSION;
    CComBSTR    regKeyName;
    CRegKey     reg;
    LONG        lRes;
    DWORD       dwCount;
    TCHAR       active[2048];
    CWorker     *pWorker;
    LPCTSTR     psz;

    // load the registry key name from the IDS_REGKEY resource string
    regKeyName.LoadString(IDS_REGKEY);

    // open the registry key
    lRes = reg.Open(HKEY_LOCAL_MACHINE, W2A(regKeyName), KEY_READ);
    if (lRes != ERROR_SUCCESS)
        throw(_T("Unable to open registry key"));

    // get the names of the active workers (one per input queue)
    dwCount = sizeof(active);
    memset(active, '\0', dwCount);
    lRes = reg.QueryValue(active, _T("Active Workers"), &dwCount);
    if (lRes != ERROR_SUCCESS)
        throw(_T("Unable to read from registry"));

    // close the registry key
    reg.Close();

    // determine the number of objects needed
    g_cntWorkers = strcnt(active);

    // allocate the objects
    g_paWorkers = new CWorker[g_cntWorkers];
    if (g_paWorkers == NULL)
        throw(_T("Unable to allocate memory for objects"));

    // start the worker objects
    for (psz = active, dwCount = 0; *psz;
        psz += lstrlen(psz) + 1, dwCount++)
    {
        pWorker = g_paWorkers + dwCount;
        pWorker->start(serviceModule, psz);
    }
}

void stopAllProcessors()
{
    DWORD        dwCount;
    CWorker      *pWorker;

    // tell all of the workers to stop running
    for (dwCount = 0; dwCount < g_cntWorkers; dwCount++)
    {
        pWorker = g_paWorkers + dwCount;
        pWorker->stop();
    }
}

Similar functions exist for the pause and continue operations. The function strcnt merely parses the array of queue names, active, and returns the number of null-terminated strings found.
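The implementation of strcnt is not listed, so the following is a minimal sketch of how such a function might look. It assumes the buffer is a multi-string: a run of null-terminated strings ended by an additional null character. It uses plain char for portability, whereas the service code works with TCHAR.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>

// Counts the strings in a multi-string buffer, that is, a sequence of
// null-terminated strings followed by one extra terminating null.
// Sketch only: uses char where the service code would use TCHAR.
std::size_t strcnt(const char *multi)
{
    assert(multi != nullptr);
    std::size_t count = 0;
    // Step over each string and its terminator until an empty string is found.
    for (const char *p = multi; *p != '\0'; p += std::strlen(p) + 1)
        ++count;
    return count;
}
```

The loop has the same shape as the one in initService that walks the queue names, so the count and the number of started workers stay in agreement.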

CServiceModule Class

The main entry point _tWinMain calls the start method, which in turn calls the Run method of the global instance of the class _Module. It is within the Run method that you need to include code to start the service. The Run method contains a loop for handling the service messages:

MSG msg;
while (GetMessage(&msg, 0, 0, 0))
    DispatchMessage(&msg);

Prior to entering this message loop the work to be performed by the service must be created on a separate thread. As already mentioned, this thread creation is managed through the initService method:

    // initialize service
catch (LPCTSTR msg)

The error handling code for starting the service needs to take into account the possibility that the service work cannot be initialized. In this case the message loop should not be entered and the service should be shut down.

The Handler method controls the code for stopping, pausing, and continuing the service functionality. Changes to support this functionality are required in the Handler method in order to make the appropriate calls to the defined global functions:

inline void CServiceModule::Handler(DWORD dwOpcode)
    switch (dwOpcode)
        // <MSMQ Example>
        // </MSMQ Example>
        PostThreadMessage(dwThreadID, WM_QUIT, 0, 0);
        // <MSMQ Example>
        // </MSMQ Example>
        // <MSMQ Example>
        // </MSMQ Example>
        LogEvent(_T("Bad service request"));

The final required task is to inform the service that pause is actually supported. Within the init method, the service must be told that pause is an accepted control:

m_status.dwControlsAccepted = SERVICE_ACCEPT_STOP |
    SERVICE_ACCEPT_PAUSE_CONTINUE;

As a default, the ATL wizard generates the service name using the name given to the project. The service name is contained within a resource string called IDS_SERVICENAME. Modifying this string value controls the name of the deployed service.

In addition to the changes for the application, there are two other changes that can be made to the base ATL template. In the RegisterServer method, certain registry values are set to register the service. One of these is the service name, set to the original value of the IDS_SERVICENAME resource string. This can be changed to the class member variable for the service name, as read from this string:

key.SetValue(m_szServiceName, _T("LocalService"));

The final change to the template is to the LogEvent method. In the base template, this method logs events differently according to how the application is run. When it is running as a service, all events are written to the event log. Otherwise, as when the application is running in debug mode, events are written to the console. This behavior can be changed to use ATLTRACE to write to a debug window:

lstrcat(chMsg, "\n");

When reviewing the code, you will see that a reference to CServiceModule instance is passed to the worker objects. This allows all event logging to be managed through the CServiceModule LogEvent method.

Including MSMQ support

There are two options when coding for MSMQ in C++: using the MSMQ API or using the COM Interfaces, the respective headers being mq.h and mqoai.h. The MSMQ Services discussed here all use the MSMQ API. To support this, the StdAfx.h file is modified to include this reference:

#include <mq.h>

In addition to support for MSMQ, support is also needed for transactions. Support for transactions in MSMQ is through a transaction handle, obtained through the MSMQ function MQBeginTransaction, or implicitly using a transaction handle of MQ_SINGLE_MESSAGE.

The single message transaction option, although faster, is similar to sending a message to a nontransactional queue within a transaction. Within the services, transactions will be managed with an internal MSMQ transaction. Doing so provides the option of ensuring that the Receive from one queue is within the same transaction as the Send to another queue. Support for an MSMQ transaction requires the inclusion of the transact header file.

#include <transact.h>

To get a quick overview of using the MSMQ API functions, refer to the appropriate section in the Platform SDK.

The CProcessor/CDispatcher class

As mentioned, a class is defined within the Dispatcher and Processor service for handling the processing of a queue, with the list of queues derived from registry settings. Within the Dispatcher service, this class is called CDispatcher and within the Processor service it is called CProcessor. For the purpose of this discussion, the class is called CWorker.

The main components of this CWorker class are the public start, stop, pause, and continue methods, used by the CServiceModule class for service administration of individual queues. A corresponding collection of associated thread objects is created to perform the actual work of the service, with one object created for each required worker thread, the required number being derived from the registry.

The main function of the CWorker class is to manage the actual worker threads. The construction of these thread objects is managed by the CWorker start method.

void CProcessor::start(CServiceModule *serviceModule, LPCTSTR name)
{
    DWORD            dwCount;
    CWorkerThread    *pWorkerThread;

    // service module
    m_serviceModule = serviceModule;
    // service name (allow room for the terminating null)
    m_name = new TCHAR[lstrlen(name) + 1];
    lstrcpy(m_name, name);

    // initialize the worker object
    init();

    // allocate the worker thread objects
    m_paWorkerThreads = new CWorkerThread[m_dwThreadCnt];
    if (m_paWorkerThreads == NULL)
        throw(_T("Unable to allocate memory for thread objects"));

    // start the processor threads
    for (dwCount = 0; dwCount < m_dwThreadCnt; dwCount++)
    {
        pWorkerThread = m_paWorkerThreads + dwCount;
        pWorkerThread->start(this);
    }
}

The init method reads the registry to locate the object's runtime parameters, including the main MSMQ input queue and the number of threads required to process the queue, and hence the number of thread objects that the service requires.

In the same fashion as the global methods, when a CWorker class instance is told to pause, continue, or stop, it calls the corresponding method on each object in its collection of associated thread objects.

The CProcessorThread/CDispatcherThread class

As mentioned previously, for each CWorker class there is an associated thread class. Within the Dispatcher service this class is called CDispatcherThread and within the Processor service it is called CProcessorThread. For the purpose of this discussion, the class will be called CWorkerThread.

The main components of this CWorkerThread class are the public start, stop, pause, and continue methods, used by the CWorker class for service administration of worker threads associated with the processing of a queue.

Internally the CWorkerThread class uses member variables to indicate whether the service should be stopped or paused; both are initialized in the class constructor.

      // default to running
      m_runThreadProc = true;
      m_pauseThreadProc = false;

The pause, continue, and stop methods merely alter the values of these variables. The start method is the one that actually creates the thread to perform the service functions, being called by the CWorker start method.

void CWorkerThread::start(CWorker *pWorker)
{
    // save the pointer to the worker object
    m_pWorker = pWorker;
    // start the thread
    m_hThread = CreateThread(NULL, 0, &threadProc,
        (LPVOID)this, 0, &m_dwThreadId);
}

Once the thread has been created, control can then be passed back to the service control manager for processing service control messages.

This private (and static) threadProc method performs the required functionality of the service. It is constructed as a simple loop that terminates when the m_runThreadProc variable is set to false. The service pause functionality is implemented by a sleep of one half second.

DWORD WINAPI CWorkerThread::threadProc(LPVOID lpParameter)
{
    CWorkerThread  *me = (CWorkerThread *) lpParameter;
    HRESULT        hr;

    // open the queues
    me->init();
    while (me->m_runThreadProc)
    {
        if (me->m_pauseThreadProc)
        {
            // paused: sleep for one half second, then check again
            Sleep(500);
            continue;
        }
        // read message from input queue
        hr = me->readMessage();
        if (hr == MQ_OK)
        {
            // now we're ready to process the message
            continue;
        }
        if (hr == MQ_ERROR_IO_TIMEOUT)
        {
            // try again
            continue;
        }
        // report error and shutdown
        break;
    }
    // close the queues and exit
    me->closeQueues();
    return 0;
}

In this code the queue management is in the init, readMessage, and closeQueues methods. Depending on the type of service (Dispatcher or Processor), the functionality of the thread object will vary.
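The control flow of the thread procedure can be isolated from MSMQ and from Windows threading, which makes it easier to reason about and to test. The sketch below is such an isolation under stated assumptions: ReadResult, LoopFlags, and workerLoop are hypothetical names, the read and sleep operations are passed in as callables, and the actual message processing is reduced to a counter.

```cpp
#include <atomic>
#include <cassert>
#include <functional>

// Hypothetical stand-ins for MQ_OK, MQ_ERROR_IO_TIMEOUT, and any other error.
enum class ReadResult { Ok, Timeout, Error };

// Flags set by the controlling thread; these play the role of
// m_runThreadProc and m_pauseThreadProc.
struct LoopFlags {
    std::atomic<bool> run{true};
    std::atomic<bool> paused{false};
};

// Runs until flags.run is cleared or a read fails.
// Returns the number of messages processed.
int workerLoop(LoopFlags &flags,
               const std::function<ReadResult()> &readMessage,
               const std::function<void()> &sleepWhilePaused)
{
    int processed = 0;
    while (flags.run) {
        if (flags.paused) {
            sleepWhilePaused();   // paused: wait, then re-check both flags
            continue;
        }
        switch (readMessage()) {
        case ReadResult::Ok:      ++processed; break;  // process the message here
        case ReadResult::Timeout: break;               // nothing waiting; try again
        case ReadResult::Error:   return processed;    // report error and shut down
        }
    }
    return processed;
}
```

In a service, a function like this would form the body of the thread procedure, with the sleep callable wrapping a half-second sleep and the flags being changed by the stop, pause, and continue methods on another thread.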

MSMQ and Transactions

Both services operate in exactly the same fashion with respect to transactions and transactional queues. The CWorkerThread class queries the transactional property of each queue (property PROPID_Q_TRANSACTION) and validates that the values are equivalent. If the input queue is transactional, then the application requires that all output queues are transactional, and vice versa. Opening the queues is managed by the openQueue method:

bool CProcessorThread::openQueue(LPCTSTR lpcsPathName,
    QUEUEHANDLE *hQueue, DWORD *TxnType, DWORD dwAccess)
{
    USES_CONVERSION;

    LPWSTR            wBuffer;
    HRESULT           hr;
    DWORD             dwFormatNameBufferLength;
    WCHAR             wszFormatNameBuffer[256];
    MQQUEUEPROPS      qprops;
    QUEUEPROPID       aQueuePropId[NUMBEROFPROPERTIES];
    MQPROPVARIANT     aQueuePropVar[NUMBEROFPROPERTIES];
    HRESULT           aQueueStatus[NUMBEROFPROPERTIES];
    DWORD             cPropId = 0;

    wBuffer = A2W(lpcsPathName);

    // get format name for queue
    dwFormatNameBufferLength = 256;
    hr = MQPathNameToFormatName(wBuffer,
        wszFormatNameBuffer, &dwFormatNameBufferLength);
    if (FAILED(hr))
        return false;
    // open queue
    hr = MQOpenQueue(wszFormatNameBuffer,
        dwAccess, MQ_DENY_NONE, hQueue);
    if (FAILED(hr))
        return false;

    aQueuePropId[cPropId] = PROPID_Q_TRANSACTION; // Property ID
    aQueuePropVar[cPropId].vt = VT_UI1;           // Type indicator
    cPropId++;

    // Initialize the MQQUEUEPROPS structure.
    qprops.cProp = cPropId;           // Number of properties
    qprops.aPropID = aQueuePropId;    // Ids of properties
    qprops.aPropVar = aQueuePropVar;  // Values of properties
    qprops.aStatus = aQueueStatus;    // Error reports

    // Get queue property.
    hr = MQGetQueueProperties(wszFormatNameBuffer, &qprops);
    if (FAILED(hr))
        return false;

    // return the transaction type
    *TxnType = aQueuePropVar[0].bVal;

    return true;
}

The openQueue method is called by the class's init method, which is where the transactional property validation occurs. A Boolean value is then set to indicate the transactional mode under which all queue activity is managed. For the simple case of a single input and error queue (as in the processor service), the validation code is as follows:

DWORD        inQueueTxnType;
DWORD        erQueueTxnType;
LPWSTR       wBuffer;
DWORD        dwcPropId;


// open the input queue
if (openQueue(m_pProcessor->inQueueName,
    &m_hInQueue, &inQueueTxnType, MQ_RECEIVE_ACCESS) == false)
    throw(_T("Unable to open input queue"));
// open the error queue
if (openQueue(m_pProcessor->erQueueName,
    &m_hErQueue, &erQueueTxnType, MQ_SEND_ACCESS) == false)
    throw(_T("Unable to open error queue"));

// check queue transaction consistency
if (inQueueTxnType != erQueueTxnType)
    throw(_T("Queue transactional types are not consistent"));

m_isTransactional = (inQueueTxnType == MQ_TRANSACTIONAL);

Once it is decided that all queues have the same transactional mode, managing queue receives and sends becomes easier. For each message to be processed, if transactions are required an internal MSMQ transaction handle is obtained and used. For the case of nontransactional queues, a no transaction handle is used:

HRESULT        hr;
ITransaction   *m_pTransaction;

if (m_isTransactional)
{
    // transactional queues: begin an internal MSMQ transaction
    hr = MQBeginTransaction(&m_pTransaction);
    if (FAILED(hr))
        throw("MQBeginTransaction failed.");
}
else
    m_pTransaction = MQ_NO_TRANSACTION;

This transaction handle is then used for the message receive and all corresponding sends, including those to the application dead-message queue.

MQReceiveMessage(m_hInQueue, 500, MQ_ACTION_RECEIVE, m_pmsgProps,
    NULL, NULL, NULL, m_pTransaction);

Once a message has been successfully processed, the transaction requires committing. Successful completion of a message means the message was sent to a processing queue, passed to a COM component for processing, or written to the application dead-message queue. A transaction abort is only needed in the event of a processing error; remember all failed messages are placed into an application dead-message queue:

if (m_isTransactional)
{
    m_pTransaction->Commit(0, 0, 0);
    m_pTransaction->Release();   // release the transaction once committed
    m_pTransaction = NULL;
}

If a message receive times out, the transaction that was begun should be terminated by calling its Release method.

Error handling within the CWorkerThread object is managed through a CError class. The class contains a Boolean value to indicate whether the thread should continue executing after an error is thrown. Listed below are the catch statements for the CError class.

catch (CError &err)
{
    // send message to error queue
    hr = me->sendMessage();
    if (FAILED(hr))
        wsprintf(errBuff, "Queue error = 0x%x\n", hr);
    me->logEvent("Message sent to error queue\t%s", err.msg);
    // a fatal error terminates the thread
    if (err.stopProcessing)
        throw("Thread terminated");
}

When an error is caught in processing a message, the message needs to be written to the application dead-message queue. The catch statements for a CError class take care of this requirement. In addition if the CError property stopProcessing is set, a new error is thrown to terminate the thread.
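The same pattern can be shown without MSMQ. In the sketch below, ErrorSketch stands in for CError, a vector stands in for the application dead-message queue, and processOne is a hypothetical helper; none of these names come from the sample code. Unlike the service code, which throws a second error to end the thread, this sketch returns false so that the caller can leave its loop.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Illustrative stand-in for the CError class described in the text.
struct ErrorSketch {
    std::string msg;
    bool stopProcessing;
};

// Processes one message. On failure the message is copied to the dead-message
// list so that it is not lost. Returns false when the caller should stop.
template <typename Process>
bool processOne(const std::string &message, Process process,
                std::vector<std::string> &deadMessages)
{
    try {
        process(message);
        return true;
    } catch (const ErrorSketch &err) {
        deadMessages.push_back(message);  // keep the failed message
        return !err.stopProcessing;       // a fatal error ends the loop
    }
}
```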

Registry management

Both MSMQ services use the registry for configuration of such items as queue names and number of threads. The base key for all these settings is managed through a resource string value, IDS_REGKEY. This value is then read as required:

CComBSTR regKeyName;
regKeyName.LoadString(IDS_REGKEY);

For each service there is a top-level multi-string value (REG_MULTI_SZ) that contains a list of secondary key names. Each secondary key represents an input queue for the service to handle. Within this secondary key, the values represent the configuration parameters for that queue. These parameters will be described in detail when each service is outlined.

CAdmin Administration class

The basic structure of each service is designed to manage several processes, one for each queue requiring processing. Each process is named with the registry subkey from which its runtime parameters are derived. As a result, each service has several independent processes that the service control manager can only manage as a single entity.

To better facilitate pausing and continuing individual processes, each service has been extended with a COM interface. The interface definition for this COM class is as follows:

interface IAdmin : IDispatch
{
    typedef [uuid(D78FBEBA-2263-40d1-A41F-3A575B3C9C28)]
    enum {
        SvcStatus_Unknown     = 0,
        SvcStatus_Stopped     = 1,
        SvcStatus_Paused      = 2,
        SvcStatus_Running     = 4,
    } SvcStatus;

    [propget, id(1), helpstring("property Count")]
        HRESULT Count([out, retval] long *pVal);
    [propget, id(2), helpstring("property Name")]
        HRESULT Name(long idx, [out, retval] BSTR *pVal);
    [propget, id(3), helpstring("property Status")]
        HRESULT Status(long idx, [out, retval] SvcStatus *pVal);
    [id(4), helpstring("method Pause")]
        HRESULT Pause(long idx);
    [id(5), helpstring("method Resume")]
        HRESULT Resume(long idx);
};

The implementation class, CAdmin, allows for pausing and continuing a queue process and obtaining the process name and status. The class can easily be extended to allow greater functionality, such as adding, removing, or stopping service processes. The currently implemented code is very simple:

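STDMETHODIMP CAdmin::get_Count(long *pVal)
{
    *pVal = g_cntWorkers;
    return S_OK;
}

STDMETHODIMP CAdmin::get_Status(long idx, SvcStatus *pVal)
{
    CWorker    *pWorker;
    pWorker = g_paWorkers + idx;
    *pVal = (SvcStatus)pWorker->status();
    return S_OK;
}

STDMETHODIMP CAdmin::get_Name(long idx, BSTR *pVal)
{
    CWorker    *pWorker;
    CComBSTR   bstr;
    pWorker = g_paWorkers + idx;
    bstr = pWorker->name;
    // Detach so the caller owns the string; a plain assignment would
    // leave *pVal pointing at memory freed by the CComBSTR destructor
    *pVal = bstr.Detach();
    return S_OK;
}

STDMETHODIMP CAdmin::Pause(long idx)
{
    CWorker    *pWorker;
    pWorker = g_paWorkers + idx;
    // the call to the worker's pause method is not shown in this listing
    return S_OK;
}

STDMETHODIMP CAdmin::Resume(long idx)
{
    CWorker    *pWorker;
    pWorker = g_paWorkers + idx;
    // the call to the worker's continue method is not shown in this listing
    return S_OK;
}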

The service manages an array of created objects internally. The CAdmin class merely operates on these objects in exactly the same fashion as the CServiceModule class. This works since the global variable g_paWorkers is the address of the first array item. Through the index the appropriate object is referenced and the standard pause and continue methods are called.
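Because the index arrives from an external COM client, a production version would probably want to validate it before doing pointer arithmetic on the array. The following is a minimal, portable sketch of that idea, not the article's implementation: Worker stands in for CWorker, Admin for CAdmin, and the bool return values stand in for HRESULTs. All member names are assumptions.

```cpp
#include <cstddef>
#include <string>
#include <vector>

enum class SvcStatus { Unknown = 0, Stopped = 1, Paused = 2, Running = 4 };

// Stand-in for the article's CWorker; the members are assumptions.
struct Worker {
    std::string name;
    SvcStatus   status = SvcStatus::Running;

    void pause()  { if (status == SvcStatus::Running) status = SvcStatus::Paused; }
    void resume() { if (status == SvcStatus::Paused)  status = SvcStatus::Running; }
};

// Stand-in for CAdmin: every call checks the index before using it.
class Admin {
public:
    explicit Admin(std::vector<Worker>& workers) : workers_(workers) {}

    long count() const { return static_cast<long>(workers_.size()); }

    bool status(long idx, SvcStatus& out) const {
        const Worker* w = find(idx);
        if (w == nullptr) return false;
        out = w->status;
        return true;
    }

    bool pause(long idx) {
        Worker* w = find(idx);
        if (w == nullptr) return false;
        w->pause();
        return true;
    }

    bool resume(long idx) {
        Worker* w = find(idx);
        if (w == nullptr) return false;
        w->resume();
        return true;
    }

private:
    // Returns nullptr for a negative or too-large index.
    Worker* find(long idx) const {
        if (idx < 0 || static_cast<std::size_t>(idx) >= workers_.size())
            return nullptr;
        return &workers_[static_cast<std::size_t>(idx)];
    }

    std::vector<Worker>& workers_;
};
```

In a COM implementation the false branch would typically map to an error HRESULT such as E_INVALIDARG, so that an administration client passing a stale index gets a failure rather than undefined behavior.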

Because administration of the service processes is exposed through COM, it is easy to write an administration application or an MMC snap-in. The sample code includes such an administrative application, written in Visual Basic.

Performance monitor counters

Any well-constructed service should have support for instrumentation, namely performance monitor counters. The Dispatcher and Processor services are no exception. Counters are included for the total number of messages processed and the number of messages processed per second.

Each queue to be processed by a service is managed by an object and several associated thread objects. Counter instances are implemented for the main object, representing all messages being handled for a queue, and for each thread object, giving the messages processed on that thread.

For an overview of the method used to include performance monitor counters in the services, see Custom Performance Monitoring for Your Windows NT Applications.

Within the services the following files need to be added to the project: CmnHdr header, Optex header and implementation, PrfData header and implementation, and RegSettings header files. It is important that pre-compiled headers be turned off for these implementation files. Each service then requires the definition of the performance monitor counters and objects. This definition is done through the MqXXXSvcPrfData header and implementation files.

Several changes are required within the service template files to support performance monitor counters. Within the _tWinMain function the performance monitor counters must first be registered:

// Unregister PerfMon         
// Register PerfMon
// the buffer size is passed in characters, not bytes
GetModuleFileNameW(hInstance, szPath, sizeof(szPath) / sizeof(szPath[0]));
wcscpy(wcsrchr(szPath, L'\\') + 1, L"MQDspSvcCtrs.dll");

When the service is started through the initService function, performance monitor counters require activation:

// Activate PerfMon counters

Within each processor, an instance of the CWorker class, the performance monitor counter instances require allocation. This is achieved with the CWorker init method:

// allocate the PerfMon counter instances
dwCount = cntQueues;
m_paPrfInstId = new CPrfData::INSTID[dwCount];
if (m_paPrfInstId == NULL)
    throw(_T("Unable to allocate memory for PerfMon"));
memset(m_paPrfInstId, '\0', sizeof(CPrfData::INSTID) * dwCount);

For each queue the appropriate performance monitor counter instance needs to be added:

// initialize PerfMon instance for input queue
wBuffer = A2W(inQueueName);
*m_paPrfInstId  = g_PrfData.AddInstance(PRFOBJ_DISPATCHER, wBuffer);

All that is left is to report the actual counter data. This is all managed by the CWorker class incCounters method.

void CDispatcher::incCounters(DWORD index)
        *(m_paPrfInstId + index))++; 
        *(m_paPrfInstId + index))++;

This method is called for each message processed by the CWorkerThread object.
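To make the relationship between the two counters concrete, here is a small portable sketch of the bookkeeping: one running total for the queue as a whole, one per thread, and a per-second figure derived from two successive samples of a total. It does not use the CPrfData class from the sample code; the MessageCounters class and its slot layout are assumptions for illustration, and it is deliberately not thread-safe (a service would need interlocked or atomic increments).

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical counter store. Slot 0 represents the queue as a whole;
// slots 1..threads represent the individual worker threads.
class MessageCounters {
public:
    explicit MessageCounters(std::size_t threads)
        : totals_(threads + 1, 0), lastSample_(threads + 1, 0) {}

    // Record one processed message for the given thread (0-based).
    void increment(std::size_t thread) {
        ++totals_[0];
        ++totals_[thread + 1];
    }

    std::uint64_t total(std::size_t instance) const { return totals_[instance]; }

    // Messages per second since the previous call for this instance.
    double sampleRate(std::size_t instance, double elapsedSeconds) {
        const std::uint64_t delta = totals_[instance] - lastSample_[instance];
        lastSample_[instance] = totals_[instance];
        return elapsedSeconds > 0.0 ? static_cast<double>(delta) / elapsedSeconds : 0.0;
    }

private:
    std::vector<std::uint64_t> totals_;
    std::vector<std::uint64_t> lastSample_;
};
```

The point of the design is that the hot path only increments integers; turning a running count into a rate is left to whoever samples the values.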

This implementation of performance monitor counters is managed through a memory-mapped file and requires a DLL that reports the counters. The services actually record the counters for the DLL to report. The final implementation task is the compilation of this DLL.

The COM Component

An interface has been defined to support the Processor service calling a COM component to perform the required business logic. This interface supports a single method, Process, by which the Processor service can pass in the required contents of the message to the component. An important part of the interface is the definition of an enumeration to allow the component to inform the service of the success or failure of its processing task.

Interface definition

Here is the interface definition that is consumed by the Processor service and implemented by the COM component.

interface IWebMessage : IDispatch
{
    typedef [uuid(C572C0DD-E25B-4f85-831D-A1D20312312A)]
    enum {
        WebMSG_Return_Good    = 0,
        WebMSG_Return_Bad     = 1,
        WebMSG_Return_Abort   = 2,
    } WebMSG_Return;

    [id(1), helpstring("Method to Process a MSMQ Message")]
    HRESULT Process(
        [in] BSTR        sMessageLabel,
        [in] BSTR        sMessageBody,
        [in] long        lAppSpecific,
        [out, retval]    WebMSG_Return *lReturn);
};

The parameters for the Process method are the message label and body, both defined as BSTRs, and the AppSpecific property of the message defined as a long integer. The single return value indicates the success or failure of the process, of type WebMSG_Return.

The WebMSG_Return enumeration has three values to represent the following cases: the processing was successful; the processing failed due to a bad message; and the process failed due to a failure that will cause all other messages to fail.

The two failure statuses (Bad and Abort) are for two separate types of failures. In the event that the component receives a message that cannot be processed, the Processor service is informed of the failure to process the single message, a Bad return status. In this case the processor takes the action of writing the failed message into a dead-message queue.

Serious errors warrant an Abort return status, which informs the Processor service to stop processing any more messages. Such an error might be the failure of the component to connect to a required resource (a relational database, for example). In this case the processor takes the action of terminating the queue handling process.

Interface implementation

To support calls from the Processor service, the COM component is required to implement the IWebMessage interface. The only other requirement for this COM component is that it be apartment threaded. Such a component is easy to create in Visual Basic. For example:

Implements WebMessage
' Rest of component logic
Private Function PersistMessage( _
    sMessageLabel As String, _
    sMessageBody As String, _
    lAppSpecific As Long _
    ) As Boolean
On Error GoTo Err_Handler
    ' Perform Business logic
    ' Return true if all went well
    PersistMessage = True
    Exit Function

Err_Handler:
    ' Return false if error
    PersistMessage = False
    Exit Function

End Function

Private Function WebMessage_Process ( _
    ByVal sMessageLabel As String, _
    ByVal sMessageBody As String, _
    ByVal lAppSpecific As Long _
    ) As WebMessageLib.WebMSG_Return
On Error GoTo Err_Handler
    Dim bGood As Boolean
    ' If successful return GOOD else return ABORT
    bGood = PersistMessage(sMessageLabel, sMessageBody, lAppSpecific)
    If bGood Then
        WebMessage_Process = WebMSG_Return_Good
    Else
        WebMessage_Process = WebMSG_Return_Abort
    End If
    Exit Function

Err_Handler:
    ' In the event of an error return a BAD
    WebMessage_Process = WebMSG_Return_Bad
    Exit Function
End Function

The Implements statement forces the requirement of having the private method WebMessage_Process. This method relates directly to the Process method on the IWebMessage interface. In this example the Process method calls directly into the PersistMessage method, which is where all the business logic is contained.

Finally, in writing such a Visual Basic component you should remember to change several properties of the project. The most important options to check are "Unattended Execution" and "Retained In Memory." You should also modify the DLL Base Address from the default value; doing so ensures faster activation times for the DLL. The threading model for the DLL must also be Apartment.

The Dispatcher Service

The role of the Dispatcher service is to receive incoming messages from an application (a Web application or possibly another Dispatcher service) and dispatch them to a predefined queue for further processing. For each message that a Dispatcher service receives, a new message is written either to a queue for processing or to an application dead-message queue. The service should always receive from local queues and send to remote queues, and in this way support for transactional queues can be enabled.

The logic for the Dispatcher is controlled by registry settings, which cover the input queues, error queue, and output queues, in addition to an indicator that the service uses to decide which output queue to send to.

Queue decision logic

The key function of the Dispatcher service is deciding to which queue the received message should be forwarded. You can choose various means for making this decision. The sample service uses one of three methods, configured from the registry: round robin, a queue number indicated in the AppSpecific property, and a queue number indicated at an offset in the label. You can of course implement your own custom logic.

In all cases the service reads a list of output queues from the registry. An index then indicates which queue to send the message to:

MQSendMessage(*(m_paQueueHandles + index), m_pmsgProps,

Three separate procedures are used to calculate this index, based on the required queue selection method, which is again configurable from the registry:

switch (m_dwDispatchMode)
{
case 0:
    index = queueIndex_RoundRobin();
    break;
case 1:
    index = queueIndex_FromLabel();
    break;
case 2:
    index = queueIndex_FromAppSpecific();
    break;
}

In the round robin method, an index is incremented from the last index value used; the modulo is taken with the maximum number of queues to calculate the final value:

// advance from the last index used and wrap at the number of output queues
DWORD index = (m_dwLastRoundRobinQueue + 1) % m_cntOutputQueues;
m_dwLastRoundRobinQueue = index;

A round robin technique offers a very simple load-balancing solution for dispatching messages to a series of processing servers.
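The round robin calculation can be isolated in a few lines of portable C++. This is a sketch under stated assumptions rather than the service's own code: RoundRobinSelector is a hypothetical name, and the class is not thread-safe, so a dispatcher that shares one selector among several threads would need to protect next() with a lock or an interlocked operation.

```cpp
#include <cstddef>
#include <stdexcept>

// Hypothetical round robin selector over queueCount output queues.
class RoundRobinSelector {
public:
    explicit RoundRobinSelector(std::size_t queueCount)
        : count_(queueCount), last_(queueCount - 1)
    {
        if (queueCount == 0)
            throw std::invalid_argument("at least one output queue is required");
    }

    // Advance from the last index used and wrap at the queue count,
    // so the result always lies in the range [0, queueCount).
    std::size_t next()
    {
        last_ = (last_ + 1) % count_;
        return last_;
    }

private:
    std::size_t count_;
    std::size_t last_;
};
```

With three output queues, successive calls to next() yield 0, 1, 2, and then 0 again.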

The other two methods make an informed decision as to which queue to forward the message to. The two provided implementations operate in a similar fashion; they use an integer offset on the array of queue names. One takes the offset from the AppSpecific property of the message, the other from an offset in the label. Using the AppSpecific property, the calculation of the index is very simple:

DWORD index = m_paVariant[4].ulVal;
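The label-based variant is not shown in the text, so the following is only one way it might look: read decimal digits starting at the configured offset in the label and accept the result only if it is a valid position in the output queue array. The function name, the digits-only format, and the label layout in the usage note are all assumptions. The same range check would be equally sensible for a value taken from the AppSpecific property, because either value originates with the sender of the message.

```cpp
#include <cstddef>
#include <string>

// Hypothetical helper: derive an output queue index from the decimal digits
// that start at `offset` in the message label. Returns false, leaving `index`
// untouched, when no digits are present or the value is not a valid position
// in an array of `queueCount` queues.
bool queueIndexFromLabel(const std::wstring& label, std::size_t offset,
                         std::size_t queueCount, std::size_t& index)
{
    if (offset >= label.size() || label[offset] < L'0' || label[offset] > L'9')
        return false;

    std::size_t value = 0;
    for (std::size_t i = offset;
         i < label.size() && label[i] >= L'0' && label[i] <= L'9'; ++i) {
        value = value * 10 + static_cast<std::size_t>(label[i] - L'0');
        if (value >= queueCount)   // out of range; also stops runaway growth
            return false;
    }

    index = value;
    return true;
}
```

For a label such as "ORD-2-express" with an offset of 4 and three output queues, the function yields index 2; a label carrying 7 at the same position is rejected, and the message could then be routed to the error queue instead.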

Using logic that makes this informed decision, you have the option of specifying a particular server for processing. An order might be forwarded for shipping to separate servers based on the shipping method. This way load balancing can be achieved by using a server dedicated to a particular shipping method.

In this scenario, if one shipping method requires extra load balancing, the Dispatcher service can forward its messages to another Dispatcher service, which can use a round robin technique to provide further load balancing.

Registry settings

The main registry entry for the Dispatcher service is the multi-value string entry "Active Dispatchers." Each string entry represents an input queue for processing, and hence the creation of a CDispatcher object. This operation is managed in the global initService procedure.

Within the CDispatcher class a series of registry values defines the processing of the class, managed in the init method. Table 1 defines the registry entries for the service.

Table 1. CDispatcher registry entries

| Entry Name | Entry Type | Meaning |
| --- | --- | --- |
| Dispatcher Threads | REG_DWORD | Number of threads, and hence CDispatcherThread objects, required to manage reading the input queue. |
| Dispatcher Mode | REG_DWORD | Definition of how to decide which output queue to forward the message to. |
| Index From Label Offset | REG_DWORD | The position in the label of the message from which an integer can be derived for the index of the output queue. |
| Input Queue | REG_SZ | The name of the queue from which to receive messages. |
| Output Queues | REG_MULTI_SZ | The names of the queues to which the Dispatcher should send messages. |
| Error Queue | REG_SZ | The name of the queue to which erroneous messages are sent. |

The Dispatcher Mode entry informs the CDispatcher class which method to call to calculate the new queue index position. The possible values of 0, 1, or 2 represent round robin, determination from the label offset, and determination from the AppSpecific property, respectively.
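Since the mode arrives as a raw DWORD from configuration, it can help to convert it to a named enumeration once, at startup, and reject anything outside the three documented values. The sketch below shows that conversion in portable C++; the DispatchMode type and parseDispatchMode function are hypothetical names, and std::uint32_t stands in for DWORD.

```cpp
#include <cstdint>

// The three documented values of the Dispatcher Mode entry.
enum class DispatchMode : std::uint32_t {
    RoundRobin      = 0,
    FromLabel       = 1,
    FromAppSpecific = 2
};

// Hypothetical helper: convert the raw configuration value to a DispatchMode.
// Returns false, leaving `mode` untouched, for any undocumented value.
bool parseDispatchMode(std::uint32_t raw, DispatchMode& mode)
{
    switch (raw) {
    case 0: mode = DispatchMode::RoundRobin;      return true;
    case 1: mode = DispatchMode::FromLabel;       return true;
    case 2: mode = DispatchMode::FromAppSpecific; return true;
    default:                                      return false;
    }
}
```

Failing at initialization on an unknown mode is one possible design; silently falling back to round robin is another, at the cost of hiding a configuration mistake.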

Send and dead-message queue

The purpose of the Dispatcher service is to receive a message and send it to another queue for processing. In the event that this process fails, the received message is sent to an application defined dead-message queue.

There are various reasons that a message may be written to the dead-message queue. The obvious one is that the send to the receiving queue fails. In this circumstance, the processing continues with the next message.
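The forward-or-dead-letter decision for a single message can be summarized as follows. This is a portable sketch, not the service's MSMQ code: the two SendFn callbacks are hypothetical stand-ins for sends to the selected output queue and to the error queue, and the Outcome values are names invented for the example.

```cpp
#include <functional>
#include <string>

enum class Outcome { Forwarded, DeadLettered, Failed };

// Hypothetical stand-in for a queue send; returns true on success.
using SendFn = std::function<bool(const std::string& message)>;

// Try the output queue first; if that send fails, fall back to the
// application dead-message queue. Processing of later messages continues
// in either case, so the caller only needs the outcome for reporting.
Outcome dispatch(const std::string& message,
                 const SendFn& sendToOutput,
                 const SendFn& sendToErrorQueue)
{
    if (sendToOutput(message))
        return Outcome::Forwarded;
    if (sendToErrorQueue(message))
        return Outcome::DeadLettered;
    return Outcome::Failed;   // neither queue accepted the message
}
```

The Failed outcome is the case worth deciding on explicitly in a real deployment, for example by stopping the thread so that the message is not lost.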

When monitoring the service remember that a class can be processing several queues on several threads. Because of errors, at any point in time, queues may not be processed on all possible threads.

Threads and transactions

The only work this service performs is receiving and sending MSMQ messages. If you use transactional queues, and hence MSMQ transactions, the throughput of a thread receiving and sending messages is greatly diminished. Measured throughput figures are given in the testing section.

If you are using nontransactional queues, one or two threads per processor are sufficient for each input queue. Transactional queues are an order of magnitude slower because of the extra disk activity that a transaction incurs; eight or sixteen threads per processor improve their throughput. In either case, you should run scalability tests on the actual deployment to find the optimal number of threads, and possibly processors, required for handling peak dispatching workloads.

The Processor Service

The role of the Processor service is to receive incoming messages and, for each message, call a COM component that implements a given interface. Upon successful completion of the COM method call, the next message is processed. If the COM component informs the service that the message processing failed, the message is then written to the application dead-message queue.

The apartment-threaded COM component encompasses the required business logic for the processing step. This step can include sending a new message to another Dispatcher service for further processing.

The logic for the processor is controlled by registry settings. These cover the input queues, error queue, and ProgID of the COM component to call.

Registry settings

The main registry entry for the Processor service is the multi-value string entry "Active Processors." Each string entry represents an input queue for processing, and hence the creation of a CProcessor object. This operation is managed in the global initService procedure. Within each CProcessor object a series of registry values defines the processing of the class, managed in the init procedure. Table 2 defines the registry entries for the service.

Table 2. CProcessor registry entries

| Entry Name | Entry Type | Meaning |
| --- | --- | --- |
| Processor Threads | REG_DWORD | Number of threads, and hence CProcessorThread objects, required to manage reading the input queue. |
| Prog Id | REG_SZ | Defines the COM component to be called to process the message. |
| Input Queue | REG_SZ | The name of the queue from which to receive messages. |
| Error Queue | REG_SZ | The name of the queue where erroneous messages are sent. |

The Prog Id entry is the programmatic identifier (ProgID) of the class to be invoked.

The IWebMessage interface

For each message to be processed, the service invokes a COM component that implements the IWebMessage interface. To facilitate this, several things must happen. First, COM must be initialized on the thread processing the message, for example with a call to CoInitializeEx.


After this you can obtain a safe reference to the required IWebMessage interface of the COM component:

CComPtr<IWebMessage>   pCP;
HRESULT                hr;
hr = pCP.CoCreateInstance(me->m_pProcessor->progId);

Finally, the Process method can be called with the parameters from the last message received:

CComBSTR               label;
CComBSTR               body;
long                   filter;
WebMSG_Return          msgRC;
HRESULT                hr;

label = me->m_wszLabelBuffer;
body = CComBSTR((me->m_paVariant[1].ulVal / 2) + 1, "");
memcpy(body.m_str, me->m_paVariant[0].caub.pElems,
    me->m_paVariant[1].ulVal);    // byte count from the body-size property
filter = me->m_paVariant[4].ulVal;    // AppSpecific is a 32-bit value
hr = pCP->Process(label, body, filter, &msgRC);
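The body conversion above divides the byte count by two, which suggests the body is treated as 16-bit Unicode text. Under that assumption, the same copy can be sketched in portable C++ with std::u16string in place of CComBSTR. The helper name bodyFromBytes is hypothetical, and the sketch assumes the bytes are already in the machine's native byte order.

```cpp
#include <cstddef>
#include <cstring>
#include <string>

// Hypothetical helper: copy a raw message body holding 16-bit code units
// into a string. A trailing odd byte, if any, is ignored so that the copy
// never runs past the destination buffer.
std::u16string bodyFromBytes(const unsigned char* bytes, std::size_t byteCount)
{
    std::u16string body(byteCount / 2, u'\0');
    if (!body.empty())
        std::memcpy(&body[0], bytes, body.size() * sizeof(char16_t));
    return body;
}
```

Sizing the destination from the byte count first, and then copying only what fits, keeps the allocation and the copy length tied to the same number.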

After the Process method call, the service needs to interpret the return code from the call. One of three possible values is available: Good Message, Bad Message, and Abort Processing. The CError class is used in this circumstance for throwing errors, to allow the error capture section to correctly determine whether or not the thread should continue processing:

switch (msgRC)
{
case WebMSG_Return_Good:
    break;
case WebMSG_Return_Bad:
    err.stopProcessing = false;
    err.format("Returned WebMSG_Return_Bad");
    throw err;
case WebMSG_Return_Abort:
    err.stopProcessing = true;
    err.format("Returned WebMSG_Return_Abort");
    throw err;
default:
    err.stopProcessing = true;
    err.format("Invalid Return");
    throw err;
}

The error capture section, in addition to deciding the fate of the thread, sends the message to the application dead-message queue.
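The interplay between the return code, the thrown error, and the fate of the thread can be modeled without COM or MSMQ. In the sketch below, ProcessingError stands in for the article's CError class, a vector of return codes stands in for the stream of component results, and a vector of indexes stands in for the dead-message queue. All of these names are assumptions made for illustration.

```cpp
#include <cstddef>
#include <stdexcept>
#include <string>
#include <vector>

enum class WebMsgReturn { Good = 0, Bad = 1, Abort = 2 };

// Stand-in for CError: carries a message and a stop flag.
struct ProcessingError : std::runtime_error {
    ProcessingError(const std::string& what, bool stop)
        : std::runtime_error(what), stopProcessing(stop) {}
    bool stopProcessing;
};

// Translate a component return code into "continue" or a thrown error.
void checkReturn(WebMsgReturn rc)
{
    switch (rc) {
    case WebMsgReturn::Good:  return;
    case WebMsgReturn::Bad:   throw ProcessingError("Returned Bad", false);
    case WebMsgReturn::Abort: throw ProcessingError("Returned Abort", true);
    }
    throw ProcessingError("Invalid return", true);
}

// Handle results in order. Every failed message is recorded as dead-lettered;
// an error with stopProcessing set ends the loop. Returns the number of
// messages taken from the queue.
std::size_t drain(const std::vector<WebMsgReturn>& results,
                  std::vector<std::size_t>& deadLetters)
{
    std::size_t handled = 0;
    for (std::size_t i = 0; i < results.size(); ++i) {
        ++handled;
        try {
            checkReturn(results[i]);
        } catch (const ProcessingError& err) {
            deadLetters.push_back(i);
            if (err.stopProcessing)
                break;
        }
    }
    return handled;
}
```

For the sequence Good, Bad, Good, Abort, Good, four messages are handled, the second and fourth are dead-lettered, and the fifth is left in the queue for a later restart.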

The final action of the thread, since COM is being used, is to uninitialize COM on the thread with a call to CoUninitialize.



Unlike the Dispatcher service, the majority of the processing of a message is likely to be contained within the COM component that is called by the service. For this reason the number of threads that should be allocated to processing a queue depends on the work being done by this component. A component that has heavy CPU utilization requires fewer threads than one dependent on external resources, such as a database server or HTTP server.

As a baseline you could start with two threads per processor for nontransactional queues and eight threads per processor for transactional queues, and then adjust these numbers based on actual performance testing.
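The baseline is simple arithmetic, sketched here as a function whose result could seed the thread-count setting before tuning. The function name is hypothetical. The processor count is passed in rather than detected; in standard C++ it could come from std::thread::hardware_concurrency(), which may return 0 when the count is unknown, so the sketch treats 0 as a single processor.

```cpp
// Hypothetical helper: starting thread count for one input queue, using the
// baseline of 2 threads per processor for nontransactional queues and 8 for
// transactional queues. A processor count of 0 is treated as 1.
unsigned baselineThreads(unsigned processors, bool transactional)
{
    const unsigned perProcessor = transactional ? 8u : 2u;
    const unsigned effective = (processors == 0) ? 1u : processors;
    return effective * perProcessor;
}
```

For a dual-processor server this gives 4 threads for a nontransactional queue and 16 for a transactional one; the result is only a starting point for the performance testing recommended above.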

Testing the Solution

A large part of developing this solution framework involved developing a comprehensive testing framework. The testing framework had to provide realistic scalability numbers as well as adequate unit and stress testing for the Dispatcher and Processor services. Scripts and components were written and are provided as part of the sample code.

Two important components were used: one to send the message into the dispatcher queue, and another that implemented the IWebMessage interface for the Processor service to call. The ProgIDs for the components are WebMessageCreate.Order and WebMessageRead.Order, respectively. Throughout the testing, both components were managed as COM+ applications.

The testing scenario consisted of reading the orders from the Northwind sample database and loading them into a dispatcher queue, using WebMessageCreate.Order. The order Shipper Id was used as a determining metric for deciding which processing queue to forward the message to. The Shipper Id had three possible values: 0, 1, and 2, and was placed into the message AppSpecific property.

The WebMessageRead.Order component then took this message and persisted it into a secondary Northwind database, through the use of a stored procedure. The component thus performed substantial processing per message compared to the Processor service.

Figure 2 outlines the infrastructure used in testing the solution framework. It consisted of two Web servers for loading messages into the dispatcher queue, a dispatcher, three processing servers, and a shared database server. All machines resided within the same domain, with the domain controller being a separate machine. The MSMQ server component was installed on all machines, including the domain controller.

Figure 2. Testing Infrastructure

The database server was a shared quad Xeon 550 megahertz (MHz) processor machine with 2 gigabytes (GB) of memory. The Web and processor servers shared this database server because the Web servers used a static image of the orders table that only required a single database access for the length of the test. The relational database used was Microsoft SQL Server 7.0.

Each Web server was a single-processor 733 MHz machine with 256 megabytes (MB) of memory. On each server, a script created a static image of the order database and repeatedly called the WebMessageCreate.Order component to load each order into the dispatcher queue. The component was managed as a COM+ application with a construction string for the dispatcher queue name. Multiple instances of the loading script were run on each machine to provide extra load to the Dispatcher service.

The dispatcher server was a quad Xeon 550 MHz processor machine with 2 GB of memory. This server ran the Dispatcher service. The Dispatcher processed a single input queue using the AppSpecific property to decide where, among the three processor servers, to send the message.

Each processor server was a dual-processor 733 MHz machine with 512 MB of memory. These servers ran the Processor service, which calls the WebMessageRead.Order component to process each received message. The component was managed as a COM+ application. The database connection parameters were managed through a UDL file.

Performance Metrics

Performance testing was accomplished under two different scenarios using transactional and nontransactional queues. For each case several tests were performed to show the scalability of both the Dispatcher and Processor services, utilizing differing numbers of processing threads.

Nontransactional queues

For nontransactional queues, the Dispatcher service was configured to run two threads per processor, eight threads in all. Each of the Processor services was configured to run two threads per processor, four threads per server. Several tests were run with up to twelve simultaneous loading scripts. Tables 3 and 4 outline the results for two loaders per machine and six loaders per machine, respectively.

Table 3. Nontransactional queues with four loading scripts

Two loaders per machine, four in total.

Processing Time = 18:33 minutes

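| Server | Nbr Msgs (Total) | Msgs/Sec (Avg) | Msgs/Sec (Max) | Msgs in Queue (Avg) | Msgs in Queue (Max) | % Proc Time (Avg) | % Proc Time (Max) |
| --- | --- | --- | --- | --- | --- | --- | --- |
| Dispatcher | 149400 | 134 | 331 | 0 | 1 | 3 | 8 |
| Processor 1 | 44820 | 40 | 99 | 1 | 68 | 23 | 60 |
| Processor 2 | 58680 | 53 | 130 | 8 | 125 | 30 | 76 |
| Processor 3 | 45900 | 41 | 102 | 1 | 79 | 23 | 59 |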
From the Table 3 figures, we can see that the Dispatcher service handled the workload with very minimal server activity. At this load the Processor servers are registering minimal activity.

Table 4. Nontransactional queues with 12 loading scripts

Six loaders per machine, 12 in total.

Processing Time = 19:59 minutes

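| Server | Nbr Msgs (Total) | Msgs/Sec (Avg) | Msgs/Sec (Max) | Msgs in Queue (Avg) | Msgs in Queue (Max) | % Proc Time (Avg) | % Proc Time (Max) |
| --- | --- | --- | --- | --- | --- | --- | --- |
| Dispatcher | 448200 | 376 | 731 | 0 | 1 | 7 | 14 |
| Processor 1 | 134460 | 112 | 160 | 76 | 496 | 64 | 93 |
| Processor 2 | 176040 | 146 | 168 | 1868 | 4694 | 84 | 98 |
| Processor 3 | 137700 | 114 | 161 | 82 | 499 | 65 | 92 |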

From the Table 4 figures we can still see that the Dispatcher service handled the workload with very minimal server activity. The Dispatcher service running on a quad processing machine for nontransactional queues is capable of handling up to fifteen million messages an hour.

In contrast, the second Processor service is hitting its threshold. This constraint of about half a million messages an hour on a dual processor machine is due to the workload of the associated COM component.

Transactional queues

For transactional queues, the Dispatcher service was configured to run 16 threads per processor, 64 threads in all. Each of the Processor services was configured to run eight threads per processor, 16 threads per server. Several tests were run with up to eight simultaneous loading scripts. Tables 5 and 6 outline the results for two loaders per machine and four loaders per machine, respectively.

Table 5. Transactional queues with four loading scripts

Two loaders per machine, four in total.

Processing Time = 25:43 minutes

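| Server | Nbr Msgs (Total) | Msgs/Sec (Avg) | Msgs/Sec (Max) | Msgs in Queue (Avg) | Msgs in Queue (Max) | % Proc Time (Avg) | % Proc Time (Max) |
| --- | --- | --- | --- | --- | --- | --- | --- |
| Dispatcher | 149400 | 96 | 197 | 145 | 785 | 8 | 15 |
| Processor 1 | 44820 | 29 | 62 | 3 | 80 | 20 | 42 |
| Processor 2 | 58677 | 38 | 82 | 4 | 27 | 25 | 54 |
| Processor 3 | 45987 | 29 | 61 | 3 | 19 | 20 | 42 |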

Table 5 figures reveal that the Dispatcher service handled the workload with very minimal server activity, although processing the transactional messages took longer than for nontransactional messages. At this load the Processor servers are registering minimal activity.

Table 6. Transactional queues with eight loading scripts

Four loaders per machine, eight in total.

Processing Time = 53:24 minutes

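| Server | Nbr Msgs (Total) | Msgs/Sec (Avg) | Msgs/Sec (Max) | Msgs in Queue (Avg) | Msgs in Queue (Max) | % Proc Time (Avg) | % Proc Time (Max) |
| --- | --- | --- | --- | --- | --- | --- | --- |
| Dispatcher | 298800 | 92 | 218 | 81147 | 172400 | 7 | 16 |
| Processor 1 | 89638 | 27 | 67 | 3 | 20 | 19 | 43 |
| Processor 2 | 117356 | 36 | 81 | 4 | 31 | 24 | 55 |
| Processor 3 | 91792 | 28 | 67 | 3 | 21 | 19 | 43 |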

Table 6 figures reveal that the Dispatcher service handled the workload with very minimal processor activity. Running this large amount of transactional messages stressed the Dispatcher service at the disk level. The Dispatcher service running on a quad processing machine for transactional queues processed a third of a million messages an hour. This number could be increased with a faster disk subsystem.

In contrast, the Processor services are still not stressed. This difference reflects the fact that most of the workload of the service is contained within the associated COM component, which is capable of half a million messages an hour.
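The hourly figures quoted with the tables follow from the measured averages by multiplying by 3,600 seconds. The short sketch below spells out that arithmetic for two of the averages: 146 messages per second for Processor 2 in Table 4 and 92 messages per second for the Dispatcher in Table 6. The function name is an assumption; the results are straight conversions of measured averages and say nothing about remaining headroom.

```cpp
// Convert a sustained rate in messages per second to messages per hour.
constexpr long messagesPerHour(long perSecond)
{
    return perSecond * 3600L;
}

// Processor 2, nontransactional (Table 4): about half a million per hour.
static_assert(messagesPerHour(146) == 525600L, "146 msgs/sec");

// Dispatcher, transactional (Table 6): about a third of a million per hour.
static_assert(messagesPerHour(92) == 331200L, "92 msgs/sec");
```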


Conclusion

This article has provided a framework for your implementation of MSMQ. The provided services should offer the flexibility to easily construct a scalable, highly available application utilizing MSMQ. In addition, you should also have the tools to develop this framework into a complete solution.

Using multiple Dispatcher and Processor services spread across multiple machines, you have the capability to derive a complete workflow-based architecture that can be used by your organization to create a scalable, high-availability application running on multiple servers.


References

Further information about MSMQ and the technologies referred to in this article can be found in the following:

Architectural Design: A Scalable, Highly Available Business Object Architecture (Introduces the Microsoft Windows DNA architecture from which the solution presented was derived.)

Custom Performance Monitoring for Your Windows NT Applications

Instrumenting Windows NT Applications with Performance Monitor

MSMQ Web Site (Includes a developer site and the MSMQ Trigger download.)

Microsoft Systems Journal Q & A: Win 32

The Platform SDK, available on the MSDN CD and Online Libraries, provides a complete overview of MSMQ.

About the Authors

Christopher Baldwin and Carl Nolan both work in Northern California for the Western Region of the Microsoft e-Business Solutions Team. The team focuses on development of Internet-based solutions using the Windows DNA platform.