RocketMQ 用法详解,你学会了吗?

2023-11-14

大家好,我是指北君。

消息中间件是我们工作中使用最频繁的一类中间件,它具有低耦合、可靠投递、广播、流量控制、最终一致性等一系列功能,成为异步RPC的主要手段之一。当今市面上有很多主流的消息中间件,如老牌的ActiveMQ、RabbitMQ,炙手可热的Kafka,阿里巴巴自主开发RocketMQ等。今天,指北君就来详细讲讲RocketMQ生产者和消费者在使用时的一些注意事项。

一. 生产者

1.1 发送消息注意事项

1)消息大小

建议消息大小不要超过512K。

2)异步发送

默认的发送为同步发送,send方法会一直阻塞,等待broker端的响应。如果你关注性能问题,可以通过send(msg, callback)来发起异步调用。

3)生产者组

正常情况下生产者组是没有作用的,但是在发送事务消息时,如果producer中途意外宕机,broker会主动回调producer group 内的任意一台机器来确认事务的状态。(目前开源版本还不支持事务消息)。

4)线程安全问题

生产者实例是线程安全的,在应用中只需要实例化一次即可。

5)性能问题

如果你希望在一个jvm进程内使用多个producer实例来提高发送能,我们建议:

使用异步发送,并且producer实例只需要3 ~ 5个即可 对每一个producer 调用 setInstanceName,区别不同的生产者。

6)发送超时时间

当客户端向broker发送请求超时时,客户端会抛出 RemotingTimeoutException,默认的超时时间是3秒。通过调用send(msg, timeout) 可以设置超时时间。超时时间建议不要设置过小,因为 broker 可能需要时间刷盘或向 slave 同步数据。

7)对于同一个应用最好只使用一个Topic,消息的子类型可以使用 tags 来标识,tags 可以由应用自由设置。当发送的消息设置了 tags 时,消费方在订阅消息时可以使用 tags 在 broker 做消息过滤。注意这里的命名虽然是复数,但是一条消息只能有一个tag。

8)消息在业务层面的唯一标识可以设置到 keys 字段,方便根据 keys 来定位消息。broker 会为每个消息创建索引(哈希索引),应用可以通过topic 、key 查询这条消息的内容(MessageExt),以及消息被谁消费(MessageTrack,精确到consumer group)。由于是哈希索引,请尽量保证key 的唯一,这样可以避免潜在的哈希冲突。

9)消息发送不管是成功还是失败都要打印消息日志,日志内容务必包含 sendResult 和 key 字段。

10)对于消息不可丢失的应用,务必要有消息重发机制。例如如果消息发送失败,可以将消息存储到数据库,然后通过定时程序或者人工的方式触发重发。

11)调用send 同步发送消息时,假定此时设置了 isWaitStoreMsgOK=true(default is true),只要不抛出异常就代表发送成功,但当 isWaitStoreMsgOK = false 时,发送永远返回 SEND_OK。但是对于发送“成功”会有多个状态,在 SendStatus 中定义如下:

FLUSH_DISK_TIMEOUT

如果 broker 设置的 FlushDiskType = SYNC_FLUSH,当 broker 的在刷盘超时时(MessageStoreConfig.syncFlushTimeout,默认5秒)会返回该状态。此时消息任然保存在内存中,只有broker 宕机时消息才会丢失。

FLUSH_SLAVE_TIMEOU

如果 broker 的 role 是 SYNC_MASTER,当 slave 同步数据的时间超过了 MessageStoreConfig.syncFlushTimeout (默认5秒) 时会返回此状态。此时只有主从都宕机,并且主也没有刷盘时,消息才会丢失。

SLAVE_NOT_AVAILABLE

如果 broker 的 role 是 SYNC_MASTER,并且此时 slave 不可用时会返回该状态。

SEND_OK

发送成功。为了保证消息不丢失还需要配置 SYNC_MASTER or SYNC_FLUSH。

12)消息重复

当发送消息时返回 FLUSH_DISK_TIMEOUT/FLUSH_SLAVE_TIMEOUT,若非常不幸的 broker 也宕机了,消息将会丢失。此时如果什么都不做,消息可能会丢失,如果重发消息,消息可能会出现重复。

通常我们建议发送端重发消息,由消费方来保证消息消费的幂等性。

1.2 消息发送失败如何处理

Producer 的 send 方法本生支持内部重试,重试逻辑如下:

