springmvc源码学习(二十七)异步任务超时异常的执行流程

2023-11-14


前言

本文分析异步任务出现超时及异常的情况时的处理流程。


一、示例

设置超时时间为2s,但任务需要执行10s

    @ApiOperation(value = "test", notes = "test")
    @GetMapping(value = "/test", produces = {MediaType.APPLICATION_JSON_VALUE})
    public DeferredResult test() throws ValidDataException {
        Test test = new Test();
        test.setId(1);
        test.setName("test1");
        //构建DeferredResult,超时时间为2s
        DeferredResult<Object> result = new DeferredResult<>(2000L);
        Callable call = () -> {
            //这里休眠10是
            TimeUnit.SECONDS.sleep(10000);
            return test;
        };
        //将call作为DeferredResult的result
        result.setResult(call);
        //超时回调
        result.onTimeout(() -> {
            result.setResult("请求超时");
        });

        return result;
    }

二、源码分析

在AsyncContextImpl中执行异步任务的过程中,调用了updateTimeout( )来处理超时

(1)processAsyncTask( )

private synchronized void processAsyncTask() {
        if (!initialRequestDone) {
            return;
        }
        //处理超时
        updateTimeout();
        final Runnable task = asyncTaskQueue.poll();
        if (task != null) {
            processingAsyncTask = true;
            asyncExecutor().execute(new TaskDispatchRunnable(task));
        } else {
            processingAsyncTask = false;
        }
    }

2、updateTimeout( )

public void updateTimeout() {
        XnioExecutor.Key key = this.timeoutKey;
        if (key != null) {
            if (!key.remove()) {
                return;
            } else {
                this.timeoutKey = null;
            }
        }
        //设置了超时时间,并且异步任务还没有完成
        if (timeout > 0 && !complete) {
            this.timeoutKey = WorkerUtils.executeAfter(exchange.getIoThread(), timeoutTask, timeout, TimeUnit.MILLISECONDS);
        }
    }

3、executeAfter( )

public static XnioExecutor.Key executeAfter(XnioIoThread thread, Runnable task, long timeout, TimeUnit timeUnit) {
        try {
            return thread.executeAfter(task, timeout, timeUnit);
        } catch (RejectedExecutionException e) {
            if(thread.getWorker().isShutdown()) {
                UndertowLogger.ROOT_LOGGER.debugf(e, "Failed to schedule task %s as worker is shutting down", task);
                //we just return a bogus key in this case
                return new XnioExecutor.Key() {
                    @Override
                    public boolean remove() {
                        return false;
                    }
                };
            } else {
                throw e;
            }
        }
    }
public Key executeAfter(final Runnable command, final long time, final TimeUnit unit) {
        final long millis = unit.toMillis(time);
        if ((state & SHUTDOWN) != 0) {
            throw log.threadExiting();
        }
        if (millis <= 0) {
            execute(command);
            return Key.IMMEDIATE;
        }
        final long deadline = (nanoTime() - START_TIME) + Math.min(millis, LONGEST_DELAY) * 1000000L;
        //创建TimeKey,传入截止时间,及超时任务TimeoutTask
        final TimeKey key = new TimeKey(deadline, command);
        synchronized (workLock) {
            final TreeSet<TimeKey> queue = delayWorkQueue;
            //加入到队列中
            queue.add(key);
            if (queue.iterator().next() == key) {
                // we're the next one up; poke the selector to update its delay time
                if (polling) { // flag is always false if we're the same thread
                    selector.wakeup();
                }
            }
            return key;
        }
    }

超时任务被加入到了delayWorkQueue队列中

4、WorkerThread

在WorkerThread的run方法中,会把delayWorkQueue的任务取出来,校验是否超时,超时后将运行TimeKey的Command即TimeoutTask

