0
点赞
收藏
分享

微信扫一扫

Linux下使用C语言实现高并发服务器

高并发服务器

使用多进程并发服务器时要考虑以下几点:

  1. 父进程最大文件描述个数(父进程中需要close关闭accept返回的新文件描述符)
  2. 系统内创建进程个数(与内存大小相关)
  3. 进程创建过多是否降低整体服务性能(进程调度)
  4. 每一个子进程的效率

在使用线程模型开发服务器时需考虑以下问题:

  1. 调整进程内最大文件描述符上限
  2. 线程如有共享数据,考虑线程同步
  3. 服务于客户端线程退出时,退出处理。(退出值,分离态)
  4. 系统负载,随着链接客户端增加,导致其它线程不能及时得到CPU

多路I/O转接服务器

多路IO转接服务器也叫做多任务IO服务器。该类服务器实现的主旨思想是,不再由应用程序自己监视客户端连接,取而代之由内核替应用程序监视文件。

主要使用的方法有三种

select

可以使用只一个命令监听一个新的连接(需要把监管使用的lfd给select函数处理), 之后进程可以使用accept获取cfd, select可以使用获取的cfd监听连接的客户端是否发过来数据

  1. select能监听的文件描述符个数受限于FD_SETSIZE,一般为1024,单纯改变进程打开的文件描述符个数并不能改变select监听文件个数
  2. 解决1024以下客户端时使用select是很合适的,但如果链接客户端过多,select采用的是轮询模型,会大大降低服务器响应效率,不应在select上投入更多精力。
#include <sys/select.h>
/* According to earlier standards */
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
int select(int nfds, fd_set *readfds, fd_set *writefds,
			fd_set *exceptfds, struct timeval *timeout);

readfds: 监控有读数据到达文件描述符集合,传入传出参数

writefds: 监控写数据到达文件描述符集合,传入传出参数

exceptfds:监控异常发生达文件描述符集合,如带外数据到达异常,传入传出参数

timeout: 定时阻塞监控时间,3种情况
1.NULL,永远等下去
2.设置timeval,等待固定时间
3.设置timeval里时间均为0,检查描述字后立即返回,轮询

struct timeval {
	long tv_sec; /* seconds */
	long tv_usec; /* microseconds */
};

返回值: 三个集合里面的事件发生的总个数

使用思路
  1. lfd = socket获取描述符, 用于处理连接
  2. bind 把这一个文件描述符和对应的端口连接
  3. listen设置监听的个数
  4. fd_set rset;设置一个位图
  5. FD_ZERO
  6. FD_SET
  7. int ret = select(lfd + 1, &rset, NULL, NULL, NULL);
  8. 检测实际的事件
if(ret >  0){
    //检测一下哪一个文件描述符被设置了
    if(FD_ISSET(lfd, &rset)){
        cfd = accept();
        FD_SET(cfd, &rset);
    } 
    //遍历其余的
}
实际实现
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <ctype.h>
#include <sys/select.h>

