自己动手实现线程池 jdk线程池ThreadPoolExecutor工作原理解析(一)( 六 )

  • 如果Worker中的线程不是alive状态等原因导致工作线程启动失败 , 则在finally中通过addWorkerFailed进行一系列的回滚操作
  • 虽然在前面线程池工作流程的分析中提到了核心线程与非核心线程的概念,但Worker类中实际上并没有核心/非核心的标识 。经过了工作线程启动前的条件判断后,新创建的工作线程实际上并没有真正的核心与非核心的差别 。
    addWorkerFailed(addWorker的逆向回滚操作)addWorker中工作线程可能会启动失败,所以要对addWorker中对workers集合以及workerCount等数据的操作进行回滚 。
    /*** 当创建worker出现异常失败时,对之前的操作进行回滚* 1 如果新创建的worker加入了workers集合,将其移除* 2 减少记录存活的worker个数(cas更新)* 3 检查线程池是否满足中止的状态,防止这个存活的worker线程阻止线程池的中止(v1版本不考虑 , 省略了tryTerminate)*/private void addWorkerFailed(MyWorker myWorker) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {if (myWorker != null) {// 如果新创建的worker加入了workers集合,将其移除workers.remove(myWorker);}// 减少存活的worker个数decrementWorkerCount();// 尝试着将当前worker线程终止(addWorkerFailed由工作线程自己调用)// tryTerminate();} finally {mainLock.unlock();}}runWorker(工作线程核心执行逻辑)前面介绍了用户如何向线程池提交任务,以及如何创建新工作线程Worker,下面介绍工作线程在线程池中是如何运行的 。
    • runWorker方法内部本质上是一个无限循环 , 在进入主循环之前通过unlock方法,将内部AQS父类中的state标识为0,允许被外部中断(可以被interruptIfStarted选中而打断)
    • 之后便是主循环,如果firstTask不为空(说明第一次启动),则直接调用task.run方法 。否则通过getTask方法尝试从工作队列中捞取一个任务来执行
    • 在实际的任务执行前和执行后都调用对应的钩子方法(beforeExecute、afterExecute)
    • 在任务执行前通过lock方法将AQS的state方法设置为1代表当前Worker正在执行任务 , 并在执行完一个任务后在finally中进行unlock解锁,令当前工作线程进入idle状态 。同时清空firstTask的值(清空后下一次循环就会通过getTask获取任务了)并令Worker中的completedTasks统计指标也自增1
    • 如果任务执行过程中出现了异常 , 则catch住并最终向上抛出跳出主循环,finally中执行processWorkerExit(认为任务一旦执行出现了异常,则很可能工作线程内部的一些状态已经损坏 , 需要重新创建一个新的工作线程来代替出异常的老工作线程)
    • 有两种情况会导致执行processWorkerExit,一种是上面说的任务执行时出现了异常,此时completedAbruptly=true;还有一种可能时getTask因为一些原因返回了null , 此时completedAbruptly=false 。completedAbruptly会作为processWorkerExit的参数传递 。
    /*** worker工作线程主循环执行逻辑* */private void runWorker(MyWorker myWorker) {// 时worker线程的run方法调用的,此时的current线程的是worker线程Thread workerThread = Thread.currentThread();Runnable task = myWorker.firstTask;// 已经暂存了firstTask,将其清空(有地方根据firstTask是否存在来判断工作线程中负责的任务是否是新提交的)myWorker.firstTask = null;// 将state由初始化时的-1设置为0// 标识着此时当前工作线程开始工作了 , 这样可以被interruptIfStarted选中myWorker.unlock();// 默认线程是由于中断退出的boolean completedAbruptly = true;try {// worker线程处理主循环,核心逻辑while (task != null || (task = getTask()) != null) {// 将state由0标识为1,代表着其由idle状态变成了正在工作的状态// 这样interruptIdleWorkers中的tryLock会失败 , 这样工作状态的线程就不会被该方法中断任务的正常执行myWorker.lock();// v1版本此处省略优雅停止相关的核心逻辑try {// 任务执行前的钩子函数beforeExecute(workerThread, task);Throwable thrown = null;try {// 拿到的任务开始执行task.run();} catch (RuntimeException | Error x) {// 使用thrown收集抛出的异常,传递给afterExecutethrown = x;// 同时抛出错误,从而中止主循环throw x;} catch (Throwable x) {// 使用thrown收集抛出的异常 , 传递给afterExecutethrown = x;// 同时抛出错误,从而中止主循环throw new Error(x);} finally {// 任务执行后的钩子函数,如果任务执行时抛出了错误/异常,thrown不为nullafterExecute(task, thrown);}} finally {// 将task设置为null,令下一次while循环通过getTask获得新任务task = null;// 无论执行时是否存在异常,已完成的任务数加1myWorker.completedTasks++;// 无论如何将myWorker解锁 , 标识为idle状态myWorker.unlock();}}// getTask返回了null,说明没有可执行的任务或者因为idle超时、线程数超过配置等原因需要回收当前线程 。// 线程正常的退出,completedAbruptly为falsecompletedAbruptly = false;}finally {// getTask返回null,线程正常的退出 , completedAbruptly值为false// task.run()执行时抛出了异常/错误,直接跳出了主循环 , 此时completedAbruptly为初始化时的默认值trueprocessWorkerExit(myWorker, completedAbruptly);// processWorkerExit执行完成后,worker线程对应的run方法(run->runWorker)也会执行完毕// 此时线程对象会进入终止态 , 等待操作系统回收// 而且processWorkerExit方法内将传入的Worker从workers集合中移除,jvm中的对象也会因为不再被引用而被GC回收// 此时,当前工作线程所占用的所有资源都已释放完毕}}

    推荐阅读