I/O completions ports - Completion packets not seen with simultaneous connections

noname 141 Reputation points
2021-09-11T06:19:58.337+00:00

I have written a test TCP server using IOCP. The server just listens on a port and dumps on the screen the received data (it sends nothing back to the client).

The problem I am facing is that, if the server receives a second connection while there is one connection open, the second connection is not accepted until the first one is closed.

The program works as follows:

  • Create an I/O completion port (CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0)).
  • Create listener socket.
  • Bind listener socket.
  • Listen.
  • Associate listener socket with the I/O completion port.
  • Create 4 sockets (maximum number of simultaneous connections) (4 x WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, nullptr, 0, WSA_FLAG_OVERLAPPED)).
  • Associate the 4 sockets with the I/O completion port.
  • Start an asynchronous accept for each socket (4 x AcceptEx()).
  • Create two threads which loop calling GetQueuedCompletionStatus().
  • When a connection is accepted, I call setsockopt() with the option SO_UPDATE_ACCEPT_CONTEXT and start an asynchronous read (WSARecv()).
  • After data has been received, I start another asynchronous read (WSARecv()).
  • When the client closes the connection, I call DisconnectEx() and, once DisconnectEx() has completed, I call AcceptEx() again.

Why don't I get notified in GetQueuedCompletionStatus() when a second connection is received?

The output of the program and the source code can be found here:
https://stackoverflow.com/questions/69064482/i-o-completions-ports-completion-packets-not-seen-with-simultaneous-connection

Windows API - Win32
Windows API - Win32
A core set of Windows application programming interfaces (APIs) for desktop and server applications. Previously known as Win32 API.
2,426 questions
C++
C++
A high-level, general-purpose programming language, created as an extension of the C programming language, that has object-oriented, generic, and functional features in addition to facilities for low-level memory manipulation.
3,537 questions
0 comments No comments
{count} votes

5 additional answers

