MQ如何保证消息不丢失

2023-11-16

如何保证消息不丢失

哪些环节会造成消息丢失

其实主要就是跨网络的环境中需要考虑消息的丢失,主要是有以下几个方面

  • 生产者往MQ发送消息
  • MQ的Broker是集群有主从的,主节点把消息同步到从节点时也需要考虑消息丢失问题
  • 消息从内存持久化到硬盘时,MQ的消息是工作在内存中的,但是内存是断电就丢失数据,所以需要持久化到磁盘,这一步也需要考虑消息丢失问题
  • 消费者消费MQ的消息

如下图所示的四个步骤都有可能造成消息丢失

在这里插入图片描述

如何去防止消息丢失

其实也就是针对上面四个环节来分析,保证每个环节的消息不丢失

  • 生产者发送消息不丢失

    • kafka:消息发送+回调。生产者向MQ发送一个消息之后,MQ会向生产者发送一个请求执行相应的回调函数,如果一直没有执行回调函数Produce就知道消息发送失败了就可以重新发送消息

    • RocketMQ:它是Kafka之后出来的,也支持消息发送+回调的机制。同时它还支持事务消息来保证生产者发送消息不丢失

      RocketMQ的事务消息是保证生产者 本地事务和发送消息两步是原子性的:

      1. Producer向MQ发送一个half消息(对于Consumer是不可见的),首先确定MQ是正常运行的

      2. 执行本地事务

      3. 向MQ发送真正的消息,并携带本地事务的执行结果

      4. 如果本地事务执行结果是成功,那么消费者就可以消费此消息;如果本地事务执行结果是失败,那么MQ就会丢弃此消息;如果本地事务执行结果是未知,那么就会经过以下的步骤

      5. 过一段时间MQ向Producer发送一个请求回查本地事务的执行结果

      6. Producer会执行相应的操作去检查当前 本地事务的执行结果,并将结果发送给MQ

      7. MQ再判断本地事务的执行结果,如果还是未知就继续重复执行5~7步,默认会重复执行15次。

        在这里插入图片描述

    • RabbitMQ:

      也支持消息发送+回调的机制。

      它还支持手动事务机制。

      RabbitMQ的提供了这些API方法,让我们程序自己去实现事务的逻辑

      channel.txSelect()开启事务 channel.txCommit()提交事务 channel.txRollback()回滚事务

      我们首先开启事务,然后执行本地事务,再执行后面两个Api方法。这种手动事务机制有一个问题就是对channel是会产生阻塞的,会造成吞吐量下降

      在RabbitMQ3.*版本开始,它还支持Publisher Confirm机制,相当于是生产者确认机制,整个处理流程和RocketMQ事务消息的处理流程基本是一样的

  • MQ主从同步消息不丢失

    • RocketMQ

      在RocketMQ中对于普通集群,主从数据复制有两种方式:同步复制和异步复制。同步复制就能保证消息不丢失,异步复制效率高但是可能丢消息

      第二种方式就是Dledger集群,它在主从数据复制时采用两阶段提交来保证消息不丢失

      ​ 普通集群就是我们指定哪个节点是Master,哪个节点是Slave;而Dledger集群会频繁的隔一段时间从至少三个节点中选举一个节点成为Master,其余的为Slave,当Producer发送消息到Master后,Master会直接返回给Producer,然后当前消息会标记为UnCommited,再给Slave进行消息同步,当大部分Slave都同步成功后才会把消息的状态改为Commited。这就是这里的两阶段。

    • RabbitMQ

      普通集群:消息是分散存储在各个节点,节点之间不会主动进行消息同步,只有在消费时才会进行消息同步。就比如Producer生产一个消费发送到了A节点,这个时间集群中各个节点是不同步消息的,Consumer却在B节点上消费这条消息,这个时候才会把A节点的消息同步到B节点来 再进行消费。这种方式是可以丢失消息的

      镜像集群:当Producer生产一个消息后,各个节点之间会主动的进行消息的同步,这样数据安全性会更高

    • Kafka

      它通常是在允许少量消息丢失的场景,它可以通过配置acks,配置为0 1 all 。这就相当于RocketMQ的同步复制/异步复制

  • MQ消息持久化存盘时消息不丢失

    • RocketMQ:提供了一种配置的方式,可以选择同步刷盘,也可以选择异步刷盘
    • RabbitMQ:将队列配置成持久化队列,这样就可以保证消息不丢失。在RabbitMQ3.*版本中还有一个Quorum类型的队列,会采用Raft协议来进行消息同步
  • 消费者消费消息时消息不丢失

    一般情况下,MQ的队列中会有一个offset偏移量指向当前消费消息的位置,Consumer消费消息之后会往MQ返回一个消息,然后MQ就会把offset偏移量往前移动。如果consumer消费失败了,那么MQ的offset也不会移动,下次Consume再重新消费就行了。

    会造成消费时消息丢失才场景是:Consumer消费消息变为异步的方式,刚开始收到需要消费的消息就往MQ发送一个消息,然后再去执行本地事务,这个时候如果本地事务执行失败,可是发送给MQ的确认消息却已经发送成功了,这就造成了消费端消息丢失。

    解决消费者消息消息时不丢失的方式:

    • RocketMQ:使用默认的消费方式就行,不要采用异步方式
    • RabbitMQ:它在消费消息时有一个autocommit自动提交的机制,我们将autocommit关闭,不要让它自动提交,改为手动提交offset
    • Kafka:也是一样的改为手动提交offset
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

