python中zmq的基础三种模式

2023-05-16

ZMQ 的三个基本模型

ZMQ 提供了三个基本的通信模型,分别是“Request-Reply “,”Publisher-Subscriber“,”Parallel Pipeline”

  1. 请求应答模式(Request-Reply)(rep 和 req)
    消息双向的,有来有往,req端请求的消息,rep端必须答复给req端
  2. 订阅发布模式 (pub 和 sub)
    消息单向的,有去无回的。可按照发布端可发布制定主题的消息,订阅端可订阅喜欢的主题,订阅端只会收到自己已经订阅的主题。发布端发布一条消息,可被多个订阅端同事收到。
  3. push pull模式
    消息单向的,也是有去无回的。push的任何一个消息,始终只会有一个pull端收到消息.

1.ZMQ 的 Request-Reply
由 Client 发起请求,并等待 Server 回应请求。请求端发送一个简单的 hello,服务端则回应一个 world。请求端和服务端都可以是 1:N 的模型。通常把 1 认为是 Server ,N 认为是 Client 。ZMQ 可以很好的支持路由功能(实现路由功能的组件叫作 Device),把 1:N 扩展为N:M (只需要加入若干路由节点)

server为REP模式,等待消息,client为REQ模式,向server请求消息。
简单的例子
server.py

import zmq
import sys
context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:5555")
while True:
    try:
        print("wait for client ...")
        message = socket.recv()
        print("message from client:", message.decode('utf-8'))
        input1 = raw_input("请输入内容:")
        socket.send(input1)
    except Exception as e:
        print '异常:',e
        sys.exit()

client.py

import zmq
import sys
context = zmq.Context()
print("Connecting to server...")
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:5555")
while True:
    input1 = raw_input("请输入内容:")
    if input1 == 'b':
        sys.exit()
    socket.send(input1.encode('utf-8'))
    message = socket.recv()
    print("Received reply: ", message.decode('utf-8'))

1.服务端和客户端无论谁先启动,效果是相同的,这点不同于 Socket。
2.在服务端收到信息以前,程序是阻塞的,会一直等待客户端连接上来。
3.问一答式的。如果 Server 先 send,client 先 rev 是会报错的。
4.ZMQ 通信通信单元是消息,他除了知道 Bytes 的大小,他并不关心的消息格式。因此,你可以使用任何你觉得好用的数据格式。Xml、Protocol Buffers、Thrift、json 等等。

2.Publish-Subscribe模式(发布订阅模型)
广播所有client,没有队列缓存,断开连接数据将永远丢失。client可以进行数据过滤。

服务端
server.py

import zmq
import time
import sys
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:5555")

while True:
    msg = input("请输入要发布的信息:").strip()
    if msg == 'b':   #关闭连接的字符
        sys.exit()
    socket.send(msg.encode('utf-8'))
    time.sleep(1)
  1. 与 Hello World 不同的是,Socket 的类型变成 SOCKET_PUB 和 SOCKET_SUB 类型。
  2. 客户端需要$subscriber->setSockOpt (ZMQ::SOCKOPT_SUBSCRIBE, $filter);设置一个过滤值,相当于设定一个订阅频道,否则什么信息也收不到。
  3. 服务器端一直不断的广播中,如果中途有 Subscriber 端退出,并不影响他继续的广播,当 Subscriber 再连接上来的时候,收到的就是后来发送的新的信息了。这对比较晚加入的,或者是中途离开的订阅者,必然会丢失掉一部分信息,这是这个模式的一个问题,所谓的 Slow joiner。稍后,会解决这个问题。
  4. 但是,如果 Publisher 中途离开,所有的 Subscriber 会 hold 住,等待 Publisher 再上线的时候,会继续接受信息。

客户端 (可以开多个客户端来接收消息)
client.py

import zmq
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://localhost:5555")   #多个客户端连接同样的地址
socket.setsockopt(zmq.SUBSCRIBE,'123'.encode('utf-8'))  # 消息过滤  只接受123开头的信息
#socket.setsockopt(zmq.SUBSCRIBE,''.encode('utf-8'))  # 接收所有消息
#两种情况  这里必须使用一种来 过滤消息 不然无法接受消息
while True:
    response = socket.recv().decode('utf-8');
    print("response: %s" % response)

3.Parallel Pipeline模式(管道模型)
由三部分组成,push进行数据推送,work进行数据缓存,pull进行数据竞争获取处理。区别于Publish-Subscribe存在一个数据缓存和处理负载。
当连接被断开,数据不会丢失,重连后数据继续发送到对端。
在这里插入图片描述
server.py

import zmq
import time

context = zmq.Context()
socket = context.socket(zmq.PUSH)
socket.bind("tcp://*:5557")

while True:
    msg = input("请输入要发布的信息:").strip()
    socket.send(msg.encode('utf-8'))
    print("已发送")
    time.sleep(1)

worker.py

import zmq
context = zmq.Context()
receive = context.socket(zmq.PULL)
receive.connect('tcp://127.0.0.1:5557')
sender = context.socket(zmq.PUSH)
sender.connect('tcp://127.0.0.1:5558')

while True:
    data = receive.recv()
    print("正在转发...")
    sender.send(data)

client.py

import zmq
context = zmq.Context()
socket = context.socket(zmq.PULL)
socket.bind("tcp://*:5558")

while True:
    response = socket.recv().decode('utf-8')
    print("response: %s" % response)
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

