Rabbitmq延迟队列实现定时任务

2023-11-13

场景

开发中经常需要用到定时任务,对于商城来说,定时任务尤其多,比如优惠券定时过期、订单定时关闭、微信支付2小时未支付关闭订单等等,都需要用到定时任务,但是定时任务本身有一个问题,一般来说我们都是通过定时轮询查询数据库来判断是否有任务需要执行,也就是说不管怎么样,我们需要先查询数据库,而且有些任务对时间准确要求比较高的,需要每秒查询一次,对于系统小倒是无所谓,如果系统本身就大而且数据也多的情况下,这就不大现实了,所以需要其他方式的,当然实现的方式有多种多样的,比如Redis实现定时队列、基于优先级队列的JDK延迟队列、时间轮等。因为我们项目中本身就使用到了Rabbitmq,所以基于方便开发和维护的原则,我们使用了Rabbitmq延迟队列来实现定时任务,不知道rabbitmq是什么的和不知道springboot怎么集成Rabbitmq的可以查看我之前的文章Spring boot集成RabbitMQ

Rabbitmq延迟队列

Rabbitmq本身是没有延迟队列的,只能通过Rabbitmq本身队列的特性来实现,想要Rabbitmq实现延迟队列,需要使用Rabbitmq的死信交换机(Exchange)和消息的存活时间TTL(Time To Live)

死信交换机

一个消息在满足如下条件下,会进死信交换机,记住这里是交换机而不是队列,一个交换机可以对应很多队列。

  1. 一个消息被Consumer拒收了,并且reject方法的参数里requeue是false。也就是说不会被再次放在队列里,被其他消费者使用。

  2. 上面的消息的TTL到了,消息过期了。

  3. 队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上。

死信交换机就是普通的交换机,只是因为我们把过期的消息扔进去,所以叫死信交换机,并不是说死信交换机是某种特定的交换机

消息TTL(消息存活时间)

消息的TTL就是消息的存活时间。RabbitMQ可以对队列和消息分别设置TTL。对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置。超过了这个时间,我们认为这个消息就死了,称之为死信。如果队列设置了,消息也设置了,那么会取小的。所以一个消息如果被路由到不同的队列中,这个消息死亡的时间有可能不一样(不同的队列设置)。这里单讲单个消息的TTL,因为它才是实现延迟任务的关键。

byte[] messageBodyBytes = "Hello, world!".getBytes();
AMQP.BasicProperties properties = new AMQP.BasicProperties();
properties.setExpiration("60000");
channel.basicPublish("my-exchange", "queue-key", properties, messageBodyBytes);

可以通过设置消息的expiration字段或者x-message-ttl属性来设置时间,两者是一样的效果。只是expiration字段是字符串参数,所以要写个int类型的字符串:
当上面的消息扔到队列中后,过了60秒,如果没有被消费,它就死了。不会被消费者消费到。这个消息后面的,没有“死掉”的消息对顶上来,被消费者消费。死信在队列中并不会被删除和释放,它会被统计到队列的消息数中去

处理流程图

流程图

创建交换机(Exchanges)和队列(Queues)

创建死信交换机

创建死信交换机
如图所示,就是创建一个普通的交换机,这里为了方便区分,把交换机的名字取为:delay

创建自动过期消息队列

这个队列的主要作用是让消息定时过期的,比如我们需要2小时候关闭订单,我们就需要把消息放进这个队列里面,把消息过期时间设置为2小时
创建自动过期消息队列
创建一个一个名为delay_queue1的自动过期的队列,当然图片上面的参数并不会让消息自动过期,因为我们并没有设置x-message-ttl参数,如果整个队列的消息有消息都是相同的,可以设置,这里为了灵活,所以并没有设置,另外两个参数x-dead-letter-exchange代表消息过期后,消息要进入的交换机,这里配置的是delay,也就是死信交换机,x-dead-letter-routing-key是配置消息过期后,进入死信交换机的routing-key,跟发送消息的routing-key一个道理,根据这个key将消息放入不同的队列

创建消息处理队列

这个队列才是真正处理消息的队列,所有进入这个队列的消息都会被处理
创建消息处理队列
消息队列的名字为delay_queue2

