可以看到,execute方法源码中对于任务处理的逻辑很清晰 , 也能与ThreadPoolExecutor运行时工作流程中所介绍的流程所匹配 。
addWorker方法(创建新的工作线程)在execute方法中当需要创建核心线程或普通线程时,便需要通过addWorker方法尝试创建一个新的工作线程 。
/*** 向线程池中加入worker* */private boolean addWorker(Runnable firstTask, boolean core) {// retry标识外层循环retry:for (;;) {int currentCtl = ctl.get();// 用于cas更新workerCount的内层循环(注意这里面与jdk的写法不同,改写成了逻辑一致但更可读的形式)for (;;) {// 判断当前worker数量是否超过了限制int workerCount = workerCountOf(currentCtl);if (workerCount >= CAPACITY) {// 当前worker数量超过了设计上允许的最大限制return false;}if (core) {// 创建的是核心线程,判断当前线程数是否已经超过了指定的核心线程数if (workerCount >= this.corePoolSize) {// 超过了核心线程数,创建核心worker线程失败return false;}} else {// 创建的是非核心线程,判断当前线程数是否已经超过了指定的最大线程数if (workerCount >= this.maximumPoolSize) {// 超过了最大线程数,创建非核心worker线程失败return false;}}// cas更新workerCount的值boolean casSuccess = compareAndIncrementWorkerCount(currentCtl);if (casSuccess) {// cas成功,跳出外层循环break retry;}// compareAndIncrementWorkerCount方法cas争抢失败 , 重新执行内层循环}}boolean workerStarted = false;MyWorker newWorker = null;try {// 创建一个新的workernewWorker = new MyWorker(firstTask);final Thread myWorkerThread = newWorker.thread;if (myWorkerThread != null) {// MyWorker初始化时内部线程创建成功// 加锁,防止并发更新final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {if (myWorkerThread.isAlive()) {// 预检查线程的状态,刚初始化的worker线程必须是未唤醒的状态throw new IllegalThreadStateException();}// 加入worker集合this.workers.add(newWorker);int workerSize = workers.size();if (workerSize > largestPoolSize) {// 如果当前worker个数超过了之前记录的最大存活线程数 , 将其更新largestPoolSize = workerSize;}// 创建成功} finally {// 无论是否发生异常,都先将主控锁解锁mainLock.unlock();}// 加入成功,启动worker线程myWorkerThread.start();// 标识为worker线程启动成功 , 并作为返回值返回workerStarted = true;}}finally {if (!workerStarted) {addWorkerFailed(newWorker);}}return workerStarted;}
addWorker可以分为两部分:判断当前是否满足创建新工作线程的条件、创建并启动新的Worker工作线程 。
判断当前是否满足创建新工作线程的条件入口处开始的retry标识的for循环部分,便是用于判断是否满足创建新工作线程的条件 。
- 首先判断当前工作线程数量是否超过了理论的最大值CAPACITY(即2^29-1),超过了则不能创建,返回false , 不创建新工作线程
- 根据boolean类型参数core判断是否创建核心工作线程,core=true则判断是否超过了corePoolSize的限制 , core=false则判断是否超过了maximumPoolSize的限制 。不满足则返回false,不创建新工作线程
- 满足上述限制条件后,则说明可以创建新线程了,compareAndIncrementWorkerCount方法进行cas的增加当前工作线程数 。如果cas失败,则说明存在并发的更新了,则再一次的循环重试,并再次的进行上述检查 。
创建并启动新的Worker工作线程在通过retry那部分的层层条件检查后 , 紧接着便是实际创建新工作线程的逻辑 。
- 首先通过Worker的构造方法创建一个新的Worker对象,并将用户提交的任务作为firstTask参数传入 。
- 判断Worker在构造时线程工厂是否正确的生成了一个Thread(判空) , 如果thread == null的话直接返回false,标识创建新工作线程失败 。
- 在mainLock的保护下 , 将新创建的worker线程加入workers集合中
- 启动Worker中的线程(myWorkerThread.start()),启动后会执行Worker类中的run方法,新的工作线程会执行runWorker方法(下文会展开分析runWorker)
推荐阅读
- 我的Vue之旅 10 Gin重写后端、实现页面详情页 Mysql + Golang + Gin
- zk系列三:zookeeper实战之分布式锁实现
- 【深入浅出 Yarn 架构与实现】2-2 Yarn 基础库 - 底层通信库 RPC
- OPPO手机怎么才能设置到自己心仪的彩铃
- 手机动态彩铃是怎么设置的(自己手机的彩铃能设置吗)
- 第2-1-5章 docker安装MinIO实现文件存储服务-springboot整合minio-minio全网最全的资料
- 【深入浅出 Yarn 架构与实现】2-1 Yarn 基础库概述
- Golang 实现时间戳和时间的转化
- 二叉搜索树 - C++ 实现
- 【深入浅出 Yarn 架构与实现】1-2 搭建 Hadoop 源码阅读环境