ZeroMQ+Protobuf实例

2023-05-16

使用库版本
zeromq-4.0.3

 接收端代码

#include <zmq.h>
#include "stdio.h"

int main(int argc, char * argv[])
{
    void * pCtx = NULL;
    void * pSock = NULL;
    const char * pAddr = "tcp://*:7766";

    //创建context,zmq的socket 需要在context上进行创建 
    if((pCtx = zmq_ctx_new()) == NULL)
    {
        return 0;
    }
    //创建zmq socket ,socket目前有6中属性 ,这里使用dealer方式具体使用方式请参考zmq官方文档 
    if((pSock = zmq_socket(pCtx, ZMQ_DEALER)) == NULL)
    {
        zmq_ctx_destroy(pCtx);
        return 0;
    }
    int iRcvTimeout = 5000;// millsecond
    //设置zmq的接收超时时间为5秒 
    if(zmq_setsockopt(pSock, ZMQ_RCVTIMEO, &iRcvTimeout, sizeof(iRcvTimeout)) < 0)
    {
        zmq_close(pSock);
        zmq_ctx_destroy(pCtx);
        return 0;
    }
    //绑定地址 tcp://*:7766 也就是使用tcp协议进行通信,使用网络端口 7766
    if(zmq_bind(pSock, pAddr) < 0)
    {
        zmq_close(pSock);
        zmq_ctx_destroy(pCtx);
        return 0;
    }
    printf("bind at : %s\n", pAddr);
    while(1)
    {
        char szMsg[1024] = {0};
        printf("waitting...\n");
        errno = 0;
        //循环等待接收到来的消息,当超过5秒没有接到消息时,zmq_recv函数返回错误信息
        if(zmq_recv(pSock, szMsg, sizeof(szMsg), 0) < 0)
        {
            printf("error = %s\n", zmq_strerror(errno));
            continue;
        }
        printf("received message : %s\n", szMsg);
    }

    return 0;
}

发送端代码

#include <zmq.h>
#include "stdio.h"

int main(int argc, char * argv[])
{
    void * pCtx = NULL;
    void * pSock = NULL;
    //使用tcp协议进行通信,需要连接的目标机器IP地址为192.168.1.2通信使用的网络端口 为7766 
    const char * pAddr = "tcp://192.168.1.2:7766";

    //创建context 
    if((pCtx = zmq_ctx_new()) == NULL)
    {
        return 0;
    }
    //创建socket 
    if((pSock = zmq_socket(pCtx, ZMQ_DEALER)) == NULL)
    {
        zmq_ctx_destroy(pCtx);
        return 0;
    }
    int iSndTimeout = 5000;// millsecond
    //设置接收超时 
    if(zmq_setsockopt(pSock, ZMQ_RCVTIMEO, &iSndTimeout, sizeof(iSndTimeout)) < 0)
    {
        zmq_close(pSock);
        zmq_ctx_destroy(pCtx);
        return 0;
    }
    //连接目标IP192.168.1.2,端口7766 
    if(zmq_connect(pSock, pAddr) < 0)
    {
        zmq_close(pSock);
        zmq_ctx_destroy(pCtx);
        return 0;
    }
    //循环发送消息 
    while(1)
    {
        static int i = 0;
        char szMsg[1024] = {0};
        snprintf(szMsg, sizeof(szMsg), "hello world : %3d", i++);
        printf("Enter to send...\n");
        if(zmq_send(pSock, szMsg, sizeof(szMsg), 0) < 0)
        {
            fprintf(stderr, "send message faild\n");
            continue;
        }
        printf("send message : [%s] succeed\n", szMsg);
        getchar();
    }

    return 0;
}
编译
gcc -o recv recv.c -lzmq
gcc -o send send.c -lzmq

运行

export LD_LIBRARY_PATH=/usr/local/lib/

接收端

