Spring Boot + Disruptor

2023-10-27

首先了解一下Disrupto的背景:

Disruptor 是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级)。基于 Disruptor 开发的系统单线程能支撑每秒 600 万订单,2010 年在 QCon 演讲后,获得了业界关注。

Disruptor是一个开源的Java框架,它被设计用于在生产者—消费者(producer-consumer problem,简称PCP)问题上获得尽量高的吞吐量(TPS)和尽量低的延迟。

从功能上来看,Disruptor 是实现了“队列”的功能,而且是一个有界队列。那么它的应用场景自然就是“生产者-消费者”模型的应用场合了。

Disruptor是LMAX在线交易平台的关键组成部分,LMAX平台使用该框架对订单处理速度能达到600万TPS,除金融领域之外,其他一般的应用中都可以用到Disruptor,它可以带来显著的性能提升。

其实Disruptor与其说是一个框架,不如说是一种设计思路,这个设计思路对于存在“并发、缓冲区、生产者—消费者模型、事务处理”这些元素的程序来说,Disruptor提出了一种大幅提升性能(TPS)的方案。

Disruptor 的核心概念

先从了解 Disruptor 的核心概念开始,来了解它是如何运作的。下面介绍的概念模型,既是领域对象,也是映射到代码实现上的核心对象。

Ring Buffer

如其名,环形的缓冲区。曾经 RingBuffer 是 Disruptor 中的最主要的对象,但从3.0版本开始,其职责被简化为仅仅负责对通过 Disruptor 进行交换的数据(事件)进行存储和更新。在一些更高级的应用场景中,Ring Buffer 可以由用户的自定义实现来完全替代。

Sequence Disruptor

通过顺序递增的序号来编号管理通过其进行交换的数据(事件),对数据(事件)的处理过程总是沿着序号逐个递增处理。一个 Sequence 用于跟踪标识某个特定的事件处理者( RingBuffer/Consumer )的处理进度。虽然一个 AtomicLong 也可以用于标识进度,但定义 Sequence 来负责该问题还有另一个目的,那就是防止不同的 Sequence 之间的CPU缓存伪共享(Flase Sharing)问题。

(注:这是 Disruptor 实现高性能的关键点之一,网上关于伪共享问题的介绍已经汗牛充栋,在此不再赘述)。

Sequencer

Sequencer 是 Disruptor 的真正核心。此接口有两个实现类 SingleProducerSequencer、MultiProducerSequencer ,它们定义在生产者和消费者之间快速、正确地传递数据的并发算法。

Sequence Barrier

用于保持对RingBuffer的 main published Sequence 和Consumer依赖的其它Consumer的 Sequence 的引用。Sequence Barrier 还定义了决定 Consumer 是否还有可处理的事件的逻辑。

Wait Strategy

定义 Consumer 如何进行等待下一个事件的策略。(注:Disruptor 定义了多种不同的策略,针对不同的场景,提供了不一样的性能表现)

Event

在 Disruptor 的语义中,生产者和消费者之间进行交换的数据被称为事件(Event)。它不是一个被 Disruptor 定义的特定类型,而是由 Disruptor 的使用者定义并指定。

EventProcessor

EventProcessor 持有特定消费者(Consumer)的 Sequence,并提供用于调用事件处理实现的事件循环(Event Loop)。

EventHandler

Disruptor 定义的事件处理接口,由用户实现,用于处理事件,是 Consumer 的真正实现。

Producer

即生产者,只是泛指调用 Disruptor 发布事件的用户代码,Disruptor 没有定义特定接口或类型。

案例-demo

通过下面8个步骤,你就能将Disruptor Get回家啦:

1、添加pom.xml依赖

<dependency>

<groupId>com.lmax</groupId>

<artifactId>disruptor</artifactId>

<version>3.4.4</version>

</dependency>

2、消息体Model

/**

* 消息体

*/

@Data

public class MessageModel {

private String message;

}

3、构造EventFactory

public class HelloEventFactory implements EventFactory<MessageModel> {

@Override

public MessageModel newInstance() {

return new MessageModel();

}

}

4、构造EventHandler-消费者

@Slf4j

