我们现在来仔细分析一下,线程退出线程池的时候是如何保证线程池当中总的线程数是不小于 corePoolSize 的!首先整体的框架是使用 CAS 进行实现,具体代码为 do ... while 操作 , 然后在 while 操作里面使用 CAS 进行测试替换 , 如果没有成功再次获取 ,当线程池当中核心线程的数目小于等于 corePoolSize 的时候也需要退出循环,因为线程池当中线程的个数不能小于 corePoolSize。因此使用 break 跳出循环的线程是不会退出线程池的 。
线程池实现的BUG在我们自己实现的线程池当中当线程退出的时候,workers 当中还保存这指向这个线程的对象,但是当线程退出的时候我们还没有在 workers 当中删除这个对象,因此这个线程对象不会被垃圾回收器收集掉 , 但是我们这个只是一个线程池实现的例子而已 , 并不用于生产环境 , 只是为了帮助大家理解线程池的原理 。
完整代码package cscore.concurrent.java.threadpoolv2;import java.util.ArrayList;import java.util.concurrent.*;import java.util.concurrent.atomic.AtomicInteger;public class ThreadPool { private AtomicInteger ct = new AtomicInteger(0); // 当前在执行任务的线程个数 private int corePoolSize; private int maximumPoolSize; private long keepAliveTime; private TimeUnit unit; private BlockingQueue<Runnable> taskQueue; private RejectPolicy policy; private ArrayList<Worker> workers = new ArrayList<>(); private volatile boolean isStopped; private boolean useTimed; public int getCt() { return ct.get(); } public ThreadPool(int corePoolSize, int maximumPoolSize, TimeUnit unit, long keepAliveTime, RejectPolicy policy , int maxTasks) { // please add -ea to vm options to make assert keyword enable assert corePoolSize > 0; assert maximumPoolSize > 0; assert keepAliveTime >= 0; assert maxTasks > 0; this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.unit = unit; this.policy = policy; this.keepAliveTime = keepAliveTime; taskQueue = new ArrayBlockingQueue<Runnable>(maxTasks); useTimed = keepAliveTime != 0; } /** * * @param runnable 需要被执行的任务 * @param max 是否使用 maximumPoolSize * @return boolean */ public synchronized boolean addWorker(Runnable runnable, boolean max) { if (ct.get() >= corePoolSize && !max) return false; if (ct.get() >= maximumPoolSize && max) return false; Worker worker = new Worker(runnable); workers.add(worker); Thread thread = new Thread(worker, "ThreadPool-" + "Thread-" + ct.addAndGet(1)); thread.start(); return true; } // 下面这个方法是向线程池提交任务 public void execute(Runnable runnable) throws InterruptedException { checkPoolState(); if (addWorker(runnable, false) // 如果能够加入新的线程执行任务 加入成功就直接返回 || !taskQueue.offer(runnable) // 如果 taskQueue.offer(runnable) 返回 false 说明提交任务失败 任务队列已经满了 || addWorker(runnable, true)) // 使用能够使用的最大的线程数 (maximumPoolSize) 看是否能够产生新的线程 return; // 如果任务队列满了而且不能够加入新的线程 则拒绝这个任务 if (!taskQueue.offer(runnable)) reject(runnable); } private void reject(Runnable runnable) throws InterruptedException { switch (policy) { case ABORT: throw new RuntimeException("task queue is full"); case CALLER_RUN: runnable.run(); case DISCARD: return; case DISCARD_OLDEST: // 放弃等待时间最长的任务 taskQueue.poll(); execute(runnable); } } private void checkPoolState() { if (isStopped) { // 如果线程池已经停下来了,就不在向任务队列当中提交任务了 throw new RuntimeException("thread pool has been stopped, so quit submitting task"); } } public <V> RunnableFuture<V> submit(Callable<V> task) throws InterruptedException { checkPoolState(); FutureTask<V> futureTask = new FutureTask<>(task); execute(futureTask); return futureTask; } // 强制关闭线程池 public synchronized void stop() { isStopped = true; for (Worker worker : workers) { worker.stopWorker(); } } public synchronized void shutDown() { // 先表示关闭线程池 线程就不能再向线程池提交任务 isStopped = true; // 先等待所有的任务执行完成再关闭线程池 waitForAllTasks(); stop(); } private void waitForAllTasks() { // 当线程池当中还有任务的时候 就不退出循环 while (taskQueue.size() > 0) { Thread.yield(); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } class Worker implements Runnable { private Thread thisThread; private final Runnable firstTask; private volatile boolean isStopped; public Worker(Runnable firstTask) { this.firstTask = firstTask; } @Override public void run() { // 先执行传递过来的第一个任务 这里是一个小的优化 让线程直接执行第一个任务 不需要 // 放入任务队列再取出来执行了 firstTask.run(); thisThread = Thread.currentThread(); while (!isStopped) { try { Runnable task = useTimed ? taskQueue.poll(keepAliveTime, unit) : taskQueue.take(); if (task == null) { int i; boolean exit = true; if (ct.get() > corePoolSize) { do{ i = ct.get(); if (i <= corePoolSize) { exit = false; break; } }while (!ct.compareAndSet(i, i - 1)); if (exit) { return; } } }else { task.run(); } } catch (InterruptedException e) { // do nothing } } } public synchronized void stopWorker() { if (isStopped) { throw new RuntimeException("thread has been interrupted"); } isStopped = true; thisThread.interrupt(); } }}
推荐阅读
- 驱动开发:内核枚举进程与线程ObCall回调
- [WPF] 抄抄超强的苹果官网滚动文字特效实现
- 河南移动卡套餐 移动卡套餐一览表
- reportportal 集成 robotframework 自动化执行及结果可视化
- 驱动开发:内核枚举Registry注册表回调
- 魔镜物语喜羊羊联动角色技能属性是什么样的
- 微信怎么把一条消息或链接转发给自己(微信消息怎么全部转发)
- 手机怎么自动截屏(如何停止手机自动截屏)
- 烟雨江湖蓝鲸屿活动玩法思路是什么
- 如何撤掉自己建的群 微信如何建立自己的微信群建群后如何撤销呢