阅读本文之前您可能需要知道onethreadoneloop思想,如果您还不熟悉,可以参考这篇《onethreadoneloop思想》。
为了行文方便,以下将侦听socket称之为listenfd,将由调用accept函数返回的socket称之为clientfd。
我们知道如果需要使用IO复用函数统一管理各个fd,需要将clientfd设置成非阻塞的,那么listenfd一定要设置成非阻塞的吗?答案是不一定的——只要不用IO复用函数去管理listenfd就可以了,listenfd如果不设置成非阻塞的,那么accept函数在没有新连接时就会阻塞。
1.结构一listenfd设置为阻塞模式,为了listenfd独立分配一个接受连接线程有很多的服务器程序结构确实采用的就是阻塞的listenfd,为了不让accept函数在没有连接时阻塞对程序其他逻辑执行流造成影响,我们通常将accept函数放在一个独立的线程中,这个线程的伪码如下:
1//接受连接线程2void*accept_thread_func(void*param)3{4//可以在这里做一些初始化工作...56while(退出标志)7{8structsockaddr_inclientaddr;9socklen_tclientaddrlen=sizeof(clientaddr);10//没有连接时,线程会阻塞在accept函数处11intclientfd=accept(listenfd,(structsockaddr*)clientaddr,clientaddrlen);12if(clientfd!=-1)13{14//出错了,可以在此做一些清理资源动作,如关闭listenfd15break;16}//将clientfd交给其他IO线程的IO复用函数19//由于跨线程操作,可以需要一些锁对公共操作的资源进行保护20}21}
其他IO线程的结构还是利用IO复用函数处理clientfd的onethreadoneloop结构,这里以epoll_wait为例,即:
1//其他IO线程2void*io_thread_func(void*param)3{4//可以在这里做一些初始化工作56while(退出标志)7{8epoll_eventepoll_events[];9//所有的clientfd都挂载到epollfd由epoll_wait统一检测读写事件10n=epoll_wait(epollfd,epoll_events,,);//epoll_wait返回时处理对应clientfd上的读写事件//其他一些操作15}16}
当然,这里的IO线程可以存在多个,这种结构示意图如下:
将clientfd从accept_thread_func交给io_thread_func方法也很多,这里以使用一个互斥锁来实现为例:
1//存储accept函数产生的clientfd的多线程共享变量2std::vectorintg_vecClientfds;3//保护g_vecClientfds的互斥体4std::mutexg_clientfdMutex;56//接受连接线程7void*accept_thread_func(void*param)8{9//可以在这里做一些初始化工作...while(退出标志)12{13structsockaddr_inclientaddr;14socklen_tclientaddrlen=sizeof(clientaddr);15//没有连接时,线程会阻塞在accept函数处16intclientfd=accept(listenfd,(structsockaddr*)clientaddr,clientaddrlen);17if(clientfd!=-1)18{19//出错了,可以在此做一些清理资源动作,如关闭listenfd20break;21}//将clientfd交给其他IO线程的IO复用函数24//由于跨线程操作,可以需要一些锁对公共操作的资源进行保护25std::lock_guardstd::mutexscopedLock(g_clientfdMutex);26g_vecClientfds.push_back(clientfd);27}28}//其他IO线程31void*io_thread_func(void*param)32{33//可以在这里做一些初始化工作while(退出标志)36{37epoll_eventepoll_events[];38//所有的clientfd都挂载到epollfd由epoll_wait统一检测读写事件39n=epoll_wait(epollfd,epoll_events,,);//epoll_wait返回时处理对应clientfd上的读写事件//其他一些操作//从共享变量g_vecClientfds取出新的clientfd46retrieveNewClientfds(epollfd);47}48}voidretrieveNewClientfds(intepollfd)51{52std::lock_guardstd::mutexscopedLock(g_clientfdMutex);53if(!g_vecClientfds.empty())54{55//遍历g_vecClientfds取出各个fd,然后将fd设置挂载到所在线程的epollfd上//全部取出后,清空g_vecClientfds58g_vecClientfds.clear();59}60}
注意上述代码中,由于要求clientfd是非阻塞的,设置clientfd为非阻塞的这段逻辑你可以放在accept_thread_func或io_thread_func中均可。
上述代码有点效率问题,某个时刻accept_thread_func往g_vecClientfds添加了一个clientfd,但此时如果io_thread_func函数正阻塞在epoll_wait处,所以此时我们要唤醒epoll_wait,我们已经在《onethreadoneloop思想》中介绍了如何设计这个唤醒逻辑,这里就不再赘述了。
2.结构二listenfd为阻塞模式,使用同一个onethreadoneloop结构去处理listenfd的事件单独为listenfd分配一个线程毕竟是对资源的一种浪费,有读者可能说,listenfd虽然设置成了阻塞模式,但我可以将listenfd挂载在到某个loop的epollfd上,当epoll_wait返回且listenfd上有读事件时调用accept函数时,此时accept就不会阻塞了。伪码如下:
1void*io_thread_func(void*param)2{3//可以在这里做一些初始化工作45while(退出标志)6{7epoll_eventepoll_events[];8//listenfd和clientfd都挂载到epollfd由epoll_wait统一检测读写事件9n=epoll_wait(epollfd,epoll_events,,);if(listenfd上有事件)12{13//此时调用accept函数不会阻塞14intclientfd=accept(listenfd,...);//对clientfd作进一步处理17}//其他一些操作20}21}
如上述代码所示,这种情况下确实可以将listenfd设置成阻塞模式,调用accept函数也不会造成流程阻塞。
但是,问题是这样的设计存在严重的效率问题:这种设计在每一轮循环中只能一次接受一个连接(每次循环仅调用了一次accept),如果连接数较多,这种处理速度可能跟不上,所以要在一个循环里面处理accept,但是实际情形是我们没法确定下一轮调用accept时backlog队列中是否还有新连接呀,如果没有,由于listenfd是阻塞模式的,accept会阻塞。
3.结构三listenfd为非阻塞模式,使用同一个onethreadoneloop结构去处理listenfd的事件当将listenfd设置成非阻塞模式,我们就不会存在这种窘境了。伪码如下:
1void*io_thread_func(void*param)2{3//可以在这里做一些初始化工作45while(退出标志)6{7epoll_eventepoll_events[];8//listenfd和clientfd都挂载到epollfd由epoll_wait统一检测读写事件9n=epoll_wait(epollfd,epoll_events,,);if(listenfd上有事件)12{13while(true)14{15//此时调用accept函数不会阻塞16intclientfd=accept(listenfd,...);17if(clientfd==-1)18{19//错误码是EWOULDBLOCK说明此时已经没有新连接了20//可以退出内层的while循环了21if(errno==EWOULDBLOCK)22break;23//被信号中断重新调用一次accept即可24elseif(errno==EINTR)25continue;26else27{28//其他情况认为出错29//做一次错误处理逻辑30}31}else{32//正常接受连接33//对clientfd作进一步处理34}//endinner-if35}//endinner-while-loop}//endouter-if//其他一些操作40}//endouter-while-loop41}
将listenfd设置成非阻塞模式还有一个好处时,我们可以自己定义一次listenfd读事件时最大接受多少连接数,这个逻辑也很容易实现,只需要将上述代码的内层while循环的判断条件从true改成特定的次数就可以:
1void*io_thread_func(void*param)2{3//可以在这里做一些初始化工作45//每次处理的最大连接数目6constintMAX_ACCEPTS_PER_CALL=;7//当前数量8intcurrentAccept;while(退出标志)11{12epoll_eventepoll_events[];13//listenfd和clientfd都挂载到epollfd由epoll_wait统一检测读写事件14n=epoll_wait(epollfd,epoll_events,,);if(listenfd上有事件)17{18currentAccept=0;19while(currentAccept=MAX_ACCEPTS_PER_CALL)20{21//此时调用accept函数不会阻塞22intclientfd=accept(listenfd,...);23if(clientfd==-1)24{25//错误码是EWOULDBLOCK说明此时已经没有新连接了26//可以退出内层的while循环了27if(errno==EWOULDBLOCK)28break;29//被信号中断重新调用一次accept即可30elseif(errno==EINTR)31continue;32else33{34//其他情况认为出错35//做一次错误处理逻辑36}37}else{38//累加处理数量39++currentAccept;40//正常接受连接41//对clientfd作进一步处理42}//endinner-if43}//endinner-while-loop}//endouter-if//其他一些操作48}//endouter-while-loop49}
这是一段比较常用的逻辑,我们以redis-server的源码中的使用为例:
1//