消息队列绑定到交换机

进入交换机详情页面,将创建的2个队列(delay_queue1和delay_queue2)绑定到交换机上面
绑定自动过期消息队列
自动过期消息队列的routing key 设置为delay

绑定delay_queue2
绑定delay_queue2
delay_queue2 的key要设置为创建自动过期的队列的x-dead-letter-routing-key参数,这样当消息过期的时候就可以自动把消息放入delay_queue2这个队列中了

绑定后的管理页面如下图:
队列绑定后

当然这个绑定也可以使用代码来实现,只是为了直观表现,所以本文使用的管理平台来操作

发送消息

String msg = "hello word";
MessageProperties messageProperties = new MessageProperties();
		messageProperties.setExpiration("6000");
		messageProperties.setCorrelationId(UUID.randomUUID().toString().getBytes());
		Message message = new Message(msg.getBytes(), messageProperties);
		rabbitTemplate.convertAndSend("delay", "delay",message);

主要的代码就是

messageProperties.setExpiration("6000");

设置了让消息6秒后过期

注意:因为要让消息自动过期,所以一定不能设置delay_queue1的监听,不能让这个队列里面的消息被接受到,否则消息一旦被消费,就不存在过期了

接收消息

接收消息配置好delay_queue2的监听就好了

package wang.raye.rabbitmq.demo1;

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DelayQueue {
	/** 消息交换机的名字*/
	public static final String EXCHANGE = "delay";
	/** 队列key1*/
	public static final String ROUTINGKEY1 = "delay";
	/** 队列key2*/
	public static final String ROUTINGKEY2 = "delay_key";

	/**
	 * 配置链接信息
	 * @return
	 */
	@Bean
	public ConnectionFactory connectionFactory() {
		CachingConnectionFactory connectionFactory = new CachingConnectionFactory("120.76.237.8",5672);
		
		connectionFactory.setUsername("kberp");
		connectionFactory.setPassword("kberp");
		connectionFactory.setVirtualHost("/");
		connectionFactory.setPublisherConfirms(true); // 必须要设置
		return connectionFactory;
	}
	
	/**  
	 * 配置消息交换机
     * 针对消费者配置  
        FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念  
        HeadersExchange :通过添加属性key-value匹配  
        DirectExchange:按照routingkey分发到指定队列  
        TopicExchange:多关键字匹配  
     */  
    @Bean  
    public DirectExchange defaultExchange() {  
    	return new DirectExchange(EXCHANGE, true, false);
    } 
   
    /**
     * 配置消息队列2
     * 针对消费者配置  
     * @return
     */
    @Bean
    public Queue queue() {  
       return new Queue("delay_queue2", true); //队列持久  
  
    }
    /**
     * 将消息队列2与交换机绑定
     * 针对消费者配置  
     * @return
     */
    @Bean  
    @Autowired
    public Binding binding() {  
        return BindingBuilder.bind(queue()).to(defaultExchange()).with(DelayQueue.ROUTINGKEY2);  
    } 

    /**
     * 接受消息的监听,这个监听会接受消息队列1的消息
     * 针对消费者配置  
     * @return
     */
    @Bean  
    @Autowired
    public SimpleMessageListenerContainer messageContainer2(ConnectionFactory connectionFactory) {  
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());  
        container.setQueues(queue());  
        container.setExposeListenerChannel(true);  
        container.setMaxConcurrentConsumers(1);  
        container.setConcurrentConsumers(1);  
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认  
        container.setMessageListener(new ChannelAwareMessageListener() {

			public void onMessage(Message message, com.rabbitmq.client.Channel channel) throws Exception {
				byte[] body = message.getBody();  
                System.out.println("delay_queue2 收到消息 : " + new String(body));  
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //确认消息成功消费  
				
			}  
  
        });  
        return container;  
    }  
    
   
}

在消息监听中处理需要定时处理的任务就好了,因为Rabbitmq能发送消息,所以可以把任务特征码发过来,比如关闭订单就把订单id发过来,这样就避免了需要查询一下那些订单需要关闭而加重MySQL负担了,毕竟一旦订单量大的话,查询本身也是一件很费IO的事情

