使用RabbitMQ模拟订单超时自动关闭订单及释放库存

2023-05-16

使用MQ模拟订单超时自动关闭订单及释放库存

    • 依赖
    • RabbitMQ 消息可靠投递配置、yml配置
    • springBean自动创建交换机、队列(订单超时-死信队列)、绑定
    • 监听队列处理示例
      • 1、模拟创建订单
      • 2、监听到订单超时了,开始业务处理,并通知释放库存队列
      • 3、监听释放库存队列,自动释放库存

依赖

        <!-- rabbitMQ 管理-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <version>2.7.1</version>
        </dependency>
        <!--dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.14.2</version>
        </dependency>-->

RabbitMQ 消息可靠投递配置、yml配置

package cn.jf.system.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;

/**
 * RabbitMQ配置类
 *
 * @author jf
 * @version 1.0
 * @Description 描述
 * @date 2022/07/05 15:47
 */
@Slf4j
@Configuration
public class MyRabbitMQConfig {

    @Autowired
//    @Qualifier("redisTemplate")
    private RabbitTemplate rabbitTemplate;

//    @Primary
//    @Bean(name = "rabbitTemplate")
//    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
//        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
//        rabbitTemplate.setMessageConverter(messageConverter());
//        initRabbitTemplate();
//        return rabbitTemplate;
//    }

    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    /**
     * 定制RabbitTemplate<p>
     * -@PostConstruct: MyRabbitMQConfig 对象创建完成以后,执行这个方法<p>
     * 1、服务收到消息就会回调<p>
     * ---- spring.rabbitmq.publisher-confirms: true<p>
     * ---- 设置确认回调 setConfirmCallback<p>
     * 2、消息正确抵达队列就会进行回调<p>
     * ---- spring.rabbitmq.publisher-returns: true<p>
     * ---- spring.rabbitmq.template.mandatory: true<p>
     * ---- 设置确认回调ReturnCallback<p>
     * <p>
     * 3、消费端确认(保证每个消息都被正确消费,此时才可以broker删除这个消息)<p>
     * ---- 默认是自动确认的,只要消息接收到,客户端会自动确认,服务端就会移除这个消息<p>
     * -------- 问题: 在处理信息时宕机了,会把所有的消息确认了,<p>
     * -------- 解决:需要手动确认信息,手动ack消息,不使用默认的消费端确认 spring.rabbitmq.listener.simple.acknowledge-mode=manual<p>
     * ---- 签收了货物 channel.basicAck(deliveryTag, false);拒绝签收货物channel.basicNack(deliveryTag, false, false);<p>
     */
    @PostConstruct
    public void initRabbitTemplate() {
        /*
         * 设置确认回调
         * 1、只要消息抵达Broker就ack=true
         * correlationData:当前消息的唯一关联数据(这个是消息的唯一id)
         * ack:消息是否成功收到
         * cause:失败的原因
         */
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                log.info("confirm." + correlationData + "==>ack:[" + ack + "]==>errorMsg:[" + cause + "]");
            }
        });
        /*
         * 只要消息没有投递给指定的队列,就触发这个失败回调
         * message:投递失败的消息详细信息
         * replyCode:回复的状态码
         * replyText:回复的文本内容
         * exchange:当时这个消息发给哪个交换机
         * routingKey:当时这个消息用哪个路由键
         */
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returned) {
                log.error("==>errorMsg[" + returned.getMessage() + "] ==>code[" + returned.getReplyCode() + "]" +
                        "==>text[" + returned.getReplyText() + "] ==>exchange[" + returned.getExchange() + "] ==>routingKey[" + returned.getRoutingKey() + "].\r\n");
            }
        });
    }
}


yml 程序配置

spring: 
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    # 虚拟主机配置
    virtual-host: /
    # 开启发送端消息抵达Broker确认类型【simple、correlated、none】
    publisher-confirm-type: simple
    # 开启发送端消息抵达Queue确认
    publisher-returns: true
    # 只要消息抵达Queue,就会异步发送优先回调 returnfirm
    template:
      mandatory: true
    # 手动ack消息,不使用默认的消费端确认
    listener:
      simple:
        acknowledge-mode: manual

springBean自动创建交换机、队列(订单超时-死信队列)、绑定

package cn.jf.system.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;