int main(void){
	int i, j, nready, ret;
	int maxfd = 0;
	int listenfd, connfd;

	char buf[BUFSIZ];

	struct sockaddr_in clie_addr, serv_addr;
	socklen_t clie_addr_len;

	listenfd = socket(AF_INET, SOCK_STREAM, 0);
	if(listenfd == -1){
		perror("socket error");
		exit(1);
	}
	int opt = 1;
	//设置端口复用
	ret = setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
	if(ret == -1){
		perror("setsockopt error");
		exit(1);
	}
	//设置监听端口
	serv_addr.sin_family = AF_INET;
	serv_addr.sin_port = htons(6666);
	serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
	ret = bind(listenfd, (struct sockaddr *)&serv_addr, sizeof(serv_addr));
	if(ret == -1){
		perror("bind error");
		exit(1);
	}

	ret = listen(listenfd, 128);
	if(ret == -1){
		perror("listen error");
		exit(1);
	}
	//两个文件描述符集合, 用于监听多个文件描述符
	fd_set rset, allset;
	maxfd = listenfd;

	FD_ZERO(&allset);
	FD_SET(listenfd, &allset);

	while(1){
		rset = allset;
		//监听多个文件描述符
		nready = select(maxfd+1, &rset, NULL, NULL, NULL);
		if(nready < 0){
			perror("select error");
			exit(1);
		}
        //检测一下有没有连接事件
		if(FD_ISSET(listenfd, &rset)){
			clie_addr_len = sizeof(clie_addr);
			connfd = accept(listenfd, (struct sockaddr *)&clie_addr, &clie_addr_len);
			if(connfd == -1){
				perror("accept error");
				exit(1);
			}
			FD_SET(connfd, &allset);
            //记录一下现在的最大值
			if(connfd > maxfd){
				maxfd = connfd;
			}
			if(--nready == 0){
				continue;//只有一个连接, 不需要进一步处理
			}
		}
        //遍历其余的读取事件
		for(i = listenfd + 1; i <= maxfd; i++){
			if(FD_ISSET(i, &rset)){
                //获取数据, 这一个实例里面是每一次写入以后会读取服务器的返回值
				ret = read(i, buf, sizeof(buf));
				if(ret == 0){
					printf("client close\n");
					close(i);
                      FD_CLR(i, &allset);
                      int nowmax;
                      //处理一下最大值
                      for(j = listenfd + 1; j <= maxfd; j++){
                          if(FD_ISSET(j, &allset))
                              nowmax = j;
                      }
                      maxfd = nowmax;
				}else if(ret == -1){
					perror("read error");
					exit(1);
				}else{
                      //处理一次写入
					for(j = 0; j < ret; j++){
						buf[j] = toupper(buf[j]);
					}
					write(i, buf, ret);
				}
				if(--nready == 0){
					break;
				}
			}
		}

	}

}
优缺点
  • 缺点
  1. 文件描述符里面不连续的时候上面的效率比较低, 可以使用一个单数的数组记录使用文件描述符
  2. 监听的个数有上限, 最大1024
  3. 检测满足条件的fd, 自己添加逻辑提高比较小, 编码难度比较大
  • 优点

可以跨平台, windows, linux, macOS, Unix, mips都支持这一个

poll

#include <poll.h>
int poll(struct pollfd *fds, nfds_t nfds, int timeout);

nfds 监控数组中有多少文件描述符需要被监控

timeout 毫秒级等待

​ -1:阻塞等,#define INFTIM -1 Linux中没有定义此宏

​ 0:立即返回,不阻塞进程

​ >0:等待指定毫秒数,如当前系统时间精度不够毫秒,向上取值

如果不再监控某个文件描述符时,可以把pollfd中,fd设置为-1,poll不再监控此pollfd,下次返回时,把revents设置为0

相较于select而言,poll的优势:

  1. 传入、传出事件分离。无需每次调用时,重新设定监听事件。
  2. 文件描述符上限,可突破1024限制。能监控的最大上限数可使用配置文件调整。
示例
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <ctype.h>
#include <poll.h>

int main(void){
	int i, j, nready, ret;
	int maxfd = 0;
	int listenfd, connfd;
	struct pollfd client[1024];//poll使用的文件描述符数组
	char buf[BUFSIZ], str[INET_ADDRSTRLEN];

	struct sockaddr_in clie_addr, serv_addr;
	socklen_t clie_addr_len;
	//获取文件描述符
	listenfd = socket(AF_INET, SOCK_STREAM, 0);
	if(listenfd == -1){
		perror("socket error");
		exit(1);
	}
	int opt = 1;
	//设置端口复用
	ret = setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
	if(ret == -1){
		perror("setsockopt error");
		exit(1);
	}
	//设置监听端口
	serv_addr.sin_family = AF_INET;
	serv_addr.sin_port = htons(6666);
	serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
	ret = bind(listenfd, (struct sockaddr *)&serv_addr, sizeof(serv_addr));
	if(ret == -1){
		perror("bind error");
		exit(1);
	}

	ret = listen(listenfd, 128);
	if(ret == -1){
		perror("listen error");
		exit(1);
	}
	//设置poll使用的结构体
	client[0].fd = listenfd;
	client[0].events = POLLIN;
	for(i = 1; i < 1024 ;i++){
		client[i].fd = -1;
	}
	maxfd = 0;

	while(1){

		nready = poll(client, maxfd+1, -1);
		if(nready < 0){
			perror("select error");
			exit(1);
		}
		if(client[0].revents & POLLIN){
			clie_addr_len = sizeof(clie_addr);
			connfd = accept(listenfd, (struct sockaddr *)&clie_addr, &clie_addr_len);
			printf("connect\n");
			if(connfd == -1){
				perror("accept error");
				exit(1);
			}
			for(i = 1;i < 1024; i++){
				if(client[i].fd < 0){
					client[i].fd = connfd;
					client[i].events = POLLIN;
					break;
				}
			}
			if(i == 1024){
				perror("too many client");
			}
			if(i>maxfd){
				maxfd = i;
			}
			if(--nready == 0){
				continue;
			}
		}
		//处理其他的文件描述符
		for(i = 1; i <= maxfd; i++){
			if(client[i].fd < 0){
				continue;
			}

			if(client[i].revents & POLLIN){
				ret = read(client[i].fd, buf, sizeof(buf));
				if(ret == 0){
					printf("client close\n");
					close(client[i].fd);
					int nowmax = 0;
					for(j = listenfd + 1; j <= maxfd; j++){
						if(client[j].fd > 0)
							nowmax = i;
					}
					maxfd = nowmax;
					client[i].fd = -1;
				}else if(ret == -1){
					perror("read error");
					exit(1);
				}else{
					for(j = 0; j < ret; j++){
						buf[j] = toupper(buf[j]);
					}
					write(client[i].fd, buf, ret);
				}
				if(--nready == 0){
					break;
				}

			}
		}

	}

}
优缺点
  • 优点