public void run() {
        final Selector selector = this.selector;
        try {
            log.tracef("Starting worker thread %s", this);
            final Object lock = workLock;
            //工作任务队列
            final Queue<Runnable> workQueue = selectorWorkQueue;
            //延时任务队列
            final TreeSet<TimeKey> delayQueue = delayWorkQueue;
            log.debugf("Started channel thread '%s', selector %s", currentThread().getName(), selector);
            Runnable task;
            Iterator<TimeKey> iterator;
            long delayTime = Long.MAX_VALUE;
            Set<SelectionKey> selectedKeys;
            SelectionKey[] keys = new SelectionKey[16];
            int oldState;
            int keyCount;
            for (;;) {
                // Run all tasks
                do {
                    synchronized (lock) {
                        task = workQueue.poll();
                        //首次task为null
                        if (task == null) {
                            iterator = delayQueue.iterator();
                            delayTime = Long.MAX_VALUE;
                            if (iterator.hasNext()) {
                                final long now = nanoTime();
                                do {
                                	//取出延时任务
                                    final TimeKey key = iterator.next();
                                    if (key.deadline <= (now - START_TIME)) {
                                   		//超时
                                   		//将超时任务加入工作队列
                                        workQueue.add(key.command);
                                        //移除延时任务
                                        iterator.remove();
                                    } else {
                                    	//没有超时,停止循环
                                        delayTime = key.deadline - (now - START_TIME);
                                        // the rest are in the future
                                        break;
                                    }
                                } while (iterator.hasNext());
                            }
                            //取出task,超时的时候这里是TimeoutTask
                            task = workQueue.poll();
                        }
                    }
                    // clear interrupt status
                    Thread.interrupted();
                    //执行task
                    safeRun(task);
                } while (task != null);
			......
			......
}

4、TimeoutTask

 private final class TimeoutTask implements Runnable {

        @Override
        public void run() {
            synchronized (AsyncContextImpl.this) {
            	//任务初始化或出现异常,并且还没有完成
                if (!dispatched && !complete) {
                	//新建一个处理超时的异步任务
                    addAsyncTask(new Runnable() {
                        @Override
                        public void run() {

                            final boolean setupRequired = SecurityActions.currentServletRequestContext() == null;
                            UndertowServletLogger.REQUEST_LOGGER.debug("Async request timed out");
                            servletRequestContext.getCurrentServletContext().invokeRunnable(servletRequestContext.getExchange(), new Runnable() {
                                @Override
                                public void run() {

                                    //now run request listeners
                                    setupRequestContext(setupRequired);
                                    try {
                                    	//调用监听器的onTimeout方法
                                        onAsyncTimeout();
                                        if (!dispatched) {
                                            if (!getResponse().isCommitted()) {
                                                //close the connection on timeout
                                                exchange.setPersistent(false);
                                                exchange.getResponseHeaders().put(Headers.CONNECTION, Headers.CLOSE.toString());
                                                Connectors.executeRootHandler(new HttpHandler() {
                                                    @Override
                                                    public void handleRequest(HttpServerExchange exchange) throws Exception {
                                                        //servlet
                                                        try {
                                                            if (servletResponse instanceof HttpServletResponse) {
                                                                ((HttpServletResponse) servletResponse).sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
                                                            } else {
                                                                servletRequestContext.getOriginalResponse().sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
                                                            }
                                                        } catch (IOException e) {
                                                            UndertowLogger.REQUEST_IO_LOGGER.ioException(e);
                                                        } catch (Throwable t) {
                                                            UndertowLogger.REQUEST_IO_LOGGER.handleUnexpectedFailure(t);
                                                        }
                                                    }
                                                }, exchange);
                                            } else {
                                                //not much we can do, just break the connection
                                                IoUtils.safeClose(exchange.getConnection());
                                            }
                                            if (!dispatched) {
                                            	//调用完成
                                                complete();
                                            }
                                        }
                                    } finally {
                                        tearDownRequestContext(setupRequired);
                                    }
                                }
                            });
                        }
                    });
                }
            }
        }
    }

dispatched :表示任务是否分发过或处理完成过,在代码中设置值的地方

第一处:

private synchronized void doDispatch(final Runnable runnable) {
        if (dispatched) {
            throw UndertowServletMessages.MESSAGES.asyncRequestAlreadyDispatched();
        }
        //在异步任务创建之前,设置为true
        dispatched = true;
        final HttpServletRequestImpl request = servletRequestContext.getOriginalRequest();
        //创建异步任务
        addAsyncTask(new Runnable() {
            @Override
            public void run() {
                request.asyncRequestDispatched();
                runnable.run();
            }
        });
        if (timeoutKey != null) {
            timeoutKey.remove();
        }
    }

第二处:

//异步任务结束,处理结果
 public void handleCompletedBeforeInitialRequestDone() {
        assert completedBeforeInitialRequestDone;
        completeInternal(true);
        //设置为true
        dispatched = true;
    }

第三处:

 //出现error的时候,回调
 public void handleError(final Throwable error) {
 		//设置为false
        dispatched = false; //we reset the dispatched state
        onAsyncError(error);
        ...
        }

complete:表示任务是否完成,是在complate方法中设置的

@Override
    public synchronized void complete() {
        if (complete) {
            UndertowLogger.REQUEST_LOGGER.trace("Ignoring call to AsyncContext.complete() as it has already been called");
            return;
        }
        //设置为true
        complete = true;
        if (timeoutKey != null) {
            timeoutKey.remove();
            timeoutKey = null;
        }
        if (!dispatched) {
            completeInternal(false);
        } else {
            onAsyncComplete();
        }
        if (previousAsyncContext != null) {
            previousAsyncContext.complete();
        }
    }

这里的dispatched、complete、initialRequestDone、processingAsyncTask等状态都是用boolean值,让人难以理解,为什么不参考FutureTask的状态值呢?

	private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

5、onAsyncTimeout( )

 private void onAsyncTimeout() {
        for (final BoundAsyncListener listener : asyncListeners) {
        	//创建异步事件AsyncEvent 
            AsyncEvent event = new AsyncEvent(this, listener.servletRequest, listener.servletResponse);
            try {
            	//调用监听器的onTimeout( )
                listener.asyncListener.onTimeout(event);
            } catch (IOException e) {
                UndertowServletLogger.REQUEST_LOGGER.ioExceptionDispatchingAsyncEvent(e);
            } catch (Throwable t) {
                UndertowServletLogger.REQUEST_LOGGER.failureDispatchingAsyncEvent(t);
            }
        }
    }

其中这个asyncListener是在StandardServletAsyncWebRequest调用startAsync( )方法的时候创建的

@Override
	public void startAsync() {
		Assert.state(getRequest().isAsyncSupported(),
				"Async support must be enabled on a servlet and for all filters involved " +
				"in async request processing. This is done in Java code using the Servlet API " +
				"or by adding \"<async-supported>true</async-supported>\" to servlet and " +
				"filter declarations in web.xml.");
		Assert.state(!isAsyncComplete(), "Async processing has already completed");

		if (isAsyncStarted()) {
			return;
		}
		this.asyncContext = getRequest().startAsync(getRequest(), getResponse());
		//创建Listener
		this.asyncContext.addListener(this);
		if (this.timeout != null) {
			this.asyncContext.setTimeout(this.timeout);
		}
	}

6、onTimeout( )

StandardServletAsyncWebRequest.java

public void onTimeout(AsyncEvent event) throws IOException {
		//掉用timeoutHandlers
		this.timeoutHandlers.forEach(Runnable::run);
	}

在WebAsyncManager的startCallableProcessing方法中初始化了timeoutHandler,也可以自定义传入timeoutHandler

this.asyncWebRequest.addTimeoutHandler(() -> {
			if (logger.isDebugEnabled()) {
				logger.debug("Async request timeout for " + formatRequestUri());
			}
			//调用拦截器链
			Object result = interceptorChain.triggerAfterTimeout(this.asyncWebRequest, callable);
			if (result != CallableProcessingInterceptor.RESULT_NONE) {
				//处理结果
				setConcurrentResultAndDispatch(result);
			}
		});

其中的一个拦截器会创建异常,result被赋值AsyncRequestTimeoutException,最后在处理结果中会处理超时异常的情况

public class TimeoutCallableProcessingInterceptor implements CallableProcessingInterceptor {

	@Override
	public <T> Object handleTimeout(NativeWebRequest request, Callable<T> task) throws Exception {
		return new AsyncRequestTimeoutException();
	}

}

总结

本文简单分析了异步任务出现超时异常的情况时,异步任务的处理流程。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

springmvc源码学习(二十七)异步任务超时异常的执行流程 的相关文章

随机推荐

  • 基于java springboot vue仓库管理系统源码(毕设)

    开发环境及工具 大于Jdk1 8 大于mysql5 5 idea eclipse vscode webstorm 技术说明 Springboot mybatis vue elementui 代码注释齐全 没有多余代码 适合学习 毕设 二次开
  • 优雅地测试Exception:@Rule

    使用 Rule测试Exception 导入库 import org junit Rule import org junit rules ExpectedException 声明错误期望对象 Rule public ExpectedExcep
  • javaweb项目总结思路

    JAVAweb项目思路指南 本次项目所用技术 第一章 需求 系统用户模块 员工管理模块 系统权限功能 第二章 数据库设计 创建数据库 用户表 员工表 第三章 架构选择 第四章 搭建系统开发 搭建系统开发环境 编写实体类 编写user实体类
  • 类和对象【默认成员函数】

    全文目录 类的6个默认成员函数 构造函数 特性 析构函数 特性 拷贝构造函数 特性 赋值运算符重载 运算符重载 赋值运算符重载 前置 和 后置 const 成员 取地址及const取地址操作符重载 类的6个默认成员函数 每个类不管有没有内容
  • Java进阶01:Hibernate教程

    1 什么是Hibernate Hibernate是一个开放源码的ORM Object Relational Mapping 对象关系映射 框架 它对JDBC进行了轻量级的封装 使得Java开发人员可以使用面向对象的编程思想来操作数据库 2
  • FISCO BCOS 和 WeBASE-Front 搭建联盟链

    一丶环境依赖 安装ubuntu依赖sudo apt install y openssl curl 安装centos依赖sudo yum install y openssl openssl devel 二丶创建操作目录cd mkdir p f
  • HTML案例设计(用户信息提交表单)

    最近学习了HTML语法知识 详见内容请戳此次HTML基础知识 遂做了此前端界面 比较简略 但是算对Web开发有了初步的认识 先放效果图出来 你的三连就是我创作的动力 HTML代码 本次知识点将会不定期补充 div align center
  • 智能学习

    智能学习 MATLAB实现CS BP多变量时间序列预测 布谷鸟搜索算法优化BP神经网络 目录 智能学习 MATLAB实现CS BP多变量时间序列预测 布谷鸟搜索算法优化BP神经网络 预测效果 基本介绍 程序设计 参考资料 预测效果 基本介绍
  • HTML静态页面获取url参数和UserAgent

    目录 前言 原因 解决 1 静态页面获取url的参数 2 取useragent的值 3 测试页面 前言 接技术支持小伙伴信 有用户反馈app在华为设备上下载不了 以为是服务器覆盖的范围不够或服务器挂了 直到另一个客服同事发来一个录屏 基本知
  • 自己写ArrayList后的心得

    源码分析 ArrayList应该是Java工具类中最简单的一个 它的内部实现是一个数组 在首次加入元素时 ArrayList会创建一个默认大小的数组 之后的添加 删除 查询操作都是对该数组进行操作 而我自己写的ArrayList则是和Lin
  • 记录错误:FileNotFoundError: [WinError 3] 系统找不到指定的路径。: ‘E:\\CV_Paper_fuxian\\lesson\\B_VGG\\..\\Data\\tra

    撸代码发现错误 FileNotFoundError WinError 3 系统找不到指定的路径 E CV Paper fuxian lesson B VGG Data train 经过检查发现 是系统内文件夹名称设置错误 如果出现这样的错误
  • new与malloc

    1 属性 new delete是c 运算符 关键字 需要编译器支持 malloc free是库函数 需要头文件支持 2 参数 使用new操作符申请内存分配时无需指定内存块的大小 编译器会根据类型信息自行计算 而malloc需要显示的指出所需
  • java/php/net/python家庭财务管理系统设计

    本系统带文档lw万字以上 答辩PPT 查重 如果这个题目不合适 可以去我上传的资源里面找题目 找不到的话 评论留下题目 或者站内私信我 有时间看到机会给您发 管理员用例图 系统中的核心家庭是系统管理员 管理员登录后 通过管理员菜单来管理后台
  • 03. HTTP协议

    目录 HTTP协议 基本概念 请求 响应 请求头中最常见的 些重要内容 爬虫需要 响应头中 些重要的内容 请求方式 总结 HTTP协议 基本概念 协议 就是两个计算机之间为了能够流畅的进行沟通而设置的 个君子协定 常见的协议有TCP IP
  • firefly的使用

    https github com 9miao Firefly gitpython setup py install firefly admin py createproject myproject 就可以创建一个新的工程了 转载于 http
  • mac os 安装metasploit v5.0.23(msf)

    安装metasploit git clone https github com rapid7 metasploit framework git cd metasploit framework msfconsole 执行上面的命令时 报如下错
  • 台式机常见问题汇总

    1 第一步 必须安装硬盘 硬盘安装在中间 否则安装电源后 硬盘不好安装了 2 第二步 检查台式机的数据线 应该是给足的 3 开机启动后 电脑吱吱响 后来找到原因 硬盘四个固定角没有固定好 所以转起来震动噪音 4 硬盘安装时 用的螺丝是接触面
  • 太空狼人杀(Among US)只能QuickChat 更改年龄限制达到可以Free To chat方法

    Among us 不能自由聊天的解决方法 对于年龄数据被上传到服务器的账号 可能不适用 1 进入 C Users 你的账户名 AppData LocalLow InnerSloth Among Us playerPrefs 如果看不到App
  • centos 安装alien

    出处 http linux4you in install netapp oncommand system manger on centos 1 在root权限下执行命令 sudo su 2 安装alien需要的依赖包 yum y insta
  • springmvc源码学习(二十七)异步任务超时异常的执行流程

    目录 前言 一 示例 二 源码分析 总结 前言 本文分析异步任务出现超时及异常的情况时的处理流程 一 示例 设置超时时间为2s 但任务需要执行10s ApiOperation value test notes test GetMapping