Future详解( 三 )


3)FutureTask 一般是结合线程池使用,然后额外采用FutureTask获取结果 。
【4】Future的局限性
从本质上说,Future表示一个异步计算的结果 。它提供了isDone()来检测计算是否已经完成,并且在计算结束后,可以通过get()方法来获取计算结果 。在异步计算中,Future确实是个非常优秀的接口 。但是 , 它的本身也确实存在着许多限制:
1)并发执行多任务:Future只提供了get()方法来获取结果 , 并且是阻塞的 。所以 , 除了等待你别无他法;
2)无法对多个任务进行链式调用:如果你希望在计算任务完成后执行特定动作,比如发邮件,但Future却没有提供这样的能力;
3)无法组合多个任务:如果你运行了10个任务,并期望在它们全部执行结束后执行特定动作,那么在Future中这是无能为力的;
4)没有异常处理:Future接口中没有关于异常处理的方法;
了解CompletionService接口【1】介绍
1)CompletionService 接口是一个独立的接口,并没有扩展 ExecutorService。其默认实现类是ExecutorCompletionService;
2)接口CompletionService 的功能是:以异步的方式一边执行未完成的任务,一边记录、处理已完成任务的结果 。让两件事分开执行,任务之间不会互相阻塞,可以实现先执行完的先取结果,不再依赖任务顺序了 。
3)简单来说,CompletionService 就是监视着 Executor线程池执行的任务,用 BlockingQueue 将完成的任务的结果存储下来 。(当然,这个也可以是程序员自己去实现,但是要不断遍历与每个任务关联的 Future,然后不断去轮询,判断任务是否已经完成,比较繁琐);
【2】源码展示
public interface CompletionService<V> {//提交一个 Callable 任务;一旦完成 , 便可以由take()、poll()方法获取Future<V> submit(Callable<V> task);//提交一个 Runnable 任务 , 并指定计算结果;Future<V> submit(Runnable task, V result);//获取并移除表示下一个已完成任务的 Future,如果目前不存在这样的任务,则等待 。Future<V> take() throws InterruptedException;//获取并移除表示下一个已完成任务的 Future , 如果不存在这样的任务 , 则返回 null 。Future<V> poll();//获取并移除表示下一个已完成任务的 Future , 如果目前不存在这样的任务 , 则将等待指定的时间(如果有必要)Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;}了解ExecutorCompletionService类(CompletionService接口的实现类)【1】介绍
1)内部通过阻塞队列+FutureTask,实现了任务先完成可优先获取到,即结果按照完成先后顺序排序,内部有一个先进先出的阻塞队列,用于保存已经执行完成的Future,通过调用它的take方法或poll方法可以获取到一个已经执行完成的Future,进而通过调用Future接口实现类的get方法获取最终的结果 。
【2】源码分析
1)属性分析
//线程池private final Executor executor;//判断线程池是否继承抽象类private final AbstractExecutorService aes;//阻塞队列private final BlockingQueue<Future<V>> completionQueue;2)构造方法
//对于线程池必须定义,而阻塞队列会有默认的//而默认的LinkedBlockingQueue对于并发编程来说是存在隐患的(依据阿里手册来说,因为队列的无尽性会导致OOM)//所以一般考虑要你自己去定义阻塞队列public ExecutorCompletionService(Executor executor) {if (executor == null)throw new NullPointerException();this.executor = executor;//如果是继承了抽象类的实现this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null;this.completionQueue = new LinkedBlockingQueue<Future<V>>();}public ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue) {if (executor == null || completionQueue == null)throw new NullPointerException();this.executor = executor;this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null;this.completionQueue = completionQueue;}3)阻塞队列元素的定义
private class QueueingFuture extends FutureTask<Void> {QueueingFuture(RunnableFuture<V> task) {super(task, null);this.task = task;}//FutureTask里面的拓展方法,在run的时候会被调用,所以是做完任务了会自动提交到队列里面protected void done() { completionQueue.add(task); }private final Future<V> task;}4)实现接口的方法
//采用newTaskFor来封装非标准的取消//因为传入的Callable或Runnable,这种不是FutureTask,故需要封装private RunnableFuture<V> newTaskFor(Callable<V> task) {if (aes == null)return new FutureTask<V>(task);elsereturn aes.newTaskFor(task);}private RunnableFuture<V> newTaskFor(Runnable task, V result) {if (aes == null)return new FutureTask<V>(task, result);elsereturn aes.newTaskFor(task, result);}//下面是对接口定义的方法的实现public Future<V> submit(Callable<V> task) {if (task == null) throw new NullPointerException();RunnableFuture<V> f = newTaskFor(task);executor.execute(new QueueingFuture(f));return f;}public Future<V> submit(Runnable task, V result) {if (task == null) throw new NullPointerException();RunnableFuture<V> f = newTaskFor(task, result);executor.execute(new QueueingFuture(f));return f;}public Future<V> take() throws InterruptedException {return completionQueue.take();}public Future<V> poll() {return completionQueue.poll();}public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException {return completionQueue.poll(timeout, unit);}

推荐阅读