自带数据结构, 把监听事件和返回事件分离

可以拓展监听上限, 超出1024

  • 缺点

不可以跨平台

无法直接定位满足监听事件的描述符

epoll

epoll是Linux下多路复用IO接口select/poll的增强版本,它能显著提高程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率,因为它会复用文件描述符集合来传递结果而不用迫使开发者每次等待事件之前都必须重新准备要被侦听的文件描述符集合,另一点原因就是获取事件的时候,它无须遍历整个被侦听的描述符集,只要遍历那些被内核IO事件异步唤醒而加入Ready队列的描述符集合就行

epoll除了提供select/poll那种IO事件的电平触发(Level Triggered)外,还提供了边沿触发(Edge Triggered),这就使得用户空间程序有可能缓存IO状态,减少epoll_wait/epoll_pwait的调用,提高应用程序效率。

epoll_create打开

image-20240408163813369

#include <sys/epoll.h>
int epoll_create(int size);
epoll_ctl控制
#include <sys/epoll.h>
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

fd : 要监听的fd

event: 告诉内核需要监听的事件

typedef union epoll_data {
 void        *ptr;
 int          fd;	//对应监听事件的fd
 uint32_t     u32;	//这一个不使用
 uint64_t     u64;	//不使用
} epoll_data_t;

struct epoll_event {
 uint32_t     events;      /* Epoll events 主要还是EPOLLIN, EPOLLOUT, EPOLLERR*/
 epoll_data_t data;        /* User data variable */
};

返回值:成功:0;失败:-1,设置相应的errno

epoll_wait监听
#include <sys/epoll.h>
int epoll_wait(int epfd, struct epoll_event *events,
               int maxevents, int timeout);
实际使用
  1. epoll_creat创建epoll使用的文件描述符
  2. 把获取的监听的文件描述符使用epoll_ctl进行监听
  3. 调用epoll_wait阻塞等待
  4. 返回以后判断是不是监听的文件描述符, 是的话把这一个描述符加入监听
  5. 不是的话处理一下连接客户端发送的数据
#include <sys/epoll.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <ctype.h>

