一个 epoll 连接句柄定时管理器
当使用 epoll 作服务端框架,如果缓存了客户端到服务器的连接句柄(connfd),服务端需要管理起来,例如定时检查连接会话是否过期,自动清理连接等等。如下提供了一个 API 用于此目的。使用了 epoll 和 libevent。
/***********************************************************************************
* Copyright (c) 2008-2080 pepstack.com, 350137278@qq.com *
* *
* Anti 996 License Version 1.0 (Draft) *
* *
* https://github.com/kattgu7/Anti-996-License/blob/master/LICENSE *
* *
* Permission is hereby granted to any individual or legal entity obtaining a copy *
* of this licensed work (including the source code, documentation and/or related *
* items, hereinafter collectively referred to as the "licensed work"), free of *
* charge, to deal with the licensed work for any purpose, including without *
* limitation, the rights to use, reproduce, modify, prepare derivative works of, *
* publish, distribute and sublicense the licensed work, subject to the following *
* conditions: *
* *
* 1. The individual or the legal entity must conspicuously display, without *
* modification, this License on each redistributed or derivative copy of the *
* Licensed Work. *
* *
* 2. The individual or the legal entity must strictly comply with all applicable *
* laws, regulations, rules and standards of the jurisdiction relating to *
* labor and employment where the individual is physically located or where *
* the individual was born or naturalized; or where the legal entity is *
* registered or is operating (whichever is stricter). In case that the *
* jurisdiction has no such laws, regulations, rules and standards or its *
* laws, regulations, rules and standards are unenforceable, the individual *
* or the legal entity are required to comply with Core International Labor *
* Standards. *
* *
* 3. The individual or the legal entity shall not induce or force its *
* employee(s), whether full-time or part-time, or its independent *
* contractor(s), in any methods, to agree in oral or written form, *
* to directly or indirectly restrict, weaken or relinquish his or *
* her rights or remedies under such laws, regulations, rules and *
* standards relating to labor and employment as mentioned above, *
* no matter whether such written or oral agreement are enforceable *
* under the laws of the said jurisdiction, nor shall such individual *
* or the legal entity limit, in any methods, the rights of its employee(s) *
* or independent contractor(s) from reporting or complaining to the copyright *
* holder or relevant authorities monitoring the compliance of the license *
* about its violation(s) of the said license. *
* *
* THE LICENSED WORK IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR *
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS *
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE COPYRIGHT *
* HOLDER BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION *
* OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN ANY WAY CONNECTION *
* WITH THE LICENSED WORK OR THE USE OR OTHER DEALINGS IN THE LICENSED WORK. *
***********************************************************************************/
/**
* epevent.h
* epoll libevent loop base for multi-threads
* 自动管理超时 epoll 连接句柄
*
* refer:
* http://libevent.org/
*
* @author: master@pepstack.com
* @version: 1.0
* @create: 2019-04-20
* @update: 2019-04-20
*/
#ifndef EPEVENT_H_INCLUDED
#define EPEVENT_H_INCLUDED
#if defined(__cplusplus)
extern "C" {
#endif
#include <errno.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <sys/time.h>
/**
* Only one event_base_loop can run on each event_base at once.
* Libevent can also be used for multi-threaded applications,
* either by isolating each event_base so that only a single
* thread accesses it, or by locked access to a single shared
* event_base.
*
* link to: libevent.a
*/
#include <event2/event.h>
/**
* evthread_use_pthreads()
*
* link to: libevent_pthreads.a
*/
#include <event2/thread.h>
/**
* error codes
*
* Status values used by the epevt_* functions in this header.
* Zero means success, every failure code is negative.
*/
/* the call completed successfully */
#define EPEVT_SUCCESS 0
/* generic failure code */
#define EPEVT_ERROR (-1)
/* evthread_use_pthreads() failed in epevt_loopbase_create() */
#define EPEVT_E_USEPTHREADS (-2)
/* event_base_new() failed in epevt_loopbase_create() */
#define EPEVT_E_BASENEW (-3)
/* a memory allocation failed; also used as process exit code by epevt_addloop_thread() */
#define EPEVT_E_OUTMEMORY (-4)
/* the worker thread could not be started in epevt_loopbase_create() */
#define EPEVT_E_NEWTHREAD (-5)
/* event_new() failed in epevt_timeval_create() */
#define EPEVT_E_EVENTNEW (-6)
/* a pthread primitive failed (cond/mutex init, or mutex lock in epevt_loopbase_free()) */
#define EPEVT_E_SYSTEM (-10)
/**
* Compile-time verbosity of the printf() diagnostics in this header.
* Define EPEVT_PRINT_LEVEL before including the header to override it.
*
* 0: trace - print all verbose messages
* 1: debug - print the important messages
* 2: error - print only error messages
* 3: disable - do not print any messages
*/
#ifndef EPEVT_PRINT_LEVEL
# define EPEVT_PRINT_LEVEL 0
#endif
/* Handle to a loop base; points to struct epevt_loopbase_t. */
typedef struct epevt_loopbase_t *epevt_loopbase;
/* Handle to a single timer entry; points to struct epevt_timeval_t. */
typedef struct epevt_timeval_t *epevt_timeval;
/* Signature of a thread entry function, same shape as the addloop_thread
 * argument of epevt_loopbase_create(). */
typedef void *(*epevt_addloop_thread_t)(void *);
/**
 * Default libevent timer callback. evarg is the epevt_timeval that fired.
 * A timer whose epollfd is -1 is simply deleted; otherwise the loop base's
 * tearoff_cb decides whether the timer is re-armed (0) or deleted (anything else).
 */
static void epevt_timeout_cb (evutil_socket_t fd, short events, void *evarg);
/**
 * Creates a loop base together with its worker thread.
 *
 * interval_ms     wait period of the default worker thread between loop passes.
 * timeout_ms      timeout of the placeholder timers the default worker thread creates.
 * timeout_cb      timer callback stored in the loop base; NULL selects epevt_timeout_cb.
 * tearoff_cb      called by epevt_timeout_cb as tearoff_cb(cbarg, timer);
 *                 returns 0 to keep the timer, 1 to remove it, other values are errors.
 * cbarg           opaque pointer handed to tearoff_cb.
 * addloop_thread  worker thread entry; NULL selects epevt_addloop_thread.
 * outevb          receives the new handle on success.
 *
 * Returns EPEVT_SUCCESS or one of the negative EPEVT_E_* codes.
 */
static int epevt_loopbase_create (int interval_ms, int timeout_ms,
void (*timeout_cb)(evutil_socket_t, short, void *),
int (*tearoff_cb)(void *, epevt_timeval), void *cbarg,
void *(*addloop_thread)(void *),
epevt_loopbase *outevb);
/**
 * Asks the worker thread to stop, joins it, then destroys the loop base.
 * Passing NULL is allowed and does nothing.
 */
static void epevt_loopbase_free (epevt_loopbase evb);
/**
 * Allocates a timer entry bound to evb with argsize bytes of trailing payload.
 * The payload is copied from evarg when evarg is not NULL. When timeout_cb is
 * NULL the callback stored in evb is used. The entry is not armed until
 * epevt_timeval_add() is called.
 *
 * Returns EPEVT_SUCCESS, EPEVT_E_OUTMEMORY or EPEVT_E_EVENTNEW.
 */
static int epevt_timeval_create (epevt_loopbase evb,
int epollfd, int connfd, int evt_timeo_ms,
void *evarg, int argsize,
void (*timeout_cb)(evutil_socket_t, short, void *),
epevt_timeval *outepevt);
/**
 * Removes and frees the libevent timer, and frees the entry. When the entry's
 * epollfd is not -1, connfd is also removed from that epoll set and closed.
 */
static void epevt_timeval_delete (epevt_timeval epevt);
/**
 * Arms the timer with its stored timeout. When loop is non-zero it also runs
 * one non-blocking pass of the libevent loop on the owning base.
 * Returns 0 on success, otherwise the non-zero libevent result.
 */
static int epevt_timeval_add (epevt_timeval epevt, int loop);
/**
 * One timer entry managed by a loop base.
 *
 * The entry is allocated as a single block: the fixed fields below followed
 * by argsize bytes of caller payload (evarg).
 */
typedef struct epevt_timeval_t
{
    /* epoll fd the connection is registered with; -1 marks a placeholder
     * timer that owns no connection (see epevt_timeout_cb / epevt_timeval_delete) */
    int epollfd;

    /* socket fd of the connection; closed by epevt_timeval_delete when epollfd != -1 */
    int connfd;

    /* libevent timer event and the timeout it is armed with */
    struct event *evt;
    struct timeval tv;

    /* loop base this entry belongs to */
    epevt_loopbase evb;

    /* number of payload bytes that follow the structure */
    int argsize;

    /* caller payload; C99 flexible array member instead of the GNU [0] extension */
    char evarg[];
} epevt_timeval_t;
/**
 * State of one loop base: the libevent base, the worker thread that drives it
 * and the callbacks/settings shared by all timers created on it.
 */
typedef struct epevt_loopbase_t
{
/* lock/cond pair: the worker thread waits on cond with lock held;
 * epevt_loopbase_free() signals cond after setting shutdown */
pthread_mutex_t lock;
pthread_cond_t cond;
/* set to 1 to ask the worker thread to stop */
volatile int shutdown;
/* libevent base all timers of this loop base are attached to */
struct event_base *base;
/* settings used by the default worker thread (epevt_addloop_thread):
 * wait period between loop passes and timeout of its placeholder timers */
int interval_ms;
int timeout_ms;
/* timer callback used when epevt_timeval_create() is given none */
void (*timeout_cb)(evutil_socket_t, short, void *);
/* caller hook invoked by epevt_timeout_cb: 0 keeps the timer, 1 removes it,
 * any other value is treated as an error */
int (*tearoff_cb)(void *cbarg, struct epevt_timeval_t *epevt);
/* opaque pointer passed as first argument to tearoff_cb */
void *cbarg;
/* worker thread, joined in epevt_loopbase_free() */
pthread_t thread;
} epevt_loopbase_t;
/**
 * epevt_addloop_thread()
 *
 * Default worker thread of a loop base. With evb->lock held it waits on
 * evb->cond for interval_ms. Every time the wait times out it creates a
 * placeholder timer (epollfd = -1, timeout = timeout_ms) and adds it with
 * epevt_timeval_add(epevt, 1), which also runs one non-blocking pass of the
 * libevent loop so that expired timers get their callbacks invoked.
 *
 * The loop ends when evb->shutdown is set (see epevt_loopbase_free) or when
 * pthread_cond_timedwait() fails with an error other than ETIMEDOUT.
 *
 * @param evarg  the epevt_loopbase owning this thread.
 * @return always NULL.
 *
 * NOTE: the whole process is terminated with exit(EPEVT_E_OUTMEMORY) when the
 * placeholder timer cannot be created.
 */
static void * epevt_addloop_thread (void * evarg)
{
    int err;
    long usec;
    struct timeval now;
    struct timespec abstime;
    epevt_timeval epevt;
    epevt_loopbase evb = (epevt_loopbase) evarg;

    /* relative wait interval, split into seconds and microseconds
     * (assumes interval_ms >= 0) */
    int tv_sec = evb->interval_ms / 1000;
    int tv_usec = (evb->interval_ms % 1000) * 1000;

#if EPEVT_PRINT_LEVEL < 2
    printf("(epevent.h:%d) epevt_addloop_thread start. (interval=%d ms, timeout=%d ms)\n", __LINE__, evb->interval_ms, evb->timeout_ms);
#endif

    pthread_mutex_lock(&evb->lock);

    while (! evb->shutdown) {
        /* absolute deadline of this wait; carry whole seconds out of the
         * microsecond sum so that tv_nsec always stays below 1e9, otherwise
         * pthread_cond_timedwait() rejects the deadline with EINVAL */
        gettimeofday(&now, 0);
        usec = (long) now.tv_usec + tv_usec;
        abstime.tv_sec = now.tv_sec + tv_sec + usec / 1000000L;
        abstime.tv_nsec = (usec % 1000000L) * 1000L;

        /**
         * pthread_cond_timedwait() releases the lock while blocked and owns
         * it again when it returns, both on timeout (ETIMEDOUT) and when the
         * condition variable was signalled (0).
         */
        err = pthread_cond_timedwait(&evb->cond, &evb->lock, &abstime);

        if (err == 0) {
            /* signalled or woken spuriously: only the shutdown flag, checked
             * by the loop condition, decides whether the thread stops */
            continue;
        }

        if (err != ETIMEDOUT) {
            /* the wait itself failed: give up */
#if EPEVT_PRINT_LEVEL < 3
            printf("(epevent.h:%d) error: pthread_cond_timedwait returns(%d): %s.\n", __LINE__, err, strerror(err));
#endif
            break;
        }

        /* interval elapsed: arm a placeholder timer and run one loop pass.
         * The 64 payload bytes are not read by the default timeout callback. */
        err = epevt_timeval_create(evb, -1, 0, evb->timeout_ms, NULL, 64, evb->timeout_cb, &epevt);
        if (err) {
#if EPEVT_PRINT_LEVEL < 3
            printf("(epevent.h:%d) error: out memory.\n", __LINE__);
#endif
            exit(EPEVT_E_OUTMEMORY);
        }

        err = epevt_timeval_add(epevt, 1);
        if (err) {
#if EPEVT_PRINT_LEVEL < 3
            printf("(epevent.h:%d) error: event_base_loop.\n", __LINE__);
#endif
            epevt_timeval_delete(epevt);
        }
    }

    /* Unlock */
    pthread_mutex_unlock(&evb->lock);

#if EPEVT_PRINT_LEVEL < 2
    printf("(epevent.h:%d) epevt_addloop_thread shutdown.\n", __LINE__);
#endif

    return (void *) 0;
}
/**
 * epevt_timeout_cb()
 *
 * Default libevent callback run when a timer entry expires.
 *
 * A placeholder entry (epollfd == -1) is deleted right away. For any other
 * entry the caller-supplied tearoff_cb of the owning loop base decides:
 *   0      keep watching: the timer is re-armed with the same timeout
 *   1      the connection is obsolete: the entry is deleted
 *   other  error: the entry is deleted
 *
 * @param fd      unused.
 * @param events  unused.
 * @param evarg   the epevt_timeval that expired.
 *
 * NOTE: if re-arming the timer fails the process is terminated with exit(err).
 */
void epevt_timeout_cb (evutil_socket_t fd, short events, void *evarg)
{
    epevt_timeval epevt = (epevt_timeval) evarg;
    int err, rc;

    if (epevt->epollfd == -1) {
#if EPEVT_PRINT_LEVEL < 1
        printf("(epevent.h:%d) timeout (connfd=%d).\n", __LINE__, epevt->connfd);
#endif
        epevt_timeval_delete(epevt);
        return;
    }

    /* ask the caller whether this connection is still wanted */
    rc = epevt->evb->tearoff_cb(epevt->evb->cbarg, epevt);

    if (rc == 0) {
        /* still valid: keep monitoring this entry */
#if EPEVT_PRINT_LEVEL < 1
        printf("(epevent.h:%d) not tearoffed (connfd=%d).\n", __LINE__, epevt->connfd);
#endif
        err = epevt_timeval_add(epevt, 0);
        if (err) {
#if EPEVT_PRINT_LEVEL < 2
            printf("(epevent.h:%d) exit on event_add error:%d (connfd=%d).\n", __LINE__, err, epevt->connfd);
#endif
            epevt_timeval_delete(epevt);

            /* terminate the program */
            exit(err);
        }
    } else if (rc == 1) {
        /* obsolete: drop this entry */
#if EPEVT_PRINT_LEVEL < 1
        printf("(epevent.h:%d) tearoffed (connfd=%d).\n", __LINE__, epevt->connfd);
#endif
        epevt_timeval_delete(epevt);
    } else {
        /* the callback reported an error */
#if EPEVT_PRINT_LEVEL < 3
        printf("(epevent.h:%d) tearoff_cb error:%d (connfd=%d)\n", __LINE__, rc, epevt->connfd);
#endif
        epevt_timeval_delete(epevt);
    }
}
/**
 * epevt_timeval_create()
 *
 * Allocates a timer entry on loop base evb. The entry is created unarmed;
 * call epevt_timeval_add() to start the timer.
 *
 * @param evb           loop base the entry is attached to.
 * @param epollfd       epoll fd the connection is registered with, or -1 for
 *                      an entry that owns no connection.
 * @param connfd        connection socket fd.
 * @param evt_timeo_ms  timer timeout in milliseconds.
 * @param evarg         payload to copy into the entry, or NULL to leave the
 *                      payload zero-filled.
 * @param argsize       payload size in bytes, must not be negative.
 * @param timeout_cb    timer callback, or NULL to use evb->timeout_cb.
 * @param outepevt      receives the new entry on success.
 *
 * @return EPEVT_SUCCESS on success,
 *         EPEVT_ERROR if evb or outepevt is NULL or argsize is negative,
 *         EPEVT_E_OUTMEMORY if the allocation fails,
 *         EPEVT_E_EVENTNEW if event_new() fails.
 */
int epevt_timeval_create (epevt_loopbase evb,
        int epollfd, int connfd, int evt_timeo_ms,
        void *evarg, int argsize,
        void (*timeout_cb)(evutil_socket_t, short, void *),
        epevt_timeval *outepevt)
{
    epevt_timeval epevt;
    void (*cb)(evutil_socket_t, short, void *);

    /* a negative size would wrap around in the allocation and the copy */
    if (! evb || ! outepevt || argsize < 0) {
        return EPEVT_ERROR;
    }

    /* header plus payload in one zero-filled block; the cast keeps the
     * header usable from C++ translation units */
    epevt = (epevt_timeval) calloc(1, sizeof(epevt_timeval_t) + (size_t) argsize);
    if (! epevt) {
        return EPEVT_E_OUTMEMORY;
    }

    epevt->evb = evb;
    epevt->epollfd = epollfd;
    epevt->connfd = connfd;

    epevt->tv.tv_sec = evt_timeo_ms / 1000;
    epevt->tv.tv_usec = (evt_timeo_ms % 1000) * 1000;

    epevt->argsize = argsize;
    if (evarg && argsize > 0) {
        memcpy(epevt->evarg, evarg, (size_t) argsize);
    }

    /* pure timer event: no fd is watched, the entry itself is the callback argument */
    cb = timeout_cb ? timeout_cb : evb->timeout_cb;
    epevt->evt = event_new(evb->base, -1, EV_TIMEOUT, cb, (void*) epevt);
    if (! epevt->evt) {
        free(epevt);
        return EPEVT_E_EVENTNEW;
    }

    *outepevt = epevt;
    return EPEVT_SUCCESS;
}
/**
 * epevt_timeval_add()
 *
 * Arms the entry's timer with the timeout stored in epevt->tv.
 *
 * @param epevt  timer entry to arm.
 * @param loop   when non-zero, one non-blocking pass of the libevent loop
 *               (EVLOOP_ONCE | EVLOOP_NONBLOCK) is run on the owning base
 *               after the timer was added.
 *
 * @return EPEVT_SUCCESS on success, otherwise the non-zero value returned by
 *         event_add() or event_base_loop().
 */
int epevt_timeval_add (epevt_timeval epevt, int loop)
{
    int rc = event_add(epevt->evt, &epevt->tv);

    if (rc != 0) {
        return rc;
    }

    if (loop) {
        rc = event_base_loop(epevt->evb->base, EVLOOP_ONCE | EVLOOP_NONBLOCK);
        if (rc != 0) {
            return rc;
        }
    }

    return EPEVT_SUCCESS;
}
/**
 * epevt_timeval_delete()
 *
 * Destroys a timer entry: the libevent timer is removed from its loop and
 * freed, and the entry's memory is released. When the entry owns a
 * connection (epollfd != -1), connfd is also removed from that epoll set
 * and closed.
 *
 * @param epevt  entry to destroy; NULL is accepted and ignored. The pointer
 *               must not be used after this call.
 */
void epevt_timeval_delete (epevt_timeval epevt)
{
    if (! epevt) {
        return;
    }

    if (epevt->evt) {
        /* remove from loop */
        event_del(epevt->evt);

        /* MUST call it to free memory */
        event_free(epevt->evt);
    }

    if (epevt->epollfd != -1) {
        /**
         * In kernel versions before 2.6.9, the EPOLL_CTL_DEL operation
         * required a non-NULL pointer in event, even though this argument
         * is ignored. Since Linux 2.6.9, event can be specified as NULL
         * when using EPOLL_CTL_DEL.
         * Applications that need to be portable to kernels before 2.6.9
         * should specify a non-NULL pointer in event.
         * Usage:
         *   struct epoll_event ev = {0};
         *   epoll_ctl(epevt->epollfd, EPOLL_CTL_DEL, epevt->connfd, &ev);
         */
        epoll_ctl(epevt->epollfd, EPOLL_CTL_DEL, epevt->connfd, NULL);

        close(epevt->connfd);
    }

    free(epevt);
}
/**
 * epevt_loopbase_create()
 *
 * Creates a loop base: enables pthread support in libevent, allocates the
 * state, initializes its condition variable, mutex and event_base, and
 * starts the worker thread through create_joinable_thread().
 *
 * @param interval_ms     wait period of the default worker thread between loop passes.
 * @param timeout_ms      timeout of the placeholder timers the default worker thread creates.
 * @param timeout_cb      timer callback stored in the loop base; NULL selects epevt_timeout_cb.
 * @param tearoff_cb      hook called by epevt_timeout_cb as tearoff_cb(cbarg, timer).
 * @param cbarg           opaque pointer passed to tearoff_cb.
 * @param addloop_thread  worker thread entry; NULL selects epevt_addloop_thread.
 * @param outevb          receives the new loop base on success; untouched on failure.
 *
 * @return EPEVT_SUCCESS, or EPEVT_E_USEPTHREADS, EPEVT_E_OUTMEMORY,
 *         EPEVT_E_SYSTEM, EPEVT_E_BASENEW, EPEVT_E_NEWTHREAD on failure.
 *         On failure everything initialized so far is released again.
 *
 * NOTE(review): create_joinable_thread() is not declared in this header; it is
 * presumably provided by the including project and reports failure as
 * (pthread_t) -1 - confirm.
 */
int epevt_loopbase_create (int interval_ms, int timeout_ms,
        void (*timeout_cb)(evutil_socket_t, short, void *),
        int (*tearoff_cb)(void *, epevt_timeval), void *cbarg,
        void *(*addloop_thread)(void *),
        epevt_loopbase *outevb)
{
    int err;
    epevt_loopbase evb;

    if (evthread_use_pthreads() != 0) {
        return EPEVT_E_USEPTHREADS;
    }

    /* zero-filled, so shutdown starts as 0 */
    evb = (epevt_loopbase) calloc ( 1, sizeof(epevt_loopbase_t) );
    if (! evb) {
        return EPEVT_E_OUTMEMORY;
    }

    if (pthread_cond_init(&evb->cond, 0)) {
        err = EPEVT_E_SYSTEM;
        goto fail_free;
    }

    if (pthread_mutex_init(&evb->lock, 0)) {
        err = EPEVT_E_SYSTEM;
        goto fail_cond;
    }

    evb->base = event_base_new();
    if (! evb->base) {
        err = EPEVT_E_BASENEW;
        goto fail_mutex;
    }

    evb->interval_ms = interval_ms;
    evb->timeout_ms = timeout_ms;
    evb->tearoff_cb = tearoff_cb;
    evb->cbarg = cbarg;
    evb->timeout_cb = timeout_cb ? timeout_cb : epevt_timeout_cb;

    /* every field must be set before this point: the thread reads them at once */
    if (addloop_thread) {
        evb->thread = create_joinable_thread(addloop_thread, (void*) evb);
    } else {
        evb->thread = create_joinable_thread(epevt_addloop_thread, (void*) evb);
    }

    if (evb->thread == (pthread_t) (-1)) {
        err = EPEVT_E_NEWTHREAD;
        goto fail_base;
    }

    *outevb = evb;
    return EPEVT_SUCCESS;

    /* unwind in reverse order of initialization */
fail_base:
    event_base_free(evb->base);
fail_mutex:
    pthread_mutex_destroy(&evb->lock);
fail_cond:
    pthread_cond_destroy(&evb->cond);
fail_free:
    free(evb);
    return err;
}
/**
 * epevt_loopbase_free()
 *
 * Shuts a loop base down: sets the shutdown flag and signals the condition
 * variable under the lock, joins the worker thread, then destroys the mutex,
 * the condition variable, the event_base and the state itself.
 *
 * @param evb  loop base to destroy; NULL is accepted and ignored.
 *
 * NOTE: the process is terminated with exit(EPEVT_E_SYSTEM) if the mutex
 * cannot be locked.
 * NOTE(review): timer entries that are still pending on the base are not
 * deleted here - confirm that callers release them beforehand.
 */
void epevt_loopbase_free (epevt_loopbase evb)
{
    if (evb) {
        if (pthread_mutex_lock(&evb->lock) == 0) {
            /* tell the worker thread to stop */
            evb->shutdown = 1;
            pthread_cond_signal(&evb->cond);
            pthread_mutex_unlock(&evb->lock);
        } else {
#if EPEVT_PRINT_LEVEL < 3
            printf("(epevent.h:%d) should never run this!\n", __LINE__);
#endif
            exit(EPEVT_E_SYSTEM);
        }

        /* wait until the worker thread has finished */
        pthread_join(evb->thread, NULL);

        pthread_mutex_destroy(&evb->lock);
        pthread_cond_destroy(&evb->cond);

        event_base_loopexit(evb->base, NULL);
        event_base_free(evb->base);

        free(evb);
    }
}
#if defined(__cplusplus)
}
#endif
#endif /* EPEVENT_H_INCLUDED */