Java8 CompletableFuture使用

准备Executor

这里以ThreadPoolExecutor为例

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 5, 1,
    TimeUnit.SECONDS, new ArrayBlockingQueue<>(10),
    Executors.defaultThreadFactory(),
    new ThreadPoolExecutor.CallerRunsPolicy());

异步执行任务

执行无返回值任务

CompletableFuture.runAsync(() -> System.out.println("a"), threadPoolExecutor);

执行有返回值任务

CompletableFuture<String> bFuture = CompletableFuture.supplyAsync(() -> "b", threadPoolExecutor);  
System.out.println(bFuture.get());

任务执行完成后处理

thenApply方法,对任务执行结果进行函数变换

CompletableFuture<String> bFuture = CompletableFuture.supplyAsync(() -> "b", threadPoolExecutor)
    .thenApply(String::toUpperCase);

handle方法,对任务执行结果进行函数变换,包括正常结果与异常

CompletableFuture<String> bFuture = CompletableFuture.supplyAsync(() -> "b", threadPoolExecutor)
    .handle((result, exception) -> {  
        if (result != null) {  
            return result;  
        } else {  
            return exception;  
        }  
});

exceptionally方法,当任务执行抛出异常时,对对任务执行结果进行函数变换

bFuture.exceptionally(exception -> {  
    return exception.getMessage();  
});

thenAccept方法,消费任务执行结果

CompletableFuture<String> bFuture = CompletableFuture.supplyAsync(() -> "b", threadPoolExecutor)
    .thenAccept(System.out::println);

whenComplete方法,消费任务执行结果,包括正常结果与异常

CompletableFuture<String> bFuture = CompletableFuture.supplyAsync(() -> "b", threadPoolExecutor)
    .whenComplete((result, exception) -> {  
        System.out.println(result);
        System.out.println(exception.getMessage());
    });

thenRun方法,任务执行完成后继续执行其它操作

CompletableFuture<String> bFuture = CompletableFuture.supplyAsync(() -> "b", threadPoolExecutor)
    .thenRun(() -> System.out.println("c"));

thenCompose方法,任务执行完成后继续执行下一个异步任务

CompletableFuture<String> bFuture = CompletableFuture.supplyAsync(() -> "b", threadPoolExecutor)
    .thenCompose(result -> CompletableFuture.supplyAsync(() -> result + "b", threadPoolExecutor));

thenCombine方法,并发执行两个异步任务,任务全部完成后对执行结果进行函数变换

CompletableFuture<String> bFuture = CompletableFuture.supplyAsync(() -> "b", threadPoolExecutor)
    .thenCombine(CompletableFuture.supplyAsync(() -> "b", threadPoolExecutor),
        (result1, result2) -> result1 + result2);

allOf方法,等待多个异步任务全部完成

CompletableFuture<String> bFuture = CompletableFuture.supplyAsync(() -> "b", threadPoolExecutor);
CompletableFuture<String> cFuture = CompletableFuture.supplyAsync(() -> "c", threadPoolExecutor);

CompletableFuture<Void> allFuture = CompletableFuture.allOf(bFuture, cFuture);  
allFuture.join();

anyOf方法,获取多个异步任务第一个完成的结果

CompletableFuture<String> bFuture = CompletableFuture.supplyAsync(() -> "b", threadPoolExecutor);
CompletableFuture<String> cFuture = CompletableFuture.supplyAsync(() -> "c", threadPoolExecutor);

CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(bFuture, cFuture);  
anyFuture.get();