xxl-job源码之执行器执行任务的核心流程
先来一张图大致看看
如果看的不清晰,可以前往地址:https://www.processon.com/view/link/604e0b860791291f25613424
密码: xiaowan
绿色那一块就是执行器的流程
看看启动后的控制台有什么
"nioEventLoopGroup-2-1"@6,081 in group "main": RUNNING
"nioEventLoopGroup-3-1"@6,231 in group "main": RUNNING
"nioEventLoopGroup-3-2"@6,424 in group "main": RUNNING
"xxl-job, executor ExecutorRegistryThread"@6,099 in group "main": SLEEPING
"xxl-job, executor JobLogFileCleanThread"@5,708 in group "main": SLEEPING
"xxl-job, executor TriggerCallbackThread"@5,711 in group "main": WAIT
"xxl-rpc, EmbedServer bizThreadPool-632554680"@6,439 in group "main": WAIT
启动
对于不习惯SpringBoot,官方提供了2个demo执行器,一个是xxl-job-executor-sample-springboot,一个是xxl-job-executor-sample-frameless。在2个都能运行的情况下,我们就使用 xxl-job-executor-sample-frameless 吧,框架依赖的少,那么对于我们来说,需要屏蔽无用的内容就少。
这个目录就3个源码文件,肯定是猜测启动一个服务,然后将执行器的配置文件数据读入,最后与服务端进行通讯。
最后就锁定 FrameLessXxlJobConfig ,里面其他方法,稍微过一眼,核心就在 FrameLessXxlJobConfig#initXxlJobExecutor–>xxlJobExecutor.start(),而这个方法里面前面一堆set方法,肯定最后落到 start方法上
XxlJobSimpleExecutor#start
public void start() {
initJobHandlerMethodRepository(xxlJobBeanList);
try {
super.start();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
XxlJobExecutor#start
public void start() throws Exception {
XxlJobFileAppender.initLogPath(logPath);
initAdminBizList(adminAddresses, accessToken);
JobLogFileCleanThread.getInstance().start(logRetentionDays);
TriggerCallbackThread.getInstance().start();
initEmbedServer(address, ip, port, appname, accessToken);
}
private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {
embedServer = new EmbedServer();
embedServer.start(address, port, appname, accessToken);
}
EmbedServer#start
可以看到这里启动了一个netty的服务,对于接收到的请求,最后都会交给 EmbedHttpServerHandler 这个处理。
EmbedHttpServerHandler 负责去序列化收到的消息内容,最后交给我们的 ExecutorBizImpl 处理。
如果我们收到一个run方法,那么最后就是走向 ExecutorBizImpl#run
public void start(final String address, final int port, final String appname, final String accessToken) {
executorBiz = new ExecutorBizImpl();
thread = new Thread(new Runnable() {
@Override
public void run() {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(
0,
200,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(2000),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "xxl-rpc, EmbedServer bizThreadPool-" + r.hashCode());
}
},
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
throw new RuntimeException("xxl-job, EmbedServer bizThreadPool is EXHAUSTED!");
}
});
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel channel) throws Exception {
channel.pipeline()
.addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS))
.addLast(new HttpServerCodec())
.addLast(new HttpObjectAggregator(5 * 1024 * 1024))
.addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
}
})
.childOption(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture future = bootstrap.bind(port).sync();
logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);
startRegistry(appname, address);
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
if (e instanceof InterruptedException) {
logger.info(">>>>>>>>>>> xxl-job remoting server stop.");
} else {
logger.error(">>>>>>>>>>> xxl-job remoting server error.", e);
}
} finally {
try {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
}
});
thread.setDaemon(true);
thread.start();
}
ExecutorBizImpl#run
这里面拿到对应的 IJobHandler,交给我们的JobThread去进行处理
如果我们设置了超时时间,那么这里的 IJobHandler 又会再包一层thread,设置超时时间进行执行
源码分析
分布式作业调度(定时任务)系统xxl-job快速上手及高级功能简述
xxl-job源码之admin调度中心的线程们
xxl-job源码之执行器执行任务的核心流程
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)