Rabbitmq消息的有序性、消息不丢失、不被重复消费

2023-11-20

如何保证消息的顺序性

如图所示,RabbitMQ保证消息的顺序性,就是拆分多个 queue,每个 queue 对应一个 consumer(消费者),就是多一些 queue 而已,确实是麻烦点;

或者就一个 queue 但是对应一个 consumer,然后这个 consumer 内部用内存队列做排队,然后分发给底层不同的 worker 来处理。

如何保证消息不丢失?

1,生产者发送消息至MQ的数据丢失

解决方法:在生产者端开启comfirm 确认模式,你每次写的消息都会分配一个唯一的 id,

然后如果写入了 RabbitMQ 中,RabbitMQ 会给你回传一个 ack 消息,告诉你说这个消息 ok 了

Channel channel = connection.createChannel();
channel.confirmSelect();      // 开启confirm模式

2,MQ收到消息,暂存内存中,还没消费,自己挂掉,数据会都丢失

解决方式:MQ设置为持久化。将内存数据持久化到磁盘中。

rabbitmq的持久化分为队列持久化、消息持久化和交换器持久化。一般三个都要设置持久化

3,消费者刚拿到消息,还没处理,挂掉了,MQ又以为消费者处理完

解决方式:用 RabbitMQ 提供的 ack 机制,简单来说,就是你必须关闭 RabbitMQ 的自动 ack,可以手动ack,可以通过一个 api 来调用就行,然后每次你自己代码里确保处理完的时候,再在程序里 ack 一把。这样的话,如果你还没处理完,不就没有 ack 了?那 RabbitMQ 就认为你还没处理完,这个时候 RabbitMQ 会把这个消费分配给别的 consumer 去处理,消息是不会丢的。

如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息。

如何保证消息不被重复消费? (幂等性)

方式1: 消息全局 ID 或者写个唯一标识(如时间戳、UUID 等) :每次消费消息之前根据消息 id 去判断该消息是否已消费过,如果已经消费过,则不处理这条消息,否则正常消费消息,并且进行入库操作。(消息全局 ID 作为数据库表的主键,防止重复)

方式2: 利用 Redis 的 setnx 命令:给消息分配一个全局 ID,消费该消息时,先去 Redis 中查询有没消费记录,无则以键值对形式写入 Redis ,有则不消费该消息。

死信队列

生产者生产一条1分钟后超时的订单信息到正常交换机exchange-a中,消息匹配到队列queue-a,但一分钟后仍未消费。

消息会被投递到死信交换机dlx-exchange中,并发送到私信队列中。

死信队列dlx-queue的消费者拿到信息后,根据消息去查询订单的状态,如果仍然是未支付状态,将订单状态更新为超时状态。

如何处理消息堆积情况?

场景题:几千万条数据在MQ里积压了七八个小时。

1、出现该问题的原因:

消息堆积往往是生产者的生产速度与消费者的消费速度不匹配导致的。有可能就是消费者消费能力弱,渐渐地消息就积压了,也有可能是因为消息消费失败反复复重试造成的,也有可能是消费端出了问题,导致不消费了或者消费极其慢。比如,消费端每次消费之后要写mysql,结果mysql挂了,消费端hang住了不动了,或者消费者本地依赖的一个东西挂了,导致消费者挂了。

所以如果是 bug 则处理 bug;如果是因为本身消费能力较弱,则优化消费逻辑,比如优化前是一条一条消息消费处理的,那么就可以批量处理进行优化。

2、临时扩容,快速处理积压的消息:

(1)先修复 consumer 的问题,确保其恢复消费速度,然后将现有的 consumer 都停掉;

(2)临时创建原先 N 倍数量的 queue ,然后写一个临时分发数据的消费者程序,将该程序部署上去消费队列中积压的数据,消费之后不做任何耗时处理,直接均匀轮询写入临时建立好的 N 倍数量的 queue 中;

(3)接着,临时征用 N 倍的机器来部署 consumer,每个 consumer 消费一个临时 queue 的数据

(4)等快速消费完积压数据之后,恢复原先部署架构 ,重新用原先的 consumer 机器消费消息。

这种做法相当于临时将 queue 资源和 consumer 资源扩大 N 倍,以正常 N 倍速度消费。

4、MQ长时间未处理导致MQ写满的情况如何处理:

如果消息积压在MQ里,并且长时间都没处理掉,导致MQ都快写满了,这种情况肯定是临时扩容方案执行太慢,这种时候只好采用 “丢弃+批量重导” 的方式来解决了。首先,临时写个程序,连接到mq里面消费数据,消费一个丢弃一个,快速消费掉积压的消息,降低MQ的压力,然后在流量低峰期时去手动查询重导丢失的这部分数据