至多重试3次 如果发送失败,则轮转到下一个broker 这个方法的总耗时时间不超过 sendMsgTimeout,默认3秒 所以发送消息已经产生超时异常的话就不会再重试。以上策略仍不能保证消息发送一定成功,为保证消息发送一定成功,建议应用这么做:如果调用 send 同步发送失败,则尝试将消息存储到db,由后台线程定时重试,保证消息一定到达 Broker。

1.3 oneway 的发送形式

对于可靠性要求不高的应用,可以采用 oneway 的发送形式,oneway 形式不等待应答。

1.4 发送顺序消息

顺序消息分为分区有序和全局有序。

分区有序要求 producer 在send 时传入 MessageQueueSelector 的实现类,最终将某一类消息发送到同一队列。但是一旦发生通信异常、broker 重启等,由于队列总数发生变化,哈希取模后定位的队列会变化,会产生短暂的顺序不一致。如果业务能容忍在集群异常情况下(如某个 broker 宕机或者重启)消息短暂的乱序,使用分区有序比较合适。

全局严格有序的消息即便在异常情况下也能保证消息的有序性,但是却牺牲了分布式的 failover 特性,即 broker 集群中只有要一台机器不可用,则整个集群都不可用,服务可用性会大大降低。

顺序消息的缺点:

发送顺序消息无法利用集群的 FailOver 特性 消费顺序消息的并行度依赖于队列数量 队列热点问题,个别队列由于哈希不均导致消息过多,消费速度跟不上,产生消费堆积问题 遇到消费失败的消息,无法跳过,当前队列需要暂停 5.发送事务消息 目前暂不支持。

二. 消费者

2.1 消费者组和订阅

不同的消费者组可以独立消费相同的topic,这点类似于ActiveMQ的虚拟 topic. 另外对于相同的消费者组,需要确保组内的消费者订阅消息的规则是一致的!

MQ 里的一个Consumer Group 代表一个 Consumer 实例群组。对于大多数分布式应用来说,一个 Consumer Group 下通常会挂载多个 Consumer 实例。订阅关系一致指的是同一个 Consumer Group 下所有 Consumer 实例的处理逻辑必须完全一致。一旦订阅关系不一致,消息消费的逻辑就会混乱,甚至导致消息丢失。

由于 MQ 的订阅关系主要由 Topic+Tag 共同组成,因此,保持订阅关系一致意味着同一个 Consumer Group 下所有的实例需在以下两方面均保持一致:

订阅的 Topic 必须一致;订阅的 Topic 中的 Tag 必须一致。

技术架构 > Consumer 最佳实践 > image2017-11-15 15:50:13.png

2.2 MessageListener

1)顺序消费 MessageListenerOrderly

顺序消费时消费者会锁定队列,以确保消息被顺序消费,但是这样也会造成一定的性能损耗。当消费出现异常的时候,建议不要抛出异常,而是返回 ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT,让消费暂停一会,暂停时间由 context.setSuspendCurrentQueueTimeMillis 方法指定。

2)并发消费

并发消费是推荐的消费方式,在此种模式下,消息将被并发的消费。消费出现异常时不建议抛出异常,只需要返回 ConsumeConcurrentlyStatus.RECONSUME_LATER 即可。为了保证消息肯定被至少消费一次,消息将会被重发回 broker (topic不是原topic而是这个消费组的RETRY topic),在延迟的某个时间点(默认是10秒,业务可设置,通过 delayLevelWhenNextConsume 和 MessageStoreConfig.messageDelayLevel 设置)后,再次投递到这个 ConsumerGroup,而如果一直这样重复消费都持续失败到一定次数(默认是16次,DefaultMQPushConsumer.maxReconsumeTimes),就会投递到DLQ队列。应用可以监控死信队列来做人工干预。

3)返回状态

在并行消费时可以通过返回 RECONSUME_LATER 来告诉 Consumer 当前无法消费该消息,等延时一段时间再重新消费,但是此时消费不会停止,你可以继续消费其他消息。但在顺序消费时,因为要保证消费的顺序性,所以你不能跳过失败的消息,此时你可以通过返回 SUSPEND_CURRENT_QUEUE_A_MOMENT 来告诉 Consumer 先暂停一会。

4)阻塞

不建议阻塞Listener,因为这会阻塞住线程池,同时也有可能造成消费者线程终止。

2.3 线程数

consumer 内部通过一个 ThreadPoolExecutor 来消费消息,可以通过 setConsumeThreadMin 和 setConsumeThreadMax 来改变线程池的大小。

2.4 ConsumeFromWhere

