PUB-SUB 的 ZMQ 延迟(慢订阅者)

2023-12-11

我发现了很多关于类似主题的问题,但它们并没有帮助我解决我的问题。

Using :

  • Linux Ubuntu 14.04
  • 蟒蛇3.4
  • zmq : 4.0.4 // pyZMQ 14.3.1

TL;DR

即使设置了 HWM,ZMQ SUB 套接字中的接收器队列也会无限增长。当订阅者比发布者慢时就会发生这种情况。 我可以做什么来预防它?

背景

我在人机交互领域工作。我们有一个巨大的代码库来控制鼠标光标之类的东西。我想在几个模块中“打破它”,与 ZMQ 通信。 它必须具有尽可能小的延迟,但丢弃(丢失)消息并不那么重要。

另一个有趣的方面是可以在节点之间添加“间谍”。因此 PUB/SUB 插座似乎是最合适的。

像这样的东西:

+----------+                +-----------+                 +------------+
|          | PUB            |           |  PUB            |            |
|  Input   | +----+------>  |  Filter   |  +----+------>  |   Output   |
|          |      |     SUB |           |       |     SUB |            |
+----------+      v         +-----------+       v         +------------+
               +-----+                       +-----+                   
               |Spy 1|                       |Spy 2|                   
               +-----+                       +-----+       

Problem

一切正常,除了我们添加间谍的时候。 如果我们添加一个间谍来做“繁重的事情”,比如使用 matplotlib 进行实时可视化,我们会注意到绘图中的延迟不断增加。 IE:在上图中,过滤器和输出很快,没有看到延迟,但在 Spy 2 上,运行 20 分钟后延迟可以达到 10 分钟(!!)

看起来接收器上的队列无限增长。 我们研究了 ZMQ 的高水位线 (HWM) 功能,将其设置为低位以丢弃较旧的消息,但没有任何改变。

最少的代码

建筑学 :

+------------+                +-------------+
|            |  PUB           |             |
|   sender   | -------------> |  receiver   |
|            |             SUB|             |
+------------+                +-------------+

接收器是一个慢速接收器(在第一张图中充当间谍)

Code :

发件人.py

import time
import zmq

ctx = zmq.Context()

sender = ctx.socket(zmq.PUB)
sender.setsockopt(zmq.SNDBUF, 256)
sender.set_hwm(10)
sender.bind('tcp://127.0.0.1:1500')

print(zmq.zmq_version()) ## 4.0.4
print(zmq.__version__) ## 14.3.1
print(sender.get_hwm()) ## 10

i = 0
while True:
    mess = "{} {}".format(i, time.time())
    sender.send_string(mess)
    print("Send : {}".format(mess))
    i+= 1

接收者.py:

import time
import zmq

ctx = zmq.Context()
front_end = ctx.socket(zmq.SUB)

front_end.set_hwm(1)
front_end.setsockopt(zmq.RCVBUF, 8)

front_end.setsockopt_string(zmq.SUBSCRIBE, '')
front_end.connect('tcp://127.0.0.1:1500')

print(zmq.zmq_version()) ## 4.0.4
print(zmq.__version__) ## 14.3.1
print(front_end.get_hwm()) ## 1

while True:
    mess = front_end.recv_string()
    i, t = mess.split(" ")
    mess = "{} {}".format(i, time.time() - float(t))
    print("received : {}".format(mess))
    time.sleep(1)  # slow

我认为这不是 ZMQ Pub/Sub 的正常行为。 我尝试在接收器、订户和两者中设置 HWM,但没有任何改变。

我缺少什么?

Edit :

当我解释我的问题时,我认为我没有说清楚。我做了一个移动鼠标光标的实现。输入是在 ZMQ 中以 200Hz 发送的鼠标光标位置(带有.sleep( 1.0 / 200 )),完成了一些处理并更新了鼠标光标位置(在我的最小示例中没有这种睡眠)。

一切都很顺利,即使是在我发射间谍的时候。然而,间谍的延迟时间却越来越长(因为处理速度很慢)。延迟不会出现在“管道”末尾的光标中。

我认为问题出在缓慢用户排队的消息。

在我的示例中,如果我们杀死发送者并让接收者存活,消息将继续显示,直到显示所有(?)提交的消息。

间谍正在绘制光标的位置以提供一些反馈,有这样的延迟还是很不方便......我只想得到最后发送的消息,这就是我尝试降低 HWM 的原因。


