0
点赞
收藏
分享

微信扫一扫

I/O复用 epoll系统调用

水墨_青花 2022-02-11 阅读 97

I/O复用 epoll系统调用

epoll内核事件表

epoll是Linux特有的I/O复用函数。它在实现和使用上与select、poll 有很大差异。首先,epoll 使用一组函数来完成任务,而不是单个函数。其次,epoll 把用户关心的文件描述符上的事件放在内核里的一个事件表中,从而无须像select和poll那样每次调用都要重复传人文件描述符集或事件集。但epoll需要使用一个额外的文件描述符, 来唯一标识内核中的这个事件表。这个文件描述符使用如下epoll_create 函数来创建:

#include <sys/epoll.h>
int epoll_create(int size);

size参数现在并不起作用,只是给内核一个提示,告诉它事件表需要多大。该函数返回的文件描述符将用作其他所有epoll系统调用的第一个参数,以指定要访问的内核事件表。
epoll_create()成功返回内核事件表的文件描述符,失败返回-1

下面的函数用来操作epoll的内核事件表:

#include <sys/epoll.h>
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

epoll_ctl()成功返回 0,失败返回-1。
epfd 参数指定要操作的内核事件表的文件描述符。
op 参数指定操作类型:

  • EPOLL_CTL_ADD 往内核事件表中注册 fd 上的事件
  • EPOLL_CTL_MOD 修改 fd 上的注册事件
  • EPOLL_CTL_DEL 删除 fd 上的注册事件
    fd 参数指定要操作的文件描述符。
    event 参数指定事件,它是 epoll_event 结构指针类型,epoll_event 的定义如下:
struct epoll_event
{
	_uint32_t events; // epoll 事件
	epoll_data_t data; // 用户数据
};

其中,events 成员描述事件类型,epoll 支持的事件类型与 poll 基本相同,表示epoll 事件的宏是在 poll 对应的宏前加上E,比如 epoll 的数据可读事件是EPOLLIN。但是 epoll 有两个额外的事件类型:EPOLLET 和 EPOLLONESHOT。data 成员用于存储用户数据,是一个联合体,其定义如下:

typedef union epoll_data
{
	void *ptr;
	int fd;
	uint32_t u32;
	uint64_t u64;
}epoll_data_t;

其中 fd 成员使用的最多,它指定事件所从属的目标文件描述符。

epoll_wait函数

epoll系列系统调用的主要接口是epoll_wait 函数。它在一段超时时间内等待一组文件描述符上的事件,其原型如下:

#include <sys/epoll.h>
int epoll_wait( int epfd, struct epo11_event* events, int maxevents, int timeout );

该函数成功时返回就绪的文件描述符的个数,失败时返回-1,超时返回 0。
epfd 参数指定要操作的内核事件表的文件描述符。
events 参数是一个用户数组,这个数组仅仅在epoll_wait 返回时保存内核检测到的所有就绪事件,而不像 select 和 poll 的数组参数那样既用于传入用户注册的事件,又用于输出内核检测到的就绪事件。这就极大地提高了应用程序索引就绪文件
描述符的效率。
maxevents 参数指定用户数组的大小,即指定最多监听多少个事件,它必须大于0。
timeout 参数指定超时时间,单位为毫秒,如果 timeout 为 0,则 epoll_wait 会立即
返回,如果 timeout 为-1,则 epoll_wait 会一直阻塞,直到有事件就绪。

LT和ET模式

