libevent服务端,多线程应用

2023-11-14

 下面的方式是创建多个event_base来处理多线程的,主event_base用来处理连接请求,各个子event_base用来处理读写和关闭请求。

另一种方式是,所有的连接、读写、断开操作,都在一个event_base里面,然后当读到数据时,放入到子线程中处理,处理完了后再写,不过此时需要读写加锁,参考下列代码中的client_register_cb(...)

// TestServer.cpp : 此文件包含 "main" 函数。程序执行将在此处开始并结束。
//
#include <string.h>
#include <errno.h>
#include <stdio.h>
#include <signal.h>

#ifdef _WIN32
#include <winsock2.h>
#pragma comment(lib, "ws2_32.lib")
#pragma comment(lib, "event.lib")
#pragma comment(lib, "event_extra.lib")
#pragma comment(lib, "event_core.lib")
#else
#include <unistd.h>
#include <netinet/in.h>
#include <pthread.h>
# ifdef _XOPEN_SOURCE_EXTENDED
#  include <arpa/inet.h>
# endif
#include <sys/socket.h>
#define GetCurrentThreadId() pthread_self()
#endif

#include <event2/bufferevent.h>
#include <event2/buffer.h>
#include <event2/listener.h>
#include <event2/util.h>
#include <event2/event.h>
#include <event2/thread.h>

static const char MESSAGE[] = "Hello, World!\n";

static const int PORT = 9638;

static void listener_cb(struct evconnlistener *, evutil_socket_t,
    struct sockaddr *, int socklen, void *);
static void conn_writecb(struct bufferevent *, void *);
static void conn_readcb(struct bufferevent *, void *);
static void conn_eventcb(struct bufferevent *, short, void *);
static void signal_cb(evutil_socket_t, short, void *);
static void connlistener_errorcb(struct evconnlistener *, void *);

static bool init_client_dispatch();

int main2(int argc, char **argv)
{
    printf("the main thread id: %d\n",GetCurrentThreadId());
    struct event_base *base;
    struct evconnlistener *listener;
    struct event *signal_event;

    //其实 event_base_new(); 内部也是这么实现的
    struct event_config *cfg = event_config_new();
#ifdef WIN32
    WSADATA wsa_data;
    WSAStartup(0x0202, &wsa_data);
    //告诉libEvent使用Windows自己的线程和同步锁,并启用多线程安全
    evthread_use_windows_threads();
    //Windows启用IOCP模式
    event_config_set_flag(cfg, EVENT_BASE_FLAG_STARTUP_IOCP);
    //根据CPU实际数量配置libEvent的CPU数
    SYSTEM_INFO si;
    GetSystemInfo(&si);
    event_config_set_num_cpus_hint(cfg, 2);//si.dwNumberOfProcessors
#else
    //告诉libEvent使用平台自己的线程和同步锁,并启用多线程安全
    evthread_use_pthreads();
#endif
    base = event_base_new_with_config(cfg);
    event_config_free(cfg);
    cfg = NULL;

    if (!base)
    {
        fprintf(stderr, "Could not initialize libevent!\n");
        return 1;
    }

    struct sockaddr_in sin;
    memset(&sin, 0, sizeof(sin));
    sin.sin_family = AF_INET;
    sin.sin_port = htons(PORT);

    //创建服务端的socket,并绑定端口监听,当有客户端连接时,则listener_cb发生回调
    listener = evconnlistener_new_bind(base, listener_cb, (void *)base,
        LEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_FREE, -1,
        (struct sockaddr *)&sin, sizeof(sin));

    if (!listener)
    {
        fprintf(stderr, "Could not create a listener!\n");
        goto loop_basefree;
        return 1;
    }

    /*//也可以把ipv6添加到监听中去
    struct sockaddr_in6 sin6;
    memset(&sin6, 0, sizeof(struct sockaddr_in6));
    sin6.sin6_family = AF_INET6;
    sin6.sin6_port = htons(2000);
    evconnlistener *listener6 = evconnlistener_new_bind(base, listener_cb, base,
        LEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_FREE, -1,
        (struct sockaddr*)&sin6, sizeof(sin6));
    */

    //设置listen监听错误事件,这个其实没什么用
    evconnlistener_set_error_cb(listener, connlistener_errorcb);
    //创建一个新的事件信号
    signal_event = evsignal_new(base, SIGINT, signal_cb, (void *)base);
    if (!signal_event)
    {
        fprintf(stderr, "Could not create/add a signal event!\n");
        goto loop_listener;
        return 1;
    }
    if (event_add(signal_event, NULL) < 0)
    {
        fprintf(stderr, "Could not create/add a signal event!\n");
        goto loop_signal;
        return 1;
    }

    if (!init_client_dispatch())
    {
        goto loop_signal;
    }

    event_base_dispatch(base); //这里面进入循环了

loop_signal:
    event_free(signal_event);
loop_listener:
    evconnlistener_free(listener);
loop_basefree:
    event_base_free(base);

#ifdef WIN32
    WSACleanup();
#endif
    printf("done\n");
    return 0;
}

