Java CompletableFuture的链式调用与组合
Java CompletableFuture的链式调用
在Java中,CompletableFuture
提供了强大的异步编程能力,链式调用是其核心特性之一。它允许我们以一种流畅的方式编写异步操作序列,使得代码更加简洁和易读。
基本链式调用
假设我们有一个简单的异步任务,例如从网络获取数据。通常,我们可能会这样写:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class CompletableFutureChainExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 模拟一个耗时操作,比如从网络获取数据
return "Data from network";
});
CompletableFuture<String> processedFuture = future.thenApply(data -> {
// 对获取到的数据进行处理
return "Processed: " + data;
});
String result = processedFuture.get();
System.out.println(result);
}
}
在这个例子中,CompletableFuture.supplyAsync
方法创建了一个异步任务,该任务返回一个 CompletableFuture
对象。thenApply
方法则在异步任务完成后,对其结果进行处理。thenApply
方法接受一个 Function
作为参数,该 Function
会将前一个 CompletableFuture
的结果作为输入,并返回一个新的结果。
链式调用的优势
链式调用的主要优势在于代码的可读性和维护性。传统的异步编程,尤其是使用回调函数时,代码可能会变得非常复杂,出现“回调地狱”的情况。例如,考虑以下传统回调风格的代码:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CallbackExample {
public static void main(String[] args) {
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(() -> {
// 第一个异步任务
String data = "Data from network";
executor.submit(() -> {
// 基于第一个任务结果的第二个异步任务
String processedData = "Processed: " + data;
System.out.println(processedData);
});
});
executor.shutdown();
}
}
可以看到,随着异步任务的增加,代码变得越来越难以阅读和维护。而使用 CompletableFuture
的链式调用,代码结构更加清晰,每个异步步骤一目了然。
多步链式调用
CompletableFuture
支持多个步骤的链式调用。假设我们有一个更复杂的场景,需要对获取的数据进行多次处理:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class MultiStepChainExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return "Data from network";
})
.thenApply(data -> {
return "Step 1: " + data;
})
.thenApply(data -> {
return "Step 2: " + data;
})
.thenApply(data -> {
return "Step 3: " + data;
});
String result = future.get();
System.out.println(result);
}
}
在这个例子中,我们依次对数据进行了三次处理,每个 thenApply
方法都基于前一个方法的结果进行操作。这种链式调用使得整个异步处理流程非常清晰。
处理异常
在链式调用中,异常处理是非常重要的。CompletableFuture
提供了 exceptionally
方法来处理异步任务中的异常。例如:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class ExceptionHandlingInChainExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
if (Math.random() < 0.5) {
throw new RuntimeException("Simulated exception");
}
return "Data from network";
})
.thenApply(data -> {
return "Processed: " + data;
})
.exceptionally(ex -> {
System.out.println("Caught exception: " + ex.getMessage());
return "Default value";
});
String result = future.get();
System.out.println(result);
}
}
在这个例子中,如果 supplyAsync
中的异步任务抛出异常,exceptionally
方法会捕获该异常,并返回一个默认值。这样可以确保即使在异步任务出现异常的情况下,程序也能继续运行,而不会崩溃。
Java CompletableFuture的组合
除了链式调用,CompletableFuture
还支持任务的组合。这意味着我们可以将多个异步任务组合成一个更复杂的任务,以满足不同的业务需求。
组合多个独立任务
假设我们有两个独立的异步任务,例如从不同的数据源获取数据,然后将这两个数据进行合并。我们可以使用 CompletableFuture
的 thenCombine
方法来实现:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class CombineIndependentTasksExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
// 模拟从第一个数据源获取数据
return "Data from source 1";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
// 模拟从第二个数据源获取数据
return "Data from source 2";
});
CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (data1, data2) -> {
return data1 + " and " + data2;
});
String result = combinedFuture.get();
System.out.println(result);
}
}
在这个例子中,thenCombine
方法接受两个 CompletableFuture
对象和一个 BiFunction
。当这两个 CompletableFuture
都完成时,BiFunction
会将它们的结果作为参数,并返回一个合并后的结果。
按顺序组合任务
有时候,我们需要按照特定的顺序执行多个异步任务,即使这些任务在逻辑上是独立的。例如,我们可能需要先获取用户信息,然后根据用户信息获取用户的订单列表。可以使用 thenCompose
方法来实现这种按顺序的组合:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
class User {
private String name;
public User(String name) {
this.name = name;
}
public String getName() {
return name;
}
}
class Order {
private String orderInfo;
public Order(String orderInfo) {
this.orderInfo = orderInfo;
}
public String getOrderInfo() {
return orderInfo;
}
}
public class SequentialCompositionExample {
public static CompletableFuture<User> getUser() {
return CompletableFuture.supplyAsync(() -> {
return new User("John");
});
}
public static CompletableFuture<Order> getOrder(User user) {
return CompletableFuture.supplyAsync(() -> {
return new Order("Order for " + user.getName());
});
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Order> future = getUser()
.thenCompose(user -> getOrder(user));
Order result = future.get();
System.out.println(result.getOrderInfo());
}
}
在这个例子中,thenCompose
方法会等待第一个 CompletableFuture
(getUser
)完成,然后将其结果作为参数传递给第二个 CompletableFuture
(getOrder
)。这样就确保了任务按照顺序执行。
并行执行任务并等待所有完成
如果我们有多个独立的异步任务,并且希望它们并行执行,然后等待所有任务都完成后再进行下一步操作,可以使用 CompletableFuture
的 allOf
方法。例如,我们需要从多个数据库表中获取数据,然后进行汇总:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class AllOfExample {
public static CompletableFuture<Integer> getDataFromTable1() {
return CompletableFuture.supplyAsync(() -> {
// 模拟从表1获取数据
return 10;
});
}
public static CompletableFuture<Integer> getDataFromTable2() {
return CompletableFuture.supplyAsync(() -> {
// 模拟从表2获取数据
return 20;
});
}
public static CompletableFuture<Integer> getDataFromTable3() {
return CompletableFuture.supplyAsync(() -> {
// 模拟从表3获取数据
return 30;
});
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture[] futures = {
getDataFromTable1(),
getDataFromTable2(),
getDataFromTable3()
};
CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures);
allFutures.join();
int total = 0;
for (CompletableFuture<Integer> future : futures) {
total += future.get();
}
System.out.println("Total: " + total);
}
}
在这个例子中,allOf
方法接受一个 CompletableFuture
数组,并返回一个新的 CompletableFuture
。当所有传入的 CompletableFuture
都完成时,这个新的 CompletableFuture
才会完成。我们可以通过调用 join
方法等待所有任务完成,然后再处理它们的结果。
任意一个任务完成即返回
有时候,我们只关心多个异步任务中任意一个任务的完成情况。例如,我们向多个服务器发送请求,只要有一个服务器响应,我们就可以停止等待。可以使用 CompletableFuture
的 anyOf
方法来实现:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class AnyOfExample {
public static CompletableFuture<String> sendRequestToServer1() {
return CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Response from server 1";
});
}
public static CompletableFuture<String> sendRequestToServer2() {
return CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Response from server 2";
});
}
public static CompletableFuture<String> sendRequestToServer3() {
return CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Response from server 3";
});
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture[] futures = {
sendRequestToServer1(),
sendRequestToServer2(),
sendRequestToServer3()
};
CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(futures);
String result = (String) anyFuture.get();
System.out.println(result);
}
}
在这个例子中,anyOf
方法接受一个 CompletableFuture
数组,并返回一个新的 CompletableFuture
。当任意一个传入的 CompletableFuture
完成时,这个新的 CompletableFuture
就会完成,并返回已完成任务的结果。
CompletableFuture链式调用与组合的高级应用
在实际应用中,CompletableFuture
的链式调用与组合可以用于更复杂的场景,例如分布式系统中的数据处理、微服务之间的交互等。
分布式数据处理
假设我们有一个分布式系统,需要从多个节点获取数据,然后对这些数据进行聚合和分析。可以使用 CompletableFuture
的组合功能来实现:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class DistributedDataProcessingExample {
public static CompletableFuture<List<Integer>> getDataFromNode1() {
return CompletableFuture.supplyAsync(() -> {
List<Integer> data = new ArrayList<>();
data.add(1);
data.add(2);
return data;
});
}
public static CompletableFuture<List<Integer>> getDataFromNode2() {
return CompletableFuture.supplyAsync(() -> {
List<Integer> data = new ArrayList<>();
data.add(3);
data.add(4);
return data;
});
}
public static CompletableFuture<List<Integer>> getDataFromNode3() {
return CompletableFuture.supplyAsync(() -> {
List<Integer> data = new ArrayList<>();
data.add(5);
data.add(6);
return data;
});
}
public static CompletableFuture<Integer> aggregateData(List<List<Integer>> allData) {
return CompletableFuture.supplyAsync(() -> {
int sum = 0;
for (List<Integer> dataList : allData) {
for (int num : dataList) {
sum += num;
}
}
return sum;
});
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<List<Integer>> future1 = getDataFromNode1();
CompletableFuture<List<Integer>> future2 = getDataFromNode2();
CompletableFuture<List<Integer>> future3 = getDataFromNode3();
CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2, future3);
CompletableFuture<List<List<Integer>>> combinedDataFuture = allFutures.thenApply(v -> {
List<List<Integer>> combinedData = new ArrayList<>();
try {
combinedData.add(future1.get());
combinedData.add(future2.get());
combinedData.add(future3.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
return combinedData;
});
CompletableFuture<Integer> resultFuture = combinedDataFuture.thenCompose(combinedData -> aggregateData(combinedData));
int result = resultFuture.get();
System.out.println("Aggregated result: " + result);
}
}
在这个例子中,我们从三个不同的节点获取数据,使用 allOf
方法等待所有数据获取完成,然后将这些数据组合起来进行聚合计算。
微服务交互
在微服务架构中,一个服务可能需要调用多个其他微服务来完成一个业务逻辑。例如,一个订单服务可能需要调用用户服务获取用户信息,调用库存服务检查库存,然后根据这些信息处理订单。可以使用 CompletableFuture
来管理这些微服务之间的异步调用:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
class UserService {
public static CompletableFuture<String> getUserInfo() {
return CompletableFuture.supplyAsync(() -> {
// 模拟调用用户服务获取用户信息
return "User information";
});
}
}
class InventoryService {
public static CompletableFuture<String> checkInventory() {
return CompletableFuture.supplyAsync(() -> {
// 模拟调用库存服务检查库存
return "Inventory is available";
});
}
}
class OrderService {
public static CompletableFuture<String> processOrder(String userInfo, String inventoryInfo) {
return CompletableFuture.supplyAsync(() -> {
// 模拟根据用户信息和库存信息处理订单
return "Order processed with " + userInfo + " and " + inventoryInfo;
});
}
}
public class MicroserviceInteractionExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> userFuture = UserService.getUserInfo();
CompletableFuture<String> inventoryFuture = InventoryService.checkInventory();
CompletableFuture<String> combinedFuture = userFuture.thenCombine(inventoryFuture, (userInfo, inventoryInfo) -> {
return OrderService.processOrder(userInfo, inventoryInfo).join();
});
String result = combinedFuture.get();
System.out.println(result);
}
}
在这个例子中,订单服务通过 thenCombine
方法组合了用户服务和库存服务的结果,然后进行订单处理。
链式调用与组合中的线程管理
在使用 CompletableFuture
的链式调用和组合时,线程管理是一个重要的方面。默认情况下,CompletableFuture
使用 ForkJoinPool.commonPool()
来执行异步任务。然而,在某些情况下,我们可能需要自定义线程池。
使用自定义线程池
假设我们有一个高负载的应用,需要为 CompletableFuture
任务分配专门的线程池,以避免与其他任务竞争资源。可以通过 supplyAsync
等方法的重载版本来指定线程池:
import java.util.concurrent.*;
public class CustomThreadPoolExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(5);
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 模拟一个耗时操作
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Data from async task";
}, executor);
CompletableFuture<String> processedFuture = future.thenApply(data -> {
return "Processed: " + data;
});
String result = processedFuture.get();
System.out.println(result);
executor.shutdown();
}
}
在这个例子中,我们创建了一个固定大小为5的线程池,并将其传递给 supplyAsync
方法。这样,异步任务将在我们自定义的线程池中执行。
线程池对链式调用和组合的影响
使用自定义线程池可以更好地控制任务的执行,特别是在高并发场景下。例如,在链式调用中,如果某个任务非常耗时,使用默认线程池可能会导致其他任务等待。而通过自定义线程池,可以为不同类型的任务分配不同的线程资源。
在任务组合时,例如使用 allOf
方法等待多个任务完成,自定义线程池可以确保这些任务在合理的资源下并行执行。如果不使用自定义线程池,可能会因为 ForkJoinPool.commonPool()
的线程数量限制,导致任务执行效率低下。
链式调用与组合中的性能优化
为了提高 CompletableFuture
链式调用和组合的性能,我们可以采取一些优化措施。
减少不必要的中间结果
在链式调用中,尽量避免创建过多不必要的中间结果。例如,如果某个 thenApply
方法只是对数据进行简单的转换,并且这个转换结果不会在其他地方使用,可以考虑将多个转换操作合并到一个 thenApply
方法中。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class OptimizeIntermediateResultsExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return "Original data";
});
// 优化前
CompletableFuture<String> optimizedFuture1 = future
.thenApply(data -> {
return data + " - step1";
})
.thenApply(data -> {
return data + " - step2";
});
// 优化后
CompletableFuture<String> optimizedFuture2 = future
.thenApply(data -> {
return data + " - step1 - step2";
});
String result1 = optimizedFuture1.get();
String result2 = optimizedFuture2.get();
System.out.println(result1);
System.out.println(result2);
}
}
在这个例子中,优化后的代码减少了一个中间结果的创建,从而提高了性能。
合理使用并行任务
在任务组合中,合理使用并行任务可以显著提高性能。例如,在使用 allOf
方法时,如果任务之间没有依赖关系,应该确保它们并行执行。同时,要注意线程池的大小设置,避免因为线程过多导致上下文切换开销过大。
import java.util.concurrent.*;
public class ParallelTaskOptimizationExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(3);
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1;
}, executor);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 2;
}, executor);
CompletableFuture<Integer> future3 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 3;
}, executor);
CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2, future3);
allFutures.join();
int total = 0;
try {
total = future1.get() + future2.get() + future3.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
System.out.println("Total: " + total);
executor.shutdown();
}
}
在这个例子中,我们通过自定义线程池,确保三个任务并行执行,提高了整体的执行效率。同时,要根据实际情况调整线程池的大小,以达到最佳的性能。
链式调用与组合在响应式编程中的应用
CompletableFuture
的链式调用和组合在响应式编程中也有重要的应用。响应式编程强调异步数据流和事件驱动,CompletableFuture
可以很好地与响应式编程框架(如 Reactor、RxJava)集成。
与 Reactor 集成
Reactor 是一个基于响应式流规范的 Java 框架。我们可以将 CompletableFuture
转换为 Reactor 的 Mono
或 Flux
,以便在 Reactor 中进行更复杂的响应式处理。
import reactor.core.publisher.Mono;
import java.util.concurrent.CompletableFuture;
public class CompletableFutureToReactorExample {
public static void main(String[] args) {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return "Data from CompletableFuture";
});
Mono<String> mono = Mono.fromFuture(future);
mono.subscribe(data -> {
System.out.println("Received data from Mono: " + data);
});
}
}
在这个例子中,我们使用 Mono.fromFuture
方法将 CompletableFuture
转换为 Mono
。然后可以使用 Mono
的各种操作符进行进一步的响应式处理。
与 RxJava 集成
RxJava 也是一个流行的响应式编程框架。同样,我们可以将 CompletableFuture
转换为 RxJava 的 Observable
或 Single
。
import io.reactivex.Single;
import java.util.concurrent.CompletableFuture;
public class CompletableFutureToRxJavaExample {
public static void main(String[] args) {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return "Data from CompletableFuture";
});
Single<String> single = Single.fromFuture(future);
single.subscribe(data -> {
System.out.println("Received data from Single: " + data);
});
}
}
通过这种集成,我们可以在 CompletableFuture
的基础上,利用响应式编程框架的强大功能,如操作符链、背压处理等,来构建更复杂和高效的异步应用程序。
通过深入理解 CompletableFuture
的链式调用与组合,以及合理应用线程管理、性能优化和与响应式编程框架的集成,我们可以在 Java 中构建出高效、可维护的异步应用程序。无论是处理分布式系统中的数据,还是管理微服务之间的交互,CompletableFuture
都提供了强大的工具来满足我们的需求。