RocketMQ的顺序消息(顺序消费)

2023-11-04

简单介绍了消息有序性的概念,以及RocketMQ如何实现消息的顺序消费。

1 消息的有序性

消息有序指的是一类消息消费时,能按照发送的顺序来消费。例如:一个订单产生了三条消息分别是订单创建、订单付款、订单完成。消费时要按照这个顺序消费才能有意义,但是同时订单之间是可以并行消费的。RocketMQ可以严格的保证消息有序。

顺序消息分为全局顺序消息与部分顺序消息,全局顺序是指某个Topic下的所有消息都要保证顺序;部分顺序消息只要保证每一组消息被顺序消费即可。

如果想要实现全局顺序消息,那么只能使用一个队列,以及单个生产者,这是会严重影响性能。
在这里插入图片描述

因此,我们常说的顺序消息通常是只的部分顺序消息,就上面的例子来说,我们不用管不同的订单ID的消息之间的总体消费顺序,只需要保证同样订单ID的消息能按照订单创建、订单付款、订单完成这个顺序消费就可以了。

顺序消费实际上有两个核心点,一个是生产者有序存储,另一个是消费者有序消费。

2 生产者有序发送

先看如何实现生产者有序存储。我们知道RocketMQ中生产者生产的消息会放置在某个队列中,基于队列先进先出的特性天然的可以保证存入队列的消息顺序和拉取的消息顺序是一致的,因此,我们只需要保证一组相同的消息按照给定的顺序存入同一个队列中,就能保证生产者有序存储。

普通发送消息的模式下,生产者会采用轮询的方式将消费均匀的分发到不同的队列中,然后被不同的消费者消费,因为一组消息在不同的队列,此时就无法使用 RocketMQ 带来的队列有序特性来保证消息有序性了。

在这里插入图片描述

这个问题很好解决,因为RocketMQ支持生产者在投放消息的时候自定义投放策略,我们实现一个MessageQueueSelector接口,使用Hash取模法来保证同一个订单在同一个队列中就行了,即通过订单ID%队列数量得到该ID的订单所投放的队列在队列列表中的索引,然后该订单的所有消息都会被投放到这个队列中。

生产者发送消息的方法中就有一些添加队列选择器的方法,保证消息发送顺序。

比如只有两个队列,那么订单ID为1,2,3的三组消息中,1、3组消息存放于第一个队列,而2组消息存放于第二个队列,如下图是一种消息可能的消息存放顺序:
在这里插入图片描述

根据上图可以,上面的方法可以实现一组消息被顺序的存放,不同组的消息之间的顺序无法保证,这就是部分顺序。

另外,顺序消息必须使用同步发送的方式才能保证生产者发送的消息有序。

实际上,采用队列选择器的方法不能保证消息的严格顺序,我们的目的是将消息发送到同一个队列中,如果某个broker挂了,那么队列就会减少一部分,如果采用取余的方式投递,将可能导致同一个业务中的不同消息被发送到不同的队列中,导致同一个业务的不同消息被存入不同的队列中,短暂的造成部分消息无序。同样的,如果增加了服务器,那么也会造成短暂的造成部分消息无序。

3 消费者有序消费

生产者有序存储实现了,那么该如何实现消费者有序消费呢?RockerMQ的MessageListener回调函数提供了两种消费模式,有序消费模式MessageListenerOrderly和并发消费模式MessageListenerConcurrently。

在消费的时候,还需要保证消费者注册MessageListenerOrderly类型的回调接口实现顺序消费,如果消费者采用Concurrently并行消费,则仍然不能保证消息消费顺序。

实际上,每一个消费者的的消费端都是采用线程池实现多线程消费的模式,即消费端是多线程消费。虽然MessageListenerOrderly被称为有序消费模式,但是仍然是使用的线程池去消费消息。

MessageListenerConcurrently是拉取到新消息之后就提交到线程池去消费,而MessageListenerOrderly则是通过加分布式锁和本地锁保证同时只有一条线程去消费一个队列上的数据。

