RabbitMQ:快速生产者和慢速消费者

2023-12-29

我有一个应用程序,它使用 RabbitMQ 作为消息队列在两个组件(发送者和接收者)之间发送/接收消息。发送者以非常快的方式发送消息。接收方收到消息后会做一些非常耗时的工作(主要是数据量非常大的数据库写入)。由于接收方需要很长时间才能完成任务然后检索队列中的下一条消息,因此发送方将快速填满队列。所以我的问题是:这会导致消息队列溢出吗?

消息消费者如下所示:

public void onMessage() throws IOException, InterruptedException {
    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    String queueName = channel.queueDeclare("allDataCase", true, false, false, null).getQueue();
    channel.queueBind(queueName, EXCHANGE_NAME, "");

    QueueingConsumer consumer = new QueueingConsumer(channel);
    channel.basicConsume(queueName, true, consumer);

    while (true) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        String message = new String(delivery.getBody());
        System.out.println(" [x] Received '" + message + "'");

        JSONObject json = new JSONObject(message);
        String caseID = json.getString("caseID");
        //following takes very long time            
        dao.saveToDB(caseID);
    }
}

消费者收到的每条消息都包含一个 caseID。对于每个caseID,它都会将大量数据保存到数据库中,这需要很长时间。目前,RabbitMQ 只设置了一个消费者,因为生产者/消费者使用相同的队列来发布/订阅 caseID。那么如何加快消费者吞吐量,让消费者能够赶上生产者,避免队列中的消息溢出呢?消费部分是否应该使用多线程来加快消费速度?或者我应该使用多个消费者同时消费传入的消息?或者有没有异步的方式让消费者异步消费消息而不需要等待它完成?欢迎任何建议。


“这会导致消息队列溢出吗?”

是的。 RabbitMQ 会进入“流量控制”状态,以防止随着队列长度的增加而消耗过多的内存。它还将开始将消息持久保存到磁盘,而不是将它们保存在内存中。

“那么我怎样才能加快消费者吞吐量,以便消费者 可以赶上生产者并避免消息溢出 队列”

您有 2 个选择:

  1. 添加更多消费者。请记住,如果您选择此选项,您的数据库现在将由多个并发进程操作。确保 DB 能够承受额外的压力。
  2. 增加QOS消费渠道的价值。这将从队列中提取更多消息并将其缓冲到消费者上。这会增加整体处理时间;如果缓冲了 5 条消息,则第 5 条消息将花费消息 1...5 的处理时间来完成。

“我应该在消费者部分使用多线程来加速 消费率?”

除非你有一个精心设计的解决方案。向应用程序添加并行性将会增加消费者端的大量开销。您最终可能会耗尽线程池或限制内存使用。

在处理 AMQP 时,您确实需要考虑每个流程的业务需求,以便设计最佳解决方案。您收到的消息对时间有多敏感?它们是否需要尽快保存到数据库,或者数据是否立即可用对您的用户来说重要吗?

如果数据不需要立即保留,您可以修改应用程序,以便使用者只需从队列中删除消息并将它们保存到缓存集合中,例如 Redis 中。引入第二个进程,然后按顺序读取并处理缓存的消息。这将确保您的队列长度不会增长到足以导致流量控制,同时防止您的数据库受到写入请求的轰炸,写入请求通常比读取请求更昂贵。您的消费者现在只需从队列中删除消息,稍后由另一个进程处理。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