MQ如何保证消息不丢失 的相关文章

  • 使用AWS SQS作为Aurora数据库的写入队列来提高系统性能是否有效

    我正在 AWS 上开发一个 Web 应用程序服务器 需要支持高吞吐量的读写 我的老板给了我这样的高级设计 我被困在 写入队列 上 团队告诉我 我们需要它来提高写入性能 因为我们只能有 1 个可以写入的主副本 我对 SQS 和 RabbitM
  • 使用Camel的spring-rabbitmq组件时如何自动声明交换?

    我正在尝试从 Camel 3 x 迁移到 Camel 4 x 版本 因此我需要从rabbitmq替换组件spring rabbitmq With rabbitmq我正在使用的组件declare https camel apache org
  • 何时使用 RabbitMQ 铲子以及何时使用 Federation 插件?

    对于我工作的公司 我们希望使用 RabbitMQ 作为我们的主要消息总线 我们的想法是 每个应用程序都使用自己的虚拟主机进行内部通信 并且通过 shovel 或联合插件 我们可以在多个虚拟主机 甚至可能是多台机器 非集群 之间共享某些类型的
  • 列出与rabbitmq java客户端API交换的绑定

    我似乎在文档中找不到任何信息 所以我想知道是否可以通过某种方式使用 java RabbitMQ API 获取与交换相关的所有绑定 我在查询 api bindings 时正在寻找类似 http api 结果的内容 api definition
  • 如何在多租户系统中的 RabbitMQ 中使队列私有/安全?

    我已阅读开始使用 http www rabbitmq com getstarted htmlRabbitMQ 提供的指南 甚至还贡献了第六个示例暴风雨 amqp https github com paolo losi stormed amq
  • 多个队列在一个通道中消耗

    我使用rabbitMq 来管理和使用队列 我有多个队列 它们的数量并不具体 我使用直接交换来发布消息 我怎样才能仅使用一个队列来消费每个队列的所有消息 基于routing key 渠道 此时我假设我有 5 个队列 我使用了 for 循环并为
  • 使用 Celery(RabbitMQ、Django)检索队列长度

    我在 django 项目中使用 Celery 我的代理是 RabbitMQ 我想检索队列的长度 我浏览了 Celery 的代码 但没有找到执行此操作的工具 我在 stackoverflow 上发现了这个问题 从客户端检查 RabbitMQ
  • 在 Windows 10 和 PHP 7.3 中安装 AMQP

    我想在 Windows 10 中使用 PHP 7 3 安装 AMQP 以便在 symfony 4 中使用 Windows 不使用任何 apache iis nginx 并直接由 symfony 运行 一切还好 直到 我决定在项目中使用rab
  • MassTransit 生成我想忽略的_skipped 队列

    任何人都可以猜出问题是什么 因为我不知道如何解决这个问题 大众运输产生 skipped队列 我不知道为什么它会生成这些队列 它是在执行发布请求响应时生成的 请求客户端是使用 MassTransit RequestClientExtensio
  • 如何重置rabbitmq管理用户

    使用rabbitmq 我们可以安装管理插件 然后我们通过浏览器访问http localhost 55672 使用访客 访客 问题是 我无法再登录 因为我更改了密码并为角色输入了空白 有没有办法重置rabbitmq管理的用户 您可以通过以下方
  • 面向服务的架构 - AMQP 或 HTTP

    一点背景 非常大的整体 Django 应用程序 所有组件都使用相同的数据库 我们需要分离服务 以便我们可以独立升级系统的某些部分而不影响其余部分 我们使用 RabbitMQ 作为 Celery 的代理 现在我们有两个选择 使用 REST 接
  • 服务器在 pika.exceptions.StreamLostError: Stream 连接丢失后关闭

    我的队列中有一些图像 我将每个图像传递到我的 Flask 服务器 在其中完成图像处理 并在我的rabbitmq 服务器中收到响应 收到响应后 我收到此错误 pika exceptions StreamLostError 流连接丢失 104
  • RabbitMQ:无法启动rabbitmq_management插件

    Version gt sudo rabbitmqctl status grep rabbit RabbitMQ rabbit RabbitMQ 3 5 6 Error gt sudo rabbitmq plugins enable rabb
  • rabbitmq 的 REST API

    有没有办法从 ajax 向 RabbitMQ 发送数据 我的应用程序由数千个 Web 客户端 用 js 编写 和 WCF REST 服务组成 现在我试图弄清楚如何为我的应用程序创建可扩展点 这个想法是有一个rabbitmq实例 它从放置在一
  • ECONNREFUSED:无法连接到集群内默认端口上的 RabbitMQ pod

    我的本地集群中有一个运行 RabbitMQ 的 pod 我已经将其配置为 apiVersion v1 kind Service metadata name service rabbitmq spec selector app service
  • RabbitMQ 3.1.3 和丢失的时间戳头

    如果消息中缺少时间戳头 是否可以将代理配置为插入时间戳头 因此 如果发布客户端没有添加时间戳标头 代理是否可以插入与交易所收到消息的时刻相匹配的时间戳值 我应该在哪里寻找该配置 或者这是一个坏主意 截至2015年 原来的问题有了新的答案 这
  • 如何停止本地主机上的 RabbitMQ 服务器

    我在 OS X 上安装了 RabbitMQ 服务器 并在命令行上启动它 现在 我应该如何阻止它运行还不清楚 我这样做之后 sudo rabbitmq server detached I get Activating RabbitMQ plu
  • 使用 Spring Boot 的多个 Rabbitmq 队列

    来自 Spring Boot 教程 https spring io guides gs messaging rabbitmq https spring io guides gs messaging rabbitmq 他们给出了创建 1 个队
  • RabbitMQ + Windows + LDAP 无需发送密码

    我正在尝试在 Windows 7 上使用 RabbitMQ 3 6 2 进行 LDAP 身份验证 授权 我已经在应用程序发送用户名 密码的情况下进行了基本身份验证 但密码位于我需要弄清楚如何进行的代码中避免 有没有人在不提供密码的情况下成功
  • 如何停止rabbitmq服务器

    我正在尝试启动一个节点应用程序 但我认为rabbitmq 妨碍了我 与此线程类似 名为 rabbit 的节点已经在运行 但也 无法连接到节点 rabbit https stackoverflow com questions 8737754

