question

AtencioAdamContractorBSC-7354 avatar image
0 Votes"
AtencioAdamContractorBSC-7354 asked XiaopoYang-MSFT edited

Issue with IO Completion port not returning buffer from WSARecv

Having a issue with IO completion ports:

So getting a receive message on the IO completion port i set up, It triggers GetQueuedCompletionStatus and returns with the Completion Key and the Overlapped data. Both appear to be good and I can see data populated in their structures. However the Buffer which was passed to WSARecv was not populated with the incoming message. (the BytesTransfered indicates that there were bytes received, but no data in the WSABUF).

Here is the code as it currently stands, looking for help as to why the Buffer isn't being populated.

networkhandlerthread.ccp

 #include "NetworkHandlerThread.h"
    
 // Worker thread, processes IOCP messages.
 DWORD ServerWorkerThread(LPVOID lpParam)
 {
     HANDLE CompletionPort = (HANDLE)lpParam;
     DWORD BytesTransferred = 0;
     OVERLAPPED* lpOverlapped = NULL;
     LPCONNECTED_SOCKET_DATA ConnectedSocketData = NULL;
     LPPER_IO_OPERATION_DATA PerIoData = NULL;
     DWORD Flags = 0;
     WSABUF* DataBuf;
     DWORD RecvBytes = 0;
     Type1MessageParser Type1MsgParser;
     Type2MessageParser Type2MsgParser;
     int DestinationAddress = 0;
     bool IsType1 = false;
    
     while (TRUE)//run forever
     {
         //Check for new message
         if (GetQueuedCompletionStatus(CompletionPort, &BytesTransferred, (PULONG_PTR)&ConnectedSocketData, (LPOVERLAPPED*)&PerIoData, INFINITE) == 0)
         {
             DWORD Err = GetLastError();
             if (Err != WAIT_TIMEOUT)
             {
                 printf("GetQueuedCompletionStatus() failed with error %d\n", Err);
    
                 if (closesocket(ConnectedSocketData->Socket) == SOCKET_ERROR)
                 {
                     printf("closesocket() failed with error %d\n", WSAGetLastError());
                     return 0;
                 }
    
                 GlobalFree(ConnectedSocketData);
             }
             continue;
         }
    
    
         //We have a message, determine if it's something we receaved or something we should send.
         if (PerIoData->OperationType == OPERATION_TYPE_RECV)
         {
             ///tbd process recv
             ConnectedSocketData; //this is comming in good and has data
             PerIoData->Buffer; // this is empty (pointer is good, but no data)
         }
         else if (PerIoData->OperationType == OPERATION_TYPE_SEND)
         {
             ///tbd process send
         }
     }
 };
    
    
 //Thread for handling Listener sockets and Accepting connections
 DWORD ListenThread(LPVOID lpParam)
 {
     LPLISTEN_SOCKET_DATA pSocketData = (LPLISTEN_SOCKET_DATA)(lpParam);
     WSANETWORKEVENTS NetworkEvents;
     DWORD dwRet;
     SOCKADDR_IN NewSockAddr;
     SOCKET      NewSocket;
     int         nLen;
    
     while (true) //run forever
     {
         //Wait for event
         dwRet = WSAWaitForMultipleEvents(1,
             &(pSocketData->hAcceptEvent),
             false,
             100,
             false);
    
         //Nothing happened, back to top
         if (dwRet == WSA_WAIT_TIMEOUT)
             continue;
    
         //We got a event, find out which one.
         int nRet = WSAEnumNetworkEvents(pSocketData->Socket,
             pSocketData->hAcceptEvent,
             &NetworkEvents);
         if (nRet == SOCKET_ERROR)
         {
             wprintf(L"WSAEnumNetworkEvents error %ld\n", WSAGetLastError());
             break;
         }
    
         //We got a Accept event
         if (NetworkEvents.lNetworkEvents & FD_ACCEPT)
         {
             //Check for errors
             if (NetworkEvents.iErrorCode[FD_ACCEPT_BIT] == 0)
             {
    
                 // Accept new connection
                 nLen = sizeof(SOCKADDR_IN);
                 NewSocket = WSAAccept(pSocketData->Socket,
                     (LPSOCKADDR)&NewSockAddr,
                     &nLen, NULL, NULL);
                 if (NewSocket == SOCKET_ERROR)
                 {
                     wprintf(L"accept() error %ld\n", WSAGetLastError());
                     break;
                 }
    
                 wprintf(L"Accepted Connection %ld", NewSockAddr.sin_addr.S_un.S_addr);
    
                 //Set new connection as TCP connection, No Delay
                 //const char chOpt = 1;
                 //int nErr = setsockopt(NewSocket, IPPROTO_TCP, TCP_NODELAY, &chOpt, sizeof(char));
                 //if (nErr == -1)
                 //{
                 //    wprintf(L"setsockopt() error %ld\n", WSAGetLastError());
                 //    break;
                 //}
    
    
                 LPCONNECTED_SOCKET_DATA ConnectedSocketData = new CONNECTED_SOCKET_DATA;
    
                 ZeroMemory(ConnectedSocketData, sizeof(CONNECTED_SOCKET_DATA));
    
                 ConnectedSocketData->Socket = NewSocket;
                 ConnectedSocketData->Port = pSocketData->Port;
                 ConnectedSocketData->IOCP = pSocketData->IOCP;
                 ConnectedSocketData->CfgHandle = pSocketData->CfgHandle;
                 ConnectedSocketData->ForwardMessager = pSocketData->ForwardMessager;
    
                 //Add the new socket to the completion port, message from the socker will be queued up for proccessing by worker threads.
                 if (CreateIoCompletionPort((HANDLE)NewSocket, pSocketData->IOCP, (DWORD_PTR)ConnectedSocketData, 0) == NULL)
                 {
                     wprintf(L"CreateIOCompletionPort error %ld\n", WSAGetLastError());
                     delete ConnectedSocketData;
                     ConnectedSocketData = NULL;
                     closesocket(NewSocket);
                     break;
                 }
    
                 //Set the PerIOData, will be used at completion time
                 LPPER_IO_OPERATION_DATA PerIoData;
                 PerIoData = (LPPER_IO_OPERATION_DATA)GlobalAlloc(GPTR, sizeof(PER_IO_OPERATION_DATA));
    
                 ZeroMemory(&(PerIoData->overlapped), sizeof(OVERLAPPED));
                 PerIoData->BufferLen = 0;
                 PerIoData->OperationType = OPERATION_TYPE_RECV;
                 DWORD RecvBytes = 0;
                 DWORD Flags = 0;
                 PerIoData->Buffer.buf = PerIoData->cBuffer;
                 PerIoData->Buffer.len = DATA_BUFSIZE;
    
    
                 //Kick off the first Recv request for the Socket, will be handled by the completion Queue.
                 if (WSARecv(NewSocket, &(PerIoData->Buffer), 1, &RecvBytes, &Flags, &(PerIoData->overlapped), NULL) == SOCKET_ERROR)
                 {
                     wprintf(L"WSARecv error %ld\n", WSAGetLastError());
                     return 0;
                 }
             }
             else
             {
                 wprintf(L"Unknown network event error %ld\n", WSAGetLastError());
                 break;
             }
         }
     }
 }
    
    
 NetworkHandlerThread::NetworkHandlerThread()
 {
     m_CompletionPort = 0;
     m_hListenThread = 0;
 }
    
 NetworkHandlerThread::~NetworkHandlerThread()
 {
    
 }
    
 void NetworkHandlerThread::StartNetworkHandler()
 {
     int iResult = 0;
     SYSTEM_INFO SystemInfo;
     unsigned int i = 0;
    
     //Start WSA
     iResult = WSAStartup(MAKEWORD(2, 2), &wsaData);
     if (iResult != NO_ERROR) {
         wprintf(L"WSAStartup() failed with error: %d\n", iResult);
         return;
     }
    
     //Start Completion Port
     m_CompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
     if (m_CompletionPort != NULL)
     {
         wprintf(L"Completion Port Created\n");
     }
    
     //Get # of system processors
     GetSystemInfo(&SystemInfo);
    
     //create Worker Threads for each processor.
     for (i = 0; i < SystemInfo.dwNumberOfProcessors * THREADS_PER_PROCESSOR; i++)
     {
         HANDLE ThreadHandle;
    
         // Create a server worker thread, and pass the
         // completion port to the thread. 
         ThreadHandle = CreateThread(NULL, 0, ServerWorkerThread, m_CompletionPort, 0, NULL);
    
         // Close the thread handle
         if (ThreadHandle != NULL)
         {
             CloseHandle(ThreadHandle);
         }
     }
 }
    
 void NetworkHandlerThread::AddListenThread(int Port,
     ConfigHandler* pConfigHandle,
     void* ForwardHandle)
 {
     SOCKADDR_IN InternetAddr;
     int iResult = 0;
     LPLISTEN_SOCKET_DATA pListenSocketData = new LISTEN_SOCKET_DATA;
    
     if (pListenSocketData == NULL)
     {
         return;
     }
    
     //Create the listener Socket
     pListenSocketData->Socket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
     if (pListenSocketData->Socket == INVALID_SOCKET)
     {
         wprintf(L"socket function failed with error: %ld\n", WSAGetLastError());
         WSACleanup();
         return;
     }
    
     // Create a Event to handle Socket Accepts
     pListenSocketData->hAcceptEvent = WSACreateEvent();
     if (pListenSocketData->hAcceptEvent == WSA_INVALID_EVENT)
     {
         wprintf(L"WSACreateEvent() error %ld\n", WSAGetLastError());
         closesocket(pListenSocketData->Socket);
         return;
     }
    
     // Set the Event to Trigger on FD_ACCEPT (this occurs on socket connection attempts)
     int nRet = WSAEventSelect(pListenSocketData->Socket,
         pListenSocketData->hAcceptEvent,
         FD_ACCEPT);
     if (nRet == SOCKET_ERROR)
     {
         wprintf(L"WSAAsyncSelect() error %ld\n", WSAGetLastError());
         closesocket(pListenSocketData->Socket);
         return;
     }
    
     //Assign the Port Number
     InternetAddr.sin_family = AF_INET;
     InternetAddr.sin_addr.s_addr = htonl(INADDR_ANY);
     InternetAddr.sin_port = htons(Port);
     pListenSocketData->Port = Port;
     pListenSocketData->IOCP = m_CompletionPort;
     pListenSocketData->CfgHandle = pConfigHandle;
     pListenSocketData->ForwardMessager = ForwardHandle;
    
     //Bind the Socket to the Port
     iResult = ::bind((pListenSocketData->Socket), (sockaddr*)&InternetAddr, sizeof(InternetAddr));
     if (iResult == SOCKET_ERROR) {
         wprintf(L"bind function failed with error %d\n", WSAGetLastError());
         iResult = closesocket(pListenSocketData->Socket);
         if (iResult == SOCKET_ERROR)
             wprintf(L"closesocket function failed with error %d\n", WSAGetLastError());
         WSACleanup();
         return;
     }
    
     //Listen for incomming connection requests.
     if (listen(pListenSocketData->Socket, SOMAXCONN) == SOCKET_ERROR)
     {
         wprintf(L"listen function failed with error: %d\n", WSAGetLastError());
         closesocket(pListenSocketData->Socket);
         WSACleanup();
         return;
     }
    
     wprintf(L"Listening on %ld", Port);
    
     m_hListenThread = (HANDLE)CreateThread(NULL,                // Security
         0,                  // Stack size - use default
         ListenThread,  // Thread fn entry point
         (void*)pListenSocketData, //Listen Socket Data
         0,                  // Init flag
         NULL);  // Thread address
 }

