Future详解( 四 )

【3】汇总说明
1)说白了就是基于FutureTask 是单线程的任务,考虑可以等待获取返回结果,那么应该可以采用线程池的方法形成多任务并发的结果 。
2)故定义了CompletionService接口作为规范,ExecutorCompletionService类作为具体的实现类【作为管理者】,不然每次采用线程池来做的话都要自己定义去管理 。
3)当需要批量提交异步任务的时候建议你使用CompletionService 。CompletionService将线程池Executor和阻塞队列BlockingQueue的功能融合在了一起,能够让批量异步任务的管理更简单 。
4)CompletionService能够让异步任务的执行结果有序化 。先执行完的先进入阻塞队列 , 利用这个特性,你可以轻松实现后续处理的有序性 , 避免无谓的等待,同时还可以快速实现诸如Forking Cluster这样的需求 。
5)线程池隔离 。CompletionService支持自己创建线程池,这种隔离性能避免几个特别耗时的任务拖垮整个应用的风险 。
【4】示例展示
1)示例代码
public class CompletionServiceDemo {public static void main(String[] args) throws InterruptedException, ExecutionException {//创建线程池ExecutorService executor = Executors.newFixedThreadPool(10);//创建CompletionServiceCompletionService<Integer> cs = new ExecutorCompletionService<>(executor);//异步向电商S1询价cs.submit(() -> getPriceByS1());//异步向电商S2询价cs.submit(() -> getPriceByS2());//异步向电商S3询价cs.submit(() -> getPriceByS3());//将询价结果异步保存到数据库for (int i = 0; i < 3; i++) {//从阻塞队列获取futureTaskInteger r = cs.take().get();executor.execute(() -> save(r));}executor.shutdown();}private static void save(Integer r) {System.out.println("保存询价结果:{}"+r);}private static Integer getPriceByS1() throws InterruptedException {TimeUnit.MILLISECONDS.sleep(5000);System.out.println("电商S1询价信息1200");return 1200;}private static Integer getPriceByS2() throws InterruptedException {TimeUnit.MILLISECONDS.sleep(8000);System.out.println("电商S2询价信息1000");return 1000;}private static Integer getPriceByS3()throws InterruptedException {TimeUnit.MILLISECONDS.sleep(3000);System.out.println("电商S3询价信息800");return 800;}}了解CompletableFuture【1】介绍
1)简单的任务,用Future获取结果还好,但我们并行提交的多个异步任务 , 往往并不是独立的,很多时候业务逻辑处理存在串行[依赖]、并行、聚合的关系 。如果要我们手动用 Fueture 实现,是非常麻烦的 。
2)CompletableFuture是Future接口的扩展和增强 。CompletableFuture实现了Future接口,并在此基础上进行了丰富地扩展 , 完美地弥补了Future上述的种种问题 。更为重要的是,CompletableFuture实现了对任务的编排能力 。借助这项能力,我们可以轻松地组织不同任务的运行顺序、规则以及方式 。从某种程度上说,这项能力是它的核心能力 。而在以往,虽然通过CountDownLatch等工具类也可以实现任务的编排,但需要复杂的逻辑处理,不仅耗费精力且难以维护 。
3)CompletableFuture除了实现Future接口还实现了CompletionStage接口 。
4)CompletionStage接口: 执行某一个阶段,可向下执行后续阶段 。异步执行 , 默认线程池是ForkJoinPool.commonPool() 。
【2】常用方法
1)描述依赖关系:
1.thenApply() 把前面异步任务的结果,交给后面的Function
2.thenCompose()用来连接两个有依赖关系的任务,结果由第二个任务返回
2)描述and聚合关系:
1.thenCombine:任务合并,有返回值
2.thenAccepetBoth:两个任务执行完成后,将结果交给thenAccepetBoth消耗 , 无返回值 。
3.runAfterBoth:两个任务都执行完成后,执行下一步操作(Runnable) 。
3)描述or聚合关系:
1.applyToEither:两个任务谁执行的快,就使用那一个结果,有返回值 。
2.acceptEither: 两个任务谁执行的快,就消耗那一个结果,无返回值 。
3.runAfterEither: 任意一个任务执行完成,进行下一步操作(Runnable) 。
4)并行执行:
1.CompletableFuture类自己也提供了anyOf()和allOf()用于支持多个CompletableFuture并行执行
【3】创建异步操作
1)CompletableFuture 提供了四个静态方法来创建一个异步操作:
public static CompletableFuture<Void> runAsync(Runnable runnable)public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)2)这四个方法区别在于:

推荐阅读