总结

基于Rabbitmq实现定时任务,就是将消息设置一个过期时间,放入一个没有读取的队列中,让消息过期后自动转入另外一个队列中,监控这个队列消息的监听处来处理定时任务具体的操作

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Rabbitmq延迟队列实现定时任务 的相关文章

  • 如何使用自动装配的 Spring Boot 监听多个队列?

    我是 Spring Boot 的新手 正在尝试它 目前我已经构建了一些应用程序 我希望能够通过队列相互通信 我目前有一个侦听器对象 可以从特定队列接收消息 Configuration public class Listener final
  • 过期的消息不会从 RabbitMQ 中删除

    我通过生产者向 RabbitMQ 发送一条普通消息 然后发送第二条消息expiration属性分配给一个值 然后使用rabbitmqctl list queues命令我监视消息的状态 我发现如果我先发送一条普通消息 然后发送一条消息expi
  • Spring AMQP Java 客户端中的队列大小

    我使用 Spring amqp 1 1 版本作为我的 java 客户端 我有一个大约有 2000 条消息的队列 我想要一个服务来检查这个队列大小 如果它是空的 它会发出一条消息说 所有项目已处理 我不知道如何获取当前队列大小 请帮忙 我用谷
  • rabbitmq-erlang-client,使用 rebar 友好的 pkg,在开发环境上工作在 rebar 版本上失败

    我成功地将rabbitmq erlang client的rebar友好包用于一个简单的Hello World rebarized和OTP 兼容 应用程序 并且在开发环境中工作正常 我能够启动 erl 控制台并执行我的操作applicatio
  • RabbitMQ C# API:如何检查绑定是否存在?

    使用 RabbitMQ C API 我如何检查给定队列到给定交换是否存在绑定 很多 RabbitMQ 调用都是幂等的 所以有些人可能会说在这些情况下检查是不必要的 但我认为它们在测试中很有用 您可以使用他们的 REST API 来调用并查看
  • RabbitMQ 启动失败

    RabbitMQ Windows 服务将无法启动 C Program Files x86 RabbitMQ Server rabbitmq server 3 0 4 sbin gt rabbitmq service bat start C
  • MongoDB 架构设计 - 实时聊天

    我正在启动一个项目 我认为该项目特别适合 MongoDB 因为它提供的速度和可扩展性 我目前感兴趣的模块是与实时聊天有关的 如果我要在传统的 RDBMS 中执行此操作 我会将其分为 频道 一个频道有很多用户 用户 一个用户有一个频道但有多条
  • Celery 任务状态取决于 CELERY_TASK_RESULT_EXPIRES

    据我所知 任务状态完全取决于 CELERY TASK RESULT EXPIRES 设置的值 如果我在任务完成执行后检查此间隔内的任务状态 则返回的状态为 AsyncResult task id state 是正确的 如果没有 状态将不会更
  • 多个队列在一个通道中消耗

    我使用rabbitMq 来管理和使用队列 我有多个队列 它们的数量并不具体 我使用直接交换来发布消息 我怎样才能仅使用一个队列来消费每个队列的所有消息 基于routing key 渠道 此时我假设我有 5 个队列 我使用了 for 循环并为
  • springrabbitmq:无法将id设置为属性?

    我有一个属性文件 其中包含队列 其值为queue name 如果我在其他请使用该属性 那么它可以工作 但如果我在 id 上使用它 那么它会失败
  • Celery 与rabbitmq 创建结果多个队列

    我已经用 RabbitMQ 安装了 Celery 问题是 对于返回的每个结果 Celery 都会在 Rabbit 中创建队列 并在交换 celeryresults 中使用任务 ID 我仍然想得到结果 但在一个队列上 我的芹菜配置 from
  • 即使设置了 cookie,RabbitMQ 身份验证也会失败

    我最近在运行 lattePanda 的 Windows 10 上安装了带有 ErlanOTP 的rabbitmq 我运行rabbitmqctl status并收到以下错误 C Program Files RabbitMQ Server ra
  • 在 Celery 工作线程中捕获 Heroku SIGTERM 以优雅地关闭工作线程

    我对此进行了大量研究 令我惊讶的是我还没有在任何地方找到一个好的答案 我正在 Heroku 上运行一个大型应用程序 并且我有某些运行很长时间处理的 celery 任务 并在任务结束时保存结果 每次我在 Heroku 上重新部署时 它都会发送
  • MassTransit 生成我想忽略的_skipped 队列

    任何人都可以猜出问题是什么 因为我不知道如何解决这个问题 大众运输产生 skipped队列 我不知道为什么它会生成这些队列 它是在执行发布请求响应时生成的 请求客户端是使用 MassTransit RequestClientExtensio
  • Erl 无法连接到本地 EPMD。为什么?

    Erlang R14B04 erts 5 8 5 source 64 bit rq 1 async threads 0 kernel poll false Eshell V5 8 5 abort with G root ip 10 101
  • 如何停止本地主机上的 RabbitMQ 服务器

    我在 OS X 上安装了 RabbitMQ 服务器 并在命令行上启动它 现在 我应该如何阻止它运行还不清楚 我这样做之后 sudo rabbitmq server detached I get Activating RabbitMQ plu
  • 在 Spring 中设计复杂的通知系统

    我想设计和实现一个复杂的通知系统 其中我有一个用户将自动和手动订阅不同的事件 经过一番研究后 我决定使用 websockets 将通知推送到客户端 并订阅事件 我会选择 RabbitMQ 特别是直接交换 最初的想法是 在建立 websock
  • PHPUnit RabbitMQ:为创建连接函数编写测试

    我面临以下问题 我编写了一个函数 根据所需参数创建连接对象 AMQPConnection 现在我想编写相应的单元测试 我只是不知道在没有运行 RabbitMQ 代理的情况下如何做到这一点 这是有问题的函数 public function g
  • 具有延迟的简单可扩展工作/消息队列

    我需要设置一个作业 消息队列 并可以选择为任务设置延迟 以便空闲工作人员不会立即拾取它 而是在一定时间后 可能因任务而异 我研究了几个 Linux 队列解决方案 rabbitmq gearman memcacheq 但它们似乎都没有提供开箱
  • 使用主题交换运行多个 Celery 任务

    我正在用 Celery 替换一些自制代码 但很难复制当前的行为 我期望的行为如下 创建新用户时 应向tasks与交换user created路由键 该消息应该触发两个 Celery 任务 即send user activate email

