实战:实现缓存和数据库一致性方案

2023-11-18

原创:微信公众号 阿Q说代码,欢迎分享,转载请保留出处。

哈喽大家好,我是阿Q!

最近不是正好在研究 canal 嘛,刚巧前两天看了一篇关于解决缓存与数据库一致性问题的文章,里边提到了一种解决方案是结合 canal 来操作的,所以阿Q就想趁热打铁,手动来实现一下。

架构

文中提到的思想是:

  • 采用先更新数据库,后删除缓存的方式来解决并发引发的一致性问题;
  • 采用异步重试的方式来保证“更新数据库、删除缓存”这两步都能执行成功;
  • 可以采用订阅变更日志的方式来清除 Redis 中的缓存;

基于这种思想,阿Q脑海中搭建了以下架构

  • APP 从 Redis 中查询信息,将数据的更新写入 MySQL 数据库中;
  • Canal 向 MySQL 发送 dump 协议,接收 binlog 推送的数据;
  • Canal 将接收到的数据投递给 MQ 消息队列;
  • MQ 消息队列消费消息,同时删除 Redis 中对应数据的缓存;

环境准备

这篇文章中有 mysql 的安装教程:mysql 安装

这篇文章中有 canal 的安装教程以及对 mysql 的相关配置:canal安装

考虑到我们服务器之前安装过 RabbitMQ ,所以我们就用 RabbitMQ 来充当消息队列吧。

Canal 配置

修改 conf/canal.properties 配置

# 指定模式
canal.serverMode = rabbitMQ
# 指定实例,多个实例使用逗号分隔: canal.destinations = example1,example2
canal.destinations = example 

# rabbitmq 服务端 ip
rabbitmq.host = 127.0.0.1
# rabbitmq 虚拟主机 
rabbitmq.virtual.host = / 
# rabbitmq 交换机  
rabbitmq.exchange = xxx
# rabbitmq 用户名
rabbitmq.username = xxx
# rabbitmq 密码
rabbitmq.password = xxx
rabbitmq.deliveryMode =

修改实例配置文件 conf/example/instance.properties

#配置 slaveId,自定义,不等于 mysql 的 server Id 即可
canal.instance.mysql.slaveId=10 

# 数据库地址:配置自己的ip和端口
canal.instance.master.address=ip:port 
 
# 数据库用户名和密码 
canal.instance.dbUsername=xxx 
canal.instance.dbPassword=xxx
	
# 指定库和表
canal.instance.filter.regex=.*\\..*    // 这里的 .* 表示 canal.instance.master.address 下面的所有数据库
		
# mq config
# rabbitmq 的 routing key
canal.mq.topic=xxx

然后重启 canal 服务。

这篇文章中有 RabbitMQ 的安装教程:RabbitMQ安装

这篇文章中有 Redis 的安装教程:Redis安装

数据库

建表语句

CREATE TABLE `product_info` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) DEFAULT NULL,
  `price` decimal(10,4) DEFAULT NULL,
  `create_date` datetime DEFAULT NULL,
  `update_date` datetime DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8

数据初始化

INSERT INTO cheetah.product_info
(id, name, price, create_date, update_date)
VALUES(1, '从你的全世界路过', 14.0000, '2020-11-21 21:26:12', '2021-03-27 22:17:39');
INSERT INTO cheetah.product_info
(id, name, price, create_date, update_date)
VALUES(2, '乔布斯传', 25.0000, '2020-11-21 21:26:42', '2021-03-27 22:17:42');
INSERT INTO cheetah.product_info
(id, name, price, create_date, update_date)
VALUES(3, 'java开发', 87.0000, '2021-03-27 22:43:31', '2021-03-27 22:43:34');

实战

项目引入的依赖比较多,为了不占用过多的篇幅,大家可以在公众号【阿Q说代码】后台回复“canal”获取项目源码!

MySQL 和 Redis 的相关配置在此不再赘述,有不懂的可以私聊阿Q:qingqing-4132;

RabbitMQ 配置

@Configuration
public class RabbitMQConfig {

    public static final String CANAL_QUEUE = "canal_queue";//队列
    public static final String DIRECT_EXCHANGE = "canal";//交换机,要与canal中配置的相同
    public static final String ROUTING_KEY = "routingkey";//routing-key,要与canal中配置的相同

    /**
     * 定义队列
     **/
    @Bean
    public Queue canalQueue(){
        return new Queue(CANAL_QUEUE,true);
    }

