下面的方式是创建多个event_base来处理多线程的,主event_base用来处理连接请求,各个子event_base用来处理读写和关闭请求。
另一种方式是,所有的连接、读写、断开操作,都在一个event_base里面,然后当读到数据时,放入到子线程中处理,处理完了后再写,不过此时需要读写加锁,参考下列代码中的client_register_cb(...)
// TestServer.cpp : 此文件包含 "main" 函数。程序执行将在此处开始并结束。
//
#include <string.h>
#include <errno.h>
#include <stdio.h>
#include <signal.h>
#ifdef _WIN32
#include <winsock2.h>
#pragma comment(lib, "ws2_32.lib")
#pragma comment(lib, "event.lib")
#pragma comment(lib, "event_extra.lib")
#pragma comment(lib, "event_core.lib")
#else
#include <unistd.h>
#include <netinet/in.h>
#include <pthread.h>
# ifdef _XOPEN_SOURCE_EXTENDED
# include <arpa/inet.h>
# endif
#include <sys/socket.h>
#define GetCurrentThreadId() pthread_self()
#endif
#include <event2/bufferevent.h>
#include <event2/buffer.h>
#include <event2/listener.h>
#include <event2/util.h>
#include <event2/event.h>
#include <event2/thread.h>
static const char MESSAGE[] = "Hello, World!\n";
static const int PORT = 9638;
static void listener_cb(struct evconnlistener *, evutil_socket_t,
struct sockaddr *, int socklen, void *);
static void conn_writecb(struct bufferevent *, void *);
static void conn_readcb(struct bufferevent *, void *);
static void conn_eventcb(struct bufferevent *, short, void *);
static void signal_cb(evutil_socket_t, short, void *);
static void connlistener_errorcb(struct evconnlistener *, void *);
static bool init_client_dispatch();
int main2(int argc, char **argv)
{
printf("the main thread id: %d\n",GetCurrentThreadId());
struct event_base *base;
struct evconnlistener *listener;
struct event *signal_event;
//其实 event_base_new(); 内部也是这么实现的
struct event_config *cfg = event_config_new();
#ifdef WIN32
WSADATA wsa_data;
WSAStartup(0x0202, &wsa_data);
//告诉libEvent使用Windows自己的线程和同步锁,并启用多线程安全
evthread_use_windows_threads();
//Windows启用IOCP模式
event_config_set_flag(cfg, EVENT_BASE_FLAG_STARTUP_IOCP);
//根据CPU实际数量配置libEvent的CPU数
SYSTEM_INFO si;
GetSystemInfo(&si);
event_config_set_num_cpus_hint(cfg, 2);//si.dwNumberOfProcessors
#else
//告诉libEvent使用平台自己的线程和同步锁,并启用多线程安全
evthread_use_pthreads();
#endif
base = event_base_new_with_config(cfg);
event_config_free(cfg);
cfg = NULL;
if (!base)
{
fprintf(stderr, "Could not initialize libevent!\n");
return 1;
}
struct sockaddr_in sin;
memset(&sin, 0, sizeof(sin));
sin.sin_family = AF_INET;
sin.sin_port = htons(PORT);
//创建服务端的socket,并绑定端口监听,当有客户端连接时,则listener_cb发生回调
listener = evconnlistener_new_bind(base, listener_cb, (void *)base,
LEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_FREE, -1,
(struct sockaddr *)&sin, sizeof(sin));
if (!listener)
{
fprintf(stderr, "Could not create a listener!\n");
goto loop_basefree;
return 1;
}
/*//也可以把ipv6添加到监听中去
struct sockaddr_in6 sin6;
memset(&sin6, 0, sizeof(struct sockaddr_in6));
sin6.sin6_family = AF_INET6;
sin6.sin6_port = htons(2000);
evconnlistener *listener6 = evconnlistener_new_bind(base, listener_cb, base,
LEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_FREE, -1,
(struct sockaddr*)&sin6, sizeof(sin6));
*/
//设置listen监听错误事件,这个其实没什么用
evconnlistener_set_error_cb(listener, connlistener_errorcb);
//创建一个新的事件信号
signal_event = evsignal_new(base, SIGINT, signal_cb, (void *)base);
if (!signal_event)
{
fprintf(stderr, "Could not create/add a signal event!\n");
goto loop_listener;
return 1;
}
if (event_add(signal_event, NULL) < 0)
{
fprintf(stderr, "Could not create/add a signal event!\n");
goto loop_signal;
return 1;
}
if (!init_client_dispatch())
{
goto loop_signal;
}
event_base_dispatch(base); //这里面进入循环了
loop_signal:
event_free(signal_event);
loop_listener:
evconnlistener_free(listener);
loop_basefree:
event_base_free(base);
#ifdef WIN32
WSACleanup();
#endif
printf("done\n");
return 0;
}
#include <thread>
#define CLIENT_THREAD_COUNT 2
struct tagThreadContext
{
evutil_socket_t fdRead;
evutil_socket_t fdWrite;
struct event_base* base;
}g_ThreadContext[CLIENT_THREAD_COUNT];
static void client_event_loop(int idx);
static void client_register_cb(evutil_socket_t, short, void *);
static bool init_client_dispatch()
{
for (int i = 0; i < CLIENT_THREAD_COUNT; ++i)
{
evutil_socket_t fds[2];
#ifdef WIN32
if (evutil_socketpair(AF_INET, SOCK_STREAM, 0, fds) != 0)
#else
if (evutil_socketpair(AF_UNIX, SOCK_STREAM, 0, fds) != 0)
#endif
{
printf("pipe fail: %s\n", evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR()));
return false;
}
evutil_make_socket_nonblocking(fds[0]);
evutil_make_socket_nonblocking(fds[1]);
/*#else
if (pipe(fds) != 0)
{//管道不能使用send,而是write
printf("pipe fail: %s\n", evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR()));
return false;
}
#endif*/
g_ThreadContext[i].fdRead = fds[0];
g_ThreadContext[i].fdWrite = fds[1];
}
for (int i = 0; i < CLIENT_THREAD_COUNT; ++i)
{
std::thread thd(&client_event_loop,i);
thd.detach();
}
return true;
}
static void client_event_loop(int idx)
{
printf("the client_event_loop thread id: %d, idx=%d\n", GetCurrentThreadId(), idx);
struct event_base* base = event_base_new();
g_ThreadContext[idx].base = base;
/*EV_PERSIST表示该事件永久有效,这样当没有客户端连接时,这个event_base_loop也不会结束
*/
struct event* pEvt = event_new(base, g_ThreadContext[idx].fdRead, EV_READ | EV_PERSIST, client_register_cb,(void*)idx);
event_add(pEvt, NULL);
event_base_dispatch(base);
event_free(pEvt);
event_base_free(base);
evutil_closesocket(g_ThreadContext[idx].fdRead);
evutil_closesocket(g_ThreadContext[idx].fdWrite);
}
static void client_register_cb(evutil_socket_t fd, short evt, void * arg)
{
int idx = (int)arg;
evutil_socket_t fdConn;
if (recv(g_ThreadContext[idx].fdRead, (char*)&fdConn, sizeof(fdConn), 0) > 0)
{
struct event_base* base = g_ThreadContext[idx].base;
/*将fd,也就是socket注册到事件循环中,同时启用线程安全操作BEV_OPT_THREADSAFE,
这样,在别的线程中 bufferevent_read/bufferevent_write/bufferevent_free(bev)将会是线程安全的。
这样,我们可以把数据处理放到别的线程中执行,读写通知放到这里执行。*/
struct bufferevent * bev = bufferevent_socket_new(base, fdConn, BEV_OPT_CLOSE_ON_FREE | BEV_OPT_THREADSAFE);
if (!bev)
{
fprintf(stderr, "Error constructing bufferevent!");
evutil_closesocket(fd);
//强制结束main里面的event_base_dispatch,这样整个服务端结束
event_base_loopbreak(base);
return;
}
printf("the client_register_cb thread id: %d, bev=%p\n", GetCurrentThreadId(), bev);
//设置读写和客户端连接状态的回调函数
bufferevent_setcb(bev, conn_readcb, conn_writecb, conn_eventcb, (void *)9);
//设置读写回调是否可用
bufferevent_enable(bev, EV_WRITE | EV_READ);
//向客户端写数据
bufferevent_write(bev, MESSAGE, strlen(MESSAGE));
}
}
/*
libevent的一些问题记录。
1. 读写时,可能存在最大包字节限制,因为默认为16384字节,下面这2个宏是内部定义的。
#define MAX_SINGLE_READ_DEFAULT 16384
#define MAX_SINGLE_WRITE_DEFAULT 16384
可以通过bufferevent_set_max_single_read(...),bufferevent_set_max_single_write(...)去设置最大值。
evutil_socketpair(...)在Windows上使用可能会被防火墙挡住
在上面的应用例子listener_cb()中,之所以采用定时器的方式去获取,是因为可能Windows的防火墙挡住问题,在linux上可以直接使用pipe或者evutil_socketpair
*/
/**********************************************************************************/
//当一个新的连接到来时,触发该回调被调用
static void listener_cb(struct evconnlistener *listener, evutil_socket_t fd,
struct sockaddr *sa, int socklen, void *user_data)
{
/*
服务器开启多线程时,实际上是只负责监听连接事件,当收到连接时,就会把链接的对象抛给event_base_dispatch()
所以我们可以看到这个 listener_cb()实际就是在event_base_dispatch()所在的线程中执行的。
为了把读写操作放到另外的线程中执行,我们必须要重新创建 event_base.
这里面利用一个非常巧妙的设计,就是在新开启的线程中创建一个event_base,并进入自己的事件循环,然后在这个事件循环中,
注册增加新连接的socket,在这个新的event_base上监听读写事件。
看evutil_socketpair的源码,说在Windows上,可能因为防火墙问题导致创建失败,那么可以换一个新的方法,就是在新的event_base上
注册一个微秒级别的永久定时器,在定时器中去创建并注册这个新的fd,方法就是在
listener_cb里面,把fd存入到某个位置,定时器去取。
*/
printf("the listener_cb thread id: %d\n", GetCurrentThreadId());
int idx = fd % CLIENT_THREAD_COUNT;
send(g_ThreadContext[idx].fdWrite, (const char*)&fd, sizeof(fd), 0);
return;
}
//向客户端写完数据后发生的一个回调,通常是调用bufferevent_write后发生了回调。
static void conn_writecb(struct bufferevent *bev, void *user_data)
{
printf("the conn_writecb thread id: %d, bev=%p\n", GetCurrentThreadId(), bev);
//user_data值为bufferevent_setcb的最后一个参数
struct evbuffer *output = bufferevent_get_output(bev);
if (evbuffer_get_length(output) == 0)
{
//printf("flushed answer\n");
}
}
//当收到客户端发送过来的数据时,该函数发生了回调
static void conn_readcb(struct bufferevent *bev, void *user_data)
{
printf("the conn_readcb thread id: %d, bev=%p\n", GetCurrentThreadId(), bev);
struct evbuffer *input = bufferevent_get_input(bev);
size_t len = evbuffer_get_length(input);
if (!len)
{
puts("接收到的数据个数是0");
return;
}
char data[1025] = "";
size_t size = 0;
//从缓冲区中读取接收到的数据
while (0 != (size = bufferevent_read(bev, data, 1024)))
{
printf("data=%s, len=%d\n", data, size);
}
const char *wData = "send to client!";
bufferevent_write(bev, wData, strlen(wData) + 1);
//主动断开与客户端的连接
//bufferevent_free(bev);
}
static void conn_eventcb(struct bufferevent *bev, short events, void *user_data)
{
printf("the conn_eventcb thread id: %d, bev=%p\n", GetCurrentThreadId(), bev);
if (events & BEV_EVENT_EOF)
{
printf("Connection closed.\n");
}
else if (events & BEV_EVENT_ERROR)
{
printf("Got an error on the connection: %s\n", evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR()));
}
// None of the other events can happen here, since we haven't enabled timeouts
bufferevent_free(bev);
}
static void signal_cb(evutil_socket_t sig, short events, void *user_data)
{
printf("the signal_cb thread id: %d\n", GetCurrentThreadId());
struct event_base *base = (struct event_base *)user_data;
struct timeval delay = { 2, 0 };
printf("Caught an interrupt signal; exiting cleanly in two seconds.\n");
event_base_loopexit(base, &delay);
}
static void connlistener_errorcb(struct evconnlistener *listener, void *user_data)
{
printf("the connlistener_errorcb thread id: %d\n", GetCurrentThreadId());
struct event_base *base = (struct event_base *)user_data;
}