0
点赞
收藏
分享

微信扫一扫

利用aio+epoll简单改写sendfile


其实说白了,也就是想直接尝试利用一下aio +epoll结合代码,在利用人家sendfile的接口,就是替换sendfile,自己随便玩玩传输文件。我采用的是linux原生的libaio,glibc实现的aio貌似很多在吐槽,性能也不好直接别抛弃了。
这里提一下linux libaio的相关结构体,这些可以在/usr/include/libaio.h文件看到,也有io_prep_pread和io_prep_pwrite的实现。

//  iocb是提交IO任务时用到的,可以完整地描述一个IO请求
struct iocb {

void *data; //data是留给用来自定义的指针:可以设置为IO完成后的callback函数;

unsigned key; // 这个没有用过

short aio_lio_opcode; // 表示操作的类型:IO_CMD_PWRITE | IO_CMD_PREAD;

short aio_reqprio; // 优先级,不过看内核代码,貌似没有实现,应该属于预留字段

int aio_fildes; // 当然是要操作的fd

union {

struct io_iocb_common c; // 这个字段比较常用,下面会解释

struct io_iocb_vector v;

struct io_iocb_poll poll;

struct io_iocb_sockaddr saddr;

} u;
};

struct io_iocb_common {

void *buf; // 记录IO要操作的buf

unsigned long nbytes; // 记录IO操作字节数

long long offset; // 记录IO操作文件偏移量

unsigned flags; // 为epoll结合设置

unsigned resfd; // 为epoll结合设置

};

// io_event是用来描述返回结果的:
struct io_event {

void *data; // callback回调函数使用

struct iocb *obj; //obj就是之前提交IO任务时的iocb;

unsigned long res; // 处理的字节数

unsigned long res2; // (res2 = 0)是aio处理

};

sendfile的接口 n = sendfile64(fd, ifd, (off64_t*)&offset64, count);就是替换这个玩玩。 count是需要发送的长度,如果count > 100k我就采用aio读取。最多有128个异步IO同时存在,每个IO最多读取4k,最后在将每个IO读取拷贝到一个大buf里面,在用send做发送处理。这个写这个代码,让我知道很多细节处理,部分代码展示。

#define  AIO_NUM_EVENTS   128
#define ALIGN_SIZE 512
#define RD_WR_SIZE (4 * 1024)
#define BUF_SIZE_VAYNEDU (512 * 1024)

static int aio_num = 0;
char send_buf_vaynedu[512 * 1024 + 1] = {0};
uint64_t offset_vaynedu;

// 每个io处理回调函数,而且把每个io读到数据拷贝到大buf
void aio_callback(io_context_t ctx, struct iocb *iocb, long res, long res2)
{
int iosize = iocb->u.c.nbytes;
char *buf = (char *)iocb->u.c.buf;
off_t offset = iocb->u.c.offset - offset_vaynedu;
int i = offset / RD_WR_SIZE;
char *p = send_buf_vaynedu;

if(res2 != 0){
printf("aio read\n");
}

//printf("request_type: %s, offset: %lld,length: %lu, res: %ld, res2: %ld, i:%d\n",(iocb->aio_lio_opcode == IO_CMD_PREAD) ? "READ" : "WRITE", offset, iocb->u.c.nbytes, res, res2, i);

p += offset;
memcpy(p, buf, iosize);

//printf("i= %d, offset=%lu, p=%p, send_buf_vaynedu=%d\n\n", i, offset, p, strlen(send_buf_vaynedu));

}

// 对于小于100k的直接采用pread 和 send直接完成处理
int sendfile_small(int ofd, int ifd, off64_t *offset64, size_t count)
{
int pread_num;
int send_len;
int num;
int send_num = 0;
char *buf = (char *)malloc(count);
char *p = buf;

memset(buf, 0, count);
pread_num = pread(ifd, buf, count, *offset64);
if(pread_num < 0){
printf("pread failed: %d\n", pread_num);
exit(1);
}else if(pread_num == 0){
printf("pread finished\n");
}

while(pread_num > 0){
send_len = pread_num > 8192? 8192 : pread_num;
num = send(ofd, p, send_len, 0);
send_num += num;
if(num >= 0 && num < send_len ){
printf("num = %d\n", num);
break;
}else if(num < 0){
if(errno == EAGAIN){
printf("sendfile_small %s\n", strerror(errno));
//continue;
}
break;
}
pread_num -= num;

p += send_len;

}

printf("sendfile_small : ifd = %d, offset64 = %llu, pread_num = %d, send_num = %d\n\n", ifd, *offset64, pread_num, send_num);

free(buf);
buf = NULL;

return send_num;

}