public class HelloEventHandler implements EventHandler<MessageModel> {

@Override

public void onEvent(MessageModel event, long sequence, boolean endOfBatch) {

try {

//这里停止1000ms是为了确定消费消息是异步的

Thread.sleep(1000);

log.info("消费者处理消息开始");

if (event != null) {

log.info("消费者消费的信息是:{}",event);

}

} catch (Exception e) {

log.info("消费者处理消息失败");

}

log.info("消费者处理消息结束");

}

}

5、构造BeanManager

/**

* 获取实例化对象

*/

@Component

public class BeanManager implements ApplicationContextAware {

private static ApplicationContext applicationContext = null;

@Override

public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {

this.applicationContext = applicationContext;

}

public static ApplicationContext getApplicationContext() { return applicationContext; }

public static Object getBean(String name) {

return applicationContext.getBean(name);

}

public static <T> T getBean(Class<T> clazz) {

return applicationContext.getBean(clazz);

}

}

6、构造MQManager

@Configuration

public class MQManager {

@Bean("messageModel")

public RingBuffer<MessageModel> messageModelRingBuffer() {

//定义用于事件处理的线程池, Disruptor通过java.util.concurrent.ExecutorSerivce提供的线程来触发consumer的事件处理

ExecutorService executor = Executors.newFixedThreadPool(2);

//指定事件工厂

HelloEventFactory factory = new HelloEventFactory();

//指定ringbuffer字节大小,必须为2的N次方(能将求模运算转为位运算提高效率),否则将影响效率

int bufferSize = 1024 * 256;

//单线程模式,获取额外的性能

Disruptor<MessageModel> disruptor = new Disruptor<>(factory, bufferSize, executor,

ProducerType.SINGLE, new BlockingWaitStrategy());

//设置事件业务处理器---消费者

disruptor.handleEventsWith(new HelloEventHandler());

// 启动disruptor线程

disruptor.start();

//获取ringbuffer环,用于接取生产者生产的事件

RingBuffer<MessageModel> ringBuffer = disruptor.getRingBuffer();

return ringBuffer;

}

7、构造Mqservice和实现类-生产者

public interface DisruptorMqService {

/**

* 消息

* @param message

*/

void sayHelloMq(String message);

}

@Slf4j

@Component

@Service

public class DisruptorMqServiceImpl implements DisruptorMqService {

@Autowired

private RingBuffer<MessageModel> messageModelRingBuffer;

@Override

public void sayHelloMq(String message) {

log.info("record the message: {}",message);

//获取下一个Event槽的下标

long sequence = messageModelRingBuffer.next();

try {

//给Event填充数据

MessageModel event = messageModelRingBuffer.get(sequence);

event.setMessage(message);

log.info("往消息队列中添加消息:{}", event);

} catch (Exception e) {

log.error("failed to add event to messageModelRingBuffer for : e = {},{}",e,e.getMessage());

} finally {

//发布Event,激活观察者去消费,将sequence传递给改消费者

//注意最后的publish方法必须放在finally中以确保必须得到调用;如果某个请求的sequence未被提交将会堵塞后续的发布操作或者其他的producer

messageModelRingBuffer.publish(sequence);

}

}

}

8、构造测试类及方法

@Slf4j

@RunWith(SpringRunner.class)

@SpringBootTest(classes = DemoApplication.class)

public class DemoApplicationTests {

@Autowired

private DisruptorMqService disruptorMqService;

/**

* 项目内部使用Disruptor做消息队列

* @throws Exception

*/

@Test

public void sayHelloMqTest() throws Exception{

disruptorMqService.sayHelloMq("消息到了,Hello world!");

log.info("消息队列已发送完毕");

//这里停止2000ms是为了确定是处理消息是异步的

Thread.sleep(2000);

}

}

测试运行结果

2020-04-05 14:31:18.543 INFO 7274 --- [ main] c.e.u.d.d.s.Impl.DisruptorMqServiceImpl : record the message: 消息到了,Hello world!

2020-04-05 14:31:18.545 INFO 7274 --- [ main] c.e.u.d.d.s.Impl.DisruptorMqServiceImpl : 往消息队列中添加消息:MessageModel(message=消息到了,Hello world!)

2020-04-05 14:31:18.545 INFO 7274 --- [ main] c.e.utils.demo.DemoApplicationTests : 消息队列已发送完毕

2020-04-05 14:31:19.547 INFO 7274 --- [pool-1-thread-1] c.e.u.d.disrupMq.mq.HelloEventHandler : 消费者处理消息开始

2020-04-05 14:31:19.547 INFO 7274 --- [pool-1-thread-1] c.e.u.d.disrupMq.mq.HelloEventHandler : 消费者消费的信息是:MessageModel(message=消息到了,Hello world!)

2020-04-05 14:31:19.547 INFO 7274 --- [pool-1-thread-1] c.e.u.d.disrupMq.mq.HelloEventHandler : 消费者处理消息结束

总结

其实 生成者 -> 消费者 模式是很常见的,通过一些消息队列也可以轻松做到上述的效果。不同的地方在于,Disruptor 是在内存中以队列的方式去实现的,而且是无锁的。这也是 Disruptor 为什么高效的原因

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spring Boot + Disruptor 的相关文章

