如何使 Zeromq PUB/SUB 删除旧消息而不是新消息(用于实时提要)?

2024-02-24

说我有一个PUB服务器zmq_send()的实时消息SUB客户。如果客户很忙而无法zmq_recv()消息足够快,那么消息将在客户端(和/或服务器)中缓冲。

如果缓冲区变得太大(高水位线),则新消息将被丢弃。对于实时消息来说,这与人们想要的相反。应删除旧消息,为新消息腾出位置。

有什么方法可以做到这一点吗?

理想情况下我想要SUB客户端的接收队列为空或仅包含最新消息。当收到新消息时,它将替换旧消息。 (我猜这里的问题是客户端会阻止zmq_recv()当队列为空时,这样做会浪费时间。 )

那么实时提要通常是如何实现的ZeroMQ?


我会在这里回答我自己的问题。设置 ZMQ_CONFLATE“仅保留最后一条消息”似乎很有希望,但它不适用于订阅过滤器。它只在队列中保留一条消息。如果您有多个过滤器,则其他过滤器类型的旧消息和新消息都会被丢弃。

同样,zeromq 指南建议简单地杀死慢速订阅者,但这似乎不是现实的解决方案。让具有不同阅读速度的订阅者订阅同一个快速发布者,应该是正常的用例。其中一些订阅者可能生活在慢速计算机上,其他人可能生活在快速计算机上,等等。ZeroMQ 应该能够以某种方式处理这个问题。

http://zguide.zeromq.org/page:all#Slow-Subscriber-Detection-Suicidal-Snail-Pattern http://zguide.zeromq.org/page:all#Slow-Subscriber-Detection-Suicidal-Snail-Pattern

我最终在客户端手动删除旧的排队消息。看起来效果很好。我通过这种方式向客户端订阅了不到 3 毫秒的消息(通过 tcp localhost)。即使在队列中有 5000 条 10 秒的旧消息,位于后面那几条实时消息前面的情况下,这种方法也有效。这对我来说已经足够好了。

我不禁认为这是图书馆应该提供的东西。它可能可以做得更好。

无论如何,这是客户端,旧消息丢失,代码:

bool Empty(zmq::socket_t& socket) {
    bool ret = true;
    zmq::pollitem_t poll_item = { socket, 0, ZMQ_POLLIN, 0 };
    zmq::poll(&poll_item, 1, 0); //0 = no wait
    if (poll_item.revents & ZMQ_POLLIN) {
        ret = false;
    }
    return ret;
}

