1. Preface

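The previous articles covered the synchronous model, the WSAAsyncSelect model, and the WSAEventSelect model for socket I/O. Two socket I/O models remain: the overlapped model and the completion port model.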

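If your program has modest performance and concurrency requirements, the WSAEventSelect model from the previous article is enough; if it needs high performance and concurrency, use the completion port model described here.
The completion port model is built on top of the overlapped model and is better than it in ease of use and scalability, so anywhere you would normally choose the overlapped model, a completion port can be used instead. The completion port model is strongly recommended.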

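The “completion port model” is the ultimate socket I/O model on Windows and can replace every model covered before. With a good wrapper around it you can pretty much “master one trick and use it everywhere” and avoid reinventing the wheel. That is why this article is longer than the previous ones, and its sample code is more complex and complete.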

Reactor and Proactor

In network programming we often hear about two I/O multiplexing patterns: Reactor and Proactor.
Informally, the difference between them is:

Reactor: Let me know when there is something to receive.
Proactor: Receive up to ten bytes for me, and let me know when you are done.

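The completion port model provided by Windows is the Proactor pattern. On Linux, the traditional mechanisms such as epoll follow the Reactor pattern, because for a long time the kernel offered no native asynchronous socket I/O; newer kernels provide io_uring, which makes a Proactor-style design possible there as well.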
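
To make the two sentences above concrete, here is a toy model in portable C++ (it uses no Windows or Linux API). A fake connection holds bytes that have "arrived". In the reactor style the handler is only told that data is readable and must read it itself; in the proactor style the caller hands over a buffer and a maximum length up front, and the handler is called with the buffer already filled. FakeConnection, ReactorPoll and ProactorRecv are hypothetical names used only for this sketch.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>

// Hypothetical stand-in for a socket: bytes the peer has sent but nobody has read yet.
struct FakeConnection {
    std::string pending;

    // Synchronous read, like recv(): copies up to max_len bytes out of the pending data.
    std::size_t Read(char* buf, std::size_t max_len) {
        std::size_t n = std::min(max_len, pending.size());
        pending.copy(buf, n);
        pending.erase(0, n);
        return n;
    }
};

// Reactor: "tell me when I can receive". The handler only learns that data is
// readable and has to call Read itself.
void ReactorPoll(FakeConnection& conn, const std::function<void(FakeConnection&)>& on_readable) {
    if (!conn.pending.empty()) {
        on_readable(conn);
    }
}

// Proactor: "receive at most max_len bytes into my buffer, then tell me".
// The read has already happened by the time on_complete runs.
void ProactorRecv(FakeConnection& conn, char* buf, std::size_t max_len,
                  const std::function<void(std::size_t)>& on_complete) {
    std::size_t n = conn.Read(buf, max_len);
    if (n > 0) {
        on_complete(n);
    }
}
```

The difference is only who performs the read: with a proactor it moves from the handler into the "system", which is what WSARecv combined with a completion port gives you on Windows.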

2. The Completion Port Model

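Put simply, the completion port model says: tell me what you want done (accept a connection with AcceptEx, send data with WSASend, receive data with WSARecv, connect to a server with ConnectEx), and I will notify you when it is finished. Here “I” is the operating system and “you” is the application.
For example, to receive data from the peer, the application calls WSARecv and specifies a receive buffer and its size. When the peer's data arrives, the operating system places it directly into that buffer and then notifies the application that the data is there. This is the biggest difference from the WSAEventSelect model: WSAEventSelect only tells the program that data has arrived; it does not receive the data, so the program still has to call recv.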

2.1 Creating a Completion Port and Associating Sockets with It

HANDLE WINAPI CreateIoCompletionPort(
_In_ HANDLE FileHandle,
_In_opt_ HANDLE ExistingCompletionPort,
_In_ ULONG_PTR CompletionKey,
_In_ DWORD NumberOfConcurrentThreads
);

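CreateIoCompletionPort is unusual: depending on the arguments passed to it, it does one of two things. It either creates a completion port, or associates a completion port with a device (here, a socket). It is usually wrapped as follows to provide these two functions: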

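// Create a new completion port that is not associated with any device yet.
// NumberOfConcurrentThreads = 0 allows as many concurrently running threads as there are processors.
HANDLE CreateNewCompletionPort() {
return CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
}

// Associate a device (here, a socket) with an existing completion port.
// completion_key is ULONG_PTR so that it can carry a pointer on 64-bit builds as well.
BOOL AssociateDeviceWithCompletionPort(HANDLE completion_port, HANDLE device, ULONG_PTR completion_key) {
HANDLE h = CreateIoCompletionPort(device, completion_port, completion_key, 0);
return (h == completion_port);
}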
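
The type of the completion key matters because the sample passes a PER_SOCKET_CONTEXT* as the key. On 64-bit Windows a pointer is 64 bits, DWORD is always 32 bits, and ULONG_PTR is pointer-sized. The portable sketch below, assuming std::uint32_t as a stand-in for DWORD and std::uintptr_t for ULONG_PTR, shows that a pointer squeezed through 32 bits does not survive the round trip once its value needs more than 32 bits. The helper names are hypothetical.

```cpp
#include <cassert>
#include <cstdint>

// Stand-ins for the Windows types: DWORD is always 32 bits, ULONG_PTR is pointer-sized.
using Dword = std::uint32_t;
using UlongPtr = std::uintptr_t;

// Pointer -> pointer-sized key -> pointer: always lossless.
template <typename T>
T* RoundTripPointerSized(T* p) {
    UlongPtr key = reinterpret_cast<UlongPtr>(p);
    return reinterpret_cast<T*>(key);
}

// Pointer -> 32-bit key -> pointer: the upper bits are dropped on 64-bit builds.
template <typename T>
T* RoundTripDword(T* p) {
    Dword key = static_cast<Dword>(reinterpret_cast<UlongPtr>(p));
    return reinterpret_cast<T*>(static_cast<UlongPtr>(key));
}

// True when the pointer value does not fit in 32 bits, i.e. a DWORD key would corrupt it.
inline bool PointerNeedsMoreThan32Bits(const void* p) {
    return reinterpret_cast<UlongPtr>(p) > UINT32_MAX;
}
```

On a 32-bit build both types are 32 bits wide, which is why a DWORD key appears to work there and only breaks on x64.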

2.2 Thread Pool

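As mentioned, in the completion port model the application waits for the operating system to finish the work and notify it. That wait blocks, so it cannot be done on the main thread; worker threads have to do the waiting. But how many threads? One per connection? The more threads there are, the more system resources they consume, and switching between threads also costs CPU time, so more threads is not better. A common rule of thumb is: number of threads = number of CPUs * 2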

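Ideally the number of threads equals the number of CPUs, so the CPUs never have to switch between threads. In practice some threads inevitably block or spend a long time on a task, and while they do, the CPU's time slices go to other threads. Doubling the CPU count gives the completion port spare threads for that case: when a running worker blocks, the port can release another waiting worker, so the CPUs stay busy. Making the fullest possible use of the CPUs is exactly the goal of the completion port.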

Getting the number of CPUs:

int GetNumberOfProcesser() {
SYSTEM_INFO si;
GetSystemInfo(&si);
return si.dwNumberOfProcessors;
}
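
The same rule of thumb can be sketched portably with std::thread::hardware_concurrency() instead of GetSystemInfo. The standard allows hardware_concurrency() to return 0 when the count cannot be determined, so the hypothetical helper below, assuming a fallback of one CPU, never returns fewer than two workers.

```cpp
#include <cassert>
#include <thread>

// Rule of thumb from the text: worker threads = CPUs * 2.
// cpu_count == 0 means "unknown", so treat it as a single CPU.
unsigned WorkerThreadsFor(unsigned cpu_count) {
    if (cpu_count == 0) {
        cpu_count = 1;
    }
    return cpu_count * 2;
}

// Applies the rule to the current machine.
unsigned DefaultWorkerThreads() {
    return WorkerThreadsFor(std::thread::hardware_concurrency());
}
```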

2.3 AcceptEx and Related Functions

BOOL AcceptEx(
_In_ SOCKET sListenSocket,
_In_ SOCKET sAcceptSocket,
_In_ PVOID lpOutputBuffer,
_In_ DWORD dwReceiveDataLength,
_In_ DWORD dwLocalAddressLength,
_In_ DWORD dwRemoteAddressLength,
_Out_ LPDWORD lpdwBytesReceived,
_In_ LPOVERLAPPED lpOverlapped
);
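
The three length parameters in the prototype above are tied together. According to the AcceptEx documentation, dwLocalAddressLength and dwRemoteAddressLength must each be at least 16 bytes more than the maximum address length of the transport (for IPv4, sizeof(SOCKADDR_IN) + 16), and lpOutputBuffer must hold dwReceiveDataLength plus both address lengths. The sketch below does that arithmetic portably; kSockAddrInSize = 16 hard-codes sizeof(SOCKADDR_IN) as an assumption so it compiles without Winsock, and the helper names are hypothetical.

```cpp
#include <cassert>
#include <cstddef>

// sizeof(SOCKADDR_IN) on Windows (assumed constant here so the sketch is portable).
constexpr std::size_t kSockAddrInSize = 16;

// Each address slot must be at least 16 bytes larger than the address itself.
constexpr std::size_t kAcceptExAddrLen = kSockAddrInSize + 16;

// Largest dwReceiveDataLength that still leaves room for the local and remote
// address slots in an output buffer of buffer_len bytes (0 if they do not fit).
constexpr std::size_t MaxAcceptExReceiveLen(std::size_t buffer_len) {
    return buffer_len >= 2 * kAcceptExAddrLen ? buffer_len - 2 * kAcceptExAddrLen : 0;
}

// Minimum output buffer size for a given dwReceiveDataLength.
constexpr std::size_t MinAcceptExBufferLen(std::size_t receive_len) {
    return receive_len + 2 * kAcceptExAddrLen;
}
```

Whatever dwReceiveDataLength is passed to AcceptEx must be passed unchanged to GetAcceptExSockaddrs, because it tells that function where the address slots start. With dwReceiveDataLength = 0, AcceptEx completes as soon as a connection arrives instead of waiting for the first data.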

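Like WSARecv, WSASend, and ConnectEx, AcceptEx takes an LPOVERLAPPED as its last parameter, so the caller must supply an OVERLAPPED structure.
AcceptEx, ConnectEx, and a few related functions are special, though: they are Microsoft-specific extension functions, not part of the Winsock 2 specification, which Microsoft added to make the overlapped I/O mechanism easier to use.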

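Take AcceptEx as an example. Microsoft implements it in mswsock.dll, so we could link against the import library mswsock.lib and call AcceptEx directly. This is not recommended: when AcceptEx is called that way, the service provider has to obtain the function pointer through WSAIoctl() on every call, which is inefficient. Instead, our own code obtains the function pointer once, saves it, and calls through the saved pointer from then on.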
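
The "look it up once, then call through the saved pointer" idea can be sketched portably. Here a hypothetical resolver stands in for the WSAIoctl(SIO_GET_EXTENSION_FUNCTION_POINTER) call, and a function-local static holds the result; C++11 guarantees that such a static is initialized exactly once, even when several threads call the getter concurrently. Keep in mind that on Windows the extension pointer is obtained through a particular socket and may be specific to that socket's service provider, so one cached pointer should only be reused for sockets of the same kind (here, IPv4 TCP).

```cpp
#include <cassert>

// Signature of the function being cached (a stand-in for LPFN_ACCEPTEX).
using AddFn = int (*)(int, int);

static int Add(int a, int b) { return a + b; }

// Counts how often the "expensive" lookup runs (for illustration only).
static int g_lookup_count = 0;

// Hypothetical stand-in for the WSAIoctl(SIO_GET_EXTENSION_FUNCTION_POINTER) call.
static AddFn ResolveAddFn() {
    ++g_lookup_count;
    return &Add;
}

// Returns the cached pointer; the lookup runs only on the first call (thread-safe in C++11).
AddFn GetAddFn() {
    static const AddFn cached = ResolveAddFn();
    return cached;
}
```

The sample code does the equivalent more simply: main() stores the pointers in g_AcceptExFn and g_AcceptExSockAddrsFn once, before any AcceptEx is posted.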

Here is how to obtain pointers to AcceptEx and the other extension functions:

LPFN_ACCEPTEX GetAcceptExFnPointer(SOCKET s)
{
LPFN_ACCEPTEX fn = NULL;
GUID GuidAcceptEx = WSAID_ACCEPTEX;
DWORD bytes = 0;

if (SOCKET_ERROR == WSAIoctl(
s,
SIO_GET_EXTENSION_FUNCTION_POINTER,
&GuidAcceptEx,
sizeof(GuidAcceptEx),
&fn,
sizeof(fn),
&bytes,
NULL,
NULL)) {
return NULL;
}
return fn;
}

LPFN_CONNECTEX GetConnectExFnPointer(SOCKET s)
{
LPFN_CONNECTEX fn = NULL;
GUID GuidConnectEx = WSAID_CONNECTEX;
DWORD bytes = 0;

if (SOCKET_ERROR == WSAIoctl(
s,
SIO_GET_EXTENSION_FUNCTION_POINTER,
&GuidConnectEx,
sizeof(GuidConnectEx),
&fn,
sizeof(fn),
&bytes,
NULL,
NULL)) {
return NULL;
}
return fn;
}

LPFN_GETACCEPTEXSOCKADDRS GetAcceptExSockAddrsFnPointer(SOCKET s)
{
LPFN_GETACCEPTEXSOCKADDRS fn = NULL;
GUID GuidGetAcceptExSockAddrs = WSAID_GETACCEPTEXSOCKADDRS;
DWORD bytes = 0;

if (SOCKET_ERROR == WSAIoctl(
s,
SIO_GET_EXTENSION_FUNCTION_POINTER,
&GuidGetAcceptExSockAddrs,
sizeof(GuidGetAcceptExSockAddrs),
&fn,
sizeof(fn),
&bytes,
NULL,
NULL)) {
return NULL;
}
return fn;
}

2.4 GetQueuedCompletionStatus

Its prototype is:

BOOL WINAPI GetQueuedCompletionStatus(
_In_ HANDLE CompletionPort,
_Out_ LPDWORD lpNumberOfBytes,
_Out_ PULONG_PTR lpCompletionKey,
_Out_ LPOVERLAPPED *lpOverlapped,
_In_ DWORD dwMilliseconds
);

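As mentioned, the completion port model waits in worker threads for the operating system's notification that the work is done, and GetQueuedCompletionStatus is the function that waits for that notification. From it you get the number of bytes transferred by the operation (lpNumberOfBytes), the user-defined integer bound to the socket (lpCompletionKey), and the pointer to the OVERLAPPED structure that was passed when WSASend or a similar function was called (lpOverlapped).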
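
As a mental model only, a completion port behaves like a thread-safe queue of completion packets, each holding those three values: finished I/O (and PostQueuedCompletionStatus) pushes a packet, and GetQueuedCompletionStatus pops one, blocking until one is available. The portable sketch below, with hypothetical names, models just that; it does not model the port's concurrency limit or the last-in-first-out order in which Windows releases waiting threads.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstdint>
#include <deque>
#include <mutex>

// One completion packet: what GetQueuedCompletionStatus hands back.
struct CompletionPacket {
    std::uint32_t bytes_transferred;  // lpNumberOfBytes
    std::uintptr_t completion_key;    // lpCompletionKey (one per socket)
    void* overlapped;                 // lpOverlapped (one per operation)
};

class ToyCompletionQueue {
public:
    // Like PostQueuedCompletionStatus, or the system reporting a finished I/O.
    void Post(const CompletionPacket& packet) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            packets_.push_back(packet);
        }
        cv_.notify_one();
    }

    // Like GetQueuedCompletionStatus(..., INFINITE): blocks until a packet is available.
    CompletionPacket Get() {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [this] { return !packets_.empty(); });
        CompletionPacket packet = packets_.front();
        packets_.pop_front();
        return packet;
    }

private:
    std::mutex mutex_;
    std::condition_variable cv_;
    std::deque<CompletionPacket> packets_;
};
```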

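The two parameters we care about most are lpCompletionKey and lpOverlapped:
lpCompletionKey is the value given to CreateIoCompletionPort when the socket was associated with the completion port, so each SOCKET has one lpCompletionKey. Its type is ULONG_PTR, so it can hold any pointer-sized integer, including a pointer.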

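lpOverlapped is specified on each call to WSASend and similar functions. Each outstanding operation (that is, each call to WSASend, WSARecv, AcceptEx, ConnectEx, and so on) needs its own OVERLAPPED, so one operation corresponds to one lpOverlapped. A SOCKET can have several operations, so it corresponds to several lpOverlapped values.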

The relationships are as follows:

+-----------+           +--------------------+
|           |  1 --> 1  |                    |
|  SOCKET   +----------->  lpCompletionKey   |
|           |           |                    |
+-----+-----+           +--------------------+
      |
      |1 --> n
      |
+-----v------------------+
|                        |
|  Send,Recv,Accept...   |
|                        |
+-----+------------------+
      |
      |1 --> 1
      |
+-----v------------------+
|                        |
|      lpOverlapped      |
|                        |
+------------------------+

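Note: in the sample code, the PER_SOCKET_CONTEXT structure corresponds to SOCKET in the diagram, and the PER_IO_CONTEXT structure corresponds to lpOverlapped. Keeping this in mind makes the sample code much easier to follow.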

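Windows also provides a helper macro, CONTAINING_RECORD, which computes the address of a structure from the address of one of its members.
With it we can extend lpOverlapped: embed the OVERLAPPED in a larger structure of our own and recover that structure from the pointer GetQueuedCompletionStatus returns (see the sample).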
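
Here is that idea as a portable sketch: each per-operation context embeds an OVERLAPPED-like member, the "system" only ever hands back a pointer to that member, and the worker recovers the whole context from it and dispatches on the operation type, much as the sample's worker thread does with PER_IO_CONTEXT. FakeOverlapped, IoContext, ContextFromOverlapped and Dispatch are hypothetical names, and offsetof from <cstddef> stands in for CONTAINING_RECORD.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Stand-in for OVERLAPPED: the only pointer the "system" hands back.
struct FakeOverlapped {
    unsigned long internal = 0;
};

enum class OperationType { kAccept, kRecv, kSend };

// Per-operation context, like PER_IO_CONTEXT: the OVERLAPPED is embedded, not pointed to.
struct IoContext {
    OperationType type;         // deliberately placed first, so the offset below is non-zero
    FakeOverlapped overlapped;
    char buffer[64];
};

// What CONTAINING_RECORD(ov, IoContext, overlapped) computes.
IoContext* ContextFromOverlapped(FakeOverlapped* ov) {
    return reinterpret_cast<IoContext*>(
        reinterpret_cast<char*>(ov) - offsetof(IoContext, overlapped));
}

// Worker-side dispatch: recover the context, then switch on the operation type.
std::string Dispatch(FakeOverlapped* ov) {
    IoContext* ctx = ContextFromOverlapped(ov);
    switch (ctx->type) {
        case OperationType::kAccept: return "accept";
        case OperationType::kRecv:   return std::string("recv: ") + ctx->buffer;
        case OperationType::kSend:   return "send";
    }
    return "unknown";
}
```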

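The above is only a brief introduction to the completion port model. For a detailed treatment, see section 10.3 of Windows via C/C++, 5th edition (published in Chinese as 《windows核心编程 第5版》).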

2.5 How the CONTAINING_RECORD Macro Works

The macro computes the address of a whole structure from the address of one of its members; it works like a general-purpose formula.

The comments in the code below explain how the macro is implemented.

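#include <Windows.h>
#include <stdio.h>
int main()
{
struct T
{
int a;
int b;
int c;
};
// CONTAINING_RECORD computes the address of a whole structure from the address of one of its members.
T t = { 1, 2, 3 };

// Suppose we know the address of t's member b and the member's name; recover the pointer to t.
T *pT = CONTAINING_RECORD(&t.b, T, b);
printf("a:%d b:%d c:%d\n", pT->a, pT->b, pT->c);

// Definition of CONTAINING_RECORD:
// ((type *)( (PCHAR)(address) - (ULONG_PTR)(&((type *)0)->field)))
// The last part, (&((type *)0)->field), casts 0 (a null pointer) to type* and takes the address of field.
// In this example it casts the null pointer to T*, refers to member b, and takes b's address.
// In effect it asks: if a T were placed at address 0, where would b be?
// That address is exactly b's offset within the structure (alignment padding included).
// Converted to ULONG_PTR and subtracted from b's real address, it yields the start address of the structure.
//
// Step by step:

// Take a T* whose value is 0. It is never read through; it only serves to compute an address.
// This premise is essential!
T *pTemp = (T*)0;

// With the structure starting at address 0, b's "address" is simply b's offset (alignment included).
// This is the core of CONTAINING_RECORD!
int *pB = &pTemp->b;

ULONG_PTR Offset = (ULONG_PTR)pB; // As a number, this is b's offset.
printf("offset of b: %zu\n", (size_t)Offset);

// Members are laid out at increasing addresses, so b's real address minus b's offset is the start of the structure.
T *pFinal = (T*)(((char*)&t.b) - Offset);
printf("T: a:%d b:%d c:%d\n", pFinal->a, pFinal->b, pFinal->c);

return 0;
}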
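
The null-pointer trick above is how winnt.h defines the macro, and compilers handle it as intended, but strictly speaking `&((type *)0)->field` is undefined behavior in standard C and C++. A portable equivalent takes the offset from offsetof in <cstddef>, which exists precisely to yield a member's byte offset. CONTAINER_OF and OwnerOfB are hypothetical names modeled on CONTAINING_RECORD, not a Windows API.

```cpp
#include <cassert>
#include <cstddef>

// Same contract as CONTAINING_RECORD(address, type, field), but the offset comes
// from offsetof instead of "dereferencing" a null pointer.
#define CONTAINER_OF(address, type, field) \
    (reinterpret_cast<type*>(reinterpret_cast<char*>(address) - offsetof(type, field)))

struct T {
    int a;
    int b;
    int c;
};

// Given only a pointer to some T's member b, recover that T.
T* OwnerOfB(int* pb) {
    return CONTAINER_OF(pb, T, b);
}
```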

3. Example

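The sample code does the following:

  1. Both the server and the client are implemented with the completion port model.
  2. The server and the client exchange messages that imitate TCP's three-way handshake.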
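
Stripped of the I/O, the handshake in the sample is a small lookup: the server answers kSYN with kSYN_ACK, and the client answers kSYN_ACK with kACK (the DoRecv functions of the server and client). The sketch below restates that logic as pure functions using the sample's message strings; ServerReply and ClientReply are hypothetical names, and an empty string means "send nothing".

```cpp
#include <cassert>
#include <string>

// Message strings copied from the sample.
const std::string kSYN = "(SYN) hello server, I'm client. Can you hear me?";
const std::string kSYN_ACK = "(SYN+ACK) hello client, I'm server. I can hear you, can you hear me?";
const std::string kACK = "(ACK) hello server, I'm client. I can hear you!";

// Server side (server DoRecv): reply to SYN with SYN+ACK, otherwise stay silent.
std::string ServerReply(const std::string& received) {
    return received == kSYN ? kSYN_ACK : std::string();
}

// Client side (client DoRecv): reply to SYN+ACK with ACK, otherwise stay silent.
std::string ClientReply(const std::string& received) {
    return received == kSYN_ACK ? kACK : std::string();
}
```

The sample compares each completed WSARecv buffer against a whole message. That is fine for this short demo, but TCP is a byte stream: a single receive may return part of a message or several messages at once, so a real protocol needs explicit framing (for example, a length prefix).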

3.1 Helper Functions

iocp.h and iocp.cpp contain the IOCP-related structure definitions and some general-purpose helper functions:
iocp.h

// iocp.h
#ifndef IOCP_H_
#define IOCP_H_

#include <winsock2.h>
#include <MSWSock.h>
#include <vector>

#define MAX_BUFFER_LEN 8192
#define EXIT_CODE 0

namespace IOCP {

typedef enum _OPERATION_TYPE
{
ACCEPT_POSTED,
CONNECT_POSTED,
SEND_POSTED,
RECV_POSTED,
NULL_POSTED
}OPERATION_TYPE;


typedef struct _PER_IO_CONTEXT
{
OVERLAPPED overlapped;
SOCKET socket;
WSABUF wsa_buffer;
char buffer[MAX_BUFFER_LEN];
OPERATION_TYPE operation_type;


_PER_IO_CONTEXT() {
ZeroMemory(&overlapped, sizeof(overlapped));
ZeroMemory(buffer, MAX_BUFFER_LEN);
socket = INVALID_SOCKET;
wsa_buffer.buf = buffer;
wsa_buffer.len = MAX_BUFFER_LEN;
operation_type = NULL_POSTED;
}

~_PER_IO_CONTEXT() {
if (socket != INVALID_SOCKET) {
closesocket(socket);
socket = INVALID_SOCKET;
}
}

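// Prepare the context for a new overlapped operation: the OVERLAPPED should be
// zeroed before it is reused, and the WSABUF restored to cover the whole buffer.
void ResetBuffer() {
ZeroMemory(&overlapped, sizeof(overlapped));
ZeroMemory(buffer, MAX_BUFFER_LEN);
wsa_buffer.buf = buffer;
wsa_buffer.len = MAX_BUFFER_LEN;
}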

} PER_IO_CONTEXT;



typedef struct _PER_SOCKET_CONTEXT {
SOCKET socket;
SOCKADDR_IN client_addr;
std::vector<_PER_IO_CONTEXT*> io_ctx_array;

_PER_SOCKET_CONTEXT() {
socket = INVALID_SOCKET;
memset(&client_addr, 0, sizeof(client_addr));
}

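~_PER_SOCKET_CONTEXT()
{
SOCKET s = socket;
if (socket != INVALID_SOCKET) {
closesocket(socket);
socket = INVALID_SOCKET;
}

for (size_t i = 0; i < io_ctx_array.size(); i++) {
// Recv/send contexts hold a copy of this socket handle. It was closed above,
// so clear it to stop ~_PER_IO_CONTEXT from closing the same handle a second time.
if (s != INVALID_SOCKET && io_ctx_array[i]->socket == s) {
io_ctx_array[i]->socket = INVALID_SOCKET;
}
delete io_ctx_array[i];
}
io_ctx_array.clear();
}


_PER_IO_CONTEXT* GetNewIoContext() {
_PER_IO_CONTEXT* p = new _PER_IO_CONTEXT;

io_ctx_array.push_back(p);

return p;
}

void RemoveContext(_PER_IO_CONTEXT* pContext) {
for (std::vector<_PER_IO_CONTEXT*>::iterator it = io_ctx_array.begin();
it != io_ctx_array.end(); it++) {
if (pContext == *it) {
// Do not let the I/O context close the socket it shares with this socket context.
if (pContext->socket == socket) {
pContext->socket = INVALID_SOCKET;
}
delete pContext;
io_ctx_array.erase(it);
break;
}
}
}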
} PER_SOCKET_CONTEXT;

int GetNumberOfProcesser();
HANDLE CreateNewCompletionPort();
BOOL AssociateDeviceWithCompletionPort(HANDLE completion_port, HANDLE device, ULONG_PTR completion_key);

LPFN_ACCEPTEX GetAcceptExFnPointer(SOCKET s);
LPFN_CONNECTEX GetConnectExFnPointer(SOCKET s);
LPFN_GETACCEPTEXSOCKADDRS GetAcceptExSockAddrsFnPointer(SOCKET s);
}

#endif // IOCP_H_

iocp.cpp

// iocp.cpp
#include "iocp.h"

namespace IOCP {

int GetNumberOfProcesser() {
SYSTEM_INFO si;
GetSystemInfo(&si);
return si.dwNumberOfProcessors;
}

HANDLE CreateNewCompletionPort() {
return CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
}

// completion_key is pointer-sized (ULONG_PTR), so a context pointer passed as the key survives on 64-bit builds.
BOOL AssociateDeviceWithCompletionPort(HANDLE completion_port, HANDLE device, ULONG_PTR completion_key) {
HANDLE h = CreateIoCompletionPort(device, completion_port, completion_key, 0);
return (h == completion_port);
}


LPFN_ACCEPTEX GetAcceptExFnPointer(SOCKET s) {
LPFN_ACCEPTEX fn = NULL;
GUID GuidAcceptEx = WSAID_ACCEPTEX;
DWORD bytes = 0;

if (SOCKET_ERROR == WSAIoctl(
s,
SIO_GET_EXTENSION_FUNCTION_POINTER,
&GuidAcceptEx,
sizeof(GuidAcceptEx),
&fn,
sizeof(fn),
&bytes,
NULL,
NULL)) {
return NULL;
}
return fn;
}

LPFN_CONNECTEX GetConnectExFnPointer(SOCKET s) {
LPFN_CONNECTEX fn = NULL;
GUID GuidConnectEx = WSAID_CONNECTEX;
DWORD bytes = 0;

if (SOCKET_ERROR == WSAIoctl(
s,
SIO_GET_EXTENSION_FUNCTION_POINTER,
&GuidConnectEx,
sizeof(GuidConnectEx),
&fn,
sizeof(fn),
&bytes,
NULL,
NULL)) {
return NULL;
}
return fn;
}

LPFN_GETACCEPTEXSOCKADDRS GetAcceptExSockAddrsFnPointer(SOCKET s) {
LPFN_GETACCEPTEXSOCKADDRS fn = NULL;
GUID GuidGetAcceptExSockAddrs = WSAID_GETACCEPTEXSOCKADDRS;
DWORD bytes = 0;

if (SOCKET_ERROR == WSAIoctl(
s,
SIO_GET_EXTENSION_FUNCTION_POINTER,
&GuidGetAcceptExSockAddrs,
sizeof(GuidGetAcceptExSockAddrs),
&fn,
sizeof(fn),
&bytes,
NULL,
NULL)) {
return NULL;
}
return fn;
}
}

3.2 Server

#include <winsock2.h>
#include <stdio.h>
#include <string.h>
#include <iostream>
#include <string>
#include <assert.h>
#include <vector>
#include <process.h>
#include "iocp.h"

using namespace std;

const u_short kPort = 10001;
const std::string kSYN = "(SYN) hello server, I'm client. Can you hear me?";
const std::string kSYN_ACK = "(SYN+ACK) hello client, I'm server. I can hear you, can you hear me?";
const std::string kACK = "(ACK) hello server, I'm client. I can hear you!";

#pragma comment(lib, "Ws2_32.lib")

HANDLE g_IOCP = INVALID_HANDLE_VALUE;
HANDLE g_exit = NULL;

int g_work_thread_num = 0;
HANDLE *g_work_threads = NULL;

IOCP::PER_SOCKET_CONTEXT *g_listen_ctx = NULL;

CRITICAL_SECTION g_cs_socket_ctx_array;
std::vector<IOCP::PER_SOCKET_CONTEXT*> g_socket_ctx_array;

LPFN_ACCEPTEX g_AcceptExFn = NULL;
LPFN_GETACCEPTEXSOCKADDRS g_AcceptExSockAddrsFn = NULL;

// Manage g_socket_ctx_array
//
void AddSocketContext(IOCP::PER_SOCKET_CONTEXT *socket_ctx) {
EnterCriticalSection(&g_cs_socket_ctx_array);
g_socket_ctx_array.push_back(socket_ctx);
LeaveCriticalSection(&g_cs_socket_ctx_array);
}

void RemoveSocketContext(IOCP::PER_SOCKET_CONTEXT *socket_ctx) {
EnterCriticalSection(&g_cs_socket_ctx_array);
for (std::vector<IOCP::PER_SOCKET_CONTEXT*>::iterator it = g_socket_ctx_array.begin(); it != g_socket_ctx_array.end(); it++) {
if (*it == socket_ctx) {
delete *it;
g_socket_ctx_array.erase(it);
break;
}
}
LeaveCriticalSection(&g_cs_socket_ctx_array);
}

void ClearSocketContextArray() {
EnterCriticalSection(&g_cs_socket_ctx_array);
for (std::vector<IOCP::PER_SOCKET_CONTEXT*>::iterator it = g_socket_ctx_array.begin(); it != g_socket_ctx_array.end(); it++) {
delete *it; // ~PER_SOCKET_CONTEXT closes the socket, so it is not closed here a second time
}
g_socket_ctx_array.clear();
LeaveCriticalSection(&g_cs_socket_ctx_array);
}

// Post Accept, Recv and Send requests
//
bool PostAccept(IOCP::PER_IO_CONTEXT* io_ctx) {
if (io_ctx == NULL)
return false;

io_ctx->operation_type = IOCP::ACCEPT_POSTED;
io_ctx->ResetBuffer();
io_ctx->socket = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);

if (io_ctx->socket == INVALID_SOCKET) {
printf("WSASocket failed with code: %d\n", WSAGetLastError());
return false;
}

DWORD bytes = 0;
if (g_AcceptExFn(g_listen_ctx->socket,
io_ctx->socket,
io_ctx->wsa_buffer.buf,
0,
sizeof(SOCKADDR_IN) + 16,
sizeof(SOCKADDR_IN) + 16,
&bytes,
&io_ctx->overlapped) == FALSE) {
int gle = WSAGetLastError();
if (gle != WSA_IO_PENDING) {
printf("AcceptEx failed with code: %d\n", gle);
return false;
}
}

return true;
}

bool PostRecv(IOCP::PER_IO_CONTEXT* io_ctx) {
if (io_ctx == NULL)
return false;

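io_ctx->operation_type = IOCP::RECV_POSTED;
io_ctx->ResetBuffer();
// Keep the last byte as '\0' so the received text can be printed and compared as a C string.
io_ctx->wsa_buffer.len = MAX_BUFFER_LEN - 1;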

DWORD recv_bytes = 0;
DWORD flags = 0;
int ret = WSARecv(io_ctx->socket, &io_ctx->wsa_buffer, 1, &recv_bytes, &flags, &io_ctx->overlapped, NULL);
if (ret == SOCKET_ERROR && WSAGetLastError() != WSA_IO_PENDING) {
return false;
}

return true;
}

bool PostSend(IOCP::PER_IO_CONTEXT* io_ctx, const char* msg, int msg_len) {
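// The message must fit in the context's buffer, with the last byte left for '\0' (DoSend prints it with %s).
if (io_ctx == NULL || msg == NULL || msg_len <= 0 || msg_len >= MAX_BUFFER_LEN)
return false;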

io_ctx->operation_type = IOCP::SEND_POSTED;
memcpy(io_ctx->wsa_buffer.buf, msg, msg_len);
io_ctx->wsa_buffer.len = msg_len;

DWORD sent_bytes = 0;
int ret = WSASend(io_ctx->socket, &io_ctx->wsa_buffer, 1, &sent_bytes, 0, &io_ctx->overlapped, NULL);
if (ret == SOCKET_ERROR && WSAGetLastError() != WSA_IO_PENDING) {
return false;
}

return true;
}

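// Handle the completion notifications of Accept, Recv and Send
//
bool DoAccept(IOCP::PER_SOCKET_CONTEXT *socket_ctx, IOCP::PER_IO_CONTEXT *io_ctx) {
SOCKADDR_IN* ClientAddr = NULL;
SOCKADDR_IN* LocalAddr = NULL;
int remoteLen = sizeof(SOCKADDR_IN);
int localLen = sizeof(SOCKADDR_IN);

// dwReceiveDataLength must equal the value passed to AcceptEx in PostAccept (0);
// otherwise the addresses are read from the wrong offset in the buffer.
g_AcceptExSockAddrsFn(io_ctx->wsa_buffer.buf, 0,
sizeof(SOCKADDR_IN) + 16, sizeof(SOCKADDR_IN) + 16, (LPSOCKADDR*)&LocalAddr, &localLen, (LPSOCKADDR*)&ClientAddr, &remoteLen);

// The buffer holds only the two addresses (no data was received), so it is not printed as text.
printf("* new connection(%s:%d)\n", inet_ntoa(ClientAddr->sin_addr), ntohs(ClientAddr->sin_port));

// Take ownership of the accepted socket; io_ctx is reused below for the next AcceptEx.
SOCKET accepted = io_ctx->socket;
io_ctx->socket = INVALID_SOCKET;

// Let the accepted socket inherit the listening socket's context
// (needed for getpeername, shutdown, etc. to work on it).
setsockopt(accepted, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, (char*)&g_listen_ctx->socket, sizeof(g_listen_ctx->socket));

// Create a new PER_SOCKET_CONTEXT for this connection; the old one (the listening context) keeps accepting new clients.
//
IOCP::PER_SOCKET_CONTEXT *new_socket_ctx = new IOCP::PER_SOCKET_CONTEXT();
new_socket_ctx->socket = accepted;

if (!IOCP::AssociateDeviceWithCompletionPort(g_IOCP, (HANDLE)new_socket_ctx->socket, (ULONG_PTR)new_socket_ctx)) {
printf("AssociateDeviceWithCompletionPort failed\n");
delete new_socket_ctx; // also closes the accepted socket
new_socket_ctx = NULL;
}
else {
AddSocketContext(new_socket_ctx);

// post recv
IOCP::PER_IO_CONTEXT *new_io_ctx = new_socket_ctx->GetNewIoContext();
new_io_ctx->socket = new_socket_ctx->socket;
if (!PostRecv(new_io_ctx)) {
printf("PostRecv failed\n");
RemoveSocketContext(new_socket_ctx);
}
}

// Always post a new accept so the number of outstanding AcceptEx calls stays constant.
if (!PostAccept(io_ctx)) {
printf("PostAccept failed\n");
return false;
}

return true;
}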

bool DoRecv(IOCP::PER_SOCKET_CONTEXT *socket_ctx, IOCP::PER_IO_CONTEXT *io_ctx) {
printf("recv: %s\n", io_ctx->wsa_buffer.buf);

if (strcmp(io_ctx->wsa_buffer.buf, kSYN.c_str()) == 0) {
// SYN+ACK
IOCP::PER_IO_CONTEXT * new_io_ctx = socket_ctx->GetNewIoContext();
new_io_ctx->socket = socket_ctx->socket;

if (!PostSend(new_io_ctx, kSYN_ACK.c_str(), kSYN_ACK.length())) {
printf("PostSend failed\n");
return false;
}
}

// post new recv
if (!PostRecv(io_ctx)) {
printf("PostRecv failed\n");
return false;
}

return true;
}

bool DoSend(IOCP::PER_SOCKET_CONTEXT *socket_ctx, IOCP::PER_IO_CONTEXT *io_ctx) {
printf("send: %s\n", io_ctx->wsa_buffer.buf);
return true;
}

// Worker thread
unsigned int __stdcall WorkThreadProc(void *arg) {
DWORD transferred_bytes = 0;
IOCP::PER_SOCKET_CONTEXT *socket_ctx = NULL;
OVERLAPPED *overlapped = NULL;
DWORD gle;

while (WaitForSingleObject(g_exit, 0) != WAIT_OBJECT_0) {
// Reset the outputs so values from a previous packet are never reused.
socket_ctx = NULL;
overlapped = NULL;
BOOL ret = GetQueuedCompletionStatus(g_IOCP, &transferred_bytes, (PULONG_PTR)&socket_ctx, &overlapped, INFINITE);
gle = GetLastError();

if (ret == FALSE && overlapped == NULL) {
// No completion packet was dequeued: the call itself failed (e.g. the port was closed).
printf("GetQueuedCompletionStatus failed with code: %lu\n", gle);
break;
}

// Exit packet posted by main() through PostQueuedCompletionStatus.
if (socket_ctx == EXIT_CODE) {
break;
}

if (ret == FALSE) {
// A packet for a failed I/O operation was dequeued.
if (gle == ERROR_NETNAME_DELETED) {
printf("client exit\n");
}
else {
printf("I/O operation failed with code: %lu\n", gle);
}

// The listening context is not stored in g_socket_ctx_array, so only client contexts are removed.
if (socket_ctx != g_listen_ctx) {
RemoveSocketContext(socket_ctx);
}
continue;
}

// http://blog.csdn.net/china_jeffery/article/details/78801331
IOCP::PER_IO_CONTEXT *io_ctx = CONTAINING_RECORD(overlapped, IOCP::PER_IO_CONTEXT, overlapped);

// A zero-byte recv/send completion is treated as the client closing the connection.
if ((transferred_bytes == 0) && (io_ctx->operation_type == IOCP::RECV_POSTED || io_ctx->operation_type == IOCP::SEND_POSTED)) {
printf("client disconnect\n");
RemoveSocketContext(socket_ctx);
continue;
}

switch (io_ctx->operation_type)
{
case IOCP::ACCEPT_POSTED:
DoAccept(socket_ctx, io_ctx);
break;
case IOCP::RECV_POSTED:
DoRecv(socket_ctx, io_ctx);
break;
case IOCP::SEND_POSTED:
DoSend(socket_ctx, io_ctx);
break;
default:
assert(false);
}
}
return 0;
}

int main()
{
WSADATA wsaData;
WORD wVersionRequested = MAKEWORD(2, 2);
WSAStartup(wVersionRequested, &wsaData);

do
{
InitializeCriticalSection(&g_cs_socket_ctx_array);
g_IOCP = IOCP::CreateNewCompletionPort();
g_exit = CreateEvent(NULL, TRUE, FALSE, NULL); // manual-reset, so every worker thread sees it once it is set

g_work_thread_num = IOCP::GetNumberOfProcesser() * 2;

g_work_threads = new HANDLE[g_work_thread_num];
for (int i = 0; i < g_work_thread_num; i++) {
g_work_threads[i] = (HANDLE)_beginthreadex(NULL, 0, WorkThreadProc, NULL, 0, NULL);
}

g_listen_ctx = new IOCP::PER_SOCKET_CONTEXT;
g_listen_ctx->socket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);

if (!IOCP::AssociateDeviceWithCompletionPort(g_IOCP, (HANDLE)g_listen_ctx->socket, (ULONG_PTR)g_listen_ctx)) {
printf("AssociateDeviceWithCompletionPort failed with code: %d\n", GetLastError());
break;
}

struct sockaddr_in addr = { 0 };
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = htonl(INADDR_ANY);
addr.sin_port = htons(kPort);
if (bind(g_listen_ctx->socket, reinterpret_cast<const sockaddr*>(&addr), sizeof(addr)) == SOCKET_ERROR) {
printf("bind failed with code: %d\n", WSAGetLastError());
break;
}

if (listen(g_listen_ctx->socket, SOMAXCONN) == SOCKET_ERROR) {
printf("listen failed with code: %d\n", WSAGetLastError());
break;
}

g_AcceptExFn = IOCP::GetAcceptExFnPointer(g_listen_ctx->socket);
if (g_AcceptExFn == NULL) {
printf("GetAcceptExFnPointer failed\n");
break;
}

g_AcceptExSockAddrsFn = IOCP::GetAcceptExSockAddrsFnPointer(g_listen_ctx->socket);
if (g_AcceptExSockAddrsFn == NULL) {
printf("GetAcceptExSockAddrsFnPointer failed\n");
break;
}

int i = 0;
for (; i < 10; i++) {
IOCP::PER_IO_CONTEXT *io_ctx = g_listen_ctx->GetNewIoContext();
if (PostAccept(io_ctx) == FALSE) {
break;
}
}
if(i != 10)
break;


} while (FALSE);


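printf("\npress any key to stop server...\n");
getchar();

SetEvent(g_exit);
// One exit packet per worker: each thread blocked in GetQueuedCompletionStatus wakes up once and leaves its loop.
for (int i = 0; i < g_work_thread_num; i++) {
PostQueuedCompletionStatus(g_IOCP, 0, (ULONG_PTR)EXIT_CODE, NULL);
}
// WaitForMultipleObjects accepts at most MAXIMUM_WAIT_OBJECTS (64) handles, which
// GetNumberOfProcesser() * 2 exceeds on machines with more than 32 logical processors.
for (int i = 0; i < g_work_thread_num; i++) {
WaitForSingleObject(g_work_threads[i], INFINITE);
CloseHandle(g_work_threads[i]);
}
delete[] g_work_threads;
g_work_threads = NULL;

ClearSocketContextArray();

printf("\npress any key to exit...\n");
getchar();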

DeleteCriticalSection(&g_cs_socket_ctx_array);
WSACleanup();
return 0;
}
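
The shutdown sequence at the end of main() posts one exit packet (completion key EXIT_CODE) per worker, because setting g_exit alone cannot wake a thread that is blocked in an INFINITE GetQueuedCompletionStatus; only then does it wait for the threads. The portable sketch below reproduces that pattern, assuming a hypothetical mutex/condition-variable queue (Port) as a stand-in for the completion port and key 0 in the role of EXIT_CODE.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstdint>
#include <deque>
#include <mutex>
#include <thread>
#include <vector>

constexpr std::uintptr_t kExitKey = 0;  // plays the role of EXIT_CODE

// Hypothetical stand-in for a completion port that only carries completion keys.
struct Port {
    std::mutex m;
    std::condition_variable cv;
    std::deque<std::uintptr_t> keys;

    void Post(std::uintptr_t key) {
        { std::lock_guard<std::mutex> lock(m); keys.push_back(key); }
        cv.notify_one();
    }
    std::uintptr_t Get() {
        std::unique_lock<std::mutex> lock(m);
        cv.wait(lock, [this] { return !keys.empty(); });
        std::uintptr_t key = keys.front();
        keys.pop_front();
        return key;
    }
};

// Starts `workers` threads, feeds them `jobs` non-zero keys, then shuts them down the
// way the sample does: one exit packet per worker, then wait for every thread.
// Returns how many jobs were processed.
int RunAndShutDown(int workers, int jobs) {
    Port port;
    std::atomic<int> processed{0};
    std::vector<std::thread> threads;
    for (int i = 0; i < workers; ++i) {
        threads.emplace_back([&] {
            while (port.Get() != kExitKey) {
                ++processed;  // "handle the completion"
            }
        });
    }
    for (int j = 0; j < jobs; ++j) {
        port.Post(static_cast<std::uintptr_t>(j + 1));
    }
    for (int i = 0; i < workers; ++i) {
        port.Post(kExitKey);  // exactly one per worker
    }
    for (std::thread& t : threads) {
        t.join();
    }
    return processed.load();
}
```

If fewer exit packets than workers were posted, the remaining threads would stay blocked forever and the final wait (join here, WaitForSingleObject in the sample) would hang.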

3.3 Client

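The client code is similar to the server's. The one thing to note is that the SOCKET must be bound with bind before ConnectEx is called; ConnectEx fails with WSAEINVAL on an unbound socket.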
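
The bind that ConnectEx needs is usually to the wildcard address with port 0, as the client's PostConnect does: port 0 asks the system to pick an unused local (ephemeral) port. Winsock's bind and getsockname follow the BSD sockets API, so the idea can be shown with POSIX sockets, which lets this sketch run outside Windows. BindToEphemeralPort is a hypothetical helper; on Windows the same calls exist, with SOCKET and closesocket in place of int and close.

```cpp
#include <arpa/inet.h>
#include <cassert>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>

// Binds a new TCP socket to INADDR_ANY:0 and returns the port the system chose
// (host byte order), or -1 on failure. The socket is closed before returning.
int BindToEphemeralPort() {
    int s = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    if (s < 0) {
        return -1;
    }
    sockaddr_in addr{};
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_ANY);
    addr.sin_port = htons(0);  // 0 = "let the system choose"
    int port = -1;
    if (bind(s, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) == 0) {
        sockaddr_in bound{};
        socklen_t len = sizeof(bound);
        if (getsockname(s, reinterpret_cast<sockaddr*>(&bound), &len) == 0) {
            port = ntohs(bound.sin_port);
        }
    }
    close(s);
    return port;
}
```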

#include <winsock2.h>
#include <stdio.h>
#include <string.h>
#include <iostream>
#include <string>
#include <assert.h>
#include <vector>
#include <process.h>
#include "iocp.h"

using namespace std;

const std::string kIP = "127.0.0.1";
const u_short kPort = 10001;

const std::string kSYN = "(SYN) hello server, I'm client. Can you hear me?";
const std::string kSYN_ACK = "(SYN+ACK) hello client, I'm server. I can hear you, can you hear me?";
const std::string kACK = "(ACK) hello server, I'm client. I can hear you!";

#pragma comment(lib, "Ws2_32.lib")

HANDLE g_IOCP = INVALID_HANDLE_VALUE;
HANDLE g_exit = NULL;

int g_work_thread_num = 0;
HANDLE *g_work_threads = NULL;

IOCP::PER_SOCKET_CONTEXT *g_client_ctx = NULL;


LPFN_CONNECTEX g_ConnectExFn = NULL;


bool PostConnect(IOCP::PER_IO_CONTEXT* io_ctx, const std::string &ip, int port) {
if (io_ctx == NULL)
return false;
io_ctx->operation_type = IOCP::CONNECT_POSTED;
io_ctx->ResetBuffer();

// ConnectEx requires the socket to be initially bound.
struct sockaddr_in addr0 = { 0 };
addr0.sin_family = AF_INET;
addr0.sin_addr.s_addr = INADDR_ANY;
addr0.sin_port = 0;
int ret = bind(io_ctx->socket, (SOCKADDR*)&addr0, sizeof(addr0));
if (ret != 0) {
printf("bind failed: %d\n", WSAGetLastError());
return false;
}

struct sockaddr_in addr1 = { 0 };
addr1.sin_family = AF_INET;
addr1.sin_addr.s_addr = inet_addr(ip.c_str());
addr1.sin_port = htons(port);

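// ConnectEx returns a BOOL, not SOCKET_ERROR: FALSE together with WSA_IO_PENDING means the connect is in progress.
BOOL ok = g_ConnectExFn(io_ctx->socket,
reinterpret_cast<const sockaddr*>(&addr1),
sizeof(addr1),
NULL,
0,
NULL,
&io_ctx->overlapped);
if (!ok) {
int gle = WSAGetLastError();
if (gle != WSA_IO_PENDING) {
printf("ConnectEx failed with code: %d\n", gle);
return false;
}
}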

return true;
}

bool PostRecv(IOCP::PER_IO_CONTEXT* io_ctx) {
if (io_ctx == NULL)
return false;

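io_ctx->operation_type = IOCP::RECV_POSTED;
io_ctx->ResetBuffer();
// Keep the last byte as '\0' so the received text can be printed and compared as a C string.
io_ctx->wsa_buffer.len = MAX_BUFFER_LEN - 1;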

DWORD recv_bytes = 0;
DWORD flags = 0;
int ret = WSARecv(io_ctx->socket, &io_ctx->wsa_buffer, 1, &recv_bytes, &flags, &io_ctx->overlapped, NULL);
if (ret == SOCKET_ERROR && WSAGetLastError() != WSA_IO_PENDING) {
return false;
}

return true;
}

bool PostSend(IOCP::PER_IO_CONTEXT* io_ctx, const char* msg, int msg_len) {
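// The message must fit in the context's buffer, with the last byte left for '\0' (DoSend prints it with %s).
if (io_ctx == NULL || msg == NULL || msg_len <= 0 || msg_len >= MAX_BUFFER_LEN)
return false;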

io_ctx->operation_type = IOCP::SEND_POSTED;
memcpy(io_ctx->wsa_buffer.buf, msg, msg_len);
io_ctx->wsa_buffer.len = msg_len;

DWORD sent_bytes = 0;
int ret = WSASend(io_ctx->socket, &io_ctx->wsa_buffer, 1, &sent_bytes, 0, &io_ctx->overlapped, NULL);
int gle = WSAGetLastError();
if (ret == SOCKET_ERROR && gle != WSA_IO_PENDING) {
printf("WSASend failed with code: %d\n", gle);
return false;
}

return true;
}

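bool DoConnect(IOCP::PER_SOCKET_CONTEXT *socket_ctx, IOCP::PER_IO_CONTEXT *io_ctx) {
// Update the connected socket's context so getpeername, shutdown, etc. work on it.
setsockopt(socket_ctx->socket, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0);
printf("connect to server\n");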

if (!PostRecv(io_ctx)) {
printf("PostRecv failed\n");
return false;
}

IOCP::PER_IO_CONTEXT* new_io_ctx = socket_ctx->GetNewIoContext();
new_io_ctx->socket = socket_ctx->socket;

if (!PostSend(new_io_ctx, kSYN.c_str(), kSYN.length())) {
printf("PostSend failed\n");
return false;
}

return true;
}

bool DoRecv(IOCP::PER_SOCKET_CONTEXT *socket_ctx, IOCP::PER_IO_CONTEXT *io_ctx) {
printf("recv: %s\n", io_ctx->wsa_buffer.buf);

if (strcmp(io_ctx->wsa_buffer.buf, kSYN_ACK.c_str()) == 0) {
// ACK
IOCP::PER_IO_CONTEXT * new_io_ctx = socket_ctx->GetNewIoContext();
new_io_ctx->socket = socket_ctx->socket;

if (!PostSend(new_io_ctx, kACK.c_str(), kACK.length())) {
printf("PostSend failed\n");
return false;
}
}

// post new recv
if (!PostRecv(io_ctx)) {
printf("PostRecv failed\n");
return false;
}

return true;
}

bool DoSend(IOCP::PER_SOCKET_CONTEXT *socket_ctx, IOCP::PER_IO_CONTEXT *io_ctx) {
printf("send: %s\n", io_ctx->wsa_buffer.buf);
return true;
}

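unsigned int __stdcall WorkThreadProc(void *arg) {
DWORD transferred_bytes = 0;
IOCP::PER_SOCKET_CONTEXT *socket_ctx = NULL;
OVERLAPPED *overlapped = NULL;
DWORD gle;

while (WaitForSingleObject(g_exit, 0) != WAIT_OBJECT_0) {
// Reset the outputs so values from a previous packet are never reused.
socket_ctx = NULL;
overlapped = NULL;
BOOL ret = GetQueuedCompletionStatus(g_IOCP, &transferred_bytes, (PULONG_PTR)&socket_ctx, &overlapped, INFINITE);
gle = GetLastError();

if (ret == FALSE && overlapped == NULL) {
// No completion packet was dequeued: the call itself failed (e.g. the port was closed).
printf("GetQueuedCompletionStatus failed with code: %lu\n", gle);
break;
}

// Exit packet posted by main() through PostQueuedCompletionStatus.
if (socket_ctx == EXIT_CODE) {
break;
}

if (ret == FALSE) {
// A packet for a failed connect/send/recv was dequeued.
if (gle == ERROR_NETNAME_DELETED) {
printf("server exit\n");
}
else {
printf("I/O operation failed with code: %lu\n", gle);
}
closesocket(socket_ctx->socket);
socket_ctx->socket = INVALID_SOCKET;
continue;
}

IOCP::PER_IO_CONTEXT *io_ctx = CONTAINING_RECORD(overlapped, IOCP::PER_IO_CONTEXT, overlapped);

// A zero-byte recv/send completion is treated as the server closing the connection.
// Without this check DoRecv would re-post the recv, which completes with 0 bytes again, endlessly.
if ((transferred_bytes == 0) && (io_ctx->operation_type == IOCP::RECV_POSTED || io_ctx->operation_type == IOCP::SEND_POSTED)) {
printf("server disconnect\n");
closesocket(socket_ctx->socket);
socket_ctx->socket = INVALID_SOCKET;
continue;
}

switch (io_ctx->operation_type)
{
case IOCP::CONNECT_POSTED:
DoConnect(socket_ctx, io_ctx);
break;
case IOCP::RECV_POSTED:
DoRecv(socket_ctx, io_ctx);
break;
case IOCP::SEND_POSTED:
DoSend(socket_ctx, io_ctx);
break;
default:
assert(false);
}
}
return 0;
}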


int main()
{
WSADATA wsaData;
WORD wVersionRequested = MAKEWORD(2, 2);
WSAStartup(wVersionRequested, &wsaData);

do
{
g_IOCP = IOCP::CreateNewCompletionPort();
g_exit = CreateEvent(NULL, TRUE, FALSE, NULL); // manual-reset, so every worker thread sees it once it is set

g_work_thread_num = IOCP::GetNumberOfProcesser() * 2;

g_work_threads = new HANDLE[g_work_thread_num];
for (int i = 0; i < g_work_thread_num; i++) {
g_work_threads[i] = (HANDLE)_beginthreadex(NULL, 0, WorkThreadProc, NULL, 0, NULL);
}

g_client_ctx = new IOCP::PER_SOCKET_CONTEXT;
g_client_ctx->socket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);

if (!IOCP::AssociateDeviceWithCompletionPort(g_IOCP, (HANDLE)g_client_ctx->socket, (ULONG_PTR)g_client_ctx)) {
printf("AssociateDeviceWithCompletionPort failed with code: %d\n", GetLastError());
break;
}

g_ConnectExFn = IOCP::GetConnectExFnPointer(g_client_ctx->socket);
if (g_ConnectExFn == NULL) {
printf("GetConnectExFnPointer failed\n");
break;
}

IOCP::PER_IO_CONTEXT* io_ctx = g_client_ctx->GetNewIoContext();
io_ctx->socket = g_client_ctx->socket;
if (!PostConnect(io_ctx, kIP, kPort)) {
printf("PostConnect failed\n");
}

} while (FALSE);


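printf("press any key to exit client...\n");
getchar();

SetEvent(g_exit);
// Wake every worker blocked in GetQueuedCompletionStatus, then wait for all of them.
for (int i = 0; i < g_work_thread_num; i++) {
PostQueuedCompletionStatus(g_IOCP, 0, (ULONG_PTR)EXIT_CODE, NULL);
}
for (int i = 0; i < g_work_thread_num; i++) {
WaitForSingleObject(g_work_threads[i], INFINITE);
CloseHandle(g_work_threads[i]);
}
delete[] g_work_threads;
g_work_threads = NULL;

if (g_client_ctx != NULL) {
closesocket(g_client_ctx->socket);
g_client_ctx->socket = INVALID_SOCKET;
}

WSACleanup();
return 0;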
}

Full project: https://github.com/winsoft666/CodeSnippet/tree/main/CompletionPort-Sample