随机推荐

  • do while循环语句的学习以及练习

    今天学的是do while循环语句 先执行循环体 直到条件的表达式为false 与while循环语句的区别 while语句先判断条件 满足时执行循环体 do while语句先执行循环体 满足条件在执行 语法 do 循环体 while 条件
  • kinova-jaco2使用Moveit!控制真实机械臂抓取固定点物体

    kinova jaco2使用Moveit 控制真实机械臂抓取固定点物体 一 机械臂坐标系 坐标系方向 位姿方向 轴的起始点 二 启动机械臂和Moveit 三 实现抓取 python代码 python文件建议直接用python启动 四 遇到的
  • react hook之useMemo

    useMemo的作用 Pass a create function and an array of dependencies useMemo will only recompute the memoized value when one o
  • LogisticRegression(逻辑回归)

    LogisticRegression定义 logistic回归 是一种广义的线性回归分析模型 常用于数据挖掘 疾病自动诊断 经济预测等领域 例如 探讨引发疾病的危险因素 并根据危险因素预测疾病发生的概率等 以胃癌病情分析为例 选择两组人群
  • SVG转为Png

    1 pom中引入maven依赖
  • 自定义实现OAuth2.0 授权码模式

    文章目录 OAuth2 0 授权码模式 实践 依赖知识 术语 授权码流程 认证服务器 拉起请求用户授权页面 用户手动授权 提交授权 生成code 下发Token 第三方应用 收到code并请求Token 访问受保护的资源 项目结构 Tomc
  • 类EMD的“信号分解方法”及MATLAB实现(第四篇)——VMD

    重头戏来了 在以往的应用经验里 VMD方法在众多模态分解方法中可以说是非常好的 从催更力度上看 这个方法也是格外受关注 笔者决定加快进度快一些写完这个方法 十月份了有些同学要开始做毕设 希望这篇文能帮上忙 1 VMD 变分模态分解 的概念
  • poj1338【丑数·DP】

    我记得这道题以前写过 而且是写出来了 DP吧 然后现在想了好久 没想出来 然后考虑一下递推 mdzz 直接就是让之前的这个每次乘以2 3 5就好了嘛 然后每轮取最小 include
  • jquery form表单.serialize()序列化后中文乱码问题原因及解决decodeURIComponent

    jquery form表单 serialize 序列化后中文乱码问题原因及解决 原因 serialize 自动调用了encodeURIComponent方法将数据编码了 解决方法 调用decodeURIComponent XXX true
  • 列表list转树形结构(python/golang/js/php)

    文章目录 1 原数据 2 利用对象内存共享生成嵌套结构 2 1 算法原理 2 2 算法实现 2 2 1 JS 2 2 2 Python 2 2 3 go 2 2 4 php 2 3 运行结果 3 递归 3 1 算法实现 3 1 1 pyth
  • 基于协同过滤算法的书籍推荐 毕业设计-附源码101555

    摘 要 21世纪的今天 随着社会的不断发展与进步 人们对于信息科学化的认识 已由低层次向高层次发展 由原来的感性认识向理性认识提高 管理工作的重要性已逐渐被人们所认识 科学化的管理 使信息存储达到准确 快速 完善 并能提高工作管理效率 促进
  • 微服务restful风格,用Post在服务之间发送请求接收不到参数接收不到问题(@RequestParam和@RequestBody)

    上代码 发送端 接收端 问题 发送端可以接受从前段传过来的数据 但是请求接收端时 接收端可以接收url请求 但是参数传不到接收端 分析 用get和post传输的数据是截然不同的 用get是追加在url之后 直接放在请求头 但是post请求的
  • 体验css:repeat和grid

    文章目录 一 repeat 1 语法 2 auto fill和auto fit 3 专属尺寸 fr auto max content min content 二 grid 1 设置grid布局 2 设置列宽行高 3 设置间距 4 设置分区
  • 【C++实现】 数据库连接池

    文章目录 涉及知识 为什么要弄连接池 功能介绍 成员变量讲解 代码剖析 Connection h Connection cpp ConnectionPool h ConnectionPool cpp 性能测试 难点 总结 涉及知识 MySQ
  • 解一元二次方程——Java

    解一元二次方程 可以使用下面的公式求元二次方程ax x bx c 0的两个根 b b 4ac称作一元二次方程的判别式 如果它是正值 那么一元二次方程就有两个实数根 如果它为0 方程式就只有一个根 如果它是负值 方程式无实数根 编写程序 提示
  • js中使用websocket

    后端地址是http的 websocket地址 ws开头 后端地址是https的 websocket地址wss开头 对于websocket没有跨域的问题 import MessageBox from element ui let url ws
  • Linux学习笔记--8(文件权限)

    文件权限与归属 Linux不同的字符来区分文件类型 常见如下 普通文件 d 目录文件 l 链接文件 b 块设备文件 c 字符设备文件 p 管道文件 对应目录文件 可读 表示能够读取目录内的文件列表 可写 表示能够在目录内新增 删除 重命名文
  • Oracle : ORA-02290: 违反检查约束条件

    背景 一个oracle表 有个字段开始被设置不为空 后来我想测试 把这个不为空 去掉了 然后保存 就报错 om dtwave meteor connector common exception ConnectorException Writ
  • 感知机原始形式、对偶形式的Python实现

    2019独角兽企业重金招聘Python工程师标准 gt gt gt 感知机学习的目标就是求得一个能够将训练数据集中正负实例完全分开的分类超平面 感知机原始形式 from future import division import rando
  • Rabbitmq延迟队列实现定时任务

    场景 开发中经常需要用到定时任务 对于商城来说 定时任务尤其多 比如优惠券定时过期 订单定时关闭 微信支付2小时未支付关闭订单等等 都需要用到定时任务 但是定时任务本身有一个问题 一般来说我们都是通过定时轮询查询数据库来判断是否有任务需要执