Java 中 CompletableFuture 多个任务 OR 组合关系
CompletableFuture 简介
在 Java 8 引入 CompletableFuture
之前,处理异步任务相对繁琐。CompletableFuture
提供了一种更简洁、灵活的方式来处理异步计算,它实现了 Future
和 CompletionStage
接口。Future
接口主要用于获取异步任务的结果,但它缺乏对异步任务完成时的回调处理等功能。而 CompletionStage
接口则弥补了这一不足,提供了诸如任务完成时的回调、任务组合等丰富的操作。
CompletableFuture
允许我们以链式调用的方式编写异步代码,使得异步操作的逻辑更加清晰。例如,我们可以很方便地在一个异步任务完成后接着执行另一个任务,而无需手动管理线程和复杂的同步机制。
多个任务的 OR 组合关系概述
在实际开发中,我们经常会遇到这样的场景:多个异步任务,只要其中一个任务成功完成,整个组合任务就视为成功;只有当所有任务都失败时,组合任务才被认为失败。这种关系就是多个任务的 OR 组合关系。
在 CompletableFuture
中,有几种方式来实现这种 OR 组合关系,接下来我们将详细探讨。
使用 applyToEither 方法
applyToEither
方法用于当两个 CompletableFuture
中任意一个完成时,对其结果应用给定的函数。其方法签名如下:
<U> CompletableFuture<U> applyToEither(CompletableFuture<? extends T> other, Function<? super T,? extends U> fn);
这里 other
是另一个 CompletableFuture
,fn
是用于处理完成的 CompletableFuture
结果的函数。
下面通过一个简单的代码示例来展示 applyToEither
的用法:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
public class ApplyToEitherExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Result from Future 1";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Result from Future 2";
});
CompletableFuture<String> combinedFuture = future1.applyToEither(future2, result -> {
System.out.println("The first completed future result is: " + result);
return result.toUpperCase();
});
System.out.println(combinedFuture.get());
}
}
在上述代码中,future1
和 future2
是两个异步任务,分别模拟了耗时 2 秒和 1 秒的操作。applyToEither
方法会等待 future1
或 future2
其中一个先完成,然后对完成的结果应用 Function
函数。在这个例子中,future2
会先完成,所以输出结果为:
The first completed future result is: Result from Future 2
RESULT FROM FUTURE 2
使用 acceptEither 方法
acceptEither
方法与 applyToEither
类似,不同之处在于它不返回处理后的结果,而是直接消费完成的 CompletableFuture
的结果。其方法签名如下:
CompletableFuture<Void> acceptEither(CompletableFuture<? extends T> other, Consumer<? super T> action);
这里 action
是一个 Consumer
,用于消费完成的 CompletableFuture
的结果。
以下是代码示例:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
public class AcceptEitherExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Result from Future 1";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Result from Future 2";
});
CompletableFuture<Void> combinedFuture = future1.acceptEither(future2, result -> {
System.out.println("The first completed future result is: " + result);
});
combinedFuture.get();
}
}
在这个示例中,acceptEither
方法等待 future1
或 future2
其中一个完成,然后将完成的结果传递给 Consumer
。由于 future2
先完成,输出结果为:
The first completed future result is: Result from Future 2
使用 runAfterEither 方法
runAfterEither
方法在两个 CompletableFuture
中任意一个完成时,执行一个无参数的 Runnable
任务。其方法签名如下:
CompletableFuture<Void> runAfterEither(CompletableFuture<?> other, Runnable action);
下面是代码示例:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
public class RunAfterEitherExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Result from Future 1";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Result from Future 2";
});
CompletableFuture<Void> combinedFuture = future1.runAfterEither(future2, () -> {
System.out.println("One of the futures has completed.");
});
combinedFuture.get();
}
}
在这个例子中,当 future1
或 future2
任意一个完成时,就会执行 Runnable
任务,输出结果为:
One of the futures has completed.
处理多个 CompletableFuture 的 OR 关系
前面介绍的方法主要是针对两个 CompletableFuture
的情况。当我们需要处理多个 CompletableFuture
的 OR 关系时,可以通过递归或借助 Stream
来实现。
递归方式
通过递归的方式,我们可以将多个 CompletableFuture
逐步组合成 OR 关系。以下是一个简单的实现示例:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
public class MultipleFuturesOrRecursiveExample {
public static CompletableFuture<String> orCombine(List<CompletableFuture<String>> futures) {
if (futures.size() == 1) {
return futures.get(0);
}
CompletableFuture<String> future1 = futures.get(0);
CompletableFuture<String> future2 = orCombine(futures.subList(1, futures.size()));
return future1.applyToEither(future2, result -> result);
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
List<CompletableFuture<String>> futures = new ArrayList<>();
futures.add(CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Result from Future 1";
}));
futures.add(CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Result from Future 2";
}));
futures.add(CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Result from Future 3";
}));
CompletableFuture<String> combinedFuture = orCombine(futures);
System.out.println(combinedFuture.get());
}
}
在上述代码中,orCombine
方法通过递归将多个 CompletableFuture
组合成 OR 关系。在 main
方法中,创建了三个异步任务,分别模拟不同的耗时操作。最终输出的是最先完成的任务的结果,在这个例子中,future3
会最先完成,输出结果为:
Result from Future 3
使用 Stream 方式
借助 Java 8 的 Stream
API,我们可以更简洁地处理多个 CompletableFuture
的 OR 关系。以下是代码示例:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
public class MultipleFuturesOrStreamExample {
public static CompletableFuture<String> orCombine(List<CompletableFuture<String>> futures) {
return futures.stream()
.reduce(CompletableFuture::applyToEither)
.orElse(CompletableFuture.completedFuture(null));
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
List<CompletableFuture<String>> futures = new ArrayList<>();
futures.add(CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Result from Future 1";
}));
futures.add(CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Result from Future 2";
}));
futures.add(CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Result from Future 3";
}));
CompletableFuture<String> combinedFuture = orCombine(futures);
System.out.println(combinedFuture.get());
}
}
在这个示例中,Stream
的 reduce
方法将多个 CompletableFuture
通过 applyToEither
方法进行组合。最终输出的也是最先完成的任务的结果,同样,future3
最先完成,输出结果为:
Result from Future 3
异常处理
在处理多个任务的 OR 组合关系时,异常处理是非常重要的。当所有任务都失败时,我们需要捕获并处理异常。
在 CompletableFuture
中,我们可以使用 exceptionally
方法来处理异常。例如:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
public class ExceptionHandlingExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("Future 1 failed");
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("Future 2 failed");
});
CompletableFuture<String> combinedFuture = future1.applyToEither(future2, result -> result)
.exceptionally(ex -> {
System.out.println("An exception occurred: " + ex.getMessage());
return "Default result";
});
System.out.println(combinedFuture.get());
}
}
在上述代码中,future1
和 future2
都抛出了异常。通过 exceptionally
方法,我们捕获了异常并返回了一个默认结果,输出结果为:
An exception occurred: Future 1 failed
Default result
实际应用场景
- 数据获取与缓存:在从多个数据源获取数据时,如果其中一个数据源成功获取到数据,就可以直接使用该数据,而不需要等待其他数据源。例如,先尝试从缓存中获取数据,如果缓存中没有,则从数据库中获取。只要缓存或数据库中有一个成功返回数据,就可以继续后续操作。
- 服务调用:当调用多个微服务获取数据时,只要其中一个微服务成功返回所需数据,就可以满足业务需求。例如,在一个电商应用中,查询商品信息时,可以同时调用库存服务和价格服务,只要其中一个服务成功返回信息,就可以展示给用户相关内容。
- 并行计算优化:在进行一些复杂的计算任务时,可以将任务分解为多个子任务并行执行,只要有一个子任务成功计算出满足条件的结果,就可以停止其他子任务,提高计算效率。
性能考虑
在使用 CompletableFuture
进行多个任务的 OR 组合时,性能是一个需要考虑的因素。虽然异步操作可以提高整体的并发性能,但过多的任务并行执行可能会导致资源消耗过大,例如线程池资源耗尽等问题。
- 线程池管理:合理配置线程池的大小,根据系统的硬件资源和任务的特性来调整线程池参数。如果任务是 I/O 密集型的,可以适当增加线程池的大小;如果是 CPU 密集型的,则需要谨慎调整,避免过多线程导致 CPU 上下文切换开销增大。
- 任务优先级:可以根据任务的重要性或预期执行时间为任务设置优先级。例如,对于一些耗时较短且重要的任务,可以优先执行,以提高整体的响应速度。
- 资源监控与调优:使用工具如 Java 自带的
jconsole
、VisualVM
等监控系统的资源使用情况,包括 CPU、内存、线程池状态等,根据监控结果进行相应的调优。
总结与注意事项
通过 CompletableFuture
的 applyToEither
、acceptEither
、runAfterEither
等方法,我们可以方便地实现多个任务的 OR 组合关系。在处理多个任务时,可以通过递归或 Stream
方式进行组合。同时,异常处理和性能优化也是不容忽视的方面。
在实际应用中,需要根据具体的业务场景和系统资源情况,合理选择实现方式,以达到最佳的性能和稳定性。并且要注意线程安全问题,避免在异步操作中出现数据竞争等问题。
希望通过本文的介绍,你对 Java
中 CompletableFuture
多个任务的 OR 组合关系有了更深入的理解和掌握,能够在实际项目中灵活运用这些知识,提升代码的质量和性能。
以上就是关于 Java
中 CompletableFuture
多个任务 OR 组合关系的详细内容,希望对你有所帮助。如果你还有其他疑问或需要进一步了解的内容,请随时提问。