Sort by: Most helpful
  1. Xiaopo Yang - MSFT 11,496 Reputation points Microsoft Vendor
    2021-09-13T07:31:51.267+00:00

    There is a example which uses overlapped I/O and completion ports. And I found that the attached code is lack of creating an accepting socket.


  2. noname 141 Reputation points
    2021-09-14T06:07:54.223+00:00

    Thank you for your answer.
    I have changed the order (AcceptEx, CreateIoCompletionPort and WSARecv) but still doesn't work.
    Only when I close the first connection is the second connection processed.

    I have added timestamps to the test program so it is easier to see what's going on. I have added a couple of comments in the output (search for <<<):
    [2021/09/14 07:46:41] Created I/O completion port 0x00000000000000b4.
    [2021/09/14 07:46:41] Created listener socket 204.
    [2021/09/14 07:46:41] Associated listener socket 204 with the I/O completion port 0x00000000000000b4.
    [2021/09/14 07:46:41] Created socket 184.
    [2021/09/14 07:46:41] Created connection 0x000002d13c285960 (fd: 184, listener: 204).
    [2021/09/14 07:46:41] [conn: 0x000002d13c285960] Starting asynchronous accept (fd: 184, listener: 204).
    [2021/09/14 07:46:41] Associated socket 184 with the I/O completion port 0x00000000000000b4.
    [2021/09/14 07:46:41] Created socket 212.
    [2021/09/14 07:46:41] Created connection 0x000002d13c2a2490 (fd: 212, listener: 204).
    [2021/09/14 07:46:41] [conn: 0x000002d13c2a2490] Starting asynchronous accept (fd: 212, listener: 204).
    [2021/09/14 07:46:41] Associated socket 212 with the I/O completion port 0x00000000000000b4.
    [2021/09/14 07:46:41] Created socket 188.
    [2021/09/14 07:46:41] Created connection 0x000002d13c2869e0 (fd: 188, listener: 204).
    [2021/09/14 07:46:41] [conn: 0x000002d13c2869e0] Starting asynchronous accept (fd: 188, listener: 204).
    [2021/09/14 07:46:41] Associated socket 188 with the I/O completion port 0x00000000000000b4.
    [2021/09/14 07:46:41] Created socket 192.
    [2021/09/14 07:46:41] Created connection 0x000002d13c287a60 (fd: 192, listener: 204).
    [2021/09/14 07:46:41] [conn: 0x000002d13c287a60] Starting asynchronous accept (fd: 192, listener: 204).
    [2021/09/14 07:46:41] Associated socket 192 with the I/O completion port 0x00000000000000b4.
    [2021/09/14 07:46:41] [thread-0] About to wait in GetQueuedCompletionStatus().
    [2021/09/14 07:46:41] [thread-1] About to wait in GetQueuedCompletionStatus().
    [2021/09/14 07:46:41] Waiting for signal to arrive.

    <<< [2021/09/14 07:47:02] Here comes the first connection (fd: 192) >>>

    [2021/09/14 07:47:02] [thread-1] GetQueuedCompletionStatus() succeeded, key: 1.
    [2021/09/14 07:47:02] [conn: 0x000002d13c287a60] Processing I/O, state: waiting-accept, transferred: 0 (fd: 192, listener: 204).
    [2021/09/14 07:47:02] [conn: 0x000002d13c287a60] Accepted connection (fd: 192, listener: 204).
    [2021/09/14 07:47:02] [conn: 0x000002d13c287a60] Starting asynchronous read (fd: 192, listener: 204).
    [2021/09/14 07:47:02] [thread-1] About to wait in GetQueuedCompletionStatus().
    [2021/09/14 07:47:02] [thread-0] GetQueuedCompletionStatus() succeeded, key: 1.
    [2021/09/14 07:47:02] [conn: 0x000002d13c287a60] Processing I/O, state: accepted, transferred: 660 (fd: 192, listener: 204).
    [2021/09/14 07:47:02] [conn: 0x000002d13c287a60] Got 660 byte(s) (fd: 192, listener: 204).
    [2021/09/14 07:47:02] [conn: 0x000002d13c287a60] Starting asynchronous read (fd: 192, listener: 204).
    [2021/09/14 07:47:02] [thread-0] About to wait in GetQueuedCompletionStatus().

    <<< [2021/09/14 07:47:10] I start the second connection but nothing happens >>>
    <<< [2021/09/14 07:47:23] I close the first connection (fd: 192) >>>

    [2021/09/14 07:47:23] [thread-0] GetQueuedCompletionStatus() succeeded, key: 1.
    [2021/09/14 07:47:23] [conn: 0x000002d13c287a60] Processing I/O, state: accepted, transferred: 0 (fd: 192, listener: 204).
    [2021/09/14 07:47:23] [conn: 0x000002d13c287a60] Got 0 byte(s) (fd: 192, listener: 204).
    [2021/09/14 07:47:23] [thread-0] Error processing I/O for connection 0x000002d13c287a60.
    [2021/09/14 07:47:23] [conn: 0x000002d13c287a60] Disconnecting socket (fd: 192, listener: 204).

    <<< After having closed the first connection (fd: 192) is the second connection (fd: 188) processed >>>

    [2021/09/14 07:47:23] [thread-1] GetQueuedCompletionStatus() succeeded, key: 1.
    [2021/09/14 07:47:23] [conn: 0x000002d13c2869e0] Processing I/O, state: waiting-accept, transferred: 0 (fd: 188, listener: 204).
    [2021/09/14 07:47:23] [conn: 0x000002d13c2869e0] Accepted connection (fd: 188, listener: 204).
    [2021/09/14 07:47:23] [thread-0] About to wait in GetQueuedCompletionStatus().
    [2021/09/14 07:47:23] [thread-0] GetQueuedCompletionStatus() succeeded, key: 1.
    [2021/09/14 07:47:23] [conn: 0x000002d13c287a60] Processing I/O, state: disconnecting, transferred: 0 (fd: 192, listener: 204).
    [2021/09/14 07:47:23] [conn: 0x000002d13c287a60] Starting asynchronous accept (fd: 192, listener: 204).
    [2021/09/14 07:47:23] [thread-0] About to wait in GetQueuedCompletionStatus().
    [2021/09/14 07:47:23] [conn: 0x000002d13c2869e0] Starting asynchronous read (fd: 188, listener: 204).
    [2021/09/14 07:47:23] [thread-1] About to wait in GetQueuedCompletionStatus().
    [2021/09/14 07:47:23] [thread-1] GetQueuedCompletionStatus() succeeded, key: 1.
    [2021/09/14 07:47:23] [conn: 0x000002d13c2869e0] Processing I/O, state: accepted, transferred: 660 (fd: 188, listener: 204).
    [2021/09/14 07:47:23] [conn: 0x000002d13c2869e0] Got 660 byte(s) (fd: 188, listener: 204).
    [2021/09/14 07:47:23] [conn: 0x000002d13c2869e0] Starting asynchronous read (fd: 188, listener: 204).
    [2021/09/14 07:47:23] [thread-1] About to wait in GetQueuedCompletionStatus().

    <<< [2021/09/14 07:47:32] Here I close the second connection >>>

    [2021/09/14 07:47:32] [thread-1] GetQueuedCompletionStatus() succeeded, key: 1.
    [2021/09/14 07:47:32] [conn: 0x000002d13c2869e0] Processing I/O, state: accepted, transferred: 0 (fd: 188, listener: 204).
    [2021/09/14 07:47:32] [conn: 0x000002d13c2869e0] Got 0 byte(s) (fd: 188, listener: 204).
    [2021/09/14 07:47:32] [thread-1] Error processing I/O for connection 0x000002d13c2869e0.
    [2021/09/14 07:47:32] [conn: 0x000002d13c2869e0] Disconnecting socket (fd: 188, listener: 204).
    [2021/09/14 07:47:32] [thread-0] GetQueuedCompletionStatus() succeeded, key: 1.
    [2021/09/14 07:47:32] [conn: 0x000002d13c2869e0] Processing I/O, state: disconnecting, transferred: 0 (fd: 188, listener: 204).
    [2021/09/14 07:47:32] [conn: 0x000002d13c2869e0] Starting asynchronous accept (fd: 188, listener: 204).
    [2021/09/14 07:47:32] [thread-1] About to wait in GetQueuedCompletionStatus().
    [2021/09/14 07:47:32] [thread-0] About to wait in GetQueuedCompletionStatus().
    [2021/09/14 07:47:35] Signal received.
    [2021/09/14 07:47:35] [thread-0] GetQueuedCompletionStatus() succeeded, key: 2.
    [2021/09/14 07:47:35] [thread-0] Exiting loop, key: 2.
    [2021/09/14 07:47:35] [thread-1] GetQueuedCompletionStatus() succeeded, key: 2.
    [2021/09/14 07:47:35] [thread-1] Exiting loop, key: 2.


  3. noname 141 Reputation points
    2021-09-15T08:37:03.493+00:00

    This also works for me. The problem I am facing is when I open a second connection and send data. This second connection is not processed until I close the first connection.
    The tcp_receiver listens on 127.0.0.1:2000. The easiest way to test it is:

    1. Open the browser.
    2. Visit http://127.0.0.1:2000
    3. Open another tab and visit http://127.0.0.1:2000
    4. In the debug messages of the tcp_receiver you will see only the data sent by the first tab.
    5. Close the first tab, now you might see the data sent by the second tab.

    As I call AcceptEx() twice, I would expect that I can process both connections.


  4. noname 141 Reputation points
    2021-09-15T09:01:15.713+00:00

    I am posting here the whole test program, maybe it is easier for you to test it.
    I have changed the following:

    • Added timestamps to the debug messages.
    • Changed order (WSASocket() / AcceptEx() / CreateIoCompletionPort()

    Source code:

    #include <stdint.h>
    #include <stdlib.h>
    #include <string.h>
    #include <stdio.h>
    #include <stdarg.h>
    #include <time.h>
    #include <winsock2.h>
    #include <ws2tcpip.h>
    #include <mswsock.h>
    #include <windows.h>
    #include <process.h>
    #include <new>
    
    enum class completion_key {
      io = 1,
      shutdown
    };
    
    struct thread_data {
      size_t nthread;
      HANDLE io_completion_port;
    };
    
    static HANDLE stop_event = nullptr;
    
    static bool init_winsock();
    static BOOL WINAPI signal_handler(DWORD control_type);
    static void log(const char* format, ...);
    
    class connection : private OVERLAPPED {
      private:
        // Connection state.
        enum class state {
          disconnected,
          waiting_accept,
          accepted,
          disconnecting
        };
    
      public:
        // Set address of the AcceptEx() function.
        static void acceptex(LPFN_ACCEPTEX fn)
        {
          _M_acceptex = fn;
        }
    
        // Set address of the DisconnectEx() function.
        static void disconnectex(LPFN_DISCONNECTEX fn)
        {
          _M_disconnectex = fn;
        }
    
        // Create connection.
        static connection* create(SOCKET listener, HANDLE io_completion_port)
        {
          // Create socket.
          const SOCKET fd = WSASocket(AF_INET,
                                      SOCK_STREAM,
                                      IPPROTO_TCP,
                                      nullptr,
                                      0,
                                      WSA_FLAG_OVERLAPPED);
    
          // If the socket could be created...
          if (fd != INVALID_SOCKET) {
    #if DEBUG
            log("Created socket %llu.\n", fd);
    #endif
    
            // Create connection.
            connection* const conn = new (std::nothrow) connection(fd, listener);
    
            // If the connection could be created...
            if (conn) {
    #if DEBUG
              log("Created connection 0x%p (fd: %llu, listener: %llu).\n",
                  static_cast<const void*>(conn),
                  fd,
                  listener);
    #endif // DEBUG
    
              // Start an asynchronous accept.
              if (conn->accept()) {
                // Associate socket with the I/O completion port.
                if (CreateIoCompletionPort(reinterpret_cast<HANDLE>(fd),
                                           io_completion_port,
                                           static_cast<ULONG_PTR>(completion_key::io),
                                           0)) {
    #if DEBUG
                  log("Associated socket %llu with the I/O completion port 0x%p.\n",
                      fd,
                      static_cast<const void*>(io_completion_port));
    #endif // DEBUG
    
                  return conn;
                }
              }
    
              delete conn;
    
              return nullptr;
            }
    
            closesocket(fd);
          }
    
          return nullptr;
        }
    
        // Constructor.
        connection(SOCKET fd, SOCKET listener)
          : OVERLAPPED{0},
            _M_fd(fd),
            _M_listener(listener)
        {
        }
    
        // Destructor.
        ~connection()
        {
          shutdown(_M_fd, SD_BOTH);
          closesocket(_M_fd);
        }
    
        // Start an asynchronous accept.
        bool accept()
        {
    #if DEBUG
          log("[conn: 0x%p] Starting asynchronous accept (fd: %llu, "
              "listener: %llu).\n",
              static_cast<const void*>(this),
              _M_fd,
              _M_listener);
    #endif // DEBUG
    
          DWORD bytes;
          if ((_M_acceptex(_M_listener,
                           _M_fd,
                           _M_addresses,
                           0,
                           sizeof(struct sockaddr_in) + 16,
                           sizeof(struct sockaddr_in) + 16,
                           &bytes,
                           this)) ||
              (WSAGetLastError() == WSA_IO_PENDING)) {
            _M_state = state::waiting_accept;
            return true;
          }
    
          return false;
        }
    
        // Connection has been accepted.
        bool accepted()
        {
    #if DEBUG
          log("[conn: 0x%p] Accepted connection (fd: %llu, listener: %llu).\n",
              static_cast<const void*>(this),
              _M_fd,
              _M_listener);
    #endif // DEBUG
    
          if ((setsockopt(_M_fd,
                          SOL_SOCKET,
                          SO_UPDATE_ACCEPT_CONTEXT,
                          reinterpret_cast<const char*>(&_M_listener),
                          sizeof(SOCKET)) == 0) &&
              (read())) {
            _M_state = state::accepted;
            return true;
          }
    
          return false;
        }
    
        // Start an asynchronous read.
        bool read()
        {
    #if DEBUG
          log("[conn: 0x%p] Starting asynchronous read (fd: %llu, "
              "listener: %llu).\n",
              static_cast<const void*>(this),
              _M_fd,
              _M_listener);
    #endif // DEBUG
    
          WSABUF buf;
          buf.buf = reinterpret_cast<char*>(_M_buf);
          buf.len = sizeof(_M_buf);
    
          DWORD flags = 0;
    
          return ((WSARecv(_M_fd,
                           &buf,
                           1,
                           nullptr,
                           &flags,
                           this,
                           nullptr) == 0) ||
                  (WSAGetLastError() == WSA_IO_PENDING));
        }
    
        // Data has been read.
        bool read(DWORD transferred)
        {
    #if DEBUG
          log("[conn: 0x%p] Got %lu byte(s) (fd: %llu, listener: %llu).\n",
              static_cast<const void*>(this),
              transferred,
              _M_fd,
              _M_listener);
    #endif // DEBUG
    
          if (transferred > 0) {
            log("Read %lu byte(s):\n", transferred);
            log("%.*s\n",
                static_cast<int>(transferred),
                reinterpret_cast<const char*>(_M_buf));
    
            log("===========================================================\n");
    
            // Start another read operation.
            return read();
          } else {
            return false;
          }
        }
    
        // Close connection.
        void close()
        {
    #if DEBUG
          log("[conn: 0x%p] Disconnecting socket (fd: %llu, listener: %llu).\n",
              static_cast<const void*>(this),
              _M_fd,
              _M_listener);
    #endif // DEBUG
    
          // Close connection.
          if (_M_disconnectex(_M_fd, this, TF_REUSE_SOCKET, 0)) {
            _M_state = state::disconnected;
          } else {
            _M_state = (WSAGetLastError() == WSA_IO_PENDING) ? state::disconnecting :
                                                               state::disconnected;
          }
        }
    
        // Process I/O.
        bool process_io(DWORD transferred)
        {
    #if DEBUG
          log("[conn: 0x%p] Processing I/O, state: %s, transferred: %lu "
              "(fd: %llu, listener: %llu).\n",
              static_cast<const void*>(this),
              to_string(_M_state),
              transferred,
              _M_fd,
              _M_listener);
    #endif // DEBUG
    
          switch (_M_state) {
            case state::waiting_accept:
              return accepted();
            case state::accepted:
              return read(transferred);
            case state::disconnecting:
              return accept();
            case state::disconnected:
            default:
              return true;
          }
        }
    
        static const char* to_string(state s)
        {
          switch (s) {
            case state::disconnected:
              return "disconnected";
            case state::waiting_accept:
              return "waiting-accept";
            case state::accepted:
              return "accepted";
            case state::disconnecting:
              return "disconnecting";
            default:
              return "(unknown)";
          }
        }
    
      private:
        // Socket descriptor.
        SOCKET _M_fd;
    
        // Listener socket.
        SOCKET _M_listener;
    
        // Socket addresses.
        uint8_t _M_addresses[2 * (sizeof(struct sockaddr_in) + 16)] = {0};
    
        // Buffer.
        static constexpr const size_t buffer_size = 4 * 1024;
        uint8_t _M_buf[buffer_size];
    
        // Connection state.
        state _M_state;
    
        // Address of the AcceptEx() function.
        static LPFN_ACCEPTEX _M_acceptex;
    
        // Address of the DisconnectEx() function.
        static LPFN_DISCONNECTEX _M_disconnectex;
    
        // Disable copy constructor and assignment operator.
        connection(const connection&) = delete;
        connection& operator=(const connection&) = delete;
    };
    
    LPFN_ACCEPTEX connection::_M_acceptex = nullptr;
    LPFN_DISCONNECTEX connection::_M_disconnectex = nullptr;
    
    static unsigned __stdcall worker(void* arg)
    {
      const thread_data* const data = static_cast<const thread_data*>(arg);
    
      do {
    #if DEBUG
        log("[thread-%zu] About to wait in GetQueuedCompletionStatus().\n", 
            data->nthread);
    #endif // DEBUG
    
        DWORD transferred;
        ULONG_PTR key;
        OVERLAPPED* overlapped;
        if (GetQueuedCompletionStatus(data->io_completion_port,
                                      &transferred,
                                      &key,
                                      &overlapped,
                                      INFINITE)) {
    #if DEBUG
          log("[thread-%zu] GetQueuedCompletionStatus() succeeded, key: %u.\n",
              data->nthread,
              static_cast<unsigned>(key));
    #endif // DEBUG
    
          if (static_cast<completion_key>(key) == completion_key::io) {
            connection* const conn = reinterpret_cast<connection*>(overlapped);
    
            if (!conn->process_io(transferred)) {
    #if DEBUG
              log("[thread-%zu] Error processing I/O for connection 0x%p.\n",
                  data->nthread,
                  static_cast<const void*>(conn));
    #endif // DEBUG
    
              conn->close();
            }
          } else {
    #if DEBUG
            log("[thread-%zu] Exiting loop, key: %u.\n",
                data->nthread,
                static_cast<unsigned>(key));
    #endif // DEBUG
    
            _endthreadex(0);
            return 0;
          }
        } else {
    #if DEBUG
          log("[thread-%zu] GetQueuedCompletionStatus() failed, key: %u, "
              "overlapped: 0x%p.\n",
              data->nthread,
              static_cast<unsigned>(key),
              static_cast<const void*>(overlapped));
    #endif // DEBUG
    
          if (overlapped) {
            reinterpret_cast<connection*>(overlapped)->close();
          }
        }
      } while (true);
    }
    
    int main()
    {
      static constexpr const unsigned short port = 2000;
    
      // Initialize Winsock library.
      if (!init_winsock()) {
        fprintf(stderr, "Error initializing Winsock library.\n");
        return EXIT_FAILURE;
      }
    
      // Create event.
      stop_event = CreateEvent(nullptr, TRUE, FALSE, nullptr);
    
      // If the event could not be created...
      if (!stop_event) {
        fprintf(stderr, "Error creating stop event.\n");
    
        WSACleanup();
    
        return EXIT_FAILURE;
      }
    
      // Install signal handler.
      if (!SetConsoleCtrlHandler(signal_handler, TRUE)) {
        fprintf(stderr, "Error installing signal handler.\n");
    
        CloseHandle(stop_event);
        WSACleanup();
    
        return EXIT_FAILURE;
      }
    
      // Create I/O completion port.
      const HANDLE
        io_completion_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
    
      // If the I/O completion port could not be created...
      if (!io_completion_port) {
        fprintf(stderr, "Error creating I/O completion port.\n");
    
        SetConsoleCtrlHandler(nullptr, FALSE);
        CloseHandle(stop_event);
        WSACleanup();
    
        return EXIT_FAILURE;
      }
    
    #if DEBUG
      log("Created I/O completion port 0x%p.\n",
          static_cast<const void*>(io_completion_port));
    #endif
    
      // Create listener socket.
      const SOCKET listener = WSASocket(AF_INET,
                                        SOCK_STREAM,
                                        IPPROTO_TCP,
                                        nullptr,
                                        0,
                                        WSA_FLAG_OVERLAPPED);
    
      // If the listener socket could not be created...
      if (listener == INVALID_SOCKET) {
        fprintf(stderr, "Error creating listener socket.\n");
    
        CloseHandle(io_completion_port);
        SetConsoleCtrlHandler(nullptr, FALSE);
        CloseHandle(stop_event);
        WSACleanup();
    
        return EXIT_FAILURE;
      }
    
    #if DEBUG
      log("Created listener socket %llu.\n", listener);
    #endif
    
      GUID guid = WSAID_ACCEPTEX;
      LPFN_ACCEPTEX acceptex;
      DWORD bytes;
    
      // Get address of the AcceptEx() function.
      if (WSAIoctl(listener,
                   SIO_GET_EXTENSION_FUNCTION_POINTER,
                   &guid,
                   sizeof(GUID),
                   &acceptex,
                   sizeof(LPFN_ACCEPTEX),
                   &bytes,
                   nullptr,
                   nullptr) != 0) {
        fprintf(stderr,
                "Error getting the address of the AcceptEx() function.\n");
    
        closesocket(listener);
        CloseHandle(io_completion_port);
        SetConsoleCtrlHandler(nullptr, FALSE);
        CloseHandle(stop_event);
        WSACleanup();
    
        return EXIT_FAILURE;
      }
    
      guid = WSAID_DISCONNECTEX;
      LPFN_DISCONNECTEX disconnectex;
    
      // Get address of the DisconnectEx() function.
      if (WSAIoctl(listener,
                   SIO_GET_EXTENSION_FUNCTION_POINTER,
                   &guid,
                   sizeof(GUID),
                   &disconnectex,
                   sizeof(LPFN_DISCONNECTEX),
                   &bytes,
                   nullptr,
                   nullptr) != 0) {
        fprintf(stderr,
                "Error getting the address of the DisconnectEx() function.\n");
    
        closesocket(listener);
        CloseHandle(io_completion_port);
        SetConsoleCtrlHandler(nullptr, FALSE);
        CloseHandle(stop_event);
        WSACleanup();
    
        return EXIT_FAILURE;
      }
    
      struct sockaddr_in addr;
      addr.sin_family = AF_INET;
      addr.sin_addr.S_un.S_addr = htonl(INADDR_LOOPBACK);
      //addr.sin_addr.S_un.S_addr = htonl(INADDR_ANY);
      addr.sin_port = htons(port);
    
      memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
    
      // Bind.
      if (bind(listener,
               reinterpret_cast<const struct sockaddr*>(&addr),
               sizeof(struct sockaddr_in)) != 0) {
        fprintf(stderr, "Error binding to localhost:%u.\n", port);
    
        closesocket(listener);
        CloseHandle(io_completion_port);
        SetConsoleCtrlHandler(nullptr, FALSE);
        CloseHandle(stop_event);
        WSACleanup();
    
        return EXIT_FAILURE;
      }
    
      // Listen.
      if (listen(listener, SOMAXCONN) != 0) {
        fprintf(stderr, "Error listening on localhost:%u.\n", port);
    
        closesocket(listener);
        CloseHandle(io_completion_port);
        SetConsoleCtrlHandler(nullptr, FALSE);
        CloseHandle(stop_event);
        WSACleanup();
    
        return EXIT_FAILURE;
      }
    
      // Associate listener with the I/O completion port.
      if (!CreateIoCompletionPort(reinterpret_cast<HANDLE>(listener),
                                  io_completion_port,
                                  static_cast<ULONG_PTR>(
                                    completion_key::io
                                  ),
                                  0)) {
        fprintf(stderr,
                "Error associating listener socket with the I/O completion "
                "port.\n");
    
        closesocket(listener);
        CloseHandle(io_completion_port);
        SetConsoleCtrlHandler(nullptr, FALSE);
        CloseHandle(stop_event);
        WSACleanup();
    
        return EXIT_FAILURE;
      }
    
    #if DEBUG
      log("Associated listener socket %llu with the I/O completion port 0x%p.\n",
          listener,
          static_cast<const void*>(io_completion_port));
    #endif // DEBUG
    
      static constexpr const size_t number_connections = 4;
      connection* connections[number_connections];
    
      // Set address of the AcceptEx() function.
      connection::acceptex(acceptex);
    
      // Set address of the DisconnectEx() function.
      connection::disconnectex(disconnectex);
    
      // Create connections.
      for (size_t i = 0; i < number_connections; i++) {
        // Create connection.
        connections[i] = connection::create(listener, io_completion_port);
    
        // If the connection could not be created...
        if (!connections[i]) {
          for (size_t j = 0; j < i; j++) {
            delete connections[j];
          }
    
          closesocket(listener);
          CloseHandle(io_completion_port);
          SetConsoleCtrlHandler(nullptr, FALSE);
          CloseHandle(stop_event);
          WSACleanup();
    
          return EXIT_FAILURE;
        }
      }
    
      static constexpr const size_t number_threads = 2;
      HANDLE threads[number_threads];
      thread_data thread_data[number_threads];
    
      // Create threads.
      for (size_t i = 0; i < number_threads; i++) {
        thread_data[i].nthread = i;
        thread_data[i].io_completion_port = io_completion_port;
    
        // Create thread.
        threads[i] = reinterpret_cast<HANDLE>(_beginthreadex(nullptr,
                                                             0,
                                                             worker,
                                                             &thread_data[i],
                                                             0,
                                                             nullptr));
    
        // If the thread could not be created...
        if (threads[i] == reinterpret_cast<HANDLE>(-1)) {
          fprintf(stderr, "Error creating thread %zu.\n", i + 1);
    
          // Save number of threads.
          const size_t nthreads = i;
    
          // Notify threads that they have to stop.
          for (i = 0; i < nthreads; i++) {
            PostQueuedCompletionStatus(io_completion_port,
                                       0,
                                       static_cast<ULONG_PTR>(
                                         completion_key::shutdown
                                       ),
                                       nullptr);
          }
    
          // Wait for the worker threads.
          WaitForMultipleObjects(nthreads, threads, TRUE, INFINITE);
    
          for (i = 0; i < nthreads; i++) {
            CloseHandle(threads[i]);
          }
    
          for (i = 0; i < number_connections; i++) {
            delete connections[i];
          }
    
          closesocket(listener);
          CloseHandle(io_completion_port);
          SetConsoleCtrlHandler(nullptr, FALSE);
          CloseHandle(stop_event);
          WSACleanup();
    
          return EXIT_FAILURE;
        }
      }
    
      log("Waiting for signal to arrive.\n");
    
      // Wait for signal to arrive.
      WaitForSingleObject(stop_event, INFINITE);
    
      log("Signal received.\n");
    
      // Notify threads that they have to stop.
      for (size_t i = 0; i < number_threads; i++) {
        PostQueuedCompletionStatus(io_completion_port,
                                   0,
                                   static_cast<ULONG_PTR>(completion_key::shutdown),
                                   nullptr);
      }
    
      // Wait for the worker threads.
      WaitForMultipleObjects(number_threads, threads, TRUE, INFINITE);
    
      for (size_t i = 0; i < number_threads; i++) {
        CloseHandle(threads[i]);
      }
    
      for (size_t i = 0; i < number_connections; i++) {
        delete connections[i];
      }
    
      closesocket(listener);
      CloseHandle(io_completion_port);
      SetConsoleCtrlHandler(nullptr, FALSE);
      CloseHandle(stop_event);
      WSACleanup();
    
      return EXIT_SUCCESS;
    }
    
    bool init_winsock()
    {
      WORD version = MAKEWORD(2, 2);
      WSADATA wsadata;
      if (WSAStartup(version, &wsadata) == 0) {
        if ((LOBYTE(wsadata.wVersion) == 2) &&
            (HIBYTE(wsadata.wVersion) == 2)) {
          return true;
        }
    
        WSACleanup();
      }
    
      return false;
    }
    
    BOOL WINAPI signal_handler(DWORD control_type)
    {
      switch (control_type) {
        case CTRL_C_EVENT:
        case CTRL_CLOSE_EVENT:
          SetEvent(stop_event);
          return TRUE;
        default:
          return FALSE;
      }
    }
    
    void log(const char* format, ...)
    {
      time_t now = time(nullptr);
      const struct tm* const tm = localtime(&now);
    
      printf("[%04u/%02u/%02u %02u:%02u:%02u] ",
             1900 + tm->tm_year,
             1 + tm->tm_mon,
             tm->tm_mday,
             tm->tm_hour,
             tm->tm_min,
             tm->tm_sec);
    
      va_list ap;
      va_start(ap, format);
      vprintf(format, ap);
      va_end(ap);
    }
    
    0 comments No comments