即顺序消费模式使用3把锁来保证消费的顺序性:

  1. broker端的分布式锁:
    1. 在负载均衡的处理新分配队列的updateProcessQueueTableInRebalance方法,以及ConsumeMessageOrderlyService服务启动时的start方法中,都会尝试向broker申请当前消费者客户端分配到的messageQueue的分布式锁。
    2. broker端的分布式锁存储结构为ConcurrentMap<String/* group */, ConcurrentHashMap<MessageQueue, LockEntry>>,该分布式锁保证同一个consumerGroup下同一个messageQueue只会被分配给一个consumerClient。
    3. 获取到的broker端的分布式锁,在client端的表现形式为processQueue. locked属性为true,且该分布式锁在broker端默认60s过期,而在client端默认30s过期,因此ConsumeMessageOrderlyService#start会启动一个定时任务,每过20s向broker申请分布式锁,刷新过期时间。而负载均衡服务也是每20s进行一次负载均衡。
    4. broker端的分布式锁最先被获取到,如果没有获取到,那么在负载均衡的时候就不会创建processQueue了也不会提交对应的消费请求了。
  2. messageQueue的本地synchronized锁:
    1. 在执行消费任务的开头,便会获取该messageQueue的本地锁对象objLock,它是一个Object对象,然后通过synchronized实现锁定。
    2. 这个锁的锁对象存储在MessageQueueLock.mqLockTable属性中,结构为ConcurrentMap<MessageQueue, Object>,所以说,一个MessageQueue对应一个锁,不同的MessageQueue有不同的锁。
    3. 因为顺序消费也是通过线程池消费的,所以这个synchronized锁用来保证同一时刻对于同一个队列只有一个线程去消费它。
  3. ProcessQueue的本地consumeLock:
    1. 在获取到broker端的分布式锁以及messageQueue的本地synchronized锁的之后,在执行真正的消息消费的逻辑messageListener#consumeMessage之前,会获取ProcessQueue的consumeLock,这个本地锁是一个ReentrantLock。
    2. 那么这把锁有什么作用呢?
      1. 在负载均衡时,如果某个队列C被分配给了新的消费者,那么当前客户端消费者需要对该队列进行释放,它会调用removeUnnecessaryMessageQueue方法对该队列C请求broker端分布式锁的解锁。
      2. 而在请求broker分布式锁解锁的时候,一个重要的操作就是首先尝试获取这个messageQueue对应的ProcessQueue的本地consumeLock。只有获取了这个锁,才能尝试请求broker端对该messageQueue的分布式锁解锁。
      3. 如果consumeLock加锁失败,表示当前消息队列正在消息,不能解锁。那么本次就放弃解锁了,移除消息队列失败,只有等待下次重新分配消费队列时,再进行移除。
    3. 如果没有这把锁,假设该消息队列因为负载均衡而被分配给其他客户端B,但是由于客户端A正在对于拉取的一批消费消息进行消费,还没有提交消费点位,如果此时客户端A能够直接请求broker对该messageQueue解锁,这将导致客户端B获取该messageQueue的分布式锁,进而消费消息,而这些没有commit的消息将会发送重复消费。
    4. 所以说这把锁的作用,就是防止在消费消息的过程中,该消息队列因为发生负载均衡而被分配给其他客户端,进而导致的两个客户端重复消费消息的行为。

下面是消费消息的源码:

  1. RocketMQ源码(21)—ConsumeMessageConcurrentlyService并发消费消息源码
  2. RocketMQ源码(22)—ConsumeMessageOrderlyService顺序消费消息源码

目前来说,消费者使用MessageListenerOrderly顺序消费有个两个问题:

  1. 使用了很多的锁,降低了吞吐量。
  2. 前一个消息消费阻塞时后面消息都会被阻塞。如果遇到消费失败的消息,会自动对当前消息进行重试(每次间隔时间为1秒),无法自动跳过,重试最大次数是Integer.MAX_VALUE,这将导致当前队列消费暂停,因此通常需要设定有一个最大消费次数,以及处理好所有可能的异常情况。RocketMQ的消费者消息重试和生产者消息重投

相关文章:

RocketMQ

如有需要交流,或者文章有误,请直接留言。另外希望点赞、收藏、关注,我将不间断更新各种Java学习博客!

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

RocketMQ的顺序消息(顺序消费) 的相关文章

