question

noname-1014 avatar image
0 Votes"
noname-1014 asked noname-1014 published

I/O completions ports - Completion packets not seen with simultaneous connections

I have written a test TCP server using IOCP. The server just listens on a port and dumps on the screen the received data (it sends nothing back to the client).

The problem I am facing is that, if the server receives a second connection while there is one connection open, the second connection is not accepted until the first one is closed.

The program works as follows:

  • Create an I/O completion port (CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0)).

  • Create listener socket.

  • Bind listener socket.

  • Listen.

  • Associate listener socket with the I/O completion port.

  • Create 4 sockets (maximum number of simultaneous connections) (4 x WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, nullptr, 0, WSA_FLAG_OVERLAPPED)).

  • Associate the 4 sockets with the I/O completion port.

  • Start an asynchronous accept for each socket (4 x AcceptEx()).

  • Create two threads which loop calling GetQueuedCompletionStatus().

  • When a connection is accepted, I call setsockopt() with the option SO_UPDATE_ACCEPT_CONTEXT and start an asynchronous read (WSARecv()).

  • After data has been received, I start another asynchronous read (WSARecv()).

  • When the client closes the connection, I call DisconnectEx() and, once DisconnectEx() has completed, I call AcceptEx() again.


Why don't I get notified in GetQueuedCompletionStatus() when a second connection is received?


The output of the program and the source code can be found here:
https://stackoverflow.com/questions/69064482/i-o-completions-ports-completion-packets-not-seen-with-simultaneous-connection

c++windows-api-general
5 |1600 characters needed characters left characters exceeded

Up to 10 attachments (including images) can be used with a maximum of 3.0 MiB each and 30.0 MiB total.

XiaopoYang-MSFT avatar image
1 Vote"
XiaopoYang-MSFT answered

I have tested the server program successfully with the client code. I only add Sleep after sending.

5 |1600 characters needed characters left characters exceeded

Up to 10 attachments (including images) can be used with a maximum of 3.0 MiB each and 30.0 MiB total.

XiaopoYang-MSFT avatar image
0 Votes"
XiaopoYang-MSFT answered XiaopoYang-MSFT edited

There is a example which uses overlapped I/O and completion ports. And I found that the attached code is lack of creating an accepting socket.

· 2
5 |1600 characters needed characters left characters exceeded

Up to 10 attachments (including images) can be used with a maximum of 3.0 MiB each and 30.0 MiB total.

I am creating the accept socket in the method connection::create(). I pass than the socket to the connection's constructor.
In the given example a single accept socket is created, I would like to create more than one.

0 Votes 0 ·

Perhaps you should try WSAAccept, CreateIoCompletionPort, WSARecv instead of CreateIoCompletionPort, WSAAccept, WSARecv. The question has a complete sample.

0 Votes 0 ·
noname-1014 avatar image
0 Votes"
noname-1014 answered noname-1014 commented

Thank you for your answer.
I have changed the order (AcceptEx, CreateIoCompletionPort and WSARecv) but still doesn't work.
Only when I close the first connection is the second connection processed.

I have added timestamps to the test program so it is easier to see what's going on. I have added a couple of comments in the output (search for <<<):
[2021/09/14 07:46:41] Created I/O completion port 0x00000000000000b4.
[2021/09/14 07:46:41] Created listener socket 204.
[2021/09/14 07:46:41] Associated listener socket 204 with the I/O completion port 0x00000000000000b4.
[2021/09/14 07:46:41] Created socket 184.
[2021/09/14 07:46:41] Created connection 0x000002d13c285960 (fd: 184, listener: 204).
[2021/09/14 07:46:41] [conn: 0x000002d13c285960] Starting asynchronous accept (fd: 184, listener: 204).
[2021/09/14 07:46:41] Associated socket 184 with the I/O completion port 0x00000000000000b4.
[2021/09/14 07:46:41] Created socket 212.
[2021/09/14 07:46:41] Created connection 0x000002d13c2a2490 (fd: 212, listener: 204).
[2021/09/14 07:46:41] [conn: 0x000002d13c2a2490] Starting asynchronous accept (fd: 212, listener: 204).
[2021/09/14 07:46:41] Associated socket 212 with the I/O completion port 0x00000000000000b4.
[2021/09/14 07:46:41] Created socket 188.
[2021/09/14 07:46:41] Created connection 0x000002d13c2869e0 (fd: 188, listener: 204).
[2021/09/14 07:46:41] [conn: 0x000002d13c2869e0] Starting asynchronous accept (fd: 188, listener: 204).
[2021/09/14 07:46:41] Associated socket 188 with the I/O completion port 0x00000000000000b4.
[2021/09/14 07:46:41] Created socket 192.
[2021/09/14 07:46:41] Created connection 0x000002d13c287a60 (fd: 192, listener: 204).
[2021/09/14 07:46:41] [conn: 0x000002d13c287a60] Starting asynchronous accept (fd: 192, listener: 204).
[2021/09/14 07:46:41] Associated socket 192 with the I/O completion port 0x00000000000000b4.
[2021/09/14 07:46:41] [thread-0] About to wait in GetQueuedCompletionStatus().
[2021/09/14 07:46:41] [thread-1] About to wait in GetQueuedCompletionStatus().
[2021/09/14 07:46:41] Waiting for signal to arrive.


