Java CompletableFuture在复杂业务逻辑中的异步编程应用
Java CompletableFuture 基础概念
在Java 8引入CompletableFuture之前,处理异步操作和并发编程是一项相对复杂的任务。传统的Future接口存在一些局限性,例如无法手动完成Future、获取结果时需要阻塞线程等。CompletableFuture的出现极大地改善了这种情况,它不仅实现了Future接口,还提供了更强大的异步编程能力。
CompletableFuture代表一个异步计算的结果。它可以在计算完成后被获取,并且可以通过回调函数来处理计算结果,而不需要阻塞调用线程。这使得编写异步代码变得更加简洁和高效。
创建CompletableFuture
- 使用
CompletableFuture.supplyAsync
创建有返回值的异步任务
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class CompletableFutureExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 模拟耗时操作
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Hello, CompletableFuture!";
});
String result = future.get();
System.out.println(result);
}
}
在上述代码中,CompletableFuture.supplyAsync
方法接受一个 Supplier
作为参数,该 Supplier
中的代码会在一个新的线程中异步执行。get
方法会阻塞当前线程,直到异步任务完成并返回结果。
- 使用
CompletableFuture.runAsync
创建无返回值的异步任务
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class CompletableFutureExample2 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
// 模拟耗时操作
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Async task completed without return value.");
});
future.get();
}
}
CompletableFuture.runAsync
方法接受一个 Runnable
作为参数,执行的任务没有返回值。同样,get
方法用于等待任务完成。
处理异步任务的结果
1. thenApply
方法
thenApply
方法用于在异步任务完成后,对其结果进行转换。它接受一个 Function
作为参数,该 Function
会在异步任务完成后被调用,输入参数为异步任务的返回值,返回值为转换后的结果。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class ThenApplyExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello")
.thenApply(s -> s + ", CompletableFuture!");
String result = future.get();
System.out.println(result);
}
}
在上述代码中,supplyAsync
方法返回一个包含 “Hello” 的 CompletableFuture
,接着 thenApply
方法对这个结果进行转换,添加了 “, CompletableFuture!”,最终输出 “Hello, CompletableFuture!”。
2. thenAccept
方法
thenAccept
方法用于在异步任务完成后,消费其结果,但不返回新的结果。它接受一个 Consumer
作为参数,该 Consumer
会在异步任务完成后被调用,输入参数为异步任务的返回值。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class ThenAcceptExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture.supplyAsync(() -> "Hello")
.thenAccept(s -> System.out.println(s + ", CompletableFuture!"));
// 主线程不能立即退出,否则异步任务可能还未执行
Thread.sleep(2000);
}
}
在这个例子中,supplyAsync
方法返回一个包含 “Hello” 的 CompletableFuture
,thenAccept
方法对这个结果进行消费,将其打印出来。由于主线程需要等待异步任务执行完毕,所以使用 Thread.sleep
方法进行了短暂等待。
3. thenRun
方法
thenRun
方法用于在异步任务完成后,执行一个无参数的 Runnable
。它不关心异步任务的返回值,只是在任务完成后执行给定的 Runnable
。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class ThenRunExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture.supplyAsync(() -> "Hello")
.thenRun(() -> System.out.println("Task completed, but I don't care about the result."));
// 主线程不能立即退出,否则异步任务可能还未执行
Thread.sleep(2000);
}
}
在上述代码中,supplyAsync
方法返回结果 “Hello”,但 thenRun
方法并不关心这个结果,只是在任务完成后打印一条消息。同样,主线程需要等待异步任务执行完毕。
组合多个 CompletableFuture
在复杂业务逻辑中,经常需要组合多个异步任务。CompletableFuture提供了丰富的方法来实现这一点。
1. thenCompose
方法
thenCompose
方法用于将两个异步任务进行组合,第一个任务的结果作为第二个任务的输入。它接受一个 Function
作为参数,该 Function
返回一个新的 CompletableFuture
。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class ThenComposeExample {
public static CompletableFuture<String> fetchUserInfo(String userId) {
return CompletableFuture.supplyAsync(() -> {
// 模拟从数据库或网络获取用户信息
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "User " + userId + " info";
});
}
public static CompletableFuture<String> processUserInfo(String userInfo) {
return CompletableFuture.supplyAsync(() -> {
// 模拟对用户信息进行处理
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Processed: " + userInfo;
});
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> future = fetchUserInfo("123")
.thenCompose(ThenComposeExample::processUserInfo);
String result = future.get();
System.out.println(result);
}
}
在上述代码中,fetchUserInfo
方法异步获取用户信息,processUserInfo
方法异步处理用户信息。thenCompose
方法将这两个异步任务组合起来,使得第一个任务的结果能够作为第二个任务的输入。
2. thenCombine
方法
thenCombine
方法用于将两个异步任务的结果进行合并。它接受另一个 CompletableFuture
和一个 BiFunction
作为参数,BiFunction
的两个输入参数分别为两个异步任务的结果,返回值为合并后的结果。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class ThenCombineExample {
public static CompletableFuture<String> task1() {
return CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Result of task1";
});
}
public static CompletableFuture<String> task2() {
return CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Result of task2";
});
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> future = task1()
.thenCombine(task2(), (r1, r2) -> r1 + " and " + r2);
String result = future.get();
System.out.println(result);
}
}
在这个例子中,task1
和 task2
是两个独立的异步任务,thenCombine
方法将它们的结果合并在一起,最终输出 “Result of task1 and Result of task2”。
3. allOf
方法
allOf
方法用于等待所有给定的 CompletableFuture
都完成。它接受多个 CompletableFuture
作为参数,返回一个新的 CompletableFuture<Void>
。当所有传入的 CompletableFuture
都完成时,这个新的 CompletableFuture
才会完成。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class AllOfExample {
public static CompletableFuture<String> task1() {
return CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Task 1 completed";
});
}
public static CompletableFuture<String> task2() {
return CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Task 2 completed";
});
}
public static CompletableFuture<String> task3() {
return CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Task 3 completed";
});
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Void> allTasksFuture = CompletableFuture.allOf(task1(), task2(), task3());
allTasksFuture.get();
System.out.println("All tasks completed.");
}
}
在上述代码中,task1
、task2
和 task3
是三个异步任务,allOf
方法会等待这三个任务都完成后,才使得 allTasksFuture
完成,然后打印 “All tasks completed.”。
4. anyOf
方法
anyOf
方法用于等待给定的 CompletableFuture
中任意一个完成。它接受多个 CompletableFuture
作为参数,返回一个新的 CompletableFuture
,这个新的 CompletableFuture
会在任意一个传入的 CompletableFuture
完成时完成,其结果为第一个完成的 CompletableFuture
的结果。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class AnyOfExample {
public static CompletableFuture<String> task1() {
return CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Task 1 completed";
});
}
public static CompletableFuture<String> task2() {
return CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Task 2 completed";
});
}
public static CompletableFuture<String> task3() {
return CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Task 3 completed";
});
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Object> anyTaskFuture = CompletableFuture.anyOf(task1(), task2(), task3());
String result = (String) anyTaskFuture.get();
System.out.println("First completed task result: " + result);
}
}
在这个例子中,task1
、task2
和 task3
是三个异步任务,anyOf
方法会等待其中任意一个任务完成,然后 anyTaskFuture
就会完成,其结果为第一个完成的任务的结果。在这个例子中,task2
最快完成,所以最终输出 “First completed task result: Task 2 completed”。
处理异步任务的异常
在异步编程中,异常处理是非常重要的一部分。CompletableFuture提供了多种处理异常的方式。
1. exceptionally
方法
exceptionally
方法用于在异步任务发生异常时,提供一个备用的结果。它接受一个 Function
作为参数,该 Function
的输入参数为异常对象,返回值为备用结果。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class ExceptionallyExample {
public static CompletableFuture<String> potentiallyFailingTask() {
return CompletableFuture.supplyAsync(() -> {
if (Math.random() < 0.5) {
throw new RuntimeException("Simulated exception");
}
return "Task completed successfully";
});
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> future = potentiallyFailingTask()
.exceptionally(ex -> {
System.out.println("Caught exception: " + ex.getMessage());
return "Default result";
});
String result = future.get();
System.out.println("Result: " + result);
}
}
在上述代码中,potentiallyFailingTask
方法模拟了一个可能会抛出异常的异步任务。exceptionally
方法捕获到异常后,打印异常信息,并返回一个默认结果 “Default result”。
2. handle
方法
handle
方法用于在异步任务完成或发生异常时,对结果或异常进行处理。它接受一个 BiFunction
作为参数,第一个输入参数为异步任务的结果(如果任务正常完成),第二个输入参数为异常对象(如果任务发生异常),返回值为处理后的结果。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class HandleExample {
public static CompletableFuture<String> potentiallyFailingTask() {
return CompletableFuture.supplyAsync(() -> {
if (Math.random() < 0.5) {
throw new RuntimeException("Simulated exception");
}
return "Task completed successfully";
});
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> future = potentiallyFailingTask()
.handle((result, ex) -> {
if (ex != null) {
System.out.println("Caught exception: " + ex.getMessage());
return "Default result";
}
return "Processed result: " + result;
});
String finalResult = future.get();
System.out.println("Final result: " + finalResult);
}
}
在这个例子中,handle
方法根据任务是否正常完成来进行不同的处理。如果任务发生异常,打印异常信息并返回默认结果;如果任务正常完成,对结果进行处理并返回。
CompletableFuture 在复杂业务逻辑中的应用场景
1. 电商系统中的商品详情页加载
在电商系统中,商品详情页可能需要从多个数据源获取信息,例如商品基本信息、库存信息、用户评价等。这些信息的获取可以通过异步任务并行执行,然后将结果合并展示。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class ProductDetailExample {
public static CompletableFuture<String> getProductInfo(String productId) {
return CompletableFuture.supplyAsync(() -> {
// 模拟从数据库获取商品基本信息
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Product " + productId + " basic info";
});
}
public static CompletableFuture<String> getStockInfo(String productId) {
return CompletableFuture.supplyAsync(() -> {
// 模拟从库存系统获取库存信息
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Product " + productId + " stock info";
});
}
public static CompletableFuture<String> getReviews(String productId) {
return CompletableFuture.supplyAsync(() -> {
// 模拟从评价系统获取用户评价
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Product " + productId + " reviews";
});
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
String productId = "123";
CompletableFuture<String> productInfoFuture = getProductInfo(productId);
CompletableFuture<String> stockInfoFuture = getStockInfo(productId);
CompletableFuture<String> reviewsFuture = getReviews(productId);
CompletableFuture<Void> allFutures = CompletableFuture.allOf(productInfoFuture, stockInfoFuture, reviewsFuture);
allFutures.thenRun(() -> {
try {
String productInfo = productInfoFuture.get();
String stockInfo = stockInfoFuture.get();
String reviews = reviewsFuture.get();
System.out.println("Product Detail: " + productInfo + ", " + stockInfo + ", " + reviews);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}).get();
}
}
在上述代码中,getProductInfo
、getStockInfo
和 getReviews
方法分别异步获取商品基本信息、库存信息和用户评价。allOf
方法等待这三个任务都完成后,将结果合并展示。
2. 分布式系统中的数据聚合
在分布式系统中,可能需要从多个节点获取数据并进行聚合。例如,一个分布式日志系统,需要从多个日志节点获取日志数据,然后进行分析和汇总。
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class DistributedDataAggregationExample {
public static CompletableFuture<String> fetchDataFromNode(String node) {
return CompletableFuture.supplyAsync(() -> {
// 模拟从节点获取数据
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Data from " + node;
});
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
List<String> nodes = new ArrayList<>();
nodes.add("Node1");
nodes.add("Node2");
nodes.add("Node3");
List<CompletableFuture<String>> futures = new ArrayList<>();
for (String node : nodes) {
futures.add(fetchDataFromNode(node));
}
CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
allFutures.thenRun(() -> {
StringBuilder aggregatedData = new StringBuilder();
for (CompletableFuture<String> future : futures) {
try {
aggregatedData.append(future.get()).append(", ");
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
System.out.println("Aggregated Data: " + aggregatedData.toString());
}).get();
}
}
在这个例子中,fetchDataFromNode
方法异步从每个节点获取数据。通过 allOf
方法等待所有数据获取完成后,将数据进行聚合并打印。
3. 搜索引擎中的多数据源搜索
搜索引擎可能需要从多个数据源(如网页数据库、文档库等)中搜索相关信息,然后将结果合并展示给用户。
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class MultiSourceSearchExample {
public static CompletableFuture<List<String>> searchWeb(String query) {
return CompletableFuture.supplyAsync(() -> {
// 模拟网页搜索
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
List<String> results = new ArrayList<>();
results.add("Web result 1 for " + query);
results.add("Web result 2 for " + query);
return results;
});
}
public static CompletableFuture<List<String>> searchDocuments(String query) {
return CompletableFuture.supplyAsync(() -> {
// 模拟文档搜索
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
List<String> results = new ArrayList<>();
results.add("Document result 1 for " + query);
results.add("Document result 2 for " + query);
return results;
});
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
String query = "Java CompletableFuture";
CompletableFuture<List<String>> webSearchFuture = searchWeb(query);
CompletableFuture<List<String>> documentSearchFuture = searchDocuments(query);
CompletableFuture<Void> allFutures = CompletableFuture.allOf(webSearchFuture, documentSearchFuture);
allFutures.thenRun(() -> {
List<String> allResults = new ArrayList<>();
try {
allResults.addAll(webSearchFuture.get());
allResults.addAll(documentSearchFuture.get());
System.out.println("All search results: " + allResults);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}).get();
}
}
在上述代码中,searchWeb
和 searchDocuments
方法分别异步从网页和文档库中搜索相关信息。通过 allOf
方法等待两个搜索任务完成后,将结果合并展示。
CompletableFuture 的线程池使用
在默认情况下,CompletableFuture 使用 ForkJoinPool.commonPool()
来执行异步任务。然而,在某些情况下,我们可能需要使用自定义的线程池,以满足特定的业务需求,例如控制线程数量、设置线程优先级等。
使用自定义线程池
可以通过 supplyAsync
和 runAsync
方法的重载版本来指定使用的线程池。
import java.util.concurrent.*;
public class CustomThreadPoolExample {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(5);
CompletableFuture.supplyAsync(() -> {
// 模拟耗时操作
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Task executed with custom thread pool";
}, executor)
.thenAccept(System.out::println)
.thenRun(() -> executor.shutdown());
}
}
在上述代码中,首先创建了一个固定大小为 5 的线程池 executor
。然后使用 supplyAsync
方法的重载版本,将自定义线程池作为参数传入,这样异步任务就会在自定义线程池中执行。任务完成后,通过 thenRun
方法关闭线程池。
线程池大小的选择
选择合适的线程池大小对于性能至关重要。如果线程池太小,可能导致任务等待,无法充分利用系统资源;如果线程池太大,可能会增加线程上下文切换的开销,导致性能下降。
一般来说,对于 CPU 密集型任务,线程池大小可以设置为 CPU 核心数加 1,以确保在某个线程因偶尔的 I/O 操作等原因阻塞时,其他线程仍能继续使用 CPU 资源。对于 I/O 密集型任务,线程池大小可以根据预估的 I/O 等待时间和 CPU 处理时间的比例适当增大,例如设置为 CPU 核心数的 2 到 3 倍。
CompletableFuture 的性能优化
减少不必要的异步任务
虽然异步编程可以提高系统的响应性,但过多的异步任务可能会增加系统的复杂性和性能开销。在设计时,应仔细评估哪些任务真正需要异步执行,避免将一些简单、快速的任务也异步化。
合理设置超时
在异步任务执行过程中,设置合理的超时时间可以避免任务长时间阻塞。CompletableFuture 本身没有直接提供设置超时的方法,但可以通过 ExecutorService
的 submit
方法结合 Future
的 get(timeout, unit)
方法来实现。
import java.util.concurrent.*;
public class TimeoutExample {
public static void main(String[] args) {
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<String> future = executor.submit(() -> {
// 模拟耗时操作
Thread.sleep(5000);
return "Task completed";
});
try {
String result = future.get(2, TimeUnit.SECONDS);
System.out.println(result);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
System.out.println("Task timed out or interrupted.");
future.cancel(true);
} finally {
executor.shutdown();
}
}
}
在上述代码中,submit
方法提交一个异步任务,然后通过 get(2, TimeUnit.SECONDS)
方法设置超时时间为 2 秒。如果任务在 2 秒内未完成,将抛出 TimeoutException
,并取消任务。
避免不必要的阻塞
在处理 CompletableFuture 时,应尽量避免在主线程或关键线程中调用 get
方法阻塞等待结果。可以通过使用回调函数(如 thenApply
、thenAccept
等)来处理结果,以保持程序的异步性。
总结
CompletableFuture 为 Java 开发者提供了强大的异步编程能力,使得在复杂业务逻辑中处理异步任务变得更加简洁和高效。通过合理使用 CompletableFuture 的各种方法,如创建异步任务、处理结果、组合任务、处理异常等,我们可以构建出高性能、高响应性的应用程序。同时,在使用 CompletableFuture 时,要注意线程池的选择、性能优化等方面,以充分发挥其优势。希望本文的内容能帮助你更好地理解和应用 CompletableFuture 进行异步编程。