int main(void){
	int i, j, nready, ret;
	int listenfd, connfd, efd;
	struct epoll_event temp, ep[1024];
	char buf[BUFSIZ], str[INET_ADDRSTRLEN];

	struct sockaddr_in clie_addr, serv_addr;
	socklen_t clie_addr_len;
	//获取文件描述符
	listenfd = socket(AF_INET, SOCK_STREAM, 0);
	if(listenfd == -1){
		perror("socket error");
		exit(1);
	}
	int opt = 1;
	//设置端口复用
	ret = setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
	if(ret == -1){
		perror("setsockopt error");
		exit(1);
	}
	//设置监听端口
	serv_addr.sin_family = AF_INET;
	serv_addr.sin_port = htons(6666);
	serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
	ret = bind(listenfd, (struct sockaddr *)&serv_addr, sizeof(serv_addr));
	if(ret == -1){
		perror("bind error");
		exit(1);
	}

	ret = listen(listenfd, 128);
	if(ret == -1){
		perror("listen error");
		exit(1);
	}
    
	//获取一个树根
	efd = epoll_create(1024);
	if(efd < 0){
		perror("epoll creat error");
		exit(0);
	}
    
	//把监听使用的节点添加
	temp.events = EPOLLIN;
	temp.data.fd = listenfd;
	ret = epoll_ctl(efd, EPOLL_CTL_ADD, listenfd, &temp);
	if(ret == -1){
		perror("epoll ctl error");
		exit(1);
	}

	while(1){
        //开始等待
		nready =epoll_wait(efd, ep, 1024, -1);
		if(nready < 0){
			perror("select error");
			exit(1);
		}
        //处理
		for(i = 0 ; i < nready ; i++){
			if(!ep[i].events & EPOLLIN)
				continue;
			if(ep[i].data.fd == listenfd){
				//这是一个连接事件
                  clie_addr_len = sizeof(clie_addr);
				connfd = accept(listenfd, (struct sockaddr *)&clie_addr,
						&clie_addr_len);
				//打印一下连接端口的信息
				printf("connect %s :%d\n", 
						inet_ntop(AF_INET, &clie_addr.sin_addr,
							str, sizeof(str)), 
						ntohs(clie_addr.sin_port));
				if(connfd == -1){
					perror("accept error");
						exit(1);
				}
				temp.events = EPOLLIN;
				temp.data.fd = connfd;
				ret = epoll_ctl(efd, EPOLL_CTL_ADD, connfd, &temp);
				if(ret < 0){
					perror("epoll ctl con error");
					exit(1);
				}
			}else{
				//处理其他的文件
				if(ep[i].events & EPOLLIN){
					ret = read(ep[i].data.fd, buf, sizeof(buf));
					if(ret == 0){
						//连接结束
						printf("client close\n");
						close(ep[i].data.fd);
						epoll_ctl(efd, EPOLL_CTL_DEL, ep[i].data.fd, NULL);
					}else if(ret == -1){
                        //read错误, 最好判断一下errno
						perror("read error");
						epoll_ctl(efd, EPOLL_CTL_DEL, ep[i].data.fd, NULL);
						close(ep[i].data.fd);
					}else{
						for(j = 0; j < ret; j++){
							buf[j] = toupper(buf[j]);
						}
						write(ep[i].data.fd, buf, ret);
					}
				}
			}
		}
	}
}

poll和epoll突破1024的限制

可以使用cat命令查看一个进程可以打开的socket描述符上限。

cat /proc/sys/fs/file-max

如有需要,可以通过修改配置文件的方式修改该上限值。

sudo vi /etc/security/limits.conf

在文件尾部写入以下配置,soft软限制,hard硬限制。如下图所示。

* soft nofile 65536

* hard nofile 100000

事件模型

EPOLL事件有两种模型:

Edge Triggered (ET) 边缘触发只有数据到来才触发,不管缓存区中是否还有数据。

Level Triggered (LT) 水平触发只要有数据都会触发(缓冲区里面的数据没有读取完就会触发)。

思考如下步骤:

  1. 假定我们已经把一个用来从管道中读取数据的文件描述符(rfd)添加到epoll描述符。
  2. 管道的另一端写入了2KB的数据
  3. 调用epoll_wait,并且它会返回rfd,说明它已经准备好读取操作
  4. 读取1KB的数据
  5. 调用epoll_wait……

在这个过程中,有两种工作模式:

ET模式

ET模式即Edge Triggered工作模式。

如果我们在第1步将rfd添加到epoll描述符的时候使用了EPOLLET标志,那么在第5步调用epoll_wait之后将有可能会挂起,因为剩余的数据还存在于文件的输入缓冲区内,而且数据发出端还在等待一个针对已经发出数据的反馈信息。只有在监视的文件句柄上发生了某个事件的时候 ET 工作模式才会汇报事件。因此在第5步的时候,调用者可能会放弃等待仍在存在于文件输入缓冲区内的剩余数据。epoll工作在ET模式的时候,必须使用非阻塞套接口,以避免由于一个文件句柄的阻塞读/阻塞写操作把处理多个文件描述符的任务饿死。最好以下面的方式调用ET模式的epoll接口,在后面会介绍避免可能的缺陷。

  1. 基于非阻塞文件句柄
  2. 只有当read或者write返回EAGAIN(非阻塞读,暂时无数据)时才需要挂起、等待。但这并不是说每次read时都需要循环读,直到读到产生一个EAGAIN才认为此次事件处理完成,当read返回的读到的数据长度小于请求的数据长度时,就可以确定此时缓冲中已没有数据了,也就可以认为此事读事件已处理完成。
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <sys/epoll.h>
#include <unistd.h>

