You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

195 lines
5.2 KiB

#include <iostream>
#include <event2/event.h>
#include <event2/thread.h>
#include <event2/listener.h>
#include <event2/bufferevent.h>
#include <event2/buffer.h>
#include <string.h>
#include <zlib.h>
#ifdef _WIN32
#else
#include <signal.h>
#endif // !_WIN32
using namespace std;
#define FILEPATH "001.txt"
#define SPORT 5001
struct ClientStatus {
FILE *fp = 0;
bool end = false;
bool startSend = false;
z_stream *z_output = 0;
~ClientStatus() {
delete z_output;
fclose(fp);
fp = 0;
z_output = 0;
}
};
bufferevent_filter_result clinet_filter_out(struct evbuffer *src, struct evbuffer *dst, ev_ssize_t dst_limit,
enum bufferevent_flush_mode mode, void *ctx)
{
ClientStatus *clientStatus = (ClientStatus * )ctx;
if (!clientStatus->startSend) {
char data[1024] = { 0 };
int len = evbuffer_remove(src, data, sizeof(data) - 1);
evbuffer_add(dst, data, len);
return BEV_OK;
}
//开始压缩文件
//1.取出buffer中的数据的引用
evbuffer_iovec v_in[1] = {0};
int n = evbuffer_peek(src, -1, 0, v_in, 1);
if (n <= 0) {
//没有数据
return BEV_NEED_MORE;
}
z_stream *p = clientStatus->z_output;
if (!p) {
return BEV_ERROR;
}
//输入数据大小
p->avail_in = v_in[0].iov_len;
//输入数据地址
p->next_in = (Byte*)v_in[0].iov_base;
//申请输出空间大小
evbuffer_iovec v_out[1] = { 0 };
n = evbuffer_reserve_space(dst, 4096, v_out, 1);
if (n < 0)
return BEV_ERROR;
//zlib输出空间大小
p->avail_out = v_out[0].iov_len;
//zlib输出空间地址
p->next_out = (Byte*)v_out[0].iov_base;
//zlib 压缩
int re = deflate(p, Z_SYNC_FLUSH);
if (re != Z_OK) {
cerr << "deflate falid" << endl;
}
//压缩用了多少数据
//p->avail_in 未处理数据大小
int nread = v_in[0].iov_len - p->avail_in;
//压缩后数据大小
//v_out[0].iov_len 剩余空间大小
int nwrite = v_out[0].iov_len - p->avail_out;
//移除 source evbuffer中数据
evbuffer_drain(src, nread);
//传入des evbuffer
v_out[0].iov_len = nwrite;
evbuffer_commit_space(dst, v_out, 1);
cout << "nread = " << nread << " nwirte=" << nwrite << endl;
}
void clinet_read_cb(bufferevent *bev, void *arg) {
ClientStatus *clientStatus = (ClientStatus *)arg;
//002 接收服务端发送的OK消息
char data[1024] = { 0 };
int len = bufferevent_read(bev, data, sizeof(data) - 1);
if (strcmp(data, "ok") == 0) {
cout << data << endl;
clientStatus->startSend = true;
//开始发送文件
bufferevent_trigger(bev, EV_WRITE, 0);
}
else {
bufferevent_free(bev);
}
cout << "clinet_read_cb" << endl;
}
void clinet_write_cb(bufferevent *bev, void *arg) {
ClientStatus *clientStatus = (ClientStatus *)arg;
FILE *fp = clientStatus->fp;
//判断什么时候清理资源
if (clientStatus->end) {
//判断缓冲是否有数据 如果有就刷新
//获取过滤器绑定的buffer
bufferevent * be = bufferevent_get_underlying(bev);
evbuffer *evb = bufferevent_get_output(be);
int len = evbuffer_get_length(evb);
if (len <= 0) {
bufferevent_free(bev);
delete clientStatus;
return;
}
bufferevent_flush(bev, EV_WRITE, BEV_FINISHED);
return;
}
if (!fp) {
cout << "open file " << FILEPATH << "faild!" << endl;
return;
}
char data[1024] = { 0 };
int len = fread(data, 1, sizeof(data), fp);
if (len <= 0) {
clientStatus->end = true;
bufferevent_flush(bev, EV_WRITE, BEV_FINISHED);
return;
}
bufferevent_write(bev, data, len);
}
void clinet_event_cb(bufferevent *bev, short events, void *arg) {
cout << "clinet_event_cb" << endl;
}
void client_event_cb(struct bufferevent *bev, short what, void *ctx) {
cout << "client_event_cb" << what << endl;
if (what & BEV_EVENT_CONNECTED) {
cout << "BEV_EVENT_CONNECTED" << endl;
//001 发送文件名
bufferevent_write(bev, FILEPATH,strlen(FILEPATH));
FILE *fp = fopen(FILEPATH, "rb");
if (!fp) {
cout << "open file " << FILEPATH << "faild!" << endl;
}
ClientStatus *clientStatus = new ClientStatus;
clientStatus->fp = fp;
//初始化zlib上下文
clientStatus->z_output = new z_stream();
deflateInit(clientStatus->z_output, Z_DEFAULT_COMPRESSION);
//创建输出过滤
bufferevent *bev_filter = bufferevent_filter_new(bev,
0,//输入过滤函数
clinet_filter_out, //输出过滤
BEV_OPT_CLOSE_ON_FREE | BEV_OPT_DEFER_CALLBACKS, //关闭filter同时关闭bufferevent 延时调用
0, //清理回调
clientStatus);
bufferevent_setcb(bev_filter, clinet_read_cb, clinet_write_cb, clinet_event_cb, clientStatus);
bufferevent_enable(bev_filter, EV_READ | EV_WRITE);
}
}
void Client(event_base* base) {
cout << "begin Client" << endl;
//连接服务器
sockaddr_in sin;
memset(&sin, 0, sizeof(sin));
sin.sin_family = AF_INET;
sin.sin_port = htons(SPORT);
evutil_inet_pton(AF_INET,"127.0.0.1",&sin.sin_addr.s_addr);
bufferevent* bev = bufferevent_socket_new(base, -1, BEV_OPT_CLOSE_ON_FREE);
//只绑定连接事件回调,用于确认连接成功
bufferevent_enable(bev, EV_READ | EV_WRITE);
bufferevent_setcb(bev, 0, 0, client_event_cb, 0);
bufferevent_socket_connect(bev, (sockaddr*)&sin, sizeof(sin));
}