自己动手实现线程池 jdk线程池ThreadPoolExecutor工作原理解析(一)( 八 )

processWorkerExit(处理工作线程退出)在runWorker中,如果getTask方法没有拿到任务返回了null或者任务在执行时抛出了异常就会在最终的finally块中调用processWorkerExit方法,令当前工作线程销毁退出 。

  • processWorkerExit方法内会将当前线程占用的一些资源做清理,比如从workers中移除掉当前线程(利于Worker对象的GC) , 并令当前线程workerCount减一(completedAbruptly=true,说明是中断导致的退出,getTask中没来得及减workerCount , 在这里补正)
  • completedAbruptly=true,说明是runWorker中任务异常导致的线程退出,无条件的通过addWorker重新创建一个新的工作线程代替当前退出的工作线程 。
  • completedAbruptly=false,在退出当前工作线程后,需要判断一下退出后当前所存活的工作线程数量是否满足要求 。比如allowCoreThreadTimeOut=false时,当前工作线程个数是否不低于corePoolSize等,如果不满足要求则通过addWorker重新创建一个新的线程 。
工作线程退出时所占用资源的回收
  • processWorkerExit方法执行完毕后,当前工作线程就完整的从当前线程池中退出了(workers中没有了引用,workerCount减1了),GC便会将内存中的Worker对象所占用的内存给回收掉 。
  • 同时runWorker中最后执行完processWorkerExit后,工作线程的run方法也return了,标识着整个线程正常退出了 , 操作系统层面上也会将线程转为终止态并最终回收 。至此,线程占用的所有资源就被彻底的回收干净了 。