std::vector<std::string> GetRealtimeSubscribedMessageVec(zmq::socket_t& socket_sub, int timeout_ms)
{
    std::vector<std::string> ret;

    struct MessageTmp {
        int id_ = 0;
        std::string data_;
        boost::posix_time::ptime timestamp_;
    };

    std::map<int, MessageTmp> msg_map;

    int read_msg_count = 0;
    int time_in_loop = 0;
    auto start_of_loop = boost::posix_time::microsec_clock::universal_time();
    do {
        read_msg_count++;

        //msg format sent by publisher is: filter, timestamp, data
        MessageTmp msg;
        msg.id_ = boost::lexical_cast<int>(s_recv(socket_sub));
        msg.timestamp_ = boost::posix_time::time_from_string(s_recv(socket_sub));
        msg.data_ = s_recv(socket_sub);

        msg_map[msg.id_] = msg;

        auto now = boost::posix_time::microsec_clock::universal_time();
        time_in_loop = (now - start_of_loop).total_milliseconds();
        if (time_in_loop > timeout_ms) {
            std::cerr << "Timeout reached. Publisher is probably sending messages quicker than we can drop them." << std::endl;
            break;
        }
    } while ((Empty(socket_sub) == false)); 

    if (read_msg_count > 1) {
        std::cout << "num of old queued up messages dropped: " << (read_msg_count - 1) << std::endl;
    }

    for (const auto &pair: msg_map) {
        const auto& msg_tmp = pair.second;

        auto now = boost::posix_time::microsec_clock::universal_time();
        auto message_age_ms = (now - msg_tmp.timestamp_).total_milliseconds();

        if (message_age_ms > timeout_ms) {
            std::cerr << "[SUB] Newest message too old. f:" << msg_tmp.id_ << ", age: " << message_age_ms << "ms, s:" << msg_tmp.data_.size() << std::endl;
        }
        else {
            std::cout << "[SUB] f:" << msg_tmp.id_ << ", age: " << message_age_ms << "ms, s:" << msg_tmp.data_.size() << std::endl;
            ret.push_back(msg_tmp.data_);
        }
    }

    return ret;
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何使 Zeromq PUB/SUB 删除旧消息而不是新消息(用于实时提要)? 的相关文章

  • (py)zmq/PUB:是否可以立即调用 connect() 然后调用 send() 并且不会丢失消息?

    使用这段代码 我总是会丢失消息 def publish frontend url message context zmq Context socket context socket zmq PUB socket connect fronte
  • 为什么 ZeroMQ 示例不起作用?

    我是 Python ZeroMQ 的新手 所以如果这是一个简单的问题 请表现出宽容 我尝试运行一些示例 但效果并不好 以下是 ZeroMQ 指南的 hwserver hwclient 示例 SERVER Hello World server
  • ZMQ套接字连接超时

    我正在使用 ZMQ cppzmq 的 C 绑定 并且尝试使用以下命令设置 TCP 套接字的连接超时 setsockopt 这样的方法 int connectTimeout 1000 socket setsockopt ZMQ CONNECT
  • ZeroMQ 中的 N 到 N 异步模式?

    尽管我阅读了该指南 但我找不到执行以下操作的方法 我们有n个出版商 我们有 m 个订户 每个订阅者订阅某种类型的消息 一个发布者可以发送多种消息 多个发布者可以发出相同类型的消息 如何在 0MQ 中创建 N 到 N 或 N 到 1 到 N
  • Python 在从函数返回时挂起

    假设我在一个相当复杂的 Flask 应用程序中有两个函数 一个函数调用另一个函数 def dispatch unlock stuff log dis start this routine just sends some data over
  • 适用于 Windows 的 Zeromq PHP 扩展

    我正在使用配置了 IIS 7 5 的 Zend 服务器 我搜索了 edit Zeromq php 扩展 我找到了这些http valokuva org builds http valokuva org builds and http sna
  • React/ZMQ/Ratchet - Websocket 服务器响应

    我目前已经有一个正在运行并使用 Ratchet PHP 的 Web 套接字服务器 我还没有处于希望外部脚本与我的服务器进行通信的阶段 我可以使用 ZMQ 成功地将数据推送到它 push php json name gt Joe Bloggs
  • ZeroMQ (clrzmq4) 轮询问题

    我想要完成的是实现从两个套接字之一读取消息 无论消息首先到达何处 据我了解轮询 zmq poll 是正确的做法 如指南中的 mspoller http zguide zeromq org cs mspoller 在这里我将提供小的伪代码片段
  • 如何使用 Zeromq 的 inproc 和 ipc 传输?

    我是 ZERMQ 的新手 ZeroMQ 具有 TCP INPROC 和 IPC 传输 我正在寻找在 Winx64 和 python 2 7 中使用 python 和 inproc 的示例 这些示例也可以用于 Linux 另外 我一直在寻找
  • Zeromq:如何在 C++ 中访问 tcp 消息

    我是 ZeroMQ 的新手 正在学习 echo 客户端 服务器模式 请求 回复 的 C hello world 示例 服务器看起来像 Hello World server in C Binds REP socket to tcp 5555
  • 为分布式系统构建数据收集和监控的中间件[关闭]

    Closed 此问题正在寻求书籍 工具 软件库等的推荐 不满足堆栈溢出指南 help closed questions 目前不接受答案 我目前正在寻找一个好的中间件来构建监控和维护系统的解决方案 我们面临的挑战是监控 收集数据并维护由多达
  • jeromq 生产准备好了吗?

    我过去曾通过 JVM 应用程序使用 ZeroMQjzmq图书馆 我计划在一个新项目中使用 Zeromq 其中一些服务是在 JVM 上实现的 我刚刚发现jeromq https github com zeromq jeromq 一个 Zero
  • ZeroMQ性能测试。准确的延迟是多少?

    我正在使用 zmq 跨进程传输消息 并且我想做一些性能测试来获取延迟和吞吐量 官方网站给出了指南讲述如何运行性能测试 http zeromq org results perf howto 例如 我尝试过 local lat tcp 1521
  • Python ZeroMQ PUSH/PULL——丢失消息?

    我正在尝试使用python with zeroMQ in PUSH PULL模式 发送大小的消息4 MB 每隔几秒钟 由于某种原因 虽然看起来所有消息都已发送 但服务器似乎只收到了其中一些消息 我在这里缺少什么 这是客户端的代码 clien
  • clrzmq 在 Xamarin Studios/C# 应用程序中找不到 libzmq

    我在 Mac 上使用 Xamarin Studio clrzmq通过 NuGet 包含 libzmq dll 上的 clrzmq 引用 我的应用程序编译得很好 但是当我尝试运行它时 我得到了这个 Unhandled Exception Sy
  • 如果可以使用 JZMQ,为什么还需要 JeroMQ?

    简单的问题 为什么在 Java 上 移植 zmq 并将其称为 JeroMQ 是个好主意 JeroMQ是ZeroMQ社区的官方项目 它是 C libzmq 库的完整移植 支持 3 2 版本 优点 纯Java 因此无需通过JNI链接C C 这对
  • ZeroMQ套接字在什么情况下会丢弃或无法传递消息?

    是否有某种规范或其他解释来描述正常情况 对于每种通信类型 您可以预期在 ZeroMQ 套接字上发送的消息不会被 所有 侦听进程接收 例如 我有一个实验程序 它基本上假设所有订阅者PUB套接字接收在该套接字上发送的所有消息 在初始化握手之后
  • ZeroMQ 在 python 多处理类/对象解决方案中挂起

    我正在尝试将 Python pyzmq 中的 ZeroMQ 与多处理一起使用 作为一个最小的 不是 工作示例 我有一个服务器类和一个客户端类 它们都继承自multiprocessing Process 客户端作为子进程应向服务器子进程发送消
  • Python zmq SUB 套接字未接收 MQL5 Zmq PUB 套接字

    我正在尝试在 MQL5 中设置一个 PUB 套接字 并在 Python 中设置一个 SUB 套接字来接收消息 我在 MQL5 中有这个 include
  • 等待连接的正确方法是什么?

    我正在尝试使用 NetMQ 在两个应用程序之间实现简单的消息传递 下面是我想要实现的目标的更详细的描述 经过一番尝试和错误后 我发现我不能在 Connect Bind 调用后立即发送或接收消息 因为它们是非阻塞的 并且即使尚未建立连接也会实

随机推荐