  • 在Java中清空数组/处理

    除了循环遍历数组中的每个元素并将每个元素设置为 null 之外 Java 处理中是否有一个本机函数可以简单地清空数组 或销毁它 以便能够将其重新声明为新数组 There s Arrays fill myArray null 并不是说它执行的
  • 使用 Android WebViewClient 启用特定 SSL 协议

    我的应用程序使用WebViewClient与服务器建立 SSL 连接 服务器配置为仅接受 TLSv1 1 及以上协议 使用 Android 时 如何检查哪些 SSL 协议是 a 支持的和 b 默认启用的WebViewClient在设备上 如
  • @OneToMany 与 @JoinTable 错误

    我试图理解 OneToMany with JoinTable 对于这样的场景 我正在使用 JPA 2 1 Hibernate 5 0 4 和 Oracle 11 XE 当我打电话时userDao save user 下面的代码 我有 jav
  • firebase推送通知错误Spring Boot服务器端

    我正在尝试从 Spring Boot 服务器端发送通知到客户端 android 服务器运行良好 一切都很好 2020 09 01 08 13 07 691 INFO 18941 restartedMain e DevToolsPropert
  • 在 TestNG 中运行多个类

    我正在尝试自动化一个场景 其中我想登录一次应用程序 然后进行操作而无需再次重新登录 考虑一下 我有在特定类的 BeforeSuite 方法中登录应用程序的代码 public class TestNGClass1 public static
  • Java Spring 应用程序存在内存泄漏。系统非堆内存不断增加

    我已使用 yourkit 分析器监视我的 Web 应用程序 保留最大大小的主要对象是 SessionFactoryImpl webappclassloader 和 CGlib 对象显示 spring crone调度程序会导致内存泄漏吗 我尝
  • 在 Hibernate 中创建 UPDATE RETURNING 查询

    在 Oracle 中 我们可以创建一个更新查询 该查询将使用 RETURNING 子句返回更新的记录 Hibernate中有类似的功能吗 除了数据库生成的值之外 Hibernate 显然不需要返回更新的实例 因为对象传递给Session s
  • 是否可以从另一个方法传递 args[] 来调用 main 方法?

    我试图从另一个传递参数的方法调用类的主要方法 就像从命令行运行该类时一样 有没有办法做到这一点 您可以致电main方法就像您调用任何其他 静态 方法一样 MyClass main new String arg1 arg2 arg3 Exam
  • 如何制作无限的jscrollpane?

    我之前已经实现过拖动滚动 但是创建无限滚动窗格的最佳方法是什么 当然不会有任何滚动条 我将实现拖动滚动 我想做的是在无限表面上实现动态加载 EDIT 当然 它实际上不会是无限的 我想问如何伪造它 您可以执行以下操作 AdjustmentCl
  • SimpleDateFormat 将 lenient 设置为 false 时出现异常

    为什么这段代码会抛出无法解析日期的异常 SimpleDateFormat f new SimpleDateFormat yyyy MM dd T HH mm ss 000Z f setLenient false String dateStr
  • 如何使用 Guava 连接字符串?

    我写了一些代码来连接字符串 String inputFile for String inputLine list inputFile inputLine trim 但我不能使用 连接 所以我决定使用 Guava 所以我需要使用Joiner
  • 使用Java开发跨平台,不同平台字体缩放不同