    /**
     * 定义直连交换机
     **/
    @Bean
    public DirectExchange directExchange(){
       return new DirectExchange(DIRECT_EXCHANGE);
    }

    /**
     * 队列和交换机绑定
     **/
    @Bean
    public Binding orderBinding() {
        return BindingBuilder.bind(canalQueue()).to(directExchange()).with(ROUTING_KEY);
    }
}

商品信息入缓存

/**
 * 获取商品信息:
 * 先从缓存中查,如果不存在再去数据库中查,然后将数据保存到缓存中
 * @param productInfoId
 * @return
 */
@Override
public ProductInfo findProductInfo(Long productInfoId) {
	//1.从缓存中获取商品信息
	Object object = redisTemplate.opsForValue().get(REDIS_PRODUCT_KEY + productInfoId);
	if(ObjectUtil.isNotEmpty(object)){
		return (ProductInfo)object;
	}
	//2.如果缓存中不存在,从数据库获取信息
	ProductInfo productInfo = this.baseMapper.selectById(productInfoId);
	if(productInfo != null){
		//3.将商品信息缓存
		redisTemplate.opsForValue().set(REDIS_PRODUCT_KEY+productInfoId, productInfo,
				REDIS_PRODUCT_KEY_EXPIRE, TimeUnit.SECONDS);
		return productInfo;
	}
	return null;
}

执行方法后,查看 Redis 客户端是否有数据存入

更新数据入MQ

/**
 * 更新商品信息
 * @param productInfo
 * @return
 */
@PostMapping("/update")
public AjaxResult update(@RequestBody ProductInfo productInfo){
	productInfoService.updateById(productInfo);
	return AjaxResult.success();
}

当我执行完 update 方法的时候,去RabbitMQ Management 查看,发现并没有消息进入队列。

问题描述

通过排查之后我在服务器中 canal 下的 /usr/local/logs/example/example.log 文件里发现了问题所在。

原因就是meta.dat中保存的位点信息和数据库的位点信息不一致导致 canal 抓取不到数据库的动作。

于是我找到 canal 的 conf/example/instance.properties 实例配置文件,发现没有将canal.instance.master.address=127.0.0.1:3306 设置成自己的数据库地址。

解决方案
  • 先停止 canal 服务的运行;
  • 删除meta.dat文件;
  • 再重启 canal,问题解决;

再次执行 update 方法,会发现 RabbitMQ Management中已经有我们想要的数据了。

MQ接收数据

编写 RabbitMQ 消费代码的逻辑

@RabbitListener(queues = "canal_queue")//监听队列名称
public void getMsg(Message message, Channel channel, String msg) throws IOException {
	long deliveryTag = message.getMessageProperties().getDeliveryTag();
	try {
		log.info("消费的队列消息来自:" + message.getMessageProperties().getConsumerQueue());

		//删除reids中对应的key
		ProductInfoDetail productInfoDetail = JSON.parseObject(msg, ProductInfoDetail.class);
		log.info("库名:"+ productInfoDetail.getDatabase());
		log.info("表名: "+ productInfoDetail.getTable());
		if(productInfoDetail!=null && productInfoDetail.getData()!=null){
			List<ProductInfo> data = productInfoDetail.getData();
			ProductInfo productInfo = data.get(0);
			if(productInfo!=null){
				Long id = productInfo.getId();
				redisTemplate.delete(REDIS_PRODUCT_KEY+id);
				channel.basicAck(deliveryTag, true);
				return;
			}
		}
		channel.basicReject(deliveryTag ,true);
		return;
	}catch (Exception e){
		channel.basicReject(deliveryTag,false);
		e.printStackTrace();
	}
}

当我们再次调用 update接口时,控制台会打印以下信息

从图中打印的信息可以看出就是我们的库和表以及消息队列,Redis 客户端中缓存的信息也被删除了。

拓展

看到这,你肯定会问:RabbitMQ 是阅后即焚的机制,它确认消息被消费者消费后会立刻删除,如果此时我们的业务还没有跑完,没来的及删除 Redis 中的缓存就宕机了,岂不是缓存一直都得不到更新了吗?

首先我们要明确的是 RabbitMQ 是通过消费者回执来确认消费者是否成功处理消息的,即消费者获取消息后,应该向 RabbitMQ 发送 ACK 回执,表明自己已经处理消息了。

为了不让上述问题出现,消费者返回 ACK 回执的时机就显得非常重要了, 而 SpringAMQP 也为我们提供了三种可选的确认模式:

  • manual:手动 ack,需要在业务代码结束后,调用 api 发送 ack;
  • auto:自动 ack ,由 spring 监测 listener 代码是否出现异常,没有异常则返回 ack,抛出异常则返回 nack;
  • none:关闭 ack,MQ 假定消费者获取消息后会成功处理,因此消息投递后立即被删除;

