I/O多路转接

I/O多路转接,也叫I/O多路复用,I/O Multiplexing,是操作系统提供的一种高级I/O功能,只有当描述符准备好进行I/O时,进程或线程才去执行I/O操作,避免阻塞或者做无用功。select/poll/epoll 时Unix系统为我们提供系统函数接口。

IO模型

非阻塞I/O

通常在谈起I/O时(文件I/O,网络I/O),如没有特殊说明,通常指阻塞式I/O(blcking I/O),即当调用I/O时,若I/O不可用,当前进程或者线程会被挂起,直到I/O可用。如下图(图片来源)所示,

blocking_io.gif

阻塞式I/O会造成系统资源浪费。比如,一台服务器需要处理1000个连接,则需要1000个进程或者线程处理连接,如果1000个连接只有少部分是连接忙碌的,则1000个线程大部分是被阻塞挂起的。假设CPU是4核,为了要跑1000个线程,则每个线程的时间槽非常短,这样就会导致线程切换非常频繁。频繁切换线程是有问题的:
1. 线程是有内存开销的,1个线程可能需要512K(或2M)存放栈,那么1000个线程就要512M(或2G)内存
2. 线程的切换,或者说上下文切换是有CPU开销的,当大量时间花在上下文切换的时候,分配给真正的操作的CPU就要少很多

非阻塞I/O(nonblocking I/O),如下图(图片来源)所示,

nonblocking_io.gif

进程发起一个read操作时,如果数据还没有准备好,系统不再阻塞进程,而是返回一个error信息,此时进程知道所需数据还没有准备好,于是它可以等待一定时间再次发起read操作。一旦kernel中的数据准备好了,并且又再次收到了用户进程的系统调用,那么它马上就将数据拷贝到了用户内存,然后返回。所以,用户进程其实是需要不断地主动询问kernel数据是否准备好,也称作轮询,polling。非阻塞I/O的一个问题就是进程需要不断询问kernel数据是否准备好,而大多数时间实际上是数据还没有准备好,所以执行系统调用浪费了浪费了CPU时间,并且每次查询后等待多长时间再进行下一次查询也很难确定。

异步I/O

异步I/O是一种高级I/O技术。当进程发起一个异步read操作时(如aio_read),用户进程可以立刻去做其他事情,kernel不会对用户进程产生任何阻塞,kernel会等待数据准备完成,然后将数据拷贝到用户内存,当这一切都完成之后,kernel会给用户进程发送一个signal,告诉它read操作完成了。注意的是操作的内存缓冲区需要保持稳定并且始终合法。下图(图片来源)是异步I/O的示意图。

async_io.gif

为了简化程序设计,常见异步I/O一般采用多线程或者多进程方式,使用同步模型编写程序,但是异步运行这些线程或进程。但是,如果需要大量I/O操作(比如并发网络连接很多),这种异步I/O可能会需要频繁新建、调度、销毁这些线程进程,系统开销较大。

I/O多路转接

一种比较好的技术就是I/O多路转接,为了使用这个技术,首先构造一个文件描述符集合,然后调用 select, poll, epoll 函数,直到这些描述符中的一个准备好进行I/O时(或者超时)才返回。如下图(图片来源)。

io_multiplexing.gif

注意的是,当进程调用 select时,进程是被阻塞的。kernel会“监视”应用程序感兴趣的文件描述符,当任何一个数据准备好了,select就会返回。这个时候用户进程再调用read操作,将数据从kernel拷贝到用户进程。

select

select函数定义如下:

