使用 RabbitMq 锁定和批量获取消息

2024-04-19

我正在尝试以一种更非常规的方式使用 RabbitMq(尽管此时我可以根据需要选择任何其他消息队列实现)。消费者不会将 Rabbit 推送消息留给我的消费者,而是连接到一个队列并获取一批 N 条消息(在此期间它会消费一些消息,并可能拒绝一些消息),然后跳转到另一个队列,依此类推。这样做是为了冗余。如果某些消费者崩溃,所有消息都保证被其他消费者消费。

问题是我有多个消费者,我不希望他们竞争同一个队列。有没有办法保证队列上的锁?如果不是,我至少可以确保如果两个消费者连接到同一个队列,他们不会读取相同的消息吗?事务可能在某种程度上对我有帮助,但我听说它们将从 RabbitMQ 中删除。

其他架构建议也受到欢迎。

Thanks!

EDIT:正如评论中指出的,我需要如何处理消息有一个特殊性。它们只有在分组时才有意义,并且相关消息很有可能聚集在队列中。例如,如果我提取一批 100 条消息,那么我很有可能能够对消息 1-3、4-5,6-10 等执行某些操作。如果我无法找到某些消息的组,我会会将它们重新提交到队列中。 WorkQueue 不起作用,因为它将消息从同一组传播到多个工人,而这些工人不知道如何处理它们。


您看过这本免费的在线书籍吗?企业集成模式 http://eaipatterns.com/?

听起来您确实需要一个工作流程,在消息到达您的工作人员之前,您需要一个批处理组件。使用 RabbitMQ 有两种方法可以做到这一点。要么使用一种可以为您进行批处理的交换类型(和消息格式),要么拥有一个队列,以及一个对批次进行排序并将每个批次放入其自己的队列中的工作人员。批处理程序可能还应该向控制队列发送“批处理就绪”消息,以便工作人员可以发现新批处理队列的存在。处理完批次后,工作人员可以删除批次队列。

如果您可以控制消息格式,则可以让 RabbitMQ 通过多种方式隐式执行批处理。通过主题交换,您可以确保每条消息上的路由密钥的格式为 work.batchid.something,然后获悉批次 xxyzz 存在的工作人员将使用像 #.xxyzz.# 这样的绑定密钥来仅消费这些消息。无需重新发布。

另一种方法是在标头中包含批次 ID 并使用较新的标头交换类型。当然,如果您愿意编写少量的 Erlang 代码,您也可以实现自己的自定义交换类型。