int main(void){
	int efd, i;
	int pfd[2];//管道使用的文件描述符
	pid_t pid;
	char buf[10];
	char ch = 'a';

	pipe(pfd);//创建一个管道
	pid = fork();

	if(pid == 0){
		//子进程
		//子进程关闭读端
		close(pfd[0]);
		while(1){
			//子进程写数据到管道
			for(i = 0 ;i < 5 ;i++)
				buf[i] = ch;
			buf[i-1] = '\n';
			ch++;
			for(;i<10;i++)
				buf[i] = ch;
			buf[i-1] = '\n';
			if(ch > 'z')
				ch = 'a';
			write(pfd[1], buf, sizeof(buf));
			sleep(5);
		}
		close(pfd[1]);
	}else if(pid > 0){
		//父进程
		//父进程关闭写端
		close(pfd[1]);
		struct epoll_event event;
		struct epoll_event revents[10];
		int ret, len;

		efd = epoll_create(1);
		event.events = EPOLLIN | EPOLLET;//监听的时候使用ET模式
		event.data.fd = pfd[0];
		epoll_ctl(efd, EPOLL_CTL_ADD, pfd[0], &event);
		
		while(1){
			ret = epoll_wait(efd, revents, 10, -1);
			if(ret > 0){
				for(i = 0; i < ret; i++){
					if(revents[i].data.fd == pfd[0]){
						len = read(pfd[0], buf, sizeof(buf)/2);//这一个读取的时候只会读取一半
						write(STDOUT_FILENO, buf, len);
					}
				}
			}
		}
		close(pfd[0]);
		close(efd);
	}else{
		perror("fork");
		exit(1);
	}
	return 0;
}
LT模式

LT模式即Level Triggered工作模式。

与ET模式不同的是,以LT方式调用epoll接口的时候,它就相当于一个速度比较快的poll,无论后面的数据是否被使用。

比较

LT(level triggered):LT是缺省的工作方式,并且同时支持block和no-block socket。在这种做法中,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的fd进行IO操作。如果你不作任何操作,内核还是会继续通知你的,所以,这种模式编程出错误可能性要小一点。传统的select/poll都是这种模型的代表。

ET(edge-triggered):ET是高速工作方式,只支持no-block socket。在这种模式下,当描述符从未就绪变为就绪时,内核通过epoll告诉你。然后它会假设你知道文件描述符已经就绪,并且不会再为那个文件描述符发送更多的就绪通知。请注意,如果一直不对这个fd作IO操作(从而导致它再次变成未就绪),内核不会发送更多的通知(only once).

反应堆模型

实际是使用epoll的ET模式加非阻塞加返回联合体里面的void *ptr

可以使用这一个结构体里面加一个fd和一个回调函数

使用这一个模型的时候处理的方式会发生改变

  • 普通模型

socket, bind, listen – epoll_creat 创建一个红黑树 – 返回 epfd – epoll_ctl 在树上加一个监听节点 – while(1) – epoll_wait 监听 – 监听事件发生 – 返回数组 – 判断返回元素 – lfd – accept / cfd – read() – 大小写转换 – write

  • 反应堆模型

socket, bind, listen – epoll_creat 创建一个红黑树 – 返回 epfd – epoll_ctl 在树上加一个监听节点 – while(1) – epoll_wait 监听 – 监听事件发生 – 返回数组 – 判断返回元素 – lfd – accept / cfd – 调用读回调函数 – epoll_ctl把这一个节点取下来 – 改为监听写事件 – epoll_ctl把这一个节点加回去 – epoll_wait等待可写 – 写回调函数 – 把这一个节点取下来 – 改为读事件 – 加回去

struct myevent_s {
	int fd;				//记录文件描述符
    int events;			//记录当前监听的事件
    void *arg;			//一个数据
    void (*call_back)(int fd, int events, void *arg);		//回调函数
    int status;						//记录在不在红黑树里面
    char buf[BUFLEN];
    int len;						
    long last_active;				//记录最后一次的连接时间, 长时间连接不断开踢出去  
};

优缺点

  • 优点

高效, 使用简单, 不受文件描述符的个数的限制

  • 缺点

不能跨平台, Linux专有

举报

相关推荐

0 条评论