/**
 * 自动创建 Exchange类型:点对点(direct:精准匹配(完全匹配、单播模式)、header)、订阅(fanout:广播模式、topic:路由键模式)、队列、绑定<br/>
 * <p>
 * -- new Exchange (字符串名称、布尔持久型、布尔型自动删除、参数);<br/><p>
 * ---- String name, boolean durable, boolean autoDelete, Map< String, Object> arguments<br/><p>
 * -- new Queue(队列名字, 是否持久化, 是否独占, 是否自动删除, 参数);<br/><p>
 * ---- name - 队列的名称 - 不能为空;设置为 "" 让代理生成名称。<br/>
 * ---- durable - 持久 - 如果我们声明一个持久队列,则为真(该队列将在服务器重新启动后继续存在)<br/>
 * ---- exclusive - 独占 - 如果我们声明一个独占队列,则为真(该队列将仅由声明者的连接使用)<br/>
 * ---- autoDelete – 如果服务器在不再使用队列时应该删除队列,则为 true<br/>
 * ---- arguments - 用于声明队列的参数<br/>
 * <p>
 * 容器中的Queue、Exchange、Binding 会自动创建(在RabbitMQ)不存在的情况下<br/>
 *
 * @author jf
 * @version 1.0
 * @Description 描述
 * @date 2022/07/05 15:47
 */
@Configuration
public class MyAutoBeanMQConfig {
//交换机///

    /**
     * 创建 Topic 类型的Exchange
     *
     * @return
     */
    @Bean
    public Exchange demoEventTopicExchange() {
        return new TopicExchange("demo-topic-exchange", true, false);
    }

//消息队列///

    /*
     * 死信队列
     * <p>
     * 对队列设置过期,而不是对消息设置过期
     * <p>
     * 设计建议规范:(基于事件模型的交换机设计)<p>
     * 1、交换机命名:业务+ exchange; 交换机为Topic<p>
     * 2、路由键:事件+需要感知的业务(可以不写)<p>
     * 3、队列命名:事件+想要监听服务名+ queue<p>
     * 4、绑定关系:事件+感知的业务(#)<p>
     */

    /**
     * 订单延迟队列
     *
     * @return
     */
    @Bean
    public Queue demoDelayQueue() {
        HashMap<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", "demo-topic-exchange");
        arguments.put("x-dead-letter-routing-key", "demo.release.order");
        // 消息过期时间 单位(毫秒)测试以1分钟为例
        arguments.put("x-message-ttl", 60000);
        //消息过期后丢到demo.delay.queue队列,而不是删除
        return new Queue("demo.delay.queue", true, false, false, arguments);
    }

    /**
     * 释放订单队列
     *
     * @return
     */
    @Bean
    public Queue demoReleaseQueue() {
        return new Queue("demo.release.order.queue", true, false, false);
    }

    /**
     * 释放库存队列
     *
     * @return
     */
    @Bean
    public Queue stockReleaseQueue() {
        return new Queue("demo.release.stock.queue", true, false, false);
    }

/绑定

    /**
     * 绑定订单延迟队列
     * <p>
     * 消息队列:demo.delay.queue --> 交换机:demo-topic-exchange --路由键:demo.create.order
     *
     * @return
     */
    @Bean
    public Binding demoCreateBinding() {
        return new Binding("demo.delay.queue",
                Binding.DestinationType.QUEUE,
                "demo-topic-exchange",
                "demo.create.order",
                null);
    }

    /**
     * 绑定释放订单队列
     * <p>
     * 消息队列:demo.release.order.queue --> 交换机:demo-topic-exchange --路由键:demo.release.order
     */
    @Bean
    public Binding demoReleaseBinding() {
        return new Binding("demo.release.order.queue",
                Binding.DestinationType.QUEUE,
                "demo-topic-exchange",
                "demo.release.order.#",
                null);
    }

    /**
     * 订单释放直接和库存释放进行绑定
     */
    @Bean
    public Binding demoReleaseOtherBinding() {
        return new Binding("demo.release.stock.queue",
                Binding.DestinationType.QUEUE,
                "demo-topic-exchange",
                "demo.release.stock.#",
                null);
    }

}


监听队列处理示例

1、模拟创建订单

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 创建订单消息
     */
    @GetMapping("/sendOrderMessage")
    public R sendOrderMessage() {
        for (int i = 1; i < 4; i++) {
            MqEntity mqEntity = new MqEntity();
            mqEntity.setExchange("demo-topic-exchange");
            mqEntity.setMessage("订单id--" + i);
            rabbitTemplate.convertAndSend(mqEntity.getExchange(), "demo.create.order",
                    mqEntity, new CorrelationData(UUID.randomUUID().toString()));
        }
        return R.success("创建订单消息发送完成");
    }