5 短时间内无法扩容或者扩容无法完全解决问题的话:

可以尝试降低一些非核心业务的消息处理,其次可以通过监控排查,优化消费者端的业务代码或者查看是否存在一些消息被重复消息的情况。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Rabbitmq消息的有序性、消息不丢失、不被重复消费 的相关文章

  • 谁能告诉我 python 中的 pika 和 kombu 消息传递库有什么区别?

    我想在我的应用程序中使用消息传递库与rabbitmq交互 谁能解释一下 pika 和 kombu 库之间的区别吗 Kombu 和 pika 是两个不同的 Python 库 它们从根本上服务于相同的目的 向消息代理发布消息和使用消息代理发送消
  • 在 RabbitMQ 监听器中隐藏运行时异常

    在某些故意发生的情况下 我使用了一些异常来拒绝消息 但在控制台中显示了乍一看似乎不太正常的异常 如何在登录控制台 文件时隐藏该特定异常 我正在使用 spring boot 和默认记录器 public static class Undispa
  • 如何在nodejs中验证rabbitmq?

    错误 握手被服务器终止 403 ACCESS REFUSED 消息 ACCESS REFUSED 使用身份验证拒绝登录 旋转机制平原 有关详细信息 请参阅代理日志文件 我单独尝试了 authMechanism PLAIN AMQPLAIN
  • RabbitMQ C# API:如何检查绑定是否存在?

    使用 RabbitMQ C API 我如何检查给定队列到给定交换是否存在绑定 很多 RabbitMQ 调用都是幂等的 所以有些人可能会说在这些情况下检查是不必要的 但我认为它们在测试中很有用 您可以使用他们的 REST API 来调用并查看
  • RabbitMQ - 升级到新版本并收到很多“PRECONDITION_FAILED Unknown Delivery Tag 1”

    刚刚升级到新版本的 RabbitMQ 2 3 1 现在出现以下错误 PRECONDITION FAILED unknown delivery tag 1 随后通道关闭 这适用于较旧的 RabbitMQ 无需客户端更改 在应用程序行为方面 当
  • MongoDB 架构设计 - 实时聊天

    我正在启动一个项目 我认为该项目特别适合 MongoDB 因为它提供的速度和可扩展性 我目前感兴趣的模块是与实时聊天有关的 如果我要在传统的 RDBMS 中执行此操作 我会将其分为 频道 一个频道有很多用户 用户 一个用户有一个频道但有多条
  • 如何使用 Celery、RabbitMQ 和 Django 确保每个用户的任务执行顺序?

    我正在运行 Django Celery 和 RabbitMQ 我想要实现的是确保与一个用户相关的任务按顺序执行 具体来说 一次执行一个 我不希望每个用户执行任务并发 每当为用户添加新任务时 它应该取决于最近添加的任务 如果此类型的任务已为此
  • AMQPRuntimeException:读取数据时出错。收到 0 而不是预期的 7 字节

    它曾经有效 但现在不再有效了 我正在使用 php amqplib 和 RabbitMQ 当我尝试创建新的 AMQP 连接时 connection new AMQPConnection localhost 5672 username pass
  • RabbitMQ 管理插件窗口呈现为空白页面

    I have installed Erlang RabbitMQ and configured the management plugin as per the instructions on the website https www r
  • Amazon EC2 实例上和本地的 RabbitMQ?

    是否可以设置一个RabbitMQ服务器上的Amazon EC2 instance 并将我办公室的机器连接到此RabbitMQ服务器并向其发送 接收消息 我会被收取费用吗Amazon对于流入 流出我的带宽 消息RabbitMQ EC2 ins
  • RabbitMQ:如何创建和恢复备份

    我是 RabbitMQ 的新手 我需要一些帮助 如何备份和恢复到RabbitMQ 以及我需要保存哪些重要数据 谢谢 如果您安装了管理插件 您可以在Overview页 在底部你会看到导入 导出定义您可以使用它来下载代理的 JSON 表示形式
  • 生产者/消费者的不同语言

    我想知道是否可以通过 AMQP 和 RabbitMQ 对生产者和消费者使用不同的语言 例如 Java 代表生产者 python php 代表消费者 或者反之亦然 是的 AMQP 与语言无关 这意味着只要您有可以连接到 AMQP 的客户端sa
  • RabbitMQ - 无法联系统计数据库。消息速率和队列长度将不会显示

    我已经设置了一个兔子经纪人集群 并且在管理门户插件中我收到以下消息 无法联系统计数据库 消息速率和队列长度将不会显示 我已经搜索过这个错误 但谷歌并不友善 任何人都可以阐明这一点吗 我最近在旧安装的RabbitMQ 2 8 7 上遇到了同样
  • 如何使用 Java 在 RabbitMQ 中实现标头交换?

    我是一个新手 试图在java客户端中实现标头交换 我知道这就是 x match 绑定参数的用途 当 x match 参数设置为 any 时 只需一个匹配的标头值就足够了 或者 将 x match 设置为 all 强制所有值必须匹配 但任何人
  • 从 Java/Spring 检索 RabbitMQ 队列中未确认消息的数量

    有没有办法返回未确认的消息数 我正在使用此代码来获取队列中的消息数 DeclareOk declareOk amqpAdmin getRabbitTemplate execute new ChannelCallback
  • RabbitMQ 中 Pub/Sub 与工作队列的混合

    我正在评估使用 RabbitMQ 作为消息队列 消息总线 并一直在查看示例教程 https www rabbitmq com getstarted html在 RabbitMQ 页面上 我正在寻找教程中未涵盖的特定场景 并且我不确定是否以及
  • Django Celery 和多个数据库(Celery、Django 和 RabbitMQ)

    是否可以设置与 Django Celery 一起使用的不同数据库 我有一个配置了多个数据库的项目 并且不希望 Django Celery 使用默认数据库 如果我仍然可以使用 django celery 管理页面并读取存储在这个不同数据库中的
  • 如何停止本地主机上的 RabbitMQ 服务器

    我在 OS X 上安装了 RabbitMQ 服务器 并在命令行上启动它 现在 我应该如何阻止它运行还不清楚 我这样做之后 sudo rabbitmq server detached I get Activating RabbitMQ plu
  • RabbitMQ + Windows + LDAP 无需发送密码

    我正在尝试在 Windows 7 上使用 RabbitMQ 3 6 2 进行 LDAP 身份验证 授权 我已经在应用程序发送用户名 密码的情况下进行了基本身份验证 但密码位于我需要弄清楚如何进行的代码中避免 有没有人在不提供密码的情况下成功
  • Rabbitmq 忽略 Ubuntu 12 上的配置

    I have rabbitmq server从系统包安装乌班图12 无论我做什么 它似乎都会忽略任何配置文件 网络上的所有内容都表明服务器正在寻找 etc rabbitmq rabbitmq conf 但即使我创建该文件 服务器也报告没有配