缺少更好的实时设计/验证

ZeroMQ 是一个强大的消息传递层。

也就是说,check原始情况下每秒实际发送多少条消息while True:杀手循环

Measure它。设计要基于事实,而不是感觉。

事实很重要。

start_CLK = time.time()                                    # .SET _CLK
time.sleep( 0.001)                                         # .NOP avoid DIV/0!
i = 0                                                      # .SET CTR
while True:                                                # .LOOP
    sender.send_string( "{} {}".format( i, time.time() ) ) # .SND ZMQ-PUB 
    print i / ( time.time() - start_CLK )                  # .GUI perf [msg/sec]
    i+= 1                                                  # .INC CTR

ZeroMQ 尽最大努力填充方案中的雪崩。

它在这方面非常擅长。

Your [Filter] + [Spy1] + [Output] + [Spy2] 管道处理,端到端,

  • 更快,包括。 .send() + .recv_string() 开销都比 [Input]-sender

or

  • 是主要的阻塞病态元素,导致内部 PUB/SUB 队列不断增长、增长、增长

这种队列链问题可以通过另一种架构设计来解决。

需要重新思考的事情:

  1. 子样本 the [Filter].send() 节奏(交错系数取决于您控制下的实时过程的稳定性问题 - 无论是 1 毫秒(顺便说一句,O/S 计时​​器分辨率,因此使用 COTS O/ 不可能进行量子物理实验) S 定时器控制:o) ),双向语音流为 10 毫秒,电视/GUI 流为 50 毫秒,键盘事件流为 300 毫秒等)

  2. online v/s offline后处理/可视化(你注意到一个沉重的matplotlib加工,在那里你通常要承受大约 800 - 1600 - 3600 毫秒的开销,即使是简单的 2D 绘图 -测量它在决定更改 PUB/SUB-proc1>-PUB/SUB-proc2> 处理架构(你已经注意到了,spy2> 导致生长问题proc2>-PUB-馈送和发送开销)。

  3. 线程数与执行线程的本地主机核心数 -- 从 localhost ip 可以看出,所有进程都驻留在同一个 localhost 上。加上每个使用的 ZMQ.Context 添加一个线程,加上 查看 Python GIL 锁定开销(如果所有线程都已实例化) 来自同一个 Python 解释器...阻塞增加。阻挡很痛。 更好的分布式架构可以提高这些性能 方面。不过,请先回顾一下 [1] 和 [2]

n.b.调用一个20分钟的处理管道延迟(实时系统TimeDOMAIN skew)是一个委婉的延迟

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

PUB-SUB 的 ZMQ 延迟(慢订阅者) 的相关文章

随机推荐

  • Bootstrap 4 导航栏未显示[重复]

    这个问题在这里已经有答案了 我正在尝试使用 Bootstrap 4 创建导航栏 这是我的 package json 文件的内容 name NAME here version 1 0 0 description Descripttion ge
  • 为 iPad 和 iPhone 设置输入按钮的样式

    我使用 CSS 来设置网站上输入按钮的样式 但在 IOS 设备上 样式被 Mac 的默认按钮所取代 有没有一种方法可以为 iOS 设计按钮样式 或者可以制作一个行为类似于提交按钮的超链接 您可能正在寻找 webkit appearance
  • Java EE 6 编程安全性、glassfish 和 JDBC 领域

    我正在探索基于 glassfish 服务器的 jdbc 领域的纯 Java EE 方法来实现编程安全性 特别是登录用户 基本上 在我的登录 servlet 中我正在做 String username request getParameter
  • Laravel 4:如何运行原始 SQL?

    我想重命名 Laravel 4 中的表 但不知道该怎么做 SQL 是alter table photos rename to images 如果有 Eloquent 解决方案 我还想知道如何运行原始 SQL 因为有时别无选择 In the
  • R:如何让我的包使用另一个包?

    这是一个非常简单的问题 我正在延长某人的包裹 它当前使用包 A B 它们列在说明文件中 如果我需要包 C 中的函数 将包添加到依赖项 我是否只需在描述文件中添加包就足够了 进入哪个部分 依赖或导入 还需要采取更多其他步骤吗 一旦我的代码需要
  • Google Apps 脚本中的格式化日期

    我试图在提交表单时获取包含简单格式化日期的电子表格 但所有日期 包括时间戳 都被发布为 1969 年 12 月 31 日下午 2 00 我做错了什么 任何帮助将不胜感激 function formSubmitReply e var shee
  • 有没有一个工具可以将swf反编译为actionscript? [复制]

    这个问题在这里已经有答案了 可能的重复 如何反编译 swf 文件 有人知道有这样的工具吗 硕思的反编译器是一个很好的起点
  • 从 pojo 生成 JsonSchema:如何自动添加“描述”?

    我正在尝试从项目中的 pojos 自动生成 JsonSchema 代码如下所示 ObjectMapper mapper new ObjectMapper SchemaFactoryWrapper visitor new SchemaFact
  • 只想比较 TIME 值 MomentJS

    在浏览了一些其他 MomentJS 问题和答案之后 我仍然对如何使用 moment 进行简单比较感到困惑两个不同的时间 我不需要 想要 考虑日期 日期 我的用例是这样的 我正在从配置文件中读取时间表 开始 结束时间 这是使用 Node js
  • 有多少 iPhone 应用程序可以在 Linux 上开发和测试?

    我为客户开发一些 iPhone 应用程序 但我更喜欢在 Linux 机器上工作 我知道您需要在运行 OS X 的计算机上执行某些操作 例如构建最终发行版 在模拟器中运行 iPhone 应用程序等 但我想知道你可以使用普通的 Objectiv
  • 获取图像缩略图文件路径

    我正在尝试获取缩略图paths 而不是位图对象 当我查询这些时 某些缩略图路径由于某种原因为空 我的设备中有1028个缩略图 光标长度确实是1028 但仍然返回空值 我知道有1028个缩略图 因为我检查过 这是我的代码 String pro
  • SQL OVER() 子句 - 何时以及为何有用?

    USE AdventureWorks2008R2 GO SELECT SalesOrderID ProductID OrderQty SUM OrderQty OVER PARTITION BY SalesOrderID AS Total
  • 为什么应该使用 CUDA 驱动程序 API 而不是 CUDA 运行时 API?

    为什么我应该使用 CUDA Driver API 在哪些情况下我不能使用 CUDA Runtime API 这比 Driver API 更方便 运行时 API 是比驱动程序 API 更高级别的抽象 并且通常更易于使用 性能差距应该很小 驱动
  • cygwin 控制台中的 Python datetime.now 不正确

    如果你能帮助我理解原因 从 Cygwin 终端 这是对的 date Wed Sep 2 2020 11 19 07 PM 这也是正确的 date utc Wed Sep 2 2020 9 19 14 PM 时区也是正确的 echo TZ E
  • 在 Ruby 中以编程方式访问属性/方法注释

    有没有一种方法可以以编程方式访问方法注释 或者属性注释 我想用它作为文档中方法的描述 我不想让它成为静态的或使用 rdoc 或等效文件生成 下面是 Ruby 类的示例 Class MyClass This method tries over
  • 如何管理 Azure AD 用户的 Azure AD 应用程序角色

    1 有人知道可以管理的工具吗 任务Azure AD 中企业应用程序的 Azure AD 用户角色 清单中定义的 appRoles 的数量 我正在讨论如何将角色 特定于应用程序 分配给现有的 Azure AD 用户 使用 Azure 门户来执
  • 正则表达式匹配一定长度的单词

    我想知道匹配单词的正则表达式 以使单词具有最大长度 例如 如果一个单词的长度最多为 10 个字符 我希望正则表达式能够匹配 但如果长度超过 10 那么正则表达式不应匹配 I tried w 10 但只有当单词的最小长度为 10 个字符时才会
  • 将带有 numpy 数组的字典写入 .csv

    我想将结果文件写入 csv 我准备了一个简单的测试示例 import numpy as np data testdata np array 1 2 3 4 5 data set1 a testdata b testdata c testda
  • Google 数据 API 中的“authTokenType”参数是什么?

    我刚刚使用新的 google api java client 实现了 Google Translator Toolkit API 问题是 文档中没有提到 authTokenType 它似乎对于身份验证很重要 在样本中 bigquery js
  • PUB-SUB 的 ZMQ 延迟(慢订阅者)

    我发现了很多关于类似主题的问题 但它们并没有帮助我解决我的问题 Using Linux Ubuntu 14 04 蟒蛇3 4 zmq 4 0 4 pyZMQ 14 3 1 TL DR 即使设置了 HWM ZMQ SUB 套接字中的接收器队列