由此可知在 none 模式下消息投递最不可靠,可能会丢失消息;在默认的 auto 模式下如果出现服务器宕机的情况也是会丢失消息的,本次实战中,阿Q为了防止消息丢失采用的是 manual 这种模式,配置信息如下:

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual #开启手动确认

所以在代码中也就出现了

//用于肯定确认
channel.basicAck(deliveryTag, true);
//用于否定确认
channel.basicReject(deliveryTag ,true);

当然此种模式虽然不会丢失消息,但是会导致效率变低。

今天的内容到这里就结束了,赶快动手体验一下吧!后台回复“canal”获取项目源码。我是阿Q,我们下期再见。

写文不易,希望大家可以一键三连:点赞、转发、在看

阿Q的小破站上新了,方便大家更好的查看往期文章:https://zhangxiaoq.github.io/

推荐阅读

实战:画了几张图,终于把OAuth2搞清楚了

重磅出击,20张图带你彻底了解ReentrantLock加锁解锁的原理

领导看了我写的关闭超时订单,让我出门左转!

看了同事写的代码,我竟然开始默默的模仿了。。。

实战篇:断点续传?文件秒传?手撸大文件上传

参考文章:

https://mp.weixin.qq.com/s?__biz=MzIyOTYxNDI5OA==&mid=2247487312&idx=1&sn=fa19566f5729d6598155b5c676eee62d&chksm=e8beb8e5dfc931f3e35655da9da0b61c79f2843101c130cf38996446975014f958a6481aacf1&scene=178&cur_album_id=1699766580538032128#rd

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

实战:实现缓存和数据库一致性方案 的相关文章

  • seq2seq模型

    转载自 http blog csdn net sunlylorn article details 50607376
  • k--最近邻算法(KNN)

    目录 一 简介 二 举例理解 三 算法步骤 四 其他说明 1 关于距离的计算 2 超参数 3 关于K值的选择 4 取K值的方法 5 关于决策依据 6 优缺点 五 代码 一 简介 邻近算法 KNN 是数据挖掘分类技术最简单的方法之一 所谓K最

