0
点赞
收藏
分享

微信扫一扫

libevent(十二)bufferevent filter zlib 压缩通信(二)


libevent(十二)bufferevent filter zlib 压缩通信(二)_c++


使用zlib进行文件传输:

客户端:读取文件 -> 输出过滤器进行数据压缩 -> 发送数据

服务端:读取文件 -> 输入过滤器进行数据解压-> 存储数据

main.cpp

#include <event2/event.h>
#include <event2/listener.h>
#include <string.h>
#ifndef _WIN32
#include <signal.h>
#endif
#include <iostream>
#include <zlib.h>
using namespace std;

int main()
{
#ifdef _WIN32
//初始化socket库
WSADATA wsa;
WSAStartup(MAKEWORD(2, 2), &wsa);
#else
//忽略管道信号,发送数据给已关闭的socket
if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
return 1;
#endif

std::cout << "test server!\n";
//创建libevent的上下文
event_base* base = event_base_new();
if (base)
{
cout << "event_base_new success!" << endl;
}

void Server(event_base * base);
Server(base);
void Client(event_base * base);
Client(base);
//事件分发处理
if (base)
event_base_dispatch(base);
if (base)
event_base_free(base);
#ifdef _WIN32
WSACleanup();
#endif
return 0;
}

zlib_server.cpp

#include <event2/event.h>
#include <event2/listener.h>
#include <event2/bufferevent.h>
#include <event2/buffer.h>
#include <string.h>
#ifndef _WIN32
#include <signal.h>
#endif
#include <iostream>
#include <string>
#include <zlib.h>

using namespace std;
#define SPORT 5001


struct Status
{
bool start = false;
FILE* fp = 0;
z_stream* p;
int recv_num = 0;
int write_num = 0;
~Status()
{
if (p)
{
inflateEnd(p);
}
delete p;
p = 0;

if (fp)
{
fclose(fp);
}
fp = 0;
}
};


bufferevent_filter_result filter_in(evbuffer* s, evbuffer* d,
ev_ssize_t limit, bufferevent_flush_mode mode, void* arg)
{
//1 接收客户端发送的文件名
Status* status = (Status*)arg;
if (!status->start)
{
char data[1024] = { 0 };
int len = evbuffer_remove(s, data, sizeof(data) - 1);
evbuffer_add(d, data, len);
return BEV_OK;
}

//解压
evbuffer_iovec v_in[1];

//读取数据,不清理缓冲
int n = evbuffer_peek(s, -1, NULL, v_in, 1);
if (n <= 0)
{
return BEV_NEED_MORE;
}
z_stream* p = status->p;

//zlib 输入数据大小
p->avail_in = v_in[0].iov_len;

//zlib 输入数据地址
p->next_in = (Byte*)v_in[0].iov_base;

//申请输出空间大小
evbuffer_iovec v_out[1];
evbuffer_reserve_space(d, 4096, v_out, 1);

//zlib 输出空间大小
p->avail_out = v_out[0].iov_len;

//zlib 输出空间地址
p->next_out = (Byte*)v_out[0].iov_base;

//解压数据
int re = inflate(p, Z_SYNC_FLUSH);
if (re != Z_OK)
{
cerr << "inflate failed!" << endl;
}


//解压用了多少数据,从source evbuffer中移除
//p->avail_in 未处理数据大小
int n_read = v_in[0].iov_len - p->avail_in;

//解压后数据大小 传入des evbuffer
//p->avail_out 剩余空间大小
int n_write = v_out[0].iov_len - p->avail_out;

//移除source evbuffer中数据
evbuffer_drain(s, n_read);

//传入des evbuffer
v_out[0].iov_len = n_write;
evbuffer_commit_space(d, v_out, 1);
cout << "Server n_read " << n_read << "\t n_write " << n_write << endl;
status->recv_num += n_read;
status->write_num += n_write;
return BEV_OK;
}


static void read_cb(bufferevent* bev, void* arg)
{
Status* status = (Status*)arg;
if (!status->start)
{
//001接收文件名
char data[1024] = { 0 };
bufferevent_read(bev, data, sizeof(data) - 1);
string out = "out/";
out += data;

//打开写入文件
status->fp = fopen(out.c_str(), "wb");
if (!status->fp)
{
cout << "server open " << out << " failed!" << endl;
return;
}

//002 回复OK
bufferevent_write(bev, "OK", 2);
status->start = true;
return;
}

do
{
//写入文件
char data[1024] = { 0 };
int len = bufferevent_read(bev, data, sizeof(data));
if (len >= 0)
{
fwrite(data, 1, len, status->fp);
fflush(status->fp);
}
} while (evbuffer_get_length(bufferevent_get_input(bev)) > 0);
}

