Java 中 CompletableFuture 异步执行功能
Java 中 CompletableFuture 异步执行功能概述
在Java编程中,随着应用程序复杂度的增加以及对性能和响应性要求的提高,异步编程变得愈发重要。CompletableFuture 作为Java 8引入的一个强大工具,为异步编程提供了一种简洁且高效的方式。它不仅允许我们异步执行任务,还能方便地处理异步操作的结果、错误以及组合多个异步操作。
CompletableFuture 的基础使用
创建 CompletableFuture
-
使用
CompletableFuture.runAsync
创建无返回值的异步任务CompletableFuture.runAsync
方法用于异步执行一个没有返回值的任务。它接收一个Runnable
对象作为参数,并返回一个CompletableFuture<Void>
。示例代码如下:import java.util.concurrent.CompletableFuture; public class CompletableFutureExample { public static void main(String[] args) { CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { // 模拟耗时操作 try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("异步任务执行完毕"); }); // 主线程可以继续执行其他任务 System.out.println("主线程继续执行"); // 等待异步任务完成 try { future.get(); } catch (Exception e) { e.printStackTrace(); } } }
在上述代码中,
CompletableFuture.runAsync
启动了一个异步任务,该任务会睡眠2秒后输出“异步任务执行完毕”。主线程会继续执行并输出“主线程继续执行”,然后通过future.get()
等待异步任务完成。 -
使用
CompletableFuture.supplyAsync
创建有返回值的异步任务CompletableFuture.supplyAsync
方法用于异步执行一个有返回值的任务。它接收一个Supplier
对象作为参数,并返回一个CompletableFuture<T>
,其中T
是返回值的类型。示例代码如下:import java.util.concurrent.CompletableFuture; public class CompletableFutureSupplyExample { public static void main(String[] args) { CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { // 模拟耗时操作 try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return 42; }); // 主线程可以继续执行其他任务 System.out.println("主线程继续执行"); // 获取异步任务的返回值 try { Integer result = future.get(); System.out.println("异步任务的返回值: " + result); } catch (Exception e) { e.printStackTrace(); } } }
这里,
CompletableFuture.supplyAsync
启动的异步任务睡眠2秒后返回42。主线程获取到异步任务的返回值并输出。
获取异步任务的结果
-
使用
get()
方法get()
方法用于获取异步任务的结果。如果异步任务尚未完成,调用get()
方法的线程会被阻塞,直到任务完成。例如:CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello, CompletableFuture!"); try { String result = future.get(); System.out.println(result); } catch (Exception e) { e.printStackTrace(); }
上述代码中,
future.get()
会阻塞主线程,直到异步任务返回“Hello, CompletableFuture!”并赋值给result
。 -
使用
get(long timeout, TimeUnit unit)
方法get(long timeout, TimeUnit unit)
方法允许设置等待异步任务完成的最长时间。如果在指定时间内任务未完成,会抛出TimeoutException
。示例如下:import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; public class CompletableFutureTimeoutExample { public static void main(String[] args) { CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } return "任务完成"; }); try { String result = future.get(2, TimeUnit.SECONDS); System.out.println(result); } catch (Exception e) { e.printStackTrace(); } } }
在这个例子中,异步任务需要3秒完成,但只设置了2秒的等待时间,因此会抛出
TimeoutException
。
处理异步任务的完成和错误
处理任务完成
-
使用
thenAccept
方法thenAccept
方法用于在异步任务完成后执行一个Consumer
操作。它接收一个Consumer
对象作为参数,该Consumer
会处理异步任务的结果,但不返回新的结果。示例代码如下:CompletableFuture.supplyAsync(() -> "异步结果") .thenAccept(result -> System.out.println("处理异步结果: " + result));
这里,异步任务返回“异步结果”,
thenAccept
中的Consumer
会输出“处理异步结果: 异步结果”。 -
使用
thenApply
方法thenApply
方法用于在异步任务完成后执行一个Function
操作,并返回新的CompletableFuture
。Function
会处理异步任务的结果并返回一个新的值。示例如下:CompletableFuture.supplyAsync(() -> 10) .thenApply(result -> result * 2) .thenAccept(finalResult -> System.out.println("最终结果: " + finalResult));
异步任务先返回10,
thenApply
将其乘以2,最终输出“最终结果: 20”。
处理任务错误
-
使用
exceptionally
方法exceptionally
方法用于在异步任务发生异常时提供一个替代结果。它接收一个Function
对象作为参数,该Function
以Throwable
为输入并返回一个替代值。示例代码如下:CompletableFuture.supplyAsync(() -> { if (Math.random() < 0.5) { throw new RuntimeException("模拟异常"); } return "正常结果"; }) .exceptionally(ex -> { System.out.println("捕获到异常: " + ex.getMessage()); return "替代结果"; }) .thenAccept(result -> System.out.println("结果: " + result));
上述代码中,异步任务有50%的概率抛出异常,
exceptionally
捕获到异常并返回“替代结果”。 -
使用
handle
方法handle
方法可以同时处理异步任务的正常结果和异常情况。它接收一个BiFunction
对象作为参数,该BiFunction
的第一个参数是异步任务的结果(如果没有异常),第二个参数是异常(如果有异常),并返回一个新的值。示例如下:CompletableFuture.supplyAsync(() -> { if (Math.random() < 0.5) { throw new RuntimeException("模拟异常"); } return "正常结果"; }) .handle((result, ex) -> { if (ex != null) { System.out.println("捕获到异常: " + ex.getMessage()); return "替代结果"; } return result; }) .thenAccept(finalResult -> System.out.println("最终结果: " + finalResult));
此例中,
handle
同样处理了异常并返回相应结果。
组合多个 CompletableFuture
串行组合
-
使用
thenCompose
方法thenCompose
方法用于将两个CompletableFuture
串行组合。它接收一个Function
对象作为参数,该Function
以第一个CompletableFuture
的结果为输入,并返回另一个CompletableFuture
。示例代码如下:CompletableFuture.supplyAsync(() -> "Hello") .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + ", World!")) .thenAccept(System.out::println);
首先,第一个异步任务返回“Hello”,
thenCompose
接收这个结果并启动第二个异步任务,将其拼接为“Hello, World!”并输出。 -
使用
thenApply
与flatMap
的对比thenApply
方法返回的是一个包装了新结果的CompletableFuture
,而thenCompose
(类似于flatMap
)会将内部的CompletableFuture
展开。例如:CompletableFuture.supplyAsync(() -> "Hello") .thenApply(s -> CompletableFuture.supplyAsync(() -> s + ", World!")) .thenAccept(future -> { try { System.out.println(future.get()); } catch (Exception e) { e.printStackTrace(); } });
这里
thenApply
返回的是CompletableFuture<CompletableFuture<String>>
,需要额外的get()
操作来获取最终结果。而thenCompose
直接返回CompletableFuture<String>
。
并行组合
-
使用
CompletableFuture.allOf
方法CompletableFuture.allOf
方法用于等待所有给定的CompletableFuture
都完成。它接收多个CompletableFuture
作为参数,并返回一个CompletableFuture<Void>
。示例如下:CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "任务1完成"; }); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return "任务2完成"; }); CompletableFuture<Void> allFuture = CompletableFuture.allOf(future1, future2); allFuture.join(); try { System.out.println(future1.get()); System.out.println(future2.get()); } catch (Exception e) { e.printStackTrace(); }
这里
allOf
等待future1
和future2
都完成,然后可以获取它们各自的结果。 -
使用
CompletableFuture.anyOf
方法CompletableFuture.anyOf
方法用于等待任何一个给定的CompletableFuture
完成。它接收多个CompletableFuture
作为参数,并返回一个CompletableFuture<Object>
,其结果是第一个完成的CompletableFuture
的结果。示例代码如下:CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "任务3完成"; }); CompletableFuture<String> future4 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return "任务4完成"; }); CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(future3, future4); try { System.out.println(anyFuture.get()); } catch (Exception e) { e.printStackTrace(); }
由于
future4
耗时更短,anyOf
会返回future4
的结果“任务4完成”。
CompletableFuture 的原理与实现细节
异步执行的线程模型
CompletableFuture 的异步执行依赖于Java的线程池。runAsync
和 supplyAsync
方法如果不传入自定义的 Executor
,会使用 ForkJoinPool.commonPool()
作为默认的线程池。ForkJoinPool
是Java 7引入的一种特殊线程池,它采用工作窃取算法,能够有效地利用多线程环境下的计算资源。
例如,在多核CPU环境中,ForkJoinPool
可以将任务细分并分配到不同的线程中执行,提高整体的执行效率。当一个线程完成自己的任务后,它可以从其他忙碌线程的任务队列中窃取任务来执行,从而减少线程的空闲时间。
结果的存储与传播
CompletableFuture 通过内部的状态变量来存储任务的执行状态(如未完成、已完成、异常完成等)以及结果。当异步任务完成时,会更新这些状态变量,并通知等待的线程。
例如,thenAccept
、thenApply
等方法注册的回调函数会被存储在一个内部队列中。当任务完成时,会按照顺序依次执行这些回调函数。如果任务异常完成,exceptionally
和 handle
等方法注册的异常处理逻辑也会被执行。
与其他异步框架的比较
-
与 Future 的比较
Future
是Java 5引入的用于异步操作的接口,它提供了基本的异步任务管理功能,如获取任务结果、检查任务是否完成等。然而,Future
存在一些局限性。例如,它缺乏对异步任务完成后的处理能力,不能方便地组合多个异步任务,并且获取结果时只能通过阻塞方式(get()
方法)。 相比之下,CompletableFuture
继承自Future
并扩展了其功能。它提供了丰富的方法来处理异步任务的完成、错误以及组合多个异步任务,使得异步编程更加简洁和灵活。 -
与 RxJava 的比较 RxJava 是一个基于观察者模式的异步编程框架,它提供了强大的异步流处理功能。与
CompletableFuture
相比,RxJava 更加侧重于处理异步数据流,支持复杂的操作符(如 map、filter、flatMap 等)来操作数据流。CompletableFuture
则更专注于单个异步任务的处理以及任务之间的简单组合。在简单的异步任务场景下,CompletableFuture
的使用更加简洁,而在处理复杂的异步数据流场景下,RxJava 则更具优势。
在实际项目中的应用场景
高并发请求处理
在Web应用开发中,当需要同时处理多个外部API请求时,使用 CompletableFuture
可以将这些请求异步化,提高系统的响应速度。例如,一个电商应用需要同时获取商品信息、库存信息和用户评价信息,可以通过 CompletableFuture
并行发起这三个请求,然后等待所有请求完成后再进行结果的整合和展示。
CompletableFuture<String> productInfoFuture = CompletableFuture.supplyAsync(() -> {
// 模拟获取商品信息
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "商品信息";
});
CompletableFuture<String> stockInfoFuture = CompletableFuture.supplyAsync(() -> {
// 模拟获取库存信息
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "库存信息";
});
CompletableFuture<String> reviewInfoFuture = CompletableFuture.supplyAsync(() -> {
// 模拟获取用户评价信息
try {
Thread.sleep(2500);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "用户评价信息";
});
CompletableFuture<Void> allFuture = CompletableFuture.allOf(productInfoFuture, stockInfoFuture, reviewInfoFuture);
allFuture.join();
try {
System.out.println("整合结果: " + productInfoFuture.get() + ", " + stockInfoFuture.get() + ", " + reviewInfoFuture.get());
} catch (Exception e) {
e.printStackTrace();
}
异步计算与数据处理
在大数据处理场景中,有时需要对大量数据进行异步计算。例如,对一批用户数据进行统计分析,每个用户的数据处理可以作为一个异步任务,使用 CompletableFuture
并行处理,最后汇总结果。
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class DataAnalysisExample {
public static void main(String[] args) {
List<Integer> userData = new ArrayList<>();
for (int i = 0; i < 10; i++) {
userData.add(i);
}
List<CompletableFuture<Integer>> futures = new ArrayList<>();
for (Integer data : userData) {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
// 模拟数据处理
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return data * 2;
});
futures.add(future);
}
List<Integer> results = new ArrayList<>();
for (CompletableFuture<Integer> future : futures) {
try {
results.add(future.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
System.out.println("汇总结果: " + results);
}
}
任务编排与流程控制
在复杂的业务流程中,可能需要按照一定的顺序执行多个异步任务,并且在任务之间传递数据。CompletableFuture
的串行和并行组合方法可以很好地满足这种需求。例如,一个订单处理流程,首先需要异步验证用户信息,然后异步检查库存,最后异步生成订单。
CompletableFuture.supplyAsync(() -> {
// 模拟用户信息验证
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "用户信息验证通过";
})
.thenCompose(s -> CompletableFuture.supplyAsync(() -> {
// 模拟库存检查
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return s + ", 库存检查通过";
}))
.thenCompose(s -> CompletableFuture.supplyAsync(() -> {
// 模拟订单生成
try {
Thread.sleep(2500);
} catch (InterruptedException e) {
e.printStackTrace();
}
return s + ", 订单生成成功";
}))
.thenAccept(System.out::println);
通过以上详细的介绍、丰富的代码示例以及对原理和实际应用场景的探讨,相信你对Java中 CompletableFuture
的异步执行功能有了深入的理解。在实际编程中,可以根据具体的需求灵活运用 CompletableFuture
,提升应用程序的性能和响应性。