【开源之美】nanomsg(2) :req/rep 模式

2023-11-08

req/rep 模式显然就是类似http的应答模式。在某些基于短连接的进程间通讯方式上可以很方便的使用。下面我们举个例子:

服务端:demo


#ifndef NANOMSGUTIL_H
#define NANOMSGUTIL_H

#include "messageDispatch.h"
#include "thread/nthread.h"

class NanomsgServer : public QThread
{
public:
    NanomsgServer(const QString url = "tcp://127.0.0.1:5555");

    int NanoServer();

    virtual void run() override final;

    int process();

    void stop();

private:
    QString m_url;
    bool m_stopFlag = false;
    MessageDispatch m_dispatcher; /// 消息分发处理
};

#endif

#include "nanomsgServer.h"
#include <NLog>
#include <QJsonDocument>
#include <QJsonObject>
#include <QJsonArray>

/*
    Copyright 2016 Garrett D'Amore <garrett@damore.org>

    Permission is hereby granted, free of charge, to any person obtaining a copy
    of this software and associated documentation files (the "Software"),
    to deal in the Software without restriction, including without limitation
    the rights to use, copy, modify, merge, publish, distribute, sublicense,
    and/or sell copies of the Software, and to permit persons to whom
    the Software is furnished to do so, subject to the following conditions:

    The above copyright notice and this permission notice shall be included
    in all copies or substantial portions of the Software.

    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
    THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
    FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
    IN THE SOFTWARE.

    "nanomsg" is a trademark of Martin Sustrik
*/

/*  This program serves as an example for how to write an async RPC service,
    using the RAW request/reply pattern and nn_poll.  The server receives
    messages and keeps them on a list, replying to them.

    Our demonstration application layer protocol is simple.  The client sends
    a number of milliseconds to wait before responding.  The server just gives
    back an empty reply after waiting that long.

    To run this program, start the server as async_demo <url> -s
    Then connect to it with the client as async_client <url> <msec>.

    For example:

    % ./async_demo tcp://127.0.0.1:5555 -s &
    % ./async_demo tcp://127.0.0.1:5555 323
    Request took 324 milliseconds.
*/

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

#ifdef WIN32
#include <windows.h>
#include <winsock.h>
#else
#include <sys/time.h>
#endif

#include <nanomsg/nn.h>
#include <nanomsg/reqrep.h>

/*  MAXJOBS is a limit on the on the number of outstanding requests we
    can queue.  We will not accept new inbound jobs if we have more than
    this queued.  The reason for this limit is to prevent a bad client
    from consuming all server resources with new job requests. */

#define MAXJOBS 100
#define MAXLENS 10*1024

/*  The server keeps a list of work items, sorted by expiration time,
    so that we can use this to set the timeout to the correct value for
    use in poll.  */
struct work {
    struct work *next;
    struct nn_msghdr request;
    uint64_t expire;
    void *control;
};


#ifdef WIN32
int gettimeofday(struct timeval *tp, void *tzp)
{
    time_t clock;
    struct tm tm;
    SYSTEMTIME wtm;
    GetLocalTime(&wtm);
    tm.tm_year   = wtm.wYear - 1900;
    tm.tm_mon   = wtm.wMonth - 1;
    tm.tm_mday   = wtm.wDay;
    tm.tm_hour   = wtm.wHour;
    tm.tm_min   = wtm.wMinute;
    tm.tm_sec   = wtm.wSecond;
    tm. tm_isdst  = -1;
    clock = mktime(&tm);
    tp->tv_sec = clock;
    tp->tv_usec = wtm.wMilliseconds * 1000;
    return (0);
}
#endif


/*  Return the UNIX time in milliseconds.  You'll need a working
    gettimeofday(), so this won't work on Windows.  */
uint64_t milliseconds (void)
{
    struct timeval tv;
    gettimeofday (&tv, NULL);
    return (((uint64_t)tv.tv_sec * 1000) + ((uint64_t)tv.tv_usec / 1000));
}


