C/C++ Code Example: Reading Messages Asynchronously Using Completion Ports

 

Applies To: Windows 10, Windows 7, Windows 8, Windows 8.1, Windows Server 2008, Windows Server 2008 R2, Windows Server 2012, Windows Server 2012 R2, Windows Server Technical Preview, Windows Vista

The following example includes three application-defined functions that can be used for asynchronously reading the messages arriving in a specified queue using completion ports. The first function initiates a receive operation. The second function is the start routine executed by the worker threads created in the third function. The third function receives the format name of the queue to be monitored from a caller in the application and orchestrates all the operations involved in receiving messages asynchronously using a completion port.

For more information about reading messages asynchronously, see Asynchronous Reading.

An application using these functions must include the Windows.h, Stdio.h, and Mq.h header files.

This example uses application-defined RECEIVE_CONTEXT structures, which associate an OVERLAPPED structure with the handle to the queue specified and an MQMSGPROPS structure for retrieving properties of the message received. The following code defines this structure.

struct RECEIVE_CONTEXT  
{  
    OVERLAPPED ov;                  // OVERLAPPED structure  
    HANDLE hQueue;                  // Queue handle  
    MQMSGPROPS *pmsgprops;          // Message properties structure  
};  
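Because the OVERLAPPED structure is embedded in RECEIVE_CONTEXT, a worker thread that receives only a pointer to the OVERLAPPED structure can recover the enclosing context. The code example does this with the CONTAINING_RECORD macro. The following portable sketch illustrates the same idea with offsetof. FakeOverlapped, ReceiveContextSketch, ContextFromOverlapped, and ContextRoundTrips are hypothetical stand-ins introduced for illustration; they are not Windows or Message Queuing types.

```cpp
#include <cassert>
#include <cstddef>   // offsetof

// Hypothetical stand-in for the Windows OVERLAPPED structure.
struct FakeOverlapped
{
    unsigned long internal;
    unsigned long internalHigh;
};

// Hypothetical stand-in for RECEIVE_CONTEXT. The overlapped member is
// deliberately not the first member, to show that the technique does not
// depend on its position.
struct ReceiveContextSketch
{
    int queueId;            // Stand-in for the queue handle
    FakeOverlapped ov;      // Embedded overlapped structure
    long appSpecific;       // Stand-in for the message properties
};

// Recover the enclosing context from a pointer to its embedded member by
// subtracting the member's byte offset, as CONTAINING_RECORD does.
inline ReceiveContextSketch* ContextFromOverlapped(FakeOverlapped* pov)
{
    char* base = reinterpret_cast<char*>(pov) - offsetof(ReceiveContextSketch, ov);
    return reinterpret_cast<ReceiveContextSketch*>(base);
}

// Small self-check: the recovered pointer is the original context.
inline bool ContextRoundTrips()
{
    ReceiveContextSketch ctx = { 7, { 0, 0 }, 42 };
    FakeOverlapped* pov = &ctx.ov;      // All that the worker thread receives
    ReceiveContextSketch* prc = ContextFromOverlapped(pov);
    assert(prc == &ctx);
    return prc->queueId == 7 && prc->appSpecific == 42;
}
```

Keeping the per-operation state next to the OVERLAPPED structure in this way means no separate lookup table is needed to match a completion notification to its receive operation.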

Receive Operation

To initiate a receive operation

  1. Call MQReceiveMessage to receive a message in the queue specified by the queue handle in the RECEIVE_CONTEXT structure provided.

  2. When reading messages asynchronously, MQReceiveMessage returns MQ_OK if a suitable message is found. Otherwise, MQReceiveMessage returns immediately with the return value MQ_INFORMATION_OPERATION_PENDING. This return value indicates that the operation is pending and will be completed when a suitable message can be found or will be terminated if the time-out period elapses.

  3. When a message is available, the message properties specified in the MQMSGPROPS structure indicated in the RECEIVE_CONTEXT structure are retrieved, and the RECEIVE_CONTEXT structure is passed back to the caller as an out parameter. The OVERLAPPED structure is also passed back to the caller as part of the RECEIVE_CONTEXT structure.
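Both MQ_OK and MQ_INFORMATION_OPERATION_PENDING are success codes, so a FAILED check alone does not distinguish a receive that completed immediately from one that is still pending. The following portable sketch shows one way to tell the three outcomes apart. HresultSketch, the k-prefixed constants, ReceiveStart, FailedSketch, and ClassifyReceiveStart are hypothetical stand-ins. The value used for the pending code is assumed to match Mq.h; real code should use the constants from the header and the FAILED macro.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for HRESULT (a signed 32-bit status code).
typedef std::int32_t HresultSketch;

// Assumed to match MQ_OK and MQ_INFORMATION_OPERATION_PENDING in Mq.h.
const HresultSketch kMqOk = 0;
const HresultSketch kMqInformationOperationPending =
    static_cast<HresultSketch>(0x400E0006);

// Illustrative failure value: any code with the high (severity) bit set.
const HresultSketch kSampleFailure = static_cast<HresultSketch>(0xC00E0001U);

// Possible outcomes of initiating an asynchronous receive.
enum class ReceiveStart { CompletedNow, Pending, Failed };

// Same test that the FAILED macro performs: negative values are failures.
inline bool FailedSketch(HresultSketch hr)
{
    return hr < 0;
}

// Classify the value returned when a receive operation is initiated.
inline ReceiveStart ClassifyReceiveStart(HresultSketch hr)
{
    if (FailedSketch(hr))
    {
        return ReceiveStart::Failed;        // The receive was not started
    }
    if (hr == kMqInformationOperationPending)
    {
        return ReceiveStart::Pending;       // Completes when a message arrives
    }
    return ReceiveStart::CompletedNow;      // A suitable message was found
}
```

In the code example, EnableAsyncReceive returns the value unchanged and its callers test it only with FAILED, which is sufficient there because both success cases are handled the same way.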

Start Routine for a Thread

To implement the start routine for a thread

  1. Call GetQueuedCompletionStatus to wait for notification of completion of a pending receive operation associated with the completion port. This call returns a pointer to the OVERLAPPED structure received on notification as an out parameter. If the call fails without dequeuing a completion packet, a NULL pointer to the OVERLAPPED structure is returned and fSuccess is set to FALSE. If the call dequeues a completion packet for a failed operation, fSuccess is also FALSE but the pointer is not NULL. When fSuccess is TRUE, the OVERLAPPED structure may still contain an error code.

  2. Call MQGetOverlappedResult to retrieve the success or error code from the OVERLAPPED structure. If no message is received before the time-out period elapses, an error is returned, and the start routine returns, terminating the thread in an implicit call to ExitThread.

  3. Process the message received. This example displays the application-specific data transmitted in the PROPID_M_APPSPECIFIC property of the message along with the thread identifier.

  4. Initiate the next receive operation using the same RECEIVE_CONTEXT structure.
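The code example leaves open how a worker thread exits its loop. One common approach with completion ports is to post one sentinel packet per worker (on Windows, for example, with PostQueuedCompletionStatus) and have each worker return when it dequeues one. The following sketch models only that control flow in portable C++. It swaps the Windows completion port for a mutex-protected queue, and Packet, PortSketch, WorkerSketch, and RunSketch are hypothetical names introduced for illustration.

```cpp
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <functional>   // std::ref
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical stand-in for a completion packet. A null context plays the
// role of a sentinel that asks one worker to exit.
struct Packet
{
    const long* context;    // Stand-in for the RECEIVE_CONTEXT pointer
};

// Hypothetical stand-in for a completion port: a blocking FIFO queue.
class PortSketch
{
public:
    void Post(Packet p)
    {
        {
            std::lock_guard<std::mutex> lock(m_);
            q_.push(p);
        }
        cv_.notify_one();
    }

    Packet Wait()           // Blocks until a packet is available
    {
        std::unique_lock<std::mutex> lock(m_);
        cv_.wait(lock, [this] { return !q_.empty(); });
        Packet p = q_.front();
        q_.pop();
        return p;
    }

private:
    std::mutex m_;
    std::condition_variable cv_;
    std::queue<Packet> q_;
};

// Worker loop: process packets until a sentinel arrives.
inline void WorkerSketch(PortSketch& port, std::atomic<long>& sum)
{
    for (;;)
    {
        Packet p = port.Wait();
        if (p.context == nullptr)
        {
            return;             // Sentinel: leave the loop
        }
        sum += *p.context;      // "Process" the message
    }
}

// Run two workers over four packets, then post one sentinel per worker.
inline long RunSketch()
{
    static const long data[4] = { 1, 2, 3, 4 };
    PortSketch port;
    std::atomic<long> sum(0);
    std::vector<std::thread> workers;
    for (int i = 0; i < 2; ++i)
    {
        workers.emplace_back(WorkerSketch, std::ref(port), std::ref(sum));
    }
    for (const long& v : data)
    {
        port.Post(Packet{ &v });
    }
    for (std::size_t i = 0; i < workers.size(); ++i)
    {
        port.Post(Packet{ nullptr });
    }
    for (std::thread& t : workers)
    {
        t.join();
    }
    return sum.load();
}
```

Because the queue is first-in, first-out, the sentinels are dequeued only after all of the data packets, and each worker consumes exactly one sentinel before returning.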

Use a Completion Port

To receive messages asynchronously using a completion port

  1. Validate the input string provided by the caller in the application and then use it as the format name to open the queue to be monitored and obtain a queue handle.

  2. Call CreateIoCompletionPort to create a new completion port without associating it with a specific queue.

  3. In a loop, call CreateThread to create two worker threads. CreateThread creates a thread and implicitly calls its start routine. Multiple threads are created to demonstrate scaling.

  4. Call CreateIoCompletionPort again to associate the completion port with the queue using the handle obtained.

  5. In a loop, create receive contexts for four receive operations and use each of them to initiate a receive operation. Multiple receive contexts are created to demonstrate scaling.

  6. Call Sleep to keep this function from terminating during the sleep time specified by the caller.

Code Example

The following code example can be run on all versions of Message Queuing, but cannot be run on dependent clients.

HRESULT EnableAsyncReceive(  
                           RECEIVE_CONTEXT* prc  
                           )  
{  
  
  // Validate the input parameter.  
  if (prc == NULL)  
  {  
    return MQ_ERROR_INVALID_PARAMETER;  
  }  
  
  // Initiate a receive operation.  
  return MQReceiveMessage(  
                 prc->hQueue,         // Queue handle  
                 INFINITE,            // Time to wait for a message  
                 MQ_ACTION_RECEIVE,  
                 prc->pmsgprops,      // Pointer to an MQMSGPROPS structure  
                 &prc->ov,            // OVERLAPPED structure   
                 NULL,                // No callback function  
                 NULL,                // No cursor  
                 MQ_NO_TRANSACTION    // Not in a transaction  
                 );  
}  
  
DWORD WINAPI CompletionPortThread(LPVOID lParam)  
{  
  HANDLE hPort = (HANDLE)lParam;  
  DWORD dwNoOfBytes = 0;  
  ULONG_PTR ulKey = 0;  
  OVERLAPPED* pov = NULL;  
  HRESULT hr = MQ_OK;  
  
  for ( ; ; )  
  {  
  
    // Wait for a completion notification.  
    pov = NULL;  
    BOOL fSuccess = GetQueuedCompletionStatus(  
                    hPort,         // Completion port handle  
                    &dwNoOfBytes,  // Bytes transferred  
                    &ulKey,  
                    &pov,          // OVERLAPPED structure  
                    INFINITE       // Notification time-out interval  
                    );  
  
    //  
    // Add code to handle specific errors when fSuccess is FALSE   
    // and provide a way to exit the loop.  
    //  
  
    if (pov == NULL)  
    {  
  
      // No completion packet was dequeued (an error in the completion port itself). Wait again.  
      continue;  
    }  
  
    // Get the base address of the RECEIVE_CONTEXT structure   
    // containing the OVERLAPPED structure received.  
    RECEIVE_CONTEXT* prc = CONTAINING_RECORD(pov, RECEIVE_CONTEXT, ov);  
  
    // Get the final status (last error) for the message received.  
    hr = MQGetOverlappedResult(&prc->ov);  
    if (FAILED(hr))  
    {  
       return hr;  
    }  
  
    // Retrieve the application-specific data in the message.  
    // PROPID_M_APPSPECIFIC is a VT_UI4 property, so read the unsigned member.  
    ULONG ulAppSpecific = prc->pmsgprops->aPropVar[0].ulVal;  
    wprintf(L"Message received. Thread ID: %lx. Application-specific data: %lu\n",  
            GetCurrentThreadId(),  
            ulAppSpecific  
            );  
  
    // Initiate the next message receive operation.  
    hr = EnableAsyncReceive(prc);  
    if (FAILED(hr))  
    {  
      return hr;  
    }  
  }  
  return hr;  
}  
  
HRESULT AsyncReadCP(  
                    WCHAR * wszFormatName,  
                    DWORD dwSleepTime  
                    )  
{  
  int cThread = 0, cReceive = 0;  
  HANDLE hPort = NULL;  
  QUEUEHANDLE hQueue = NULL;  
  const int NUMBEROFTHREADS = 2;  
  const int NUMBEROFRECEIVES = 4;  
  HRESULT hr = MQ_OK;  
  
  // Validate the input parameter and open the queue.  
  if (wszFormatName == NULL)  
  {  
    return MQ_ERROR_INVALID_PARAMETER;  
  }  
  hr = MQOpenQueue(  
                   wszFormatName,  
                   MQ_RECEIVE_ACCESS,  
                   MQ_DENY_NONE,  
                   &hQueue  
                   );  
  if (FAILED(hr))  
  {  
    wprintf(L"MQOpenQueue failed. Error: 0x%X.\n", hr);  
    return hr;  
  }  
  
  // Create a new completion port.  
  hPort = CreateIoCompletionPort(  
              INVALID_HANDLE_VALUE,  // Do not associate a queue handle.  
              NULL,                  // New port  
              0,                     // No completion key  
              NUMBEROFTHREADS        // Number of concurrently executed threads  
              );  
  if (hPort == NULL)  
  {  
    hr = HRESULT_FROM_WIN32(GetLastError());  // Save the error before cleanup.  
    MQCloseQueue(hQueue);  
    return hr;  
  }  
  
  // Create worker threads for the completion port.  
  for(cThread = 0; cThread < NUMBEROFTHREADS; cThread++)  
  {  
    DWORD dwThreadId = 0;  
    HANDLE hThread = NULL;  
    hThread = CreateThread(  
              NULL,                  // Default thread security descriptor  
              0,                     // Default stack size  
              CompletionPortThread,  // Start routine  
              hPort,                 // Start routine parameter  
              0,                     // Run immediately  
              &dwThreadId            // Thread ID  
              );  
    if (hThread == NULL)  
    {  
      //  
      // Add code to terminate any other threads created in this function.  
      //  
  
      hr = HRESULT_FROM_WIN32(GetLastError());  // Save the error before cleanup.  
      MQCloseQueue(hQueue);  
      CloseHandle(hPort);  
      return hr;  
    }  
    CloseHandle(hThread);  
  }  
  
  // Associate the queue with the completion port already created.  
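  // Do not overwrite hPort with the result, so that the port can still be closed on failure.  
  if (CreateIoCompletionPort(  
                            hQueue,           // Queue handle  
                            hPort,            // Port handle  
                            0,                // No completion key  
                            NUMBEROFTHREADS  
                            ) == NULL)  
  {  
    hr = HRESULT_FROM_WIN32(GetLastError());  // Save the error before cleanup.  
    MQCloseQueue(hQueue);  
    CloseHandle(hPort);  
    return hr;  
  }  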
  
  // Initiate several overlapped receive operations.  
  RECEIVE_CONTEXT* prc[NUMBEROFRECEIVES];  
  for (cReceive = 0; cReceive < NUMBEROFRECEIVES; cReceive++)  
  {  
  
    // Allocate and initialize a receive context.  
    prc[cReceive] = new RECEIVE_CONTEXT;  
    if (prc[cReceive] == NULL)  
    {  
      return MQ_ERROR_INSUFFICIENT_RESOURCES;  
    }  
    memset(prc[cReceive], 0, sizeof(RECEIVE_CONTEXT));  
    prc[cReceive]->hQueue = hQueue;  
    prc[cReceive]->pmsgprops = new MQMSGPROPS;  
    if (prc[cReceive]->pmsgprops == NULL)  
    {  
      return MQ_ERROR_INSUFFICIENT_RESOURCES;  
    }  
  
    const int NUMBEROFPROPERTIES = 1;  
    prc[cReceive]->pmsgprops->aPropID = new MSGPROPID[NUMBEROFPROPERTIES];  
    if (prc[cReceive]->pmsgprops->aPropID == NULL)  
    {  
      return MQ_ERROR_INSUFFICIENT_RESOURCES;  
    }  
    prc[cReceive]->pmsgprops->aPropVar = new MQPROPVARIANT[NUMBEROFPROPERTIES];  
    if (prc[cReceive]->pmsgprops->aPropVar == NULL)  
    {  
      return MQ_ERROR_INSUFFICIENT_RESOURCES;  
    }  
    prc[cReceive]->pmsgprops->aStatus = new HRESULT[NUMBEROFPROPERTIES];  
    if (prc[cReceive]->pmsgprops->aStatus == NULL)  
    {  
      return MQ_ERROR_INSUFFICIENT_RESOURCES;  
    }  
  
    DWORD cPropId = 0;  
    prc[cReceive]->pmsgprops->aPropID[cPropId] = PROPID_M_APPSPECIFIC;  
    prc[cReceive]->pmsgprops->aPropVar[cPropId].vt = VT_UI4;  
    cPropId++;  
  
    prc[cReceive]->pmsgprops->cProp = cPropId;  
  
    hr = EnableAsyncReceive(prc[cReceive]);  
    if (FAILED(hr))  
    {  
      break;  
    }  
  }  
  if (SUCCEEDED(hr))  
  {  
    Sleep(dwSleepTime);  
  }  
  
  // Free the memory allocated for the RECEIVE_CONTEXT structures.  
  // If every receive was initiated, cReceive is one past the last element.  
  if (cReceive == NUMBEROFRECEIVES)  
  {  
    cReceive--;  
  }  
  for ( ; cReceive >= 0; cReceive--)  
  {  
    delete [] prc[cReceive]->pmsgprops->aStatus;  
    delete [] prc[cReceive]->pmsgprops->aPropVar;  
    delete [] prc[cReceive]->pmsgprops->aPropID;  
    delete prc[cReceive]->pmsgprops;  
    delete prc[cReceive];  
  }  
  
  // Close the queue and free the port.  
  MQCloseQueue(hQueue);  
  CloseHandle(hPort);  
  return hr;  
}