随机推荐

  • 如何在 Ubuntu 22.04 上使用 uWSGI 和 Nginx 为 Flask 应用程序提供服务

    介绍 在本指南中 您将使用FlaskUbuntu 22 04 上的微框架 本文的大部分内容将讨论如何设置uWSGI应用服务器以及如何启动应用程序和配置Nginx充当前端反向代理 先决条件 在开始本指南之前 您应该 安装了 Ubuntu 22
  • 如何使用 Nginx、Let's Encrypt 和 Docker Compose 保护容器化 Node.js 应用程序

    介绍 有多种方法可以增强您的灵活性和安全性Node js应用 用一个反向代理 like Nginx为您提供负载平衡请求 缓存静态内容和实施传输层安全 TLS 在服务器上启用加密 HTTPS 可确保与应用程序之间的通信保持安全 在容器上使用
  • Python 中的 numpy.append()

    Python numpyappend 函数用于合并两个数组 该函数返回一个新数组 原数组保持不变 NumPy append 语法 函数语法为 numpy append arr values axis None The arr可以是类似数组的
  • 如何使用 Apache 和 Nginx 创建临时和永久重定向

    介绍 HTTP 重定向或 URL 重定向是一种将一个域或地址指向另一个域或地址的技术 重定向有很多用途 并且需要考虑几种不同类型的重定向 每当站点需要将请求一个地址的人定向到另一个地址时 就会使用重定向 当您创建内容和管理服务器时 您经常会
  • eclipse.ini vm 参数 - eclipse.ini 文件位置 Mac、Windows

    eclipse ini是用于控制Eclipse启动的配置文件 我们可以使用 Xms Xmx 参数配置 Eclipse VM 参数 例如要使用的 JDK eclipse ini vm permgen 空间 最大和最小堆大小 eclipse i
  • 如何在 Java 中将字符串转换为数组

    有时我们不得不分开字符串到数组基于分隔符或某些正则表达式 例如 读取 CSV 文件行并解析它们以将所有数据获取到字符串数组 在本教程中 我们将学习如何在 Java 程序中将字符串转换为数组 Java 中的字符串到数组 字符串类split S
  • Java优先级队列

    我们时不时地需要以特定的顺序处理队列中的项目 优先级队列是一种完成这项工作的数据结构 Java优先级队列与 普通 不同queue 它不是 先进先出 而是按优先级顺序检索项目 Java优先级队列 The java util PriorityQ
  • Java 中的 CallableStatement 示例

    java中的CallableStatement用于从java程序中调用存储过程 存储过程是我们在数据库中为某些任务编译的一组语句 当我们处理复杂场景的多个表时 存储过程很有用 我们可以将所需的数据发送到存储过程 并在数据库服务器本身中执行逻
  • Docker 详解:如何容器化并使用 Nginx 作为代理

    Status 已弃用 本文已弃用 不再维护 Reason 本文中的技术已经过时 可能不再反映 Docker 最佳实践 请参阅 Docker 生态系统 常用组件简介 如何在 Ubuntu 14 04 上的 Docker 容器中运行 Nginx
  • 如何在 Ruby 中使用字符串方法

    介绍 Ruby strings有许多内置方法可以轻松修改和操作文本 这是许多程序中的常见任务 在本教程中 您将使用字符串方法来确定字符串的长度 索引和拆分字符串以提取子字符串 添加和删除空格和其他字符 更改字符串中字符的大小写以及查找和替换
  • Python 字符串包含

    Python String 类有 contains 我们可以使用它来检查它是否包含另一个字符串的函数 Python 字符串包含 Python字符串 contains 是一个实例方法 根据字符串对象是否包含指定的字符串对象 返回布尔值 Tru
  • 如何在 Ubuntu 14.04 上使用 Docker 和 Docker Compose 配置持续集成测试环境

    文章来自Docker 介绍 持续集成 CI 是指开发人员的实践整合尽可能频繁地编写代码 并且每次提交在合并到共享存储库之前和之后都会经过测试自动构建 CI speeds up your development process and min
  • Android BroadcastReceiver 示例教程

    今天我们将讨论并实现Android BroadcastReceiver 它是Android Framework中非常重要的组件 Android 广播接收器 Android BroadcastReceiver是android的一个休眠组件 用
  • 蟒蛇集

    In this tutorial we are going to learn Python Set In our previous article we learnt about Python String You can learn it
  • 如何在 Ubuntu 12.10 上使用 Python 创建 Nagios 插件

    介绍 Python 是 Linux 上默认提供的流行命令处理器 我们之前已经介绍过如何在Ubuntu 12 10 x64上安装Nagios监控服务器 这次 我们将扩展这个想法并使用 Python 创建 Nagios 插件 这些插件将在客户端
  • 如何编辑 Sudoers 文件

    介绍 权限分离是 Linux 和类 Unix 操作系统中实现的基本安全范例之一 普通用户以有限的权限进行操作 以减少对自己环境 而不是更广泛的操作系统 的影响范围 一个特殊的用户 称为root has 超级用户特权 这是一个管理帐户 没有普
  • JSTL 教程、JSTL 标签示例

    JSTL 代表JSP 标准标签库 JSTL 是标准标记库 它提供标记来控制 JSP 页面行为 JSTL 标签可用于迭代和控制语句 国际化 SQL 等 我们将在本 JSTL 教程中详细研究 JSTL 标签 之前我们看到了如何使用JSP EL
  • 了解 Python 3 中的类继承

    介绍 面向对象编程创建可重用的代码模式 以减少开发项目中的冗余 面向对象编程实现可回收代码的一种方法是通过继承 此时一个子类可以利用另一个基类的代码 本教程将介绍 Python 中继承的一些主要方面 包括父类和子类如何工作 如何重写方法和属
  • 电商java 面试题_JAVA电商项目面试题(一)

    需要按照功能点把系统拆分 拆分成独立的功能 单独为某一个节点添加服务器 需要系统之间配合才能完成整个业务逻辑 叫做分布式 集群 同一个工程部署到多台服务器上 优点 1 把模块拆分 使用接口通信 降低模块之间的耦合度 2 把项目拆分成若干个子
  • Rabbitmq消息的有序性、消息不丢失、不被重复消费

    如何保证消息的顺序性 如图所示 RabbitMQ保证消息的顺序性 就是拆分多个 queue 每个 queue 对应一个 consumer 消费者 就是多一些 queue 而已 确实是麻烦点 或者就一个 queue 但是对应一个 consum