int select(int maxfdp1, fd_set *restrict readfds, 
			fdset *restrict writefds, fdset *restrict execptfds, 
			struct timeval *restrict tvptr);
  • maxfdp1: 最大描述符编号值+1。在#include <sys/select.h>定义中,指定的最大描述符编号值为1024
  • readfds, writefds, exceptfds: 所关心的可读、可写或处于异常条件的描述符集合
  • tvptr:
    • tvptr == NULL: 永远等待,直到当所指定的描述符中的一个已经准备好或收到中断信号。
    • tvptr->tv_sec == 0 && tvptr->tv_usec == 0: 不等待,测试所有指定的描述符并立即返回。等同于轮询。
    • tvptr->tv_sec > 0 || tvptr->tv_usec > 0: 等待指定的秒数或者微妙数。当指定的描述符之一准备好或者超时,则返回,-1表示中断,0表示超时,>0表示有描述符已经准备好。

此外,#include <sys/select.h>还定义了一些函数用来操作描述符集合:

int FD_ISSET(int fd, fd_set *fdset);	// 测试fd是否在集合之中
void FD_CLR(int fd fd_set *fdset);	// 将fd从集合中清除
void FD_SET(int fd, fd_set *fdset);	// 将fd加入集合之中
void FD_ZERO(fd_set *fdset);	// 清空集合

使用select完成socket非阻塞IO

#include <arpa/inet.h>
#include <assert.h>
#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#include <algorithm>

const int BUFFER_SIZE = 4096;
const int SERVER_PORT = 8888;
const int CONNECTIONS = 5;

int init_and_listen(const int port)
{
    int socketfd = socket(AF_INET, SOCK_STREAM, 0);
    if (socketfd == -1) {
        fprintf(stderr, "get socket error\n");
        return -1;
    }

    sockaddr_in server_addr;
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(port);
    server_addr.sin_addr.s_addr = htonl(INADDR_ANY);

    int ret = bind(socketfd, (sockaddr *)&server_addr, sizeof(sockaddr_in));
    if (ret == -1) {
        fprintf(stderr, "bind socket fail\n");
        return -1;
    }

    ret = listen(socketfd, CONNECTIONS);
    if (ret == -1) {
        fprintf(stderr, "listen socket error\n");
        return -1;
    }

    return socketfd;
}

int do_select(int listenfd)
{
    const int timeout = 5; /*s*/
    int server_fds[CONNECTIONS + 1];
    for (int i = 0; i <= CONNECTIONS; ++i) {
        server_fds[i] = -1;
    }

    fd_set rfds;
    timeval tv;
    int max_fd;

    while (1) {
        FD_ZERO(&rfds);
        server_fds[0] = listenfd;
        max_fd = -1;
        for (int i = 0; i <= CONNECTIONS; ++i) {
            if (server_fds[i] > -1) {
                FD_SET(server_fds[i], &rfds);
                max_fd = std::max(max_fd, server_fds[i]);
            }
        }

        tv.tv_sec = timeout;
        tv.tv_usec = 0;
        int nready = select(max_fd + 1, &rfds, NULL, NULL, &tv);

        if (nready == -1) {
            fprintf(stderr, "select error\n");
            exit(-1);
        } else if (nready == 0) {
            fprintf(stdout, "select timeout\n");
        } else {
            if (FD_ISSET(server_fds[0], &rfds)) {
                sockaddr_in client_addr;
                socklen_t client_addr_len = sizeof(sockaddr_in);
                int conn_fd = accept(server_fds[0], (sockaddr *)&client_addr,
                                        &client_addr_len);
                if (conn_fd < 0) {
                    fprintf(stderr, "error connection\n");
                } else {
                    fprintf(stdout, "connect from %s:%d\n",
                            inet_ntoa(client_addr.sin_addr),
                            ntohs(client_addr.sin_port));
                    int next;
                    for (next = 1; next <= CONNECTIONS; ++next) {
                        if (server_fds[next] == -1) break;
                    }
                    if (next > CONNECTIONS) {
                        fprintf(stderr, "too many connections\n");
                        close(conn_fd);
                    } else {
                        server_fds[next] = conn_fd;
                    }
                }
                --nready;
            }

            if (nready > 0) {
                for (int i = 1; i <= CONNECTIONS; ++i) {
                    if (server_fds[i] != -1 && FD_ISSET(server_fds[i], &rfds)) {
                        char buf[BUFFER_SIZE+1];
                        int n = recv(server_fds[i], buf, BUFFER_SIZE, 0);
                        if (n > 0) {
                            buf[n] = '\0';
                            fprintf(stdout, "connection %d: %s, n=%d\n", i, buf, n);
                        } else {
                            close(server_fds[i]);
                            server_fds[i] = -1;
                        }
                    }
                }
            }
        }
    }

    return 0;
}