// 模拟改写的sendfile
int sendfile_aio_test(int ofd, int ifd, off64_t *offset64, size_t count)
{
int i, j, num;
int ret;
int efd, epfd;
io_context_t ctx;
struct iocb **ios;
struct iocb *iosp;
struct io_event *events;
struct epoll_event epevent;
struct timespec tms;
char **iobuf = NULL;

offset_vaynedu = *offset64;
if(count < 102400){
return sendfile_small(ofd, ifd, offset64, count);
}else{
aio_num = AIO_NUM_EVENTS > (count/4096) ? (count/4096):AIO_NUM_EVENTS;
}

ios = (struct iocb **)malloc(aio_num * sizeof(struct iocb *));
iosp = (struct iocb *)malloc(aio_num * sizeof(struct iocb));
events = (struct io_event*)malloc(aio_num * sizeof(struct io_event));
iobuf = (char **)malloc(aio_num * sizeof(char *));

efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
if(efd == -1){
printf("eventfd failed\n");
exit(1);
}

memset(&ctx, 0, sizeof(ctx));
ret = io_setup(aio_num, &ctx);
if(ret < 0){
printf("is_setup failed\n");
exit(1);
}
for(i = 0; i < aio_num; i++){
ret = posix_memalign((void **)&iobuf[i], ALIGN_SIZE, RD_WR_SIZE);
if(ret < 0){
printf("posix_memalign failed\n");
exit(1);
}
}

//memset(ios, 0, aio_num * sizeof(struct iocb *));
//memset(iobuf, 0 ,aio_num * RD_WR_SIZE);
memset(iosp, 0, aio_num * sizeof(struct iocb));
for(i = 0; i < aio_num; i++){
ios[i] = &iosp[i];
io_prep_pread(&iosp[i], ifd, iobuf[i], RD_WR_SIZE, i*RD_WR_SIZE+(*offset64));
io_set_eventfd(&iosp[i], efd);
io_set_callback(&iosp[i], aio_callback);
// printf("ios[%d].u.c.offset = %d, ios[%d].u.c.nbytes = %d, ios[%d].aio_fildes = %d\n\n",
// i, ios[i]->u.c.offset, i, ios[i]->u.c.nbytes, i, ios[i]->aio_fildes);
}

// printf("aio_num = %d\n", aio_num);
ret = io_submit(ctx, aio_num, ios);
if(ret != aio_num){
if(ret < 0){
printf("io_submit error:%s\n", strerror(-ret));
}else{
printf("io_submit failed,could not submit IOs ret:%d, aio_num:%d\n", ret, aio_num);
}
exit(1);
}

epfd = epoll_create(1);
if(epfd == -1){
printf("epoll_create failed\n");
exit(1);
}

epevent.events = EPOLLIN | EPOLLET;
epevent.data.ptr = NULL;

ret = epoll_ctl(epfd, EPOLL_CTL_ADD, efd, &epevent);
if(ret < 0){
printf("epoll_ctl failed\n");
exit(1);
}

i = 0;
while(i < aio_num){
uint64_t finished_aio;

if(epoll_wait(epfd, &epevent, 1, -1) != 1){
printf("epoll_wait failed\n");
exit(1);
}

if(read(efd, &finished_aio, sizeof(finished_aio)) != sizeof(finished_aio)){
printf("read failed\n");
exit(1);
}

// printf("finished_aio = %d\n", finished_aio);

while(finished_aio > 0){
tms.tv_sec = 0;
tms.tv_nsec = 0;
num = io_getevents(ctx, 1, aio_num, events, &tms);
if(num > 0){
for(j = 0; j < num; j++){
((io_callback_t)(events[j].data))(ctx, events[j].obj,
events[j].res,
events[j].res2);
}
i += num;
finished_aio -= num;
}

}

}

int send_num = 0;
int count_num = 0;
int send_buf_vaynedu_len = strlen(send_buf_vaynedu);
char *p = send_buf_vaynedu;
printf("send_buf_vaynedu = %d, ", send_buf_vaynedu_len);
while(send_buf_vaynedu_len > 0){
num = send(ofd, p, 4096, 0);
send_num += num;
if(num >= 0 && num < 4096){
// printf("num = %d\n", num);
break;
}else if( num < 0){
if(errno == EAGAIN){
printf("send_aio_test: %s\n", strerror(errno));
//continue;
}
break;
}

send_buf_vaynedu_len -= num;

p += 4096;

}
printf("send_num = %d, send_buf_vaynedu = %d\n", send_num, send_buf_vaynedu_len);
if((aio_num < AIO_NUM_EVENTS) && (count-aio_num *4096 > 0)){
uint64_t offset64_small = aio_num * RD_WR_SIZE + *offset64;
count_num = sendfile_small(ofd, ifd, (off64_t *)&offset64_small, count - aio_num*4096);
printf("aio_num = %d, count_num = %d\n", aio_num, count_num);
}
memset(send_buf_vaynedu, 0, 512 * 1024 + 1);

close(epfd);
close(efd);
free(iosp);
free(ios);
free(events);
for(i = 0; i < aio_num; i++){
free(iobuf[i]);
}

io_destroy(ctx);
if(count_num < 0){
count_num = 0;
}

return send_num + count_num;

}

代码有严重的性能问题,因为仅仅是玩玩,熟悉一下aio的用法。

参考:
​​​http://www.kuqin.com/linux/20120908/330333.html​​​
​​​http://backend.blog.163.com/blog/static/20229412620135257159731/​​


举报

相关推荐

0 条评论