RabbitMQ / AMQP:单个队列,同一消息的多个消费者?

2023-11-22

我刚刚开始使用 RabbitMQ 和 AMQP。

  • 我有一个消息队列
  • 我有多个消费者,我想用它们做不同的事情相同的消息.

大多数 RabbitMQ 文档似乎都专注于循环,即单个消息由单个消费者消费,负载分布在每个消费者之间。这确实是我亲眼所见的行为。

一个例子:生产者有一个队列,每 2 秒发送一次消息:

var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
var count = 1;

connection.on('ready', function () {
  var sendMessage = function(connection, queue_name, payload) {
    var encoded_payload = JSON.stringify(payload);  
    connection.publish(queue_name, encoded_payload);
  }

  setInterval( function() {    
    var test_message = 'TEST '+count
    sendMessage(connection, "my_queue_name", test_message)  
    count += 1;
  }, 2000) 


})

这是一个消费者:

var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
connection.on('ready', function () {
  connection.queue("my_queue_name", function(queue){
    queue.bind('#'); 
    queue.subscribe(function (message) {
      var encoded_payload = unescape(message.data)
      var payload = JSON.parse(encoded_payload)
      console.log('Recieved a message:')
      console.log(payload)
    })
  })
})

如果我启动消费者两次,我可以看到每个消费者都在循环行为中消费替代消息。例如,我将在一个终端中看到消息 1、3、5,在另一个终端中看到消息 2、4、6.

我的问题是:

  • 我可以让每个消费者收到相同的消息吗?即两个消费者都收到消息1,2,3,4,5,6?这在 AMQP/RabbitMQ 中被称为什么?一般情况下是如何配置的呢?

  • 这是常见的做法吗?我是否应该让交换器将消息路由到两个单独的队列中,并使用单个消费者?


我可以让每个消费者收到相同的消息吗?即两个消费者都收到消息1,2,3,4,5,6?这在 AMQP/RabbitMQ 中被称为什么?一般情况下是如何配置的呢?

不,没有消费者在同一个队列上。来自 RabbitMQAMQP 概念 guide:

重要的是要了解,在 AMQP 0-9-1 中,消息在消费者之间进行负载平衡。

这似乎暗示着队列中的循环行为是给定的,并且不可配置。即,需要单独的队列才能让多个消费者处理相同的消息ID。

这是常见的做法吗?我是否应该让交换器将消息路由到两个单独的队列中,并使用单个消费者?

不,不是,单个队列/多个消费者,每个消费者处理相同的消息 ID 是不可能的。让交换器将消息路由到两个单独的队列确实更好。

由于我不需要太复杂的路由,扇出交换会很好地处理这个问题。我之前并没有过多关注交换,因为 node-amqp 有“默认交换”的概念,允许您直接将消息发布到连接,但是大多数 AMQP 消息都会发布到特定交换。

这是我的扇出交换,包括发送和接收:

var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
var count = 1;

connection.on('ready', function () {
  connection.exchange("my_exchange", options={type:'fanout'}, function(exchange) {   
 
    var sendMessage = function(exchange, payload) {
      console.log('about to publish')
      var encoded_payload = JSON.stringify(payload);
      exchange.publish('', encoded_payload, {})
    }

    // Recieve messages
    connection.queue("my_queue_name", function(queue){
      console.log('Created queue')
      queue.bind(exchange, ''); 
      queue.subscribe(function (message) {
        console.log('subscribed to queue')
        var encoded_payload = unescape(message.data)
        var payload = JSON.parse(encoded_payload)
        console.log('Recieved a message:')
        console.log(payload)
      })
    })
  
    setInterval( function() {    
      var test_message = 'TEST '+count
      sendMessage(exchange, test_message)  
      count += 1;
    }, 2000) 
 })
})
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

RabbitMQ / AMQP:单个队列,同一消息的多个消费者? 的相关文章

