通过Thread Pool Executor类解析线程池执行任务的核心流程( 二 )


//任务队列已满,则新增worker线程,如果新增线程失败 , 则执行拒绝策略else if (!addWorker(command, false))reject(command);这里 , 我们将execute(Runnable)方法拆解,结合流程图来理解线程池中任务的执行流程就比较简单了 。可以这么说,execute(Runnable)方法的逻辑基本上就是一般线程池的执行逻辑,理解了execute(Runnable)方法,就基本理解了线程池的执行逻辑 。
注意:有关ScheduledThreadPoolExecutor类和ForkJoinPool类执行线程池的逻辑,在【高并发专题】系列文章中的后文中会详细说明,理解了这些类的执行逻辑,就基本全面掌握了线程池的执行流程 。
在分析execute(Runnable)方法的源码时,我们发现execute(Runnable)方法中多处调用了addWorker(Runnable, boolean)方法,接下来,我们就一起分析下addWorker(Runnable, boolean)方法的逻辑 。
addWorker(Runnable, boolean)方法总体上,addWorker(Runnable, boolean)方法可以分为三部分,第一部分是使用CAS安全的向线程池中添加工作线程;第二部分是创建新的工作线程;第三部分则是将任务通过安全的并发方式添加到workers中,并启动工作线程执行任务 。
接下来,我们看下addWorker(Runnable, boolean)方法的源码 , 如下所示 。
private boolean addWorker(Runnable firstTask, boolean core) {//标记重试的标识retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// 检查队列是否在某些特定的条件下为空if (rs >= SHUTDOWN &&! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))return false;//下面循环的主要作用为通过CAS方式增加线程的个数for (;;) {//获取线程池中的线程数量int wc = workerCountOf(c);//如果线程池中的线程数量超出限制 , 直接返回falseif (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;//通过CAS方式向线程池新增线程数量if (compareAndIncrementWorkerCount(c))//通过CAS方式保证只有一个线程执行成功 , 跳出最外层循环break retry;//重新获取ctl的值c = ctl.get();//如果CAS操作失败了,则需要在内循环中重新尝试通过CAS新增线程数量if (runStateOf(c) != rs)continue retry;}}//跳出最外层for循环 , 说明通过CAS新增线程数量成功//此时创建新的工作线程boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {//将执行的任务封装成workerw = new Worker(firstTask);final Thread t = w.thread;if (t != null) {//独占锁,保证操作workers时的同步final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {//此处需要重新检查线程池状态//原因是在获得锁之前可能其他的线程改变了线程池的状态int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive())throw new IllegalThreadStateException();//向worker中添加新任务workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;//将是否添加了新任务的标识设置为trueworkerAdded = true;}} finally {//释放独占锁mainLock.unlock();}//添加新任成功,则启动线程执行任务if (workerAdded) {t.start();//将任务是否已经启动的标识设置为trueworkerStarted = true;}}} finally {//如果任务未启动或启动失败,则调用addWorkerFailed(Worker)方法if (! workerStarted)addWorkerFailed(w);}//返回是否启动任务的标识return workerStarted;}乍一看,addWorker(Runnable, boolean)方法还蛮长的,这里 , 我们还是将addWorker(Runnable, boolean)方法进行拆解 。
(1)检查任务队列是否在某些特定的条件下为空,代码如下所示 。
// 检查队列是否在某些特定的条件下为空if (rs >= SHUTDOWN &&! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))return false;(2)在通过步骤(1)的校验后,则进入内层for循环,在内层for循环中通过CAS来增加线程池中的线程数量,如果CAS操作成功,则直接退出双重for循环 。如果CAS操作失败,则查看当前线程池的状态是否发生了变化 , 如果线程池的状态发生了变化,则通过continue关键字重新通过外层for循环校验任务队列 , 检验通过再次执行内层for循环的CAS操作 。如果线程池的状态没有发生变化,此时上一次CAS操作失败了,则继续尝试CAS操作 。代码如下所示 。
for (;;) {//获取线程池中的线程数量int wc = workerCountOf(c);//如果线程池中的线程数量超出限制,直接返回falseif (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;//通过CAS方式向线程池新增线程数量if (compareAndIncrementWorkerCount(c))//通过CAS方式保证只有一个线程执行成功,跳出最外层循环break retry;//重新获取ctl的值c = ctl.get();//如果CAS操作失败了,则需要在内循环中重新尝试通过CAS新增线程数量if (runStateOf(c) != rs)continue retry;}

推荐阅读