问题:多线程任务,一个任务执行错误,其他任务应该同步取消
1.主线程监视
主线程监视任务线程,当一个任务线程出现执行错误时,直接调用System.exit(0)结束程序。
任务线程代码:
package com.example.thread.cancel.listener;
public class TaskThread extends Thread {
/**
* 线程名称
*/
public String name;
/**
* 模拟线程执行时长
*/
public int mockExecMillisecond;
/**
* 模拟线程执行结果
*/
public boolean mockExecStatus;
/**
* 线程实际执行结果
*/
public boolean defaultExecStatus = Boolean.TRUE;
/**
* 定义TaskThread构造方法,用于初始化TaskThread对象
* @param name 线程名称
* @param mockExecMillisecond 模拟线程执行时长
* @param mockExecStatus 模拟线程执行结果
*/
public TaskThread(String name, int mockExecMillisecond, boolean mockExecStatus) {
this.name = name;
this.mockExecMillisecond = mockExecMillisecond;
this.mockExecStatus = mockExecStatus;
}
@Override
public void run() {
try {
System.out.printf("%s开始执行\n", this.name);
Thread.sleep(mockExecMillisecond);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.printf("%s执行结束,执行时间%sms,执行结果%s\n",
this.name, this.mockExecMillisecond, this.mockExecStatus);
defaultExecStatus = mockExecStatus;
}
}
主线程代码:
package com.example.thread.cancel.listener;
import java.util.ArrayList;
import java.util.List;
public class TaskListener {
public static void main(String[] args) {
List<TaskThread> taskThreads = new ArrayList<>();
TaskThread taskThreadOne = new TaskThread("taskOne",5000,true);
TaskThread taskThreadTwo = new TaskThread("taskTwo",3000,true);
TaskThread taskThreadThree = new TaskThread("taskThree",1000,false);
taskThreads.add(taskThreadOne);
taskThreads.add(taskThreadTwo);
taskThreads.add(taskThreadThree);
//线程启动
for (TaskThread taskThread : taskThreads) {
taskThread.start();
}
//模拟监听,通过任务线程的状态判断线程执行结果,并进行相应处理
for (;;){
taskThreads.stream().forEach(taskThread ->{
if (!taskThread.defaultExecStatus){
System.out.println("System.exit(0)");
System.exit(0);
}
});
}
}
}
2.Boss&Worker
创建Boss线程和Worker线程,在Boss线程中定义end方法用于接收Worker线程的通知。创建Worker线程时传入Boss对象,Worker线程执行过程中通过调用Boss对象的end方法进行通知。Boss线程接收到Worker线程通知后,根据Worker线程的执行结果进行相应处理,若Worker线程执行失败则调用System.exit(0)结束程序。
改进:
Worker线程中定义cancel方法,用于当前线程的任务取消处理,Boss线程接收到Work线程通知后,根据Worker线程的执行结果进行相应处理,若某一个Worker线程执行失败则调用所有Work对象各自的cancel方法取消任务。
任务取消:
线程只能在实际执行过程中进行取消,可以在cancel方法中定义canceling的标志位,在Worker线程的实际执行代码中加入cancel的相关处理(监听)。
Boss对象代码:
package com.example.thread.cancel.boss;
import java.util.List;
public class Boss {
private List<Worker> workers = null;
/**
* 传入任务线程,线程取消由Boss统一调配发起,不传入则直接调用System.exit(0)退出
* @param workers 任务线程集合
*/
public void setWorkers(List<Worker> workers) {
this.workers = workers;
}
/**
* 多线程任务出现异常,由Boss直接调用系统退出
* @param exceptionWorker 异常任务线程
*/
public void end(Worker exceptionWorker) {
//根据Boss创建时是否传入Worker对象列表workers,来决定是否使用Worker自身进行取消
if (workers != null && !workers.isEmpty()) {
workerCancel(exceptionWorker);
} else if (!exceptionWorker.mockExecStatus) {
System.out.println("System.exit(0)");
System.exit(0);
}
}
/**
* 多线程任务出现异常,由Worker自身进行取消
* @param exceptionWorker 异常任务线程
*/
public void workerCancel(Worker exceptionWorker) {
if (!exceptionWorker.mockExecStatus) {
for (Worker workerItem : workers) {
if (workerItem.mockExecStatus) {
workerItem.cancel(exceptionWorker);
}
}
}
}
}
Worker线程代码:
package com.example.thread.cancel.boss;
public class Worker extends Thread {
/**
* 统一调度对象
*/
public Boss boss;
/**
* 线程名称
*/
public String name;
/**
* 模拟线程执行时长
*/
public int mockExecMillisecond;
/**
* 模拟线程执行结果
*/
public boolean mockExecStatus;
/**
* 线程实际运行时长
*/
private int realExecMillisecond;
/**
* 定义Worker构造方法,用于初始化Worker对象
* @param boss 统一调度对象
* @param name 线程名称
* @param mockExecMillisecond 模拟线程执行时长
* @param mockExecStatus 模拟线程执行结果
*/
public Worker(Boss boss, String name, int mockExecMillisecond, boolean mockExecStatus) {
this.boss = boss;
this.name = name;
this.mockExecMillisecond = mockExecMillisecond;
this.mockExecStatus = mockExecStatus;
}
@Override
public void run() {
try {
System.out.printf("%s开始执行\n", this.name);
//模拟线程执行过程
int mockExecMillisecondUnit = 1000;
for (int i = 0; i < mockExecMillisecond / mockExecMillisecondUnit; i++) {
Thread.sleep(mockExecMillisecondUnit);
realExecMillisecond += mockExecMillisecondUnit;
//通过判断mockExecStatus标志位,模拟Worker取消操作
if (!mockExecStatus) {
System.out.printf("%s执行异常,执行时间%sms,执行结果%s\n",
this.name, this.realExecMillisecond, false);
boss.end(this);
return;
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
//线程执行完毕
System.out.printf("%s执行结束,执行时间%sms,执行结果%s\n",
this.name, this.mockExecMillisecond, this.mockExecStatus);
if (!mockExecStatus) {
boss.end(this);
}
}
/**
* Worker线程定义取消方法,供Boss调用
* @param exceptionWorker 异常任务线程
*/
public void cancel(Worker exceptionWorker) {
//修改标志位,在Worker业务线程中处理取消操作
mockExecStatus = false;
System.out.printf("%s执行异常,%s取消中,已执行时间%sms\n",
exceptionWorker.name, this.name, this.realExecMillisecond);
}
}
主线程代码:
package com.example.thread.cancel.boss;
import java.util.ArrayList;
import java.util.List;
public class WorkerNotifyBoss {
public static void main(String[] args) {
Boss boss = new Boss();
List<Worker> workers = new ArrayList<>();
Worker workerOne = new Worker(boss,"taskOne",5000,true);
Worker workerTwo = new Worker(boss,"taskTwo",3000,true);
Worker workerThree = new Worker(boss,"taskThree",1000,false);
workers.add(workerOne);
workers.add(workerTwo);
workers.add(workerThree);
boss.setWorkers(workers);
//线程启动
for (Worker worker: workers) {
worker.start();
}
}
}
3.使用CompletableFuture
使用CompletableFuture线程池的supplyAsync方法异步处理多个任务,并使用thenAccept方法异步获取任务返回结果,并使用Lambda表达式定义异步任务返回后需要调用的callback方法,在callback方法中实现任务的取消和回滚操作。
任务执行过程需要根据CompletableFuture线程池异步返回的任务执行结果取消所有任务并执行rollback操作,rollback操作需要根据实际的业务场景实现,用于撤销之前已执行的操作。
任务对象代码:
package com.example.thread.cancel;
public class Task {
/**
* 线程名称
*/
public String name;
/**
* 模拟线程执行时长
*/
public int mockExecMillisecond;
/**
* 模拟线程执行结果
*/
public boolean mockExecStatus;
/**
* 线程实际运行时长
*/
private int realExecMillisecond;
/**
* 定义Task构造方法,用于初始化Task对象
* @param name 线程名称
* @param mockExecMillisecond 模拟线程执行时长
* @param mockExecStatus 模拟线程执行结果
*/
public Task(String name, int mockExecMillisecond, boolean mockExecStatus) {
this.name = name;
this.mockExecMillisecond = mockExecMillisecond;
this.mockExecStatus = mockExecStatus;
}
/**
* 模拟任务执行方法,返回执行结果
* @return boolean
*/
public boolean runTask() {
try {
System.out.printf("%s开始执行\n", this.name);
int mockExecMillisecondUnit = 1000;
for (int i = 0; i < mockExecMillisecond / mockExecMillisecondUnit; i++) {
Thread.sleep(mockExecMillisecondUnit);
realExecMillisecond += mockExecMillisecondUnit;
//通过判断mockExecStatus标志位,模拟Worker取消操作
if (!mockExecStatus) {
System.out.printf("%s执行异常,执行时间%sms,执行结果%s\n",
this.name, this.realExecMillisecond, false);
return false;
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.printf("%s执行结束,执行时间%sms,执行结果%s\n",
this.name, this.mockExecMillisecond, true);
return mockExecStatus;
}
/**
* 模拟任务回滚过程
* @param task 任务对象
*/
public void rollback(Task task) {
//修改标志位,在Task任务中处理取消操作
mockExecStatus = false;
System.out.printf("%s执行异常,%s取消中,已执行时间%sms\n",
task.name, this.name, this.realExecMillisecond);
}
}
主线程代码:
package com.example.thread.cancel;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
public class TaskCompletableFuture {
public static List<Task> tasks = new ArrayList<>();
public static void main(String[] args) {
Task taskOne = new Task("taskOne", 5000, true);
Task taskTwo = new Task("taskTwo", 3000, true);
Task taskThree = new Task("taskThree", 1000, false);
tasks.add(taskOne);
tasks.add(taskTwo);
tasks.add(taskThree);
//线程交由CompletableFuture异步执行
for (Task task : tasks) {
CompletableFuture.supplyAsync(() -> task.runTask())
.thenAccept((result) -> callback(result, task));
}
//程序暂停,等待CompletableFuture异步执行
try {
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 根据CompletableFuture任务返回结果,取消所有任务
* @param result 任务执行返回结果
* @param task 任务对象
*/
public static void callback(boolean result, Task task) {
if (!result) {
for (Task taskItem : tasks) {
if (taskItem.mockExecStatus){
taskItem.rollback(task);
}
}
}
}
}