CompletableFuture中的API使用说明

您所在的位置:网站首页 get方法返回值不同重载 CompletableFuture中的API使用说明

CompletableFuture中的API使用说明

2023-10-24 23:03| 来源: 网络整理| 查看: 265

CompletableFuture与Future有什么不同 开发背景

在使用多线程处理任务时,经常会需要等待某一阶段的任务执行完成之后,根据阶段结果再开启新的异步任务。

这个逻辑可以使用Future来实现,通过Future的isDone方法或get方法来判断异步任务是否完成或获取结果。这种方法的问题在于,isDone轮询会消耗CPU资源,并且不能够及时的获取的任务完成的状态;get方法会使调用的线程阻塞,无法处理后续逻辑。

CompletableFuture提供了回调机制和丰富的Stream API,可以很好的完成异步任务的回调和后续任务处理。并且CompletableFuture可以很好的描述各个异步任务之间的关系,让逻辑更清晰直观。内部的无锁并发栈也能够高性能的完成异步任务之间的并行调用。

对比举例

模拟场景:

小白去餐厅吃饭,点了一份西红柿鸡蛋和一碗米饭。点完之后就完王者等吃饭。 此时服务员开始做米饭,厨师开始炒菜。做好之后由服务员打饭上菜,叫小白吃饭。

Future实现 @Test public void FutureExample2() throws ExecutionException, InterruptedException { ExecutorService waiters = Executors.newFixedThreadPool(2); ExecutorService cookers = Executors.newFixedThreadPool(2); printTimeAndThread("小白进入餐厅"); printTimeAndThread("小白点了 番茄炒蛋 + 一碗米饭"); Future makeRice = waiters.submit(() -> { ThreadTools.sleepSeconds(2); printTimeAndThread("饭好了"); return "米饭"; }); Future cookDish = cookers.submit(() -> { ThreadTools.sleepSeconds(2); printTimeAndThread("菜好了"); return "番茄炒蛋"; }); Future getRice = waiters.submit(() -> { final long start = System.currentTimeMillis(); String rice = makeRice.get(); String dish = cookDish.get(); ThreadTools. sleepSeconds(2); printTimeAndThread(String.format("服务员打饭完成,耗时%d", System.currentTimeMillis() - start)); return String.format("%s + %s 好了", dish, rice); }); printTimeAndThread("小白在打王者"); printTimeAndThread(String.format("%s ,小白开吃", getRice.get())); }

结果

1634092703854 | main | 小白进入餐厅 1634092703854 | main | 小白点了 番茄炒蛋 + 一碗米饭 1634092703856 | main | 小白在打王者 1634092705874 | pool-2-thread-1 | 菜好了 1634092705874 | pool-1-thread-1 | 饭好了 1634092707884 | pool-1-thread-2 | 服务员打饭完成,耗时4028 1634092707885 | main | 番茄炒蛋 + 米饭 好了 ,小白开吃 CompletableFuture实现 @Test public void CPExample2() { printTimeAndThread("小白进入餐厅"); printTimeAndThread("小白点了 番茄炒蛋 + 一碗米饭"); CompletableFuture makeRice = CompletableFuture.supplyAsync(() -> { sleepSeconds(300); FutureTaskTest.printTimeAndThread("做米饭"); return "米饭"; }); CompletableFuture makeDish = CompletableFuture.supplyAsync(() -> { sleepSeconds(200); printTimeAndThread("厨师炒菜"); return "番茄炒蛋"; }); final CompletableFuture allDone = makeDish.thenCombine(makeRice, (dish, rice) -> { final long start = System.currentTimeMillis(); sleepSeconds(200); printTimeAndThread(String.format("服务员打饭完成,耗时%d", System.currentTimeMillis() - start)); return String.format("%s + %s 好了", dish, rice); }); printTimeAndThread("小白在打王者"); printTimeAndThread(String.format("%s ,小白开吃", allDone.join())); } //代码引用自:https://gitee.com/phui/share-concurrent/tree/master

结果

1634092882975 | main | 小白进入餐厅 1634092882975 | main | 小白点了 番茄炒蛋 + 一碗米饭 1634092882981 | main | 小白在打王者 1634092885005 | ForkJoinPool.commonPool-worker-2 | 厨师炒菜 1634092886001 | ForkJoinPool.commonPool-worker-1 | 做米饭 1634092888010 | ForkJoinPool.commonPool-worker-1 | 服务员打饭完成,耗时2009 1634092888011 | main | 番茄炒蛋 + 米饭 好了 ,小白开吃 不同点 CompletableFuture是在前两个任务完成后,自动调用后续任务的;而Future则阻塞了一个线程,用来等待前两个任务完成。 Future需要显示的指定线程池,而CompletableFuture可以默认使用ForkJoinPool。 CompletableFuture可以使用类似Stream表达式的API表现出任务之间的逻辑关系,而Future不能在语法上直观的表示出这个关系。 CompletableFuture获取结果的join方法抛出的是RuntimeException,而get方法抛出的是受检异常InterruptedException, ExecutionException CompletableFuture的基本使用 创建一个异步任务

