addWorkerFailed(addWorker的逆向回滚操作)addWorker中工作线程可能会启动失败,所以要对addWorker中对workers集合以及workerCount等数据的操作进行回滚 。
/*** 当创建worker出现异常失败时,对之前的操作进行回滚* 1 如果新创建的worker加入了workers集合,将其移除* 2 减少记录存活的worker个数(cas更新)* 3 检查线程池是否满足中止的状态,防止这个存活的worker线程阻止线程池的中止(v1版本不考虑 , 省略了tryTerminate)*/private void addWorkerFailed(MyWorker myWorker) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {if (myWorker != null) {// 如果新创建的worker加入了workers集合,将其移除workers.remove(myWorker);}// 减少存活的worker个数decrementWorkerCount();// 尝试着将当前worker线程终止(addWorkerFailed由工作线程自己调用)// tryTerminate();} finally {mainLock.unlock();}}
runWorker(工作线程核心执行逻辑)前面介绍了用户如何向线程池提交任务,以及如何创建新工作线程Worker,下面介绍工作线程在线程池中是如何运行的 。- runWorker方法内部本质上是一个无限循环 , 在进入主循环之前通过unlock方法,将内部AQS父类中的state标识为0,允许被外部中断(可以被interruptIfStarted选中而打断)
- 之后便是主循环,如果firstTask不为空(说明第一次启动),则直接调用task.run方法 。否则通过getTask方法尝试从工作队列中捞取一个任务来执行
- 在实际的任务执行前和执行后都调用对应的钩子方法(beforeExecute、afterExecute)
- 在任务执行前通过lock方法将AQS的state方法设置为1代表当前Worker正在执行任务 , 并在执行完一个任务后在finally中进行unlock解锁,令当前工作线程进入idle状态 。同时清空firstTask的值(清空后下一次循环就会通过getTask获取任务了)并令Worker中的completedTasks统计指标也自增1
- 如果任务执行过程中出现了异常 , 则catch住并最终向上抛出跳出主循环,finally中执行processWorkerExit(认为任务一旦执行出现了异常,则很可能工作线程内部的一些状态已经损坏 , 需要重新创建一个新的工作线程来代替出异常的老工作线程)
- 有两种情况会导致执行processWorkerExit,一种是上面说的任务执行时出现了异常,此时completedAbruptly=true;还有一种可能时getTask因为一些原因返回了null , 此时completedAbruptly=false 。completedAbruptly会作为processWorkerExit的参数传递 。
/*** worker工作线程主循环执行逻辑* */private void runWorker(MyWorker myWorker) {// 时worker线程的run方法调用的,此时的current线程的是worker线程Thread workerThread = Thread.currentThread();Runnable task = myWorker.firstTask;// 已经暂存了firstTask,将其清空(有地方根据firstTask是否存在来判断工作线程中负责的任务是否是新提交的)myWorker.firstTask = null;// 将state由初始化时的-1设置为0// 标识着此时当前工作线程开始工作了 , 这样可以被interruptIfStarted选中myWorker.unlock();// 默认线程是由于中断退出的boolean completedAbruptly = true;try {// worker线程处理主循环,核心逻辑while (task != null || (task = getTask()) != null) {// 将state由0标识为1,代表着其由idle状态变成了正在工作的状态// 这样interruptIdleWorkers中的tryLock会失败 , 这样工作状态的线程就不会被该方法中断任务的正常执行myWorker.lock();// v1版本此处省略优雅停止相关的核心逻辑try {// 任务执行前的钩子函数beforeExecute(workerThread, task);Throwable thrown = null;try {// 拿到的任务开始执行task.run();} catch (RuntimeException | Error x) {// 使用thrown收集抛出的异常,传递给afterExecutethrown = x;// 同时抛出错误,从而中止主循环throw x;} catch (Throwable x) {// 使用thrown收集抛出的异常 , 传递给afterExecutethrown = x;// 同时抛出错误,从而中止主循环throw new Error(x);} finally {// 任务执行后的钩子函数,如果任务执行时抛出了错误/异常,thrown不为nullafterExecute(task, thrown);}} finally {// 将task设置为null,令下一次while循环通过getTask获得新任务task = null;// 无论执行时是否存在异常,已完成的任务数加1myWorker.completedTasks++;// 无论如何将myWorker解锁 , 标识为idle状态myWorker.unlock();}}// getTask返回了null,说明没有可执行的任务或者因为idle超时、线程数超过配置等原因需要回收当前线程 。// 线程正常的退出,completedAbruptly为falsecompletedAbruptly = false;}finally {// getTask返回null,线程正常的退出 , completedAbruptly值为false// task.run()执行时抛出了异常/错误,直接跳出了主循环 , 此时completedAbruptly为初始化时的默认值trueprocessWorkerExit(myWorker, completedAbruptly);// processWorkerExit执行完成后,worker线程对应的run方法(run->runWorker)也会执行完毕// 此时线程对象会进入终止态 , 等待操作系统回收// 而且processWorkerExit方法内将传入的Worker从workers集合中移除,jvm中的对象也会因为不再被引用而被GC回收// 此时,当前工作线程所占用的所有资源都已释放完毕}}
推荐阅读
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- 我的Vue之旅 10 Gin重写后端、实现页面详情页 Mysql + Golang + Gin
- zk系列三:zookeeper实战之分布式锁实现
- 【深入浅出 Yarn 架构与实现】2-2 Yarn 基础库 - 底层通信库 RPC
- OPPO手机怎么才能设置到自己心仪的彩铃
- 手机动态彩铃是怎么设置的(自己手机的彩铃能设置吗)
- 第2-1-5章 docker安装MinIO实现文件存储服务-springboot整合minio-minio全网最全的资料
- 【深入浅出 Yarn 架构与实现】2-1 Yarn 基础库概述
- Golang 实现时间戳和时间的转化
- 二叉搜索树 - C++ 实现
- 【深入浅出 Yarn 架构与实现】1-2 搭建 Hadoop 源码阅读环境