NetworkHandlerThread.h

 #pragma once
 #include <WinSock2.h>
 #include <ws2tcpip.h>
 #include <stdio.h>
 #include "ForwardMessageHandler.h"
 #include "ConfigHandler.h"
 #include "Type1MessageParser.h"
 #include "Type2Message-Parser.h"
 #include "ThreadUtilities.h"
    
 #define DATA_BUFSIZE 8192
 #define THREADS_PER_PROCESSOR 2
    
 class NetworkHandlerThread
 {
 public:
     WSADATA wsaData;
     HANDLE m_CompletionPort;
     HANDLE m_hListenThread;
    
 public:
     NetworkHandlerThread();
     ~NetworkHandlerThread();
    
     void StartNetworkHandler();
    
     void AddListenThread(int Port,
         ConfigHandler* pConfigHandle,
         void* ForwardHandle);
 };

ThreadUtilities.h

 #pragma once
 #include <mutex>
 #include "ConfigHandler.h"
    
    
 using namespace std;
    
 #define DATA_BUFSIZE 8192
 #define THREADS_PER_PROCESSOR 2
    
 typedef struct _THREAD_MESSAGE
 {
     mutex cmd_mtx;
     string command;
 } THREAD_MESSAGE, * LPTHREAD_MESSAGE;
    
 typedef struct _LISTEN_SOCKET_DATA
 {
     SOCKET Socket;
     int    Port;
     HANDLE hAcceptEvent;
     HANDLE IOCP;
     VOID* ForwardMessager;
     ConfigHandler* CfgHandle;
     // Other information useful to be associated with the handle
 } LISTEN_SOCKET_DATA, * LPLISTEN_SOCKET_DATA;
    
 typedef struct _CONNECTED_SOCKET_DATA
 {
     SOCKET Socket;
     int Port;
     HANDLE IOCP;
     VOID* ForwardMessager;
     ConfigHandler* CfgHandle;
 } CONNECTED_SOCKET_DATA, * LPCONNECTED_SOCKET_DATA;


windows-apic++
· 2
5 |1600 characters needed characters left characters exceeded

Up to 10 attachments (including images) can be used with a maximum of 3.0 MiB each and 30.0 MiB total.

Your question is related with win32, I will remove network tag and add the corresponding win32 tag. Thank you!

0 Votes 0 ·

So is this case solved according to the SO Question?



0 Votes 0 ·

1 Answer

XiaopoYang-MSFT avatar image
0 Votes"
XiaopoYang-MSFT answered XiaopoYang-MSFT edited

According to the Remarks of CreateIoCompletionPort function,

Objects created by other functions such as socket can also be associated with an I/O completion port. For an example using sockets, see AcceptEx.

This is the hyperlinks of the Example Code of AcceptEx function which uses overlapped I/O and completion ports.





5 |1600 characters needed characters left characters exceeded

Up to 10 attachments (including images) can be used with a maximum of 3.0 MiB each and 30.0 MiB total.