RabbitMQ:快速生产者和慢速消费者 的相关文章

  • 在内存中使用 byte[] 创建 zip 文件。 Zip 文件总是损坏

    我创建的 zip 文件有问题 我正在使用 Java 7 我尝试从字节数组创建一个 zip 文件 其中包含两个或多个 Excel 文件 应用程序始终完成 没有任何异常 所以 我以为一切都好 当我尝试打开 zip 文件后 Windows 7 出
  • Java 枚举与创建位掩码和检查权限的混淆

    我想将此 c 权限模块移植到 java 但是当我无法将数值保存在数据库中然后将其转换为枚举表示形式时 我很困惑如何执行此操作 在 C 中 我创建一个如下所示的枚举 public enum ArticlePermission CanRead
  • .properties 中的通配符

    是否存在任何方法 我可以将通配符添加到属性文件中 并且具有所有含义 例如a b c d lalalala 或为所有以结尾的内容设置一个正则表达式a b c anything 普通的 Java 属性文件无法处理这个问题 不 请记住 它实际上是
  • 异步填充数据集

    下面的方法用于填充数据集 如果我们以同步方式调用这个方法 它就可以正常工作 但现在我们需要以异步方式调用这个方法 那么我需要做哪些更改才能使下面的方法正常工作而不会出现任何问题 public DataSet Filldata string
  • 动态选择端口号?

    在 Java 中 我需要获取端口号以在同一程序的多个实例之间进行通信 现在 我可以简单地选择一些固定的数字并使用它 但我想知道是否有一种方法可以动态选择端口号 这样我就不必打扰我的用户设置端口号 这是我的一个想法 其工作原理如下 有一个固定
  • 如何使用assertEquals 和 Epsilon 在 JUnit 中断言两个双精度数?

    不推荐使用双打的assertEquals 我发现应该使用带有Epsilon的形式 这是因为双打不可能100 严格 但无论如何我需要比较两个双打 预期结果和实际结果 但我不知道该怎么做 目前我的测试如下 Test public void te
  • org.apache.hadoop.security.AccessControlException:客户端无法通过以下方式进行身份验证:[TOKEN,KERBEROS] 问题

    我正在使用 java 客户端通过 Kerberos 身份验证安全访问 HDFS 我尝试打字klist在服务器上 它显示已经存在的有效票证 我收到的异常是客户端无法通过以下方式进行身份验证 TOKEN KERBEROS 帮助将不胜感激 这是一
  • Spring AspectJ 在双代理接口时失败:无法生成类的 CGLIB 子类

    我正在使用Spring的
  • 过滤两次 Lambda Java

    我有一个清单如下 1 2 3 4 5 6 7 和 预期结果必须是 1 2 3 4 5 6 7 我知道怎么做才能到7点 我的结果 1 2 3 4 5 6 我也想知道如何输入 7 我添加了i gt i objList size 1到我的过滤器
  • 在 Jar 文件中运行 ANT build.xml 文件

    我需要使用存储在 jar 文件中的 build xml 文件运行 ANT 构建 该 jar 文件在类路径中可用 是否可以在不分解 jar 文件并将 build xml 保存到本地目录的情况下做到这一点 如果是的话我该怎么办呢 Update
  • 使用主题交换运行多个 Celery 任务

    我正在用 Celery 替换一些自制代码 但很难复制当前的行为 我期望的行为如下 创建新用户时 应向tasks与交换user created路由键 该消息应该触发两个 Celery 任务 即send user activate email
  • 检测并缩短字符串中的所有网址

    假设我有一条字符串消息 您应该将 file zip 上传到http google com extremelylonglink zip http google com extremelylonglink zip not https stack
  • 无法创建请求的服务[org.hibernate.engine.jdbc.env.spi.JdbcEnvironment]-MySQL

    我是 Hibernate 的新手 我目前正在使用 Spring boot 框架并尝试通过 hibernate 创建数据库表 我知道以前也问过同样的问题 但我似乎无法根据我的环境找出如何修复错误 休眠配置文件
  • java for windows 中的文件图标叠加

    我正在尝试像 Tortoise SVN 或 Dropbox 一样在文件和文件夹上实现图标叠加 我在网上查了很多资料 但没有找到Java的解决方案 Can anyone help me with this 很抱歉确认您的担忧 但这无法在 Ja
  • 如何使用 jUnit 将测试用例添加到套件中?

    我有 2 个测试类 都扩展了TestCase 每个类都包含一堆针对我的程序运行的单独测试 如何将这两个类 以及它们拥有的所有测试 作为同一套件的一部分执行 我正在使用 jUnit 4 8 在 jUnit4 中你有这样的东西 RunWith
  • Eclipse 启动时崩溃;退出代码=13

    I am trying to work with Eclipse Helios on my x64 machine Im pretty sure now that this problem could occur with any ecli
  • 我如何在java中读取二进制数据文件

    因此 我正在为学校做一个项目 我需要读取二进制数据文件并使用它来生成角色的统计数据 例如力量和智慧 它的设置是让前 8 位组成一个统计数据 我想知道执行此操作的实际语法是什么 是不是就像读文本文件一样 这样 File file new Fi
  • 使用 svn 1.8.x、subclise 1.10 的 m2e-subclipse 连接器在哪里?

    我读到 m2e 的生产商已经停止生产 svn 1 7 以外的任何版本的 m2e 连接器 Tigris 显然已经填补了维护 m2e subclipse 连接器的空缺 Q1 我的问题是 使用 svn 1 8 x 的 eclipse 更新 url
  • 如何防止在Spring Boot单元测试中执行import.sql

    我的类路径中有一个 import sql 文件 其中包含一些 INSERT 语句 当使用 profile devel 运行我的应用程序时 它的数据被加载到 postgres 数据库中 到目前为止一切正常 当使用测试配置文件执行测试时 imp
  • Spring Boot 无法更新 azure cosmos db(MongoDb) 上的分片集合

    我的数据库中存在一个集合 documentDev 其分片键为 dNumber 样本文件 id 12831221wadaee23 dNumber 115 processed false 如果我尝试使用以下命令通过任何查询工具更新此文档 db

