我会在这里回答我自己的问题。设置 ZMQ_CONFLATE“仅保留最后一条消息”似乎很有希望,但它不适用于订阅过滤器。它只在队列中保留一条消息。如果您有多个过滤器,则其他过滤器类型的旧消息和新消息都会被丢弃。
同样,zeromq 指南建议简单地杀死慢速订阅者,但这似乎不是现实的解决方案。让具有不同阅读速度的订阅者订阅同一个快速发布者,应该是正常的用例。其中一些订阅者可能生活在慢速计算机上,其他人可能生活在快速计算机上,等等。ZeroMQ 应该能够以某种方式处理这个问题。
http://zguide.zeromq.org/page:all#Slow-Subscriber-Detection-Suicidal-Snail-Pattern http://zguide.zeromq.org/page:all#Slow-Subscriber-Detection-Suicidal-Snail-Pattern
我最终在客户端手动删除旧的排队消息。看起来效果很好。我通过这种方式向客户端订阅了不到 3 毫秒的消息(通过 tcp localhost)。即使在队列中有 5000 条 10 秒的旧消息,位于后面那几条实时消息前面的情况下,这种方法也有效。这对我来说已经足够好了。
我不禁认为这是图书馆应该提供的东西。它可能可以做得更好。
无论如何,这是客户端,旧消息丢失,代码:
bool Empty(zmq::socket_t& socket) {
bool ret = true;
zmq::pollitem_t poll_item = { socket, 0, ZMQ_POLLIN, 0 };
zmq::poll(&poll_item, 1, 0); //0 = no wait
if (poll_item.revents & ZMQ_POLLIN) {
ret = false;
}
return ret;
}
std::vector<std::string> GetRealtimeSubscribedMessageVec(zmq::socket_t& socket_sub, int timeout_ms)
{
std::vector<std::string> ret;
struct MessageTmp {
int id_ = 0;
std::string data_;
boost::posix_time::ptime timestamp_;
};
std::map<int, MessageTmp> msg_map;
int read_msg_count = 0;
int time_in_loop = 0;
auto start_of_loop = boost::posix_time::microsec_clock::universal_time();
do {
read_msg_count++;
//msg format sent by publisher is: filter, timestamp, data
MessageTmp msg;
msg.id_ = boost::lexical_cast<int>(s_recv(socket_sub));
msg.timestamp_ = boost::posix_time::time_from_string(s_recv(socket_sub));
msg.data_ = s_recv(socket_sub);
msg_map[msg.id_] = msg;
auto now = boost::posix_time::microsec_clock::universal_time();
time_in_loop = (now - start_of_loop).total_milliseconds();
if (time_in_loop > timeout_ms) {
std::cerr << "Timeout reached. Publisher is probably sending messages quicker than we can drop them." << std::endl;
break;
}
} while ((Empty(socket_sub) == false));
if (read_msg_count > 1) {
std::cout << "num of old queued up messages dropped: " << (read_msg_count - 1) << std::endl;
}
for (const auto &pair: msg_map) {
const auto& msg_tmp = pair.second;
auto now = boost::posix_time::microsec_clock::universal_time();
auto message_age_ms = (now - msg_tmp.timestamp_).total_milliseconds();
if (message_age_ms > timeout_ms) {
std::cerr << "[SUB] Newest message too old. f:" << msg_tmp.id_ << ", age: " << message_age_ms << "ms, s:" << msg_tmp.data_.size() << std::endl;
}
else {
std::cout << "[SUB] f:" << msg_tmp.id_ << ", age: " << message_age_ms << "ms, s:" << msg_tmp.data_.size() << std::endl;
ret.push_back(msg_tmp.data_);
}
}
return ret;
}