通过Thread Pool Executor类解析线程池执行任务的核心流程( 三 )

(3)CAS操作成功后 , 表示向线程池中成功添加了工作线程,此时,还没有线程去执行任务 。使用全局的独占锁mainLock来将新增的工作线程Worker对象安全的添加到workers中 。
总体逻辑就是:创建新的Worker对象 , 并获取Worker对象中的执行线程,如果线程不为空,则获取独占锁,获取锁成功后,再次检查线线程的状态,这是避免在获取独占锁之前其他线程修改了线程池的状态,或者关闭了线程池 。如果线程池关闭,则需要释放锁 。否则将新增加的线程添加到工作集合中,释放锁并启动线程执行任务 。将是否启动线程的标识设置为true 。最后,判断线程是否启动,如果没有启动,则调用addWorkerFailed(Worker)方法 。最终返回线程是否起送的标识 。
//跳出最外层for循环 , 说明通过CAS新增线程数量成功//此时创建新的工作线程boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {//将执行的任务封装成workerw = new Worker(firstTask);final Thread t = w.thread;if (t != null) {//独占锁,保证操作workers时的同步final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {//此处需要重新检查线程池状态//原因是在获得锁之前可能其他的线程改变了线程池的状态int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive())throw new IllegalThreadStateException();//向worker中添加新任务workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;//将是否添加了新任务的标识设置为trueworkerAdded = true;}} finally {//释放独占锁mainLock.unlock();}//添加新任成功,则启动线程执行任务if (workerAdded) {t.start();//将任务是否已经启动的标识设置为trueworkerStarted = true;}}} finally {//如果任务未启动或启动失败 , 则调用addWorkerFailed(Worker)方法if (! workerStarted)addWorkerFailed(w);}//返回是否启动任务的标识return workerStarted;addWorkerFailed(Worker)方法在addWorker(Runnable, boolean)方法中,如果添加工作线程失败或者工作线程启动失败时 , 则会调用addWorkerFailed(Worker)方法,下面我们就来看看addWorkerFailed(Worker)方法的实现,如下所示 。
private void addWorkerFailed(Worker w) {//获取独占锁final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {//如果Worker任务不为空if (w != null)//将任务从workers集合中移除workers.remove(w);//通过CAS将任务数量减1decrementWorkerCount();tryTerminate();} finally {//释放锁mainLock.unlock();}}addWorkerFailed(Worker)方法的逻辑就比较简单了 , 获取独占锁,将任务从workers中移除 , 并且通过CAS将任务的数量减1,最后释放锁 。
拒绝策略我们在分析execute(Runnable)方法时,线程池会在适当的时候调用reject(Runnable)方法来执行相应的拒绝策略,我们看下reject(Runnable)方法的实现 , 如下所示 。
final void reject(Runnable command) {handler.rejectedExecution(command, this);}通过代码,我们发现调用的是handler的rejectedExecution方法 , handler又是个什么鬼,我们继续跟进代码,如下所示 。
private volatile RejectedExecutionHandler handler;再看看RejectedExecutionHandler是个啥类型,如下所示 。
package java.util.concurrent;public interface RejectedExecutionHandler { void rejectedExecution(Runnable r, ThreadPoolExecutor executor);}可以发现RejectedExecutionHandler是个接口,定义了一个rejectedExecution(Runnable, ThreadPoolExecutor)方法 。既然RejectedExecutionHandler是个接口,那我们就看看有哪些类实现了RejectedExecutionHandler接口 。

通过Thread Pool Executor类解析线程池执行任务的核心流程

文章插图
看到这里,我们发现RejectedExecutionHandler接口的实现类正是线程池默认提供的四种拒绝策略的实现类 。
至于reject(Runnable)方法中具体会执行哪个类的拒绝策略,是根据创建线程池时传递的参数决定的 。如果没有传递拒绝策略,则默认会执行AbortPolicy类的拒绝策略 。否则会执行传递的类的拒绝策略 。
在创建线程池时,除了能够传递JDK默认提供的拒绝策略外,还可以传递自定义的拒绝策略 。如果想使用自定义的拒绝策略,则只需要实现RejectedExecutionHandler接口,并重写rejectedExecution(Runnable, ThreadPoolExecutor)方法即可 。例如 , 下面的代码 。
public class CustomPolicy implements RejectedExecutionHandler {public CustomPolicy() { }public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {System.out.println("使用调用者所在的线程来执行任务")r.run();}}}使用如下方式创建线程池 。
new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), Executors.defaultThreadFactory(), new CustomPolicy());

推荐阅读