python中zmq的基础三种模式 的相关文章

  • 六、51单片机之定时器/计数器_理论

    1 什么是定时器 计数器 定时器就是单片机设定一个时间间隔 xff0c 时间间隔到后通知单片机 例如设置100ms的定时器 xff0c 100ms后定时器通知单片机时间到了 1 定时器是单片机的一种内部外设 以前的单片机只有CPU 也就是只
  • 八、51单片机之蜂鸣器

    1 蜂鸣器的原理 蜂鸣器分为有源蜂鸣器和无源蜂鸣器 这里的 源 不是指电源 xff0c 而是指震荡源 1 1 无源蜂鸣器 1 早期蜂鸣器都是无源的 2 内部没有震荡电路 xff0c 无源蜂鸣器比有源蜂鸣器更便宜 3 无源蜂鸣器内部没有震荡源
  • CSS3实现loading效果

    前言 晚上躺床上刷视频的时候看到有个前端大佬写了一个loading xff0c 这是效果 xff1a loading 感觉也挺有意思哈 xff0c 要不自己也写一个 xff0c 学习 43 复习 两不误 但是又因为太晚了 xff0c 不想起
  • C#应用程序界面开发基础——窗体控制(2)——MDI窗体

    MDI窗体 单文档界面 xff08 SDI xff09 多文档界面 xff08 MDI xff09 MDI窗体的概念 MDI窗体 xff08 Multiple Document Interface xff0c 多文档界面 xff09 用于同
  • windows 10 + GTX1650 环境下基于TensorFlow的深度学习环境配置

    因论文需要 xff0c 简单的记录一个深度学习环境的配置过程 说明 xff1a 与广为应用的基于Anaconda的深度学习环境配置方法不同的是 xff0c 本文直接基于Python基础环境 43 Pycharm进行环境的配置 xff0c 不
  • TX2硬盘扩展

    TX2硬盘扩展 硬件平台 xff1a NVIDIA TX2 Samsung SSD 860EVO 系统平台 xff1a Ubuntu 18 04 LTS 安装 将硬盘连接至侧边SATA接口 扩展home 1 查看硬盘所有分区 sudo fd
  • Android手机4G网络设置ipv6

    我的卡是联通的 xff0c 所以下面截图也是联通的 xff0c 移动和电信的卡类似 xff1b 1 进入目录 xff1a 设置 移动网络 接入点名称 xff08 APN xff09 xff1b 2 点击默认的连接项 xff0c 进入详情页
  • 简单TCP编程

    1 TCP服务端 span class token macro property span class token directive hash span span class token directive keyword include
  • Redis的基本使用

    Redis简介 什么是Redis Redis是一款开源的内存数据库 xff0c 也称为键值存储 database 缓存 database 和消息队列 database 系统 它提供了丰富的数据结构和高效的操作方式 xff0c 并且支持多种编
  • Ubuntu16.04通过VNC远程桌面并开机自启动

    前置条件 xff0c 远程客户端已经安装VNC Viewer x11vnc安装 在Ubuntu 16 04中安装x11vnc xff0c 可以按照以下步骤操作 xff1a span class token comment 输入以下命令以更新
  • SpringBoot数据库读写分离

    导入maven坐标 span class token tag span class token tag span class token punctuation lt span dependency span span class toke
  • 滑模控制

    滑动模态的定义 人为设定一经过平衡点的相轨迹 xff0c 通过适当设计 xff0c 系统状态点沿着此相轨迹渐近稳定到平衡点 xff0c 或形象地称为滑向平衡点的一种运动 xff0c 滑动模态的 滑动 二字即来源于此 滑模控制的优点 xff1
  • FPGA基础之VGA(一)满屏红色

    一 项目分析 用VGA显示全屏的红色 xff0c VGA xff08 Video Graphics Array xff0c 视频图形阵列 xff09 是一种电脑显示标准 开发板采用至芯科技zx 1学习板 xff0c VGA视频显示接口是25
  • docker基础学习入门(六)------ DockerFile解析

    DockerFile是什么 Dockerfile是用来构建Docker镜像的构建文件 xff0c 是由一系列命令和参数构成的脚本 构建三步骤 xff1a 编写Dockerfile文件docker builddocker run 构建的文件什
  • C#应用程序界面开发基础——窗体控制(5)——分组类控件

    分组类控件 分组类控件有容器 xff08 Panel xff09 控件 分组框 xff08 GroupBox xff09 控件 选项卡 xff08 TabControl xff09 控件等 容器控件 容器控件是由System Windows
  • openmv4修改好的原理图和pcb文件

    openmv4修改好的原理图和pcb文件以及试验过的了 xff0c 生产过pcb文件 download csdn net download weixin 42741023 11953091
  • stm32h750/stm32h743原理图和pcb源文件

    stm32在目前使用非常广泛 xff0c 但是目前很多人都还停留在stmf1 f4仅仅只有72 128m主频阶段 xff0c stm32h743采用arm m7架构 xff0c 高达400m主频的处理器 xff0c 为我们的控制提供强有力的
  • Mac 安装brew时遇到curl: (7) Failed to connect to raw.githubusercontent.com port 443: Connection refused

    MAC电脑 有梯子但是感觉不好用 下面是我的折腾历程 xff0c 不想看的直接空降到最后 当时的想法 1 可能是ip被墙了 xff0c 笔记本电脑连接手机热点安装 xff0c 可解决问题 但是用手机每次到46 就停了 xff0c 然后就上网
  • docker部署Mysql并创建远程连接账号并赋权

    span class token comment 启动mysql容器 span docker run span class token operator span itd span class token operator span spa
  • Feature Squeezing

    对于 Feature Squeezing Detecting Adversarial Examples in Deep Neural Networks 的理解 很多先前的防止adversarial example的方法都是 xff0c ad

随机推荐