$ ./recv 
bind at : tcp://*:7766
waitting...
received message : hello world :   0
waitting...
received message : hello world :   1
waitting...
received message : hello world :   2
waitting...
received message : hello world :   3
waitting...
received message : hello world :   4
waitting...
received message : hello world :   5
waitting...

 

发送端

$ ./send 
Enter to send...
send message : [hello world :   0] succeed

Enter to send...
send message : [hello world :   1] succeed

Enter to send...
send message : [hello world :   2] succeed

Enter to send...
send message : [hello world :   3] succeed

Enter to send...
send message : [hello world :   4] succeed

Enter to send...
send message : [hello world :   5] succeed

 

UserInfo.proto,适⽤用于protobuf3的协议定义⽂文件,该协议包含两部分:

Part-1:Server/Client消息定义,⽤用于Client主动请求Server的协议内容;

Part-2:Publish/Subscribe消息定义,⽤用于Publisher主动发布数据给Subscriber的协议内容,其中每条消息对应的topic参考其注释;

syntax = "proto3";
package UserInfo.proto;
/* ----------Part-1 : Server/Client消息定义---------- */
/* 请求类型枚举 */
enum RequestType {
UNKNOWN_REQUEST_TYPE = 0;
MAKE_I_CALL = 1; // 拨打 I-CALL

}

/* ----------Part-2 :Publish/Subscribe消息定义---------- */
//topic:"user_id"
message UserID {
string user_id = 1;
}

 

代码⽣生成

$ protoc --cpp_out=./ UserInfo.proto # ⽣生成 C++ 代码

UserInfo.pb.h和UserInfo.pb.cc
$ protoc --java_out=./ UserInfo.proto # ⽣生成 Java 代码

 

其他资料

https://gitee.com/solym/ZeroMQ-Guide-Zh

https://github.com/booksbyus/zguide

ZMQ接口文档的官方网站 : http://api.zeromq.org/

ZMQ接口文档的百度网盘下载地址(英文):http://pan.baidu.com/s/1jGDqXfS

 

● zmq - 0MQ 轻量级消息传输内核

● zmq_bind - 绑定一个socket

● zmq_close - 关闭ZMQ socket

● zmq_connect - 由一个socket创建一个对外连接

● zmq_ctx_destroy - 销毁一个ZMQ环境上下文

● zmq_ctx_get - 得到环境上下文的属性

● zmq_ctx_new – 创建一个新的ZMQ 环境上下文

● zmq_ctx_set - 设置环境上下文属性

● zmq_ctx_shutdown - 停止一个ZMQ context

● zmq_socket_monitor - 注册一个监控回调函数

● zmq_ctx_term - 终结一个ZMQ环境上下文

● zmq_curve – 安全的认证方式和保密方式

● zmq_curve_keypair - 生成一个新的CURVE 密钥对

● zmq_disconnect - 断开一个socket的连接

● zmq_errno – 返回errno的值给调用此函数的线程

● zmq_init - 初始化ZMQ环境上下文 (已弃用)

● zmq_z85_decode – 从一个用Z85算法生成的文本中解析出二进制密码 

● zmq_z85_encode – 使用Z85算法对一个二进制秘钥进行加密,输出可打印的文本

● zmq_version – 返回ZMQ链接库的版本

● zmq_unbind - 停止连接外来的请求

● zmq_plain - 明文认证

● zmq_null - 无安全和加密

● zmq_msg_more - 指出是不是还有更多的消息部分可以接收

● zmq_msg_init - 初始化一个空的ZMQ消息结构

● zmq_msg_init_data - 从一个指定的存储空间中初始化一个ZMQ消息对象的数据

● zmq_msg_init_size - 使用一个指定的空间大小初始化ZMQ消息对象

● zmq_msg_move - 将一个消息里面的内容移动到另一个消息里面

● zmq_msg_copy - 把一个消息的内容复制到另一个消息中

● zmq_msg_data - 返回消息内容的指针

● zmq_msg_get - 获取消息的性质

● zmq_msg_set - 设置消息的性质

● zmq_msg_size - 以字节为单位返回消息内容的大小