随机推荐

  • YUV420数据格式详解

    YUV简介 YUV格式有两大类 planar和packed 对于planar的YUV格式 先连续存储所有像素点的Y 紧接着存储所有像素点的U 随后是所有像素点的V 对于packed的YUV格式 每个像素点的Y U V是连续交叉存储的 YUV
  • 无盘游戏服务器软件,安网卫士

    2018年10月12号更新说明 请注意此版本无后台 需要注册号及收银编码的请联系客服 服务端 1 更改默认备份目录 2 当客户机无SSD施工时在BV进行显示无硬盘 3 添加游戏时 支持拖动 4 删除游戏时 取消 删除客户机文件 选项 5 取
  • gcc中-c和-o参数

    c和 o都是gcc编译器的可选参数 c表示只编译 compile 源文件但不链接 会把 c或 cc的c源程序编译成目标文件 一般是 o文件 o用于指定输出 out 文件名 不用 o的话 一般会在当前文件夹下生成默认的a out文件作为可执行
  • 次表面散射

    专题介绍 在实时渲染和离线渲染领域 对场景模型表面以及空间介质的精细化建模是增加场景真实感的重要手段 计算机图形学领域的许多科研工作者设计出一系列复杂精巧的技术理论 模拟出光线从宏观世界到微观粒子的变化规律 本期专题精选了近年来关于微表面模
  • Android 11 Activity启动流程分析

    Android 11 Activity启动流程分析 本片文章是基于Android 11版本来分析应用Activity的启动的 Activity是Android四大组件中最重要的一个 因为我们所有的页面基本上都是基于Activity开发的 所
  • Spring Data JPA 讲解大全

    https yangbingdong com 2019 spring boot data jpa learning
  • Spring Boot干货系列:(七)默认日志logback配置解析

    原本地址 Spring Boot干货系列 七 默认日志logback配置解析博客地址 tengj top 前言 今天来介绍下Spring Boot如何配置日志logback 我刚学习的时候 是带着下面几个问题来查资料的 你呢 如何引入日志
  • Window 能访问某些网站,不能访问一些网的解决方法

    转自 http blog sina com cn s blog 53dd443a01014pfn html 这几天遇到一个奇怪的事 一台电脑能上google 也能上baidu 就是不能上微软 ping 微软也不行 当然 这个肯定不是公司防火
  • java附近的人_es6.2.4学习----java实现附近搜索(附近的人)

    阅读本文需先了解es对地理位置的处理 本文讲述java代码实现搜索附近的人的功能 第一步 创建可存储地理位置信息的索引 public static void createIndex throws IOException RestHighLe
  • 【Xilinx AX7103 MicroBalze学习笔记7】MicroBlaze AXI4 接口之 DDR 读写实验

    目录 AXI4 协议介绍 实验任务 硬件设计 Vivado 部分 自定义 IP MicroBlaze 配置 配置 PLL IP 配置 MIG IP 添加源文件 IP 软件设计 SDK 部分 lt
  • Spring MVC 拦截器执行时机

    一 准备工作 搭建好Spring MVC环境以后 我们创建一个拦截器 名为MyInterceptor并实现HandlerInterceptor接口 实现接口方法 便于观察我们只在控制台输出对应的方法名 package com jd inte
  • Qt5的插件机制(7)--插件开发示例代码(Lower-level API)

    插件代码 接口类头文件 MyPluginInterface h cpp view plain copy ifndef INTERFACES H define INTERFACES H include
  • Nginx配置https的wordpress站点,wp-content目录下资源404解决方案

    Nginx配置https的wordpress站点 wp content目录下资源404解决方案 参考文章 1 Nginx配置https的wordpress站点 wp content目录下资源404解决方案 2 https www cnblo
  • pandas DataFrame数据的合并与拼接

    转发 Python pandas DataFrame数据的合并与拼接 merge join concat 总结得很全面 比如将一个文件夹下所有文件合并 merge import os import pandas as pd file lis
  • 数据结构——图解求单链表的长度及插入操作C语言

    单链表的插入属于单链表的基本操作之一 关于单链表的初始化的解释在我的上篇文章中已经详细说明过了 一 求单链表长度 求单链表长度的操作很简单 其实在初始化赋值或遍历那块就可以实现 但是为了让结构层次独立清楚 我还是把求长度写成了一个函数 单链
  • 在GCP上创建Cloud SQL的三种方式(Console,gcloud,Terraform)

    1 简介 Cloud SQL 是GCP上的关系型数据库 常用的有三种方式来创建 1 界面操作 2 命令行 gcloud 3 Terraform 在开始之前 可以查看 初始化一个GCP项目并用gcloud访问操作 2 GCP 操作界面 登陆G
  • git 删除右键菜单

    首先 我表示git默认的右键菜单很烦 太多项了 而我们平时用的最多的无非是一个Git Bash 删除msGit右键菜单 如果是windows 64位系统 cmd进入 C Program Files x86 Git git cheetah 目
  • 恢复U盘分区:windows自带工具diskpart

    步骤 如下图 cmd命令行处执行diskpart命令 运行该工具 然后list disk 列出所有磁盘 然后select disk xxx 选中自己的磁盘 比如下图的是磁盘2 然后clean 清空分区 然后creat partition p
  • 我们这个年龄应该要做的事

    大家好 我是一名入门的菜鸟 如果你不经意间翻开了我的文章 谢谢您 您的支持是我前进的动力 让我们一起加油 由于不是名牌大学 只是一个普普通通的专科生 所以 我想通过自己的努力来获得我想要的 我不会放弃我的梦想 我也曾幻想着我成功的时候在朋友
  • MQ如何保证消息不丢失

    如何保证消息不丢失 哪些环节会造成消息丢失 其实主要就是跨网络的环境中需要考虑消息的丢失 主要是有以下几个方面 生产者往MQ发送消息 MQ的Broker是集群有主从的 主节点把消息同步到从节点时也需要考虑消息丢失问题 消息从内存持久化到硬盘