NanomsgServer::NanomsgServer(const QString url)
{
    m_url = url;
}

/*  The server runs forever. */
void NanomsgServer::run()
{
    INFO_PRINT_LINE << "start service thread.";

    int fd;
    struct work *worklist = NULL;
    int npending = 0;

    /*  Create the socket. */
    fd = nn_socket(AF_SP, NN_REP);
    if (fd < 0) {
        fprintf (stderr, "nn_socket: %s\n", nn_strerror (nn_errno ()));
        return ;
    }

    /*  Bind to the URL.  This will bind to the address and listen
        synchronously; new clients will be accepted asynchronously
        without further action from the calling program. */

    if (nn_bind (fd, m_url.toStdString().data()) < 0) {
        fprintf (stderr, "nn_bind: %s\n", nn_strerror (nn_errno ()));
        nn_close (fd);
        return ;
    }

    /*  Main processing loop. */
    while(!m_stopFlag){

        void *buf = NULL;
        int nbytes = nn_recv (fd, &buf, NN_MSG, 0);
        if (nbytes < 0) {

            fprintf (stderr, "nn_recv: %s\n",nn_strerror (nn_errno ()));
            nn_freemsg (buf);

            continue;
        }

        char* request = NULL;
        request = (char*)malloc(nbytes+1);
        //memcpy((void*)request,buf,nbytes);
        strncpy(request,(const char*)buf,nbytes);
        request[nbytes] = '\0';
        QByteArray ba = QByteArray(request).trimmed();

        //INFO_PRINT_LINE << (char*)buf << nbytes;
        INFO_PRINT_LINE << request << strlen(request);

        /// message dispatch
        QJsonDocument loadDoc(QJsonDocument::fromJson(ba));
        QJsonObject dataObj = loadDoc.object();

		/// deal message
        QString responce = m_dispatcher.deal(QString(request));

        // responce to client
        const char *d = responce.toUtf8().constData();

        int sz_d = strlen(d) + 1; // '\0' too
        nbytes = nn_send (fd, d, sz_d, 0);

        assert (bytes == sz_d);

        INFO_PRINT_LINE << "[responce]  " << d << nbytes;

        free(request);
        nn_freemsg (buf);
    }

    nn_close (fd);
    return;
}

void NanomsgServer::stop()
{
    INFO_PRINT_LINE << "stop";

    if (QThread::isRunning())
    {
        INFO_PRINT_LINE << "stop";

        m_stopFlag = true;
        QThread::quit();
        QThread::wait();
    }
}

客户端:demo

#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>

#ifdef WIN32
#include <windows.h>
#include <winsock.h>
#else
#include <sys/time.h>
#endif

#include <nanomsg/nn.h>
#include <nanomsg/reqrep.h>
#define DEFAULT_URL             "tcp://127.0.0.1:5555"
#define DEFAULT_BUFFER_SIZE     (10*1024)

char npi_appId[32] = {0};

/*************************  Log Module *******************************/
#if _MSC_VER > 1000
#pragma once
#endif // _MSC_VER > 1000

enum {
    LL_NOTICE 	= 1, 	//一般输出
    LL_WARNING 	= 2, 	//告警输出
    LL_TRACE 	= 3,	//追踪调试
    LL_DEBUG 	= 4,	//软件bug
    LL_FATAL 	= 5     //致命错误
};