● zmq_msg_recv - 从一个socket中接受一个消息帧

● zmq_msg_close – 释放一个ZMQ消息

● zmq_msg_send – 从一个socket发送一个消息帧

● zmq_term - 终结ZMQ环境上下文(context)(已弃用)

● zmq_strerror - 获取ZMQ错误描述字符串

● zmq_poll - I/O多路技术

● zmq_tcp – 使用TCP协议的ØMQ网络单播协议

● zmq_recv – 从一个socket上接收一个消息帧

● zmq_send – 在一个socket上发送一个消息帧

● zmq_proxy – 开始ZMQ内置代理

● zmq_recvmsg – 从一个socket上接收一个消息帧 (已弃用)

● zmq_sendmsg – 从一个socket上发送一个消息帧 (已弃用)

● zmq_ipc – ZMQ本地进程间通信传输协议

● zmq_proxy_steerable – 以STOP/RESUME/TERMINATE控制方式开启内置的ZMQ代理

● zmq_inproc – ØMQ 本地进程内(线程间)传输方式

● zmq_pgm – ØMQ 使用PGM 进行可靠的多路传输

● zmq_send_const – 从一个socket上发送一个固定内存数据

● zmq_socket – 创建ZMQ套接字

● zmq_setsockopt –设置ZMQ socket的属性

● zmq_getsockopt – 获取ZMQ socket的属性

 

 

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

