使用Zeromq和protobuf实现的socket通信

2023-05-16

本文介绍使用ZeroMQ(下文简称ZMQ),结合protobuf序列化实现客户端和服务端的通信。在之前的一篇文章中(http://blog.csdn.net/cjf_wei/article/details/52894560)介绍了Google的protobuf序列化的使用,以及结合unix环境的socket编程实现简单的客户端到服务端的通信。在接触了zmq之后,尝试使用这个“极速消息通信库”来重构之前的实现。
ZMQ是iMatix开发的以消息为导向的开源中间件库,它类似于Berkeley套接字,它支持多种传输协议,它小巧、简单,但速度足够快,可以用作一个并发框架。它支持多种模式的传输,不管是客户端到服务端的1:1关系,还是M:N关系,亦或是订阅/发布它都能轻松应对。本文使用客户端到服务端的1:1的应答模式。

Zmq应答模式的基本使用
使用ZMQ进行通信,首先要创建一个上下文环境,然后使用它创建套接字。

void *context = zmq_ctx_new();//创建上下文

客户端和服务端使用的socket类型并不一样。

void *requester = zmq_socket(context, ZMQ_REQ); //for client
void *responder = zmq_socket(context, ZMQ_REP); //for server

随后服务端将socket绑定到一个周知的地址和端口

zmq_bind(responder,"tcp://*:5555");

而客户端则要尝试连接到服务端提供的地址

zmq_connect(requester,"tcp://localhost:5555");

要把数据写入消息需要使用zmq_msg_init_size()来初始化消息,而读取消息由于未知消息的长度只能使用zmq_msg_init()来创建一个空的消息。
消息初始化后,发送消息使用zmq_send_send(),接收消息则使用zmq_msg_recv();
访问消息可以使用zmq_msg_data(),要想知道消息的大小可以使用zmq_msg_size();

最后需要关闭套接字,并销毁上下文。

zmq_close(&requester);    //关闭套接字
zmq_ctx_destroy(context); //销毁上下文

protobuf的使用请参考(http://blog.csdn.net/cjf_wei/article/details/52894560),在此不再赘述。

代码实现

  • 客户端
//for client
#include <iostream>
#include <string>
//for protobuf
#include "Test.pb.h" 
//for zmq
#include <zmq.h>

using namespace std;
using namespace Test::protobuf ;

const int BUFFSIZE = 128;

int main()
{
    //socket通信所需的上下文环境
    void *context = zmq_ctx_new();
    //根据context建立的socket的链接,客户端使用ZMQ_REQ套接字
    void *requester = zmq_socket(context, ZMQ_REQ);
    if( -1 == zmq_connect(requester,"tcp://localhost:5555"))
    {
        cout<<"Connect to server failed..."<<endl;
        zmq_ctx_destroy(context);
        return -1;
    }
    cout<<"Connect to server success..."<<endl;

    HeartInfo myprotobuf;
    while(1)
    {
        myprotobuf.set_type("client");
        myprotobuf.set_ip("192.168.1.100");
        myprotobuf.set_port(5555);

        char buff[BUFFSIZE];
        myprotobuf.SerializeToArray(buff,BUFFSIZE);

        //客户端发送请求
        int len = strlen(buff);
        zmq_msg_t req;
        if(0 != zmq_msg_init_size(&req,len))
        {
            cout<<"zmq_msg_init failed..."<<endl;
            break;
        }
        memcpy(zmq_msg_data(&req),buff,len);
        if(len != zmq_msg_send(&req,requester,0))
        {
            zmq_msg_close(&req);
            cout<<"send faliled..."<<endl;
            break;
        }
        //成功发送后,在控制台打印发送消息的内容
        cout<<"Type:"<<myprotobuf.type()<<"\t"
            <<"IP:"<<myprotobuf.ip()<<"\t"
            <<"Port:"<<myprotobuf.port()<<"\n";
        zmq_msg_close(&req);

        //清空发送缓存
        memset(buff,0,BUFFSIZE*sizeof(char));

        //客户端接收来自服务端的相应
        zmq_msg_t reply;
        zmq_msg_init(&reply);
        int size = zmq_msg_recv(&reply,requester,0);
        memcpy(buff,zmq_msg_data(&reply),size);
        HeartInfo receive;
        receive.ParseFromArray(buff,BUFFSIZE);
        cout<<"Type:"<<receive.type()<<"\t"
            <<"IP:"<<receive.ip()<<"\t"
            <<"Port:"<<receive.port()<<"\n";
        zmq_msg_close(&reply);
    }
    zmq_close(&requester);
    zmq_ctx_destroy(context);
    return 0;
}
  • 服务端
#include <iostream>
#include <string>
//for protobuf
#include "Test.pb.h" 
//for zmq
#include <zmq.h>

using namespace std;
using namespace Test::protobuf ;

const int BUFFSIZE = 128;

int main()
{
    //socket通信所需的上下文环境
    void *context = zmq_ctx_new();
    //根据context建立的socket的链接,服务端使用ZMQ_REP套接字
    void *responder = zmq_socket(context, ZMQ_REP);
    if( -1 == zmq_bind(responder,"tcp://*:5555"))
    {
        cout<<"bind socket to server failed..."<<endl;
        return -1;
    }

    HeartInfo myprotobuf;
    while(1)
    {
        char buff[BUFFSIZE];
        //接收客户端请求
        zmq_msg_t request;
        zmq_msg_init(&request);
        int size = zmq_msg_recv(&request,responder,0);
        memcpy(buff,zmq_msg_data(&request),size);
        HeartInfo receive;
        receive.ParseFromArray(buff,BUFFSIZE);
        cout<<"Type:"<<receive.type()<<"\t"
            <<"IP:"<<receive.ip()<<"\t"
            <<"Port:"<<receive.port()<<"\n";
        zmq_msg_close(&request);

        //清空接收缓存
        memset(buff,0,BUFFSIZE*sizeof(char));
        sleep(2);

        myprotobuf.set_type("server");
        myprotobuf.set_ip("192.168.1.100");
        myprotobuf.set_port(5555);
        myprotobuf.SerializeToArray(buff,BUFFSIZE);

        //服务端发送响应
        int len = strlen(buff);
        zmq_msg_t reply;
        if(0 != zmq_msg_init_size(&reply,len))
        {
            cout<<"zmq_msg_init failed..."<<endl;
            break;
        }
        memcpy(zmq_msg_data(&reply),buff,len);
        if(len != zmq_msg_send(&reply,responder,0))
        {
            zmq_msg_close(&reply);
            cout<<"send faliled..."<<endl;
            break;
        }
        //成功发送后,在控制台打印发送消息的内容
        cout<<"Type:"<<myprotobuf.type()<<"\t"
            <<"IP:"<<myprotobuf.ip()<<"\t"
            <<"Port:"<<myprotobuf.port()<<"\n";
        zmq_msg_close(&reply);
    }
    zmq_close(&responder);
    zmq_ctx_destroy(context);
    return 0;
}

在本文中使用的是protobuf来序列化要传输的内容,当然直接传输字符串也是可以的,但是
需要注意的是“除了字节大小外,zmq对你发送的数据一无所知”。这意味着在C/C++中,传输的字符串是否以’\0’结尾,你要自己决定并负责安全的处理。


1.《ZeroMQ云时代极速消息通信库》.电子工业出版社,2015.
2. 使用protobuf和socket实现服务器间消息的传递.http://blog.csdn.net/cjf_wei/article/details/52894560

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用Zeromq和protobuf实现的socket通信 的相关文章

  • NanoMsg (NNG) 和 FlatBuffers 是否适合该项目?

    大声喊出我们是否应该考虑更好的事情 我正在寻找一种非常快速且简单的方法来获取多个程序 例如 5 个 每个程序都在私有 OpenStack 云上的单独节点上运行以相互通信 数据包将是短 C 结构 小于 100 字节 交通流量将会较少 可能低于
  • zmq:多线程可以以简单的 PUSH-PULL 模式进行 PUSH

    我有两个进程 producer它通过推送消息ZMQ http www zeromq org to a consumer以简单的 拉 推 点对点模式 生产者有几个内部线程send 通过 zmq 但是 0MQ 的文档建议不要在线程之间共享套接字
  • Linux重启后nodejs消失了

    我刚刚使用 nvm 安装了 nodejs 版本 0 10 14 它安装成功 同样在安装nodejs之前 我安装了zeromq版本2 2 0 出于测试目的 我尝试运行下面的基本酒吧示例 var zmq require zmq var sock
  • 使用 ZeroMQ 实现消息总线

    我必须开发一个消息总线 供进程相互发送和接收消息 目前 我们正在Linux上运行 并计划稍后移植到其他平台 为此 我使用 TCP 上的 ZeroMQ 该模式是带有转发器的 PUB SUB 我的总线作为一个单独的进程运行 所有客户端都连接到
  • 具有自定义负载平衡的 ZMQ 套接字

    我研究了 ZMQ PUSH PULL 套接字 尽管我非常喜欢这种简单性 特别是与我现在在 UDP 套接字上的系统中实现的自定义碎片 ack 相比 但我希望使用自定义负载平衡 而不是简单的循环 robin 我相信 ZMQ PUSH PULL
  • 为分布式系统构建数据收集和监控的中间件[关闭]

    Closed 此问题正在寻求书籍 工具 软件库等的推荐 不满足堆栈溢出指南 help closed questions 目前不接受答案 我目前正在寻找一个好的中间件来构建监控和维护系统的解决方案 我们面临的挑战是监控 收集数据并维护由多达
  • Apache Thrift 和 ZeroMQ 之间的区别

    据我了解 Apache Thrift 和 ZeroMQ 是属于不同类别的软件 并且很难进行比较 因为这是苹果与橘子的比较 但我不知道为什么它们属于不同的类别 它们不是都用于在不同的服务之间传递数据吗 这些服务可能用不同的语言编写 也可能不是
  • php-zmq 未显示在 phpinfo() 页面上

    我在 Windows 上使用 Wamp Server 并安装了 zmq 库 当我在控制台上编写 php info 时 我看到 zmq 显示为已安装 zmq ZMQ 扩展 gt 已启用 ZMQ 扩展版本 gt 1 1 2 libzmq 版本
  • Python ZeroMQ PUSH/PULL——丢失消息?

    我正在尝试使用python with zeroMQ in PUSH PULL模式 发送大小的消息4 MB 每隔几秒钟 由于某种原因 虽然看起来所有消息都已发送 但服务器似乎只收到了其中一些消息 我在这里缺少什么 这是客户端的代码 clien
  • 如何使 Zeromq PUB/SUB 删除旧消息而不是新消息(用于实时提要)?

    说我有一个PUB服务器zmq send 的实时消息SUB客户 如果客户很忙而无法zmq recv 消息足够快 那么消息将在客户端 和 或服务器 中缓冲 如果缓冲区变得太大 高水位线 则新消息将被丢弃 对于实时消息来说 这与人们想要的相反 应
  • 如果可以使用 JZMQ,为什么还需要 JeroMQ?

    简单的问题 为什么在 Java 上 移植 zmq 并将其称为 JeroMQ 是个好主意 JeroMQ是ZeroMQ社区的官方项目 它是 C libzmq 库的完整移植 支持 3 2 版本 优点 纯Java 因此无需通过JNI链接C C 这对
  • Jupyter 和 Common Lisp

    我正在尝试安装cl jupyter https github com fredokun cl jupyter common lisp 内核Jupyter http jupyter org 我无法让它工作 当我打开一个新的 lisp 笔记本
  • 将 ZeroMQ 交叉编译为 ARM,以便在 MonoTouch iPhone 应用程序配置设置中使用

    我正在尝试在使用 MonoTouch 用 C 开发的 iPhone 应用程序中使用 ZeroMQ 库 我几乎解决了所有的问题 却在最后一道坎倒下了 我正在使用 ZeroMQ 2 1 10 和 C CLR 绑定 包装器 并在 Mac OS X
  • PyZMQ 是否处理为每个新客户端连接创建线程?

    我正在使用 PyZMQ 创建请求 回复服务器 并且我试图弄清楚为每个新客户端连接创建线程的行为是否由 PyZMQ 自动处理 最终 我试图弄清楚来自一个客户端的请求需要很长时间才能回复 是否会阻止来自所有其他客户端的请求 通常 我会在 Pyt
  • 带有 epgm 的 ZeroMQ PUB/SUB 无法接收同一主机上进程发送的消息

    我的所有进程都有两个套接字 一个 PUB 和一个 SUB 并且它们都使用相同的多播地址和端口 例如 PUB 会这样做 绑定 epgm 239 192 1 1 5555 SUB 将执行以下操作 连接 epgm 239 192 1 1 5555
  • ZeroMQ 在 python 多处理类/对象解决方案中挂起

    我正在尝试将 Python pyzmq 中的 ZeroMQ 与多处理一起使用 作为一个最小的 不是 工作示例 我有一个服务器类和一个客户端类 它们都继承自multiprocessing Process 客户端作为子进程应向服务器子进程发送消
  • 在多线程 c++0x11 程序中使用 zmq::poll 与 cntr +x 或终止信号结合

    对于自定义服务器 我打算使用int zmq poll zmq pollitemt t items int nitems long timeout 1 我认为这是 unix poll 函数的包装器 但包括zmq socket t文件描述符旁边
  • 如何在 ZeroMQ 套接字中检索和存储随机 UUID?

    我需要在多个客户端之间进行通信 当我尝试运行文件 多个终端 时 我得到相同的身份 所以我让路由器套接字自动设置UUID 但我发现我无法使用该身份存储在服务器上以在多个客户端之间进行路由 我如何处理多个客户端 ID 我正在尝试构建一个异步聊天
  • ZeroMQ:PUSH 上的 HWM 不起作用

    我正在尝试编写一个服务器 客户端脚本 其中包含一个执行任务的服务器和多个执行它的工作人员 问题是我的呼吸机有太多的任务 它很快就会填满内存 我尝试在绑定之前设置 HWM 但没有成功 一旦工作人员连接 它就会继续发送消息 完全忽略设置的 HW
  • 无法加载动态库 php_zmq.dll - 找不到指定的模块

    I have XAMPP含 PHP 版本 PHP 7 1 13 cli 构建时间 2018 年 1 月 3 日 20 16 04 ZTS MSVC14 Visual C 2015 x86 我想安装 ZeroMQ 扩展 它 我从这里下载的ht

随机推荐