I have tested the server program successfully with the client code. I only added a Sleep after sending.
I/O completion ports - Completion packets not seen with simultaneous connections
I have written a test TCP server using IOCP. The server just listens on a port and dumps on the screen the received data (it sends nothing back to the client).
The problem I am facing is that, if the server receives a second connection while there is one connection open, the second connection is not accepted until the first one is closed.
The program works as follows:
- Create an I/O completion port (CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0)).
- Create listener socket.
- Bind listener socket.
- Listen.
- Associate listener socket with the I/O completion port.
- Create 4 sockets (maximum number of simultaneous connections) (4 x WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, nullptr, 0, WSA_FLAG_OVERLAPPED)).
- Associate the 4 sockets with the I/O completion port.
- Start an asynchronous accept for each socket (4 x AcceptEx()).
- Create two threads which loop calling GetQueuedCompletionStatus().
- When a connection is accepted, I call setsockopt() with the option SO_UPDATE_ACCEPT_CONTEXT and start an asynchronous read (WSARecv()).
- After data has been received, I start another asynchronous read (WSARecv()).
- When the client closes the connection, I call DisconnectEx() and, once DisconnectEx() has completed, I call AcceptEx() again.
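The per-connection lifecycle described in the steps above can be summarized as a small state machine. The following is only a sketch in portable C++ with no Winsock calls; the names `action`, `step` and `on_completion` are hypothetical and simply mirror the states handled by the test program's `process_io()`.

```cpp
#include <cstdint>

// States a connection passes through (same names as in the test program).
enum class state { waiting_accept, accepted, disconnecting };

// Hypothetical: the asynchronous operation to start next.
enum class action { start_read, start_disconnect, start_accept };

// Hypothetical: result of handling one completion packet.
struct step {
  state next;
  action todo;
};

// Decide what to do when a completion packet arrives for a connection.
// `transferred` is the byte count reported with the completion packet.
step on_completion(state current, uint32_t transferred)
{
  switch (current) {
    case state::waiting_accept:
      // AcceptEx() completed: update the accept context, then start reading.
      return {state::accepted, action::start_read};
    case state::accepted:
      if (transferred > 0) {
        // Data arrived: start another read.
        return {state::accepted, action::start_read};
      }
      // Zero bytes: the peer closed the connection, so disconnect.
      return {state::disconnecting, action::start_disconnect};
    case state::disconnecting:
    default:
      // DisconnectEx() completed: the socket can be reused for AcceptEx().
      return {state::waiting_accept, action::start_accept};
  }
}
```

For example, `on_completion(state::accepted, 0)` yields `state::disconnecting` with `action::start_disconnect`, which corresponds to the "client closes the connection" step.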
Why don't I get notified in GetQueuedCompletionStatus() when a second connection is received?
The output of the program and the source code can be found here:
https://stackoverflow.com/questions/69064482/i-o-completions-ports-completion-packets-not-seen-with-simultaneous-connection
-
Xiaopo Yang - MSFT 11,496 Reputation points Microsoft Vendor
2021-09-15T07:55:18.523+00:00
-
Xiaopo Yang - MSFT 11,496 Reputation points Microsoft Vendor
2021-09-13T07:31:51.267+00:00 There is an example that uses overlapped I/O and completion ports. I also found that the attached code is missing the creation of an accepting socket.
-
noname 141 Reputation points
2021-09-14T06:07:54.223+00:00 Thank you for your answer.
I have changed the order (AcceptEx, CreateIoCompletionPort and WSARecv) but it still doesn't work.
Only when I close the first connection is the second connection processed. I have added timestamps to the test program so it is easier to see what's going on. I have added a couple of comments in the output (search for <<<):
[2021/09/14 07:46:41] Created I/O completion port 0x00000000000000b4.
[2021/09/14 07:46:41] Created listener socket 204.
[2021/09/14 07:46:41] Associated listener socket 204 with the I/O completion port 0x00000000000000b4.
[2021/09/14 07:46:41] Created socket 184.
[2021/09/14 07:46:41] Created connection 0x000002d13c285960 (fd: 184, listener: 204).
[2021/09/14 07:46:41] [conn: 0x000002d13c285960] Starting asynchronous accept (fd: 184, listener: 204).
[2021/09/14 07:46:41] Associated socket 184 with the I/O completion port 0x00000000000000b4.
[2021/09/14 07:46:41] Created socket 212.
[2021/09/14 07:46:41] Created connection 0x000002d13c2a2490 (fd: 212, listener: 204).
[2021/09/14 07:46:41] [conn: 0x000002d13c2a2490] Starting asynchronous accept (fd: 212, listener: 204).
[2021/09/14 07:46:41] Associated socket 212 with the I/O completion port 0x00000000000000b4.
[2021/09/14 07:46:41] Created socket 188.
[2021/09/14 07:46:41] Created connection 0x000002d13c2869e0 (fd: 188, listener: 204).
[2021/09/14 07:46:41] [conn: 0x000002d13c2869e0] Starting asynchronous accept (fd: 188, listener: 204).
[2021/09/14 07:46:41] Associated socket 188 with the I/O completion port 0x00000000000000b4.
[2021/09/14 07:46:41] Created socket 192.
[2021/09/14 07:46:41] Created connection 0x000002d13c287a60 (fd: 192, listener: 204).
[2021/09/14 07:46:41] [conn: 0x000002d13c287a60] Starting asynchronous accept (fd: 192, listener: 204).
[2021/09/14 07:46:41] Associated socket 192 with the I/O completion port 0x00000000000000b4.
[2021/09/14 07:46:41] [thread-0] About to wait in GetQueuedCompletionStatus().
[2021/09/14 07:46:41] [thread-1] About to wait in GetQueuedCompletionStatus().
[2021/09/14 07:46:41] Waiting for signal to arrive.
<<< [2021/09/14 07:47:02] Here comes the first connection (fd: 192) >>>
[2021/09/14 07:47:02] [thread-1] GetQueuedCompletionStatus() succeeded, key: 1.
[2021/09/14 07:47:02] [conn: 0x000002d13c287a60] Processing I/O, state: waiting-accept, transferred: 0 (fd: 192, listener: 204).
[2021/09/14 07:47:02] [conn: 0x000002d13c287a60] Accepted connection (fd: 192, listener: 204).
[2021/09/14 07:47:02] [conn: 0x000002d13c287a60] Starting asynchronous read (fd: 192, listener: 204).
[2021/09/14 07:47:02] [thread-1] About to wait in GetQueuedCompletionStatus().
[2021/09/14 07:47:02] [thread-0] GetQueuedCompletionStatus() succeeded, key: 1.
[2021/09/14 07:47:02] [conn: 0x000002d13c287a60] Processing I/O, state: accepted, transferred: 660 (fd: 192, listener: 204).
[2021/09/14 07:47:02] [conn: 0x000002d13c287a60] Got 660 byte(s) (fd: 192, listener: 204).
[2021/09/14 07:47:02] [conn: 0x000002d13c287a60] Starting asynchronous read (fd: 192, listener: 204).
[2021/09/14 07:47:02] [thread-0] About to wait in GetQueuedCompletionStatus().
<<< [2021/09/14 07:47:10] I start the second connection but nothing happens >>>
<<< [2021/09/14 07:47:23] I close the first connection (fd: 192) >>>
[2021/09/14 07:47:23] [thread-0] GetQueuedCompletionStatus() succeeded, key: 1.
[2021/09/14 07:47:23] [conn: 0x000002d13c287a60] Processing I/O, state: accepted, transferred: 0 (fd: 192, listener: 204).
[2021/09/14 07:47:23] [conn: 0x000002d13c287a60] Got 0 byte(s) (fd: 192, listener: 204).
[2021/09/14 07:47:23] [thread-0] Error processing I/O for connection 0x000002d13c287a60.
[2021/09/14 07:47:23] [conn: 0x000002d13c287a60] Disconnecting socket (fd: 192, listener: 204).
<<< Only after the first connection (fd: 192) has been closed is the second connection (fd: 188) processed >>>
[2021/09/14 07:47:23] [thread-1] GetQueuedCompletionStatus() succeeded, key: 1.
[2021/09/14 07:47:23] [conn: 0x000002d13c2869e0] Processing I/O, state: waiting-accept, transferred: 0 (fd: 188, listener: 204).
[2021/09/14 07:47:23] [conn: 0x000002d13c2869e0] Accepted connection (fd: 188, listener: 204).
[2021/09/14 07:47:23] [thread-0] About to wait in GetQueuedCompletionStatus().
[2021/09/14 07:47:23] [thread-0] GetQueuedCompletionStatus() succeeded, key: 1.
[2021/09/14 07:47:23] [conn: 0x000002d13c287a60] Processing I/O, state: disconnecting, transferred: 0 (fd: 192, listener: 204).
[2021/09/14 07:47:23] [conn: 0x000002d13c287a60] Starting asynchronous accept (fd: 192, listener: 204).
[2021/09/14 07:47:23] [thread-0] About to wait in GetQueuedCompletionStatus().
[2021/09/14 07:47:23] [conn: 0x000002d13c2869e0] Starting asynchronous read (fd: 188, listener: 204).
[2021/09/14 07:47:23] [thread-1] About to wait in GetQueuedCompletionStatus().
[2021/09/14 07:47:23] [thread-1] GetQueuedCompletionStatus() succeeded, key: 1.
[2021/09/14 07:47:23] [conn: 0x000002d13c2869e0] Processing I/O, state: accepted, transferred: 660 (fd: 188, listener: 204).
[2021/09/14 07:47:23] [conn: 0x000002d13c2869e0] Got 660 byte(s) (fd: 188, listener: 204).
[2021/09/14 07:47:23] [conn: 0x000002d13c2869e0] Starting asynchronous read (fd: 188, listener: 204).
[2021/09/14 07:47:23] [thread-1] About to wait in GetQueuedCompletionStatus().
<<< [2021/09/14 07:47:32] Here I close the second connection >>>
[2021/09/14 07:47:32] [thread-1] GetQueuedCompletionStatus() succeeded, key: 1.
[2021/09/14 07:47:32] [conn: 0x000002d13c2869e0] Processing I/O, state: accepted, transferred: 0 (fd: 188, listener: 204).
[2021/09/14 07:47:32] [conn: 0x000002d13c2869e0] Got 0 byte(s) (fd: 188, listener: 204).
[2021/09/14 07:47:32] [thread-1] Error processing I/O for connection 0x000002d13c2869e0.
[2021/09/14 07:47:32] [conn: 0x000002d13c2869e0] Disconnecting socket (fd: 188, listener: 204).
[2021/09/14 07:47:32] [thread-0] GetQueuedCompletionStatus() succeeded, key: 1.
[2021/09/14 07:47:32] [conn: 0x000002d13c2869e0] Processing I/O, state: disconnecting, transferred: 0 (fd: 188, listener: 204).
[2021/09/14 07:47:32] [conn: 0x000002d13c2869e0] Starting asynchronous accept (fd: 188, listener: 204).
[2021/09/14 07:47:32] [thread-1] About to wait in GetQueuedCompletionStatus().
[2021/09/14 07:47:32] [thread-0] About to wait in GetQueuedCompletionStatus().
[2021/09/14 07:47:35] Signal received.
[2021/09/14 07:47:35] [thread-0] GetQueuedCompletionStatus() succeeded, key: 2.
[2021/09/14 07:47:35] [thread-0] Exiting loop, key: 2.
[2021/09/14 07:47:35] [thread-1] GetQueuedCompletionStatus() succeeded, key: 2.
[2021/09/14 07:47:35] [thread-1] Exiting loop, key: 2.
-
noname 141 Reputation points
2021-09-15T08:37:03.493+00:00 This also works for me. The problem I am facing is when I open a second connection and send data. This second connection is not processed until I close the first connection.
The tcp_receiver listens on 127.0.0.1:2000. The easiest way to test it is:
- Open the browser.
- Visit http://127.0.0.1:2000
- Open another tab and visit http://127.0.0.1:2000
- In the debug messages of the tcp_receiver you will see only the data sent by the first tab.
- Close the first tab, now you might see the data sent by the second tab.
As I call AcceptEx() twice, I would expect that I can process both connections.
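The same two-connection test can also be driven without a browser by a small client that keeps several connections open at once and sends on each of them. This is only a minimal sketch, not the client code mentioned elsewhere in this thread: the helpers `open_connections`, `send_on_all` and `close_all` are hypothetical names, and on Windows it assumes `WSAStartup()` has already been called (as `init_winsock()` does in the test program).

```cpp
#include <cstdint>
#include <cstring>
#include <vector>

#ifdef _WIN32
#include <winsock2.h>
#include <ws2tcpip.h>
using socket_t = SOCKET;
static const socket_t invalid_socket = INVALID_SOCKET;
static void close_socket(socket_t s) { closesocket(s); }
#else
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>
using socket_t = int;
static const socket_t invalid_socket = -1;
static void close_socket(socket_t s) { close(s); }
#endif

// Open up to `count` TCP connections to 127.0.0.1:port and keep them open.
// Returns the sockets that connected; stops at the first failure.
std::vector<socket_t> open_connections(uint16_t port, size_t count)
{
  std::vector<socket_t> sockets;
  for (size_t i = 0; i < count; i++) {
    const socket_t s = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    if (s == invalid_socket) {
      break;
    }

    sockaddr_in addr;
    std::memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    addr.sin_port = htons(port);

    if (connect(s, reinterpret_cast<const sockaddr*>(&addr), sizeof(addr)) != 0) {
      close_socket(s);
      break;
    }

    sockets.push_back(s);
  }

  return sockets;
}

// Send `msg` on every socket; returns how many sends wrote the whole message.
size_t send_on_all(const std::vector<socket_t>& sockets, const char* msg)
{
  const int len = static_cast<int>(std::strlen(msg));
  size_t ok = 0;
  for (socket_t s : sockets) {
    if (send(s, msg, len, 0) == len) {
      ok++;
    }
  }

  return ok;
}

// Close every socket and empty the vector.
void close_all(std::vector<socket_t>& sockets)
{
  for (socket_t s : sockets) {
    close_socket(s);
  }

  sockets.clear();
}
```

A possible use against the tcp_receiver: call `open_connections(2000, 2)`, then `send_on_all(conns, "hello\r\n")`, watch the server's debug messages while both connections are still open, and finally call `close_all(conns)`.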
-
noname 141 Reputation points
2021-09-15T09:01:15.713+00:00 I am posting here the whole test program, maybe it is easier for you to test it.
I have changed the following:
- Added timestamps to the debug messages.
- Changed order (WSASocket() / AcceptEx() / CreateIoCompletionPort())
Source code:
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <stdarg.h>
#include <time.h>
#include <winsock2.h>
#include <ws2tcpip.h>
#include <mswsock.h>
#include <windows.h>
#include <process.h>
#include <new>

enum class completion_key {
  io = 1,
  shutdown
};

struct thread_data {
  size_t nthread;
  HANDLE io_completion_port;
};

static HANDLE stop_event = nullptr;

static bool init_winsock();
static BOOL WINAPI signal_handler(DWORD control_type);
static void log(const char* format, ...);

class connection : private OVERLAPPED {
  private:
    // Connection state.
    enum class state {
      disconnected,
      waiting_accept,
      accepted,
      disconnecting
    };

  public:
    // Set address of the AcceptEx() function.
    static void acceptex(LPFN_ACCEPTEX fn)
    {
      _M_acceptex = fn;
    }

    // Set address of the DisconnectEx() function.
    static void disconnectex(LPFN_DISCONNECTEX fn)
    {
      _M_disconnectex = fn;
    }

    // Create connection.
    static connection* create(SOCKET listener, HANDLE io_completion_port)
    {
      // Create socket.
      const SOCKET fd = WSASocket(AF_INET,
                                  SOCK_STREAM,
                                  IPPROTO_TCP,
                                  nullptr,
                                  0,
                                  WSA_FLAG_OVERLAPPED);

      // If the socket could be created...
      if (fd != INVALID_SOCKET) {
#if DEBUG
        log("Created socket %llu.\n", fd);
#endif

        // Create connection.
        connection* const conn = new (std::nothrow) connection(fd, listener);

        // If the connection could be created...
        if (conn) {
#if DEBUG
          log("Created connection 0x%p (fd: %llu, listener: %llu).\n",
              static_cast<const void*>(conn),
              fd,
              listener);
#endif // DEBUG

          // Start an asynchronous accept.
          if (conn->accept()) {
            // Associate socket with the I/O completion port.
            if (CreateIoCompletionPort(reinterpret_cast<HANDLE>(fd),
                                       io_completion_port,
                                       static_cast<ULONG_PTR>(completion_key::io),
                                       0)) {
#if DEBUG
              log("Associated socket %llu with the I/O completion port 0x%p.\n",
                  fd,
                  static_cast<const void*>(io_completion_port));
#endif // DEBUG

              return conn;
            }
          }

          delete conn;
          return nullptr;
        }

        closesocket(fd);
      }

      return nullptr;
    }

    // Constructor.
    connection(SOCKET fd, SOCKET listener)
      : OVERLAPPED{0},
        _M_fd(fd),
        _M_listener(listener)
    {
    }

    // Destructor.
    ~connection()
    {
      shutdown(_M_fd, SD_BOTH);
      closesocket(_M_fd);
    }

    // Start an asynchronous accept.
    bool accept()
    {
#if DEBUG
      log("[conn: 0x%p] Starting asynchronous accept (fd: %llu, "
          "listener: %llu).\n",
          static_cast<const void*>(this),
          _M_fd,
          _M_listener);
#endif // DEBUG

      DWORD bytes;
      if ((_M_acceptex(_M_listener,
                       _M_fd,
                       _M_addresses,
                       0,
                       sizeof(struct sockaddr_in) + 16,
                       sizeof(struct sockaddr_in) + 16,
                       &bytes,
                       this)) ||
          (WSAGetLastError() == WSA_IO_PENDING)) {
        _M_state = state::waiting_accept;
        return true;
      }

      return false;
    }

    // Connection has been accepted.
    bool accepted()
    {
#if DEBUG
      log("[conn: 0x%p] Accepted connection (fd: %llu, listener: %llu).\n",
          static_cast<const void*>(this),
          _M_fd,
          _M_listener);
#endif // DEBUG

      if ((setsockopt(_M_fd,
                      SOL_SOCKET,
                      SO_UPDATE_ACCEPT_CONTEXT,
                      reinterpret_cast<const char*>(&_M_listener),
                      sizeof(SOCKET)) == 0) &&
          (read())) {
        _M_state = state::accepted;
        return true;
      }

      return false;
    }

    // Start an asynchronous read.
    bool read()
    {
#if DEBUG
      log("[conn: 0x%p] Starting asynchronous read (fd: %llu, "
          "listener: %llu).\n",
          static_cast<const void*>(this),
          _M_fd,
          _M_listener);
#endif // DEBUG

      WSABUF buf;
      buf.buf = reinterpret_cast<char*>(_M_buf);
      buf.len = sizeof(_M_buf);

      DWORD flags = 0;
      return ((WSARecv(_M_fd, &buf, 1, nullptr, &flags, this, nullptr) == 0) ||
              (WSAGetLastError() == WSA_IO_PENDING));
    }

    // Data has been read.
    bool read(DWORD transferred)
    {
#if DEBUG
      log("[conn: 0x%p] Got %lu byte(s) (fd: %llu, listener: %llu).\n",
          static_cast<const void*>(this),
          transferred,
          _M_fd,
          _M_listener);
#endif // DEBUG

      if (transferred > 0) {
        log("Read %lu byte(s):\n", transferred);
        log("%.*s\n",
            static_cast<int>(transferred),
            reinterpret_cast<const char*>(_M_buf));
        log("===========================================================\n");

        // Start another read operation.
        return read();
      } else {
        return false;
      }
    }

    // Close connection.
    void close()
    {
#if DEBUG
      log("[conn: 0x%p] Disconnecting socket (fd: %llu, listener: %llu).\n",
          static_cast<const void*>(this),
          _M_fd,
          _M_listener);
#endif // DEBUG

      // Close connection.
      if (_M_disconnectex(_M_fd, this, TF_REUSE_SOCKET, 0)) {
        _M_state = state::disconnected;
      } else {
        _M_state = (WSAGetLastError() == WSA_IO_PENDING) ?
                     state::disconnecting :
                     state::disconnected;
      }
    }

    // Process I/O.
    bool process_io(DWORD transferred)
    {
#if DEBUG
      log("[conn: 0x%p] Processing I/O, state: %s, transferred: %lu "
          "(fd: %llu, listener: %llu).\n",
          static_cast<const void*>(this),
          to_string(_M_state),
          transferred,
          _M_fd,
          _M_listener);
#endif // DEBUG

      switch (_M_state) {
        case state::waiting_accept:
          return accepted();
        case state::accepted:
          return read(transferred);
        case state::disconnecting:
          return accept();
        case state::disconnected:
        default:
          return true;
      }
    }

    static const char* to_string(state s)
    {
      switch (s) {
        case state::disconnected:
          return "disconnected";
        case state::waiting_accept:
          return "waiting-accept";
        case state::accepted:
          return "accepted";
        case state::disconnecting:
          return "disconnecting";
        default:
          return "(unknown)";
      }
    }

  private:
    // Socket descriptor.
    SOCKET _M_fd;

    // Listener socket.
    SOCKET _M_listener;

    // Socket addresses.
    uint8_t _M_addresses[2 * (sizeof(struct sockaddr_in) + 16)] = {0};

    // Buffer.
    static constexpr const size_t buffer_size = 4 * 1024;
    uint8_t _M_buf[buffer_size];

    // Connection state.
    state _M_state;

    // Address of the AcceptEx() function.
    static LPFN_ACCEPTEX _M_acceptex;

    // Address of the DisconnectEx() function.
    static LPFN_DISCONNECTEX _M_disconnectex;

    // Disable copy constructor and assignment operator.
    connection(const connection&) = delete;
    connection& operator=(const connection&) = delete;
};

LPFN_ACCEPTEX connection::_M_acceptex = nullptr;
LPFN_DISCONNECTEX connection::_M_disconnectex = nullptr;

static unsigned __stdcall worker(void* arg)
{
  const thread_data* const data = static_cast<const thread_data*>(arg);

  do {
#if DEBUG
    log("[thread-%zu] About to wait in GetQueuedCompletionStatus().\n",
        data->nthread);
#endif // DEBUG

    DWORD transferred;
    ULONG_PTR key;
    OVERLAPPED* overlapped;
    if (GetQueuedCompletionStatus(data->io_completion_port,
                                  &transferred,
                                  &key,
                                  &overlapped,
                                  INFINITE)) {
#if DEBUG
      log("[thread-%zu] GetQueuedCompletionStatus() succeeded, key: %u.\n",
          data->nthread,
          static_cast<unsigned>(key));
#endif // DEBUG

      if (static_cast<completion_key>(key) == completion_key::io) {
        connection* const conn = reinterpret_cast<connection*>(overlapped);
        if (!conn->process_io(transferred)) {
#if DEBUG
          log("[thread-%zu] Error processing I/O for connection 0x%p.\n",
              data->nthread,
              static_cast<const void*>(conn));
#endif // DEBUG

          conn->close();
        }
      } else {
#if DEBUG
        log("[thread-%zu] Exiting loop, key: %u.\n",
            data->nthread,
            static_cast<unsigned>(key));
#endif // DEBUG

        _endthreadex(0);
        return 0;
      }
    } else {
#if DEBUG
      log("[thread-%zu] GetQueuedCompletionStatus() failed, key: %u, "
          "overlapped: 0x%p.\n",
          data->nthread,
          static_cast<unsigned>(key),
          static_cast<const void*>(overlapped));
#endif // DEBUG

      if (overlapped) {
        reinterpret_cast<connection*>(overlapped)->close();
      }
    }
  } while (true);
}

int main()
{
  static constexpr const unsigned short port = 2000;

  // Initialize Winsock library.
  if (!init_winsock()) {
    fprintf(stderr, "Error initializing Winsock library.\n");
    return EXIT_FAILURE;
  }

  // Create event.
  stop_event = CreateEvent(nullptr, TRUE, FALSE, nullptr);

  // If the event could not be created...
  if (!stop_event) {
    fprintf(stderr, "Error creating stop event.\n");

    WSACleanup();
    return EXIT_FAILURE;
  }

  // Install signal handler.
  if (!SetConsoleCtrlHandler(signal_handler, TRUE)) {
    fprintf(stderr, "Error installing signal handler.\n");

    CloseHandle(stop_event);
    WSACleanup();

    return EXIT_FAILURE;
  }

  // Create I/O completion port.
  const HANDLE io_completion_port =
    CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);

  // If the I/O completion port could not be created...
  if (!io_completion_port) {
    fprintf(stderr, "Error creating I/O completion port.\n");

    SetConsoleCtrlHandler(nullptr, FALSE);
    CloseHandle(stop_event);
    WSACleanup();

    return EXIT_FAILURE;
  }

#if DEBUG
  log("Created I/O completion port 0x%p.\n",
      static_cast<const void*>(io_completion_port));
#endif

  // Create listener socket.
  const SOCKET listener = WSASocket(AF_INET,
                                    SOCK_STREAM,
                                    IPPROTO_TCP,
                                    nullptr,
                                    0,
                                    WSA_FLAG_OVERLAPPED);

  // If the listener socket could not be created...
  if (listener == INVALID_SOCKET) {
    fprintf(stderr, "Error creating listener socket.\n");

    CloseHandle(io_completion_port);
    SetConsoleCtrlHandler(nullptr, FALSE);
    CloseHandle(stop_event);
    WSACleanup();

    return EXIT_FAILURE;
  }

#if DEBUG
  log("Created listener socket %llu.\n", listener);
#endif

  GUID guid = WSAID_ACCEPTEX;
  LPFN_ACCEPTEX acceptex;
  DWORD bytes;

  // Get address of the AcceptEx() function.
  if (WSAIoctl(listener,
               SIO_GET_EXTENSION_FUNCTION_POINTER,
               &guid,
               sizeof(GUID),
               &acceptex,
               sizeof(LPFN_ACCEPTEX),
               &bytes,
               nullptr,
               nullptr) != 0) {
    fprintf(stderr, "Error getting the address of the AcceptEx() function.\n");

    closesocket(listener);
    CloseHandle(io_completion_port);
    SetConsoleCtrlHandler(nullptr, FALSE);
    CloseHandle(stop_event);
    WSACleanup();

    return EXIT_FAILURE;
  }

  guid = WSAID_DISCONNECTEX;
  LPFN_DISCONNECTEX disconnectex;

  // Get address of the DisconnectEx() function.
  if (WSAIoctl(listener,
               SIO_GET_EXTENSION_FUNCTION_POINTER,
               &guid,
               sizeof(GUID),
               &disconnectex,
               sizeof(LPFN_DISCONNECTEX),
               &bytes,
               nullptr,
               nullptr) != 0) {
    fprintf(stderr,
            "Error getting the address of the DisconnectEx() function.\n");

    closesocket(listener);
    CloseHandle(io_completion_port);
    SetConsoleCtrlHandler(nullptr, FALSE);
    CloseHandle(stop_event);
    WSACleanup();

    return EXIT_FAILURE;
  }

  struct sockaddr_in addr;
  addr.sin_family = AF_INET;
  addr.sin_addr.S_un.S_addr = htonl(INADDR_LOOPBACK);
  //addr.sin_addr.S_un.S_addr = htonl(INADDR_ANY);
  addr.sin_port = htons(port);
  memset(addr.sin_zero, 0, sizeof(addr.sin_zero));

  // Bind.
  if (bind(listener,
           reinterpret_cast<const struct sockaddr*>(&addr),
           sizeof(struct sockaddr_in)) != 0) {
    fprintf(stderr, "Error binding to localhost:%u.\n", port);

    closesocket(listener);
    CloseHandle(io_completion_port);
    SetConsoleCtrlHandler(nullptr, FALSE);
    CloseHandle(stop_event);
    WSACleanup();

    return EXIT_FAILURE;
  }

  // Listen.
  if (listen(listener, SOMAXCONN) != 0) {
    fprintf(stderr, "Error listening on localhost:%u.\n", port);

    closesocket(listener);
    CloseHandle(io_completion_port);
    SetConsoleCtrlHandler(nullptr, FALSE);
    CloseHandle(stop_event);
    WSACleanup();

    return EXIT_FAILURE;
  }

  // Associate listener with the I/O completion port.
  if (!CreateIoCompletionPort(reinterpret_cast<HANDLE>(listener),
                              io_completion_port,
                              static_cast<ULONG_PTR>(completion_key::io),
                              0)) {
    fprintf(stderr,
            "Error associating listener socket with the I/O completion "
            "port.\n");

    closesocket(listener);
    CloseHandle(io_completion_port);
    SetConsoleCtrlHandler(nullptr, FALSE);
    CloseHandle(stop_event);
    WSACleanup();

    return EXIT_FAILURE;
  }

#if DEBUG
  log("Associated listener socket %llu with the I/O completion port 0x%p.\n",
      listener,
      static_cast<const void*>(io_completion_port));
#endif // DEBUG

  static constexpr const size_t number_connections = 4;
  connection* connections[number_connections];

  // Set address of the AcceptEx() function.
  connection::acceptex(acceptex);

  // Set address of the DisconnectEx() function.
  connection::disconnectex(disconnectex);

  // Create connections.
  for (size_t i = 0; i < number_connections; i++) {
    // Create connection.
    connections[i] = connection::create(listener, io_completion_port);

    // If the connection could not be created...
    if (!connections[i]) {
      for (size_t j = 0; j < i; j++) {
        delete connections[j];
      }

      closesocket(listener);
      CloseHandle(io_completion_port);
      SetConsoleCtrlHandler(nullptr, FALSE);
      CloseHandle(stop_event);
      WSACleanup();

      return EXIT_FAILURE;
    }
  }

  static constexpr const size_t number_threads = 2;
  HANDLE threads[number_threads];
  thread_data thread_data[number_threads];

  // Create threads.
  for (size_t i = 0; i < number_threads; i++) {
    thread_data[i].nthread = i;
    thread_data[i].io_completion_port = io_completion_port;

    // Create thread.
    threads[i] = reinterpret_cast<HANDLE>(_beginthreadex(nullptr,
                                                         0,
                                                         worker,
                                                         &thread_data[i],
                                                         0,
                                                         nullptr));

    // If the thread could not be created...
    if (threads[i] == reinterpret_cast<HANDLE>(-1)) {
      fprintf(stderr, "Error creating thread %zu.\n", i + 1);

      // Save number of threads.
      const size_t nthreads = i;

      // Notify threads that they have to stop.
      for (i = 0; i < nthreads; i++) {
        PostQueuedCompletionStatus(io_completion_port,
                                   0,
                                   static_cast<ULONG_PTR>(completion_key::shutdown),
                                   nullptr);
      }

      // Wait for the worker threads.
      WaitForMultipleObjects(nthreads, threads, TRUE, INFINITE);

      for (i = 0; i < nthreads; i++) {
        CloseHandle(threads[i]);
      }

      for (i = 0; i < number_connections; i++) {
        delete connections[i];
      }

      closesocket(listener);
      CloseHandle(io_completion_port);
      SetConsoleCtrlHandler(nullptr, FALSE);
      CloseHandle(stop_event);
      WSACleanup();

      return EXIT_FAILURE;
    }
  }

  log("Waiting for signal to arrive.\n");

  // Wait for signal to arrive.
  WaitForSingleObject(stop_event, INFINITE);

  log("Signal received.\n");

  // Notify threads that they have to stop.
  for (size_t i = 0; i < number_threads; i++) {
    PostQueuedCompletionStatus(io_completion_port,
                               0,
                               static_cast<ULONG_PTR>(completion_key::shutdown),
                               nullptr);
  }

  // Wait for the worker threads.
  WaitForMultipleObjects(number_threads, threads, TRUE, INFINITE);

  for (size_t i = 0; i < number_threads; i++) {
    CloseHandle(threads[i]);
  }

  for (size_t i = 0; i < number_connections; i++) {
    delete connections[i];
  }

  closesocket(listener);
  CloseHandle(io_completion_port);
  SetConsoleCtrlHandler(nullptr, FALSE);
  CloseHandle(stop_event);
  WSACleanup();

  return EXIT_SUCCESS;
}

bool init_winsock()
{
  WORD version = MAKEWORD(2, 2);
  WSADATA wsadata;
  if (WSAStartup(version, &wsadata) == 0) {
    if ((LOBYTE(wsadata.wVersion) == 2) && (HIBYTE(wsadata.wVersion) == 2)) {
      return true;
    }

    WSACleanup();
  }

  return false;
}

BOOL WINAPI signal_handler(DWORD control_type)
{
  switch (control_type) {
    case CTRL_C_EVENT:
    case CTRL_CLOSE_EVENT:
      SetEvent(stop_event);
      return TRUE;
    default:
      return FALSE;
  }
}

void log(const char* format, ...)
{
  time_t now = time(nullptr);
  const struct tm* const tm = localtime(&now);

  printf("[%04u/%02u/%02u %02u:%02u:%02u] ",
         1900 + tm->tm_year,
         1 + tm->tm_mon,
         tm->tm_mday,
         tm->tm_hour,
         tm->tm_min,
         tm->tm_sec);

  va_list ap;
  va_start(ap, format);

  vprintf(format, ap);

  va_end(ap);
}