2、监听到订单超时了,开始业务处理,并通知释放库存队列

package cn.jf.system.listener;

import cn.jf.system.entity.MqEntity;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * 监听关闭订单队列,定时关闭订单
 *
 * @author jf
 * @version 1.0
 * @Description 描述
 * @date 2022/07/05 19:32
 */
@Slf4j
@RabbitListener(queues = "demo.release.order.queue")
@Component
public class OrderListener {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitHandler
    public void listener(MqEntity mqEntity, Channel channel, Message message) throws IOException {
        log.info("准备关闭订单:{}", mqEntity.getMessage());
        try {
            log.info("关闭订单{}--OK", mqEntity.getMessage());
            //并通知库存队列
            mqEntity.setMessage("释放库存");
            rabbitTemplate.convertAndSend(mqEntity.getExchange(), "demo.release.stock", mqEntity);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            log.error("关闭订单{}--error", mqEntity.getMessage());
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
        }

    }

}

3、监听释放库存队列,自动释放库存

package cn.jf.system.listener;

import cn.jf.system.entity.MqEntity;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * 监听释放库存队列,自动释放库存
 *
 * @author jf
 * @version 1.0
 * @Description 描述
 * @date 2022/07/12 18:19
 */
@Slf4j
@RabbitListener(queues = "demo.release.stock.queue")
@Component
public class StockListener {

    @RabbitHandler
    public void listener(MqEntity mqEntity, Channel channel, Message message) throws IOException {
        log.info("准备释放库存:{}", mqEntity.getMessage());
        try {
            //TODO Service层处理关闭
            log.info("释放库存--OK");
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            log.error("释放库存--error");
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
        }

    }
}


查看交换机
在这里插入图片描述查看队列
在这里插入图片描述模拟创建订单:http://127.0.0.1:9210/jf-system-dev/mq/sendOrderMessage

