I/O多路转接
I/O多路转接,也叫I/O多路复用,I/O Multiplexing,是操作系统提供的一种高级I/O功能,只有当描述符准备好进行I/O时,进程或线程才去执行I/O操作,避免阻塞或者做无用功。select/poll/epoll 时Unix系统为我们提供系统函数接口。
IO模型
非阻塞I/O
通常在谈起I/O时(文件I/O,网络I/O),如没有特殊说明,通常指阻塞式I/O(blcking I/O),即当调用I/O时,若I/O不可用,当前进程或者线程会被挂起,直到I/O可用。如下图(图片来源)所示,

阻塞式I/O会造成系统资源浪费。比如,一台服务器需要处理1000个连接,则需要1000个进程或者线程处理连接,如果1000个连接只有少部分是连接忙碌的,则1000个线程大部分是被阻塞挂起的。假设CPU是4核,为了要跑1000个线程,则每个线程的时间槽非常短,这样就会导致线程切换非常频繁。频繁切换线程是有问题的:
1. 线程是有内存开销的,1个线程可能需要512K(或2M)存放栈,那么1000个线程就要512M(或2G)内存
2. 线程的切换,或者说上下文切换是有CPU开销的,当大量时间花在上下文切换的时候,分配给真正的操作的CPU就要少很多
非阻塞I/O(nonblocking I/O),如下图(图片来源)所示,

进程发起一个read操作时,如果数据还没有准备好,系统不再阻塞进程,而是返回一个error信息,此时进程知道所需数据还没有准备好,于是它可以等待一定时间再次发起read操作。一旦kernel中的数据准备好了,并且又再次收到了用户进程的系统调用,那么它马上就将数据拷贝到了用户内存,然后返回。所以,用户进程其实是需要不断地主动询问kernel数据是否准备好,也称作轮询,polling。非阻塞I/O的一个问题就是进程需要不断询问kernel数据是否准备好,而大多数时间实际上是数据还没有准备好,所以执行系统调用浪费了浪费了CPU时间,并且每次查询后等待多长时间再进行下一次查询也很难确定。
异步I/O
异步I/O是一种高级I/O技术。当进程发起一个异步read操作时(如aio_read),用户进程可以立刻去做其他事情,kernel不会对用户进程产生任何阻塞,kernel会等待数据准备完成,然后将数据拷贝到用户内存,当这一切都完成之后,kernel会给用户进程发送一个signal,告诉它read操作完成了。注意的是操作的内存缓冲区需要保持稳定并且始终合法。下图(图片来源)是异步I/O的示意图。

为了简化程序设计,常见异步I/O一般采用多线程或者多进程方式,使用同步模型编写程序,但是异步运行这些线程或进程。但是,如果需要大量I/O操作(比如并发网络连接很多),这种异步I/O可能会需要频繁新建、调度、销毁这些线程进程,系统开销较大。
I/O多路转接
一种比较好的技术就是I/O多路转接,为了使用这个技术,首先构造一个文件描述符集合,然后调用 select, poll, epoll 函数,直到这些描述符中的一个准备好进行I/O时(或者超时)才返回。如下图(图片来源)。