static void event_cb(bufferevent* bev, short events, void* arg)
{
cout << "server event_cb " << events << endl;
Status* status = (Status*)arg;
if (events & BEV_EVENT_EOF)
{
cout << "server event BEV_EVENT_EOF success!" << endl;
cout << "Server recv = " << status->recv_num << endl;
cout << "Server write = " << status->write_num << endl;
delete status;
bufferevent_free(bev);
}
}


static void listen_cb(struct evconnlistener* e, evutil_socket_t s, struct sockaddr* a, int socklen, void* arg)
{
cout << "listen_cb" << endl;
event_base* base = (event_base*)arg;

//1 创建一个bufferevent 用来通信
bufferevent* bev = bufferevent_socket_new(base, s, BEV_OPT_CLOSE_ON_FREE);
Status* status = new Status();
status->p = new z_stream();
inflateInit(status->p);

//2 添加过滤 并设置输入回调
bufferevent* bev_filter = bufferevent_filter_new(bev,
filter_in, // 输入过滤函数
0, // 输出过滤
BEV_OPT_CLOSE_ON_FREE, // 关闭filter同时管理bufferevent
0, // 清理回调
status // 传递参数
);

//3 设置回调 读取 事件(处理连接断开)
bufferevent_setcb(bev_filter, read_cb, 0, event_cb, status);
bufferevent_enable(bev_filter, EV_READ | EV_WRITE);
}


void Server(event_base* base)
{
cout << "----begin Server----" << endl;
//监听端口(socket ,bind,listen 绑定事件)

sockaddr_in sin;
memset(&sin, 0, sizeof(sin));
sin.sin_family = AF_INET;
sin.sin_port = htons(SPORT);

evconnlistener* ev = evconnlistener_new_bind(base, // libevent的上下文
listen_cb, // 接收到连接的回调函数
base, // 回调函数获取的参数 arg
LEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_FREE, // 地址重用,evconnlistener关闭同时关闭socket
10, // 连接队列大小,对应listen函数
(sockaddr*)&sin, // 绑定的地址和端口
sizeof(sin)
);
}

zlib_client.cpp

#include <event2/event.h>
#include <event2/listener.h>
#include <event2/bufferevent.h>
#include <event2/buffer.h>
#include <string.h>
#ifndef _WIN32
#include <signal.h>
#endif
#include <iostream>
#include <zlib.h>
using namespace std;

#define FILEPATH "001.txt"

struct ClientStatus
{
FILE* fp = 0;
bool end = false;
bool startSend = false;
z_stream* z_output = 0;
int readNum = 0;
int sendNum = 0;
~ClientStatus()
{
if (z_output)
{
deflateEnd(z_output);
}
delete z_output;
z_output = 0;

if (fp)
{
fclose(fp);
}
fp = 0;
}
};

bufferevent_filter_result filter_out(evbuffer* s, evbuffer* d,
ev_ssize_t limit, bufferevent_flush_mode mode, void* arg)
{
ClientStatus* sta = (ClientStatus*)arg;

//压缩文件,发送文件名消息去掉
if (!sta->startSend)
{
char data[1024] = { 0 };
int len = evbuffer_remove(s, data, sizeof(data));
evbuffer_add(d, data, len);
return BEV_OK;
}
//开始压缩文件(取出buffer中数据的引用)
evbuffer_iovec v_in[1];
int n = evbuffer_peek(s, -1, 0, v_in, 1);
if (n<=0)
{
//调用write回调, 清理空间
if (sta->end)
{
return BEV_OK;
}
//没有数据 BEV_NEED_MORE 不会进入写入回调
return BEV_NEED_MORE;
}
//记下zlib上下文
z_stream* p = sta->z_output;
if (!p)
{
return BEV_ERROR;
}
//zlib 输入数据大小
p->avail_in = v_in[0].iov_len;
//zlib 输入数据地址
p->next_in = (Byte*)v_in[0].iov_base;

//申请输出空间大小
evbuffer_iovec v_out[1];
evbuffer_reserve_space(d, 4096, v_out, 1);
//zlib 输出空间大小
p->avail_out = v_out[0].iov_len;
//zlib 输出空间地址
p->next_out = (Byte*)v_out[0].iov_base;

//压缩
int re = deflate(p, Z_SYNC_FLUSH);
if (re != Z_OK)
{
cerr << "deflate failed!" << endl;
}

//压缩用了多少数据,从source evbuffer中移除
//p->avail_in 未处理数据大小
int n_read = v_in[0].iov_len - p->avail_in;

//压缩后数据大小 传入des evbuffer
//p->avail_out 剩余空间大小
int n_write = v_out[0].iov_len - p->avail_out;

//移除source evbuffer中数据
evbuffer_drain(s, n_read);

//传入des evbuffer
v_out[0].iov_len = n_write;
evbuffer_commit_space(d, v_out, 1);
cout << "Client n_read " << n_read << "\t n_write " << n_write << endl;
sta->readNum += n_read;
sta->sendNum += n_write;
return BEV_OK;
}


