消息队列(MQ)

2023-11-11

一、为什么要用消息队列(消息队列的应用场景)

  • 应用解耦
  • 异步任务
  • 流量削峰

问题背景:学生向老师请教问题。如果学生A正在向老师请教问题,那么后面的学生依次排队等候,直到轮到自己请教问题。这样的模式会使整个系统的效率较低,学生排队等待时间太久,而且排队学生过多,老师也会忙不过来。引入消息队列后,即右图的班长,班长负责将所有学生的问题汇总,即学生按照固定的格式将请教的问题写在纸上,并按照先后顺序交给班长,即可做自己的事情去了。老师可以从班长那里依次获取问题,并逐一进行解答。

解决的问题:

(1)学生不需要直接面对老师,学生和老师通过班长进行通信,即解决了耦合调用的问题。

(2)老师无需当场回答问题,可以按照自己的处理进度依次回答问题,即解决了异步通信的问题。

(3)当提问的学生数量过多,通过班长(MQ)可以抵御洪峰流量,达到保护主业务的目的。

总结:

(1)解耦

系统的耦合性越高,容错性就越低。以电商应用为例,用户创建订单后,如果耦合调用支付系统,库存系统,物流系统,任何一个子系统出了故障或者因为升级等原因暂时不可用,都会造成下单操作异常,影响用户使用体验。

在使用消息队列(MQ)解耦后,系统的耦合性就会提高了。比如物流系统发生故障,需要几分钟才能修复,在这段时间内,物流系统要处理的数据被缓存到消息队列中,用户的下单操作正常完成。当物流系统恢复后,补充处理存在消息队列中的订单消息即可,终端系统感知不到物流系统发生过几分钟故障。

(2)异步

A系统接收一个请求,需要在自己本地写入数据库,还需在B,C,D三个系统中写入数据库,如采用直接调用方式,最终的请求总延时非常高,用户体验感很差。如果使用了消息队列(MQ),系统A接收到请求后,只需连续发送3条消息到消息队列(MQ)中,即可返回响应给用户,而无须等待B,C,D三个系统的响应结果。

一个系统跟另一个系统之间进行通信的时候,假如系统A希望发送一个消息给系统B,让他去处理,但是系统A不关注B到底怎么处理或者有没有处理好,所以系统A将消息发送给消息队列(MQ),然后就不管这条消息的“死活”了,接着B从消息队列里取出消息进行处理即可。至于怎么处理,是否处理完毕,什么时候处理,都是系统B的事儿,与系统A无关。

(3)削峰

应用系统如果遇到系统请求流量的瞬间猛增,有可能将系统压垮。有了消息队列可以将大量请求缓存起来,分散到很长一段时间处理,这样可以大大提高系统的稳定性和用户体验。

流量削峰的经济考量:

业务系统正常时段的QPS如果是1000,流量最高峰时10000,为了应对流量高峰配置高性能的服务器显然不划算,这样可以使用消息队列对峰值流量削峰

 

二、各种消息队列产品的比较

小结:

  • ActiveMQ,早期使用较多,没有经过大规模吞吐量场景的验证,社区也不是很活跃,但是现在确实大家用的不多了,不推荐。
  • RabbitMQ,开发语言erlang,阻止了大量的Java工程师去深入研究和掌握它,对公司而言,几乎处于不可控的状态,但是RabbitMQ是开源的,比较稳定的支持,活跃度也高,如不考虑二次开发,追求性能和稳定性,推荐使用。
  • RocketMQ,开发语言是Java,在阿里内部经受过高并发业务的考验,稳定性和性能均不错,考虑后期可能二次开发,推荐使用
  • Kafka,大数据领域的实时计算,日志采集等场景,用Kafka是业内标准的,社区活跃度很高,推荐使用。大数据领域日志采集等业务推荐使用。

 

三、消息队列的优点和缺点

优点:

  • 解耦
  • 异步
  • 削峰

缺点:

  • 系统可用性降低
  • 系统复杂度提高
  • 一致性问题

(1)系统可用性降低

系统引入的外部依赖越多,系统稳定性越差。一旦消息队列(MQ)宕机,就会对业务造成影响。

如何保证MQ的高可用?

(2)系统复杂度提高

MQ的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过MQ进行异步调用。

消息丢失怎么办?

重复消息怎么处理?

如何保证消息传递的顺序性?

(3)一致性问题