CompletableFuture提供了两个API用于开启一个异步任务。分别是使用Runnable接口对象作为参数的runAsync方法与使用Supplier接口对象对作为参数的supplyAsync方法。两种API的区别就是Runnable接口与Supplier接口的区别,即runAsync方法没有返回值,而supplyAsync方法有返回值。

public static CompletableFuture runAsync(Runnable runnable) public static CompletableFuture supplyAsync(Supplier supplier)

两个方法使用默认的ForkJoinPool作为线程池执行异步任务。如果需要指定其他的线程池,可以使用以下重载API:

public static CompletableFuture runAsync(Runnable runnable,Executor executor) public static CompletableFuture supplyAsync(Supplier supplier,Executor executor)

使用举例:

@Test public void CompletableFutureStart() { CompletableFuture.runAsync(() -> { ThreadTools.sleepSeconds(1); System.out.printf("run on %s %n", Thread.currentThread().getName()); }); CompletableFuture.supplyAsync(() -> { ThreadTools.sleepSeconds(2); System.out.printf("run on %s %n", Thread.currentThread().getName()); return "success"; }); ThreadTools.sleepSeconds(5); }

执行结果:

run on ForkJoinPool.commonPool-worker-1 run on ForkJoinPool.commonPool-worker-2 获取任务结果

与Future的区别 CompletableFuture比Future多提供了两种获取结果的方法:getNow与Join。

public T get() public T get(long timeout, TimeUnit unit) public T getNow(T valueIfAbsent) public T join() getNow类似于Java8的Stream中的optional.orElse()方法,当结果不存在或没有完成时返回给定的结果。 join与get类似,都会阻塞线程,二者区别在于抛出的异常不同。join可能抛出unchecked exception,不需要强制try catch。get在方法签名中声明了受检异常,必须要在编码时进行处理。

使用举例

@Test public void CompletableFutureGet() { try { final Void unused = CompletableFuture.runAsync(() -> { ThreadTools.sleepSeconds(1); System.out.printf("run on %s %n", Thread.currentThread().getName()); }).get(); System.out.println("CompletableFuture task 1 result "+unused); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } final String result = CompletableFuture.supplyAsync(() -> { ThreadTools.sleepSeconds(2); System.out.printf("run on %s %n", Thread.currentThread().getName()); return "success"; }).join(); System.out.println("CompletableFuture task2 result "+result); ThreadTools.sleepSeconds(5); }

结果

run on ForkJoinPool.commonPool-worker-1 CompletableFuture task 1 result null run on ForkJoinPool.commonPool-worker-1 CompletableFuture task2 result success 依赖于前置任务的回调 thenApplyXXX

作用:当前阶段(调用该方法的CompletionStage对象)非异常完成时,以当前阶段的结果为入参,执行给定的函数,函数有返回值。

public CompletionStage thenApply(Function other,Runnable action); public CompletionStage runAfterBothAsync(CompletionStage other,Runnable action); public CompletionStage runAfterBothAsync(CompletionStage other,Runnable action,Executor executor); thenCombineXXX

当调用该方法的前置任务与给点的前置任务都完成时,执行BiFunction函数。**函数的入参为前置两阶段的返回结果,函数无返回值。**阶段的结果CompletableFuture对象中result字段值为函数计算结果。