随机推荐

  • 将对象复制到对象(使用 Automapper ?)

    我有一堂课 public class Person public string FirstName get set public string LastName get set 我有两个 Person 实例 person1 和 person
  • playframework - IOException 无法分配内存

    我的系统是Mint 12 64位 我已经安装了类型安全堆栈并创建了一个新的播放应用程序 在不修改代码的情况下 我尝试运行它 这是输出 http pastebin com 6XwmsbAE Play 抱怨此错误消息 IOException C
  • NLTK/pyNLTK 可以“按语言”(即非英语)工作吗?如何工作?

    我如何告诉 NLTK 以特定语言处理文本 有时我会编写一个专门的 NLP 例程 在非英语 但仍然是印欧语 文本域上进行词性标记 标记化等 这个问题似乎只解决不同的语料库 而不是代码 设置的变化 德语 POS 标记 或者 是否有专门用于 py
  • 创建新的通用结构的正确方法是什么?

    我正在尝试创建一个可以初始化为某种类型的通用结构T 它看起来像这样 pub struct MyStruct
  • jQuery 选择器帮助 - 如何查找 ID 以特定字符开头和结尾的元素

    我有一个动态创建的页面 它可以有许多不同的带有 ID 的单选按钮 如下所示
  • 某些 HTML 标记中的“生成”属性有何用途?

    我看到它在 HTML 标签中使用 但我感觉它可以与大多数 HTML 标签一起使用 我大概能猜出这是什么意思 但我更好奇使用它有什么好处 我试图用谷歌搜索一些有关它的参考资料 但找不到任何资料 所以我来找你们专家 谢谢 Example
  • Android ACTION_IMAGE_CAPTURE 意图

    我们正在尝试使用本机相机应用程序让用户拍摄新照片 如果我们省略了 它就可以正常工作EXTRA OUTPUT extra并返回小位图图像 然而 如果我们putExtra EXTRA OUTPUT 在启动之前 一切都会正常进行 直到您尝试点击相
  • JasperReports 的最小依赖关系

    我希望在我的一个项目中使用 JasperReports 4 5 0 并且想知道仅生成 PDF 的最小依赖项是什么 我尝试浏览他们的网站和自述文档 但一无所获 我也遇到过这个安装维基这看起来非常过时 版本 1 2 2 是否有所需 jar 的最
  • 删除无效/不完整的多字节字符

    我在用户输入上使用以下代码时遇到一些问题 htmlentities string ENT COMPAT UTF 8 当检测到无效的多字节字符时 PHP 会抛出一个通知 PHP 警告 htmlentities path to file php
  • “原子”和“cstdatomic”有什么区别?

    有人可以澄清一下包含选项之间的区别吗 include
  • 原型中的函数列表

    我希望能够获取不同 JavaScript 对象的函数列表 特别是 String 和其他基元 我以为我能够以某种方式使用 String prototype 并神奇地获得原型中的函数列表 但没有骰子 有任何想法吗 我也尝试过使用下划线 例如 f
  • 根据谓词从列表中删除元素

    我想从列表中删除一个元素 使得该元素包含 X or N 我必须申请大型基因组 这是一个例子 input codon AAT XAC ANT TTA 预期输出 codon AAT TTA 出于基础目的 gt gt gt x for x in
  • 如何使用命令行在 VSCode 中安装多个扩展

    如何使用 cli 在 VSCode 中安装多个扩展 我试过 code install extension xyz local history jock svg 但它只安装第一个扩展xyz local history Installing e
  • OneDrive 上的 Visual Studio 解决方案

    我可以将我的 Visual Studio 解决方案放入 OneDrive 中并从那里工作吗 还是会以某种方式导致数据丢失 无论我身在何处 我都想随身携带我的项目 而不是每次都担心将它们放在闪存驱动器上 我不会推荐它 OneDrive 和其他
  • 不兼容的字符编码:ASCII-8BIT 和 UTF-8

    我使用 Ruby 1 9 2 和 Rails 3 0 5 我有以下错误 不兼容的字符编码 ASCII 8BIT 和 UTF 8 我认为这与数据库无关 错误发生在视图中的这一行 只是 div haml 调用 content 全栈 Action
  • Scaffold-DbContext 在 .net core 中抛出错误“无法找到程序集”

    我正在使用 net core 和实体框架 core 1 1 0 在尝试以下命令时包管理器控制台 Scaffold DbContext Server MyServer MyInstance Database MyDB user MyUsern
  • ADFS 作为 OAuth2 提供者/身份验证服务器可能吗?

    我们想要设置 ADFS 3 0 以启用基于 OAuth2 的身份验证 我已经阅读了大量文档 但仍不清楚这是否受支持 ADFS 是否可以用作 oauth 的授权服务器 或者 ADFS 中的 oauth2 支持仅意味着充当另一个授权服务器的客户
  • 查找位置:Google Play 位置服务或 Android 平台位置 API

    我正在尝试获取我的新导航应用程序的用户位置 我想经常检查用户的位置 并且它必须准确 我使用示例中的以下代码来获取位置 public class MainActivity extends Activity implements Locatio
  • 打印网页时如何隐藏元素?

    我的网页上有一个用于打印网页的链接 但是 该链接在打印输出本身中也可见 当我单击打印链接时 是否有 javascript 或 HTML 代码会隐藏链接按钮 Example Good Evening Print click Here To P
  • RabbitMQ / AMQP:单个队列,同一消息的多个消费者?

    我刚刚开始使用 RabbitMQ 和 AMQP 我有一个消息队列 我有多个消费者 我想用它们做不同的事情相同的消息 大多数 RabbitMQ 文档似乎都专注于循环 即单个消息由单个消费者消费 负载分布在每个消费者之间 这确实是我亲眼所见的行