#define Print_NOTICE(log_fmt,...) \
    do{ \
    printf("L(%d)[%s:%d][%s]:  "log_fmt"\n", LL_NOTICE,__FILE__, __LINE__, __FUNCTION__, ##__VA_ARGS__); \
    }while (0)

#define Print_WARN(log_fmt,...) \
    do{ \
    printf("L(%d)[%s:%d][%s]:  "log_fmt"\n", LL_WARNING, __FILE__, __LINE__, __FUNCTION__, ##__VA_ARGS__); \
    }while (0)

#define Print_TRACE(log_fmt,...) \
    do{ \
    printf("L(%d)[%s:%d][%s]:  "log_fmt"\n", LL_TRACE,__FILE__, __LINE__, __FUNCTION__, ##__VA_ARGS__); \
    }while (0)

#define Print_DEBUG(log_fmt,...) \
    do{ \
    printf("L(%d)[%s:%d][%s]:  "log_fmt"\n", LL_DEBUG, __FILE__, __LINE__, __FUNCTION__, ##__VA_ARGS__); \
    }while (0)

#define Print_FATAL(log_fmt,...) \
    do{ \
    printf("L(%d)[%s:%d][%s]:  "log_fmt"\n",LL_FATAL, __FILE__, __LINE__, __FUNCTION__, ##__VA_ARGS__); \
    }while (0)


int NanoClientRequest(const char *url , const char* request, long len,char* result);

/*************************  nanomsg client  *******************************/
#define MAXJOBS 100
#define MAXLENS 10*1024

struct work {
    struct work *next;
    struct nn_msghdr request;
    uint64_t expire;
    void *control;
};


#ifdef WIN32
int gettimeofday(struct timeval *tp, void *tzp)
{
    time_t clock;
    struct tm tm;
    SYSTEMTIME wtm;
    GetLocalTime(&wtm);
    tm.tm_year   = wtm.wYear - 1900;
    tm.tm_mon   = wtm.wMonth - 1;
    tm.tm_mday   = wtm.wDay;
    tm.tm_hour   = wtm.wHour;
    tm.tm_min   = wtm.wMinute;
    tm.tm_sec   = wtm.wSecond;
    tm. tm_isdst  = -1;
    clock = mktime(&tm);
    tp->tv_sec = clock;
    tp->tv_usec = wtm.wMilliseconds * 1000;
    return (0);
}
#endif

uint64_t milliseconds (void)
{
    struct timeval tv;
    gettimeofday (&tv, NULL);
    return (((uint64_t)tv.tv_sec * 1000) + ((uint64_t)tv.tv_usec / 1000));
}


/*  The client runs just once, and then returns. */
int NanoClientRequest (const char *url, const char* request, long len, char *result)
{
    int fd;
    int rc;

    fd = nn_socket (AF_SP, NN_REQ);
    if (fd < 0) {
        fprintf (stderr, "nn_socket: %s\n", nn_strerror (nn_errno ()));
        return (-1);
    }

    if (nn_connect (fd, url) < 0) {
        fprintf (stderr, "nn_socket: %s\n", nn_strerror (nn_errno ()));
        nn_close (fd);
        return (-1);
    }

    if (nn_send (fd, request, len , 0) < 0) {
        fprintf (stderr, "nn_send: %s\n", nn_strerror (nn_errno ()));
        nn_close (fd);
        return (-1);
    }

    void* buf = NULL;
    rc = nn_recv (fd, &buf, NN_MSG , 0);
    if (rc < 0) {
        fprintf (stderr, "nn_recv: %s\n", nn_strerror (nn_errno ()));
        nn_close (fd);
        return (-1);
    }
    
    Print_TRACE("[recv rep]: %d  %s",rc,buf);

    memcpy((void*)result,buf,rc);

    nn_freemsg (buf);

    nn_shutdown (fd, 0);

    return 0;
}

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

【开源之美】nanomsg(2) :req/rep 模式 的相关文章

  • 向进度条添加百分比文本 C#

    我有一个方法可以显示进程栏何时正在执行以及何时成功完成 我工作得很好 但我想添加一个百分比 如果完成 则显示 100 如果卡在某个地方 则显示更少 我在网上做了一些研究 但我无法适应我正在寻找的解决方案 这是我的代码 private voi
  • 使用 std::packaged_task/std::exception_ptr 时,线程清理程序报告数据争用

    我遇到了线程清理程序 TSan 的一些问题 抱怨某些生产代码中的数据争用 其中 std packaged task 通过将它们包装在 std function 中而移交给调度程序线程 对于这个问题 我简化了它在生产中的作用 同时触发 TSa
  • 计算 Richtextbox 中所有单词的最有效方法是什么?

    我正在编写一个文本编辑器 需要提供实时字数统计 现在我正在使用这个扩展方法 public static int WordCount this string s s s TrimEnd if String IsNullOrEmpty s re
  • 提交后禁用按钮

    当用户提交付款表单并且发布表单的代码导致 Firefox 中出现重复发布时 我试图禁用按钮 去掉代码就不会出现这个问题 在firefox以外的任何浏览器中也不会出现这个问题 知道如何防止双重帖子吗 System Text StringBui
  • 在 DataView 的 RowFilter 中选择 DISTINCT

    我试图根据与另一个表的关系缩小 DataView 中的行范围 我使用的 RowFilter 如下 dv new DataView myDS myTable id IN SELECT DISTINCT parentID FROM myOthe
  • 复制 std::function 的成本有多高?

    While std function是可移动的 但在某些情况下不可能或不方便 复制它会受到重大处罚吗 它是否可能取决于捕获变量的大小 如果它是使用 lambda 表达式创建的 它依赖于实现吗 std function通常被实现为值语义 小缓
  • 使用 LINQ2SQL 在 ASP.NET MVC 中的各种模型存储库之间共享数据上下文

    我的应用程序中有 2 个存储库 每个存储库都有自己的数据上下文对象 最终结果是我尝试将从一个存储库检索到的对象附加到从另一个存储库检索到的对象 这会导致异常 Use 构造函数注入将 DataContext 注入每个存储库 public cl
  • 由 IHttpClientFactory 注入时模拟 HttpClient 处理程序

    我创建了一个自定义库 它会自动为依赖于特定服务的 Polly 策略设置HttpClient 这是使用以下方法完成的IServiceCollection扩展方法和类型化客户端方法 一个简化的例子 public static IHttpClie
  • 是否有实用的理由使用“if (0 == p)”而不是“if (!p)”?

    我倾向于使用逻辑非运算符来编写 if 语句 if p some code 我周围的一些人倾向于使用显式比较 因此代码如下所示 if FOO p some code 其中 FOO 是其中之一false FALSE 0 0 0 NULL etc
  • 如何检测表单的任何控件的变化?

    如何检测 C 中表单的任何控件的更改 由于我在一个表单上有许多控件 并且如果表单中的任何控件值发生更改 我需要禁用按钮 我正在寻找一些内置函数 事件处理程序 属性 并且不想为此创建自定义函数 不 我不知道任何时候都会触发任何事件any控制表
  • 动态添加 ASP.Net 控件

    我有一个存储过程 它根据数据库中存储的记录数返回多行 现在我想有一种方法来创建 div 带有包含该行值的控件的标记 如果从数据库返回 10 行 则 10 div 必须创建标签 我有下面的代码来从数据库中获取结果 但我不知道如何从这里继续 S
  • 为什么 gcc 抱怨“错误:模板参数 '0' 的类型 'intT' 取决于模板参数”?

    我的编译器是gcc 4 9 0 以下代码无法编译 template
  • C++ 函数重载类似转换

    我收到一个错误 指出两个重载具有相似的转换 我尝试了太多的事情 但没有任何帮助 这是那段代码 CString GetInput int numberOfInput BOOL clearBuffer FALSE UINT timeout IN
  • 按 Esc 按键关闭 Ajax Modal 弹出窗口

    我已经使用 Ajax 显示了一个面板弹出窗口 我要做的是当用户按 Esc 键时关闭该窗口 这可能吗 如果有人知道这一点或以前做过这一点 请帮助我 Thanks 通过以下链接 您可以通过按退出按钮轻松关闭窗口 http www codepro
  • 不同类型指针之间的减法[重复]

    这个问题在这里已经有答案了 我试图找到两个变量之间的内存距离 具体来说 我需要找到 char 数组和 int 之间的距离 char data 5 int a 0 printf p n p n data 5 a long int distan
  • 调用堆栈中的“外部代码”是什么意思?

    我在 Visual Studio 中调用一个方法 并尝试通过检查调用堆栈来调试它 其中一些行标记为 外部代码 这到底是什么意思 方法来自 dll已被处决 外部代码 意味着该dll没有可用的调试信息 你能做的就是在Call Stack窗口中单
  • 如果没有抽象成员,基类是否应该标记为抽象?

    如果一个类没有抽象成员 可以将其标记为抽象吗 即使没有实际理由直接实例化它 除了单元测试 是的 将不应该实例化的基类显式标记为抽象是合理且有益的 即使在没有抽象方法的情况下也是如此 它强制执行通用准则来使非叶类抽象 它阻止其他程序员创建该类
  • 我的班级应该订阅自己的公共活动吗?

    我正在使用 C 3 0 遵循标准事件模式我有 public event EventHandler
  • 如何从 ODBC 连接获取可用表的列表?

    在 Excel 中 我可以转到 数据 gt 导入外部数据 gt 导入数据 然后选择要使用的数据源 然后在提供登录信息后 它会给我一个表格列表 我想知道如何使用 C 以编程方式获取该列表 您正在查询什么类型的数据源 SQL 服务器 使用权 看
  • 如何将 PostgreSql 与 EntityFramework 6.0.2 集成? [复制]

    这个问题在这里已经有答案了 我收到以下错误 实体框架提供程序类型的 实例 成员 Npgsql NpgsqlServices Npgsql 版本 2 0 14 2 文化 中性 PublicKeyToken 5d8b90d52f46fda7 没

随机推荐

  • 奇安信远程技术支持实习面试总结

    1 自我介绍 2 问项目 3 ssl vpn 和ipsec vpn 区别 4 Vxlan的用途和作用 5 跨专业就业的原因 6 Linux常用命令 查询内存的命令 7 交换机两种接口模式 8 Vrrp协议 9 Ospf五类数据包 10 对于
  • erp系统服务器电脑配置,erp软件服务器电脑配置

    erp软件服务器电脑配置 内容精选 换一换 Atlas 200 DK开发者板支持通过USB端口或者网线与Ubuntu服务器进行连接 连接示例图如图1所示 Atlas 200 DK连接Ubuntu服务器有以下场景 使用USB连接线通过USB端
  • 仿百度页面制作html+css+js动态页面

    仿百度页面制作html css js动态页面
  • java stream 两个List<Map>合并

    new三条源数据 value值均为一个字 加入list Map
  • C++笔记(随时更新)

    一 string key str sort key begin key end 二 emplace back优于push back 能就地通过参数构造对象 不需要拷贝和移动内存 提升容器插入性能 三 swap nums i nums i 1
  • Python表白代码合集:用完这几种表白代码,找不到对象你来找我,这也太秀了吧❤️

    明天就七夕了 谁说程序员不懂浪漫 今天给大家分享几种有意思的表白代码 带你用Python 码 上七夕 话不多说 我们直接上代码 第一种 表白弹窗 先看效果 文字背景啥的 大家可以自定义一下 代码展示 20行代码实现弹窗 import tki
  • 特征工程(1)--特征工程是什么?

    机器学习领域的大神Andrew Ng 吴恩达 老师曾说 Coming up with features is difficult time consuming requires expert knowledge Applied machin
  • 乐高编程机器人编程有什么区别

    乐高编程机器人编程有什么区别 一直以来家长们对于孩子的学习重视程度可谓是相当的大 很多的家长会给孩子选择一些能够让孩子适应社会发展的课程 就拿现在很多的家长想要孩子去学习机器人编程的课程来说 有的家长对于乐高编程机器人编程有什么区别并不清楚
  • 【Xgplayer】xgplayer基本使用

    文章目录 xgplayer简介 xgplayer官网 Xgplayer VS VideoJs xgplayer下载 播放器组件 使用播放器 效果图 推荐 xgplayer简介 开发团队 字节跳动 字节跳动出品 必属精品 xgplayer是一
  • VS2022编译GDAL库报错: LINK : error LNK2001: 无法解析的外部符号 _OSRValidate _OGR_G_GetPointCount _OGRRegisterAll

    目录 场景复现 解决方案 场景复现 使用VS2022的Native Tools command prompt for 2022工具编译GDAL库时 报 LINK error LNK2001 无法解析的外部符号 OSRValidate OGR
  • Pytorch入门实战(5):基于nn.Transformer实现机器翻译(英译汉)

    使用Google Colab运行 open In Colab 源码地址 文章目录 本文涉及知识点 本文内容 环境配置 数据预处理 文本分词与构造词典 Dataset and Dataloader 模型构建 模型训练 模型推理 本文涉及知识点
  • 数据库并发控制 事务调度 可串行调度

    所谓并发操作 是指在多用户共享的系统中 许多用户可能同时对同一数据进行操作 所带来的问题是数据的不一致性 具体表现为 丢失更新 不可重复读 读脏数据 1 事务调度 1 1 串行调度 Serial Schedule 是指多个事务依序串行执行
  • 华为手机上的网上邻居怎么用_HUAWEI Mate 8 网络邻居 使用教程

    本帖最后由 爱奔跑的蜗牛 于 2016 1 19 23 54 编辑 有根数据线 手机连接电脑传输管理文件算不上什么秘密 但总有那么一两天 忘记带数据线 又急需拷贝电脑文件到手机上 除了问别人借数据线 难道就不能 自力更生 了吗 当然不是 拥
  • 【华为OD机试真题 python】通信误码【2022 Q4

    题目描述 通信误码 信号传播过程中会出现一些误码 不同的数字表示不同的误码ID 取值范围为1 65535 用一个数组记录误码出现的情况 每个误码出现的次数代表误码频度 请找出记录中包含频度最高误码的最小子数组长度 输入描述 误码总数目 取值
  • Python编程的注意事项

    目录 一 异常处理 1 精细化地捕获异常 2 finally 块中的资源清理 3 抛出自定义异常 二 类的继承 1 不要过度使用继承 2 了解多重继承的问题 三 垃圾回收与内存管理 1 对象引用计数的概念 2 循环引用的问题 Python
  • Kubernetes中的PV和PVC

    K8s引入了一组叫作Persistent Volume Claim PVC 和Persistent Volume PV 的API对象 大大降低了用户声明和使用持久化Volume的门槛 在Pod的Volumes中 只要声明类型是persist
  • [1080]idea import引用报错

    从GIT上拉下代码后 出现这种情况 类正常 但是import是浅灰色 引用类有红色警告 代码中所有的引用都报错 重启idea 无效 删除引用的类与被引用的类中的代码 无效 重新加载maven 无效 最后 清理缓存后 恢复正常 File gt
  • 硬件系统工程师宝典(29)-----应用DC/DC要注意什么?

    各位同学大家好 欢迎继续做客电子工程学习圈 今天我们继续来讲这本书 硬件系统工程师宝典 上篇我们说到使用LDO时 除了要考虑输入 输出电压外 还要注意压差 最大输出电流等 今天我们来讲讲DC DC的应用分析 DC DC分类 将一个不受控的输
  • 台式电脑重装系统失败怎么办

    当大家使用一键重装系统软件给自己电脑重装系统的时候 都可能会遇到一些故障问题造成台式电脑重装系统失败的情况发生 那么大家遇到台式电脑重装系统失败怎么办呢 现在小编就教下大家相关的方法教程 大家一起来看看吧 工具 原料 系统版本 window
  • 【开源之美】nanomsg(2) :req/rep 模式

    req rep 模式显然就是类似http的应答模式 在某些基于短连接的进程间通讯方式上可以很方便的使用 下面我们举个例子 服务端 demo ifndef NANOMSGUTIL H define NANOMSGUTIL H include