Java8 CompletableFuture 异步任务

您所在的位置:网站首页 多线程completeablefuture Java8 CompletableFuture 异步任务

Java8 CompletableFuture 异步任务

2023-08-07 22:53| 来源: 网络整理| 查看: 265

更多 Java 并发编程方面的文章,请参见文集《Java 并发编程》

所谓异步调用其实就是实现一个可无需等待被调用函数的返回值而让操作继续运行的方法。在 Java 语言中,简单的讲就是另启一个线程来完成调用中的部分计算,使调用继续运行或返回,而不需要等待计算结果。但调用者仍需要取线程的计算结果。

关于 Java Future,请首先参见

Java 实现多线程的三种方式 Java FutureTask 可异步执行的任务

JDK5 新增了 Future 接口,用于描述一个异步计算的结果。虽然 Future 以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。 例如:

public static void main(String[] args) throws Exception { ExecutorService es = Executors.newSingleThreadExecutor(); // 在 Java8 中,推荐使用 Lambda 来替代匿名 Callable 实现类 Future f = es.submit(() -> { System.out.println(Thread.currentThread().getName()); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return 123; }); // 当前 main 线程阻塞,直至 future 得到值 System.out.println(f.get()); es.shutdown(); }

阻塞的方式显然和我们的异步编程的初衷相违背,轮询的方式又会耗费无谓的 CPU 资源,而且也不能及时地得到计算结果,为什么不能用观察者设计模式呢?即当计算结果完成及时通知监听者。(例如通过回调的方式)

关于 Future 接口,还有如下一段描述:

The Future interface was added in Java 5 to serve as a result of an asynchronous computation, but it did not have any methods to combine these computations or handle possible errors. 不能很好地组合多个异步任务,也不能处理可能的异常。

CompletableFuture

Java 8 中, 新增加了一个包含 50 个方法左右的类 CompletableFuture,它提供了非常强大的 Future 的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合 CompletableFuture 的方法。

对于阻塞或者轮询方式,依然可以通过 CompletableFuture 类的 CompletionStage 和 Future 接口方式支持。

CompletableFuture 类声明了 CompletionStage 接口,CompletionStage 接口实际上提供了同步或异步运行计算的舞台,所以我们可以通过实现多个 CompletionStage 命令,并且将这些命令串联在一起的方式实现多个命令之间的触发。

我们可以通过 CompletableFuture.supplyAsync(this::sendMsg); 这么一行代码创建一个简单的异步计算。在这行代码中,supplyAsync 支持异步地执行我们指定的方法,这个例子中的异步执行方法是 sendMsg。当然,我们也可以使用 Executor 执行异步程序,默认是 ForkJoinPool.commonPool()。

我们也可以在异步计算结束之后指定回调函数,例如 CompletableFuture.supplyAsync(this::sendMsg) .thenAccept(this::notify); 这行代码中的 thenAccept 被用于增加回调函数,在我们的示例中 notify 就成了异步计算的消费者,它会处理计算结果。

CompletionStage 接口

A stage of a possibly asynchronous computation, that performs an action or computes a value when another CompletionStage completes. A stage completes upon termination of its computation, but this may in turn trigger other dependent stages. 一个可能执行的异步计算的某个阶段,在另一个CompletionStage完成时执行一个操作或计算一个值。 一个阶段完成后,其计算结束。但是,该计算阶段可能会触发下一个计算阶段。

最简单的例子

CompletableFuture 实际上也实现了 Future 接口:

public class CompletableFuture implements Future, CompletionStage

所以我们也可以利用 CompletableFuture 来实现基本的 Future 功能,例如:

public static void main(String[] args) throws Exception { CompletableFuture future = new CompletableFuture(); // 在 Java8 中,推荐使用 Lambda 来替代匿名 Runnable 实现类 new Thread( () -> { try { // 模拟一段耗时的操作 Thread.sleep(2000); future.complete("I have completed"); } catch (Exception e) { } } ).start(); System.out.println(future.get()); }

此时此刻主线程 future.get() 将得到字符串的结果 I have completed,同时完成回调以后将会立即生效。注意 complete() 方法只能调用一次,后续调用将被忽略。

注意:get() 方法可能会抛出异常 InterruptedException 和 ExecutionException。

如果我们已经知道了异步任务的结果,我们也可以直接创建一个已完成的 future,如下:

public static void main(String[] args) throws Exception { // Returns a new CompletableFuture that is already completed with the given value. CompletableFuture future = CompletableFuture.completedFuture("I have completed"); System.out.println(future.get()); }

如果在异步执行过程中,我们觉得执行会超时或者会出现问题,我们也可以通过 cancle() 方法取消,此时调用 get() 方法时会产生异常 java.util.concurrent.CancellationException,代码如下:

public static void main(String[] args) throws Exception { CompletableFuture future = new CompletableFuture(); // 在 Java8 中,推荐使用 Lambda 来替代匿名 Runnable 实现类 new Thread( () -> { try { // 模拟一段耗时的操作 Thread.sleep(2000); future.cancel(false); } catch (Exception e) { } } ).start(); System.out.println(future.get()); } 使用工厂方法创建 CompletableFuture

在上述的代码中,我们手动地创建 CompletableFuture,并且手动的创建一个线程(或者利用线程池)来启动异步任务,这样似乎有些复杂。

其实我们可以利用 CompletableFuture 的工厂方法,传入 Supplier 或者 Runnable 的实现类,直接得到一个 CompletableFuture 的实例:

public static CompletableFuture supplyAsync(Supplier supplier) public static CompletableFuture supplyAsync(Supplier supplier, Executor executor) public static CompletableFuture runAsync(Runnable runnable) public static CompletableFuture runAsync(Runnable runnable, Executor executor)

第一个和第三个方法,没有 Executor 参数,将会使用 ForkJoinPool.commonPool() (全局的,在 JDK8 中介绍的通用池),这适用于 CompletableFuture 类中的大多数的方法。

Runnable 接口方法 public abstract void run(); 没有返回值 Supplier 接口方法 T get(); 有返回值。如果你需要处理异步操作并返回结果,使用前两种 Supplier 方法

一个小的 Tips:

Both Runnable and Supplier are functional interfaces that allow passing their instances as lambda expressions thanks to the new Java 8 feature. 使用 Lambda 表达式来传入 Supplier 或者 Runnable 的实现类。

一个示例代码如下:

public static void main(String[] args) throws Exception { // 在 Java8 中,推荐使用 Lambda 来替代匿名 Supplier 实现类 CompletableFuture future = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (Exception e) { } return "I have completed"; }); System.out.println(future.get()); } 转换和作用于异步任务的结果 (thenApply)

我们可以叠加功能,把多个 future 组合在一起等

public CompletableFuture thenApply(Function... cfs)

一个示例代码如下:

public static void main(String[] args) throws Exception { CompletableFuture future1 = CompletableFuture.supplyAsync(() -> "Hello"); CompletableFuture future2 = CompletableFuture.supplyAsync(() -> "World"); CompletableFuture combinedFuture = CompletableFuture.allOf(future1, future2); // 这个方法不会合并结果,可以看到他的返回值是 Void 类型 combinedFuture.get(); // 我们需要手动来处理每一个并行异步任务的结果 String combined = Stream.of(future1, future2) .map(CompletableFuture::join) .collect(Collectors.joining(" ")); System.out.println(combined); // Hello World }

有时候我们可能不需要等待所有的异步任务都执行完毕,只要任何一个任务完成就返回结果。我们可以使用 anyOf() 方法:

public static CompletableFuture anyOf(CompletableFuture... cfs)

一个示例代码如下:

public static void main(String[] args) throws Exception { CompletableFuture future1 = CompletableFuture.supplyAsync(() -> "Hello"); CompletableFuture future2 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (Exception e) { } return "World"; } ); CompletableFuture combinedFuture = CompletableFuture.anyOf(future1, future2); System.out.println(combinedFuture.get()); // Hello } 异常的处理

我们可以在 handle() 方法里处理异常:

public CompletableFuture handle(BiFunction


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3