随机推荐

  • QT 信号槽 lambda

    QCheckBox checkBox connect checkBox QCheckBox clicked this emit this gt slot check btn connect checkBox static cast
  • 通过AT指令访问SIM卡

    不借助读写卡工具 直接通过AT指令访问SIM卡 一 命令类型 1 AT CRSM 对于SIM卡 参考3GPP 51 011 对于USIM卡 参考ETSI TS 102 221 2 AT CSIM 参考3GPP TS 27 007 二 常用命
  • Mybatis学习笔记1:CRUD与配置解析

    文章目录 1 简介 1 1 什么是Mybatis 1 2 持久化 1 3 持久层 1 4 为什么需要Mybatis 2 第一个Mybatis程序 2 1 搭建环境 2 2 创建一个模块 2 3 编写代码 2 4 测试 2 5 作用域 Sco
  • FPGA自学之路2(计数器or6分频器(偶分频))

    先说一个今天碰到的modelsim报错 原因在于代码模块名和文件名不一致 改成一致就不报错了 下面是modelsim波形图时间间隔调整 时间间隔 一般调成ns 下图是设置波形图数字进制 b是二进制 d是十进制 h是十六进制 下图左边红线是加
  • spring boot 内部执行 spark submit

    最近项目要用到大数据 边学边搞项目 一个坑接着一个坑地踩 好在头铁 总能柳暗花明 spark submit 提交任务到 yarn 集群执行官方资料写的很清楚 如果是用脚本方式执行看看说明分分钟搞定的 可偏偏好死不死 设计的方案是在 spri
  • springboot怎么返回ajax,springbootajax查看直接能够返回查询页面

    Controller RequestMapping activity public class ActivityController javascript Autowired private ActivityService activity
  • 大数据技术——HBase安装配置&DDL&DML操作

    HBase文章目录 1 HBase 安装 1 1 HBase下载 1 2 HBase安装 1 3 HBase 初始配置 2 配置高可用并且群起集群 2 1 HBase高可用 2 2 群起集群 2 3 进入HBase客户端 3 DDL DML
  • C++搭配PCL在点云里画直线的实现

    一个很简单的小功能 在点云里画自己给定点之间的连线 我这里实现的是把二维的点云用一个正方体围起来 并等分为n n个格子 代码例子如下所示 我这里的是三维的点云 然后将z坐标置0了 定义四个边界点 水平的最大最小 竖直的最大最小 float
  • 给视频嵌入字幕的神器 MKVToolNix

    我们经常会下载一些视频是不带字幕的 也就是俗称的生肉 然后下载一些外挂字幕 其实我们可以自己把字幕压缩进视频里 我推荐一个 免费开源的给视频加字幕的软件 MKVToolNix 下载地址 https www fosshub com MKVTo
  • Druid数据库连接

    我们知道 应用系统最频繁 最主要的操作还是数据库的操作 所以数据库的性能和安全对于整个系统平台的重要性不言而喻 为了提高数据库性能 我们可以使用数据库连接池 有时候我们需要增加一些列的日志或是数据库性能监控工具来确保数据库的性能 同时还得防
  • Qt信号与槽执行频繁导致程序崩溃

    问题情景 使用Qt编写运动控制上位机时 需要读取IO端口信号 并根据读取到的信号将传感器的状态显示在主界面 由于没办法读取到电平的上升沿或下降沿变化 所以只能在循环内一直读取IO的电平 而IO端口数量比较多 大概有30个 如果读到电平后就发
  • Android Studio中如何调整代码字体大小

    Android Studio中如何调整代码字体大小 Android Studio是一款广泛使用的集成开发环境 IDE 用于开发Android应用程序 在进行代码编写时 对于不同的开发者来说 合适的代码字体大小是非常重要的 因此 本文将介绍如
  • Android首次打开APP引导层

    推荐一个好用小巧的Android引导蒙版 浮层 库
  • 【C++11】智能指针的定义 和 种类 及 使用

    智能指针 定义 为什么需要智能指针 在C 中 动态分配内存是一项常见的任务 但手动管理分配和释放内存可能会导致很多问题 如内存泄漏 悬垂指针以及多次释放同一块内存等 为了避免这些问题 引入了智能指针的概念 它们提供了自动化的内存管理 以下是
  • PLC软元件

    1 概要 最近刚好接触到PLC 将最近了解到的记录下来 PLC的主要编程语言就是梯形图以及指令表 还有一种不是很常用的SFC编程 这些编程方式主要体现在逻辑控制上 是将底层C语言编程方式进行图形指令封装 因此PLC又叫可编程逻辑控制器 说到
  • vue前端服务器端口_Vue前后端不同端口实现方案

    前浏围开幸 业来很广例量站标闪择以近览着发端Vue 8080端口 后端Node js 8085端口 主要记录下前后端不同端口遇到的问友持都发很秀框事 应编差里互是过是来本商理类了如则处果 展 字到中图各近圈就不这多发架件大用程题 1享一多很
  • el-table高度适应外围容器,ag-gride高度适应外围容器

    1 1 el table
  • vue 使用自定义字体

    1 1 引入字体到 Vue 项目 1 1 1 创建字体文件夹 在 static 文件夹下创建 font 文件夹 并将下载好的字体拷贝到文件夹下 1 1 2 创建字体样式文件 CSS font face font family numberF
  • ddos云服务器防御,腾讯云服务器被ddos如何防御?

    免安装 绑定ip即可使用 零维护 ddos是什么 举例来说 我开了一家饭店 正常情况可以满足100人就餐需求 你进来就能找到位置并且很快能吃上饭 很不幸 我得罪地痞流氓 对方叫来300人涌进餐厅 叫着马上上菜 但是餐厅的容量只有100人 根
  • RocketMQ的顺序消息(顺序消费)

    简单介绍了消息有序性的概念 以及RocketMQ如何实现消息的顺序消费 文章目录 1 消息的有序性 2 生产者有序发送 3 消费者有序消费 1 消息的有序性 消息有序指的是一类消息消费时 能按照发送的顺序来消费 例如 一个订单产生了三条消息