#include <thread>
#define CLIENT_THREAD_COUNT 2
struct tagThreadContext
{
    evutil_socket_t fdRead;
    evutil_socket_t fdWrite;
    struct event_base* base;
}g_ThreadContext[CLIENT_THREAD_COUNT];

static void client_event_loop(int idx);
static void client_register_cb(evutil_socket_t, short, void *);
static bool init_client_dispatch()
{
    for (int i = 0; i < CLIENT_THREAD_COUNT; ++i)
    {
        evutil_socket_t fds[2];
#ifdef WIN32
        if (evutil_socketpair(AF_INET, SOCK_STREAM, 0, fds) != 0)
#else
        if (evutil_socketpair(AF_UNIX, SOCK_STREAM, 0, fds) != 0)
#endif
        {
            printf("pipe fail: %s\n", evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR()));
            return false;
        }
        evutil_make_socket_nonblocking(fds[0]);
        evutil_make_socket_nonblocking(fds[1]);
/*#else
        if (pipe(fds) != 0)
        {//管道不能使用send,而是write
            printf("pipe fail: %s\n", evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR()));
            return false;
        }
#endif*/
        g_ThreadContext[i].fdRead = fds[0];
        g_ThreadContext[i].fdWrite = fds[1];
    }
    for (int i = 0; i < CLIENT_THREAD_COUNT; ++i)
    {
        std::thread thd(&client_event_loop,i);
        thd.detach();
    }
    return true;
}

static void client_event_loop(int idx)
{
    printf("the client_event_loop thread id: %d, idx=%d\n", GetCurrentThreadId(), idx);
    struct event_base* base = event_base_new();

    g_ThreadContext[idx].base = base;

    /*EV_PERSIST表示该事件永久有效,这样当没有客户端连接时,这个event_base_loop也不会结束
    */
    struct event* pEvt = event_new(base, g_ThreadContext[idx].fdRead, EV_READ | EV_PERSIST, client_register_cb,(void*)idx);

    event_add(pEvt, NULL);

    event_base_dispatch(base);

    event_free(pEvt);

    event_base_free(base);

    evutil_closesocket(g_ThreadContext[idx].fdRead);
    evutil_closesocket(g_ThreadContext[idx].fdWrite);
}

