补充部分---ScheduledThreadPoolExecutor类分析 线程池底层原理详解与源码分析( 三 )

2.ScheduledThreadPoolExecutor类#triggerTime方法
//获取初始的延迟执行时间(以纳秒的形式 , 相当于我在哪个时间点要执行)private long triggerTime(long delay, TimeUnit unit) {return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));}long triggerTime(long delay) {return System.nanoTime() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));}3.ScheduledThreadPoolExecutor类#delayedExecute方法
private void delayedExecute(RunnableScheduledFuture<?> task) {//如果处于非运行状态则拒绝任务(这个方法里面比较的是不是比关闭状态大)if (isShutdown())reject(task);else {//加入队列super.getQueue().add(task);//如果加入队列后canRunInCurrentRunState检测线程池 , 返回false则移除任务if (!canRunInCurrentRunState(task) && remove(task))task.cancel(false); //以不可中断方式执行完成执行中的调度任务elseensurePrestart();}}boolean canRunInCurrentRunState(RunnableScheduledFuture<?> task) {//如果处于运行状态返回trueif (!isShutdown())return true;//处于停止状态,整理状态,销毁状态,三者之一返回falseif (isStopped())return false;//处于关闭状态 , 返回run-after-shutdown参数return task.isPeriodic()? continueExistingPeriodicTasksAfterShutdown //默认false: (executeExistingDelayedTasksAfterShutdown|| task.getDelay(NANOSECONDS) <= 0);}void ensurePrestart() {int wc = workerCountOf(ctl.get());if (wc < corePoolSize) //保持工作者与核心线程数持平addWorker(null, true);else if (wc == 0) //即时核心线程是0 , 也至少会启动一个addWorker(null, false);}【6】DelayedWorkQueue类源码分析
0.DelayedWorkQueue类#核心属性
private static final int INITIAL_CAPACITY = 16;// 初始容量private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY];// 控制并发和阻塞等待private final ReentrantLock lock = new ReentrantLock();private final Condition available = lock.newCondition(); //这个可以参考take方法与offer方法,个人觉得是采用中断方式唤醒持有锁的线程private int size; // 节点数量private Thread leader;//记录持有锁的线程(当等待的时候)1.DelayedWorkQueue类#add方法
public boolean add(Runnable e) {return offer(e);}public boolean offer(Runnable x) {//空值校验if (x == null)throw new NullPointerException();RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;final ReentrantLock lock = this.lock;//加锁lock.lock();try {int i = size;// 超过容量,扩容if (i >= queue.length)grow();size = i + 1; //更新当前节点数if (i == 0) {//插入的是第一个节点(阻塞队列原本为空)queue[0] = e;setIndex(e, 0); //setIndex(e, 0)用于修改ScheduledFutureTask的heapIndex属性,表示该对象在队列里的下标} else {//阻塞队列非空siftUp(i, e); //在插入新节点后对堆进行调整,进行节点上移,保持其特性(节点的值小于子节点的值)不变}/*** 这里最好结合take方法理解一下* 队列头等于当前任务,说明了当前任务的等待时间是最小的 。此时为什么要去清空leader?* leader代表的是某一个正在等待获取元素的线程句柄,* 在take的时候因为之前的头结点时间未到,不能拿,被休眠了一定时间(而这个时间就是距离之前那个队列头结点的可以出队列的时间差) 。* 此时头结点换了,理应清空句柄,唤醒它,让它再次尝试去获取最新的头结点(就算是再次休眠 , 时间也会比之前的少) 。*/if (queue[0] == e) {leader = null;available.signal();}} finally {lock.unlock(); //解锁}return true;}2.DelayedWorkQueue类#siftUp方法
//其实把这个队列看作树结构会更容易理解(要理解数组与完全二叉树的关联)private void siftUp(int k, RunnableScheduledFuture<?> key) {while (k > 0) {int parent = (k - 1) >>> 1; //父节点坐标RunnableScheduledFuture<?> e = queue[parent]; //获取父节点的值// 如果 节点>= 父节点 , 确定最终位置if (key.compareTo(e) >= 0)break;// 节点<父节点,将节点向上移动(就是将父节点放在k处)queue[k] = e;setIndex(e, k);k = parent;}//确定key的最后落脚处queue[k] = key;setIndex(key, k);}3.ScheduledFutureTask类#compareTo方法
/** * compareTo 作用是加入元素到延迟队列后,内部建立或者调整堆时候会使用该元素的 compareTo 方法与队列里面其他元素进行比较,* 让最快要过期的元素放到队首 。所以无论什么时候向队列里面添加元素 , 队首的的元素都是最即将过期的元素 。* 如果时间相同,序列号小的排前面 。*/public int compareTo(Delayed other) {if (other == this) // 如果2个指向的同一个对象,则返回0return 0;// other必须是ScheduledFutureTask类型的if (other instanceof ScheduledFutureTask) {ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;long diff = time - x.time; //两者之间的时间差if (diff < 0)return -1; //返回当前对象时间比目标对象小的标记【这个标记仅仅是标记,具体还要在上层方法逻辑中决定】else if (diff > 0)return 1;//返回当前对象时间比目标对象大的标记// 时间相同,比较序列号else if (sequenceNumber < x.sequenceNumber)return -1;elsereturn 1;}// 到这里 , 说明other不是ScheduledFutureTask类型的long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;}

推荐阅读