不过,我确实建议您查看这本书,因为它比大多数人开始使用的典型工作队列概念更好地概述了消息传递架构。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用 RabbitMq 锁定和批量获取消息 的相关文章

  • 即使设置了 cookie,RabbitMQ 身份验证也会失败

    我最近在运行 lattePanda 的 Windows 10 上安装了带有 ErlanOTP 的rabbitmq 我运行rabbitmqctl status并收到以下错误 C Program Files RabbitMQ Server ra
  • RabbitMQ Java 客户端自动重新连接

    当我的应用程序失去与 RabbitMQ 的连接时 我将其连接工厂设置为自动尝试并重新连接 ConnectionFactory factory new ConnectionFactory factory setUsername usernam
  • RabbitMQ 管理插件窗口呈现为空白页面

    I have installed Erlang RabbitMQ and configured the management plugin as per the instructions on the website https www r
  • 基于多线程的 RabbitMQ 消费者

    我们有一个 Windows 服务 它监听单个 RabbitMQ 队列并处理消息 我们希望扩展相同的 Windows 服务 以便它可以监听 RabbitMQ 的多个队列并处理消息 不确定使用多线程是否可以实现这一点 因为每个线程都必须侦听 阻
  • RabbitMQ:如何创建和恢复备份

    我是 RabbitMQ 的新手 我需要一些帮助 如何备份和恢复到RabbitMQ 以及我需要保存哪些重要数据 谢谢 如果您安装了管理插件 您可以在Overview页 在底部你会看到导入 导出定义您可以使用它来下载代理的 JSON 表示形式
  • 服务器在 pika.exceptions.StreamLostError: Stream 连接丢失后关闭

    我的队列中有一些图像 我将每个图像传递到我的 Flask 服务器 在其中完成图像处理 并在我的rabbitmq 服务器中收到响应 收到响应后 我收到此错误 pika exceptions StreamLostError 流连接丢失 104
  • Rabbit mq - 等待 Mnesia 表时出错

    我已经在 Kubernetes 集群上使用 Helm Chart 安装了 RabbitMQ rabbitmq pod不断重新启动 在检查 pod 日志时 我收到以下错误 2020 02 26 04 42 31 582 warning lt
  • rabbitmq 的 REST API

    有没有办法从 ajax 向 RabbitMQ 发送数据 我的应用程序由数千个 Web 客户端 用 js 编写 和 WCF REST 服务组成 现在我试图弄清楚如何为我的应用程序创建可扩展点 这个想法是有一个rabbitmq实例 它从放置在一
  • RabbitMQ 等待消息超时

    我想向 RabbitMQ 服务器发送一条消息 然后等待回复消息 在 回复 队列上 当然 我不想永远等待 以防处理这些消息的应用程序出现故障 需要有一个超时 这听起来像是一项非常基本的任务 但我找不到方法来做到这一点 我现在在使用 Java
  • Apache Kafka 是否提供异步订阅回调 API?

    我的项目正在将 Apache Kafka 视为老化的基于 JMS 的消息传递方法的潜在替代品 为了让这个过渡尽可能的顺利 如果替代的排队系统 Kafka 有一个异步订阅机制那就更理想了 类似于我们当前项目使用的JMS机制MessageLis
  • 使用 Spring 与 RabbitMQ 集成

    我正在为我们的一个应用程序开发消息传递界面 该应用程序是一种服务 旨在接受 作业 进行一些处理并返回结果 实际上以文件的形式 这个想法是使用 RabbitMQ 作为消息传递基础设施 并使用 Spring AMQP 来处理协议特定的细节 我不
  • 消息队列与套接字

    我没有太多的套接字编程经验 但我尝试阅读一些相关内容 我对 MDB 和消息队列非常熟悉 有人告诉我队列 例如 MDB 只不过是直接套接字连接 有人可以帮我比较一下这两个吗 两者是无与伦比的 因为它们代表不同的layers 这就像将关系数据库
  • RabbitMQ 3.1.3 和丢失的时间戳头

    如果消息中缺少时间戳头 是否可以将代理配置为插入时间戳头 因此 如果发布客户端没有添加时间戳标头 代理是否可以插入与交易所收到消息的时刻相匹配的时间戳值 我应该在哪里寻找该配置 或者这是一个坏主意 截至2015年 原来的问题有了新的答案 这
  • 如何停止本地主机上的 RabbitMQ 服务器

    我在 OS X 上安装了 RabbitMQ 服务器 并在命令行上启动它 现在 我应该如何阻止它运行还不清楚 我这样做之后 sudo rabbitmq server detached I get Activating RabbitMQ plu
  • Kafka中如何同时实现分布式处理和高可用?

    我有一个由 n 个分区组成的主题 为了进行分布式处理 我创建了两个在不同机器上运行的进程 他们使用相同的 groupd id 订阅主题并分配 n 2 个线程 每个线程处理单个流 每个进程 n 2 个分区 这样我就可以实现负载分配 但现在如果
  • RabbitMq 和“致命错误:握手失败 -handshake_decode_error”

    我正在使用 Windows Server 2012 Erlang 19 2 和 RabbitMq 3 6 6 我在使用 TLS 配置端点之间的连接时遇到问题 我已经尝试了所有关于 SO 的答案 以及所有 RabbitMq 文档here ht
  • PHPUnit RabbitMQ:为创建连接函数编写测试

    我面临以下问题 我编写了一个函数 根据所需参数创建连接对象 AMQPConnection 现在我想编写相应的单元测试 我只是不知道在没有运行 RabbitMQ 代理的情况下如何做到这一点 这是有问题的函数 public function g
  • 具有延迟的简单可扩展工作/消息队列

    我需要设置一个作业 消息队列 并可以选择为任务设置延迟 以便空闲工作人员不会立即拾取它 而是在一定时间后 可能因任务而异 我研究了几个 Linux 队列解决方案 rabbitmq gearman memcacheq 但它们似乎都没有提供开箱
  • 如何让Spring RabbitMQ创建一个新的队列?

    根据我对rabbit mq的 有限 经验 如果您为尚不存在的队列创建新的侦听器 则会自动创建该队列 我正在尝试将 Spring AMQP 项目与rabbit mq 一起使用来设置侦听器 但出现错误 这是我的 xml 配置
  • 使用主题交换运行多个 Celery 任务

    我正在用 Celery 替换一些自制代码 但很难复制当前的行为 我期望的行为如下 创建新用户时 应向tasks与交换user created路由键 该消息应该触发两个 Celery 任务 即send user activate email

