ThreadPoolExecutor共有五种状态,但有四种都和优雅停止有关(除了RUNNING) 。但由于v1版本的MyThreadPoolExecutorV1不支持优雅停止 , 所以不在本篇博客中讲解这些状态具体的含义以及其是如何变化的(下一篇v2版本的博客中展开)
记录线程池运行过程中的一些关键指标
- completedTaskCount(线程池自启动后已完成的总任务数)
- largestPoolSize(线程池自启动后工作线程个数的最大值)在运行过程中,ThreadPoolExecutor会在对应的地方进行埋点,统计一些指标并提供相应的api给用户实时的查询,以提高线程池工作时的可观测性 。
public class MyThreadPoolExecutorV1 implements MyThreadPoolExecutor{/*** 指定的最大核心线程数量* */private volatile int corePoolSize;/*** 指定的最大线程数量* */private volatile int maximumPoolSize;/*** 线程保活时间(单位:纳秒 nanos)* */private volatile long keepAliveTime;/*** 存放任务的工作队列(阻塞队列)* */private final BlockingQueue<Runnable> workQueue;/*** 线程工厂* */private volatile ThreadFactory threadFactory;/*** 拒绝策略* */private volatile MyRejectedExecutionHandler handler;/*** 是否允许核心线程在idle一定时间后被销毁(和非核心线程一样)* */private volatile boolean allowCoreThreadTimeOut;/*** 主控锁* */private final ReentrantLock mainLock = new ReentrantLock();/*** 当前线程池已完成的任务数量* */private long completedTaskCount;/*** 维护当前存活的worker线程集合* */private final HashSet<MyWorker> workers = new HashSet<>();/*** 当前线程池中存在的worker线程数量 + 状态的一个聚合(通过一个原子int进行cas,来避免对两个业务属性字段加锁来保证一致性)* v1版本只关心前者,即worker线程数量*/private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));private static final int COUNT_BITS = Integer.SIZE - 3;/*** 32位的有符号整数,有3位是用来存放线程池状态的,所以用来维护当前工作线程个数的部分就只能用29位了* 被占去的3位中,有1位原来的符号位 , 2位是原来的数值位 。* */private static final int CAPACITY= (1 << COUNT_BITS) - 1;/*** 线程池状态poolStatus常量(状态值只会由小到大,单调递增)* 线程池状态迁移图:*SHUTDOWN* RUNNING↓TIDYING → TERMINATED*STOP* 1 RUNNING状态,代表着线程池处于正常运行的状态 。能正常的接收并处理提交的任务* 线程池对象初始化时,状态为RUNNING* 对应逻辑:private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));** 2 SHUTDOWN状态,代表线程池处于停止对外服务的状态 。不再接收新提交的任务 , 但依然会将workQueue工作队列中积压的任务处理完* 调用了shutdown方法时 , 状态由RUNNING -> SHUTDOWN* 对应逻辑:shutdown方法中的advanceRunState(SHUTDOWN);** 3 STOP状态,代表线程池处于停止状态 。不再接受新提交的任务,同时也不再处理workQueue工作队列中积压的任务,当前还在处理任务的工作线程将收到interrupt中断通知* 之前未调用shutdown方法,直接调用了shutdownNow方法,状态由RUNNING -> STOP* 之前先调用了shutdown方法 , 后调用了shutdownNow方法 , 状态由SHUTDOWN -> STOP* 对应逻辑:shutdownNow方法中的advanceRunState(STOP);** 4 TIDYING状态,代表着线程池即将完全终止,正在做最后的收尾工作* 当前线程池状态为SHUTDOWN,任务被消费完工作队列workQueue为空,且工作线程全部退出完成工作线程集合workers为空时,tryTerminate方法中将状态由SHUTDOWN->TIDYING* 当前线程池状态为STOP,工作线程全部退出完成工作线程集合workers为空时,tryTerminate方法中将状态由STOP->TIDYING* 对应逻辑:tryTerminate方法中的ctl.compareAndSet(c, ctlOf(TIDYING, 0)** 5 TERMINATED状态,代表着线程池完全的关闭 。之前线程池已经处于TIDYING状态,且调用的钩子函数terminated已返回* 当前线程池状态为TIDYING,调用的钩子函数terminated已返回* 对应逻辑:tryTerminate方法中的ctl.set(ctlOf(TERMINATED, 0));* */private static final int RUNNING = -1 << COUNT_BITS;private static final int SHUTDOWN = 0 << COUNT_BITS;private static final int STOP = 1 << COUNT_BITS;private static final int TIDYING = 2 << COUNT_BITS;private static final int TERMINATED = 3 << COUNT_BITS;// Packing and unpacking ctlprivate static int workerCountOf(int c){ return c & CAPACITY; }private static int ctlOf(int rs, int wc) { return rs | wc; }/*** 跟踪线程池曾经有过的最大线程数量(只能在mainLock的并发保护下更新)*/private int largestPoolSize;private boolean compareAndIncrementWorkerCount(int expect) {return this.ctl.compareAndSet(expect, expect + 1);}private boolean compareAndDecrementWorkerCount(int expect) {return ctl.compareAndSet(expect, expect - 1);}private void decrementWorkerCount() {do {// cas更新 , workerCount自减1} while (!compareAndDecrementWorkerCount(ctl.get()));}public MyThreadPoolExecutorV1(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,MyRejectedExecutionHandler handler) {// 基本的参数校验if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) {throw new IllegalArgumentException();}if (unit == null || workQueue == null || threadFactory == null || handler == null) {throw new NullPointerException();}// 设置成员变量this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;}public ThreadFactory getThreadFactory() {return threadFactory;}}
推荐阅读
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- 我的Vue之旅 10 Gin重写后端、实现页面详情页 Mysql + Golang + Gin
- zk系列三:zookeeper实战之分布式锁实现
- 【深入浅出 Yarn 架构与实现】2-2 Yarn 基础库 - 底层通信库 RPC
- OPPO手机怎么才能设置到自己心仪的彩铃
- 手机动态彩铃是怎么设置的(自己手机的彩铃能设置吗)
- 第2-1-5章 docker安装MinIO实现文件存储服务-springboot整合minio-minio全网最全的资料
- 【深入浅出 Yarn 架构与实现】2-1 Yarn 基础库概述
- Golang 实现时间戳和时间的转化
- 二叉搜索树 - C++ 实现
- 【深入浅出 Yarn 架构与实现】1-2 搭建 Hadoop 源码阅读环境