    我正在为我的大学制作一些软件 需要一个 GUI 在它的第一个版本中 我让它使用系统外观 因此它看起来像 Linux Mac Windows 中的本机应用程序 我发现这很麻烦 因为我必须根据操作系统使所有 JLabel 具有不同的大小 无论分
  • HTTP PUT 在 Java 中上传文件

    Edit 我想我已经弄清楚如何执行二进制数据部分 仔细检查代码 但我很确定我做对了 现在 当我尝试按照中所述完成上传时遇到新错误Vimeo API 文档 http vimeo com api docs upload streaming Ed
  • Java ConcurrentModificationException [重复]

    这个问题在这里已经有答案了 当删除倒数第二个元素时 没有 ConcurrentModificationException List
  • 如何使用云打印打印Android活动显示

    我正在尝试将 Google 云打印实现到应用程序中 遵循集成指南 https developers google com cloud print docs android 我试图通过打印 google com 来保持基本 单击我创建的打印按
  • 在服务器内部调用 Web 服务

    我有一个网络服务 getEmployee 当传递 id 时 它会获取单个员工的员工详细信息 同一服务器上的另一个 Web 服务 getEmployeeList 当传递一个部门时 它会获取整个员工列表 这将获取部门的 ID 然后调用 getE
  • 方法签名中带或不带synchronized关键字的方法具有相同的字节码

    对于以下 2 个类 获得相同的 Java 字节码 java版本 java 版本 1 8 0 181 Java TM SE 运行时环境 构建 1 8 0 181 b13 Java HotSpot TM 64 位服务器 VM 内部版本 25 1
  • java.io.EOFException:没有更多可用数据 - 预期结束标记 关闭开始标记

    我正在使用 xmpp 开发一个聊天应用程序 根据我们的要求 我们有三台服务器 Apache Tomcat 7 ejabbered 2 1 11 和 mysql 5 5 to run xmppbot on tomcat used below
  • Integer.parseInt 引发的 NumberFormatException

    嘿 我在学校上编码课 但老师没有很好地解释 所以我们必须在网上查找我所做的信息 但我无法找到代码中的错误 你能帮我吗 char end s do System out println Tipo de boleto char boleto c
  • Libgdx 和 Google 应用内购买结果

    我遵循了这些指示 https github com libgdx libgdx wiki Interfacing with platform specific code使用 ActionResolver 接口集成 Libgdx 和原生 An

随机推荐

  • 安装python遇到错误_安装Python时遇到如下问题,解决方案

    sudo apt get install python pip 正在读取软件包列表 完成 正在分析软件包的依赖关系树 正在读取状态信息 完成 python pip 已经是最新的版本了 升级了0 个软件包 新安装了 0 个软件包 要卸载 0
  • 入门-《Zbrush 自学宝典》适合基础学员【软件精通】

    入门 Zbrush 自学宝典 适合基础学员 软件精通 ZBrush自学宝典合集 包含了零基础萌新需要的软件基础 到必备的人体解刨知识 还有多达十几个的实战案例展示 可以说是一本名副其实的zbrush自学大全 入门 Zbrush 自学宝典 适
  • Unity检测点击到UI上

    using UnityEngine EventSystems using UnityEngine using System Collections Generic
  • vue跳转注册时Unhandled promise rejection undefined

    这里写自定义目录标题 vue跳转注册时Unhandled promise rejection undefined vue跳转注册时Unhandled promise rejection undefined 挂载路由导航守卫 router b
  • 刷脸支付会员积分卡券打造完整商业闭环

    刷脸支付成为新的支付趋势的原因 缓解对外部媒介的过渡依赖 移动支付的过程需要手机 而很多消费者会遇到手机没电或者忘记携带手机的情况 而刷脸支付不需要手机 仅通过人脸识别就可以完成付款 随着支付宝 微信 央行都推出了自己的刷脸支付设备 刷脸支
  • VS2017调节字体大小快捷键