int main()
{
    int listenfd = init_and_listen(SERVER_PORT);
    if (listenfd == -1) {
        fprintf(stderr, "init fail\n");
        exit(-1);
    }
    do_select(listenfd);
    return 0;
}

POSIX.1也定义了一个select函数的变体,pselect,定义如下:

#include <sys/select.h>

int pselect(int maxfdp1, fd_set *restrict readfds,
			fs_set *restrict writefds, fd_set *restrict execptfds,
			const struct timespec *restrict tsptr,
			const sigset_t *restrict sigmask);

从函数参数上来看,pselectselect功能基本相同,但是pselect提供了以下几点功能:

  • pselect超时值使用timespec结构,该结构以秒和纳秒表示超时值,可以提供更精准的超时时间。
  • pselect的超时值被声明为const,这保证了调用pselect不会修改此值。
  • pselect可选用信号屏蔽字。若sigmaskNULL,那么在信号方面,selectpselect的运行状况相同。否则,sigmask指向一信号屏蔽字,在调用函数时,以原子操作方式安装信号屏蔽字;在返回时,恢复之前的信号屏蔽字。

poll

select存在以下4个问题:

  1. fd数目限制
  2. 3个集合在函数返回时会被内核修改,所以每次调用select之前都需要重新设置
  3. 内核需要遍历fdset中的所有fd,查看哪些fd事件实际发生
  4. 用户需要检查所有注册的fd,查看哪些fd事件发生,在大量连接下,可能实际发生的读写事件fd较少

poll函数功能类似于select,同样提供多路转接技术。pollselect作出了改进,

int poll(struct pollfd *fds, int nfds, int timeout)

struct pollfd
{
	int     fd;
	short   events;     // 等待的事件
	short   revents;    // 实际发生的事件
};

可以看到poll传入一个pollfd的动态数组,所以不再有fd大小的限制;pollfd定义了等待的事件events和实际发生的事件revents,内核只修改revents,也就解决了select每次需要重新设置的缺陷。但是3和4的问题仍然存在。

使用poll完成socket非阻塞IO

#include <arpa/inet.h>
#include <assert.h>
#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/poll.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#include <algorithm>

const int BUFFER_SIZE = 4096;
const int SERVER_PORT = 8888;
const int CONNECTIONS = 5;

int init_and_listen(const int port)
{
    int socketfd = socket(AF_INET, SOCK_STREAM, 0);
    if (socketfd == -1) {
        fprintf(stderr, "get socket error\n");
        return -1;
    }

    sockaddr_in server_addr;
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(port);
    server_addr.sin_addr.s_addr = htonl(INADDR_ANY);

    int ret = bind(socketfd, (sockaddr *)&server_addr, sizeof(sockaddr_in));
    if (ret == -1) {
        fprintf(stderr, "bind socket fail\n");
        return -1;
    }

    ret = listen(socketfd, CONNECTIONS);
    if (ret == -1) {
        fprintf(stderr, "listen socket error\n");
        return -1;
    }

    return socketfd;
}

