getTask尝试获取任务执行runWorker中是通过getTask获取任务的,getTask中包含着工作线程是如何从工作队列中获取任务的关键逻辑 。
- 在获取任务前,需要通过getTask检查当前线程池的线程数量是否超过了参数配置(启动后被动态调整了),因此需要先获得当前线程池工作线程总数workCount 。如果当前工作线程数量超过了指定的最大线程个数maximumPoolSize限制 , 则说明当前线程需要退出了
- timed标识用于决定当前线程如何从工作队列(阻塞队列)中获取新任务,如果timed为true则通过poll方法获取同时指定相应的超时时间(配置参数keepAliveTime),如果timed为false则通过take方法无限期的等待 。如果工作队列并不为空 , 则poll和take方法都会立即返回一个任务对象 。而当工作队列为空时,工作线程则会阻塞在工作队列上以让出CPU(idle状态)直到有新的任务到来而被唤醒(或者超时唤醒) 。这也是存储任务的workQueue不能是普通的队列,而必须是阻塞队列的原因 。(对阻塞队列工作原理不太清楚的读者可以参考我以前的博客:自己动手实现一个阻塞队列)
- timed的值由两方面共同决定 。一是配置参数allowCoreThreadTimeOut是否为true , 为true的话说明不管是核心线程还是非核心线程都需要在idle等待keepAliveTime后销毁退出 。所以allowCoreThreadTimeOut=true,则timed一定为true二是如果allowCoreThreadTimeOut为false , 说明核心线程不需要退出,而非核心线程在idle等待keepAliveTime后需要销毁退出 。则判断当前workCount是否大于配置的corePoolSize,是的话则timed为true否则为false 。如果当前线程数超过了指定的最大核心线程数corePoolSize,则需要让工作队列为空时(说明线程池负载较低)部分idle线程退出 , 使得最终活跃的线程数减少到和corePoolSize一致 。从这里可以看到,核心与非核心线程的概念在ThreadPoolExecutor里是很弱的,不关心工作线程最初是以什么原因创建的都一视同仁,谁都可能被当作非核心线程而销毁退出 。
- timedOut标识当前工作线程是否因为poll拉取任务时出现了超时 。take永远不会返回null,因此只有poll在超时时会返回null,当poll返回值为null时 , 表明是等待了keepAliveTime时间后超时了 , 所以timedOut标识为true 。同时如果拉取任务时线程被中断了,则捕获InterruptedException异常,将timeOut标识为false(被中断的就不认为是超时) 。
- 当(workCount > maximumPoolSize)或者 (timed && timedOut)两者满足一个时 , 就说明当前线程应该要退出了 。此时将当前的workCount用cas的方式减去1,返回null代表获取任务失败即可;如果cas失败,则在for循环中重试 。但有一种情况是例外的(workCount <= 1 && !workQueue.isEmpty()),即当前工作线程数量恰好为1,且工作队列不为空(那么还需要当前线程继续工作把工作队列里的任务都消费掉,无论如何不能退出)
/*** 尝试着从阻塞队列里获得待执行的任务* @return 返回null代表工作队列为空 , 没有需要执行的任务; 或者当前worker线程满足了需要退出的一些条件*返回对应的任务* */private Runnable getTask() {boolean timedOut = false;for(;;) {int currentCtl = ctl.get();// 获得当前工作线程个数int workCount = workerCountOf(currentCtl);// 有两种情况需要指定超时时间的方式从阻塞队列workQueue中获取任务(即timed为true)// 1.线程池配置参数allowCoreThreadTimeOut为true,即允许核心线程在idle一定时间后被销毁//所以allowCoreThreadTimeOut为true时,需要令timed为true,这样可以让核心线程也在一定时间内获取不到任务(idle状态)而被销毁// 2.线程池配置参数allowCoreThreadTimeOut为false,但当前线程池中的线程数量workCount大于了指定的核心线程数量corePoolSize//说明当前有一些非核心的线程正在工作 , 而非核心的线程在idle状态一段时间后需要被销毁//所以此时也令timed为true , 让这些线程在keepAliveTime时间内由于队列为空拉取不到任务而返回null,将其销毁boolean timed = allowCoreThreadTimeOut || workCount > corePoolSize;// 有共四种情况不需要往下执行,代表// 1 (workCount > maximumPoolSize && workCount > 1)// 当前工作线程个数大于了指定的maximumPoolSize(可能是由于启动后通过setMaximumPoolSize调小了maximumPoolSize的值)// 已经不符合线程池的配置参数约束了,要将多余的工作线程回收掉// 且当前workCount > 1说明存在不止一个工作线程,意味着即使将当前工作线程回收后也还有其它工作线程能继续处理工作队列里的任务,直接返回null表示自己需要被回收// 2 (workCount > maximumPoolSize && workCount <= 1 && workQueue.isEmpty())// 当前工作线程个数大于了指定的maximumPoolSize(maximumPoolSize被设置为0了)// 已经不符合线程池的配置参数约束了 , 要将多余的工作线程回收掉// 但此时workCount<=1 , 说明将自己这个工作线程回收掉后就没有其它工作线程能处理工作队列里剩余的任务了// 所以即使maximumPoolSize设置为0 , 也需要等待任务被处理完,工作队列为空之后才能回收当前线程,否则还会继续拉取剩余任务// 3 (workCount <= maximumPoolSize && (timed && timedOut) && workCount > 1)// workCount <= maximumPoolSize符合要求// 但是timed && timedOut , 说明timed判定命中 , 需要以poll的方式指定超时时间,并且最近一次拉取任务超时了timedOut=true// 进入新的一次循环后timed && timedOut成立,说明当前worker线程处于idle状态等待任务超过了规定的keepAliveTime时间,需要回收当前线程// 且当前workCount > 1说明存在不止一个工作线程,意味着即使将当前工作线程回收后也还有其它工作线程能继续处理工作队列里的任务,直接返回null表示自己需要被回收// 4 (workCount <= maximumPoolSize && (timed && timedOut) && workQueue.isEmpty())// workCount <= maximumPoolSize符合要求// 但是timed && timedOut,说明timed判定命中,需要以poll的方式指定超时时间,并且最近一次拉取任务超时了timedOut=true// 进入新的一次循环后timed && timedOut成立,说明当前worker线程处于idle状态等待任务超过了规定的keepAliveTime时间,需要回收当前线程// 但此时workCount<=1,说明将自己这个工作线程回收掉后就没有其它工作线程能处理工作队列里剩余的任务了// 所以即使timed && timedOut超时逻辑匹配,也需要等待任务被处理完,工作队列为空之后才能回收当前线程,否则还会继续拉取剩余任务if ((workCount > maximumPoolSize || (timed && timedOut))&& (workCount > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(currentCtl)) {// 满足上述条件,说明当前线程需要被销毁了,返回nullreturn null;}// compareAndDecrementWorkerCount方法由于并发的原因cas执行失败,continue循环重试continue;}try {// 根据上面的逻辑的timed标识,决定以什么方式从阻塞队列中获取任务Runnable r = timed ?// timed为true,通过poll方法指定获取任务的超时时间(如果指定时间内没有队列依然为空,则返回)workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :// timed为false,通过take方法无限期的等待阻塞队列中加入新的任务workQueue.take();if (r != null) {// 获得了新的任务,getWork正常返回对应的任务对象return r;}else{// 否则说明timed=true,且poll拉取任务时超时了timedOut = true;}} catch (InterruptedException retry) {// poll or take任务等待时worker线程被中断了,捕获中断异常// timeout = false,标识拉取任务时没有超时timedOut = false;}}}
推荐阅读
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- 我的Vue之旅 10 Gin重写后端、实现页面详情页 Mysql + Golang + Gin
- zk系列三:zookeeper实战之分布式锁实现
- 【深入浅出 Yarn 架构与实现】2-2 Yarn 基础库 - 底层通信库 RPC
- OPPO手机怎么才能设置到自己心仪的彩铃
- 手机动态彩铃是怎么设置的(自己手机的彩铃能设置吗)
- 第2-1-5章 docker安装MinIO实现文件存储服务-springboot整合minio-minio全网最全的资料
- 【深入浅出 Yarn 架构与实现】2-1 Yarn 基础库概述
- Golang 实现时间戳和时间的转化
- 二叉搜索树 - C++ 实现
- 【深入浅出 Yarn 架构与实现】1-2 搭建 Hadoop 源码阅读环境