CompletableFuture使用说明

说明

CompletableFuture是JDK8中新增加的工具类,该类提供了丰富的API用于异步编程,根据作用不同,对应的方法可以被分为以下几个不同系列。

对象创建

1
2
3
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor)

这两个方法都用于创建CompletableFuture对象,唯一不同是前一个方法统一使用一个公共的ForkJoinPool线程池,这个线程池默认创建的线程数是CPU的核数,大量调用该方法可能会造成线程池资源耗尽,而后一个方法自己传入指定的线程池,行为更可控。

这里传入的主要参数是Supplier<U>类型的,因此传入的表达式必须是没有入参的。在创建完CompletableFuture对象之后,会自动地异步执行supplierget方法。

1
2
3
public static CompletableFuture<Void> runAsync(Runnable runnable)

public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)

这一组方法与上面一组的区别是,在创建完CompletableFuture对象之后,会自动地异步执行runnable对象的run方法,而run方法是没有返回值的。

描述串行关系

1
2
3
4
5
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)

public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)

public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

这一组thenApply方法传入参数为Function<T,U>类型,因此表达式支持T类型的入参和U类型的出参,其中入参即为上一个CompletableFuture流程的结果。而三个方法之间的差异很容易看出来,有Async后缀则异步执行,否则同步执行,后续不再对此差异做说明。值得一提的是这里默认的线程池依然是那个ForkJoinPool的公共线程池。

1
2
3
4
5
public CompletableFuture<Void> thenAccept(Consumer<? super T> action)

public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)

public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor)

这一组thenAccept与其他组的差异是入参为Consumer<T>类型,即表达式的入参T类型,无返回值。

1
2
3
4
5
public CompletableFuture<Void> thenRun(Runnable action)

public CompletableFuture<Void> thenRunAsync(Runnable action)

public CompletableFuture<Void> thenRunAsync(Runnable action,Executor executor)

这一组thenRun与其他组的差异是入参为Runnable类型,即表达式没有入参,也没有返回值。

1
2
3
4
5
public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)

public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn)

public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn,Executor executor)

这一组thenComposethenApply比较相似,都是传入的Function对象,差异是thenCompose传入表达式的返回值是另一个CompletableFuture对象,因此适用于两个CompletableFuture流程的连接。

AND关系

1
2
3
4
5
public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn) 

public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn)

public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn, Executor executor)

该系列方法用于对调用者和other两者的结果进行汇聚关系的操作,thenCombine这一组与其他组的区别在于入参为BiFunction<T,U,V>对象,即表达式既有入参,也有类型为V的返回值,其中两个入参分别为上一个CompletableFuture流程和other的结果。

1
2
3
4
5
public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action) 

public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action)

public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action, Executor executor)

thenAcceptBoth这一组与其他组的区别在于入参为BiConsumer<T,U>类型,即表达式有入参无返回值。

1
2
3
4
5
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,Runnable action)

public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action)

public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor)

runAfterBoth这一组与其他组的区别在于入参为Runnable类型,即表达式没有入参也没有返回值。

OR关系

1
2
3
4
5
public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn)

public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn)

public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn,Executor executor)

该系列方法取调用者和other对象完成任务较快的那个线程进行后续处理。具体到applyToEither这一组和其他组的区别在于入参Function<T, U>,即表达式有T类型入参和U类型返回值,其中入参就是较快完成的那个CompletableFuture流程的结果。

1
2
3
4
5
public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)

public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action)

public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action,Executor executor)

acceptEither这一组和其他组的区别在于入参类型Consumer<T>,即表达式有T类型入参,没有返回值。

1
2
3
4
5
public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,Runnable action)

public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action)

public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor)

runAfterEither这一组和其他组的区别在于入参类型Runnable,即表达式没有入参也没有返回值。

异常处理

1
public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)

当遇到处理异常时,可以在exceptionally内处理异常,入参为Function<Throwable,T>,即表达式的入参为异常对象且可以返回T类型返回值。该方法作用类似同步编程的catch

1
2
3
4
5
public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action)

public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action)

public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor)
1
2
3
4
5
public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)

public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn)

public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)

whenCompletehandle两组的作用都类似同步编程的finally,因此无论是否发生异常都会执行这两组中的方法,两组区别在于入参BiConsumer<T,Throwable>BiFunction<T, Throwable, U>,前者的表达式没有返回值,而后者的表达式是有类型U的返回值的。

多流程处理

1
2
3
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)

public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

allOfanyOf可以处理多个流程的执行状态,allOf在所有流程完成后返回一个新建的CompletableFuture<Void>对象,anyOf只要有一个流程返回则返回该流程的结果。

获取结果

1
2
3
4
5
6
7
public T get() throws InterruptedException, ExecutionException

public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException

public T join()

public T getNow(T valueIfAbsent)

get()方法会阻塞直到返回执行结果,除了future执行异常会抛出ExecutionException,线程中断抛出InterruptedException外,future被取消的话还会抛出运行时异常CancellationException

get(long timeout, TimeUnit unit)增加了超时时间作为入参,因此达到指定时间还会抛出TimeoutException

join()get()的区别在于抛出的都是运行时异常,计算被取消抛出的是CancellationExceptionfuture已经完成并抛出异常则该方法会抛出CompletionException

getNow(T valueIfAbsent)方法在join基础上增加了入参valueIfAbsent,若调用该方法时future并未完成则会立刻返回valueIfAbsent

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class Main {
public static void main(String[] args) throws Exception {
// 第一个任务:
CompletableFuture<String> cfQuery = CompletableFuture.supplyAsync(() -> {
return queryCode("中国石油");
});
// cfQuery成功后继续执行下一个任务:
CompletableFuture<Double> cfFetch = cfQuery.thenApplyAsync((code) -> {
return fetchPrice(code);
});
// cfFetch成功后打印结果:
cfFetch.thenAccept((result) -> {
System.out.println("price: " + result);
});
// 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
Thread.sleep(2000);
}

static String queryCode(String name) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
}
return "601857";
}

static Double fetchPrice(String code) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
}
return 5 + Math.random() * 20;
}
}