背景:
CompletableFuture 字面翻译过来,就是 “可完成的 Future”。同传统的 Future 相比较,CompletableFuture 能够主动设置计算的结果值(主动终结计算过程,即 completable),从而在某些场景下主动结束阻塞等待。而 Future 由于不能主动设置计算结果值,一旦调用 get () 进行阻塞等待,要么当计算结果产生,要么超时,才会返回。
CompletableFuture 说白了其实就是为了解决 Future 的问题(阻塞),而生!!!
下面总结 CompletableFuture 的常用 api
创建 CompletableFuture
实例方法:
//实例方法
CompletableFuture<String> completableFutureOne = new CompletableFuture<>();
Supplier<?> task=new Supplier<Object>() {
@Override
public Object get() {
return null;
}
};
CompletableFuture<?> completableFuture = completableFutureOne.supplyAsync(task);
静态方法:
public static void main(String[] args) throws ExecutionException, InterruptedException {
Runnable runnable = () ->
System.out.println (“执行无返回结果的异步任务”);
System.out.println(CompletableFuture.runAsync(runnable).get());
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("执行有返回值的异步任务");
return "Hello World";
});
String result = future.get();
System.out.println(result);
}
# 2、whenComplete - 第一个任务结束,对其结果处理 (handly 的作用一样)
结果处理就是当 future 任务完成时,对任务的结果做处理工作!或异常情况处理!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { } System.out.println("执行结束1!"); return 5; }); future.whenComplete(new BiConsumer<Integer, Throwable>() { @Override public void accept(Integer t, Throwable action) { t=t+1; // int i = 12 / 0; System.out.println("执行完成2!"+action.getMessage()); } }) .exceptionally(new Function<Throwable, Integer>() { @Override public Integer apply(Throwable t) { System.out.println("执行失败3:" + t.getMessage()); return null; } }).join(); Integer integer = future.get(); System.out.println("=>integer"+integer); }
1、whenComplete 只是对任务运行结束后,拿到任务结果,做个处理,并且如果任务执行有异常,会监听到异常!
2、如果 whenComplete 本身有异常,那么需要单独加 exceptionally 来监听异常!
3、最终 future.get () 拿到的还是任务 1 的结果
4、如果任务有异常,future.get () 拿到会抛出异常!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 执行结束1! 执行失败3:java.lang.ArithmeticException: / by zero Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) at top.lisicheng.wmd.CompleableFutureTest2.main(CompleableFutureTest2.java:83) Caused by: java.lang.ArithmeticException: / by zero at top.lisicheng.wmd.CompleableFutureTest2.lambda$main$0(CompleableFutureTest2.java:65) at java.util.concurrent.CompletableFuture$AsyncSupply.run$$$capture(CompletableFuture.java:1590) at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java) at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1582) at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
# 3、thenApply - 第一个任务结束,可能还有第二、第三个任务,且后面一个任务,需要用到前面任务的返回值
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public static void test4(String[] args) throws ExecutionException, InterruptedException { /** * public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) * public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) * public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor) * 总结:thenApply 接收一个函数作为参数,使用该函数处理上一个CompletableFuture 调用的结果, * 并返回一个具有处理结果的Future对象。 * */ CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { int result = 100; System.out.println("一阶段:" + result); return result; }).thenApply(number -> { int result = number * 3; System.out.println("二阶段:" + result); return result; }).thenApply(number -> { int result = number * 3; System.out.println("三阶段:" + result); return result; }); System.out.println("最终结果:" + future.get()); }
# 4、thenCompose - 跟上面一样的作用:
thenCompose 的参数为一个返回 CompletableFuture 实例的函数,该函数的参数是先前计算步骤的结果。
1 2 3 public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn); public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) ; public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor) ;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public static void main(String[] args) throws InterruptedException, ExecutionException { CompletableFuture<Integer> future = CompletableFuture.supplyAsync(new Supplier<Integer>() { @Override public Integer get() { int number = new Random().nextInt(3); System.out.println("第一阶段:" + number); return number; } }).thenCompose(new Function<Integer, CompletionStage<Integer>>() { @Override public CompletionStage<Integer> apply(Integer param) { return CompletableFuture.supplyAsync(new Supplier<Integer>() { @Override public Integer get() { int number = param * 2; System.out.println("第二阶段:" + number); return number; } }); } }); System.out.println("最终结果: " + future.get()); }
1 2 3 4 5 那么 thenApply 和 thenCompose 有何区别呢: thenApply 转换的是泛型中的类型,返回的是同一个CompletableFuture; thenCompose 将内部的 CompletableFuture 调用展开来并使用上一个CompletableFutre 调用的结果在下一步的 CompletableFuture 调用中进行运算, 是生成一个新的CompletableFuture。
下面用一个例子对对比:
1 2 3 4 5 6 7 8 9 public static void main(String[] args) throws InterruptedException, ExecutionException { CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello"); CompletableFuture<String> result1 = future.thenApply(param -> param + " World"); CompletableFuture<String> result2 = future.thenCompose(param -> CompletableFuture.supplyAsync(() -> param + " World")); System.out.println(result1.get()); System.out.println(result2.get()); }
# 5、结果消费
1 2 3 thenAccept系列:对单个结果进行消费 thenAcceptBoth系列:对两个结果进行消费 thenRun系列:不关心结果,只对结果执行Action
只会拿到上个任务的值,然后对值进行消费,但是绝对不会产生新的值
这是跟上面任务中间 转换的,最大的区别
消费结果还包括 thenRun
thenRun 跟 thenAccept 的区别是,它不仅不产生新的值,还不消费上个任务的值,只是自己做一个业务处理。
6、结果组合
thenCombine - 合并两个线程任务的结果,并进一步处理。
applyToEither - 两个线程任务相比较,先获得执行结果的,就对该结果进行下一步的转化操作。
acceptEither - 两个线程任务相比较,先获得执行结果的,就对该结果进行下一步的消费操作。
runAfterEither - 两个线程任务相比较,有任何一个执行完成,就进行下一步操作,不关心运行结果。
runAfterBoth - 两个线程任务相比较,两个全部执行完成,才进行下一步操作,不关心运行结果。
anyOf-anyOf 方法的参数是多个给定的 CompletableFuture,当其中的任何一个完成时,方法返回这个 CompletableFuture。
allOf-allOf 方法用来实现多 CompletableFuture 的同时返回。
代码实例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 @RestController @RequestMapping("/test") public class TestController { public static ExecutorService threadPool = new ThreadPoolExecutor( 10, 40, 20, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>( 16 )); /** * CompletableFuture 测试 * * @return */ @ApiOperation("CompletableFuture 测试") @PostMapping("test") public Object test() { Map<Object, Object> result = new HashMap<>(); CompletableFuture.allOf( CompletableFuture.runAsync(() -> { System.out.println("执行1"); result.put("key1", seslectData1()); }, threadPool), CompletableFuture.runAsync(() -> { System.out.println("执行2"); result.put("key2", seslectData2()); }, threadPool), CompletableFuture.runAsync(() -> { System.out.println("执行3"); result.put("key3", seslectData3()); }, threadPool) ).join(); return ResponseUtil.ok(result); } public String seslectData1() { try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } return "key1"; } public String seslectData2() { try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } return "key2"; } public String seslectData3() { try { Thread.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); } return "key3"; } }
# 关于我
Brath 是一个热爱技术的 Java 程序猿,公众号「InterviewCoder」定期分享有趣有料的精品原创文章!
非常感谢各位人才能看到这里,原创不易,文章如果有帮助可以关注、点赞、分享或评论,这都是对我的莫大支持!