<<< [2021/09/14 07:47:02] Here comes the first connection (fd: 192) >>>


[2021/09/14 07:47:02] [thread-1] GetQueuedCompletionStatus() succeeded, key: 1.
[2021/09/14 07:47:02] [conn: 0x000002d13c287a60] Processing I/O, state: waiting-accept, transferred: 0 (fd: 192, listener: 204).
[2021/09/14 07:47:02] [conn: 0x000002d13c287a60] Accepted connection (fd: 192, listener: 204).
[2021/09/14 07:47:02] [conn: 0x000002d13c287a60] Starting asynchronous read (fd: 192, listener: 204).
[2021/09/14 07:47:02] [thread-1] About to wait in GetQueuedCompletionStatus().
[2021/09/14 07:47:02] [thread-0] GetQueuedCompletionStatus() succeeded, key: 1.
[2021/09/14 07:47:02] [conn: 0x000002d13c287a60] Processing I/O, state: accepted, transferred: 660 (fd: 192, listener: 204).
[2021/09/14 07:47:02] [conn: 0x000002d13c287a60] Got 660 byte(s) (fd: 192, listener: 204).
[2021/09/14 07:47:02] [conn: 0x000002d13c287a60] Starting asynchronous read (fd: 192, listener: 204).
[2021/09/14 07:47:02] [thread-0] About to wait in GetQueuedCompletionStatus().


<<< [2021/09/14 07:47:10] I start the second connection but nothing happens >>>
<<< [2021/09/14 07:47:23] I close the first connection (fd: 192) >>>


[2021/09/14 07:47:23] [thread-0] GetQueuedCompletionStatus() succeeded, key: 1.
[2021/09/14 07:47:23] [conn: 0x000002d13c287a60] Processing I/O, state: accepted, transferred: 0 (fd: 192, listener: 204).
[2021/09/14 07:47:23] [conn: 0x000002d13c287a60] Got 0 byte(s) (fd: 192, listener: 204).
[2021/09/14 07:47:23] [thread-0] Error processing I/O for connection 0x000002d13c287a60.
[2021/09/14 07:47:23] [conn: 0x000002d13c287a60] Disconnecting socket (fd: 192, listener: 204).


<<< After having closed the first connection (fd: 192) is the second connection (fd: 188) processed >>>


[2021/09/14 07:47:23] [thread-1] GetQueuedCompletionStatus() succeeded, key: 1.
[2021/09/14 07:47:23] [conn: 0x000002d13c2869e0] Processing I/O, state: waiting-accept, transferred: 0 (fd: 188, listener: 204).
[2021/09/14 07:47:23] [conn: 0x000002d13c2869e0] Accepted connection (fd: 188, listener: 204).
[2021/09/14 07:47:23] [thread-0] About to wait in GetQueuedCompletionStatus().
[2021/09/14 07:47:23] [thread-0] GetQueuedCompletionStatus() succeeded, key: 1.
[2021/09/14 07:47:23] [conn: 0x000002d13c287a60] Processing I/O, state: disconnecting, transferred: 0 (fd: 192, listener: 204).
[2021/09/14 07:47:23] [conn: 0x000002d13c287a60] Starting asynchronous accept (fd: 192, listener: 204).
[2021/09/14 07:47:23] [thread-0] About to wait in GetQueuedCompletionStatus().
[2021/09/14 07:47:23] [conn: 0x000002d13c2869e0] Starting asynchronous read (fd: 188, listener: 204).
[2021/09/14 07:47:23] [thread-1] About to wait in GetQueuedCompletionStatus().
[2021/09/14 07:47:23] [thread-1] GetQueuedCompletionStatus() succeeded, key: 1.
[2021/09/14 07:47:23] [conn: 0x000002d13c2869e0] Processing I/O, state: accepted, transferred: 660 (fd: 188, listener: 204).
[2021/09/14 07:47:23] [conn: 0x000002d13c2869e0] Got 660 byte(s) (fd: 188, listener: 204).
[2021/09/14 07:47:23] [conn: 0x000002d13c2869e0] Starting asynchronous read (fd: 188, listener: 204).
[2021/09/14 07:47:23] [thread-1] About to wait in GetQueuedCompletionStatus().