static void client_register_cb(evutil_socket_t fd, short evt, void * arg)
{
    int idx = (int)arg;
    evutil_socket_t fdConn;
    if (recv(g_ThreadContext[idx].fdRead, (char*)&fdConn, sizeof(fdConn), 0) > 0)
    {
        struct event_base* base = g_ThreadContext[idx].base;
        /*将fd,也就是socket注册到事件循环中,同时启用线程安全操作BEV_OPT_THREADSAFE,
        这样,在别的线程中  bufferevent_read/bufferevent_write/bufferevent_free(bev)将会是线程安全的。
        这样,我们可以把数据处理放到别的线程中执行,读写通知放到这里执行。*/
        struct bufferevent * bev = bufferevent_socket_new(base, fdConn, BEV_OPT_CLOSE_ON_FREE | BEV_OPT_THREADSAFE);
        if (!bev)
        {
            fprintf(stderr, "Error constructing bufferevent!");
            evutil_closesocket(fd);
            //强制结束main里面的event_base_dispatch,这样整个服务端结束
            event_base_loopbreak(base);
            return;
        }

        printf("the client_register_cb thread id: %d, bev=%p\n", GetCurrentThreadId(), bev);

        //设置读写和客户端连接状态的回调函数
        bufferevent_setcb(bev, conn_readcb, conn_writecb, conn_eventcb, (void *)9);

        //设置读写回调是否可用
        bufferevent_enable(bev, EV_WRITE | EV_READ);

        //向客户端写数据
        bufferevent_write(bev, MESSAGE, strlen(MESSAGE));
    }
}


/*
libevent的一些问题记录。
1. 读写时,可能存在最大包字节限制,因为默认为16384字节,下面这2个宏是内部定义的。
#define MAX_SINGLE_READ_DEFAULT 16384
#define MAX_SINGLE_WRITE_DEFAULT 16384
可以通过bufferevent_set_max_single_read(...),bufferevent_set_max_single_write(...)去设置最大值。

evutil_socketpair(...)在Windows上使用可能会被防火墙挡住
在上面的应用例子listener_cb()中,之所以采用定时器的方式去获取,是因为可能Windows的防火墙挡住问题,在linux上可以直接使用pipe或者evutil_socketpair
*/
/**********************************************************************************/
//当一个新的连接到来时,触发该回调被调用
static void listener_cb(struct evconnlistener *listener, evutil_socket_t fd,
    struct sockaddr *sa, int socklen, void *user_data)
{
    /*
    服务器开启多线程时,实际上是只负责监听连接事件,当收到连接时,就会把链接的对象抛给event_base_dispatch()
    所以我们可以看到这个 listener_cb()实际就是在event_base_dispatch()所在的线程中执行的。
    
    为了把读写操作放到另外的线程中执行,我们必须要重新创建 event_base.
    这里面利用一个非常巧妙的设计,就是在新开启的线程中创建一个event_base,并进入自己的事件循环,然后在这个事件循环中,
    注册增加新连接的socket,在这个新的event_base上监听读写事件。

    看evutil_socketpair的源码,说在Windows上,可能因为防火墙问题导致创建失败,那么可以换一个新的方法,就是在新的event_base上
    注册一个微秒级别的永久定时器,在定时器中去创建并注册这个新的fd,方法就是在
    listener_cb里面,把fd存入到某个位置,定时器去取。
    */
    printf("the listener_cb thread id: %d\n", GetCurrentThreadId());
    int idx = fd % CLIENT_THREAD_COUNT;
    send(g_ThreadContext[idx].fdWrite, (const char*)&fd, sizeof(fd), 0);
    return;
}


//向客户端写完数据后发生的一个回调,通常是调用bufferevent_write后发生了回调。
static void conn_writecb(struct bufferevent *bev, void *user_data)
{
    printf("the conn_writecb thread id: %d, bev=%p\n", GetCurrentThreadId(), bev);
    //user_data值为bufferevent_setcb的最后一个参数
    struct evbuffer *output = bufferevent_get_output(bev);
    if (evbuffer_get_length(output) == 0)
    {
        //printf("flushed answer\n");
    }
}