随机推荐

  • 如何从xamarin表单应用程序将图像上传到服务器

    我正在尝试使用 post 请求将图像从我的 xamarin 表单应用程序发送到 asp net core 服务器 我需要将图像保存在某个服务器文件夹中 但我做不到 这是我在 mediaFile 中选择图像后发送图像的方法 private a
  • 如何使用 Identity Server 4 颁发基于 Windows 身份验证的访问令牌

    我的目标是保护 Web API 以便客户端只能使用 IS 基于 Windows 身份验证颁发的访问令牌来访问它 我完成了这个基本示例 http docs identityserver io en release quickstarts 1
  • 全局运算符和成员运算符的区别

    定义一个接受类的两个引用的全局运算符和定义一个仅接受正确操作数的成员运算符之间有区别吗 Global class X public int value bool operator X left X right return left val
  • Tensorflow - 保存模型

    我有以下代码 在尝试保存模型时出现错误 我可能做错了什么 我该如何解决这个问题 import tensorflow as tf data labels cifar tools read data C Users abc Desktop Te
  • 如何从 JObject 获取第一个键?

    我在用Newtonsoft Json在我的项目中 我有JObject像这样 4781 Name 1 1577 Name 2 9973 Name 3 我成功解析它JObject Parse 我需要从此 JObject 获取第一个密钥 4781
  • Javascript CRC16 示例代码或实现

    有人可以分享一个链接或示例代码来实现 JavaScript 中字符串的校验和吗 预先非常感谢 你想要什么 你需要更具体 CRC16 算法数量众多 每种算法都有自己的多项式并用于特定用途 一些 CRC16 算法非常适合创建哈希 例如 对于 R
  • 如何在ggplot2中实现手绘铅笔填充? [关闭]

    Closed 此问题正在寻求书籍 工具 软件库等的推荐 不满足堆栈溢出指南 help closed questions 目前不接受答案 我发现了这篇关于可视化的精彩博客文章 http www darkhorseanalytics com b
  • 进程的异步生成:设计问题 - Celery 或 Twisted

    全部 我正在寻求意见 指导 和设计理念 我的目标是找到一种精简但可靠的方法来从 HTTP POST 获取 XML 有效负载 这部分没有问题 对其进行解析 并异步生成一个相对寿命较长的进程 生成的进程是 CPU 密集型进程 将持续大约三分钟
  • Excel:如果在另一列中发现重复的单元格值,则突出显示绿色

    有人可以帮助我 我不知道该使用什么公式 我突出显示了图片中的单元格以展示我的意思的示例 What I want to do is highlight the cell in column A where the value matches
  • Python OrderedDict 不保持元素顺序[重复]

    这个问题在这里已经有答案了 我正在尝试创建一个 OrderedDict 对象 但我一创建它 元素就变得混乱了 这就是我所做的 from collections import OrderedDict od OrderedDict 0 0 2
  • Sublime Text 默认保存选项

    为什么当我在 Sublime Text 3 中保存文件时 默认保存位置是 Sublime 安装目录 为什么默认文件类型是什么 我想将默认保存位置设置为桌面并将默认文件类型设置为 txt 我该如何执行此操作 这是我的设置 font size
  • iOS 上的背景图像随着用户交互而闪烁 [Ionic 5]

    我正在尝试让背景图像在我正在从 Ionic 3 更新到 5 的多页 Ionic 应用程序上工作 除了加载的第一页之外 我在 iOS 上的任何页面上都遇到了闪烁的背景图像问题 我尝试实施这个解决方案 如何在 Ionic 中将图像同时放入 和
  • 应用程序在 Play 商店中上线后 Android 应用程序链接不起作用

    我已经根据以下链接实现了 Android 应用程序链接 https developer android com studio write app link indexing html https developer android com
  • 嵌套类模板特化

    A class template
  • 流星合并同一集合的光标

    在我的社交应用程序 如 FB 中 我有一个奇怪的需要 将同一集合用户的两个光标合并到一个发布中 Meteor 服务器打印此错误 发布函数为集合用户返回了多个游标 也许这在 Meteor 0 7 2 中无法完成 也许我的方法是错误的 但我发现
  • 如何在Python中获取Linux控制台窗口宽度

    python 有没有办法以编程方式确定控制台的宽度 我的意思是一行中不换行的字符数 而不是窗口的像素宽度 Edit 寻找适用于 Linux 的解决方案 不确定为什么它在模块中shutil 但它在 Python 3 3 中出现了 看 查询输出
  • 如何在 java 类方法或构造函数中插入前提条件?

    这是我正在上的 Java 课程 本书提到了前置条件和后置条件 但没有给出任何如何对其进行编码的示例 它继续讨论断言 我已经把它记下来了 但是我正在做的作业特别指出插入前提条件并用断言测试前提条件 任何帮助都会很棒 像 Eiffel 这样的语
  • 使用独立对齐和附加的 Listview 元素反应本机 Listview 网格布局

    我有一个关于 Listview 元素对齐的问题 这些元素应该以比行样式更盒装的样式显示 在图片中 您可以看到当前状态 这是通过使用 Listview 的 contentContainerStyle prop 中使用的样式表代码生成的 Lis
  • 带有默认参数的 Swift 选择器

    我在这里编写简单的代码 self navigationItem leftBarButtonItem UIBarButtonItem barButtonSystemItem UIBarButtonSystemItem Cancel targe
  • 使用 RabbitMq 锁定和批量获取消息

    我正在尝试以一种更非常规的方式使用 RabbitMq 尽管此时我可以根据需要选择任何其他消息队列实现 消费者不会将 Rabbit 推送消息留给我的消费者 而是连接到一个队列并获取一批 N 条消息 在此期间它会消费一些消息 并可能拒绝一些消息