Java CompletableFuture thenCompose解耦异步任务的方法
Java CompletableFuture thenCompose解耦异步任务的方法
CompletableFuture 基础概念
在Java中,CompletableFuture
是Java 8引入的一个强大的类,用于处理异步计算。它实现了 Future
接口和 CompletionStage
接口,允许我们以一种更灵活、更简洁的方式编写异步代码。Future
接口在Java 5就已经引入,它提供了一种异步执行任务并获取结果的方式。但是,Future
存在一些局限性,比如我们很难对多个 Future
进行链式操作,也难以处理异步操作完成后的结果。而 CompletableFuture
解决了这些问题。
CompletableFuture
可以通过多种方式创建,例如:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 模拟一个异步任务,这里可能是耗时的I/O操作、数据库查询等
return "Hello, CompletableFuture!";
});
在上述代码中,supplyAsync
方法接收一个 Supplier
作为参数,并异步执行 Supplier
中的逻辑,返回一个 CompletableFuture
对象,该对象最终会包含 Supplier
返回的结果。
thenCompose 方法的作用
thenCompose
方法是 CompletableFuture
类中一个非常重要的方法,它用于将两个异步操作链式连接起来。更具体地说,thenCompose
允许我们在一个 CompletableFuture
完成后,使用其结果来触发另一个 CompletableFuture
的执行,并且将这两个异步操作解耦。
假设我们有两个异步任务,任务A和任务B,任务B的执行依赖于任务A的结果。在传统的 Future
中,我们可能需要手动获取任务A的结果,然后再启动任务B。而使用 CompletableFuture
的 thenCompose
方法,我们可以更优雅地实现这种链式异步操作。
thenCompose
方法的签名如下:
public <U> CompletableFuture<U> thenCompose(Function<? super T,? extends CompletionStage<U>> fn)
其中,T
是当前 CompletableFuture
的结果类型,U
是新生成的 CompletableFuture
的结果类型。fn
是一个 Function
,它接收当前 CompletableFuture
的结果,并返回一个新的 CompletionStage
(通常是另一个 CompletableFuture
)。
代码示例1:简单的链式异步操作
下面通过一个简单的示例来演示 thenCompose
的基本用法。假设我们有两个异步任务,第一个任务是从数据库中查询用户信息,第二个任务是根据用户信息获取用户的订单列表。
首先,定义两个模拟的异步方法:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
class UserService {
// 模拟从数据库查询用户信息
public static CompletableFuture<String> getUserInfo() {
return CompletableFuture.supplyAsync(() -> {
// 模拟耗时操作
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "User: John Doe";
});
}
// 模拟根据用户信息获取订单列表
public static CompletableFuture<String> getOrderList(String userInfo) {
return CompletableFuture.supplyAsync(() -> {
// 模拟耗时操作
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Orders for " + userInfo + ": Order1, Order2";
});
}
}
然后,使用 thenCompose
方法将这两个异步任务连接起来:
public class ThenComposeExample1 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> result = UserService.getUserInfo()
.thenCompose(UserService::getOrderList);
System.out.println(result.get());
}
}
在上述代码中,UserService.getUserInfo()
方法返回一个 CompletableFuture<String>
,表示获取用户信息的异步操作。thenCompose
方法接收 UserService::getOrderList
作为参数,这是一个方法引用,它将 getUserInfo
操作的结果作为参数传递给 getOrderList
方法,并返回一个新的 CompletableFuture<String>
,表示获取订单列表的异步操作。最终,通过 result.get()
获取整个链式异步操作的结果。
thenCompose 与 thenApply 的区别
在使用 CompletableFuture
进行链式操作时,thenApply
也是一个常用的方法,它与 thenCompose
有一些相似之处,但也有重要的区别。
thenApply
方法的签名如下:
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
thenApply
接收一个 Function
,该 Function
接收当前 CompletableFuture
的结果,并返回一个普通的结果(不是 CompletableFuture
)。而 thenCompose
接收的 Function
返回的是一个 CompletableFuture
。
例如,假设我们有一个 CompletableFuture<Integer>
,我们想对其结果进行平方操作,我们可以使用 thenApply
:
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 5);
CompletableFuture<Integer> squaredFuture = future1.thenApply(i -> i * i);
在上述代码中,thenApply
接收一个 Function
,该 Function
将 CompletableFuture<Integer>
的结果(这里是5)进行平方操作,并返回一个新的 CompletableFuture<Integer>
,其结果为25。
而如果我们想在结果基础上再启动一个异步操作,比如根据平方后的结果进行数据库查询,就需要使用 thenCompose
。例如:
class DatabaseService {
public static CompletableFuture<String> queryByNumber(int number) {
return CompletableFuture.supplyAsync(() -> {
// 模拟数据库查询
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Result for number " + number;
});
}
}
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 5);
CompletableFuture<String> queryResult = future2.thenCompose(i -> DatabaseService.queryByNumber(i * i));
在这个例子中,thenCompose
将 CompletableFuture<Integer>
的结果平方后,作为参数传递给 DatabaseService.queryByNumber
方法,该方法返回一个 CompletableFuture<String>
,表示数据库查询的异步操作。
代码示例2:处理复杂业务逻辑
在实际应用中,我们可能会遇到更复杂的业务逻辑,需要多个异步任务链式执行,并且每个任务都可能有不同的返回类型。下面通过一个更复杂的示例来展示 thenCompose
的应用。
假设我们要开发一个电商系统,需要从库存系统查询商品库存,然后根据库存判断是否有足够的商品可以发货,如果有则更新订单状态并通知用户。
首先,定义相关的服务类和方法:
import java.util.concurrent.CompletableFuture;
class InventoryService {
// 模拟查询商品库存
public static CompletableFuture<Integer> checkStock(String productId) {
return CompletableFuture.supplyAsync(() -> {
// 模拟耗时操作
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 假设返回库存数量
return 10;
});
}
}
class OrderService {
// 模拟更新订单状态
public static CompletableFuture<String> updateOrderStatus(String orderId, int stock) {
if (stock > 0) {
return CompletableFuture.supplyAsync(() -> {
// 模拟耗时操作
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Order " + orderId + " status updated to shipped";
});
} else {
return CompletableFuture.supplyAsync(() -> "Not enough stock for order " + orderId);
}
}
}
class NotificationService {
// 模拟通知用户
public static CompletableFuture<String> notifyUser(String message) {
return CompletableFuture.supplyAsync(() -> {
// 模拟耗时操作
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "User notified: " + message;
});
}
}
然后,使用 thenCompose
来实现整个业务流程:
public class ThenComposeExample2 {
public static void main(String[] args) throws Exception {
String productId = "P001";
String orderId = "O001";
CompletableFuture<String> result = InventoryService.checkStock(productId)
.thenCompose(stock -> OrderService.updateOrderStatus(orderId, stock))
.thenCompose(OrderService::notifyUser);
System.out.println(result.get());
}
}
在上述代码中,首先通过 InventoryService.checkStock
查询商品库存,然后使用 thenCompose
将库存结果传递给 OrderService.updateOrderStatus
方法来更新订单状态,最后再使用 thenCompose
将更新订单状态的结果传递给 NotificationService.notifyUser
方法来通知用户。整个过程通过 thenCompose
实现了异步任务的链式调用和解耦。
异常处理
在使用 thenCompose
进行异步操作时,异常处理是非常重要的。CompletableFuture
提供了几种处理异常的方法,比如 exceptionally
、handle
等。
exceptionally
方法接收一个 Function
,当 CompletableFuture
执行过程中出现异常时,该 Function
会被调用,返回一个默认值或处理异常后的结果。例如:
CompletableFuture<Integer> futureWithException = CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("Simulated exception");
});
CompletableFuture<Integer> resultWithException = futureWithException.exceptionally(ex -> {
System.out.println("Caught exception: " + ex.getMessage());
return -1;
});
try {
System.out.println(resultWithException.get());
} catch (Exception e) {
e.printStackTrace();
}
在上述代码中,futureWithException
故意抛出一个运行时异常。exceptionally
方法捕获到这个异常,并返回 -1 作为默认值。
当使用 thenCompose
进行链式异步操作时,异常会沿着链式传递。例如:
CompletableFuture<String> futureChainWithException = CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("First step exception");
})
.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " appended"));
CompletableFuture<String> resultChainWithException = futureChainWithException.exceptionally(ex -> {
System.out.println("Caught exception in chain: " + ex.getMessage());
return "Default value";
});
try {
System.out.println(resultChainWithException.get());
} catch (Exception e) {
e.printStackTrace();
}
在这个例子中,第一个 CompletableFuture
抛出异常,这个异常会传递给 thenCompose
链中的后续操作。exceptionally
方法会捕获到这个异常,并返回默认值。
handle
方法与 exceptionally
类似,但它可以同时处理正常结果和异常情况。handle
方法接收一个 BiFunction
,第一个参数是正常结果,第二个参数是异常。例如:
CompletableFuture<Integer> futureHandleExample = CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("Simulated exception");
});
CompletableFuture<Integer> resultHandleExample = futureHandleExample.handle((result, ex) -> {
if (ex != null) {
System.out.println("Caught exception: " + ex.getMessage());
return -1;
} else {
return result * 2;
}
});
try {
System.out.println(resultHandleExample.get());
} catch (Exception e) {
e.printStackTrace();
}
在上述代码中,handle
方法根据是否有异常来返回不同的结果。如果有异常,返回 -1;如果没有异常,返回结果的两倍。
并行执行与 thenCompose
有时候,我们可能希望在链式异步操作中,某些部分能够并行执行,以提高整体的执行效率。虽然 thenCompose
本身主要用于链式连接异步操作,但结合 CompletableFuture
的其他方法,可以实现并行与链式操作的结合。
例如,假设我们有两个独立的异步任务A和任务B,它们可以并行执行,然后任务C依赖于任务A和任务B的结果。我们可以这样实现:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
class ParallelService {
// 模拟异步任务A
public static CompletableFuture<String> taskA() {
return CompletableFuture.supplyAsync(() -> {
// 模拟耗时操作
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Result of task A";
});
}
// 模拟异步任务B
public static CompletableFuture<String> taskB() {
return CompletableFuture.supplyAsync(() -> {
// 模拟耗时操作
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Result of task B";
});
}
// 模拟依赖于任务A和任务B结果的任务C
public static CompletableFuture<String> taskC(String resultA, String resultB) {
return CompletableFuture.supplyAsync(() -> {
// 模拟耗时操作
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return resultA + " and " + resultB + " combined";
});
}
}
public class ParallelThenComposeExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> resultA = ParallelService.taskA();
CompletableFuture<String> resultB = ParallelService.taskB();
CompletableFuture<String> combinedResult = CompletableFuture.allOf(resultA, resultB)
.thenApply(v -> {
try {
return taskC(resultA.get(), resultB.get()).get();
} catch (Exception e) {
throw new RuntimeException(e);
}
});
System.out.println(combinedResult.get());
}
}
在上述代码中,taskA
和 taskB
并行执行。CompletableFuture.allOf(resultA, resultB)
等待 resultA
和 resultB
都完成。然后,thenApply
方法获取 taskA
和 taskB
的结果,并调用 taskC
方法,taskC
方法返回一个新的 CompletableFuture
,最终获取整个操作的结果。
总结 thenCompose 的优势
- 解耦异步任务:
thenCompose
使得我们可以将复杂的异步业务逻辑拆分成多个独立的异步任务,每个任务专注于自己的功能,提高代码的可维护性和可读性。例如在电商系统的例子中,库存查询、订单状态更新和用户通知这几个任务被清晰地分开,通过thenCompose
连接起来。 - 链式调用:它提供了一种优雅的链式调用方式,避免了传统
Future
中手动获取结果再启动下一个任务的繁琐操作。使得异步代码看起来更像是同步代码,降低了编写和理解异步代码的难度。 - 异常处理方便:结合
CompletableFuture
的异常处理方法,如exceptionally
和handle
,可以方便地在链式异步操作中处理异常,确保整个异步流程的健壮性。
通过深入理解和灵活运用 CompletableFuture
的 thenCompose
方法,我们可以更高效地编写异步代码,提升应用程序的性能和响应性,尤其是在处理复杂的异步业务逻辑时。无论是简单的链式操作还是涉及并行执行和异常处理的复杂场景,thenCompose
都为我们提供了强大的工具。