ZeroMQ在多线程应用程序中处理中断

2024-05-03

多线程环境下ZeroMQ的优雅退出

规格:带有 c++11 的 ubuntu 16.04,libzmq:4.2.3

示例代码

static int s_interrupted = 0;
static void s_signal_handler (int signal_value)
{
    s_interrupted = 1;
    //some code which will tell main thread to exit
}

static void s_catch_signals (void)
{
    struct sigaction action;
    action.sa_handler = s_signal_handler;
    action.sa_flags = 0;
    sigemptyset (&action.sa_mask);
    sigaction (SIGINT, &action, NULL);
    sigaction (SIGTERM, &action, NULL);
}

static void Thread((zsock_t *pipe, void *)
{
    zmq::context_t context(1);
    zmq::socket_t requester1(context,ZMQ_DEALER);
    zmq::socket_t requester2(context,ZMQ_DEALER);
    requester1.connect(address1);
    requester2.connect(address2);
    zmq_pollitem_t items []=
        {{requester1,0,ZMQ_POLLIN,0},
        {requester2,0,ZMQ_POLLIN,0}};

    while(true)
    {
        zmq::message_t message;
        zmq::poll (items, 2, -1);

         if (items [0].revents & ZMQ_POLLIN) 
         {
             requester1.recv(&message);
         }
         if (items [1].revents & ZMQ_POLLIN) 
         {
             requester2.recv(&message);
         }
    }
}

int main()
{
    .
    //some code
    .

    zactor_t *actor = zactor_new (Threaded, nullptr);
    s_catch_signals();
    .
    //continue
    .
    //wait till thread finishes to exit

    return 0;
}

现在,当中断发生时,它将从主线程调用信号处理程序。我以某种方式需要告诉线程(轮询器)从信号处理程序退出。有什么想法如何实现这一目标?


从 ZMQ 文档中,您有两种“惯用”的方式来处理这个问题:

  • 在管道上轮询 http://zguide.zeromq.org/c:interrupt,并在信号处理程序中的管道上写入。

  • 发送信号时捕获recv中抛出的异常 http://zguide.zeromq.org/cpp:interrupt.

经过测试,似乎 zmq::poll 不会在 SIGINT 上引发异常。 因此,解决方案似乎是使用专用于关闭的套接字。 解决方案如下所示:

#include <iostream>
#include <thread>
#include <signal.h>
#include <zmq.hpp>
zmq::context_t* ctx;
static void s_signal_handler (int signal_value)
{
    std::cout << "Signal received" << std::endl;
    zmq::socket_t stop_socket(*ctx, ZMQ_PAIR);
    stop_socket.connect("inproc://stop_address");
    zmq::message_t msg("0", 1);
    stop_socket.send(msg);
    std::cout << "end sighandler" << std::endl;
}

static void s_catch_signals (void)
{
    struct sigaction action;
    action.sa_handler = s_signal_handler;
    action.sa_flags = 0;
    sigemptyset (&action.sa_mask);
    sigaction (SIGINT, &action, NULL);
    sigaction (SIGTERM, &action, NULL);
}

void thread(void)
{
    std::cout << "Thread Begin" << std::endl;
    zmq::context_t context (1);
    ctx = &context;
    zmq::socket_t requester1(context,ZMQ_DEALER);
    zmq::socket_t requester2(context,ZMQ_DEALER);
    zmq::socket_t stop_socket(context, ZMQ_PAIR);
    requester1.connect("tcp://127.0.0.1:36483");
    requester2.connect("tcp://127.0.0.1:36483");
    stop_socket.bind("inproc://stop_address");

    zmq_pollitem_t items []=
    {
        {requester1,0,ZMQ_POLLIN,0},
        {requester2,0,ZMQ_POLLIN,0},
        {stop_socket,0,ZMQ_POLLIN,0}
    };

    while ( true )
    {
        //  Blocking read will throw on a signal

        int rc = 0;
        std::cout << "Polling" << std::endl;
        rc = zmq::poll (items, 3, -1);

        zmq::message_t message;
        if(rc > 0)
        {
            if (items [0].revents & ZMQ_POLLIN)
            {
                requester1.recv(&message);
            }
            if (items [1].revents & ZMQ_POLLIN)
            {
                requester2.recv(&message);
            }
            if(items [2].revents & ZMQ_POLLIN)
            {
                std::cout << "message stop received " << std::endl;
                break;
            }
        }
    }
    requester1.setsockopt(ZMQ_LINGER, 0);
    requester2.setsockopt(ZMQ_LINGER, 0);
    stop_socket.setsockopt(ZMQ_LINGER, 0);
    requester1.close();
    requester2.close();
    stop_socket.close();
    std::cout << "Thread end" << std::endl;
}

int main(void)
{
    std::cout << "Begin" << std::endl;
    s_catch_signals ();
    zmq::context_t context (1);
    zmq::socket_t router(context,ZMQ_ROUTER);
    router.bind("tcp://127.0.0.1:36483");

    std::thread t(&thread);
    t.join();
    std::cout << "end join" << std::endl;

}

请注意,如果您不想与信号处理程序共享上下文,您可以使用“ipc://...”。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

ZeroMQ在多线程应用程序中处理中断 的相关文章

  • 如何使用 C# 中的参数将用户重定向到 paypal

    如果我有像下面这样的简单表格 我可以用它来将用户重定向到 PayPal 以完成付款
  • 我如何才能等待多个事情

    我正在使用 C 11 和 stl 线程编写一个线程安全队列 WaitAndPop 方法当前如下所示 我希望能够将一些内容传递给 WaitAndPop 来指示调用线程是否已被要求停止 如果 WaitAndPop 等待并返回队列的元素 则应返回
  • “构建”构建我的项目,“构建解决方案”则不构建

    我刚刚开始使用VS2010 我有一个较大的解决方案 已从 VS2008 成功迁移 我已将一个名为 Test 的控制台应用程序项目添加到解决方案中 选择构建 gt 构建解决方案不编译新项目 选择构建 gt 构建测试确实构建了项目 在失败的情况
  • 以文化中立的方式将字符串拆分为单词

    我提出了下面的方法 旨在将可变长度的文本拆分为单词数组 以进行进一步的全文索引处理 删除停止词 然后进行词干分析 结果似乎不错 但我想听听关于这种实现对于不同语言的文本的可靠性的意见 您会建议使用正则表达式来代替吗 请注意 我选择不使用 S
  • Web 客户端和 Expect100Continue

    使用 WebClient C NET 时设置 Expect100Continue 的最佳方法是什么 我有下面的代码 我仍然在标题中看到 100 continue 愚蠢的 apache 仍然抱怨 505 错误 string url http
  • 动态加载程序集的应用程序配置

    我正在尝试将模块动态加载到我的应用程序中 但我想为每个模块指定单独的 app config 文件 假设我的主应用程序有以下 app config 设置
  • 在结构中使用 typedef 枚举并避免类型混合警告

    我正在使用 C99 我的编译器是 IAR Embedded workbench 但我认为这个问题对于其他一些编译器也有效 我有一个 typedef 枚举 其中包含一些项目 并且我向该新类型的结构添加了一个元素 typedef enum fo
  • Asp.NET WebApi 中类似文件名称的路由

    是否可以在 ASP NET Web API 路由配置中添加一条路由 以允许处理看起来有点像文件名的 URL 我尝试添加以下条目WebApiConfig Register 但这不起作用 使用 URIapi foo 0de7ebfa 3a55
  • 使用实体框架模型输入安全密钥

    这是我今天的完美想法 Entity Framework 中的强类型 ID 动机 比较 ModelTypeA ID 和 ModelTypeB ID 总是 至少几乎 错误 为什么编译时不处理它 如果您使用每个请求示例 DbContext 那么很
  • C++ OpenSSL 导出私钥

    到目前为止 我成功地使用了 SSL 但遇到了令人困惑的障碍 我生成了 RSA 密钥对 之前使用 PEM write bio RSAPrivateKey 来导出它们 然而 手册页声称该格式已经过时 实际上它看起来与通常的 PEM 格式不同 相
  • 创建链表而不将节点声明为指针

    我已经在谷歌和一些教科书上搜索了很长一段时间 我似乎无法理解为什么在构建链表时 节点需要是指针 例如 如果我有一个节点定义为 typedef struct Node int value struct Node next Node 为什么为了
  • WCF 中 SOAP 消息的数字签名

    我在 4 0 中有一个 WCF 服务 我需要向 SOAP 响应添加数字签名 我不太确定实际上应该如何完成 我相信响应应该类似于下面的链接中显示的内容 https spaces internet2 edu display ISWG Signe
  • 如何设计以 char* 指针作为类成员变量的类?

    首先我想介绍一下我的情况 我写了一些类 将 char 指针作为私有类成员 而且这个项目有 GUI 所以当单击按钮时 某些函数可能会执行多次 这些类是设计的单班在项目中 但是其中的某些函数可以执行多次 然后我发现我的项目存在内存泄漏 所以我想
  • SolrNet连接说明

    为什么 SolrNet 连接的容器保持静态 这是一个非常大的错误 因为当我们在应用程序中向应用程序发送异步请求时 SolrNet 会表现异常 在 SolrNet 中如何避免这个问题 class P static void M string
  • 转发声明和包含

    在使用库时 无论是我自己的还是外部的 都有很多带有前向声明的类 根据情况 相同的类也包含在内 当我使用某个类时 我需要知道该类使用的某些对象是前向声明的还是 include d 原因是我想知道是否应该包含两个标题还是只包含一个标题 现在我知
  • WPF/C# 将自定义对象列表数据绑定到列表框?

    我在将自定义对象列表的数据绑定到ListBox in WPF 这是自定义对象 public class FileItem public string Name get set public string Path get set 这是列表
  • 如何从两个不同的项目中获取文件夹的相对路径

    我有两个项目和一个共享库 用于从此文件夹加载图像 C MainProject Project1 Images 项目1的文件夹 C MainProject Project1 Files Bin x86 Debug 其中有project1 ex
  • IEnumreable 动态和 lambda

    我想在 a 上使用 lambda 表达式IEnumerable
  • C# - OutOfMemoryException 在 JSON 文件上保存列表

    我正在尝试保存压力图的流数据 基本上我有一个压力矩阵定义为 double pressureMatrix new double e Data GetLength 0 e Data GetLength 1 基本上 我得到了其中之一pressur
  • C++ 中类级 new 删除运算符的线程安全

    我在我的一门课程中重新实现了新 删除运算符 现在我正在使我的代码成为多线程 并想了解这些运算符是否也需要线程安全 我在某处读到 Visual Studio 中默认的 new delete 运算符是线程安全的 但这对于我的类的自定义 new

随机推荐

  • 如何识别 R 中行的镜像重复项

    在下面的SO帖子中如何识别 R 中行的部分重复项 https stackoverflow com questions 54661129 how to identify partial duplicates of rows in r 5466
  • 在 O(nloglogn) 最坏情况时间内对具有 O(logn) 个不同元素的 n 元素数组进行排序

    目前的问题是标题本身的内容 即给出一种算法 该算法在 O log logn 最坏情况时间内对具有 O log n 个不同元素的 n 元素数组进行排序 有任何想法吗 此外 您通常如何处理具有多个非不同元素的数组 O 日志 日志 n 时间足以让
  • 如何在 flatpak google chrome 中从 intellij 打开浏览器预览

    当我尝试在 Google Chrome 中打开浏览器预览时 遇到以下错误 不幸的是 我通过以下方式安装了 Google Chromeflatpak https flathub org apps details com google Chro
  • #NUXT.JS 中常用组件方法的存储位置

    实际上我想知道 NUXT JS 中常见组件方法的存储位置 我尝试过的事情 gt 在中间件中存储公共代码 它没用 因为据我所知 中间件只能处理对服务器的请求和响应 methods states methods SwitchManager fu
  • 使用 redux-thunk 取消之前的异步操作

    我正在使用 redux thunk 中间件构建一个 React Redux 应用程序来创建和处理 Ajax 请求 我有一个经常触发的特定 thunk 我想在触发新请求之前取消任何先前启动的 Ajax 请求 这可能吗 一种方法是通过为这些请求
  • bootstrap 一般如何工作,特别是在 Zend Framework 中?

    我正在阅读 Zend Framework 手册 但无法理解引导程序如何工作 特别是在 ZF 和一般情况下 他们写 您的 Bootstrap 类定义了要使用哪些资源和组件 初始化 好的 这意味着应该首先实例化 Bootstrap 类 但随后他
  • Rails 渲染不必要的信息

    我一直在使用 RoR 和 Bootstrap 我试图将我的代码渲染成我在网上找到的片段 基本上我的索引中有这个 div class col md 6 div class well well sm div class row p p p p
  • 如何返回给定长度的所有列表元素?

    我正在尝试返回具有特定长度的单词 这是我到目前为止的代码 words是一个列表并且size是一个正整数 def by size words size for word in words if len word size 我不知道如何继续 b
  • Promise 完成后导出模块

    我实际上想将 read 函数放在不同的模块中 然后在我的主 app js 中需要它 我对使用承诺还很陌生 修改 var dir require node dir var files function getFiles return new
  • 使用自己的网络服务器实现一致的安全 Google Play 应用内购买场景

    我已多次阅读了中的所有文档Android 开发者指南 http developer android com guide google play billing index html并熟悉了精彩的 Google 演示躲避海盗并阻止吸血鬼 ht
  • 使用 JDBC 3.0 实现对嵌套事务的支持

    我们的遗留应用程序使用 JDBC 3 0 它通过实现自己的事务管理器来支持事务 该事务管理器能够为每个线程返回相同的 JDBC 连接 我最近发现的问题是它不支持嵌套事务 如果一个事务在另一个事务中启动 那么在内部事务上下文中运行的每个 SQ
  • 如何使用 gulp webpack-stream 生成正确命名的文件?

    目前我们正在使用Webpack https webpack github io 对于我们的模块加载器 以及Gulp http gulpjs com 对于其他一切 sass gt css 以及开发 生产构建过程 我想将 webpack 的东西
  • 与 Python 中的另一个命令行程序交互

    我需要编写一个 Python 脚本 它可以运行另一个命令行程序并与其标准输入和标准输出流交互 本质上 Python 脚本将从目标命令行程序中读取数据 通过写入其 stdin 进行智能响应 然后再次从程序中读取结果 它会重复执行此操作 我查看
  • Microsoft 认知 API 的正确密钥

    我目前正在尝试在 MS 认知服务 Bing 搜索 API 上进行新闻搜索 我读过很多文档 但似乎被困住了 这是我正在使用的代码 url https bingapis azure api net api v5 news search q mi
  • C++ 进程管理 [关闭]

    Closed 这个问题正在寻求书籍 工具 软件库等的推荐 不满足堆栈溢出指南 help closed questions 目前不接受答案 是否有一个众所周知的 可移植的 好的 C 进程管理库 我发现了一个很有前途的图书馆叫做升压过程 htt
  • Java Keystore 是否存在性能问题? [复制]

    这个问题在这里已经有答案了 我们开发了一个应用程序来加密 解密来自服务器的请求 响应 我们正在做性能测试 加密 解密应用程序 我们观察到加密 解密过程需要时间 而许多线程 正在同时做 为了识别问题 我们记录了加密 解密过程中的所有方法 从记
  • ERRO[0001] 等待容器时出错:上下文已取消

    运行 docker 镜像时出现错误 看起来问题出在我的电脑上 我使用的是 MacOS 10 13 6 我已按照步骤创建 docker 映像 Sanjeet server api sanjeet docker build t apiconta
  • 删除 ggplot 地图/choropleth 中的边框线

    我想删除 ggplot 中生成的等值线区域之间的线 我的问题是由一张非常大的地图引起的 其中包含非常非常小的区域 人口普查区块组 这些区域数量如此之多 以至于鉴于边界的密度 不可能看到填充形状的颜色 我在 Mac 上使用更新后的 RStud
  • “isset构造”有捷径吗?

    我经常写这行代码 myParam isset params myParam params myParam defaultValue 通常 它会使嵌套数组的行变得很长 我可以把它改短一点吗 function getOr var default
  • ZeroMQ在多线程应用程序中处理中断

    多线程环境下ZeroMQ的优雅退出 规格 带有 c 11 的 ubuntu 16 04 libzmq 4 2 3 示例代码 static int s interrupted 0 static void s signal handler in