    快捷键 Ctrl Shift gt 调大 Ctrl Shift lt 调小 Ctrl 鼠标滚动 向上调大 向下调小 笔记本触屏放大 就和在手机上放大图片一样 也可以去工具 gt 选项 gt 环境 gt 字体与颜色里面直接选择字号调整
  • 【廖雪峰python入门笔记】list_创建

    1 list 列表 list 1 是Python内置的一种数据类型 2 是一种有序的集合 3 可以随时添加和删除其中的元素 比如 列出班里所有同学的名字 就可以用一个list表示 Michael Bob Tracy list是数学意义上的有
  • GraalVM原生编译,Swing取色调色工具

    Graalvm 安装和静态编译 今天使用GraalVM把以前写的一个Swing小工具ColorCat转成原生应用 使用GraalVM转成原生应用后 可以脱离JVM CPU和内存的占用率是降低了的 性能是相对提升了不少 GraalVM编译步骤
  • SVN提交代码评审

    1 前言 在公司提交代码时 需要发给上级主管评审 如何让评审的主管能快速清晰的知道你的修改点是很重要也是很基础的要求 有的是用用脚本来产生差异文件的文件夹 但其实SVN本身就有命令列出当前修改和版本的差异点 2 命令 svn commit
  • 登录页面中记住密码操作的实现

    1 思路 访问 前端login jsp gt 后台 如果上次用户选择勾选记住密码 自动填充账号和密码 否则 不填 如何判断上次是否记住密码 第一次登录成功 去判断是否需要记住密码 如果需要记住密码 则往浏览器写cookie 否则 删除coo
  • Swift Property ‘self.xxx‘ not initialized at super.init call

    Swift重写父类的init方法时 报了如下错误 Property self xxx not initialized at super init call 大概意思是在调用父类init方法前 需要给属性赋值 报错代码 required in
  • QT 自定义widget 背景图片设置

    在自定义widget中 通过setSheetStyle 设置的背景图片没有效果 必须加一层QFrame 定义一个QFrame 设置QFrame的 sheetstyle 同时设置 widget setAutoFillBackground tr
  • 使用python-docx在文档中插入图片

    在文档中添加图片的方法如下 from docx import Document from docx shared import Inches Cm document Document document add heading 大标题 0 新
  • VUE 中let that = this 或者 var that = this 有什么意义

    在vue 开发过程中我们经常会发现 let that this 这样写有什么好处呢 首先要理解this 下面简述两点 1 this 永远指向一个对象 2 this 的指向取决于函数调用的位置 首先 this 指向的是一个对象 这个好理解 难
  • 回归分析结果表格怎么填_Excel回归分析结果的详细阐释

    内容来自用户 jasonboy95 利用Excel的数据分析进行回归 可以得到一系列的统计参量 下面以连续10年积雪深度和灌e69da5e887aa3231313335323631343130323136353331333433646365
  • 永年二中2021高考成绩查询,喜讯丨邯郸市永年二中今年高考成绩再创历史新高...

    很多朋友在关注 喜讯丨邯郸市永年二中今年高考成绩再创历史新高 下面带大家一起来阅读吧 近10年高考县状元 2004年全县理科状元 王延涛 南沿村镇西王庄村 七中 2005年全县文科状元 刘 鑫 广府镇西街 九中 全县理科状元 杨佩勋 东杨庄
  • 帮我写一下目标检测的代码

    您好 我可以为您提供以下基本的目标检测代码 import cv2 读取图像 img cv2 imread image jpg 创建一个检测器 detector cv2 CascadeClassifier detector xml 检测目标
  • springboot整合redis 使用缓存注解

    1 启动类标明 EnableCaching SpringBootApplication MapperScan com jx luckyDraw mapper EnableCaching public class LuckyDrawAppli
  • Java中 Long(long) 和Integer(int)之间的强制转换

    一 将long型转化为int型 这里的long型是基础类型 long a 10 int b int a 二 将Long型转换为int 型的 这里的Long型是包装类型 Long a 10 int b a intValue 三 将int型转化
  • Spring Boot + Disruptor

    首先了解一下Disrupto的背景 Disruptor 是英国外汇交易公司LMAX开发的一个高性能队列 研发的初衷是解决内存队列的延迟问题 在性能测试中发现竟然与I O操作处于同样的数量级 基于 Disruptor 开发的系统单线程能支撑每