<<< [2021/09/14 07:47:32] Here I close the second connection >>>


[2021/09/14 07:47:32] [thread-1] GetQueuedCompletionStatus() succeeded, key: 1.
[2021/09/14 07:47:32] [conn: 0x000002d13c2869e0] Processing I/O, state: accepted, transferred: 0 (fd: 188, listener: 204).
[2021/09/14 07:47:32] [conn: 0x000002d13c2869e0] Got 0 byte(s) (fd: 188, listener: 204).
[2021/09/14 07:47:32] [thread-1] Error processing I/O for connection 0x000002d13c2869e0.
[2021/09/14 07:47:32] [conn: 0x000002d13c2869e0] Disconnecting socket (fd: 188, listener: 204).
[2021/09/14 07:47:32] [thread-0] GetQueuedCompletionStatus() succeeded, key: 1.
[2021/09/14 07:47:32] [conn: 0x000002d13c2869e0] Processing I/O, state: disconnecting, transferred: 0 (fd: 188, listener: 204).
[2021/09/14 07:47:32] [conn: 0x000002d13c2869e0] Starting asynchronous accept (fd: 188, listener: 204).
[2021/09/14 07:47:32] [thread-1] About to wait in GetQueuedCompletionStatus().
[2021/09/14 07:47:32] [thread-0] About to wait in GetQueuedCompletionStatus().
[2021/09/14 07:47:35] Signal received.
[2021/09/14 07:47:35] [thread-0] GetQueuedCompletionStatus() succeeded, key: 2.
[2021/09/14 07:47:35] [thread-0] Exiting loop, key: 2.
[2021/09/14 07:47:35] [thread-1] GetQueuedCompletionStatus() succeeded, key: 2.
[2021/09/14 07:47:35] [thread-1] Exiting loop, key: 2.

· 4
5 |1600 characters needed characters left characters exceeded

Up to 10 attachments (including images) can be used with a maximum of 3.0 MiB each and 30.0 MiB total.

noname-1014 avatar image noname-1014 XiaopoYang-MSFT ·

I am calling setsockopt() in the method connection::accepted():

     // Connection has been accepted.
     bool accepted()
     {
 #if DEBUG
       printf("[conn: 0x%p] Accepted connection (fd: %llu, listener: %llu).\n",
              static_cast<const void*>(this),
              _M_fd,
              _M_listener);
 #endif // DEBUG
    
       if ((setsockopt(_M_fd,
                       SOL_SOCKET,
                       SO_UPDATE_ACCEPT_CONTEXT,
                       reinterpret_cast<const char*>(&_M_listener),
                       sizeof(SOCKET)) == 0) &&
           (read())) {
         _M_state = state::accepted;
         return true;
       }
    
       return false;
     }
0 Votes 0 ·

I have checked your code again. How many worker threads do you create?

0 Votes 0 ·
Show more comments
noname-1014 avatar image
0 Votes"
noname-1014 answered noname-1014 commented

This also works for me. The problem I am facing is when I open a second connection and send data. This second connection is not processed until I close the first connection.
The tcp_receiver listens on 127.0.0.1:2000. The easiest way to test it is:

  1. Open the browser.

  2. Visit http://127.0.0.1:2000

  3. Open another tab and visit http://127.0.0.1:2000

  4. In the debug messages of the tcp_receiver you will see only the data sent by the first tab.

  5. Close the first tab, now you might see the data sent by the second tab.

As I call AcceptEx() twice, I would expect that I can process both connections.

· 2
5 |1600 characters needed characters left characters exceeded

Up to 10 attachments (including images) can be used with a maximum of 3.0 MiB each and 30.0 MiB total.