ZeroMQ+Protobuf实例 的相关文章

  • 无回复请求的 ZMQ 模式

    我使用 ZMQ 允许客户端连接到服务器并向其发送命令 命令传入的频率很高 不需要任何回复 我正在考虑使用 REQ REP 套接字 但发送空回复感觉很浪费 我不想使用 PUB SUB 或 PUSH PULL 因为我希望客户端启动连接 是否有比
  • ZeroMQ (clrzmq4) 轮询问题

    我想要完成的是实现从两个套接字之一读取消息 无论消息首先到达何处 据我了解轮询 zmq poll 是正确的做法 如指南中的 mspoller http zguide zeromq org cs mspoller 在这里我将提供小的伪代码片段
  • Linux重启后nodejs消失了

    我刚刚使用 nvm 安装了 nodejs 版本 0 10 14 它安装成功 同样在安装nodejs之前 我安装了zeromq版本2 2 0 出于测试目的 我尝试运行下面的基本酒吧示例 var zmq require zmq var sock
  • 如何使用 Zeromq 的 inproc 和 ipc 传输?

    我是 ZERMQ 的新手 ZeroMQ 具有 TCP INPROC 和 IPC 传输 我正在寻找在 Winx64 和 python 2 7 中使用 python 和 inproc 的示例 这些示例也可以用于 Linux 另外 我一直在寻找
  • Python 多处理问题?

    我有一个包含 500 个输入文件的文件夹 所有文件的总大小约为 500 MB 我想写一个python执行以下操作的脚本 1 将所有输入文件加载到内存中 2 初始化一个空的python稍后将使用的列表 参见项目符号 4 3 启动 15 个不同
  • ipc:// 上的 ZeroMQ REQ/REP 和并发性

    我使用 REQ REP 0MQ ipc 套接字实现了一个 JSON RPC 服务器 我遇到了奇怪的行为 我怀疑这是由于 ipc 底层 unix 套接字不是真正的套接字 而是一个事实一根管子 根据文档 必须强制执行严格的 zmq send z
  • NODE_MODULE_VERSION 46。此版本的 Node.js 需要 NODE_MODULE_VERSION 64。请尝试重新编译或重新安装

    我正在尝试执行提供给我的节点应用程序 它应该可以正常工作 我已尝试运行它 但无法修复此错误 seba vps92941 services drivetech node awto js home seba services drivetech
  • Python ZeroMQ PUSH/PULL——丢失消息?

    我正在尝试使用python with zeroMQ in PUSH PULL模式 发送大小的消息4 MB 每隔几秒钟 由于某种原因 虽然看起来所有消息都已发送 但服务器似乎只收到了其中一些消息 我在这里缺少什么 这是客户端的代码 clien
  • 如何使用 ZeroMQ 处理原始 UDP?

    我有一个客户 我无法更改其代码 但我想使用 重新 编写ZeroMQ插座 客户使用原始TCP和原始的UDP插座 我知道我可以使用ZMQ ROUTER RAW对于生的TCP插座 但是原始的怎么样 UDP数据流 ZeroMQ 中对 UDP 的支持
  • clrzmq 在 Xamarin Studios/C# 应用程序中找不到 libzmq

    我在 Mac 上使用 Xamarin Studio clrzmq通过 NuGet 包含 libzmq dll 上的 clrzmq 引用 我的应用程序编译得很好 但是当我尝试运行它时 我得到了这个 Unhandled Exception Sy
  • ZeroMQ套接字在什么情况下会丢弃或无法传递消息?

    是否有某种规范或其他解释来描述正常情况 对于每种通信类型 您可以预期在 ZeroMQ 套接字上发送的消息不会被 所有 侦听进程接收 例如 我有一个实验程序 它基本上假设所有订阅者PUB套接字接收在该套接字上发送的所有消息 在初始化握手之后
  • 如何将扩展 PUB-SUB 模式中的发布者和订阅者与 C++ 中 ZeroMQ 中的中介同步?

    Extended PUB SUB topology https i stack imgur com GEgpx png 我在一个有 1 个中介的用例中有多个发布者和多个订阅者 在 ZeroMQ 指南中 我了解了如何使用额外的方法来同步 1
  • 创建的线程数超出预期

    你可以找到该程序here https pastebin com H5fq732a 我正在消息传递框架 0MQ 中构建一个程序 我尝试执行我发布的内容here https stackoverflow com questions 4409620
  • REQ/REP 模式中的 ZeroMQ FiniteStateMachineException

    我有两个简单的组件 它们应该使用 REQ REP ZeroMQ 模式相互通信 服务器 REP Socket 是使用 pyzmq 在 Python 中实现的 import zmq def launch server print Launchi
  • ZeroMQ可以用来接受传统的套接字请求吗?

    我正在尝试使用 ZeroMQ 重写我们的旧服务器之一 现在我有以下服务器设置 适用于 Zmq 请求 using var context ZmqContext Create using var server context CreateSoc
  • 带有 epgm 的 ZeroMQ PUB/SUB 无法接收同一主机上进程发送的消息

    我的所有进程都有两个套接字 一个 PUB 和一个 SUB 并且它们都使用相同的多播地址和端口 例如 PUB 会这样做 绑定 epgm 239 192 1 1 5555 SUB 将执行以下操作 连接 epgm 239 192 1 1 5555
  • ZeroMQ 在 python 多处理类/对象解决方案中挂起

    我正在尝试将 Python pyzmq 中的 ZeroMQ 与多处理一起使用 作为一个最小的 不是 工作示例 我有一个服务器类和一个客户端类 它们都继承自multiprocessing Process 客户端作为子进程应向服务器子进程发送消
  • 在轮询器内异步运行代码

    在我的 ruby 脚本中 我使用 celluloid zmq gem 我尝试在轮询器内异步运行评估响应 使用 async evaluate response socket read multipart 但是 如果我从循环中删除睡眠 不知何故
  • 为什么/何时使用 DDS 而不是 ZeroMQ? [关闭]

    Closed 这个问题是基于意见的 help closed questions 目前不接受答案 我读了以下内容 DDS AMQP ZeroMQ https stackoverflow com questions 3202521 dds vs
  • 无法加载动态库 php_zmq.dll - 找不到指定的模块

    I have XAMPP含 PHP 版本 PHP 7 1 13 cli 构建时间 2018 年 1 月 3 日 20 16 04 ZTS MSVC14 Visual C 2015 x86 我想安装 ZeroMQ 扩展 它 我从这里下载的ht

随机推荐