基于时间轮片方式处理超时任务

2023-10-30


作者 | 酱了里个酱 
来源 | 掘金

https://juejin.im/post/5e733e4f51882549417fe9aa

背景

最近收到小伙伴的一个吐槽:“项目里的某个函数是同步阻塞的,无法确定其运行时间,某些情况下,可能出现长时间阻塞导致应用无法响应”。为了解决这个问题,他尝试过用子线程+定时器的方式去异步处理,如果超时,则重新调用,但该函数会被频繁调用,意味着每次调用都要创建一个定时器。听到这个场景后,下意识想起之前看到的一篇文章:时间轮片(Timing Wheel)实现心跳机制。该文章主要描述了使用时间轮片的方式去处理TCP心跳连接,从而避免每个连接都要开启一个计时器。明确了时间轮片方式的优势后,便尝试着手实现一个通用的基于时间轮片方式处理超时任务的框架。

时间轮

简单来说,时间轮就是一个循环列表,每个列表中包含一个称为槽的结构,这个结构通常也可以是一个列表,且每隔一定时间就会将指针向前移动。

iOS 时间轮实现方案

可以使用一个嵌套数组的形式来定义时间轮结构,并用定时器去定时遍历列表中的元素。

class TimeWheel {
    private var capacity: Int
    private var interval: TimeInterval
    private var timeWheel: [[Any]]
    var index: Int
    private var timer: Timer?
    weak var delegate: TimeWheelDelegate?
}

初始化时,我们需要建立N个空槽,用于存取数据

init(_ capacity: Int, _ interval: TimeInterval) {
    self.capacity = capacity
    self.interval = interval
    self.index = 0
    timeWheel = []
    for _ in 0 ..< capacity { //先填充空数组,创建若干个“空槽”
        self.timeWheel.append([])
    }
}

添加任务时,如未启动定时器,则启动定时器,并把元素添加到当前槽位中

