Worker工作线程ThreadPoolExecutor中的工作线程并不是裸的Thread,而是被封装在了一个Worker的内部类中 。Worker实现了Runnable所以可以作为一个普通的线程来启动,在run方法中只是简单的调用了一下runWorker(runWorker后面再展开) 。Worker类有三个成员属性:
- Thread thread(被封装的工作线程对象)
- Runnable firstTask(提交任务时,创建新Worker对象时指定的第一次要执行的任务(后续线程就会去拉取工作队列里的任务执行了))
- volatile long completedTasks(统计用,计算当前工作线程总共完成了多少个任务)
除此之外,Worker对象还继承了AbstractQueuedSynchronizer(AQS)类,简单的实现了一个不可重入的互斥锁 。对AQS互斥模式不太了解的读者可以参考一下我之前关于AQS互斥模式的博客:AQS互斥模式与ReentrantLock可重入锁原理解析AQS中维护了一个volatile修饰的int类型的成员变量state,其具体的含义可以由使用者自己定义 。在Worker中,state的值有三种状态:
- state=-1,标识工作线程还未启动(不会被interruptIfStarted打断)
- state=0,标识工作线程已经启动,但没有开始处理任务(可能是在等待任务,idle状态)
- state=1,标识worker线程正在执行任务(runWorker方法中,成功获得任务后,通过lock方法将state设置为1)
/*** jdk的实现中令Worker继承AbstractQueuedSynchronizer并实现了一个不可重入的锁* AQS中的state属性含义* -1:标识工作线程还未启动*0:标识工作线程已经启动 , 但没有开始处理任务(可能是在等待任务 , idle状态)*1:标识worker线程正在执行任务(runWorker中 , 成功获得任务后,通过lock方法将state设置为1)* */private final class MyWorker extends AbstractQueuedSynchronizer implements Runnable{final Thread thread;Runnable firstTask;volatile long completedTasks;public MyWorker(Runnable firstTask) {this.firstTask = firstTask;// newThread可能是nullthis.thread = getThreadFactory().newThread(this);}@Overridepublic void run() {runWorker(this);}protected boolean isHeldExclusively() {return getState() != 0;}protected boolean tryAcquire(int unused) {if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}protected boolean tryRelease(int unused) {setExclusiveOwnerThread(null);setState(0);return true;}public void lock(){acquire(1);}public boolean tryLock(){return tryAcquire(1);}public void unlock(){release(1);}public boolean isLocked(){return isHeldExclusively();}void interruptIfStarted() {Thread t;// 三个条件同时满足 , 才去中断Worker对应的thread// getState() >= 0,用于过滤还未执行runWorker的,刚入队初始化的Worker// thread != null,用于过滤掉构造方法中ThreadFactory.newThread返回null的Worker// !t.isInterrupted(),用于过滤掉那些已经被其它方式中断的Worker线程(比如用户自己去触发中断,提前终止线程池中的任务)if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {try {t.interrupt();} catch (SecurityException ignore) {}}}}
execute执行提交的任务下面介绍本篇博客的重点,即线程池是如何执行用户所提交的任务的 。用户提交任务的入口是public的execute方法,Runnable类型的参数command就是提交的要执行的任务 。MyThreadPoolExecutorV1的execute方法(相比jdk的实现v1版本去掉了关于优雅停止的逻辑)
/*** 提交任务,并执行* */public void execute(Runnable command) {if (command == null){throw new NullPointerException("command参数不能为空");}int currentCtl = this.ctl.get();if (workerCountOf(currentCtl) < this.corePoolSize) {// 如果当前存在的worker线程数量低于指定的核心线程数量,则创建新的核心线程boolean addCoreWorkerSuccess = addWorker(command,true);if(addCoreWorkerSuccess){// addWorker添加成功,直接返回即可return;}}// 走到这里有两种情况// 1 因为核心线程超过限制(workerCountOf(currentCtl) < corePoolSize == false),需要尝试尝试将任务放入阻塞队列// 2 addWorker返回false,创建核心工作线程失败if(this.workQueue.offer(command)){// workQueue.offer入队成功if(workerCountOf(currentCtl) == 0){// 在corePoolSize为0的情况下 , 当前不存在存活的核心线程// 一个任务在入队之后,如果当前线程池中一个线程都没有,则需要兜底的创建一个非核心线程来处理入队的任务// 因此firstTask为null,目的是先让任务先入队后创建线程去拉取任务并执行addWorker(null,false);}else{// 加入队列成功 , 且当前存在worker线程,成功返回return;}}else{// 阻塞队列已满,尝试创建一个新的非核心线程处理boolean addNonCoreWorkerSuccess = addWorker(command,false);if(!addNonCoreWorkerSuccess){// 创建非核心线程失败,执行拒绝策略(失败的原因和前面创建核心线程addWorker的原因类似)reject(command);}else{// 创建非核心线程成功,成功返回return;}}}/*** 根据指定的拒绝处理器 , 执行拒绝策略* */private void reject(Runnable command) {this.handler.rejectedExecution(command, this);}
推荐阅读
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- 我的Vue之旅 10 Gin重写后端、实现页面详情页 Mysql + Golang + Gin
- zk系列三:zookeeper实战之分布式锁实现
- 【深入浅出 Yarn 架构与实现】2-2 Yarn 基础库 - 底层通信库 RPC
- OPPO手机怎么才能设置到自己心仪的彩铃
- 手机动态彩铃是怎么设置的(自己手机的彩铃能设置吗)
- 第2-1-5章 docker安装MinIO实现文件存储服务-springboot整合minio-minio全网最全的资料
- 【深入浅出 Yarn 架构与实现】2-1 Yarn 基础库概述
- Golang 实现时间戳和时间的转化
- 二叉搜索树 - C++ 实现
- 【深入浅出 Yarn 架构与实现】1-2 搭建 Hadoop 源码阅读环境