epoll 对文件描述符有两种操作模式:LT(Level Trigger,电平触发)模式和 ET(Edge
Trigger,边沿触发)模式。LT 模式是默认的工作模式。当往 epoll 内核事件表中注册一个文件描述符上的 EPOLLET 事件时,epoll 将以高效的 ET 模式来操作该文件描述符。
对于 LT 模式操作的文件描述符,当 epoll_wait 检测到其上有事件发生并将此事件通知应用程序后,应用程序可以不立即处理该事件。这样,当应用程序下一次调用 epoll_wait 时,还会再次向应用程序通告此事件,直到该事件被处理。
对于 ET 模式操作的文件描述符,当 epoll_wait 检测到其上有事件发生并将此事件通知应用程序后,应用程序必须立即处理该事件,因为后续的 epoll_wait 调用将不再向应用程序通知这一事件。所以 ET 模式在很大程度上降低了同一个 epoll 事件被重复触发的次数,因此效率比 LT 模式高。

即使我们使用ET模式,一个socket上的某个事件还是可能被触发多次。这在并发程序中就会引起一个问题。比如一个线程(或进程)在读取完某个socket上的数据后开始处理这些数据,而在数据的处理过程中该socket上又有新数据可读(EPOLLIN 再次被触发),此时另外一个线程被唤醒来读取这些新的数据。于是就出现了两个线程同时操作一个socket的局面。这当然不是我们期望的。我们期望的是一个socket连接在任一时刻都只被一个线程处理。这一点可以使用epoll的EPOLLONESHOT事件实现。
对于注册了EPOLLONESHOT事件的文件描述符,操作系统最多触发其上注册的一个可读、可写或者异常事件,且只触发一次, 除非我们使用epoll_ctl 函数重置该文件描述符上注册的EPOLLONESHOT事件。这样,当一个线程在处理某个socket时,其他线程是不可能有机会操作该socket的。但反过来思考,注册了EPOLLONESHOT事件的socket一旦被某个线程处理完毕,该线程就应该立即重置这个socket上的EPOLLONESHOT事件,以确保这个socket下一次可读时,其EPOLLIN事件能被触发,进而让其他工作线程有机会继续处理这个socket。

每个使用ET模式的文件描述符都应该是非阻塞的。如果文件描述符是阻塞的,那么读或写操作将会因为没有后续的事件而一直处于阻塞状态(饥渴状态)。

epoll内核实现

执行epoll_create会在内核的高速cache区中建立一颗红黑树以及就绪链表(该链表存储已经就绪的文件描述符)。接着用户执行的epoll_ctl函数添加文件描述符会在红黑树上增加相应的结点。
epoll采用回调机制。在执行epoll_ctl的add操作时,不仅将文件描述符放到红黑树上,而且也注册了回调函数,内核在检测到某文件描述符可读/可写时会调用回调函数,该回调函数将文件描述符放在就绪链表中。
epoll_wait只用观察就绪链表中有无数据即可,最后将链表的数据返回给数组并返回就绪的数量。内核将就绪的文件描述符放在传入的数组中,所以只用遍历依次处理即可。这里返回的文件描述符是通过mmap让内核和用户空间共享同一块内存实现传递的,减少了不必要的拷贝。

代码示例

#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <assert.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/epoll.h>
#include <fcntl.h>
#include <errno.h>

#define MAX_FD 128
#define DATALEN 1024
#define EPOLLSIZE 5
#define LT 0
#define ET 1

// 初始化服务器端的 sockfd 套接字
int InitSocket()
{
	int sockfd = socket(AF_INET, SOCK_STREAM, 0);
	if(sockfd == -1) return -1;

	struct sockaddr_in saddr;
	memset(&saddr, 0, sizeof(saddr));
	saddr.sin_family = AF_INET;
	saddr.sin_port = htons(6000);
	saddr.sin_addr.s_addr = inet_addr("127.0.0.1");

	int res = bind(sockfd, (struct sockaddr*)&saddr, sizeof(saddr));
	if(res == -1) return -1;

	res = listen(sockfd, 5);
	if(res == -1) return -1;

	return sockfd;
}
// 设置文件为非阻塞模式
void SetNoWait(int fd)
{
	int old_option = fcntl(fd, F_GETFL);
	int new_option = old_option | O_NONBLOCK;
	fcntl(fd, F_SETFL, new_option);
}