A系统处理完业务,通过MQ给B,C,D三个系统发消息数据,如果B系统,C系统处理成功,D系统处理失败。

如何保证消息数据处理的一致性?

 

四、如何保证消息队列的高可用

项目中引入MQ导致系统的可用性降低,面试官想知道的是面试者针对可用性降低的思考和解决思路。

(1)RabbitMQ高可用---镜像集群

  • 在多台机器上分别启动RabbitMQ实例
  • 多个实例之间可以相互通信
  • 每次生产者写消息到queue的时候,都会自动把消息同步到多个实例的queue上。每个RabbitMQ节点上都有queue的消息数据和元数据。
  • 某一个节点宕机,其他节点依然保存完整数据,不影响客户端消费。

(2)RocketMQ高可用---双主双从

  • 生产者通过Name Server发现Broker(生产者发送消息时首先会询问name server,给name server给其一个borker地址,随后生产者发送消息到broker)
  • 生产者发送队列消息到2个Broker
  • Broker主节点分别和各自从节点同步数据(当borker master1宕机,其从节点borker slave1也有备份数据,不影响正常的服务;与此同时,borker master1宕机,由于其从节点不能接收生产者的消息,但是另外一个主节点borker master2可以接收消息,不影响正常的服务
  • 消费者从主或从节点订阅消息

 

五、如何保证消息不丢失

消息丢失的原因:

情况一:消息生产者没有成功发送到MQ。

情况二:消息发送给MQ,但是MQ宕机导致内存中的消息数据丢失。

情况三:消费者获取到消息,但消费者还没有来得及处理宕机了,但此时MQ中的消息已经删除,消费者重启后不能再消费之前的消息。

确保消息不丢失的方案:

  • 消息发送者发送给MQ后,MQ给生产者确认收到
  • MQ收到消息后进行消息持久化
  • 消费者收到消息处理完毕后手动进行ack确认
  • MQ收到消费者ack确认后删除持久化的消息

总结:

消息丢失的原因:

  • 发送方,MQ,消费方都有可能导致消息丢失。

如何保证消息不丢失:

  • 发送方可靠发送
  • MQ进行消息持久化
  • 消费方消费完毕后进行ACK确认,MQ收到消费方的ACK确认再删除本地消息

 

六、如何保证消息不被重复消费(幂等性)

消息重复的根本原因是网络不可达

发送时消息重复

当一条消息已被成功发送到服务端,此时出现网络闪断,导致服务端对客户端应答失败。如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同的消息。

消费时消息重复

消息消费的场景下,消息已投递到消费者并完成业务处理,当消费方给MQ服务端反馈应答的时候网络闪断。为了保证消息至少被消费一次,MQ服务端在网络恢复后再次尝试投递之前已被消费方处理过的消息,此时消费者就会收到两条内容相同的消息。

解决方案:

  • 消息发送者发送消息时携带一个全局唯一的消息id
  • 消费者获取消费后先根据id在Redis/DB中查询是否存在消费记录
  • 如果没有消费过就正常消费,消费完毕后写入Redis/DB
  • 如果消息消费过就直接舍弃

 

七、如何保证消息消费的顺序性

消息的顺序性消费:

消息有序指的是按照消息的发送顺序来消费,例如一笔订单产生了3条消息,分别是订单创建,订单付款,订单完成。消费时,要按照顺序依次消费才有意义。与此同时,多笔订单之间有时可以并行消费的。

局部顺序消费:

  • 生产者根据消息ID将同一组消息发送到同一个Queue中。
  • 多个消费者同时获取Queue中的消息进行消费。
  • MQ使用分段锁保证单个Queue中的有序消费。(分段锁保证队列中的消息按顺序执行,当一个消息出队列被消费时,此时该队列被分段锁锁起来,直到该消息被消费完成,分段锁打开,继续消费第二个消息)

 

八、基于MQ的分布式事务实现

  • 用户提交订单
  • 库存服务操作库存DB,减库存
  • 订单服务操作订单DB,生成订单数据
  • 库存服务和订单服务要么同时成功,要么同时失败

本质上讲,分布式事务就是为了保证不同数据库的数据一致性。

消息的发送方在处理完自己的业务后,将消息首先保存到本地,并将消息状态置为1(代表消息未处理),接下来将消息发送到消息队列(MQ)中,消息的消费方监听到消息后,为了保证消息的幂等性(消息不被重复消费),首先判断是否为重复消息,若消息未被执行,则执行相应的业务逻辑,并将消息记录到本地数据库中,最后向MQ发送消息,以更改发送方消息的状态(将消息的状态置为2)

特殊情况:若消息在发送方执行成功,但是在消息的消费方执行失败,此时两边业务系统的数据不一致。消息发送方的定时任务定时扫描消息发送方的本地消息表中teye=1(未处理的消息)的消息,并将该消息继续发送到消息队列(MQ)中,以供消费方重新消费。若消息未被成功消费,则定时任务一直发送消息到MQ中,这样可以保证所有消息一定会被消费。

总结:

(1)消息发送方:

  • 处理业务逻辑
  • 保存消息到本地数据库
  • 发送消息给MQ
  • 监听MQ消息方通知消息,更改消息状态为已处理
  • 定时任务将长期未处理消息重新发送到MQ

(2)消息消费方:

  • 监听MQ中间件消息
  • 判断消息是否重复,重复就丢弃
  • 消息未重复,执行本地业务
  • 业务处理完毕,写消息记录到本地数据库
  • 发送通知消息到MQ
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

消息队列(MQ) 的相关文章

  • INSERT..RETURNING 在 JOOQ 中不起作用

    我有一个 MariaDB 数据库 我正在尝试在表中插入一行users 它有一个生成的id我想在插入后得到它 我见过this http www jooq org doc 3 8 manual sql building sql statemen
  • 多个 Maven 配置文件激活多个 Spring 配置文件

    我想在 Maven 中构建一个环境 在其中我想根据哪些 Maven 配置文件处于活动状态来累积激活多个 spring 配置文件 目前我的 pom xml 的相关部分如下所示
  • 加速代码 - 3D 数组

    我正在尝试提高我编写的一些代码的速度 我想知道从 3d 整数数组访问数据的效率如何 我有一个数组 int cube new int 10 10 10 我用价值观填充其中 然后我访问这些值数千次 我想知道 由于理论上所有 3d 数组都存储在内
  • 控制Android的前置LED灯

    我试图在用户按下某个按钮时在前面的 LED 上实现 1 秒红色闪烁 但我很难找到有关如何访问和使用前置 LED 的文档 教程甚至代码示例 我的意思是位于 自拍 相机和触摸屏附近的 LED 我已经看到了使用手电筒和相机类 已弃用 的示例 但我
  • JavaMail 只获取新邮件

    我想知道是否有一种方法可以在javamail中只获取新消息 例如 在初始加载时 获取收件箱中的所有消息并存储它们 然后 每当应用程序再次加载时 仅获取新消息 而不是再次重新加载它们 javamail 可以做到这一点吗 它是如何工作的 一些背
  • Spring Data JPA 应用排序、分页以及 where 子句

    我目前正在使用 Spring JPA 并利用此处所述的排序和分页 如何通过Spring data JPA通过排序和可分页查询数据 https stackoverflow com questions 10527124 how to query
  • 斯坦福 NLP - 处理文件列表时 OpenIE 内存不足

    我正在尝试使用斯坦福 CoreNLP 中的 OpenIE 工具从多个文件中提取信息 当多个文件 而不是一个 传递到输入时 它会给出内存不足错误 All files have been queued awaiting termination
  • 如何在PreferenceActivity中添加工具栏

    我已经使用首选项创建了应用程序设置 但我注意到 我的 PreferenceActivity 中没有工具栏 如何将工具栏添加到我的 PreferenceActivity 中 My code 我的 pref xml
  • 禁止的软件包名称:java

    我尝试从数据库名称为 jaane 用户名 Hello 和密码 hello 获取数据 错误 java lang SecurityException Prohibited package name java at java lang Class
  • 如何为俚语和表情符号构建正则表达式 (regex)

    我需要构建一个正则表达式来匹配俚语 即 lol lmao imo 等 和表情符号 即 P 等 我按照以下示例进行操作http www coderanch com t 497238 java java Regular Expression D
  • Java TestNG 与跨多个测试的数据驱动测试

    我正在电子商务平台中测试一系列商店 每个商店都有一系列属性 我正在考虑对其进行自动化测试 是否有可能有一个数据提供者在整个测试套件中提供数据 而不仅仅是 TestNG 中的测试 我尝试不使用 testNG xml 文件作为机制 因为这些属性
  • 在两个活动之间传输数据[重复]

    这个问题在这里已经有答案了 我正在尝试在两个不同的活动之间发送和接收数据 我在这个网站上看到了一些其他问题 但没有任何问题涉及保留头等舱的状态 例如 如果我想从 A 类发送一个整数 X 到 B 类 然后对整数 X 进行一些操作 然后将其发送
  • 在 Mac 上正确运行基于 SWT 的跨平台 jar

    我一直致力于一个基于 SWT 的项目 该项目旨在部署为 Java Web Start 从而可以在多个平台上使用 到目前为止 我已经成功解决了由于 SWT 依赖的系统特定库而出现的导出问题 请参阅相关thread https stackove
  • 仅将 char[] 的一部分复制到 String 中

    我有一个数组 char ch 我的问题如下 如何将 ch 2 到 ch 7 的值合并到字符串中 我想在不循环 char 数组的情况下实现这一点 有什么建议么 感谢您花时间回答我的问题 Use new String value offset
  • 如何从指定日期获取上周五的日期? [复制]

    这个问题在这里已经有答案了 如何找出上一个 上一个 星期五 或指定日期的任何其他日期的日期 public getDateOnDay Date date String dayName 我不会给出答案 先自己尝试一下 但是 也许这些提示可以帮助
  • 在 Maven 依赖项中指定 jar 和 test-jar 类型

    我有一个名为 commons 的项目 其中包含运行时和测试的常见内容 在主项目中 我添加了公共资源的依赖项
  • Firebase 添加新节点

    如何将这些节点放入用户节点中 并创建另一个节点来存储帖子 我的数据库参考 databaseReference child user getUid setValue userInformations 您需要使用以下代码 databaseRef
  • 使用 JMF 创建 RTP 流时出现问题

    我正处于一个项目的早期阶段 需要使用 RTP 广播DataStream创建自MediaLocation 我正在遵循一些示例代码 该代码目前在rptManager initalize localAddress 出现错误 无法打开本地数据端口
  • 当我从 Netbeans 创建 Derby 数据库时,它存储在哪里?

    当我从 netbeans 创建 Derby 数据库时 它存储在哪里 如何将它与项目的其余部分合并到一个文件夹中 右键单击Databases gt JavaDB in the Service查看并选择Properties This will
  • 节拍匹配算法

    我最近开始尝试创建一个移动应用程序 iOS Android 它将自动击败比赛 http en wikipedia org wiki Beatmatching http en wikipedia org wiki Beatmatching 两

随机推荐

  • 快速编写json数据

    1 打开idea 2 新建txt文件 alt 单击快速加 编写json数据
  • C语言面试必问的经典问题(纯”gan“货)

    C语言面试必问的经典问题 1 预处理 1 预编译 编译过程最先做的工作是啥 何时需要预编译 指令有什么 答 预编译就是预处理 就是把一些文本的替换工作工作 预编译指令 include ifdef ifndef else endif 编译 字
  • 高德地图Js API的使用

    申请JSAPI的开发者key 申请地址 http lbs amap com dev key 引入高德地图JavaScript API文件 创建地图容器 在页面body里你想展示地图的地方创建一个div 容器 并指定id标识 div div
  • Python-Pyqt6之QIntValidator,QDoubleValidator无法限制数值范围的正则表达式解决方案

    在使用Pyqt6进行GUI设计的时候 在需要输入数值 整型 浮点型 的时候选择使用了QLineEdit这个组件控件 详情介绍 QLineEdit组件详情 QLineEdit自带的setValidator包含 QIntValidator QD
  • promise函数几种写法与坑

    promise是ES6中引入的处理异步函数的强大特性 但是对promise的不恰当使用可能会达不到最终目的 对这个问题的探究来源于这篇文章关于promises 你理解了多少 几个异步函数如下 resolve或reject在回调函数里被调用
  • 网络编程的几种I/O模式

    1 非阻塞I O 非阻塞I O 若想网络编程时调用I O函数不想让程序阻塞 需要使用I O复用技术 一个方法是poll 轮询 所谓轮询就是执行函数时 如果内核不能立即对应用的函数进行响应时 就返回给应用一个错误 而应用不停的循环调用该函数
  • JavaScript表示不背小数计算存在误差的锅

    浮点数的最高精度是17位小数 但是在实际计算时会产生莫名其妙的问题 如0 1 0 2的结果不是0 3 而是0 30000000000000004 这个舍入误差会导致无法测试特定的浮点数值 例如 var a 0 1 b 0 2 if a b
  • 【数据结构】采用邻接矩阵表示法创建无向网、无向图、有向图、有向网

    提示 文章写完后 目录可以自动生成 如何生成可参考右边的帮助文档 目录 一 无向网 权值 对称 1 思路 2 代码 3 运行结果 三 其他 1 无向图 0 1 对称 2 有向网 权值 不对称 3 有向图 0 1 不对称 一 无向网 1 思路
  • 使用python的pandas包查询数据库数据导出到excel

    文章目录 前言 1 实现分析 2 实现过程 2 1安装环境 2 3功能逻辑 2 4完整代码 3 总结 前言 前几天接到一个业务的需求 让我把当前数据库里面的结果数据导出到excel中 然后供业务查看 问题是当前结果数据都是列式表 所以需要把
  • 强化学习 reward 曲线的绘制

    每隔一段取一个均值 然后把均值曲线绘制出来 包含全部点的曲线淡化处理 摘自 Z Mou Y Zhang F Gao H Wang T Zhang and Z Han Deep Reinforcement Learning based Thr
  • paddlepaddle安装问题protobuf package to 3.20.x or lower.

    按照官方文档安装paddlepaddle 2 3 0后 进行环境验证时 总是提示如下错误 TypeError Descriptors cannot not be created directly If this call came from
  • 【编程测试题】数字游戏

    题目描述 小易邀请你玩一个数字游戏 小易给你一系列的整数 你们俩使用这些整数玩游戏 每次小易会任意说一个数字出来 然后你需要从这一系列数字中选取一部分出来让它们的和等于小易所说的数字 例如 如果 2 1 2 7 是你有的一系列数 小易说的数
  • 【删除列表中最后一条数据遇到的问题】

    项目场景 列表有多页数据 如果最后一页只有一条数据 当删除这条数据时 页码能够改变并且数据正确展示 问题描述 当删除这条数据时 页码可以正确展示 但是数据没有正确显示 handleDelete row const id row userId
  • 服务器 虚拟san,关于服务器 SAN 和 SDS

    注明 本文内容基于VMware VSAN beta版本撰写 请访问http www vmware com products virtual san 获得有关正式版本的更新信息 我很高兴在Wikibon上读到了我的同事Stu Miniman
  • 【微信小程序】项目初始化

    var CSS 函数可以插入一个自定义属性 有时也被称为 CSS 变量 的值 用来代替非自定义 属性中值的任何部分 1 初始化样式与颜色 view text box sizing border box page themColor ad90
  • linux---配置bond方法

    配置bond方法 原始配置文件1 DEVICE eth0 BOOTPROTO dhcp HWADDR 00 0C 29 04 AE 65 IPV6INIT no NM CONTROLLED no ONBOOT yes TYPE Ethern
  • 不用插拔网线鼠标点击自动切换网线和WIFI

    因为之前在zf单位工作 政务内网需要插网线 而访问外网又需要连wifi 切换就需要拔掉网线插上网线很麻烦 旁边老哥教我了一手 bat程序自动切换方法 bat文件代码如下 以下代码的 bat文件执行后会切换到以太网 同时关闭掉wifi和以太网
  • 监听对象中属性变化(一个或多个属性、全部属性)

    一 数据监听器 什么是数据监听器 数据监听器用于监听和响应任何属性和数据自动的变化 从而执行特定的操作 它的作用类似于vue中的watch侦听器 在小程序中 基本语法格式如下 Component observers 字段A 字段B func
  • 积分规划:构建全面会员积分管理系统

    在当今竞争激烈的市场环境中 企业要想保持用户的忠诚度和活跃度 建立一个全面的会员积分管理系统是至关重要的 积分制度不仅可以激励用户参与 还可以增加用户的消费频次和购买金额 本文将深入探讨如何构建全面的会员积分管理系统 以实现更好的私域营销效
  • 消息队列(MQ)

    一 为什么要用消息队列 消息队列的应用场景 应用解耦 异步任务 流量削峰 问题背景 学生向老师请教问题 如果学生A正在向老师请教问题 那么后面的学生依次排队等候 直到轮到自己请教问题 这样的模式会使整个系统的效率较低 学生排队等待时间太久