随机推荐

  • 在 Mac OS X 中更改声音(或其他)系统偏好设置

    我希望能够在 Mac OS X 中切换声音输出源 而无需任何 GUI 交互 有一些工具可以控制声音输出 例如声源 http rogueamoeba com freebies and an applescript 打开首选项对话框 http
  • Django AttributeError:“DatabaseOperations”对象没有属性“select”

    我有一个连接到 PostGIS 数据库后端的 GeoDjango 实例 当我查询该数据库中的表时 标题中出现错误 AttributeError DatabaseOperations object has no attribute selec
  • 使用 reCAPTCHA 和 ajax....javascript 加载问题

    我试图在我的其中一个表单中实现 reCAPTCHA 但我使用 ajax 作为提交 更具体地说是 ajax updater 原型 一旦我提交并错误检查我的表单 我尝试加载 reCAPTCHA 小部件 在我更新的 div 元素中 它基本上只是调
  • 递归子集和函数

    我们的教授为我们的课程分享了以下有关递归的 Python 代码 这是 子集和 问题的解决方案 我一遍又一遍地阅读它 并尝试使用在线工具检查它并逐步遵循参数 但我根本不明白 我知道代码检查列表 L 的子集是否有可能使总和为 0 但我不明白该函
  • WPF 无法找到资源

    我有一个资源文件和两个视图 该视图使用资源文件
  • 如何检查自定义android是否已root?

    我们购买了一些定制的 Android 平板电脑 我们将把它们用作信息亭平板电脑 为了使我们的自助服务终端应用程序正常工作 该应用程序需要 root 访问权限 如何检查设备是否已正确 root 如果不是的话我该如何root它 表明设备已获得
  • 使用 Pow on Rails 4 进行遥控撬

    我正忙于 最终 升级到 Rails 4 并且遇到了 Pry remote 的问题 Problem 添加时binding remote pry对于我的代码 它会破坏代码 但是当我无法通过键入进入调试器时pry remote 这曾经在运行 Ra
  • 如何检查多维数组是否为空?

    基本上 我有一个多维数组 我需要检查它是否为空 我目前有一个if声明尝试这样做 if empty csv array My code goes here if the array is not empty 虽然 那if statement无
  • 在Android上解密“SunJCE”AES加密数据

    我们需要编写一些 Android 代码来解密从我们的服务器发送的一些数据 我们的服务器团队为我们提供了一些使用 SunJCE 提供程序的示例解密代码 遗憾的是 Android 上不存在该提供程序 Cipher cipher Cipher g
  • mingw C++ 无法编译 j0 函数

    我正在尝试使用 MingW msys2 在 Windows 上编译程序 但由于 j0 函数而失败 在Linux上编译没有问题 当我在编译器上使用 std c 11 标志时 它似乎很讨厌 如何正确编译并打开 std c 11 标志 示例代码
  • jar 内的文件对于 spring 不可见

    All 我创建了一个 jar 文件 其中包含以下 MANIFEST MF Manifest Version 1 0 Ant Version Apache Ant 1 8 3 Created By 1 6 0 25 b06 Sun Micro
  • 基于傅里叶变换创建 iPhone 音乐可视化工具

    我正在为 iPhone 设计一个音乐可视化应用程序 我想通过 iPhone 的麦克风采集数据 对其运行傅里叶变换 然后创建可视化来实现这一点 我能得到的最好的例子是奥里奥图奇 http developer apple com library
  • 从 Android 开始:Java 或 Python (SL4A)

    我刚刚订购了一部 Android 智能手机 并想开始尝试创建自己的应用程序 现在的问题是使用哪种语言 是原生 Java 还是使用 SL4A 以前的 ASE 的 Python 我倾向于使用 Python 因为我比 Java 更了解它 但我想知
  • 401 Unauthorized 与 403 Forbidden:当用户未登录时,哪个是正确的状态代码? [复制]

    这个问题在这里已经有答案了 经过大量谷歌搜索和 Stackoverflowing 后 我仍然不清楚 因为许多文章和问题 答案太笼统了 包括403 Forbidden 与 401 Unauthorized HTTP 响应 https stac
  • R ggplot:图例周围的线

    我正在尝试使用 ggplot2 进行数据绘图 出于纯粹肤浅的原因 我想在图例周围画一条线 以便更好地将其与绘图区分开来 即图例框周围的黑色轮廓 我在任何论坛上都找不到这个问题的答案 但也许你有一个提示 library ggplot2 Res
  • $setIsSubset 用于 Mongo 中的常规查询

    我想做相当于 setIsSubset http docs mongodb org manual reference operator aggregation setIsSubset http docs mongodb org manual
  • 混合模板函数重载和继承

    打印以下代码 generic overload 但我想要的是在这两种情况下都调用重载或专业化 而不是通用的 我并不是想将重载与模板专业化混合在一起 它们在一起是因为没有一个按我的预期工作 有什么模板魔法可以实现这一点吗 include
  • 当我使用 Validator.TryValidateObject 时验证不起作用

    DataAnnotations 不适用于好友类 以下代码始终验证 true 为什么 var isValid Validator TryValidateObject new Customer Context results true 这是好友
  • 添加自定义 DLL 搜索路径@应用程序启动

    我正在绞尽脑汁试图想出一个优雅的解决方案来解决 DLL 加载问题 我有一个应用程序静态链接到加载 DLL 的其他 lib 文件 我没有直接加载 DLL 我希望在可执行文件所在的文件夹之外的另一个文件夹中拥有一些 DLL 例如 working
  • RabbitMQ:快速生产者和慢速消费者

    我有一个应用程序 它使用 RabbitMQ 作为消息队列在两个组件 发送者和接收者 之间发送 接收消息 发送者以非常快的方式发送消息 接收方收到消息后会做一些非常耗时的工作 主要是数据量非常大的数据库写入 由于接收方需要很长时间才能完成任务