/*** 处理worker线程退出* @param myWorker 需要退出的工作线程对象* @param completedAbruptly 是否是因为中断异常的原因,而需要回收* */private void processWorkerExit(MyWorker myWorker, boolean completedAbruptly) {if (completedAbruptly) {// 如果completedAbruptly=true,说明是任务在run方法执行时出错导致的线程退出// 而正常退出时completedAbruptly=false,在getTask中已经将workerCount的值减少了decrementWorkerCount();}ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// 线程池全局总完成任务数累加上要退出的工作线程已完成的任务数this.completedTaskCount += myWorker.completedTasks;// workers集合中将当前工作线程剔除workers.remove(myWorker);// completedTaskCount是long类型的,workers是HashSet,// 都是非线程安全的,所以在mainLock的保护进行修改} finally {mainLock.unlock();}int currentCtl = this.ctl.get();if (!completedAbruptly) {// completedAbruptly=false , 说明不是因为中断异常而退出的// min标识当前线程池允许的最小线程数量// 1 如果allowCoreThreadTimeOut为true,则核心线程也可以被销毁,min=0// 2 如果allowCoreThreadTimeOut为false,则min应该为所允许的核心线程个数,min=corePoolSizeint min = allowCoreThreadTimeOut ? 0 : corePoolSize;if (min == 0 && ! workQueue.isEmpty()) {// 如果min为0了,但工作队列不为空,则修正min=1,因为至少需要一个工作线程来将工作队列中的任务消费、处理掉min = 1;}if (workerCountOf(currentCtl) >= min) {// 如果当前工作线程数大于了min,当前线程数量是足够的,直接返回(否则要执行下面的addWorker恢复)return;}}// 两种场景会走到这里进行addWorker操作// 1 completedAbruptly=true,说明线程是因为中断异常而退出的 , 需要重新创建一个新的工作线程// 2 completedAbruptly=false,且上面的workerCount<min,则说明当前工作线程数不够,需要创建一个// 为什么参数core传的是false呢?// 因为completedAbruptly=true而中断退出的线程,无论当前工作线程数是否大于核心线程,都需要创建一个新的线程来代替原有的被退出的线程addWorker(null, false);}动态修改配置参数ThreadPoolExecutor除了支持启动前通过构造函数设置配置参数外 , 也允许在线程池运行的过程中动态的更改配置 。而要实现动态的修改配置,麻烦程度要比启动前静态的指定大得多 。举个例子,在线程池的运行过程中如果当前corePoolSize=20,且已经创建了20个核心线程时(workerCount=20),现在将corePoolSize减少为10或者增大为30时应该如何实时的生效呢?下面通过内嵌于代码中的注释 , 详细的说明了allowCoreThreadTimeOut、corePoolSize、maximumPoolSize这三个关键配置参数实现动态修改的原理 。
/*** 设置是否允许核心线程idle超时后退出* */public void allowCoreThreadTimeOut(boolean value) {if (value && keepAliveTime <= 0) {throw new IllegalArgumentException("Core threads must have nonzero keep alive times");}// 判断一下新旧值是否相等,避免无意义的volatile变量更新,导致不必要的cpu cache同步if (value != allowCoreThreadTimeOut) {allowCoreThreadTimeOut = value;if (value) {// 参数值value为true,说明之前不允许核心线程由于idle超时而退出// 而此时更新为true说明现在允许了 , 则通过interruptIdleWorkers唤醒所有的idle线程// 令其走一遍runWorker中的逻辑,尝试着让idle超时的核心线程及时销毁interruptIdleWorkers();}}}/*** 动态更新核心线程最大值corePoolSize* */public void setCorePoolSize(int corePoolSize) {if (corePoolSize < 0) {throw new IllegalArgumentException();}// 计算差异int delta = corePoolSize - this.corePoolSize;// 赋值this.corePoolSize = corePoolSize;if (workerCountOf(this.ctl.get()) > corePoolSize) {// 更新完毕后,发现当前工作线程数超过了指定的值// 唤醒所有idle线程,让目前空闲的idle超时的线程在workerCount大于maximumPoolSize时及时销毁interruptIdleWorkers();} else if (delta > 0) {// 差异大于0 , 代表着新值大于旧值// We don't really know how many new threads are "needed".// As a heuristic, prestart enough new workers (up to new// core size) to handle the current number of tasks in// queue, but stop if queue becomes empty while doing so.// 我们无法确切的知道有多少新的线程是所需要的 。// 启发式的预先启动足够的新工作线程用于处理工作队列中的任务// 但当执行此操作时工作队列为空了 , 则立即停止此操作(队列为空了说明当前负载较低,再创建更多的工作线程是浪费资源)// 取差异和当前工作队列中的最小值为kint k = Math.min(delta, workQueue.size());// 尝试着一直增加新的工作线程,直到和k相同// 这样设计的目的在于控制增加的核心线程数量,不要一下子创建过多核心线程// 举个例子:原来的corePoolSize是10 , 且工作线程数也是10,现在新值设置为了30,新值比旧值大20,理论上应该直接创建20个核心工作线程// 而工作队列中的任务数只有10 , 那么这个时候直接创建20个新工作线程是没必要的,只需要一个一个创建,在创建的过程中新的线程会尽量的消费工作队列中的任务// 这样就可以以一种启发性的方式创建合适的新工作线程,一定程度上节约资源 。后面再有新的任务提交时,再从runWorker方法中去单独创建核心线程(类似惰性创建)while (k-- > 0 && addWorker(null, true)) {if (workQueue.isEmpty()) {// 其它工作线程在循环的过程中也在消费工作线程,且用户也可能不断地提交任务// 这是一个动态的过程 , 但一旦发现当前工作队列为空则立即结束break;}}}}/*** 动态更新最大线程数maximumPoolSize* */public void setMaximumPoolSize(int maximumPoolSize) {if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize) {throw new IllegalArgumentException();}this.maximumPoolSize = maximumPoolSize;if (workerCountOf(this.ctl.get())> maximumPoolSize) {// 更新完毕后,发现当前工作线程数超过了指定的值// 唤醒所有idle线程,让目前空闲的idle超时的线程在workerCount大于maximumPoolSize时及时销毁interruptIdleWorkers();}}

推荐阅读