自己动手实现线程池 jdk线程池ThreadPoolExecutor工作原理解析(一)( 五 )

可以看到,execute方法源码中对于任务处理的逻辑很清晰 , 也能与ThreadPoolExecutor运行时工作流程中所介绍的流程所匹配 。
addWorker方法(创建新的工作线程)在execute方法中当需要创建核心线程或普通线程时,便需要通过addWorker方法尝试创建一个新的工作线程 。
/*** 向线程池中加入worker* */private boolean addWorker(Runnable firstTask, boolean core) {// retry标识外层循环retry:for (;;) {int currentCtl = ctl.get();// 用于cas更新workerCount的内层循环(注意这里面与jdk的写法不同,改写成了逻辑一致但更可读的形式)for (;;) {// 判断当前worker数量是否超过了限制int workerCount = workerCountOf(currentCtl);if (workerCount >= CAPACITY) {// 当前worker数量超过了设计上允许的最大限制return false;}if (core) {// 创建的是核心线程,判断当前线程数是否已经超过了指定的核心线程数if (workerCount >= this.corePoolSize) {// 超过了核心线程数,创建核心worker线程失败return false;}} else {// 创建的是非核心线程,判断当前线程数是否已经超过了指定的最大线程数if (workerCount >= this.maximumPoolSize) {// 超过了最大线程数,创建非核心worker线程失败return false;}}// cas更新workerCount的值boolean casSuccess = compareAndIncrementWorkerCount(currentCtl);if (casSuccess) {// cas成功,跳出外层循环break retry;}// compareAndIncrementWorkerCount方法cas争抢失败 , 重新执行内层循环}}boolean workerStarted = false;MyWorker newWorker = null;try {// 创建一个新的workernewWorker = new MyWorker(firstTask);final Thread myWorkerThread = newWorker.thread;if (myWorkerThread != null) {// MyWorker初始化时内部线程创建成功// 加锁,防止并发更新final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {if (myWorkerThread.isAlive()) {// 预检查线程的状态,刚初始化的worker线程必须是未唤醒的状态throw new IllegalThreadStateException();}// 加入worker集合this.workers.add(newWorker);int workerSize = workers.size();if (workerSize > largestPoolSize) {// 如果当前worker个数超过了之前记录的最大存活线程数 , 将其更新largestPoolSize = workerSize;}// 创建成功} finally {// 无论是否发生异常,都先将主控锁解锁mainLock.unlock();}// 加入成功,启动worker线程myWorkerThread.start();// 标识为worker线程启动成功 , 并作为返回值返回workerStarted = true;}}finally {if (!workerStarted) {addWorkerFailed(newWorker);}}return workerStarted;}addWorker可以分为两部分:判断当前是否满足创建新工作线程的条件、创建并启动新的Worker工作线程 。
判断当前是否满足创建新工作线程的条件入口处开始的retry标识的for循环部分,便是用于判断是否满足创建新工作线程的条件 。

  • 首先判断当前工作线程数量是否超过了理论的最大值CAPACITY(即2^29-1),超过了则不能创建,返回false , 不创建新工作线程
  • 根据boolean类型参数core判断是否创建核心工作线程,core=true则判断是否超过了corePoolSize的限制 , core=false则判断是否超过了maximumPoolSize的限制 。不满足则返回false,不创建新工作线程
  • 满足上述限制条件后,则说明可以创建新线程了,compareAndIncrementWorkerCount方法进行cas的增加当前工作线程数 。如果cas失败,则说明存在并发的更新了,则再一次的循环重试,并再次的进行上述检查 。
需要注意的是:这里面有两个for循环的原因在于v1版本省略了优雅停止的逻辑(所以实际上v1版本能去掉内层循环的) 。如果线程池处于停止状态则不能再创建新工作线程了,因此也需要判断线程池当前的状态,不满足条件则也需要返回false,不创建工作线程 。而且compareAndIncrementWorkerCount中cas更新ctl时,如果并发的线程池被停止而导致线程池状态发生了变化,也会导致cas失败重新检查 。这也是jdk的实现中为什么把线程池状态和工作线程数量绑定在一起的原因之一,这样在cas更新时可以原子性的同时检查两个字段的并发争抢 。(更具体的细节会在下一篇博客的v2版本中介绍)
创建并启动新的Worker工作线程在通过retry那部分的层层条件检查后 , 紧接着便是实际创建新工作线程的逻辑 。