Java 中 CompletableFuture 多个任务 AnyOf 关系
Java 中 CompletableFuture 的 AnyOf 关系详解
CompletableFuture 简介
在 Java 8 引入 CompletableFuture 之前,处理异步任务相对复杂,往往需要依赖诸如 Future 接口以及手动线程管理等方式。Future 接口提供了一种异步计算结果获取的方式,但它存在局限性,例如无法方便地对异步结果进行链式操作、组合多个异步任务等。
CompletableFuture 实现了 Future 接口,并在此基础上提供了丰富的方法来支持异步任务的链式调用、组合、异常处理等操作。它基于 Fork/Join 框架和异步回调机制,使得编写异步代码变得更加简洁和高效。CompletableFuture 可以在任务完成时自动触发后续操作,无需开发者手动轮询 Future 的状态。
AnyOf 关系的概念
CompletableFuture 的 anyOf
方法用于创建一个新的 CompletableFuture,这个新的 CompletableFuture 会在多个给定的 CompletableFuture 中 任意一个 完成时就完成,其结果为第一个完成的 CompletableFuture 的结果。如果任意一个任务因异常而完成,那么 anyOf
返回的 CompletableFuture 也会以该异常完成。
方法签名
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
该方法接受一个可变参数,参数类型为 CompletableFuture<?>,返回一个新的 CompletableFuture,其泛型类型为 Object,因为它的结果可能是传入的任意一个 CompletableFuture 的结果类型。
简单示例代码
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class CompletableFutureAnyOfExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Future 1 completed";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Future 2 completed";
});
CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(future1, future2);
System.out.println(anyOfFuture.get());
}
}
在上述代码中,future1
和 future2
是两个异步任务,分别模拟了耗时 2 秒和 1 秒的操作。CompletableFuture.anyOf(future1, future2)
创建了一个新的 CompletableFuture anyOfFuture
。anyOfFuture.get()
会阻塞直到 future1
或 future2
任意一个完成。由于 future2
耗时更短,所以通常会输出 "Future 2 completed"
。
异常处理
当参与 anyOf
的 CompletableFuture 中有一个抛出异常时,anyOf
返回的 CompletableFuture 也会以该异常完成。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class CompletableFutureAnyOfExceptionExample {
public static void main(String[] args) {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("Future 1 exception");
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Future 2 completed";
});
CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(future1, future2);
anyOfFuture.exceptionally(ex -> {
System.out.println("Caught exception: " + ex.getMessage());
return null;
}).thenAccept(result -> {
if (result != null) {
System.out.println("Result: " + result);
}
});
}
}
在这个例子中,future1
会抛出一个运行时异常。anyOfFuture
会以 future1
的异常完成,通过 exceptionally
方法可以捕获并处理这个异常。
与其他 CompletableFuture 方法组合使用
- 与 thenApply 组合
可以在
anyOf
返回的 CompletableFuture 上使用thenApply
方法对结果进行进一步处理。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class CompletableFutureAnyOfThenApplyExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Future 1 completed";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Future 2 completed";
});
CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(future1, future2);
CompletableFuture<String> processedFuture = anyOfFuture.thenApply(result -> {
return "Processed: " + result;
});
System.out.println(processedFuture.get());
}
}
这里,thenApply
方法对 anyOfFuture
的结果进行了处理,在结果前添加了 "Processed: "
。
- 与 thenAccept 组合
thenAccept
方法用于在 CompletableFuture 完成时接受结果并执行副作用操作。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class CompletableFutureAnyOfThenAcceptExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Future 1 completed";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Future 2 completed";
});
CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(future1, future2);
anyOfFuture.thenAccept(result -> {
System.out.println("Received result: " + result);
});
// 为了确保主线程不会提前退出
Thread.sleep(3000);
}
}
在这个示例中,thenAccept
方法在 anyOfFuture
完成时打印出结果。由于主线程可能会提前退出,这里使用 Thread.sleep
确保主线程等待异步任务完成。
AnyOf 关系的原理
从实现层面来看,anyOf
方法会为每个传入的 CompletableFuture 注册一个完成监听器。当任意一个 CompletableFuture 完成时,其完成监听器会被触发。这个监听器会尝试完成 anyOf
返回的 CompletableFuture,并将完成的 CompletableFuture 的结果或异常传递给它。
在 CompletableFuture 的内部实现中,使用了基于状态的机制。每个 CompletableFuture 有一个状态字段,用于表示其当前状态,如未开始、进行中、正常完成、异常完成等。当一个 CompletableFuture 完成时,它会更新自己的状态,并触发所有注册的监听器。anyOf
方法注册的监听器会检查 anyOf
返回的 CompletableFuture 的状态,如果它还未完成,就会根据完成的 CompletableFuture 的结果或异常来完成 anyOf
返回的 CompletableFuture。
在实际应用场景中的使用
- 资源获取 假设一个应用程序需要从多个数据源获取数据,但只要获取到其中一个数据源的数据就可以继续后续操作。例如,应用程序可以同时从本地缓存和远程服务器获取用户数据,只要其中一个数据源返回数据,就可以使用该数据进行处理。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class CompletableFutureAnyOfResourceExample {
public static CompletableFuture<String> getFromLocalCache() {
return CompletableFuture.supplyAsync(() -> {
// 模拟从本地缓存获取数据
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Data from local cache";
});
}
public static CompletableFuture<String> getFromRemoteServer() {
return CompletableFuture.supplyAsync(() -> {
// 模拟从远程服务器获取数据
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Data from remote server";
});
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(getFromLocalCache(), getFromRemoteServer());
System.out.println(anyOfFuture.get());
}
}
在这个例子中,getFromLocalCache
和 getFromRemoteServer
分别模拟从本地缓存和远程服务器获取数据。anyOf
方法确保只要有一个数据源返回数据,就可以继续后续操作。
- 服务调用优化 在微服务架构中,可能存在多个服务提供相似的功能。可以同时调用这些服务,只要其中一个服务成功响应,就可以使用其结果,从而提高系统的响应速度。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class CompletableFutureAnyOfMicroserviceExample {
public static CompletableFuture<String> callService1() {
return CompletableFuture.supplyAsync(() -> {
// 模拟服务1调用
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Response from service 1";
});
}
public static CompletableFuture<String> callService2() {
return CompletableFuture.supplyAsync(() -> {
// 模拟服务2调用
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Response from service 2";
});
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(callService1(), callService2());
System.out.println(anyOfFuture.get());
}
}
这里,callService1
和 callService2
模拟两个提供相似功能的微服务调用。anyOf
方法使得只要有一个服务响应,就可以获取结果并继续处理。
性能考虑
虽然 anyOf
提供了一种高效的方式来组合多个异步任务,但在使用时也需要考虑性能问题。由于它会同时启动所有传入的 CompletableFuture,可能会消耗较多的系统资源,尤其是当任务数量较多或任务本身比较耗时的情况下。
为了优化性能,可以根据具体场景对任务进行合理的筛选和分组。例如,如果知道某些任务通常会更快完成,可以将它们优先放在 anyOf
的参数列表中。另外,对于资源消耗较大的任务,可以考虑使用线程池来控制并发度,避免系统资源过度消耗。
与 AllOf 关系的对比
- 完成条件
anyOf
:只要其中一个 CompletableFuture 完成,返回的 CompletableFuture 就完成。allOf
:所有传入的 CompletableFuture 都完成,返回的 CompletableFuture 才完成。如果其中任何一个 CompletableFuture 抛出异常,allOf
返回的 CompletableFuture 也会以该异常完成。
- 返回结果
anyOf
:返回第一个完成的 CompletableFuture 的结果。allOf
:返回的 CompletableFuture 的结果为null
,主要用于等待所有任务完成,通常用于执行一些所有任务完成后才需要的操作,如汇总所有任务的结果等。
总结与注意事项
CompletableFuture 的 anyOf
方法为处理多个异步任务的“或”关系提供了便捷的方式。它允许我们在多个异步任务中只要有一个完成就可以继续后续操作,适用于多种实际应用场景,如资源获取、服务调用优化等。
在使用 anyOf
时,需要注意异常处理,确保能够正确捕获和处理任务中抛出的异常。同时,要考虑性能问题,合理安排任务和控制并发度,避免过度消耗系统资源。与 allOf
等其他 CompletableFuture 方法相比,要清楚它们之间的区别,根据具体需求选择合适的方法来组合异步任务。
通过深入理解和合理使用 CompletableFuture 的 anyOf
关系,开发者可以编写出更加高效、灵活的异步代码,提升应用程序的性能和响应能力。
希望通过本文的详细介绍和示例代码,你对 Java 中 CompletableFuture 的 AnyOf 关系有了更深入的理解和掌握。在实际开发中,根据具体需求灵活运用这一特性,能够显著提升代码的异步处理能力。