//关闭客户端连接
void CloseClient(int epfd, int fd)
{
	close(fd);
	printf("A Client disconnected\n");
	if (epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL) == -1)
	{
		printf("epoll_ctl del error\n");
	}
}

//获取一个新的客户端连接,如果 flag 为 ET,则以 ET 模式处理此客户端
void GetClientLink(int sockfd, int epfd, int flag)
{
	struct sockaddr_in caddr;
	socklen_t len = sizeof(caddr);
	int c = accept(sockfd, (struct sockaddr*)&caddr, &len);
	if (c < 0)
	{
		printf("Client Link error\n");
		return;
	}

	struct epoll_event ev;
	ev.data.fd = c;
	if (flag)
	{
		ev.events = EPOLLIN | EPOLLRDHUP | EPOLLET;
		SetNoWait(c);
	}
	else
	{
		ev.events = EPOLLIN | EPOLLRDHUP;
	}

	if (epoll_ctl(epfd, EPOLL_CTL_ADD, c, &ev) == -1)
	{
		printf("epoll_ctl add error\n");
	}
}

//LT 模式的客户端数据处理方式
void LTDealClientData(int epfd, int fd)
{
	char buff[DATALEN] = { 0 };
	int n = recv(fd, buff, DATALEN - 1, 0);
	if (n <= 0)
	{
		CloseClient(epfd, fd);
	return;
	}

	printf("%d: %s\n", fd, buff);
	send(fd, "OK", 2, 0);
}
//ET 模式的客户端数据处理方式
void ETDealClientData(int epfd, int fd)
{
	while (1)
	{
		char buff[DATALEN] = { 0 };
		int n = recv(fd, buff, DATALEN - 1, 0);
		if (n == -1)
		{
			if (errno == EAGAIN || errno == EWOULDBLOCK)
			{
				printf("read later\n");
				break;
			}
			else
			{
				CloseClient(epfd, fd);
				break;
			}
		}
		else if (n == 0)
		{
			CloseClient(epfd, fd);
			break;
		}
		else
		{
			printf("%d: %s\n", fd, buff);
 			send(fd, "OK", 2, 0);
		}
	}
}

// 处理就绪的文件描述符上的数据
void DealReadyEvent(struct epoll_event *events,int n,int sockfd,int epfd)
{
	int i = 0;
	for (; i < n; ++i)
	{
		int fd = events[i].data.fd;
		if (fd == sockfd)
		{
			GetClientLink(sockfd, epfd, LT); // 设置为 LT 模式
			// GetClientLink(sockfd, epfd, ET); // 设置为 ET 模式
		}
		else if (events[i].events & EPOLLRDHUP)
		{
			CloseClient(epfd, fd);
		}
		else if (events[i].events & EPOLLIN)
		{
			LTDealClientData(epfd, fd); // 以 LT 模式处理客户端数据
			//ETDealClientData(epfd, fd); //以 ET 模式处理客户端数据
		}
		else
		{
			printf("error\n");
		}
	}
}

int main()
{
	int sockfd = InitSocket();
	assert(sockfd != -1);

	int epfd = epoll_create(EPOLLSIZE);
	assert(epfd != -1);

	struct epoll_event ev;
	ev.data.fd = sockfd;
	ev.events = EPOLLIN;

	if (epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &ev) == -1)
	{
		printf("epoll_ctl add error\n");
		exit(0);
	}

	while (1)
	{
		struct epoll_event events[MAX_FD];
		int n = epoll_wait(epfd, events, MAX_FD, 2000);
		if (n < 0)
		{
			printf("epoll_wait error\n");
			continue;
		}
		else if (n == 0)
		{
			printf("timeout\n");
			continue;
		}
		else
		{
			DealReadyEvent(events, n, sockfd, epfd);
		}
	}
}

LT模式测试结果
在这里插入图片描述
ET模式测试结果
在这里插入图片描述

举报

相关推荐

0 条评论