int do_poll(int listenfd)
{
    const int timeout = 5000; /*ms*/
    struct pollfd server_fds[CONNECTIONS + 1];
    for (int i = 0; i <= CONNECTIONS; ++i) {
        server_fds[i].fd = -1;
    }
    server_fds[0].fd = listenfd;
    server_fds[0].events = POLLIN;

    int max_fds_num = 1;
    while (1) {
        int nready = poll(server_fds, max_fds_num, timeout);
        if (nready < 0) {
            fprintf(stderr, "poll error\n");
            exit(-1);
        } else if (nready == 0) {
            fprintf(stdout, "poll timeout\n");
        } else {
            if (server_fds[0].revents & POLLIN) {
                sockaddr_in client_addr;
                socklen_t client_addr_len = sizeof(sockaddr_in);
                int conn_fd = accept(server_fds[0].fd, (sockaddr *)&client_addr,
                                        &client_addr_len);
                if (conn_fd < 0) {
                    fprintf(stderr, "error connection\n");
                } else {
                    fprintf(stdout, "connect from %s:%d\n",
                            inet_ntoa(client_addr.sin_addr),
                            ntohs(client_addr.sin_port));
                    int next;
                    for (next = 1; next <= CONNECTIONS; ++next) {
                        if (server_fds[next].fd == -1) {
                            break;
                        }
                    }
                    if (next > CONNECTIONS) {
                        fprintf(stderr, "too many connections\n");
                        close(conn_fd);
                    } else {
                        server_fds[next].fd = conn_fd;
                        server_fds[next].events = POLLIN;
                        max_fds_num = std::max(max_fds_num, next+1);
                    }
                    --nready;
                }
            }

            if (nready > 0) {
                for (int i = 1; i < max_fds_num; ++i) {
                    if (server_fds[i].fd != -1 && server_fds[i].revents & POLLIN) {
                        char buf[BUFFER_SIZE+1];
                        int n = recv(server_fds[i].fd, buf, BUFFER_SIZE, 0);
                        if (n > 0) {
                            buf[n] = '\0';
                            fprintf(stdout, "connection %d: %s, n=%d\n", i, buf, n);
                        } else {
                            close(server_fds[i].fd);
                            server_fds[i].fd = -1;
                        }
                    }
                }
            }
        }
    }

    return 0;
}

int main ()
{
    int listenfd = init_and_listen(SERVER_PORT);
    if (listenfd == -1) {
        fprintf(stderr, "init fail\n");
        exit(-1);
    }
    do_poll(listenfd);
    return 0;
}

epoll

epoll则解决了selectpoll的缺陷。先来看epoll相关接口。

int epoll_create (int __size);

创建一个epoll的句柄,size用来告诉内核需要监听的数目。创建好epoll句柄后,它会占用一个fd,在linux下查看/proc/process_id/fd/,是能够看到这个fd的。所以,使用完epoll后,必须调用close()关闭。

int epoll_ctl (int __epfd, int __op, int __fd, struct epoll_event *__event);
  • 第一个参数epfd,epoll句柄
  • 第二个参数op表示对fd的操作,包括
    /* Valid opcodes ( "op" parameter ) to issue to epoll_ctl().  */
    #define EPOLL_CTL_ADD 1 /* Add a file descriptor to the interface.  */
    #define EPOLL_CTL_DEL 2 /* Remove a file descriptor from the interface.  */
    #define EPOLL_CTL_MOD 3 /* Change file descriptor epoll_event structure.  */
    
  • 第三个参数需要监听的fd
  • 第四个参数需要监听的事件,epoll event定义如下
typedef union epoll_data
{
  void *ptr;
  int fd;
  uint32_t u32;
  uint64_t u64;
} epoll_data_t;

struct epoll_event
{
  uint32_t events;      /* Epoll events */
  epoll_data_t data;    /* User data variable */
} __EPOLL_PACKED;

