Java CompletableFuture链式调用优化异步编程流程
Java CompletableFuture 链式调用基础
在 Java 的异步编程领域,CompletableFuture
是一个强大的工具。它允许我们以一种更简洁、更可控的方式编写异步代码。CompletableFuture
实现了 Future
接口和 CompletionStage
接口,这使得它既能够获取异步操作的结果(类似于传统的 Future
),又能够在异步操作完成时执行后续的操作(通过 CompletionStage
提供的丰富方法)。
CompletableFuture 的创建
在开始链式调用之前,我们首先要创建 CompletableFuture
实例。CompletableFuture
提供了多种静态方法来创建实例。
-
使用
CompletableFuture.supplyAsync
创建有返回值的异步任务CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { // 模拟耗时操作 try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "Hello, CompletableFuture!"; });
在上述代码中,
supplyAsync
方法接收一个Supplier
接口的实现,它会在一个新的线程中执行Supplier
的get
方法,并返回一个CompletableFuture
,其结果就是Supplier
的返回值。 -
使用
CompletableFuture.runAsync
创建无返回值的异步任务CompletableFuture<Void> futureVoid = CompletableFuture.runAsync(() -> { // 模拟耗时操作 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("This is a void CompletableFuture task."); });
runAsync
方法接收一个Runnable
接口的实现,它同样会在新线程中执行Runnable
的run
方法,但返回的CompletableFuture
的结果类型是Void
,因为Runnable
本身没有返回值。
基本的链式调用方法
-
thenApply
thenApply
方法用于在CompletableFuture
完成时,对其结果进行转换。它接收一个Function
接口的实现,该Function
的输入是前一个CompletableFuture
的结果,输出是新的CompletableFuture
的结果。CompletableFuture.supplyAsync(() -> "Hello") .thenApply(s -> s + ", World") .thenApply(String::toUpperCase) .thenAccept(System.out::println);
在上述代码中,首先通过
supplyAsync
创建一个返回 "Hello" 的CompletableFuture
。然后使用thenApply
将结果转换为 "Hello, World",接着再次使用thenApply
将其转换为大写的 "HELLO, WORLD",最后使用thenAccept
打印结果。thenAccept
方法接收一个Consumer
接口的实现,用于在CompletableFuture
完成时消费其结果,但不返回新的CompletableFuture
。 -
thenCompose
thenCompose
方法与thenApply
类似,但它接收的Function
返回的是另一个CompletableFuture
。这在需要进行异步操作的链式调用时非常有用。CompletableFuture.supplyAsync(() -> 10) .thenCompose(i -> CompletableFuture.supplyAsync(() -> i * 2)) .thenAccept(System.out::println);
这里首先创建一个返回 10 的
CompletableFuture
,然后通过thenCompose
,将 10 作为输入,创建并返回一个新的CompletableFuture
,其结果是 10 乘以 2,即 20。最后使用thenAccept
打印结果。如果使用thenApply
代替thenCompose
,返回的将是一个CompletableFuture<CompletableFuture<Integer>>
,而不是直接的CompletableFuture<Integer>
,这会使后续处理变得复杂。 -
thenCombine
thenCombine
方法用于将两个CompletableFuture
的结果进行合并。它接收另一个CompletableFuture
和一个BiFunction
,BiFunction
的两个输入分别是两个CompletableFuture
的结果,输出是新的CompletableFuture
的结果。CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 5); CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 3); future1.thenCombine(future2, (a, b) -> a + b) .thenAccept(System.out::println);
在上述代码中,
future1
返回 5,future2
返回 3。通过thenCombine
,将两个结果相加得到 8,并打印出来。
CompletableFuture 链式调用的高级应用
处理异常
在异步编程中,异常处理是至关重要的。CompletableFuture
提供了多种方法来优雅地处理异常。
-
exceptionally
exceptionally
方法用于在CompletableFuture
发生异常时,提供一个默认值或进行异常处理。它接收一个Function
,该Function
的输入是异常,输出是一个替代值。CompletableFuture.supplyAsync(() -> { if (Math.random() < 0.5) { throw new RuntimeException("Simulated exception"); } return "Success"; }).exceptionally(ex -> { System.out.println("Caught exception: " + ex.getMessage()); return "Default value"; }).thenAccept(System.out::println);
在上述代码中,有 50% 的概率会抛出一个运行时异常。通过
exceptionally
,当异常发生时,会捕获异常并打印异常信息,同时返回一个默认值 "Default value" 并打印。 -
handle
handle
方法可以同时处理正常结果和异常情况。它接收一个BiFunction
,第一个参数是正常结果(如果没有异常),第二个参数是异常(如果有异常)。CompletableFuture.supplyAsync(() -> { if (Math.random() < 0.5) { throw new RuntimeException("Simulated exception"); } return "Success"; }).handle((result, ex) -> { if (ex != null) { System.out.println("Caught exception: " + ex.getMessage()); return "Default value"; } return result; }).thenAccept(System.out::println);
这里
handle
方法在异常发生时进行处理并返回默认值,在没有异常时直接返回正常结果。与exceptionally
相比,handle
提供了更全面的处理方式,因为它可以同时处理正常和异常情况。
并行异步操作
-
allOf
allOf
方法用于等待多个CompletableFuture
都完成。它接收多个CompletableFuture
作为参数,并返回一个新的CompletableFuture<Void>
,当所有传入的CompletableFuture
都完成时,这个新的CompletableFuture
才完成。CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "A"; }); CompletableFuture<String> futureB = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return "B"; }); CompletableFuture<Void> allFuture = CompletableFuture.allOf(futureA, futureB); allFuture.thenRun(() -> { try { System.out.println("All futures completed. Result of A: " + futureA.get()); System.out.println("Result of B: " + futureB.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } }).join();
在上述代码中,
futureA
和futureB
是两个异步任务,分别模拟 2 秒和 1 秒的耗时操作。通过allOf
创建一个新的CompletableFuture<Void>
,当futureA
和futureB
都完成时,allFuture
才完成。然后通过thenRun
在所有任务完成后打印结果。 -
anyOf
anyOf
方法与allOf
相反,它等待多个CompletableFuture
中的任意一个完成。它接收多个CompletableFuture
作为参数,并返回一个新的CompletableFuture
,这个新的CompletableFuture
的结果就是第一个完成的CompletableFuture
的结果。CompletableFuture<String> futureC = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } return "C"; }); CompletableFuture<String> futureD = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1500); } catch (InterruptedException e) { e.printStackTrace(); } return "D"; }); CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(futureC, futureD); anyFuture.thenAccept(result -> System.out.println("First completed result: " + result)).join();
这里
futureC
和futureD
是两个异步任务,futureD
会先完成。通过anyOf
创建的anyFuture
会在futureD
完成时就完成,其结果就是futureD
的结果 "D",并打印出来。
CompletableFuture 链式调用的性能优化
线程池的合理使用
默认情况下,CompletableFuture
的异步操作使用 ForkJoinPool.commonPool()
线程池。然而,在某些场景下,这可能不是最优的选择。
-
自定义线程池 当我们有大量的异步任务,并且这些任务的性质不同(例如 I/O 密集型和 CPU 密集型任务混合)时,使用自定义线程池可以提高性能。
ExecutorService executor = Executors.newFixedThreadPool(10); CompletableFuture.supplyAsync(() -> { // 模拟耗时操作 try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "Task result"; }, executor) .thenApply(s -> s + " processed") .thenAccept(System.out::println); executor.shutdown();
在上述代码中,我们创建了一个固定大小为 10 的线程池
executor
。通过supplyAsync
的第二个参数将异步任务提交到这个自定义线程池。这样可以根据任务的特点合理分配线程资源,避免ForkJoinPool.commonPool()
可能出现的线程饥饿等问题。 -
线程池大小的调整 线程池大小的选择需要根据任务的类型和系统资源来决定。对于 CPU 密集型任务,线程池大小一般设置为 CPU 核心数加 1,以充分利用 CPU 资源并考虑到可能的线程上下文切换开销。对于 I/O 密集型任务,线程池大小可以设置得较大,因为 I/O 操作会使线程处于等待状态,不会占用 CPU 资源,更多的线程可以提高整体的吞吐量。
int cpuCoreCount = Runtime.getRuntime().availableProcessors(); ExecutorService cpuIntensiveExecutor = Executors.newFixedThreadPool(cpuCoreCount + 1); ExecutorService ioIntensiveExecutor = Executors.newFixedThreadPool(cpuCoreCount * 2);
在实际应用中,可以根据任务的类型选择合适的线程池来执行
CompletableFuture
的异步任务。
减少不必要的链式调用
虽然 CompletableFuture
的链式调用提供了简洁的编程方式,但过多的不必要链式调用可能会导致性能问题。
-
合并操作 如果多个
thenApply
操作只是简单的转换,并且可以合并为一个操作,那么应该进行合并。// 原始方式 CompletableFuture.supplyAsync(() -> "hello") .thenApply(String::toUpperCase) .thenApply(s -> s + " WORLD") .thenApply(String::trim) .thenAccept(System.out::println); // 优化方式 CompletableFuture.supplyAsync(() -> "hello") .thenApply(s -> { s = s.toUpperCase(); s = s + " WORLD"; return s.trim(); }) .thenAccept(System.out::println);
在上述代码中,优化后的方式将多个
thenApply
操作合并为一个,减少了中间CompletableFuture
的创建和不必要的线程切换,从而提高了性能。 -
避免过度嵌套 过度嵌套的
CompletableFuture
链式调用可能会使代码难以理解和维护,同时也可能影响性能。例如,在多层嵌套的thenApply
或thenCompose
中,如果可以通过其他方式简化,应该进行优化。// 过度嵌套 CompletableFuture.supplyAsync(() -> 1) .thenApply(i -> i * 2) .thenCompose(j -> CompletableFuture.supplyAsync(() -> j + 3)) .thenApply(k -> k * 4) .thenCompose(l -> CompletableFuture.supplyAsync(() -> l - 5)) .thenAccept(System.out::println); // 优化方式 CompletableFuture.supplyAsync(() -> { int result = 1; result = result * 2; result = result + 3; result = result * 4; result = result - 5; return result; }).thenAccept(System.out::println);
优化后的方式将复杂的嵌套操作合并为一个异步任务,减少了中间的异步操作和线程切换,提高了性能和代码的可读性。
CompletableFuture 链式调用在实际项目中的应用场景
微服务间的异步调用
在微服务架构中,经常需要调用多个微服务接口,并对结果进行组合或处理。CompletableFuture
的链式调用可以很好地满足这种需求。
-
聚合数据 假设我们有一个电商系统,需要从商品微服务获取商品信息,从库存微服务获取库存信息,并将两者聚合展示给用户。
CompletableFuture<Product> productFuture = CompletableFuture.supplyAsync(() -> { // 调用商品微服务获取商品信息 return getProductFromMicroservice(); }); CompletableFuture<Integer> stockFuture = CompletableFuture.supplyAsync(() -> { // 调用库存微服务获取库存信息 return getStockFromMicroservice(); }); productFuture.thenCombine(stockFuture, (product, stock) -> { product.setStock(stock); return product; }).thenAccept(productWithStock -> { // 展示给用户 displayProduct(productWithStock); });
在上述代码中,通过
CompletableFuture
分别异步调用商品微服务和库存微服务,然后使用thenCombine
将两个结果聚合,最后展示给用户。 -
级联调用 有时候微服务间的调用是级联的,例如订单微服务调用支付微服务,支付微服务调用银行接口。
CompletableFuture<PaymentResult> paymentFuture = CompletableFuture.supplyAsync(() -> { // 调用支付微服务 return callPaymentMicroservice(); }); paymentFuture.thenCompose(paymentResult -> CompletableFuture.supplyAsync(() -> { if (paymentResult.isSuccess()) { // 调用银行接口确认支付 return callBankInterface(paymentResult); } return new BankResponse(false, "Payment failed"); })).thenAccept(bankResponse -> { // 处理银行响应 handleBankResponse(bankResponse); });
这里通过
thenCompose
实现了级联调用,只有当支付微服务调用成功时才会调用银行接口。
大数据处理
在大数据处理场景中,经常需要对大量数据进行异步处理和结果合并。
-
并行计算 假设有一个任务是对一个大数据集进行并行计算,例如计算每个数据的平方并求和。
List<Integer> dataList = Arrays.asList(1, 2, 3, 4, 5); List<CompletableFuture<Integer>> futureList = new ArrayList<>(); for (int data : dataList) { CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> data * data); futureList.add(future); } CompletableFuture<Integer> sumFuture = CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])) .thenApply(v -> futureList.stream() .mapToInt(f -> { try { return f.get(); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); return 0; } }) .sum()); sumFuture.thenAccept(sum -> System.out.println("Sum of squares: " + sum)).join();
在上述代码中,首先为每个数据创建一个异步计算平方的
CompletableFuture
,并添加到列表中。然后使用allOf
等待所有计算完成,接着通过thenApply
计算所有平方值的总和。 -
数据转换和合并 对于从不同数据源获取的数据进行转换和合并,
CompletableFuture
链式调用也非常有用。CompletableFuture<List<String>> dataSource1Future = CompletableFuture.supplyAsync(() -> { // 从数据源 1 获取数据 return getDataSource1(); }); CompletableFuture<List<String>> dataSource2Future = CompletableFuture.supplyAsync(() -> { // 从数据源 2 获取数据 return getDataSource2(); }); dataSource1Future.thenCombine(dataSource2Future, (list1, list2) -> { List<String> combinedList = new ArrayList<>(list1); combinedList.addAll(list2); return combinedList; }).thenApply(CompletableFutureChain::transformData) .thenAccept(transformedData -> { // 处理转换后的数据 processTransformedData(transformedData); });
这里通过
thenCombine
将两个数据源的数据合并,然后通过thenApply
对合并后的数据进行转换,最后处理转换后的数据。
通过以上对 CompletableFuture
链式调用的基础、高级应用、性能优化以及实际应用场景的详细介绍,相信你对如何利用 CompletableFuture
优化异步编程流程有了更深入的理解和掌握。在实际项目中,合理运用 CompletableFuture
的链式调用,可以提高代码的可读性、可维护性以及系统的性能和响应速度。