132306-image.png
II see These two threads are all working.

0 Votes 0 ·
image.png (72.2 KiB)
noname-1014 avatar image noname-1014 XiaopoYang-MSFT ·

That's exactly the behavior I was expecting! The second connection is processed while the first one is waiting for data.

If you haven't changed anything to my code:

  1. Which Windows version are you using? I am using Windows 10 Enterprise.

  2. Are you compiling with Visual Studio? I am compiling with Mingw-w64.

If you are testing with a non Windows server edition, then the problem might be with my Mingw-w64.

0 Votes 0 ·
noname-1014 avatar image
0 Votes"
noname-1014 answered noname-1014 published

I am posting here the whole test program, maybe it is easier for you to test it.
I have changed the following:

  • Added timestamps to the debug messages.

  • Changed order (WSASocket() / AcceptEx() / CreateIoCompletionPort()


Source code:

 #include <stdint.h>
 #include <stdlib.h>
 #include <string.h>
 #include <stdio.h>
 #include <stdarg.h>
 #include <time.h>
 #include <winsock2.h>
 #include <ws2tcpip.h>
 #include <mswsock.h>
 #include <windows.h>
 #include <process.h>
 #include <new>
    
 enum class completion_key {
   io = 1,
   shutdown
 };
    
 struct thread_data {
   size_t nthread;
   HANDLE io_completion_port;
 };
    
 static HANDLE stop_event = nullptr;
    
 static bool init_winsock();
 static BOOL WINAPI signal_handler(DWORD control_type);
 static void log(const char* format, ...);
    
 class connection : private OVERLAPPED {
   private:
     // Connection state.
     enum class state {
       disconnected,
       waiting_accept,
       accepted,
       disconnecting
     };
    
   public:
     // Set address of the AcceptEx() function.
     static void acceptex(LPFN_ACCEPTEX fn)
     {
       _M_acceptex = fn;
     }
    
     // Set address of the DisconnectEx() function.
     static void disconnectex(LPFN_DISCONNECTEX fn)
     {
       _M_disconnectex = fn;
     }
    
     // Create connection.
     static connection* create(SOCKET listener, HANDLE io_completion_port)
     {
       // Create socket.
       const SOCKET fd = WSASocket(AF_INET,
                                   SOCK_STREAM,
                                   IPPROTO_TCP,
                                   nullptr,
                                   0,
                                   WSA_FLAG_OVERLAPPED);
    
       // If the socket could be created...
       if (fd != INVALID_SOCKET) {
 #if DEBUG
         log("Created socket %llu.\n", fd);
 #endif
    
         // Create connection.
         connection* const conn = new (std::nothrow) connection(fd, listener);
    
         // If the connection could be created...
         if (conn) {
 #if DEBUG
           log("Created connection 0x%p (fd: %llu, listener: %llu).\n",
               static_cast<const void*>(conn),
               fd,
               listener);
 #endif // DEBUG
    
           // Start an asynchronous accept.
           if (conn->accept()) {
             // Associate socket with the I/O completion port.
             if (CreateIoCompletionPort(reinterpret_cast<HANDLE>(fd),
                                        io_completion_port,
                                        static_cast<ULONG_PTR>(completion_key::io),
                                        0)) {
 #if DEBUG
               log("Associated socket %llu with the I/O completion port 0x%p.\n",
                   fd,
                   static_cast<const void*>(io_completion_port));
 #endif // DEBUG
    
               return conn;
             }
           }
    
           delete conn;
    
           return nullptr;
         }
    
         closesocket(fd);
       }
    
       return nullptr;
     }
    
     // Constructor.
     connection(SOCKET fd, SOCKET listener)
       : OVERLAPPED{0},
         _M_fd(fd),
         _M_listener(listener)
     {
     }
    
     // Destructor.
     ~connection()
     {
       shutdown(_M_fd, SD_BOTH);
       closesocket(_M_fd);
     }
    
     // Start an asynchronous accept.
     bool accept()
     {
 #if DEBUG
       log("[conn: 0x%p] Starting asynchronous accept (fd: %llu, "
           "listener: %llu).\n",
           static_cast<const void*>(this),
           _M_fd,
           _M_listener);
 #endif // DEBUG
    
       DWORD bytes;
       if ((_M_acceptex(_M_listener,
                        _M_fd,
                        _M_addresses,
                        0,
                        sizeof(struct sockaddr_in) + 16,
                        sizeof(struct sockaddr_in) + 16,
                        &bytes,
                        this)) ||
           (WSAGetLastError() == WSA_IO_PENDING)) {
         _M_state = state::waiting_accept;
         return true;
       }
    
       return false;
     }
    
     // Connection has been accepted.
     bool accepted()
     {
 #if DEBUG
       log("[conn: 0x%p] Accepted connection (fd: %llu, listener: %llu).\n",
           static_cast<const void*>(this),
           _M_fd,
           _M_listener);
 #endif // DEBUG
    
       if ((setsockopt(_M_fd,
                       SOL_SOCKET,
                       SO_UPDATE_ACCEPT_CONTEXT,
                       reinterpret_cast<const char*>(&_M_listener),
                       sizeof(SOCKET)) == 0) &&
           (read())) {
         _M_state = state::accepted;
         return true;
       }
    
       return false;
     }
    
     // Start an asynchronous read.
     bool read()
     {
 #if DEBUG
       log("[conn: 0x%p] Starting asynchronous read (fd: %llu, "
           "listener: %llu).\n",
           static_cast<const void*>(this),
           _M_fd,
           _M_listener);
 #endif // DEBUG
    
       WSABUF buf;
       buf.buf = reinterpret_cast<char*>(_M_buf);
       buf.len = sizeof(_M_buf);
    
       DWORD flags = 0;
    
       return ((WSARecv(_M_fd,
                        &buf,
                        1,
                        nullptr,
                        &flags,
                        this,
                        nullptr) == 0) ||
               (WSAGetLastError() == WSA_IO_PENDING));
     }
    
     // Data has been read.
     bool read(DWORD transferred)
     {
 #if DEBUG
       log("[conn: 0x%p] Got %lu byte(s) (fd: %llu, listener: %llu).\n",
           static_cast<const void*>(this),
           transferred,
           _M_fd,
           _M_listener);
 #endif // DEBUG
    
       if (transferred > 0) {
         log("Read %lu byte(s):\n", transferred);
         log("%.*s\n",
             static_cast<int>(transferred),
             reinterpret_cast<const char*>(_M_buf));
    
         log("===========================================================\n");
    
         // Start another read operation.
         return read();
       } else {
         return false;
       }
     }
    
     // Close connection.
     void close()
     {
 #if DEBUG
       log("[conn: 0x%p] Disconnecting socket (fd: %llu, listener: %llu).\n",
           static_cast<const void*>(this),
           _M_fd,
           _M_listener);
 #endif // DEBUG
    
       // Close connection.
       if (_M_disconnectex(_M_fd, this, TF_REUSE_SOCKET, 0)) {
         _M_state = state::disconnected;
       } else {
         _M_state = (WSAGetLastError() == WSA_IO_PENDING) ? state::disconnecting :
                                                            state::disconnected;
       }
     }
    
     // Process I/O.
     bool process_io(DWORD transferred)
     {
 #if DEBUG
       log("[conn: 0x%p] Processing I/O, state: %s, transferred: %lu "
           "(fd: %llu, listener: %llu).\n",
           static_cast<const void*>(this),
           to_string(_M_state),
           transferred,
           _M_fd,
           _M_listener);
 #endif // DEBUG
    
       switch (_M_state) {
         case state::waiting_accept:
           return accepted();
         case state::accepted:
           return read(transferred);
         case state::disconnecting:
           return accept();
         case state::disconnected:
         default:
           return true;
       }
     }
    
     static const char* to_string(state s)
     {
       switch (s) {
         case state::disconnected:
           return "disconnected";
         case state::waiting_accept:
           return "waiting-accept";
         case state::accepted:
           return "accepted";
         case state::disconnecting:
           return "disconnecting";
         default:
           return "(unknown)";
       }
     }
    
   private:
     // Socket descriptor.
     SOCKET _M_fd;
    
     // Listener socket.
     SOCKET _M_listener;
    
     // Socket addresses.
     uint8_t _M_addresses[2 * (sizeof(struct sockaddr_in) + 16)] = {0};
    
     // Buffer.
     static constexpr const size_t buffer_size = 4 * 1024;
     uint8_t _M_buf[buffer_size];
    
     // Connection state.
     state _M_state;
    
     // Address of the AcceptEx() function.
     static LPFN_ACCEPTEX _M_acceptex;
    
     // Address of the DisconnectEx() function.
     static LPFN_DISCONNECTEX _M_disconnectex;
    
     // Disable copy constructor and assignment operator.
     connection(const connection&) = delete;
     connection& operator=(const connection&) = delete;
 };
    
 LPFN_ACCEPTEX connection::_M_acceptex = nullptr;
 LPFN_DISCONNECTEX connection::_M_disconnectex = nullptr;
    
 static unsigned __stdcall worker(void* arg)
 {
   const thread_data* const data = static_cast<const thread_data*>(arg);
    
   do {
 #if DEBUG
     log("[thread-%zu] About to wait in GetQueuedCompletionStatus().\n", 
         data->nthread);
 #endif // DEBUG
    
     DWORD transferred;
     ULONG_PTR key;
     OVERLAPPED* overlapped;
     if (GetQueuedCompletionStatus(data->io_completion_port,
                                   &transferred,
                                   &key,
                                   &overlapped,
                                   INFINITE)) {
 #if DEBUG
       log("[thread-%zu] GetQueuedCompletionStatus() succeeded, key: %u.\n",
           data->nthread,
           static_cast<unsigned>(key));
 #endif // DEBUG
    
       if (static_cast<completion_key>(key) == completion_key::io) {
         connection* const conn = reinterpret_cast<connection*>(overlapped);
    
         if (!conn->process_io(transferred)) {
 #if DEBUG
           log("[thread-%zu] Error processing I/O for connection 0x%p.\n",
               data->nthread,
               static_cast<const void*>(conn));
 #endif // DEBUG
    
           conn->close();
         }
       } else {
 #if DEBUG
         log("[thread-%zu] Exiting loop, key: %u.\n",
             data->nthread,
             static_cast<unsigned>(key));
 #endif // DEBUG
    
         _endthreadex(0);
         return 0;
       }
     } else {
 #if DEBUG
       log("[thread-%zu] GetQueuedCompletionStatus() failed, key: %u, "
           "overlapped: 0x%p.\n",
           data->nthread,
           static_cast<unsigned>(key),
           static_cast<const void*>(overlapped));
 #endif // DEBUG
    
       if (overlapped) {
         reinterpret_cast<connection*>(overlapped)->close();
       }
     }
   } while (true);
 }
    
 int main()
 {
   static constexpr const unsigned short port = 2000;
    
   // Initialize Winsock library.
   if (!init_winsock()) {
     fprintf(stderr, "Error initializing Winsock library.\n");
     return EXIT_FAILURE;
   }
    
   // Create event.
   stop_event = CreateEvent(nullptr, TRUE, FALSE, nullptr);
    
   // If the event could not be created...
   if (!stop_event) {
     fprintf(stderr, "Error creating stop event.\n");
    
     WSACleanup();
    
     return EXIT_FAILURE;
   }
    
   // Install signal handler.
   if (!SetConsoleCtrlHandler(signal_handler, TRUE)) {
     fprintf(stderr, "Error installing signal handler.\n");
    
     CloseHandle(stop_event);
     WSACleanup();
    
     return EXIT_FAILURE;
   }
    
   // Create I/O completion port.
   const HANDLE
     io_completion_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
    
   // If the I/O completion port could not be created...
   if (!io_completion_port) {
     fprintf(stderr, "Error creating I/O completion port.\n");
    
     SetConsoleCtrlHandler(nullptr, FALSE);
     CloseHandle(stop_event);
     WSACleanup();
    
     return EXIT_FAILURE;
   }
    
 #if DEBUG
   log("Created I/O completion port 0x%p.\n",
       static_cast<const void*>(io_completion_port));
 #endif
    
   // Create listener socket.
   const SOCKET listener = WSASocket(AF_INET,
                                     SOCK_STREAM,
                                     IPPROTO_TCP,
                                     nullptr,
                                     0,
                                     WSA_FLAG_OVERLAPPED);
    
   // If the listener socket could not be created...
   if (listener == INVALID_SOCKET) {
     fprintf(stderr, "Error creating listener socket.\n");
    
     CloseHandle(io_completion_port);
     SetConsoleCtrlHandler(nullptr, FALSE);
     CloseHandle(stop_event);
     WSACleanup();
    
     return EXIT_FAILURE;
   }
    
 #if DEBUG
   log("Created listener socket %llu.\n", listener);
 #endif
    
   GUID guid = WSAID_ACCEPTEX;
   LPFN_ACCEPTEX acceptex;
   DWORD bytes;
    
   // Get address of the AcceptEx() function.
   if (WSAIoctl(listener,
                SIO_GET_EXTENSION_FUNCTION_POINTER,
                &guid,
                sizeof(GUID),
                &acceptex,
                sizeof(LPFN_ACCEPTEX),
                &bytes,
                nullptr,
                nullptr) != 0) {
     fprintf(stderr,
             "Error getting the address of the AcceptEx() function.\n");
    
     closesocket(listener);
     CloseHandle(io_completion_port);
     SetConsoleCtrlHandler(nullptr, FALSE);
     CloseHandle(stop_event);
     WSACleanup();
    
     return EXIT_FAILURE;
   }
    
   guid = WSAID_DISCONNECTEX;
   LPFN_DISCONNECTEX disconnectex;
    
   // Get address of the DisconnectEx() function.
   if (WSAIoctl(listener,
                SIO_GET_EXTENSION_FUNCTION_POINTER,
                &guid,
                sizeof(GUID),
                &disconnectex,
                sizeof(LPFN_DISCONNECTEX),
                &bytes,
                nullptr,
                nullptr) != 0) {
     fprintf(stderr,
             "Error getting the address of the DisconnectEx() function.\n");
    
     closesocket(listener);
     CloseHandle(io_completion_port);
     SetConsoleCtrlHandler(nullptr, FALSE);
     CloseHandle(stop_event);
     WSACleanup();
    
     return EXIT_FAILURE;
   }
    
   struct sockaddr_in addr;
   addr.sin_family = AF_INET;
   addr.sin_addr.S_un.S_addr = htonl(INADDR_LOOPBACK);
   //addr.sin_addr.S_un.S_addr = htonl(INADDR_ANY);
   addr.sin_port = htons(port);
    
   memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
    
   // Bind.
   if (bind(listener,
            reinterpret_cast<const struct sockaddr*>(&addr),
            sizeof(struct sockaddr_in)) != 0) {
     fprintf(stderr, "Error binding to localhost:%u.\n", port);
    
     closesocket(listener);
     CloseHandle(io_completion_port);
     SetConsoleCtrlHandler(nullptr, FALSE);
     CloseHandle(stop_event);
     WSACleanup();
    
     return EXIT_FAILURE;
   }
    
   // Listen.
   if (listen(listener, SOMAXCONN) != 0) {
     fprintf(stderr, "Error listening on localhost:%u.\n", port);
    
     closesocket(listener);
     CloseHandle(io_completion_port);
     SetConsoleCtrlHandler(nullptr, FALSE);
     CloseHandle(stop_event);
     WSACleanup();
    
     return EXIT_FAILURE;
   }
    
   // Associate listener with the I/O completion port.
   if (!CreateIoCompletionPort(reinterpret_cast<HANDLE>(listener),
                               io_completion_port,
                               static_cast<ULONG_PTR>(
                                 completion_key::io
                               ),
                               0)) {
     fprintf(stderr,
             "Error associating listener socket with the I/O completion "
             "port.\n");
    
     closesocket(listener);
     CloseHandle(io_completion_port);
     SetConsoleCtrlHandler(nullptr, FALSE);
     CloseHandle(stop_event);
     WSACleanup();
    
     return EXIT_FAILURE;
   }
    
 #if DEBUG
   log("Associated listener socket %llu with the I/O completion port 0x%p.\n",
       listener,
       static_cast<const void*>(io_completion_port));
 #endif // DEBUG
    
   static constexpr const size_t number_connections = 4;
   connection* connections[number_connections];
    
   // Set address of the AcceptEx() function.
   connection::acceptex(acceptex);
    
   // Set address of the DisconnectEx() function.
   connection::disconnectex(disconnectex);
    
   // Create connections.
   for (size_t i = 0; i < number_connections; i++) {
     // Create connection.
     connections[i] = connection::create(listener, io_completion_port);
    
     // If the connection could not be created...
     if (!connections[i]) {
       for (size_t j = 0; j < i; j++) {
         delete connections[j];
       }
    
       closesocket(listener);
       CloseHandle(io_completion_port);
       SetConsoleCtrlHandler(nullptr, FALSE);
       CloseHandle(stop_event);
       WSACleanup();
    
       return EXIT_FAILURE;
     }
   }
    
   static constexpr const size_t number_threads = 2;
   HANDLE threads[number_threads];
   thread_data thread_data[number_threads];
    
   // Create threads.
   for (size_t i = 0; i < number_threads; i++) {
     thread_data[i].nthread = i;
     thread_data[i].io_completion_port = io_completion_port;
    
     // Create thread.
     threads[i] = reinterpret_cast<HANDLE>(_beginthreadex(nullptr,
                                                          0,
                                                          worker,
                                                          &thread_data[i],
                                                          0,
                                                          nullptr));
    
     // If the thread could not be created...
     if (threads[i] == reinterpret_cast<HANDLE>(-1)) {
       fprintf(stderr, "Error creating thread %zu.\n", i + 1);
    
       // Save number of threads.
       const size_t nthreads = i;
    
       // Notify threads that they have to stop.
       for (i = 0; i < nthreads; i++) {
         PostQueuedCompletionStatus(io_completion_port,
                                    0,
                                    static_cast<ULONG_PTR>(
                                      completion_key::shutdown
                                    ),
                                    nullptr);
       }
    
       // Wait for the worker threads.
       WaitForMultipleObjects(nthreads, threads, TRUE, INFINITE);
    
       for (i = 0; i < nthreads; i++) {
         CloseHandle(threads[i]);
       }
    
       for (i = 0; i < number_connections; i++) {
         delete connections[i];
       }
    
       closesocket(listener);
       CloseHandle(io_completion_port);
       SetConsoleCtrlHandler(nullptr, FALSE);
       CloseHandle(stop_event);
       WSACleanup();
    
       return EXIT_FAILURE;
     }
   }
    
   log("Waiting for signal to arrive.\n");
    
   // Wait for signal to arrive.
   WaitForSingleObject(stop_event, INFINITE);
    
   log("Signal received.\n");
    
   // Notify threads that they have to stop.
   for (size_t i = 0; i < number_threads; i++) {
     PostQueuedCompletionStatus(io_completion_port,
                                0,
                                static_cast<ULONG_PTR>(completion_key::shutdown),
                                nullptr);
   }
    
   // Wait for the worker threads.
   WaitForMultipleObjects(number_threads, threads, TRUE, INFINITE);
    
   for (size_t i = 0; i < number_threads; i++) {
     CloseHandle(threads[i]);
   }
    
   for (size_t i = 0; i < number_connections; i++) {
     delete connections[i];
   }
    
   closesocket(listener);
   CloseHandle(io_completion_port);
   SetConsoleCtrlHandler(nullptr, FALSE);
   CloseHandle(stop_event);
   WSACleanup();
    
   return EXIT_SUCCESS;
 }
    
 bool init_winsock()
 {
   WORD version = MAKEWORD(2, 2);
   WSADATA wsadata;
   if (WSAStartup(version, &wsadata) == 0) {
     if ((LOBYTE(wsadata.wVersion) == 2) &&
         (HIBYTE(wsadata.wVersion) == 2)) {
       return true;
     }
    
     WSACleanup();
   }
    
   return false;
 }
    
 BOOL WINAPI signal_handler(DWORD control_type)
 {
   switch (control_type) {
     case CTRL_C_EVENT:
     case CTRL_CLOSE_EVENT:
       SetEvent(stop_event);
       return TRUE;
     default:
       return FALSE;
   }
 }
    
 void log(const char* format, ...)
 {
   time_t now = time(nullptr);
   const struct tm* const tm = localtime(&now);
    
   printf("[%04u/%02u/%02u %02u:%02u:%02u] ",
          1900 + tm->tm_year,
          1 + tm->tm_mon,
          tm->tm_mday,
          tm->tm_hour,
          tm->tm_min,
          tm->tm_sec);
    
   va_list ap;
   va_start(ap, format);
   vprintf(format, ap);
   va_end(ap);
 }
5 |1600 characters needed characters left characters exceeded

Up to 10 attachments (including images) can be used with a maximum of 3.0 MiB each and 30.0 MiB total.

noname-1014 avatar image
0 Votes"
noname-1014 answered

I have just tested starting two complete-client-code and it also works. Then there might be something strange when testing with the browser.
Thank you very much for your help.


5 |1600 characters needed characters left characters exceeded

Up to 10 attachments (including images) can be used with a maximum of 3.0 MiB each and 30.0 MiB total.