// events可以是以下宏的组合
enum EPOLL_EVENTS
  {
    EPOLLIN = 0x001,
#define EPOLLIN EPOLLIN             // fd可读
    EPOLLPRI = 0x002,
#define EPOLLPRI EPOLLPRI           // fd有紧急数据可读
    EPOLLOUT = 0x004,
#define EPOLLOUT EPOLLOUT           // fd可写
    EPOLLRDNORM = 0x040,
#define EPOLLRDNORM EPOLLRDNORM     // 
    EPOLLRDBAND = 0x080,
#define EPOLLRDBAND EPOLLRDBAND     //
    EPOLLWRNORM = 0x100,
#define EPOLLWRNORM EPOLLWRNORM     //
    EPOLLWRBAND = 0x200,
#define EPOLLWRBAND EPOLLWRBAND     //
    EPOLLMSG = 0x400,
#define EPOLLMSG EPOLLMSG           //
    EPOLLERR = 0x008,
#define EPOLLERR EPOLLERR           // fd发生错误
    EPOLLHUP = 0x010,
#define EPOLLHUP EPOLLHUP           // fd被中断
    EPOLLRDHUP = 0x2000,
#define EPOLLRDHUP EPOLLRDHUP       //
    EPOLLWAKEUP = 1u << 29,
#define EPOLLWAKEUP EPOLLWAKEUP     //
    EPOLLONESHOT = 1u << 30,
#define EPOLLONESHOT EPOLLONESHOT   // 只监听一次事件,当监听完这次事件之后,就会把这个fd从epoll的队列中删除,如果还需要继续监听这个socket的话,需要再次把这个fd加入到EPOLL队列里
    EPOLLET = 1u << 31
#define EPOLLET EPOLLET             // 设置为边缘触发
  };
int epoll_wait (int __epfd, struct epoll_event *__events, int __maxevents, int __timeout);

等待事件的发生,并把需要处理的事件通过events返回,函数返回值为需要处理事件的数量。此处可以看出,epoll解决了selectpoll需要用户查询所有感兴趣fd的缺陷,尽管有大量连接,但epoll只处理active的连接。

epoll在初始化的时候(OS启动)会开辟自己的内核高速cache,用于安置需要监听的fd,并以红黑树进行管理,支持快速查找、插入、删除。

使用epoll完成socket非阻塞IO(水平触发)

#include <arpa/inet.h>
#include <assert.h>
#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#include <algorithm>

const int BUFFER_SIZE = 4096;
const int SERVER_PORT = 8888;
const int CONNECTIONS = 5;

int init_and_listen(int port)
{
    int socketfd = socket(AF_INET, SOCK_STREAM, 0);
    if (socketfd == -1) {
        fprintf(stderr, "get socket error\n");
        return -1;
    }

    struct sockaddr_in server_addr;
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(port);
    server_addr.sin_addr.s_addr = htonl(INADDR_ANY);

    int ret = bind(socketfd, (sockaddr *)&server_addr, sizeof(sockaddr_in));
    if (ret == -1) {
        fprintf(stderr, "bind socket fail\n");
        return -1;
    }

    ret = listen(socketfd, CONNECTIONS);
    if (ret == -1) {
        fprintf(stderr, "listen socket error\n");
        return -1;
    }

    return socketfd;
}

int do_epoll(int listenfd)
{
    const int timeout = 5000; /*ms*/
    int epfd = epoll_create(CONNECTIONS+1);
    struct epoll_event events[CONNECTIONS+1];
    events[0].data.fd = listenfd;
    events[0].events = EPOLLIN;
    epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &(events[0]));

    while (1) {
        int nready = epoll_wait(epfd, events, CONNECTIONS+1, timeout);
        if (nready < 0) {
            fprintf(stderr, "epoll error\n");
            exit(-1);
        } else if (nready == 0) {
            fprintf(stdout, "epoll timeout\n");
        } else {
            for (int i = 0; i < nready; ++i) {
                if (events[i].data.fd == listenfd) {
                    sockaddr_in client_addr;
                    socklen_t client_addr_len = sizeof(sockaddr_in);
                    int conn_fd = accept(events[i].data.fd, (sockaddr *)&client_addr, &client_addr_len);
                    if (conn_fd < 0) {
                        fprintf(stderr, "error connection\n");
                    } else {
                        fprintf(stdout, "connect from %s:%d\n",
                                inet_ntoa(client_addr.sin_addr),
                                ntohs(client_addr.sin_port));

                        struct epoll_event ev;
                        ev.data.fd = conn_fd;
                        ev.events = EPOLLIN;
                        if (epoll_ctl(epfd, EPOLL_CTL_ADD, conn_fd, &ev) == -1) {
                            fprintf(stderr, "add connection event error\n");
                            close(conn_fd);
                        }
                    }
                } else if (events[i].events & EPOLLIN) {
                    char buf[BUFFER_SIZE+1];
                    int n = recv(events[i].data.fd, buf, BUFFER_SIZE, 0);
                    if (n > 0) {
                        buf[n] = '\0';
                        fprintf(stdout, "connection %d: %s, n=%d\n", i, buf, n);
                    } else {
                        if (epoll_ctl(epfd, EPOLL_CTL_DEL, events[i].data.fd, &(events[i])) == -1) {
                            fprintf(stderr, "del connection event error\n");
                        }
                        close(events[i].data.fd);
                    }
                } else {
                    // remove unknow event
                    epoll_ctl(epfd, EPOLL_CTL_DEL, events[i].data.fd, &(events[i]));
                }
            }
        }
    }

    close(epfd);

    return 0;
}

