diff --git a/test_bufffer_filter_zlib/zlib_client.cpp b/test_bufffer_filter_zlib/zlib_client.cpp new file mode 100644 index 0000000..49dd0ef --- /dev/null +++ b/test_bufffer_filter_zlib/zlib_client.cpp @@ -0,0 +1,195 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#ifdef _WIN32 + +#else +#include +#endif // !_WIN32 + +using namespace std; + +#define FILEPATH "001.txt" +#define SPORT 5001 +struct ClientStatus { + FILE *fp = 0; + bool end = false; + bool startSend = false; + z_stream *z_output = 0; + ~ClientStatus() { + delete z_output; + fclose(fp); + fp = 0; + z_output = 0; + } +}; +bufferevent_filter_result clinet_filter_out(struct evbuffer *src, struct evbuffer *dst, ev_ssize_t dst_limit, + enum bufferevent_flush_mode mode, void *ctx) +{ + ClientStatus *clientStatus = (ClientStatus * )ctx; + + if (!clientStatus->startSend) { + char data[1024] = { 0 }; + int len = evbuffer_remove(src, data, sizeof(data) - 1); + evbuffer_add(dst, data, len); + return BEV_OK; + } + //开始压缩文件 + //1.取出buffer中的数据的引用 + evbuffer_iovec v_in[1] = {0}; + int n = evbuffer_peek(src, -1, 0, v_in, 1); + if (n <= 0) { + //没有数据 + return BEV_NEED_MORE; + } + z_stream *p = clientStatus->z_output; + if (!p) { + return BEV_ERROR; + } + //输入数据大小 + p->avail_in = v_in[0].iov_len; + //输入数据地址 + p->next_in = (Byte*)v_in[0].iov_base; + + //申请输出空间大小 + evbuffer_iovec v_out[1] = { 0 }; + n = evbuffer_reserve_space(dst, 4096, v_out, 1); + if (n < 0) + return BEV_ERROR; + + //zlib输出空间大小 + p->avail_out = v_out[0].iov_len; + //zlib输出空间地址 + p->next_out = (Byte*)v_out[0].iov_base; + + //zlib 压缩 + int re = deflate(p, Z_SYNC_FLUSH); + if (re != Z_OK) { + cerr << "deflate falid" << endl; + } + + //压缩用了多少数据 + //p->avail_in 未处理数据大小 + int nread = v_in[0].iov_len - p->avail_in; + //压缩后数据大小 + //v_out[0].iov_len 剩余空间大小 + int nwrite = v_out[0].iov_len - p->avail_out; + + //移除 source evbuffer中数据 + evbuffer_drain(src, nread); + + //传入des evbuffer + v_out[0].iov_len = nwrite; + evbuffer_commit_space(dst, v_out, 1); + cout << "nread = " << nread << " nwirte=" << nwrite << endl; + +} +void clinet_read_cb(bufferevent *bev, void *arg) { + ClientStatus *clientStatus = (ClientStatus *)arg; + //002 接收服务端发送的OK消息 + char data[1024] = { 0 }; + int len = bufferevent_read(bev, data, sizeof(data) - 1); + if (strcmp(data, "ok") == 0) { + cout << data << endl; + clientStatus->startSend = true; + //开始发送文件 + bufferevent_trigger(bev, EV_WRITE, 0); + } + else { + bufferevent_free(bev); + } + + cout << "clinet_read_cb" << endl; +} + +void clinet_write_cb(bufferevent *bev, void *arg) { + ClientStatus *clientStatus = (ClientStatus *)arg; + FILE *fp = clientStatus->fp; + //判断什么时候清理资源 + if (clientStatus->end) { + //判断缓冲是否有数据 如果有就刷新 + //获取过滤器绑定的buffer + bufferevent * be = bufferevent_get_underlying(bev); + evbuffer *evb = bufferevent_get_output(be); + int len = evbuffer_get_length(evb); + if (len <= 0) { + bufferevent_free(bev); + delete clientStatus; + return; + } + + bufferevent_flush(bev, EV_WRITE, BEV_FINISHED); + return; + } + if (!fp) { + cout << "open file " << FILEPATH << "faild!" << endl; + return; + } + char data[1024] = { 0 }; + int len = fread(data, 1, sizeof(data), fp); + if (len <= 0) { + clientStatus->end = true; + bufferevent_flush(bev, EV_WRITE, BEV_FINISHED); + return; + } + bufferevent_write(bev, data, len); + +} +void clinet_event_cb(bufferevent *bev, short events, void *arg) { + cout << "clinet_event_cb" << endl; +} + +void client_event_cb(struct bufferevent *bev, short what, void *ctx) { + cout << "client_event_cb" << what << endl; + if (what & BEV_EVENT_CONNECTED) { + cout << "BEV_EVENT_CONNECTED" << endl; + //001 发送文件名 + bufferevent_write(bev, FILEPATH,strlen(FILEPATH)); + + FILE *fp = fopen(FILEPATH, "rb"); + if (!fp) { + cout << "open file " << FILEPATH << "faild!" << endl; + } + ClientStatus *clientStatus = new ClientStatus; + clientStatus->fp = fp; + + //初始化zlib上下文 + clientStatus->z_output = new z_stream(); + deflateInit(clientStatus->z_output, Z_DEFAULT_COMPRESSION); + + //创建输出过滤 + bufferevent *bev_filter = bufferevent_filter_new(bev, + 0,//输入过滤函数 + clinet_filter_out, //输出过滤 + BEV_OPT_CLOSE_ON_FREE | BEV_OPT_DEFER_CALLBACKS, //关闭filter同时关闭bufferevent 延时调用 + 0, //清理回调 + clientStatus); + + + bufferevent_setcb(bev_filter, clinet_read_cb, clinet_write_cb, clinet_event_cb, clientStatus); + bufferevent_enable(bev_filter, EV_READ | EV_WRITE); + } +} +void Client(event_base* base) { + cout << "begin Client" << endl; + //连接服务器 + sockaddr_in sin; + memset(&sin, 0, sizeof(sin)); + sin.sin_family = AF_INET; + sin.sin_port = htons(SPORT); + evutil_inet_pton(AF_INET,"127.0.0.1",&sin.sin_addr.s_addr); + bufferevent* bev = bufferevent_socket_new(base, -1, BEV_OPT_CLOSE_ON_FREE); + + //只绑定连接事件回调,用于确认连接成功 + bufferevent_enable(bev, EV_READ | EV_WRITE); + bufferevent_setcb(bev, 0, 0, client_event_cb, 0); + + bufferevent_socket_connect(bev, (sockaddr*)&sin, sizeof(sin)); + + +} \ No newline at end of file diff --git a/test_bufffer_filter_zlib/zlib_server.cpp b/test_bufffer_filter_zlib/zlib_server.cpp new file mode 100644 index 0000000..2a5f72c --- /dev/null +++ b/test_bufffer_filter_zlib/zlib_server.cpp @@ -0,0 +1,114 @@ +#include +#include +#include +#include +#include +#include + +#ifdef _WIN32 +#include +#else +#include +#include +#endif // !_WIN32 + +using namespace std; + +#define SPORT 5001 +struct Status { + bool start = false; + FILE *fp = 0; + string filename = ""; +}; +bufferevent_filter_result server_filter_in(struct evbuffer *src, struct evbuffer *dst, ev_ssize_t dst_limit, + enum bufferevent_flush_mode mode, void *ctx) +{ + cout << "server_filter_in" << endl; + + //1.接收客户端发送的文件名 + char data[1024] = { 0 }; + int len = evbuffer_remove(src, data, sizeof(data) - 1); + evbuffer_add(dst, data, len); + return BEV_OK; +} +void server_read_cb(bufferevent *bev, void *arg) { + + Status *status = (Status*)arg; + if (!status->start) { + char data[1024] = { 0 }; + bufferevent_read(bev, data, sizeof(data) - 1); + status->filename = data; + string out = "out\\"; + out += data; + status->fp = fopen(out.c_str(), "wb"); + if (!status->fp) { + cout << "open file " << out << "faild!" << endl; + return; + } + + bufferevent_write(bev, "ok", 2); + status->start = true; + return; + } + + //写入文件 + do { + char data[1024] = { 0 }; + int len = bufferevent_read(bev, data, sizeof(data) - 1); + if (len >= 0) { + fwrite(data, 1, len, status->fp); + fflush(status->fp); + } + } while (evbuffer_get_length(bufferevent_get_input(bev)) > 0); + +} +void server_event_cb(bufferevent *bev, short events, void *arg) { + cout << "server_event_cb" << events << endl; + + Status *status = (Status *)arg; + if (events&BEV_EVENT_EOF || events & BEV_EVENT_TIMEOUT) { + if (status->fp) { + fclose(status->fp); + status->fp = 0; + } + bufferevent_free(bev); + } +} + + +void server_listen_cb(struct evconnlistener *e, evutil_socket_t s, struct sockaddr *a, int socklen, void *arg) +{ + std::cout << "server_listen_cb" << std::endl; + event_base *base = (event_base *)arg; + // 1.创建bufferevent 用于通信 + bufferevent *bev = bufferevent_socket_new(base, s, BEV_OPT_CLOSE_ON_FREE); + // 2.添加过滤 并设置输入回调 + Status *status = new Status; + bufferevent *bev_filter = bufferevent_filter_new(bev, + server_filter_in,//输入过滤函数 + 0, //输出过滤 + BEV_OPT_CLOSE_ON_FREE, //关闭filter同时关闭bufferevent + 0, //清理回调 + status); //传递参数 + // 3.设置回调 读取 事件(处理连接断开) + bufferevent_setcb(bev_filter, server_read_cb, 0, server_event_cb, status); + bufferevent_enable(bev_filter, EV_READ | EV_WRITE); +} + + +void Server(event_base* base) { + cout << "begin Server" << endl; + + sockaddr_in sin; + memset(&sin, 0, sizeof(sin)); + sin.sin_family = AF_INET; + sin.sin_port = htons(SPORT); + + evconnlistener *ev = evconnlistener_new_bind(base, //libevent上下文 + server_listen_cb, //接收到连接的回调函数 + base, //回调函数获取的参数(根据业务来) + LEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_FREE, //地址重用,evconnlistener关闭同时关闭socket + 10, //连接队列大小,对应listen函数参数 + (sockaddr *)&sin, //绑定的地址和端口 + sizeof(sin)); +} \ No newline at end of file