注意的是,当进程调用 select时,进程是被阻塞的。kernel会“监视”应用程序感兴趣的文件描述符,当任何一个数据准备好了,select就会返回。这个时候用户进程再调用read操作,将数据从kernel拷贝到用户进程。
select
select函数定义如下:
int select(int maxfdp1, fd_set *restrict readfds, fdset *restrict writefds, fdset *restrict execptfds, struct timeval *restrict tvptr);
maxfdp1: 最大描述符编号值+1。在#include <sys/select.h>定义中,指定的最大描述符编号值为1024readfds,writefds,exceptfds: 所关心的可读、可写或处于异常条件的描述符集合tvptr:tvptr == NULL: 永远等待,直到当所指定的描述符中的一个已经准备好或收到中断信号。tvptr->tv_sec == 0 && tvptr->tv_usec == 0: 不等待,测试所有指定的描述符并立即返回。等同于轮询。tvptr->tv_sec > 0 || tvptr->tv_usec > 0: 等待指定的秒数或者微妙数。当指定的描述符之一准备好或者超时,则返回,-1表示中断,0表示超时,>0表示有描述符已经准备好。
此外,#include <sys/select.h>还定义了一些函数用来操作描述符集合:
int FD_ISSET(int fd, fd_set *fdset); // 测试fd是否在集合之中 void FD_CLR(int fd fd_set *fdset); // 将fd从集合中清除 void FD_SET(int fd, fd_set *fdset); // 将fd加入集合之中 void FD_ZERO(fd_set *fdset); // 清空集合
使用select完成socket非阻塞IO
#include <arpa/inet.h>
#include <assert.h>
#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#include <algorithm>
const int BUFFER_SIZE = 4096;
const int SERVER_PORT = 8888;
const int CONNECTIONS = 5;
int init_and_listen(const int port)
{
int socketfd = socket(AF_INET, SOCK_STREAM, 0);
if (socketfd == -1) {
fprintf(stderr, "get socket error\n");
return -1;
}
sockaddr_in server_addr;
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(port);
server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
int ret = bind(socketfd, (sockaddr *)&server_addr, sizeof(sockaddr_in));
if (ret == -1) {
fprintf(stderr, "bind socket fail\n");
return -1;
}
ret = listen(socketfd, CONNECTIONS);
if (ret == -1) {
fprintf(stderr, "listen socket error\n");
return -1;
}
return socketfd;
}
int do_select(int listenfd)
{
const int timeout = 5; /*s*/
int server_fds[CONNECTIONS + 1];
for (int i = 0; i <= CONNECTIONS; ++i) {
server_fds[i] = -1;
}
fd_set rfds;
timeval tv;
int max_fd;
while (1) {
FD_ZERO(&rfds);
server_fds[0] = listenfd;
max_fd = -1;
for (int i = 0; i <= CONNECTIONS; ++i) {
if (server_fds[i] > -1) {
FD_SET(server_fds[i], &rfds);
max_fd = std::max(max_fd, server_fds[i]);
}
}
tv.tv_sec = timeout;
tv.tv_usec = 0;
int nready = select(max_fd + 1, &rfds, NULL, NULL, &tv);
if (nready == -1) {
fprintf(stderr, "select error\n");
exit(-1);
} else if (nready == 0) {
fprintf(stdout, "select timeout\n");
} else {
if (FD_ISSET(server_fds[0], &rfds)) {
sockaddr_in client_addr;
socklen_t client_addr_len = sizeof(sockaddr_in);
int conn_fd = accept(server_fds[0], (sockaddr *)&client_addr,
&client_addr_len);
if (conn_fd < 0) {
fprintf(stderr, "error connection\n");
} else {
fprintf(stdout, "connect from %s:%d\n",
inet_ntoa(client_addr.sin_addr),
ntohs(client_addr.sin_port));
int next;
for (next = 1; next <= CONNECTIONS; ++next) {
if (server_fds[next] == -1) break;
}
if (next > CONNECTIONS) {
fprintf(stderr, "too many connections\n");
close(conn_fd);
} else {
server_fds[next] = conn_fd;
}
}
--nready;
}
if (nready > 0) {
for (int i = 1; i <= CONNECTIONS; ++i) {
if (server_fds[i] != -1 && FD_ISSET(server_fds[i], &rfds)) {
char buf[BUFFER_SIZE+1];
int n = recv(server_fds[i], buf, BUFFER_SIZE, 0);
if (n > 0) {
buf[n] = '\0';
fprintf(stdout, "connection %d: %s, n=%d\n", i, buf, n);
} else {
close(server_fds[i]);
server_fds[i] = -1;
}
}
}
}
}
}
return 0;
}
int main()
{
int listenfd = init_and_listen(SERVER_PORT);
if (listenfd == -1) {
fprintf(stderr, "init fail\n");
exit(-1);
}
do_select(listenfd);
return 0;
}
POSIX.1也定义了一个select函数的变体,pselect,定义如下:
#include <sys/select.h> int pselect(int maxfdp1, fd_set *restrict readfds, fs_set *restrict writefds, fd_set *restrict execptfds, const struct timespec *restrict tsptr, const sigset_t *restrict sigmask);
从函数参数上来看,pselect和select功能基本相同,但是pselect提供了以下几点功能:
pselect超时值使用timespec结构,该结构以秒和纳秒表示超时值,可以提供更精准的超时时间。pselect的超时值被声明为const,这保证了调用pselect不会修改此值。pselect可选用信号屏蔽字。若sigmask为NULL,那么在信号方面,select和pselect的运行状况相同。否则,sigmask指向一信号屏蔽字,在调用函数时,以原子操作方式安装信号屏蔽字;在返回时,恢复之前的信号屏蔽字。
poll
select存在以下4个问题:
- fd数目限制
- 3个集合在函数返回时会被内核修改,所以每次调用
select之前都需要重新设置 - 内核需要遍历fdset中的所有fd,查看哪些fd事件实际发生
- 用户需要检查所有注册的fd,查看哪些fd事件发生,在大量连接下,可能实际发生的读写事件fd较少
poll函数功能类似于select,同样提供多路转接技术。poll对select作出了改进,
int poll(struct pollfd *fds, int nfds, int timeout)
struct pollfd
{
int fd;
short events; // 等待的事件
short revents; // 实际发生的事件
};
可以看到poll传入一个pollfd的动态数组,所以不再有fd大小的限制;pollfd定义了等待的事件events和实际发生的事件revents,内核只修改revents,也就解决了select每次需要重新设置的缺陷。但是3和4的问题仍然存在。
使用poll完成socket非阻塞IO
#include <arpa/inet.h>
#include <assert.h>
#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/poll.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#include <algorithm>
const int BUFFER_SIZE = 4096;
const int SERVER_PORT = 8888;
const int CONNECTIONS = 5;
int init_and_listen(const int port)
{
int socketfd = socket(AF_INET, SOCK_STREAM, 0);
if (socketfd == -1) {
fprintf(stderr, "get socket error\n");
return -1;
}
sockaddr_in server_addr;
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(port);
server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
int ret = bind(socketfd, (sockaddr *)&server_addr, sizeof(sockaddr_in));
if (ret == -1) {
fprintf(stderr, "bind socket fail\n");
return -1;
}
ret = listen(socketfd, CONNECTIONS);
if (ret == -1) {
fprintf(stderr, "listen socket error\n");
return -1;
}
return socketfd;
}
int do_poll(int listenfd)
{
const int timeout = 5000; /*ms*/
struct pollfd server_fds[CONNECTIONS + 1];
for (int i = 0; i <= CONNECTIONS; ++i) {
server_fds[i].fd = -1;
}
server_fds[0].fd = listenfd;
server_fds[0].events = POLLIN;
int max_fds_num = 1;
while (1) {
int nready = poll(server_fds, max_fds_num, timeout);
if (nready < 0) {
fprintf(stderr, "poll error\n");
exit(-1);
} else if (nready == 0) {
fprintf(stdout, "poll timeout\n");
} else {
if (server_fds[0].revents & POLLIN) {
sockaddr_in client_addr;
socklen_t client_addr_len = sizeof(sockaddr_in);
int conn_fd = accept(server_fds[0].fd, (sockaddr *)&client_addr,
&client_addr_len);
if (conn_fd < 0) {
fprintf(stderr, "error connection\n");
} else {
fprintf(stdout, "connect from %s:%d\n",
inet_ntoa(client_addr.sin_addr),
ntohs(client_addr.sin_port));
int next;
for (next = 1; next <= CONNECTIONS; ++next) {
if (server_fds[next].fd == -1) {
break;
}
}
if (next > CONNECTIONS) {
fprintf(stderr, "too many connections\n");
close(conn_fd);
} else {
server_fds[next].fd = conn_fd;
server_fds[next].events = POLLIN;
max_fds_num = std::max(max_fds_num, next+1);
}
--nready;
}
}
if (nready > 0) {
for (int i = 1; i < max_fds_num; ++i) {
if (server_fds[i].fd != -1 && server_fds[i].revents & POLLIN) {
char buf[BUFFER_SIZE+1];
int n = recv(server_fds[i].fd, buf, BUFFER_SIZE, 0);
if (n > 0) {
buf[n] = '\0';
fprintf(stdout, "connection %d: %s, n=%d\n", i, buf, n);
} else {
close(server_fds[i].fd);
server_fds[i].fd = -1;
}
}
}
}
}
}
return 0;
}
int main ()
{
int listenfd = init_and_listen(SERVER_PORT);
if (listenfd == -1) {
fprintf(stderr, "init fail\n");
exit(-1);
}
do_poll(listenfd);
return 0;
}
epoll
epoll则解决了select和poll的缺陷。先来看epoll相关接口。
int epoll_create (int __size);
创建一个epoll的句柄,size用来告诉内核需要监听的数目。创建好epoll句柄后,它会占用一个fd,在linux下查看/proc/process_id/fd/,是能够看到这个fd的。所以,使用完epoll后,必须调用close()关闭。
int epoll_ctl (int __epfd, int __op, int __fd, struct epoll_event *__event);
- 第一个参数epfd,epoll句柄
- 第二个参数op表示对fd的操作,包括
/* Valid opcodes ( "op" parameter ) to issue to epoll_ctl(). */ #define EPOLL_CTL_ADD 1 /* Add a file descriptor to the interface. */ #define EPOLL_CTL_DEL 2 /* Remove a file descriptor from the interface. */ #define EPOLL_CTL_MOD 3 /* Change file descriptor epoll_event structure. */ - 第三个参数需要监听的fd
- 第四个参数需要监听的事件,epoll event定义如下
typedef union epoll_data
{
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
struct epoll_event
{
uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
} __EPOLL_PACKED;
// events可以是以下宏的组合
enum EPOLL_EVENTS
{
EPOLLIN = 0x001,
#define EPOLLIN EPOLLIN // fd可读
EPOLLPRI = 0x002,
#define EPOLLPRI EPOLLPRI // fd有紧急数据可读
EPOLLOUT = 0x004,
#define EPOLLOUT EPOLLOUT // fd可写
EPOLLRDNORM = 0x040,
#define EPOLLRDNORM EPOLLRDNORM //
EPOLLRDBAND = 0x080,
#define EPOLLRDBAND EPOLLRDBAND //
EPOLLWRNORM = 0x100,
#define EPOLLWRNORM EPOLLWRNORM //
EPOLLWRBAND = 0x200,
#define EPOLLWRBAND EPOLLWRBAND //
EPOLLMSG = 0x400,
#define EPOLLMSG EPOLLMSG //
EPOLLERR = 0x008,
#define EPOLLERR EPOLLERR // fd发生错误
EPOLLHUP = 0x010,
#define EPOLLHUP EPOLLHUP // fd被中断
EPOLLRDHUP = 0x2000,
#define EPOLLRDHUP EPOLLRDHUP //
EPOLLWAKEUP = 1u << 29,
#define EPOLLWAKEUP EPOLLWAKEUP //
EPOLLONESHOT = 1u << 30,
#define EPOLLONESHOT EPOLLONESHOT // 只监听一次事件,当监听完这次事件之后,就会把这个fd从epoll的队列中删除,如果还需要继续监听这个socket的话,需要再次把这个fd加入到EPOLL队列里
EPOLLET = 1u << 31
#define EPOLLET EPOLLET // 设置为边缘触发
};
int epoll_wait (int __epfd, struct epoll_event *__events, int __maxevents, int __timeout);
等待事件的发生,并把需要处理的事件通过events返回,函数返回值为需要处理事件的数量。此处可以看出,epoll解决了select/poll需要用户查询所有感兴趣fd的缺陷,尽管有大量连接,但epoll只处理active的连接。
epoll在初始化的时候(OS启动)会开辟自己的内核高速cache,用于安置需要监听的fd,并以红黑树进行管理,支持快速查找、插入、删除。
使用epoll完成socket非阻塞IO(水平触发)
#include <arpa/inet.h>
#include <assert.h>
#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#include <algorithm>
const int BUFFER_SIZE = 4096;
const int SERVER_PORT = 8888;
const int CONNECTIONS = 5;
int init_and_listen(int port)
{
int socketfd = socket(AF_INET, SOCK_STREAM, 0);
if (socketfd == -1) {
fprintf(stderr, "get socket error\n");
return -1;
}
struct sockaddr_in server_addr;
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(port);
server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
int ret = bind(socketfd, (sockaddr *)&server_addr, sizeof(sockaddr_in));
if (ret == -1) {
fprintf(stderr, "bind socket fail\n");
return -1;
}
ret = listen(socketfd, CONNECTIONS);
if (ret == -1) {
fprintf(stderr, "listen socket error\n");
return -1;
}
return socketfd;
}
int do_epoll(int listenfd)
{
const int timeout = 5000; /*ms*/
int epfd = epoll_create(CONNECTIONS+1);
struct epoll_event events[CONNECTIONS+1];
events[0].data.fd = listenfd;
events[0].events = EPOLLIN;
epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &(events[0]));
while (1) {
int nready = epoll_wait(epfd, events, CONNECTIONS+1, timeout);
if (nready < 0) {
fprintf(stderr, "epoll error\n");
exit(-1);
} else if (nready == 0) {
fprintf(stdout, "epoll timeout\n");
} else {
for (int i = 0; i < nready; ++i) {
if (events[i].data.fd == listenfd) {
sockaddr_in client_addr;
socklen_t client_addr_len = sizeof(sockaddr_in);
int conn_fd = accept(events[i].data.fd, (sockaddr *)&client_addr, &client_addr_len);
if (conn_fd < 0) {
fprintf(stderr, "error connection\n");
} else {
fprintf(stdout, "connect from %s:%d\n",
inet_ntoa(client_addr.sin_addr),
ntohs(client_addr.sin_port));
struct epoll_event ev;
ev.data.fd = conn_fd;
ev.events = EPOLLIN;
if (epoll_ctl(epfd, EPOLL_CTL_ADD, conn_fd, &ev) == -1) {
fprintf(stderr, "add connection event error\n");
close(conn_fd);
}
}
} else if (events[i].events & EPOLLIN) {
char buf[BUFFER_SIZE+1];
int n = recv(events[i].data.fd, buf, BUFFER_SIZE, 0);
if (n > 0) {
buf[n] = '\0';
fprintf(stdout, "connection %d: %s, n=%d\n", i, buf, n);
} else {
if (epoll_ctl(epfd, EPOLL_CTL_DEL, events[i].data.fd, &(events[i])) == -1) {
fprintf(stderr, "del connection event error\n");
}
close(events[i].data.fd);
}
} else {
// remove unknow event
epoll_ctl(epfd, EPOLL_CTL_DEL, events[i].data.fd, &(events[i]));
}
}
}
}
close(epfd);
return 0;
}
int main ()
{
int listenfd = init_and_listen(SERVER_PORT);
if (listenfd == -1) {
fprintf(stderr, "init fail\n");
exit(-1);
}
do_epoll(listenfd);
return 0;
}
边缘触发(ET) vs. 水平触发(LT)
转自极客时间专栏
- 水平触发:只要文件描述符可以非阻塞地执行I/O ,就会触发通知。也就是说,应用程序可以随时检查文件描述符的状态,然后再根据状态,进行 I/O 操作。
- 边缘触发:只有文件描述符的状态发生改变时(也就是I/O请求到达时)才发送一次通知。这时候,应用程序应该尽可能多地执行I/O,直到无法继续读写,才可以停止。如果I/O没执行完,或者因为某种原因没来得及处理,那么这次通知就丢失了。
写在最后
最后附测试client代码:
#include <arpa/inet.h>
#include <assert.h>
#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#include <iostream>
#include <sstream>
#include <thread>
using namespace std;
const char* SERVER_IP = "127.0.0.1";
const int SERVER_PORT = 8888;
const int CONNECTIONS = 16;
const int BUFFER_SIZE = 4096;
mutex sock_mtx;
int main()
{
int socketfds[CONNECTIONS];
for (int i = 0; i < CONNECTIONS; ++i) {
socketfds[i] = socket(AF_INET, SOCK_STREAM, 0);
assert(socketfds[i] != -1);
}
sockaddr_in server_addr;
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(SERVER_PORT);
server_addr.sin_addr.s_addr = inet_addr(SERVER_IP);
fprintf(stdout, "connecting\n");
int ret;
for (int i = 0; i < CONNECTIONS; ++i) {
ret = connect(socketfds[i], (sockaddr*)&server_addr, sizeof(sockaddr_in));
assert(ret != -1);
}
fprintf(stdout, "sending\n");
thread sender[CONNECTIONS];
for (int i = 0; i < CONNECTIONS; ++i) {
sender[i] = thread(
[i](int socketfd) {
char buf[BUFFER_SIZE];
sprintf(buf, "message from thread(%d)", i);
int ret;
{
lock_guard<mutex> lock(sock_mtx);
ret = send(socketfd, buf, sizeof(buf), 0);
}
if (ret != -1) {
fprintf(stdout, "send success for thread(%d), ret=%d\n", i, ret);
} else {
fprintf(stdout, "send fail for thread(%d)\n", i);
}
std::this_thread::sleep_for(std::chrono::seconds(1));
},
socketfds[i]);
}
for (int i = 0; i < CONNECTIONS; ++i) {
sender[i].join();
}
for (int i = 0; i < CONNECTIONS; ++i) {
close(socketfds[i]);
}
return 0;
}
goudan-er SHARE · SYSTEM
system programming unix