当新实例启动的时候,PushConsumer会拿到本消费组broker已经记录好的消费进度(consumer offset),按照这个进度发起自己的第一次Pull请求。

如果这个消费进度在Broker并没有存储起来,证明这个是一个全新的消费组,这时候客户端有几个策略可以选择:

CONSUME_FROM_LAST_OFFSET //默认策略,从该队列最尾开始消费,即跳过历史消息。

CONSUME_FROM_FIRST_OFFSET //从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍。

CONSUME_FROM_TIMESTAMP//从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半个小时以前 注意:这些配置只对全新的消费组有效,老的消费组都是按已经存储过的消费进度继续消费。

对于老消费组想跳过历史消息可以采用以下几种方法:

1)判断消息的发送时间,太老的消息直接返回 CONSUME_SUCCESS。

2)判断消息的 offset 和 MAX_OFFSET 的差距,如果落后太多,可以直接。返回 CONSUME_SUCCESS。

3)消费者启动前,先调整该消费组的消费进度,再开始消费。可以人工使用命令 resetOffsetByTimeStamp,详见 ResetOffsetByTimeCommand.java。

2.5 消息幂等

由于 RocketMQ 无法避免消费重复,所以如果业务对消息重复非常敏感,务必在业务层面去重。

2.6 消费速度慢处理方式

1)提高消费并行度

大部分消息消费行为都属于 IO 密集型业务,适当的提高并发度可以显著的改善消费的吞吐量。

2)批量方式消费

默认情况下 consumer 的 consumeMessageBatchMaxSize 为1,即一次只消费一个消息,如果应用可以批量消费消息,则可以很大程度上提高消费吞吐量。

3)跳过非重要消息

当消堆积严重时可以丢弃不重要的消息。

4)优化消息消费过程

2.7 打印消费日志

建议在消费入口方法打印消息,方便后续排查问题,消费失败时也打印失败日志。

2.8 利用broker过滤消息,避免多余的消息传输

三. 小结

好了,RocketMQ生产者与消费者的使用事项就总结完毕了,相信大家对RocketMQ的使用应该会更有信心了。

 

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