public CompletionStage thenCombine(CompletionStage other,Runnable action); public CompletionStage runAfterEitherAsync(CompletionStage other,Runnable action); public CompletionStage runAfterEitherAsync(CompletionStage other,Runnable action,Executor executor); 举例 /** * 依赖于两个前置任务的API举例 */ @Test public void bothOrAny() { final CompletableFuture task1 = CompletableFuture.supplyAsync(() -> { ThreadTools.sleepSeconds(1); return "task 1"; }); final CompletableFuture task2 = CompletableFuture.supplyAsync(() -> { ThreadTools.sleepSeconds(2); return "task 2"; }); //依赖于两个前置任务都完成 final CompletableFuture both = task2.thenAcceptBothAsync(task1, (r1, r2) -> { System.out.printf("两个任务都已经完成。结果1:%s ,结果2: %s%n", r1, r2); }); //依赖于两个前置任务,但有一个完成即可 final CompletableFuture any = task1.acceptEitherAsync(task2, (r) -> { System.out.printf("有一个任务已经完成,结果是:%s%n", r); }); //阻塞主线程结束 any.runAfterBoth(both, ()->{}).join(); }

输出结果

有一个任务已经完成,结果是:task 1 两个任务都已经完成。结果1:task 2 ,结果2: task 1 依赖于多个任务的回调

除了依赖于一个、两个前置任务的API,更常用的是基于批量任务完成时机的判断。CompletableFuture也提供了对应的api。

任意一个完成后--anyOf public static CompletableFuture anyOf(CompletableFuture... cfs)

CompletableFuture提供了静态方法,用来表示当所有的前置任务中,有任意一个任务完成(包括异常)的阶段。可以在此阶段后执行CompletableFuture的其他操作。

所有任务都完成后--allOf public static CompletableFuture allOf(CompletableFuture... cfs)

与anyOf类似,allOf静态方法表示传入的所有前置任务都完成的阶段。可以在此阶段后执行CompletableFuture的其他操作。

**注:**如果异步任务执行异常,也看作是完成状态。

举例 @Test public void anyANdAll() { final ExecutorService threadPool = Executors.newFixedThreadPool(10); final long startAll = System.currentTimeMillis(); final CompletableFuture[] completableFutures = IntStream.rangeClosed(1, 10).mapToObj(index -> CompletableFuture.supplyAsync(() -> { final long start = System.currentTimeMillis(); try { Thread.sleep(100 * index); } catch (InterruptedException e) { e.printStackTrace(); } System.out.printf("线程'%s'执行完成,花费%d毫秒%n", Thread.currentThread().getName(), System.currentTimeMillis() - start); return index; }, threadPool)).toArray(CompletableFuture[]::new); final Object result = CompletableFuture.anyOf(completableFutures).join(); System.out.printf("any of 得到结果:%s,共花费%d毫秒%n", result, System.currentTimeMillis() - startAll); CompletableFuture.allOf(completableFutures).join();//阻塞主线程 System.out.printf("all of 得到结果,共花费%d毫秒%n", System.currentTimeMillis() - startAll); }

输出结果

线程'pool-1-thread-1'执行完成,花费110毫秒 any of 得到结果:1,共花费122毫秒 线程'pool-1-thread-2'执行完成,花费205毫秒 线程'pool-1-thread-3'执行完成,花费300毫秒 线程'pool-1-thread-4'执行完成,花费411毫秒 线程'pool-1-thread-5'执行完成,花费505毫秒 线程'pool-1-thread-6'执行完成,花费615毫秒 线程'pool-1-thread-7'执行完成,花费710毫秒 线程'pool-1-thread-8'执行完成,花费804毫秒 线程'pool-1-thread-9'执行完成,花费915毫秒 线程'pool-1-thread-10'执行完成,花费1011毫秒 all of 得到结果,共花费1024毫秒 异常处理

处理异常的API与其他回调API类似,但触发时机不同而已。

exceptionally

作用:当前阶段异常时,以异常为入参执行给定的函数。当前阶段非异常执行完成时,不执行逻辑,方法返回与当前阶段相同的非异常返回值。

public CompletionStage exceptionally(Function fn);

举例

@Test public void CPExceptional() { CompletableFuture.supplyAsync(() -> { throw new RuntimeException("抛出的异常"); }).exceptionally((e) -> { System.out.printf("异常消息%s%n", e.getMessage()); return "new result"; }).thenAccept(r -> { System.out.printf("最终结果1:%s%n", r); }); CompletableFuture.supplyAsync(() -> "没有抛出异常") .exceptionally((e) -> { System.out.printf("异常消息%s%n", e.getMessage()); return "new result"; }).thenAccept(r -> System.out.printf("最终结果2:%s%n", r)); }

输出结果

异常消息java.lang.RuntimeException: 抛出的异常 最终结果1:new result 最终结果2:没有抛出异常 API的分类与总结 三种基本任务类型

CompletableFuture对于每一种任务执行时机,都提供了多个API。这些APi可以与接口参数的函数式接口形成对应关系。 只关心前置任务完成的Runable:这种API以Runable函数式接口为参数,Runable接口的特点是没有入参,没有返回值。因此这是一种只关系前置任务是否完成的API。对应CompletableFuture中的xxxRun方法。

关心前置任务结果的消费者Consumer:这种API以Consumer函数式接口为参数,Consumer接口有入参,没有返回值。因此这是一种关心前置任务结果,但只消费前置结果的API。对应CompletableFuture中的xxxAccept方法。

关心前置任务的生产者Function:这种API以Function函数传接口为参数,Function接口有入参,有返回值。因此这是一种可以对前置任务的结果进行操作的API。对应的CompletableFuture中的xxxApply方法。

三种重载方法

CompletableFuture描述任务关系的api中,同一个功能的方法大都提供了三个重载方法。类似:xxx,xxxAsync,xxxAsync(...Executor executor)。三个方法的区别如下:

xxx方法,表示当前任务不会主动提交到线程池中执行。

xxxAsync方法,表示任务需要在默认的Fork/Join线程池中异步执行。

xxxAsync(...Executor executor)方法,表示任务需要在指定的线程池中异步执行。

@Test public void thenApplyRunTime() { System.out.println("前置任务已完成的情况测试"); final CompletableFuture voidCompletableFuture = CompletableFuture.runAsync(() -> { System.out.printf("前置任务运行线程:%s%n", Thread.currentThread().getName()); }); ThreadTools.sleepSeconds(1); voidCompletableFuture.thenRun(() -> System.out.printf("thenRun 运行线程:%s%n", Thread.currentThread().getName())).join(); System.out.println("----------------------------------------------"); System.out.println("前置任务未完成的情况测试"); CompletableFuture.runAsync(() -> { System.out.printf("前置任务运行线程:%s%n", Thread.currentThread().getName()); ThreadTools.sleepSeconds(1); }).thenRun(() -> System.out.printf("thenRun 运行线程:%s%n", Thread.currentThread().getName())) .thenRunAsync(()->System.out.printf("thenRunAsync 运行线程:%s%n", Thread.currentThread().getName())) .join(); System.out.println("----------------------------------------------"); System.out.println("前置任务未完成,同时有其他在线程池中执行的任务"); final CompletableFuture voidCompletableFuture1 = CompletableFuture.runAsync(() -> { ThreadTools.sleepSeconds(2); System.out.printf("前置任务运行线程:%s%n", Thread.currentThread().getName()); }); voidCompletableFuture1.thenRun(() ->{ ThreadTools.sleepMillis(100); System.out.printf("thenRun 运行线程:%s%n", Thread.currentThread().getName());}); voidCompletableFuture1.thenRun(() ->{ ThreadTools.sleepMillis(100); System.out.printf("thenRun 运行线程:%s%n", Thread.currentThread().getName());}); voidCompletableFuture1.thenRun(() ->{ ThreadTools.sleepMillis(100); System.out.printf("thenRun 运行线程:%s%n", Thread.currentThread().getName());}); voidCompletableFuture1.thenRun(() ->{ ThreadTools.sleepMillis(100); System.out.printf("thenRun 运行线程:%s%n", Thread.currentThread().getName());}); voidCompletableFuture1.thenRun(() ->{ ThreadTools.sleepMillis(100); System.out.printf("thenRun 运行线程:%s%n", Thread.currentThread().getName());}); voidCompletableFuture1.thenRunAsync(()->System.out.printf("thenRunAsync 运行线程:%s%n", Thread.currentThread().getName())) .join(); ThreadTools.sleepSeconds(10); }

结果输出

前置任务已完成的情况测试 前置任务运行线程:ForkJoinPool.commonPool-worker-9 thenRun 运行线程:main ---------------------------------------------- 前置任务未完成的情况测试 前置任务运行线程:ForkJoinPool.commonPool-worker-9 thenRun 运行线程:ForkJoinPool.commonPool-worker-9 thenRunAsync 运行线程:ForkJoinPool.commonPool-worker-9 ---------------------------------------------- 前置任务未完成,同时有其他在线程池中执行的任务 前置任务运行线程:ForkJoinPool.commonPool-worker-2 thenRunAsync 运行线程:ForkJoinPool.commonPool-worker-9 thenRun 运行线程:ForkJoinPool.commonPool-worker-2 thenRun 运行线程:ForkJoinPool.commonPool-worker-9 thenRun 运行线程:ForkJoinPool.commonPool-worker-2 thenRun 运行线程:ForkJoinPool.commonPool-worker-9 thenRun 运行线程:ForkJoinPool.commonPool-worker-2 Process finished with exit code 0 分类

CompletableFuture的API有三种维度可以进行分类:

描述的任务关系 任务的基本类型 API的重载方法。 三种分类维度相互组合,构成了CompletableFuture中的大部分API。

CompletebleFuture(1)



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3