随机推荐

  • Asp.net_Study学习笔记

    Asp net Study web基本原理 浏览器向服务器发送请求 服务器响应 报错 HTTP Error 403 14 Forbidden Web 服务器被配置为不列出此目录的内容 解决 打开控制面板里的程序 点击启用或关闭Windows
  • Matlab实现Kmeans算法(每行代码标注详细注解)

    本文主要为了完成平日作业 并进一步加深对算法的理解 也希望对来访的读者有所帮助 该算法的优化Kmean 算法的代码详解已在其他文章给出 Matlab实现Kmeans 算法 每行代码标注详细注解 高垚淼的博客 CSDN博客 Matlab实现B
  • ESD静电放电最常用的三种模型及其防护设计

    推荐好文 建议直接看链接 1 静电放电最常用的三种模型及其防护设计 http www 360doc com content 17 0827 14 32066980 682502209 shtml 人体模型HBM 机器模型MM 充电器件模型C
  • LA@齐次线性方程组解的结构

    文章目录 齐次线性方程组解的结构 解的性质 齐次线性方程组的解的线性组合还是方程组的解 基础解系 通解 定理 齐次线性方程组基础解系存在定理 齐次线性方程组的基础解系包含的向量个数 秩 应用和示例 推论1 推论2 推论3 转置矩阵对的乘积秩
  • 解决jar文件不显示图标问题

    版权声明 本文为转载文章 遵循 CC 4 0 BY SA 版权协议 本文链接 https blog csdn net qyfx123456 article details 104713149 一 问题的产生 最近 重新装了JDK 配置了环境
  • 生命在于学习——Socket编程(偏安全方面)

    本篇文章仅用于学习记录和交流 不得用于其他违规用途 产生的不良后果 自己负责 一 Socket介绍 首先socket 套接字 是工作在应用层和传输层之间一个抽像层 为什么要有他呢 虽然我们已经有了ip port可以和世界上任意一台计算机上的
  • 扫雷游戏是一款十分经典的单机小游戏。 问题 H: 扫雷游戏

    题目描述 扫雷游戏是一款十分经典的单机小游戏 在n行m列的雷区中有一些格子含有地雷 称之为地雷格 其他格子不含地雷 称之为非地雷格 玩家翻开一个非地雷格时 该格将会出现一个数字 提示周围格子中有多少个是地雷格 游戏的目标是在不翻出任何地雷格
  • axios封装(例如:请求头、token、超时、BaseUrl、请求错误、请求重复)

    axios封装 文章目录 axios封装 前言 1 为什么封装Axios 2 包括的功能 一 Axios是什么 二 安装 Axios 1 安装axios JavaScript版本 2 安装axios TypeScript版本 三 封装 Ax
  • 关于 forza horizon 4 极限竞速地平线4 进游戏没声音 看见车没声音问题 的解决方案 (可能是微星的锅)

    这个问题可能是我微星笔记本的问题 型号GS63 7RE 018n 这个机器有个独立声卡 有个单独的sb软件 地平线4用蓝牙音箱的特殊问题 如果音箱有麦克风 声音就崩了 我用的bose soundlink 我用线从电脑上接出来 结果进了游戏就
  • QT线程池实验研究与分析(QThread与和QThreadPool + QRunnable使用上的区别)

    第一部分 QT线程池的构建与使用 网上关于QT线程池QThreadPool的文章很多 而且大都千篇一律 基本上都是参考QT的帮助文档介绍QT全局线程池的用法 这样就往往会使人产生误解 QT是不是推荐大家使用其全局线程池 而不推荐使用自定义构
  • Docker客户端连接Docker Daemon的方式

    Docker为C S架构 服务端为docker daemon 客户端为docker service 支持本地unix socket域套接字通信与远程socket通信 默认为本地unix socket通信 要支持远程客户端访问需要做如下设置
  • Linux下DM644x设备驱动I2C之总线驱动(一)详解

    本文均属自己阅读源码的点滴总结 转账请注明出处谢谢 欢迎和大家交流 qq 1037701636 email 200803090209 zjut com gzzaigcn2012 gmail com linux DM6441下I2C设备驱动的
  • 融云深度参与「新加坡 GTLC 大会」,连接亚太机遇、开拓国际市场

    8 月 18 日 由 TGO 鲲鹏会主办的新加坡 GTLC Global Tech Leadership Conference 全球技术领导力大会 圆满收官 融云作为共创伙伴深度参与了大会 关注 融云全球互联网通信云 了解更多 本次大会以
  • centos7.9安装postgresql12

    目录 1 下载安装包 2 安装 首先登陆官方网站下载 PostgreSQL Downloads 1 下载安装包 按照官方的命令安装 下载PGSQL的rpm包 sudo yum install y https download postgre
  • 社会经济学中的因果分析思想初探

    社会经济学中的因果分析思想初探 目录 1 因果是什么 2 因果关系和相关关系 3 因果推断的三个层级 4 经典因果推断模型 5 社会经济学中的因果实证分析 5 1随机控制实验 5 2自然实验 5 3准实验 5 3 1准实验 双重差分法 5
  • 基于STC8G1K08A的可调节占空比和频率的PWM应用案例

    基于STC8G的可调节占空比和频率的PWM应用案例 CSDN的小白分享 前言 一 STC8G系列的介绍 二 使用记录 1 建立工程 2 打开工程所需的芯片功能 及本人的函数 总结 CSDN的小白分享 前言 学习单片机以来 都是写了就算的态度
  • 如何在Rich Edit Control中管理超链接

    如何在Rich Edit Control中管理超链接 一 在Rich Edit Control 中显示超链接 在Rich Edit Control 中显示超链接的格式 也就是给选择的文本添加CFE LINK属性 可以通过以下两种方法实现 1
  • 详解MNIST数据集下载、解析及显示的Python实现

    Content MNIST数据集基本介绍 下载MNIST数据集到本地 解析MNIST数据集 显示MNIST数据集中训练集的前9张图片和标签 随着图像处理 计算机视觉 机器学习 甚至深度学习的蓬勃发展 一个良好的数据集作为学习和测试相关算法非
  • python编程基础-task4-FOR、IF以及while

    一 IF语句 avg 90 math 95 Chinese 85 if math lt Chinese print 语文更好 if math gt Chinese print 数学更好 Chinese 5 if avg Chinese pr
  • 实战:实现缓存和数据库一致性方案

    原创 微信公众号 阿Q说代码 欢迎分享 转载请保留出处 哈喽大家好 我是阿Q 最近不是正好在研究 canal 嘛 刚巧前两天看了一篇关于解决缓存与数据库一致性问题的文章 里边提到了一种解决方案是结合 canal 来操作的 所以阿Q就想趁热打