RocketMQ 用法详解,你学会了吗? 的相关文章

  • 如何迭代所有注册表项?

    我正在尝试迭代所有注册表项以查找 包含 并删除 jre1 5 0 14 值 有办法做到吗 下面的代码只是在特定键下找到jre1 5 0 14 我确实想迭代所有的键 顺便说一句 if 子句获取是否等于 jre1 5 0 14 但如果它包含 j
  • JavaFX Platform.runLater 的使用以及从不同线程访问 UI

    我有几个问题Platform runLater 我有一个 JavaFX 应用程序类 在这个类中 我运行一个线程 该线程从网络套接字读取数据 现在当我创建一个新的Stage在线程内部 系统抛出异常 JavaFX 事件调度程序线程和我的网络读取
  • 使用 jpql 和 jpa 从日期字段中提取年份

    我想从数据库中的一行中提取年份部分 以便将其与值进行比较 这是我的功能 public List
  • 如何测试调用父类的受保护(不需要的)方法的方法?

    我陷入了一个非常奇怪的情况 我有一些需要测试的特定代码 这里是 public class A The real method of real class is so big that I just don t want to test it
  • Spring-boot中将redis-cache反序列化为对象的问题

    我在 Client 类中使用 JsonNode 来处理 MySQL 8 数据库中 JSON 类型的字段 即使对于 API 请求 它也能很好地工作 但是当我使用 Redis 启用缓存 我确实需要它 时 我注意到 Redis 无法序列化 Jso
  • 使用除 SINGLE_TABLE 之外的任何其他 Hibernate 继承策略时 JVM 崩溃

    好吧 这可能不太可能 但还是这样吧 在Java JRE 1 6 0 26 b03 中我有两个类 SuperControl及其子类SubControl 它们都需要是持久对象 我正在使用 Hibernate Annotations 来实现这一点
  • 在 Hibernate 中创建 UPDATE RETURNING 查询

    在 Oracle 中 我们可以创建一个更新查询 该查询将使用 RETURNING 子句返回更新的记录 Hibernate中有类似的功能吗 除了数据库生成的值之外 Hibernate 显然不需要返回更新的实例 因为对象传递给Session s
  • 用 java 编写解释器时的 switch 或 if 语句

    当前的作业需要我编写一个程序 以一种非常微小且基本的编程语言 行为有点像 FORTRAN 来读取包含指令的文件并执行这些指令 基本上它是我猜的语言的简单解释器 它是完全线性的 所有语句都是按顺序定义的 并且只有字符串和整数变量 我需要查找和
  • 从关卡堆栈中获取相对比例的数学

    为这个可怕的标题道歉 我花了 10 分钟试图用一句话来解释这一点 但失败了 虽然提示这个问题的应用程序是用Java Android 编写的 但我认为它非常通用并且适用于任何语言 欢迎使用伪代码 或简单的英语 回复 我不确定是否应该标记所有通
  • 不要模拟值对象:过于通用的规则,没有解释

    以下是 Mockito 单元测试框架的引用 不要模拟值对象 为什么有人会想要这样做呢 因为实例化对象太痛苦了 gt 无效 原因 如果创造新的装置太困难 那就是一个迹象 代码可能需要一些认真的重构 另一种方法是创建 价值对象的构建者 有一些工
  • Java:使用 Java.util.concurrent 线程访问读取线程串行端口

    我正在尝试编写一个 Java 串行设备驱动程序并想使用 对我来说是新的 java util concurrent包裹 我有一种发送数据包然后等待 ACK 的方法 我打算有炭 接收在不同的线程中运行 如果接收线程收到 ACK 它应该使用发送数
  • 在 eclipse 之外将 Spring MVC 应用程序部署到 tomcat 的幕后会发生什么?

    我猜想使用像 eclipse 这样很棒的 IDE 的一个缺点是你会忽略应用程序幕后发生的事情 我是一名 Ruby 开发人员 所以不是一名 Java 老手 所以我一直在用 java 编写一个项目 并使用 spring 框架进行 IOC 和 M
  • 抽象类或接口。哪种方式是正确的?

    有两种方法可以选择抽象类或接口 微软解决方案和Oracle解决方案 微软 设计指南 请使用抽象 在 Visual Basic 中为 MustInherit 类而不是接口来将协定与实现分离 http msdn microsoft com en
  • 为什么现在()? (客观化)

    为什么我想要异步加载 Objectify 实体 异步加载到底意味着什么 根据客观化有关加载的文档 https code google com p objectify appengine wiki BasicOperations Loadin
  • 如何使用云打印打印Android活动显示

    我正在尝试将 Google 云打印实现到应用程序中 遵循集成指南 https developers google com cloud print docs android 我试图通过打印 google com 来保持基本 单击我创建的打印按
  • 战争库中的罐子爆炸

    我们可以将分解的 jar 文件放入 war web inf 库中吗 它在 JBOSS 4 2 中对我不起作用 我收到以下错误并且无法部署应用程序 Caused by javax management RuntimeOperationsExc
  • Google Cloud Messaging - 立即收到或长时间延迟收到的消息

    我在大学最后一年的项目中使用谷歌云消息传递 一切正常 但我在使用 GCM 时遇到了一些麻烦 通常 消息要么几乎立即传递 要么有很大的延迟 我读过这篇文章 但我真的认为它不适用于这种情况 GCM 通常会在消息发送后立即传送消息 然而 这并不总
  • 如果抛出RuntimeException,是否可以将其作为异常捕获?

    如果我有一个try抛出一个块RuntimException子类 可以是后续的catch块将其捕获为Exception 具体来说 public class MyAppException extends RuntimeException In
  • java.io.EOFException:没有更多可用数据 - 预期结束标记 关闭开始标记

    我正在使用 xmpp 开发一个聊天应用程序 根据我们的要求 我们有三台服务器 Apache Tomcat 7 ejabbered 2 1 11 和 mysql 5 5 to run xmppbot on tomcat used below
  • 我找不到 IntelliJ 快捷方式

    我使用 vim 一段时间 我知道有一个 intellij vim 插件 我很好奇内置的 IntelliJ 文本导航存在什么 如何打开实时模板来创建模板 如何查看以 tr 开头的现有模板列表 如何进行全局搜索并在当前文档中进行搜索 然后转到下

