I/O复用 epoll系统调用
epoll内核事件表
epoll是Linux特有的I/O复用函数。它在实现和使用上与select、poll 有很大差异。首先,epoll 使用一组函数来完成任务,而不是单个函数。其次,epoll 把用户关心的文件描述符上的事件放在内核里的一个事件表中,从而无须像select和poll那样每次调用都要重复传人文件描述符集或事件集。但epoll需要使用一个额外的文件描述符, 来唯一标识内核中的这个事件表。这个文件描述符使用如下epoll_create 函数来创建:
#include <sys/epoll.h>
int epoll_create(int size);
size参数现在并不起作用,只是给内核一个提示,告诉它事件表需要多大。该函数返回的文件描述符将用作其他所有epoll系统调用的第一个参数,以指定要访问的内核事件表。
epoll_create()成功返回内核事件表的文件描述符,失败返回-1
下面的函数用来操作epoll的内核事件表:
#include <sys/epoll.h>
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
epoll_ctl()成功返回 0,失败返回-1。
epfd 参数指定要操作的内核事件表的文件描述符。
op 参数指定操作类型:
- EPOLL_CTL_ADD 往内核事件表中注册 fd 上的事件
- EPOLL_CTL_MOD 修改 fd 上的注册事件
- EPOLL_CTL_DEL 删除 fd 上的注册事件
fd 参数指定要操作的文件描述符。
event 参数指定事件,它是 epoll_event 结构指针类型,epoll_event 的定义如下:
struct epoll_event
{
_uint32_t events; // epoll 事件
epoll_data_t data; // 用户数据
};
其中,events 成员描述事件类型,epoll 支持的事件类型与 poll 基本相同,表示epoll 事件的宏是在 poll 对应的宏前加上E
,比如 epoll 的数据可读事件是EPOLLIN。但是 epoll 有两个额外的事件类型:EPOLLET 和 EPOLLONESHOT。data 成员用于存储用户数据,是一个联合体,其定义如下:
typedef union epoll_data
{
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
}epoll_data_t;
其中 fd 成员使用的最多,它指定事件所从属的目标文件描述符。
epoll_wait函数
epoll系列系统调用的主要接口是epoll_wait 函数。它在一段超时时间内等待一组文件描述符上的事件,其原型如下:
#include <sys/epoll.h>
int epoll_wait( int epfd, struct epo11_event* events, int maxevents, int timeout );
该函数成功时返回就绪的文件描述符的个数,失败时返回-1,超时返回 0。
epfd 参数指定要操作的内核事件表的文件描述符。
events 参数是一个用户数组,这个数组仅仅在epoll_wait 返回时保存内核检测到的所有就绪事件,而不像 select 和 poll 的数组参数那样既用于传入用户注册的事件,又用于输出内核检测到的就绪事件。这就极大地提高了应用程序索引就绪文件
描述符的效率。
maxevents 参数指定用户数组的大小,即指定最多监听多少个事件,它必须大于0。
timeout 参数指定超时时间,单位为毫秒,如果 timeout 为 0,则 epoll_wait 会立即
返回,如果 timeout 为-1,则 epoll_wait 会一直阻塞,直到有事件就绪。
LT和ET模式
epoll 对文件描述符有两种操作模式:LT(Level Trigger,电平触发)模式和 ET(Edge
Trigger,边沿触发)模式。LT 模式是默认的工作模式。当往 epoll 内核事件表中注册一个文件描述符上的 EPOLLET 事件时,epoll 将以高效的 ET 模式来操作该文件描述符。
对于 LT 模式操作的文件描述符,当 epoll_wait 检测到其上有事件发生并将此事件通知应用程序后,应用程序可以不立即处理该事件。这样,当应用程序下一次调用 epoll_wait 时,还会再次向应用程序通告此事件,直到该事件被处理。
对于 ET 模式操作的文件描述符,当 epoll_wait 检测到其上有事件发生并将此事件通知应用程序后,应用程序必须立即处理该事件,因为后续的 epoll_wait 调用将不再向应用程序通知这一事件。所以 ET 模式在很大程度上降低了同一个 epoll 事件被重复触发的次数,因此效率比 LT 模式高。
即使我们使用ET模式,一个socket上的某个事件还是可能被触发多次。这在并发程序中就会引起一个问题。比如一个线程(或进程)在读取完某个socket上的数据后开始处理这些数据,而在数据的处理过程中该socket上又有新数据可读(EPOLLIN 再次被触发),此时另外一个线程被唤醒来读取这些新的数据。于是就出现了两个线程同时操作一个socket的局面。这当然不是我们期望的。我们期望的是一个socket连接在任一时刻都只被一个线程处理。这一点可以使用epoll的EPOLLONESHOT事件实现。
对于注册了EPOLLONESHOT事件的文件描述符,操作系统最多触发其上注册的一个可读、可写或者异常事件,且只触发一次, 除非我们使用epoll_ctl 函数重置该文件描述符上注册的EPOLLONESHOT事件。这样,当一个线程在处理某个socket时,其他线程是不可能有机会操作该socket的。但反过来思考,注册了EPOLLONESHOT事件的socket一旦被某个线程处理完毕,该线程就应该立即重置这个socket上的EPOLLONESHOT事件,以确保这个socket下一次可读时,其EPOLLIN事件能被触发,进而让其他工作线程有机会继续处理这个socket。
每个使用ET模式的文件描述符都应该是非阻塞的。如果文件描述符是阻塞的,那么读或写操作将会因为没有后续的事件而一直处于阻塞状态(饥渴状态)。
epoll内核实现
执行epoll_create会在内核的高速cache区中建立一颗红黑树以及就绪链表(该链表存储已经就绪的文件描述符)。接着用户执行的epoll_ctl函数添加文件描述符会在红黑树上增加相应的结点。
epoll采用回调机制。在执行epoll_ctl的add操作时,不仅将文件描述符放到红黑树上,而且也注册了回调函数,内核在检测到某文件描述符可读/可写时会调用回调函数,该回调函数将文件描述符放在就绪链表中。
epoll_wait只用观察就绪链表中有无数据即可,最后将链表的数据返回给数组并返回就绪的数量。内核将就绪的文件描述符放在传入的数组中,所以只用遍历依次处理即可。这里返回的文件描述符是通过mmap让内核和用户空间共享同一块内存实现传递的,减少了不必要的拷贝。
代码示例
#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <assert.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/epoll.h>
#include <fcntl.h>
#include <errno.h>
#define MAX_FD 128
#define DATALEN 1024
#define EPOLLSIZE 5
#define LT 0
#define ET 1
// 初始化服务器端的 sockfd 套接字
int InitSocket()
{
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
if(sockfd == -1) return -1;
struct sockaddr_in saddr;
memset(&saddr, 0, sizeof(saddr));
saddr.sin_family = AF_INET;
saddr.sin_port = htons(6000);
saddr.sin_addr.s_addr = inet_addr("127.0.0.1");
int res = bind(sockfd, (struct sockaddr*)&saddr, sizeof(saddr));
if(res == -1) return -1;
res = listen(sockfd, 5);
if(res == -1) return -1;
return sockfd;
}
// 设置文件为非阻塞模式
void SetNoWait(int fd)
{
int old_option = fcntl(fd, F_GETFL);
int new_option = old_option | O_NONBLOCK;
fcntl(fd, F_SETFL, new_option);
}
//关闭客户端连接
void CloseClient(int epfd, int fd)
{
close(fd);
printf("A Client disconnected\n");
if (epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL) == -1)
{
printf("epoll_ctl del error\n");
}
}
//获取一个新的客户端连接,如果 flag 为 ET,则以 ET 模式处理此客户端
void GetClientLink(int sockfd, int epfd, int flag)
{
struct sockaddr_in caddr;
socklen_t len = sizeof(caddr);
int c = accept(sockfd, (struct sockaddr*)&caddr, &len);
if (c < 0)
{
printf("Client Link error\n");
return;
}
struct epoll_event ev;
ev.data.fd = c;
if (flag)
{
ev.events = EPOLLIN | EPOLLRDHUP | EPOLLET;
SetNoWait(c);
}
else
{
ev.events = EPOLLIN | EPOLLRDHUP;
}
if (epoll_ctl(epfd, EPOLL_CTL_ADD, c, &ev) == -1)
{
printf("epoll_ctl add error\n");
}
}
//LT 模式的客户端数据处理方式
void LTDealClientData(int epfd, int fd)
{
char buff[DATALEN] = { 0 };
int n = recv(fd, buff, DATALEN - 1, 0);
if (n <= 0)
{
CloseClient(epfd, fd);
return;
}
printf("%d: %s\n", fd, buff);
send(fd, "OK", 2, 0);
}
//ET 模式的客户端数据处理方式
void ETDealClientData(int epfd, int fd)
{
while (1)
{
char buff[DATALEN] = { 0 };
int n = recv(fd, buff, DATALEN - 1, 0);
if (n == -1)
{
if (errno == EAGAIN || errno == EWOULDBLOCK)
{
printf("read later\n");
break;
}
else
{
CloseClient(epfd, fd);
break;
}
}
else if (n == 0)
{
CloseClient(epfd, fd);
break;
}
else
{
printf("%d: %s\n", fd, buff);
send(fd, "OK", 2, 0);
}
}
}
// 处理就绪的文件描述符上的数据
void DealReadyEvent(struct epoll_event *events,int n,int sockfd,int epfd)
{
int i = 0;
for (; i < n; ++i)
{
int fd = events[i].data.fd;
if (fd == sockfd)
{
GetClientLink(sockfd, epfd, LT); // 设置为 LT 模式
// GetClientLink(sockfd, epfd, ET); // 设置为 ET 模式
}
else if (events[i].events & EPOLLRDHUP)
{
CloseClient(epfd, fd);
}
else if (events[i].events & EPOLLIN)
{
LTDealClientData(epfd, fd); // 以 LT 模式处理客户端数据
//ETDealClientData(epfd, fd); //以 ET 模式处理客户端数据
}
else
{
printf("error\n");
}
}
}
int main()
{
int sockfd = InitSocket();
assert(sockfd != -1);
int epfd = epoll_create(EPOLLSIZE);
assert(epfd != -1);
struct epoll_event ev;
ev.data.fd = sockfd;
ev.events = EPOLLIN;
if (epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &ev) == -1)
{
printf("epoll_ctl add error\n");
exit(0);
}
while (1)
{
struct epoll_event events[MAX_FD];
int n = epoll_wait(epfd, events, MAX_FD, 2000);
if (n < 0)
{
printf("epoll_wait error\n");
continue;
}
else if (n == 0)
{
printf("timeout\n");
continue;
}
else
{
DealReadyEvent(events, n, sockfd, epfd);
}
}
}
LT模式测试结果
ET模式测试结果