static void client_read_cb(bufferevent* bev, void* arg)
{

ClientStatus* sta = (ClientStatus*)arg;

//002 接收服务端发送的OK回复
char data[1024] = { 0 };
int len = bufferevent_read(bev, data, sizeof(data) - 1);
if (strcmp(data, "OK") == 0)
{
cout << data << endl;
sta->startSend = true;

//开始发送文件,触发写入回调
bufferevent_trigger(bev, EV_WRITE, 0);
}
else
{
bufferevent_free(bev);
}
cout << "client_read_cb " << len << endl;
}


static void client_write_cb(bufferevent* bev, void* arg)
{

cout << "client_write_cb" << endl;
ClientStatus* s = (ClientStatus*)arg;
FILE* fp = s->fp;
//判断什么时候清理资源
if (s->end)
{
//判断缓冲是否有数据,如果有刷新
//获取过滤器绑定的buffer
bufferevent* be = bufferevent_get_underlying(bev);
//获取输出缓冲及其大小
evbuffer* evb = bufferevent_get_output(be);
int len = evbuffer_get_length(evb);
if (len <= 0)
{

cout << "Client readNum = " << s->readNum << endl;
cout << "Client sendNum = " << s->sendNum << endl;

//立刻清理 如果缓冲有数据,不会发送
bufferevent_free(bev);
delete s;
return;
}
//刷新缓冲
bufferevent_flush(bev, EV_WRITE, BEV_FINISHED);
return;
}

if (!fp)return;

//读取文件
char data[1024] = { 0 };
int len = fread(data, 1, sizeof(data), fp);
if (len <= 0)
{
fclose(fp);
s->end = true;
//刷新缓冲
bufferevent_flush(bev, EV_WRITE, BEV_FINISHED);
return;
}
//发送文件
bufferevent_write(bev, data, len);
}


static void client_event_cb(bufferevent* be, short events, void* arg)
{
cout << "client_event_cb " << events << endl;
if (events & BEV_EVENT_CONNECTED)
{
cout << "client BEV_EVENT_CONNECTED" << endl;
//001 发送文件名
bufferevent_write(be, FILEPATH, strlen(FILEPATH));

//初始化文件句柄
FILE* fp = fopen(FILEPATH, "rb");
if (!fp)
{
cout << "open file " << FILEPATH << " failed!" << endl;
}
ClientStatus* s = new ClientStatus();
s->fp = fp;

//初始化zlib上下文
s->z_output = new z_stream();
deflate(s->z_output, Z_DEFAULT_COMPRESSION);

//创建输出过滤
bufferevent* bev_filter = bufferevent_filter_new(be, 0, filter_out,
BEV_OPT_CLOSE_ON_FREE | BEV_OPT_DEFER_CALLBACKS, 0, s);


//设置读取、写入和事件的回调
bufferevent_setcb(bev_filter, client_read_cb, client_write_cb, client_event_cb, s);
bufferevent_enable(bev_filter, EV_READ | EV_WRITE);
}
}


void Client(event_base* base)
{
cout << "-----begin Client-----" << endl;
//连接服务端
sockaddr_in sin;
memset(&sin, 0, sizeof(sin));
sin.sin_family = AF_INET;
sin.sin_port = htons(5001);
evutil_inet_pton(AF_INET, "127.0.0.1", &sin.sin_addr.s_addr);
bufferevent* bev = bufferevent_socket_new(base, -1, BEV_OPT_CLOSE_ON_FREE);

//只绑定事件回调,用来确认连接成功
bufferevent_enable(bev, EV_READ | EV_WRITE);
bufferevent_setcb(bev, 0, 0, client_event_cb, 0);

bufferevent_socket_connect(bev, (sockaddr*)&sin, sizeof(sin));
//接收回复确认OK
}

Makefile

test_buffer_filter_zlib:main.cpp zlib_server.cpp zlib_client.cpp
g++ $^ -o $@ -levent -lz
./$@

clean:
rm -rf test_buffer_filter_zlib
rm


举报

相关推荐

0 条评论