在这里插入图片描述消息投递到消息服务端情况
在这里插入图片描述
查看结果
在这里插入图片描述
在这里插入图片描述

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用RabbitMQ模拟订单超时自动关闭订单及释放库存 的相关文章

  • Akka 的语言和产品替代品是什么?

    现在我正在看游戏框架 https www playframework com 并且非常喜欢它 Play 中提供的功能中最受宣传的部分之一是Akka http akka io 为了更好地理解 Akka 以及如何正确使用它 您能告诉我其他语言或
  • 何时使用 RabbitMQ 铲子以及何时使用 Federation 插件?

    对于我工作的公司 我们希望使用 RabbitMQ 作为我们的主要消息总线 我们的想法是 每个应用程序都使用自己的虚拟主机进行内部通信 并且通过 shovel 或联合插件 我们可以在多个虚拟主机 甚至可能是多台机器 非集群 之间共享某些类型的
  • rabbitmq 通道因 PRECONDITION_FAILED 关闭 - 快速回复消费者不存在

    当我们从 Spring Boot 服务向rabbitmq 发布消息时 出现以下错误 而且这是间歇性的 我们无法重现这一点 AMQP 连接 123 11 xxx xx 5672 错误 org springframework amqp rabb
  • 每次发布后我应该关闭通道/连接吗?

    我在 Node js 中使用 amqplib 但我不清楚代码中的最佳实践 基本上 我当前的代码调用amqp connect 当 Node 服务器启动时 然后为每个生产者和每个消费者使用不同的通道 而不会真正关闭它们中的任何一个 我想知道这是
  • 无法在Windows上启用rabbitmq管理插件

    所以 这就是我所做的 在我的 Windows x64 位机器上安装了 Erlang 安装 RabbitMQ 启动 RabbitMQ 服务 这一步我没有任何错误 但是 当我尝试启用rabbitmq management时 我在控制台中收到一些
  • Django 1.6 + RabbitMQ 3.2.3 + Celery 3.1.9 - 为什么我的 celery 工作人员会死掉:WorkerLostError:工作人员过早退出:信号 11 (SIGSEGV)

    这似乎解决了一个非常相似的问题 但并没有给我足够的洞察力 https github com celery billiard issues 101 https github com celery billiard issues 101听起来尝
  • RabbitMQ 启动失败

    RabbitMQ Windows 服务将无法启动 C Program Files x86 RabbitMQ Server rabbitmq server 3 0 4 sbin gt rabbitmq service bat start C
  • RabbitMQ - 升级到新版本并收到很多“PRECONDITION_FAILED Unknown Delivery Tag 1”

    刚刚升级到新版本的 RabbitMQ 2 3 1 现在出现以下错误 PRECONDITION FAILED unknown delivery tag 1 随后通道关闭 这适用于较旧的 RabbitMQ 无需客户端更改 在应用程序行为方面 当
  • MongoDB 架构设计 - 实时聊天

    我正在启动一个项目 我认为该项目特别适合 MongoDB 因为它提供的速度和可扩展性 我目前感兴趣的模块是与实时聊天有关的 如果我要在传统的 RDBMS 中执行此操作 我会将其分为 频道 一个频道有很多用户 用户 一个用户有一个频道但有多条
  • 如何在多租户系统中的 RabbitMQ 中使队列私有/安全?

    我已阅读开始使用 http www rabbitmq com getstarted htmlRabbitMQ 提供的指南 甚至还贡献了第六个示例暴风雨 amqp https github com paolo losi stormed amq
  • RabbitMQ 失败,错误:无法连接到节点rabbit@TPAJ05421843:nodedown

    在 Windows 7 Enterprise 计算机上 我全新安装了 Erlang 17 4 和 RabbitMQ 3 4 3 x64 安装成功且顺利 我还没有尝试创建我的第一个队列或交换器 但我已经看到了麻烦 这个问题类似于另一个SO帖子
  • 更改 RabbitMQ 队列中的参数

    我有一个 RabbitMQ 队列 最初声明如下 var result channel QueueDeclare NewQueue true false false null 我正在尝试添加死信交换 因此我将代码更改为 channel Exc
  • 如何重置rabbitmq管理用户

    使用rabbitmq 我们可以安装管理插件 然后我们通过浏览器访问http localhost 55672 使用访客 访客 问题是 我无法再登录 因为我更改了密码并为角色输入了空白 有没有办法重置rabbitmq管理的用户 您可以通过以下方
  • 基于多线程的 RabbitMQ 消费者

    我们有一个 Windows 服务 它监听单个 RabbitMQ 队列并处理消息 我们希望扩展相同的 Windows 服务 以便它可以监听 RabbitMQ 的多个队列并处理消息 不确定使用多线程是否可以实现这一点 因为每个线程都必须侦听 阻
  • 服务器在 pika.exceptions.StreamLostError: Stream 连接丢失后关闭

    我的队列中有一些图像 我将每个图像传递到我的 Flask 服务器 在其中完成图像处理 并在我的rabbitmq 服务器中收到响应 收到响应后 我收到此错误 pika exceptions StreamLostError 流连接丢失 104
  • rabbitmq 的 REST API

    有没有办法从 ajax 向 RabbitMQ 发送数据 我的应用程序由数千个 Web 客户端 用 js 编写 和 WCF REST 服务组成 现在我试图弄清楚如何为我的应用程序创建可扩展点 这个想法是有一个rabbitmq实例 它从放置在一
  • 从 RabbitMQ 迁移到 Amazon SQS [关闭]

    Closed 这个问题是基于意见的 help closed questions 目前不接受答案 我们的初创公司目前正在使用RabbitMQ with Python Django 对于消息队列 现在我们计划转移到Amazon SQS其高可用性
  • RabbitMQ 3.1.3 和丢失的时间戳头

    如果消息中缺少时间戳头 是否可以将代理配置为插入时间戳头 因此 如果发布客户端没有添加时间戳标头 代理是否可以插入与交易所收到消息的时刻相匹配的时间戳值 我应该在哪里寻找该配置 或者这是一个坏主意 截至2015年 原来的问题有了新的答案 这
  • 在 Spring 中设计复杂的通知系统

    我想设计和实现一个复杂的通知系统 其中我有一个用户将自动和手动订阅不同的事件 经过一番研究后 我决定使用 websockets 将通知推送到客户端 并订阅事件 我会选择 RabbitMQ 特别是直接交换 最初的想法是 在建立 websock
  • RabbitMq 和“致命错误:握手失败 -handshake_decode_error”

    我正在使用 Windows Server 2012 Erlang 19 2 和 RabbitMq 3 6 6 我在使用 TLS 配置端点之间的连接时遇到问题 我已经尝试了所有关于 SO 的答案 以及所有 RabbitMq 文档here ht

随机推荐