//当收到客户端发送过来的数据时,该函数发生了回调
static void conn_readcb(struct bufferevent *bev, void *user_data)
{
    printf("the conn_readcb thread id: %d, bev=%p\n", GetCurrentThreadId(), bev);
    struct evbuffer *input = bufferevent_get_input(bev);
    size_t len = evbuffer_get_length(input);
    if (!len)
    {
        puts("接收到的数据个数是0");
        return;
    }
    char data[1025] = "";
    size_t size = 0;
    //从缓冲区中读取接收到的数据
    while (0 != (size = bufferevent_read(bev, data, 1024)))
    {
        printf("data=%s, len=%d\n", data, size);
    }
    const char *wData = "send to client!";
    bufferevent_write(bev, wData, strlen(wData) + 1);

    //主动断开与客户端的连接
    //bufferevent_free(bev);
}

static void conn_eventcb(struct bufferevent *bev, short events, void *user_data)
{
    printf("the conn_eventcb thread id: %d, bev=%p\n", GetCurrentThreadId(), bev);
    if (events & BEV_EVENT_EOF)
    {
        printf("Connection closed.\n");
    }
    else if (events & BEV_EVENT_ERROR)
    {
        printf("Got an error on the connection: %s\n", evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR()));
    }
    // None of the other events can happen here, since we haven't enabled timeouts
    bufferevent_free(bev);
}

static void signal_cb(evutil_socket_t sig, short events, void *user_data)
{
    printf("the signal_cb thread id: %d\n", GetCurrentThreadId());
    struct event_base *base = (struct event_base *)user_data;
    struct timeval delay = { 2, 0 };

    printf("Caught an interrupt signal; exiting cleanly in two seconds.\n");

    event_base_loopexit(base, &delay);
}

static void connlistener_errorcb(struct evconnlistener *listener, void *user_data)
{
    printf("the connlistener_errorcb thread id: %d\n", GetCurrentThreadId());
    struct event_base *base = (struct event_base *)user_data;
}

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

libevent服务端,多线程应用 的相关文章