int main ()
{
    int listenfd = init_and_listen(SERVER_PORT);
    if (listenfd == -1) {	
        fprintf(stderr, "init fail\n");
        exit(-1);
    }
    do_epoll(listenfd);
    return 0;
}

边缘触发(ET) vs. 水平触发(LT)

转自极客时间专栏

  • 水平触发:只要文件描述符可以非阻塞地执行I/O ,就会触发通知。也就是说,应用程序可以随时检查文件描述符的状态,然后再根据状态,进行 I/O 操作。
  • 边缘触发:只有文件描述符的状态发生改变时(也就是I/O请求到达时)才发送一次通知。这时候,应用程序应该尽可能多地执行I/O,直到无法继续读写,才可以停止。如果I/O没执行完,或者因为某种原因没来得及处理,那么这次通知就丢失了。

写在最后

最后附测试client代码:

#include <arpa/inet.h>
#include <assert.h>
#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

#include <iostream>
#include <sstream>
#include <thread>

using namespace std;

const char* SERVER_IP = "127.0.0.1";
const int SERVER_PORT = 8888;
const int CONNECTIONS = 16;
const int BUFFER_SIZE = 4096;

mutex sock_mtx;

int main()
{
    int socketfds[CONNECTIONS];
    for (int i = 0; i < CONNECTIONS; ++i) {
        socketfds[i] = socket(AF_INET, SOCK_STREAM, 0);
        assert(socketfds[i] != -1);
    }

    sockaddr_in server_addr;
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(SERVER_PORT);
    server_addr.sin_addr.s_addr = inet_addr(SERVER_IP);

    fprintf(stdout, "connecting\n");

    int ret;
    for (int i = 0; i < CONNECTIONS; ++i) {
        ret = connect(socketfds[i], (sockaddr*)&server_addr, sizeof(sockaddr_in));
        assert(ret != -1);
    }

    fprintf(stdout, "sending\n");

    thread sender[CONNECTIONS];
    for (int i = 0; i < CONNECTIONS; ++i) {
        sender[i] = thread(
            [i](int socketfd) {
                char buf[BUFFER_SIZE];
                sprintf(buf, "message from thread(%d)", i);
                int ret;
                {
                    lock_guard<mutex> lock(sock_mtx);
                    ret = send(socketfd, buf, sizeof(buf), 0);
                }
                if (ret != -1) {
                    fprintf(stdout, "send success for thread(%d), ret=%d\n", i, ret);
                } else {
                    fprintf(stdout, "send fail for thread(%d)\n", i);
                }
                std::this_thread::sleep_for(std::chrono::seconds(1));
            },
            socketfds[i]);
    }

    for (int i = 0; i < CONNECTIONS; ++i) {
        sender[i].join();
    }

    for (int i = 0; i < CONNECTIONS; ++i) {
        close(socketfds[i]);
    }

    return 0;
}

SHARE · SYSTEM
system programming unix

对话与讨论