自己动手实现线程池 jdk线程池ThreadPoolExecutor工作原理解析(一)( 四 )

Worker工作线程ThreadPoolExecutor中的工作线程并不是裸的Thread,而是被封装在了一个Worker的内部类中 。Worker实现了Runnable所以可以作为一个普通的线程来启动,在run方法中只是简单的调用了一下runWorker(runWorker后面再展开) 。Worker类有三个成员属性:

  1. Thread thread(被封装的工作线程对象)
  2. Runnable firstTask(提交任务时,创建新Worker对象时指定的第一次要执行的任务(后续线程就会去拉取工作队列里的任务执行了))
  3. volatile long completedTasks(统计用,计算当前工作线程总共完成了多少个任务)
Worker内封装的实际的工作线程对象thread,其在构造函数中由线程池的线程工厂threadFactory生成,传入this,所以thread在start后 , 便会调用run方法进而执行runWorker 。线程工厂可以由用户在创建线程池时通过参数指定 , 因此用户在自由控制所生成的工作线程的同时,也需要保证newThread能正确的返回一个可用的线程对象 。
除此之外,Worker对象还继承了AbstractQueuedSynchronizer(AQS)类,简单的实现了一个不可重入的互斥锁 。对AQS互斥模式不太了解的读者可以参考一下我之前关于AQS互斥模式的博客:AQS互斥模式与ReentrantLock可重入锁原理解析AQS中维护了一个volatile修饰的int类型的成员变量state,其具体的含义可以由使用者自己定义 。在Worker中,state的值有三种状态:
  1. state=-1,标识工作线程还未启动(不会被interruptIfStarted打断)
  2. state=0,标识工作线程已经启动,但没有开始处理任务(可能是在等待任务,idle状态)
  3. state=1,标识worker线程正在执行任务(runWorker方法中,成功获得任务后,通过lock方法将state设置为1)
具体这三种情况分别在什么时候出现会在下面解析提交任务源码的那部分里详细介绍 。
/*** jdk的实现中令Worker继承AbstractQueuedSynchronizer并实现了一个不可重入的锁* AQS中的state属性含义* -1:标识工作线程还未启动*0:标识工作线程已经启动 , 但没有开始处理任务(可能是在等待任务 , idle状态)*1:标识worker线程正在执行任务(runWorker中 , 成功获得任务后,通过lock方法将state设置为1)* */private final class MyWorker extends AbstractQueuedSynchronizer implements Runnable{final Thread thread;Runnable firstTask;volatile long completedTasks;public MyWorker(Runnable firstTask) {this.firstTask = firstTask;// newThread可能是nullthis.thread = getThreadFactory().newThread(this);}@Overridepublic void run() {runWorker(this);}protected boolean isHeldExclusively() {return getState() != 0;}protected boolean tryAcquire(int unused) {if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}protected boolean tryRelease(int unused) {setExclusiveOwnerThread(null);setState(0);return true;}public void lock(){acquire(1);}public boolean tryLock(){return tryAcquire(1);}public void unlock(){release(1);}public boolean isLocked(){return isHeldExclusively();}void interruptIfStarted() {Thread t;// 三个条件同时满足 , 才去中断Worker对应的thread// getState() >= 0,用于过滤还未执行runWorker的,刚入队初始化的Worker// thread != null,用于过滤掉构造方法中ThreadFactory.newThread返回null的Worker// !t.isInterrupted(),用于过滤掉那些已经被其它方式中断的Worker线程(比如用户自己去触发中断,提前终止线程池中的任务)if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {try {t.interrupt();} catch (SecurityException ignore) {}}}}execute执行提交的任务下面介绍本篇博客的重点,即线程池是如何执行用户所提交的任务的 。用户提交任务的入口是public的execute方法,Runnable类型的参数command就是提交的要执行的任务 。
MyThreadPoolExecutorV1的execute方法(相比jdk的实现v1版本去掉了关于优雅停止的逻辑)/*** 提交任务,并执行* */public void execute(Runnable command) {if (command == null){throw new NullPointerException("command参数不能为空");}int currentCtl = this.ctl.get();if (workerCountOf(currentCtl) < this.corePoolSize) {// 如果当前存在的worker线程数量低于指定的核心线程数量,则创建新的核心线程boolean addCoreWorkerSuccess = addWorker(command,true);if(addCoreWorkerSuccess){// addWorker添加成功,直接返回即可return;}}// 走到这里有两种情况// 1 因为核心线程超过限制(workerCountOf(currentCtl) < corePoolSize == false),需要尝试尝试将任务放入阻塞队列// 2 addWorker返回false,创建核心工作线程失败if(this.workQueue.offer(command)){// workQueue.offer入队成功if(workerCountOf(currentCtl) == 0){// 在corePoolSize为0的情况下 , 当前不存在存活的核心线程// 一个任务在入队之后,如果当前线程池中一个线程都没有,则需要兜底的创建一个非核心线程来处理入队的任务// 因此firstTask为null,目的是先让任务先入队后创建线程去拉取任务并执行addWorker(null,false);}else{// 加入队列成功 , 且当前存在worker线程,成功返回return;}}else{// 阻塞队列已满,尝试创建一个新的非核心线程处理boolean addNonCoreWorkerSuccess = addWorker(command,false);if(!addNonCoreWorkerSuccess){// 创建非核心线程失败,执行拒绝策略(失败的原因和前面创建核心线程addWorker的原因类似)reject(command);}else{// 创建非核心线程成功,成功返回return;}}}/*** 根据指定的拒绝处理器 , 执行拒绝策略* */private void reject(Runnable command) {this.handler.rejectedExecution(command, this);}

推荐阅读