func addObject(_ task: Any) {
    if timer == nil {
        timer = Timer.scheduledTimer(timeInterval: 1.0, target: self, selector: #selector(detectTimeoutItem(_:)), userInfo: nil, repeats: true)
        RunLoop.current.add(timer!, forMode: .common)
    }
    
    if index < timeWheel.count {
        var arr = timeWheel[index]
        arr.append(task)
        timeWheel[index] = arr
    }
}

定时检查,先将位置移动到下一位,然后将对应槽位的元素传递给外部,最后清除该槽位的元素

@objc
private func detectTimeoutItem(_ timer: Timer) {
    moveToNextTimeSlot()
    delegate?.timeoutItems(self.currentObjects(), self)
    removeExpiredObjects()
}

完整代码

protocol TimeWheelDelegate : class {
    func timeoutItems(_ items: [Any]?, _ timeWheel: TimeWheel)
}

class TimeWheel {
    private var capacity: Int
    private var interval: TimeInterval
    private var timeWheel: [[Any]]
    var index: Int
    private var timer: Timer?
    weak var delegate: TimeWheelDelegate?
    
    init(_ capacity: Int, _ interval: TimeInterval) {
        self.capacity = capacity
        self.interval = interval
        self.index = 0
        timeWheel = []
        for _ in 0 ..< capacity { //先填充空数组,创建若干个“空槽”
            self.timeWheel.append([])
        }
    }
    
    func addObject(_ task: Any) {
        if timer == nil {
            timer = Timer.scheduledTimer(timeInterval: 1.0, target: self, selector: #selector(detectTimeoutItem(_:)), userInfo: nil, repeats: true)
            RunLoop.current.add(timer!, forMode: .common)
        }
        
        if index < timeWheel.count {
            var arr = timeWheel[index]
            arr.append(task)
            timeWheel[index] = arr
        }
    }
    
    func currentObjects() -> [Any]? {
        if index < timeWheel.count {
            return timeWheel[index]
        }
        return nil
    }
    
    func cleanup() {
        self.timeWheel.removeAll()
        if timer != nil {
            timer?.invalidate()
            timer = nil
        }
    }
    
    private func removeExpiredObjects() {
        if index < timeWheel.count {
            var arr = timeWheel[index]
            arr.removeAll()
        }
    }
    
    private func moveToNextTimeSlot() {
        index = (index + 1) % timeWheel.count
    }
    
    @objc
    private func detectTimeoutItem(_ timer: Timer) {
        moveToNextTimeSlot()
        delegate?.timeoutItems(self.currentObjects(), self)
        removeExpiredObjects()
    }
}

任务管理

定义一个任务协议,用于定义其通用行为

protocol Task  {
    associatedtype T
    func taskKey() -> String //任务对应的唯一key,用于区分任务
    func doTask() -> T // 实现任务行为
    var completion: ((_ result: T?, _ timeout: Bool) -> Void)? {get set} //返回的异步结果
}

定义一个具体的Task

class NetworkTask: Task {
    typealias T = String
    var completion: ((String?, Bool) -> Void)?
    
    var hostName: String
    
    init(_ name: String) {
        hostName = name
    }
    
    func taskKey() -> String {
        return hostName
    }
    
    func doTask() -> String {
        Thread.sleep(forTimeInterval: Double.random(in: 1...20)) //模拟耗时任务
        return "\(hostName)'s result"
    }
    
}

任务管理

为了保证任务的独立允许,需要创建一个并发队列,且使用字典存储已添加的任务,以便确认任务是按时完成回调的,还是超时导致回调的。

class TaskManager<T: Task> : TimeWheelDelegate {
    
    private var timeWheel: TimeWheel?
    private var timeInterval: TimeInterval
    private var timeoutSeconds: Int
    private var queue: DispatchQueue
    private var callbackDict: Dictionary<String, T>
    
    init(_ timeout: Int, _ timeInterval: TimeInterval) {
        timeoutSeconds = timeout
        self.timeInterval = timeInterval
        queue = DispatchQueue(label: "com.task.queue", qos: .default, attributes: .concurrent, autoreleaseFrequency: .workItem, target: nil)
        callbackDict = [:]
    }
}

添加任务:开启时间轮,且将任务提交到队列中

func appendTask(_ task: T, _ completion:@escaping (_ result: T.T?, _ timeout: Bool) -> (Void)) {
    
    if timeWheel == nil {
        timeWheel = TimeWheel(timeoutSeconds, timeInterval)
        timeWheel?.delegate = self
    }
    
    var task = task
    task.completion = completion
    self.callbackDict[task.taskKey()] = task
    self.timeWheel?.addObject(task) //将任务添加到对应的时间轮槽位中
    
    self.queue.async {
        let result = task.doTask()
        DispatchQueue.main.async { //保证数据的一致性
            let key = task.taskKey()
            if let item = self.callbackDict[key] {
                item.completion?(result, false) //返回按时完成任务的结果
                self.callbackDict.removeValue(forKey: key)
            }
        }
    }
}

处理超时任务:通过定时轮返回的过期数据,将任务超时回调返回。

 func timeoutItems(_ items: [Any]?, _ timeWheel: TimeWheel) {
    if let callbacks = items {
        for callback in callbacks {
            if let item = callback as? T, let task = self.callbackDict[item.taskKey()] {
                task.completion?(nil, true)
                self.callbackDict.removeValue(forKey: task.taskKey())
            }
        }
    }
}

完整代码

class TaskManager<T: Task> : TimeWheelDelegate {
    
    private var timeWheel: TimeWheel?
    private var timeInterval: TimeInterval
    private var timeoutSeconds: Int
    private var queue: DispatchQueue
    private var callbackDict: Dictionary<String, T>
    
    init(_ timeout: Int, _ timeInterval: TimeInterval) {
        timeoutSeconds = timeout
        self.timeInterval = timeInterval
        queue = DispatchQueue(label: "com.task.queue", qos: .default, attributes: .concurrent, autoreleaseFrequency: .workItem, target: nil)
        callbackDict = [:]
    }
    
    func appendTask(_ task: T, _ completion:@escaping (_ result: T.T?, _ timeout: Bool) -> (Void)) {
        
        if timeWheel == nil {
            timeWheel = TimeWheel(timeoutSeconds, timeInterval)
            timeWheel?.delegate = self
        }
        
        var task = task
        task.completion = completion
        self.callbackDict[task.taskKey()] = task
        self.timeWheel?.addObject(task) //将任务添加到对应的时间轮槽位中
        
        self.queue.async {
            let result = task.doTask()
            DispatchQueue.main.async { //保证数据的一致性
                let key = task.taskKey()
                if let item = self.callbackDict[key] {
                    item.completion?(result, false) //返回按时完成任务的结果
                    self.callbackDict.removeValue(forKey: key)
                }
            }
        }
    }
    
    func timeoutItems(_ items: [Any]?, _ timeWheel: TimeWheel) {
        if let callbacks = items {
            for callback in callbacks {
                if let item = callback as? T, let task = self.callbackDict[item.taskKey()] {
                    task.completion?(nil, true)
                    self.callbackDict.removeValue(forKey: task.taskKey())
                }
            }
        }
    }
}

使用示例

定义任务超时时间为10s,并每1s进行检查一次。这里加了一个随机时间添加任务,以便测试到时间轮不同轮的情况。

let manager = TaskManager<NetworkTask>(10, 1)
for i in 0 ..< 5 {
    let task = NetworkTask("host-\(i)")
    DispatchQueue.main.asyncAfter(deadline: .now()+Double.random(in: 0...20.0)) {
        print("task:\(task.hostName) do task in \(Date.init())")
        manager.appendTask(task) { (result, timeout) -> (Void) in
            print("task:\(task.hostName), result:\(result ?? "null"), timeout:\(timeout), time:\(Date.init())")
        }
    }
}

结果数据:
task:host-4 do task in 2020-03-19 11:56:46 +0000
task:host-1 do task in 2020-03-19 11:56:47 +0000
task:host-2 do task in 2020-03-19 11:56:56 +0000
task:host-4, result:null, timeout:true, time:2020-03-19 11:56:56 +0000
task:host-1, result:null, timeout:true, time:2020-03-19 11:56:56 +0000
task:host-2, result:host-2's result, timeout:false, time:2020-03-19 11:57:01 +0000
task:host-3 do task in 2020-03-19 11:57:03 +0000
task:host-0 do task in 2020-03-19 11:57:03 +0000
task:host-0, result:host-0's result, timeout:false, time:2020-03-19 11:57:09 +0000
task:host-3, result:null, timeout:true, time:2020-03-19 11:57:12 +0000

根据结果,可以看到,若任务10s内能按时完成,则返回对应的任务结果,否则返回timeout为true,并返回一个空结果。

总结

通过这次的事例,实现一个基于时间轮方式来处理超时任务的简单框架,从一定程度上避免了性能的消耗。

近期精彩内容推荐:  

 妹子 rm -rf 把公司整个数据库删没了...

 当互联网码农遇见国企老同学

 推荐33个IDEA最牛配置,写代码太爽了

 Python中zip()函数的解释和可视化

在看点这里好文分享给更多人↓↓

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

基于时间轮片方式处理超时任务 的相关文章

  • 一、深入理解redis之需要掌握的知识点

    导读 从本章开始我们将要深入讲解redis 讲解的内容包括 redis的基础数据类型及应用场景 数据存储 持久化方式 RDB AOF redis集群进化过程 redis中AKF问题解决方案 redis中CAP问题解决方案 redis的分布式
  • Android RxJava生命周期管理解决方案整理

    一 为什么要做生命周期管理 Observable create new ObservableOnSubscribe
  • ubuntu下rime输入法的安装配置

    一 安装 ibus rime安装 1 1 安装ibus输入法框架 sudo apt get install ibus ibus clutter ibus gtk ibus gtk3 ibus qt4 1 2 安装rime sudo apt
  • 计算绕原点旋转某角度后的点的坐标

    问题 A点 x y 按顺时针旋转 theta 角度后点的坐标为A1点 x1 y1 求x1 y1坐标用 x y 和 theta 来表示 方法一 设 OA 向量和x轴的角度为 alpha 那么顺时针转过 theta后 OA1 向量和x轴的角度为
  • loadrunner压力fullgc

    loadrunner 打压力的时候 我们会根据jconsole来监控被压系统的内存 cpu fullgc等 当出现内存溢出或者fullgc 见下图 既内存溢出 又有fullgc 要先查看jvm的配置 好多技术都使用的默认配置 那么就要调试j
  • 编写软著的基本常识

    1 背景介绍 1 1 软著的概念 软著即软件著作权 市面上任何一个APP在编写完毕之后都会向国家版权中心去申请著作权对自己的软件进行知识产权保护 对于一个系统来说 可以对系统的功能模块进行拆分 分别进行软著的申请 也就相当于一个系统可以申请
  • 关于阿里云ECS服务器提示高危漏洞问题的处理

    购买阿里云服务器后 一段时间 会发钱提示高危漏洞 而且很多 有高危 中危 低危 严重等几个等级 当点击 一键修复或者生成修复命令时 开始让买买买了就 关于这个问题自己手动修复的话 采用软件升级一般都可以解决 除了提示带kernel的高危漏洞
  • 送书

    最好的挣钱方式是钱生钱 怎样钱生钱呢 钱生钱可以通过投资 例如买股票 基金等方式 有人可能说买股票基金发财 我没这样的命和运气 买股票基金靠的不只有命运和运气 更多靠的是长期的经验和对股票基金数据的分析 今天我们使用scrapy框架来js逆
  • C#和Java,究竟选哪个方向?我只说事实,你自己分析……

    优质资源分享 学习路线指引 点击解锁 知识定位 人群定位 Python实战微信订餐小程序 进阶级 本课程是python flask 微信小程序的完美结合 从项目搭建到腾讯云部署上线 打造一个全栈订餐系统 Python量化交易实战 入门级 手
  • netty源码分析(十四)Netty初始化流程总结及Channel与ChannelHandlerContext作用域分析

    我们回到ServerBootstrap的init方法 之前介绍过Attribute的设置 那么Attribute的具体设置是怎样的呢 void init Channel channel throws Exception final Map
  • MySQL深度探险(二)-- MySQL系统架构详解

    一 逻辑模块组成 总的来说 MySQL 可以看成是二层架构 第一层我们通常叫做SQL Layer 在 MySQL 数 据库系统处理底层数据之前的所有工作都是在这一层完成的 包括权限判断 sql 解析 执 行计划优化 query cache
  • 联想笔记本安装win10 ubuntu配置步骤

    一 准备ubuntu分区 在win10下用磁盘管理工具 磁盘管理工具可以右键我的电脑 gt 属性 gt 磁盘管理工具打开 选中安装ubuntu的目标硬盘 右键 gt 压缩卷 会自动计算出可以分出的空间大小 根据自己需求进行压缩 压缩后会生成
  • Python中让代码 BUG 变得酷炫的利器

    当我们写的一个脚本或程序发生各种不可预知的异常时 如果我们没有进行捕获处理的时候 通常都会致使程序崩溃退出 并且会在终端打印出一堆 密密麻麻 的 traceback 堆栈信息来告诉我们 是哪个地方出了问题 就像这样子 天呐 密集恐惧症要犯了
  • 剑指offer-16 链表反转

    法一 package Leetcode ListNode Author YCKJ3803 Date 2021 3 2 22 39 Description 反转链表 最经典的题 yyds public class ReverseListNod
  • API安全

    1 API的简介 API代表应用程序编程接口 它由一组允许软件组件进行通信的定义和协议组成 作为软件系统之间的中介 API使软件应用程序或服务能够共享数据和功能 但是API不仅仅提供连接基础 它还管理软件应用程序如何被允许进行通信和交互 A
  • 带分数 -- 蓝桥杯

    带分数 蓝桥杯 题目描述 100 可以表示为带分数的形式 100 3 69258714 还可以表示为 100 82 3546197 注意特征 带分数中 数字 1 9 分别出现且只出现一次 不包含 0 类似这样的带分数 100 有 11 种表
  • 虚拟机不能上网以及无法ping通百度的解决方案

    虚拟机无法上网 看了许多博客也没有解决问题 最后自己钻研文档解决了 此处分享一下 1 点击此处编辑 2 选择虚拟网络编辑器 3 点击更改设置 4 选择v8 并将使用本地DHCP选项勾选掉 注 此处为nat模式 5 手动输入子网IP 子网掩码
  • 微信公众号支付java前后端分离开发

    微信公众号支付java前后端分离开发 微信公众号支付java前后端分离开发 我们开发的是基于河北银行的支付 支付宝微信都做了 这里就介绍一下微信公众号支付 这个公众号支付需要配置的东西太多了 官方文档写的跟s一样 看不懂 一点一点自摸索的
  • C++ Array size()实例讲解

    描述 C 函数std array size 用于获取数组中存在的元素数 声明 以下是 std array size 函数形式 std array 标头的声明 constexpr size type size noexcept 参数 空 返回
  • jxl读取excel封装成List、Map

    一 封装成List 数据格式为List

随机推荐