随机推荐

  • 线性表的查找算法-C语言

    文章目录 一 实验目的 二 实验内容 三 实验工具 四 实验代码 五 实验结果 六 总结与思考 一 实验目的 了解查找的基本概念 理解顺序查找 折半查找和分块查找的思想 掌握折半查找过程的判定树构造方法 实现线性表的查找算法 二 实验内容
  • mybatis中的if-else语句的使用解答

    1 mybatis中if else语句的语法 使用模板样例
  • 什么是沙箱技术?与容器有什么区别

    沙箱和容器的 隔离 机制 首先 什么是沙箱 它本身就是一种线下生活现象的虚拟化 现实世界里 小孩子们在沙地 沙滩上用木板隔离出一个方盒子 在盒子里堆砌 创造各种东西 城堡 房屋 山丘 这就是一个沙箱 它有两个根本特点 一 它有边界 通过木板
  • aix oracle 11 补丁包,oracle 11g for aix6.1安装基本步骤(含升级11.1.0.7)

    oracle 11g for aix6 1安装基本步骤 含升级11 1 0 7 1 检查物理内存 swap空间以及tmp空间 usr sbin lsattr E l sys0 a realmem 检查内存至少1G usr sbin lsps
  • anaconda创建python环境

    1 前提 系统中安装了anaconda沙箱环境 下载地址 anaconda官网 conda V 检验是否安装以及当前conda的版本 2 conda常用的命令 1 conda list 查看安装了哪些包 2 conda env list 或
  • bcd码和十进制码之间的转换

    BCD码转十进制 static u8 BCDToInt u8 value unsigned char temp 0 temp value gt gt 4 10 temp value 0x0F return temp 十进制转BCD码 sta
  • java awt linux_解决在linux下awt调用错误的问题

    在java中使用awt在服务器上处理图片的时候发现有错 第一遍执行 500 Servlet Exception java lang InternalError Can t connect to X11 window server using
  • MyBatis自动生成实体类

    MyBatis MySQL生成实体类 需要的工具jar包 mybatis generator core 1 3 2 jar mysql connector java 5 0 4 jar 第一步 编写一个MybatisGeneratorUti
  • android 之 如何让app没有图标

    我们有时候需要让我们的app没有图标 不要问我没有图标要干啥 我只是说的一个需求 单讲技术不说别的 首先我们要获得 PackageManager 的对象 PackageManager packageManager getPackageMan
  • Caffeine缓存的使用

    1 springboot集成Caffeine
  • KeePass搭建一个私人密码库

    文章作者 GoodBoyboy 文章链接 https blog goodboyboy top posts 2546190081 html 版权声明 本博客所有文章除特别声明外 均采用 CC BY NC SA 4 0 许可协议 转载请注明来自
  • 线程(进程)的同步与互斥实例

    1 有一个队列 线程1负责从网络接收分组 并将收到的分组放到队列尾 然后再次从网络中接收下一个到达的分组 并进行同样的队列操作 线程2从此队列头中取出一个分组进行处理 处理完毕后 再次从队列中取出一个分组进行处理 处理完毕后再次从队列头取出
  • [转]Python实现多功能音乐播放器

    前言 就是用Python做一个简易的音乐播放器 废话不多说 咱们直接开干 当然 今天做这个肯定不是最简单的 最简单的音乐播放器 9行代码足以 import time import pygame file r 歌曲路径 pygame mixe
  • torch.hub.load()解析,如何加载本地权重

    用yolov5训练了一个权重 项目只能部署在本地 官方文档 torch hub load repo or dir model args source github force reload False verbose True skip v
  • Python 第一阶段

    第一章 安装 1 1 开发环境 官网 https www python org稳定版 Stable Releases检验 cmd 命令 python version 1 2 开发工具 PyCharm官网 https www jetbrain
  • (Struts2学习篇) Struts2配置文件之 struts-default.xml

    对struts default xml的一些注释
  • zmq+protobuf 的坑点难点

    zmq protobuf 的坑点难点 之前项目要用到zmq protobuf的方式传递数据 软件采用前后端分离的方式开发 其中前端是异地同事用python开发的 后端是我们这边用C 开发的 1 中间有遇到问题是前后端传送zmq信息时 发现字
  • DRM驱动(七)之atomic_commit

    上节已经把应用的参数check了一遍 这次就可以把对应的参数配置到硬件里进行刷图操作了 int drm atomic commit struct drm atomic state state struct drm mode config c
  • Qt 控制台运行无法弹出小黑框

    Qt Console Application Qt 主要是GUI界面的设计 但在学习的时候控制台运行显得更加方便一些 小编在第一次新建控制台运行的时候 点击运行没有弹出小黑框 解决方法 主要是因为没有执行qmake 就需要在Qt的pro文件
  • RocketMQ 用法详解,你学会了吗?

    大家好 我是指北君 消息中间件是我们工作中使用最频繁的一类中间件 它具有低耦合 可靠投递 广播 流量控制 最终一致性等一系列功能 成为异步RPC的主要手段之一 当今市面上有很多主流的消息中间件 如老牌的ActiveMQ RabbitMQ 炙