随机推荐

  • 巴比特

    摘要 之前 由于没法开放注册 中国大模型厂商其实一直束手束脚 而8月31日 北京 上海率先通过了大模型备案 这意味着 跑得快 的企业可以大胆向市场推出产品了 从靠PPT来嘴花花 到放开手脚大干一场 大模型厂商们走向了真正的战场 争用户 抢市
  • LeetCode (二)找出数组中多于半数的数字

    题目 给定一个大小为 n 的数组 找到其中的多数元素 多数元素是指在数组中出现次数 大于 n 2 的元素 解法 自己的解法 思路 for循环遍历存Map 记录key对应的Count 如果大于半数 返回 package com jzj stu
  • React v16.3新生命周期、性能优化及注意事项

    欢迎点击领取 前端面试题进阶指南 前端登顶之巅 最全面的前端知识点梳理总结 React Version 16 3版本对组件的生命周期函数进行了一些修改 在每个react组件中都有以下几个生命周期方法 我们应该掌握最新生命周期 学以致用 以达
  • mciSendString函数简介(播放音乐以及录音相关操作)

    函数功能 播放多媒体音乐 视频等 mciSendString是用来播放多媒体文件的API指令 可以播放MPEG AVI WAV MP3 等等 这个函数有自己的mci指令 可以通过不同的指令实现不同的功能 这里我会详细讲解mciSendStr
  • qemu 启动自定义文件系统命令

    kvm qemu aarch64 bin qemu system aarch64 M virt smp 8 cpu cortex a76 m 4G nographic kernel out kernel arm64 Image append
  • 对以太网粗略理解

    1 以太网定义 以太网 Ethernet 指的是由 Xerox公司创建并由Xerox Intel和 DEC公司联合开发的基带局域网规范 通用的以太网标准于1980年9月30日出台 是当今现有局域网采用的最通用的通信协议标准 是局域网的一种
  • html+css+javascript学习总结

    html用来写页面的结构和内容 css写样式和呈现效果 javascript写行为和动作 1 html常用标签 a 超链接 div 盒子 常用来控制样式的 ul ol 无序列表和有序列表 img 图片标签 button 按钮 form 表单
  • 看懂影片标题,各种电影视频格式标题的含义

    一 资源片源解析 根据命名 可以知道资源的来源 从而判断资源画质的好坏 1 CAM 枪版 珍爱生命 远离枪版 CAM通常是用数码摄像机从电影院盗录 有时会使用小三角架 但大多数时候不可能使用 所以摄像机会抖动 因此我们看到画面通常偏暗 人物
  • 【免费】win7 所有.net framework框架集合,免费下载,若要运行此应用程序,您必须首先安装net framework如何解决

    运行软件缺失框架 若要运行此应用程序 您必须首先安装net framework如何解决 那天我看见网上下载一个框架都要收费还要100大洋 现在真的是干啥都要钱 索性就整理了一个全库供大家下载 做点好事 net 框架所有的 net官网下载地址
  • 摄像机标定到底是在干什么?

    2017年11月13日学习记录 机器视觉 1 摄像机标定概括 刚开始学机器视觉 我研究的方向主要是双目视觉测距 做机器视觉的肯定对摄像机标定并不陌生 我入坑一个月 开始就是看看书 论文 了解了大概流程和研究主要方法 无特别明确目的和压力 然
  • 关于Redis的Zset使用方法

    序言 Zset跟Set之间可以有并集运算 因为他们存储的数据字符串集合 不能有一样的成员出现在一个zset中 但是为什么有了set还要有zset呢 zset叫做有序集合 而set是无序的 zset怎么做到有序的呢 就是zset的每一个成员都
  • Java基础:简单的Runnable 接口创建线程

    创建一个线程 Java 提供了三种创建线程的方法 通过实现 Runnable 接口 通过继承 Thread 类本身 通过 Callable 和 Future 创建线程 为了实现 Runnable 一个类只需要执行一个方法调用 run 声明如
  • 数字图像处理-离散傅里叶变换(opencv3+C++显示)

    参考 http daily zhihu com story 3935067 http blog csdn net keith bb article details 53389819 在学习信号与系统或通信原理等课程里面可能对傅里叶变换有了一
  • 关于局域网、广域网、C/S、P2P编程

    一直以为局域网和广域网的编程没什么不同 实际上确实没什么不同 但这里我要说的是C S和P2P 先说说为局域网编程 局域网编程不用考虑网关 不用考虑NAT 不用考虑消息发送成功后对方将消息丢弃等 这样编程相当简单 只要建立相应的套接口 设置端
  • SQL server基础

    一 SQL Server数据库的数据类型含义 数据类型含义 int 每个数值占用 4字节 2 147 483 648到2 147 483 647之间的整数 smallint 2个字节 存储范围是 32 768 到 32 767 之间的整数
  • Android Studio 红米3 一直运行或者debug不成功,提示 Failed to establish session 解决方案

    换了一个测试机 红米note3开发 一直run OR debug 失败 下面是提示图 找了半天原因 后面发现原因所在了 一般手机默认用开发工具跑起来 会弹出提示 确认是否安装XXX应用 而红米note3就是个奇葩 在它的开发者选项中 有个
  • MATLAB 多目标规划

    作者简介 人工智能专业本科在读 喜欢计算机与编程 写博客记录自己的学习历程 个人主页 小嗷犬的个人主页 个人网站 小嗷犬的技术小站 个人信条 为天地立心 为生民立命 为往圣继绝学 为万世开太平 本文目录 多目标规划 数学模型 正负偏差变量
  • c/c++不定参数函数

    http plutoblog iteye com blog 1150671 不定参数函数 stdarg h是C语言中C标准函数库的头文件 stdarg是由stdandard 标准 arguments 参数 简化而来 主要目的为让函数能够接收
  • WdatePicker日期控件与UEditor富文本编辑器

    WdatePicker日期控件 My97日期控件 下载 更新日志 My97Datepicker Download Changelog 代码中的生日使用插件
  • libevent服务端,多线程应用

    下面的方式是创建多个event base来处理多线程的 主event base用来处理连接请求 各个子event base用来处理读写和关闭请求 另一种方式是 所有的连接 读写 断开操作 都在一个event base里面 然后当读到数据时