我有一个应用程序,它使用 RabbitMQ 作为消息队列在两个组件(发送者和接收者)之间发送/接收消息。发送者以非常快的方式发送消息。接收方收到消息后会做一些非常耗时的工作(主要是数据量非常大的数据库写入)。由于接收方需要很长时间才能完成任务然后检索队列中的下一条消息,因此发送方将快速填满队列。所以我的问题是:这会导致消息队列溢出吗?
消息消费者如下所示:
public void onMessage() throws IOException, InterruptedException {
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String queueName = channel.queueDeclare("allDataCase", true, false, false, null).getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
JSONObject json = new JSONObject(message);
String caseID = json.getString("caseID");
//following takes very long time
dao.saveToDB(caseID);
}
}
消费者收到的每条消息都包含一个 caseID。对于每个caseID,它都会将大量数据保存到数据库中,这需要很长时间。目前,RabbitMQ 只设置了一个消费者,因为生产者/消费者使用相同的队列来发布/订阅 caseID。那么如何加快消费者吞吐量,让消费者能够赶上生产者,避免队列中的消息溢出呢?消费部分是否应该使用多线程来加快消费速度?或者我应该使用多个消费者同时消费传入的消息?或者有没有异步的方式让消费者异步消费消息而不需要等待它完成?欢迎任何建议。
“这会导致消息队列溢出吗?”
是的。 RabbitMQ 会进入“流量控制”状态,以防止随着队列长度的增加而消耗过多的内存。它还将开始将消息持久保存到磁盘,而不是将它们保存在内存中。
“那么我怎样才能加快消费者吞吐量,以便消费者
可以赶上生产者并避免消息溢出
队列”
您有 2 个选择:
- 添加更多消费者。请记住,如果您选择此选项,您的数据库现在将由多个并发进程操作。确保 DB 能够承受额外的压力。
- 增加QOS消费渠道的价值。这将从队列中提取更多消息并将其缓冲到消费者上。这会增加整体处理时间;如果缓冲了 5 条消息,则第 5 条消息将花费消息 1...5 的处理时间来完成。
“我应该在消费者部分使用多线程来加速
消费率?”
除非你有一个精心设计的解决方案。向应用程序添加并行性将会增加消费者端的大量开销。您最终可能会耗尽线程池或限制内存使用。
在处理 AMQP 时,您确实需要考虑每个流程的业务需求,以便设计最佳解决方案。您收到的消息对时间有多敏感?它们是否需要尽快保存到数据库,或者数据是否立即可用对您的用户来说重要吗?
如果数据不需要立即保留,您可以修改应用程序,以便使用者只需从队列中删除消息并将它们保存到缓存集合中,例如 Redis 中。引入第二个进程,然后按顺序读取并处理缓存的消息。这将确保您的队列长度不会增长到足以导致流量控制,同时防止您的数据库受到写入请求的轰炸,写入请求通常比读取请求更昂贵。您的消费者现在只需从队列中删除消息,稍后由另一个进程处理。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)