Java 中 CompletableFuture 多个任务 AND 组合关系
Java 中 CompletableFuture 多个任务 AND 组合关系
在现代的 Java 编程中,异步编程变得越来越重要。CompletableFuture
是 Java 8 引入的一个强大工具,它使得异步编程更加简洁和高效。当处理多个异步任务时,常常会遇到需要多个任务都成功完成后再进行下一步操作的场景,这就是所谓的多个任务的 AND 组合关系。
CompletableFuture 基础回顾
在深入探讨多个任务的 AND 组合关系之前,先简单回顾一下 CompletableFuture
的基础知识。CompletableFuture
实现了 Future
和 CompletionStage
接口,它代表一个异步计算的结果。
创建 CompletableFuture
-
使用
CompletableFuture.supplyAsync
创建有返回值的异步任务CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> { // 模拟一个耗时操作 try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "Task 1 completed"; });
在上述代码中,
supplyAsync
方法接受一个Supplier
作为参数,在新的线程中执行这个Supplier
的get
方法,并返回一个CompletableFuture
,其结果就是Supplier
的返回值。 -
使用
CompletableFuture.runAsync
创建无返回值的异步任务CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> { // 模拟一个耗时操作 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Task 2 completed"); });
runAsync
方法接受一个Runnable
作为参数,同样在新线程中执行Runnable
的run
方法,返回的CompletableFuture
的结果类型为Void
。
获取 CompletableFuture 的结果
-
使用
get
方法阻塞获取结果try { String result = future1.get(); System.out.println(result); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); }
get
方法会阻塞当前线程,直到CompletableFuture
完成并返回结果。如果CompletableFuture
执行过程中抛出异常,get
方法会将异常包装成ExecutionException
抛出。 -
使用
join
方法阻塞获取结果String result = future1.join(); System.out.println(result);
join
方法与get
方法类似,也是阻塞当前线程获取结果。不同之处在于,join
方法不会抛出InterruptedException
和ExecutionException
,如果CompletableFuture
执行过程中抛出异常,join
方法会将异常直接抛出。
多个 CompletableFuture 的 AND 组合关系
在实际应用中,常常需要多个异步任务都完成后再进行下一步操作。CompletableFuture
提供了几种方法来实现这种 AND 组合关系。
thenCombine 方法
thenCombine
方法用于将两个 CompletableFuture
的结果进行合并。当两个 CompletableFuture
都完成时,会执行 thenCombine
方法中的 BiFunction
。
-
示例代码
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "Result from Future 3"; }); CompletableFuture<String> future4 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return "Result from Future 4"; }); CompletableFuture<String> combinedFuture = future3.thenCombine(future4, (result3, result4) -> { return result3 + " and " + result4; }); try { String finalResult = combinedFuture.get(); System.out.println(finalResult); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); }
在上述代码中,
future3
和future4
是两个异步任务。thenCombine
方法将这两个任务的结果合并,BiFunction
中的参数result3
和result4
分别是future3
和future4
的结果。最后通过get
方法获取合并后的结果并打印。 -
原理分析
thenCombine
方法返回一个新的CompletableFuture
,这个新的CompletableFuture
会在future3
和future4
都完成时被触发。当future3
和future4
中的任何一个发生异常时,新的CompletableFuture
也会以异常状态结束,并且get
方法会抛出相应的异常。
thenAcceptBoth 方法
thenAcceptBoth
方法与 thenCombine
方法类似,不同之处在于 thenAcceptBoth
方法不返回合并后的结果,而是直接消费两个 CompletableFuture
的结果。
-
示例代码
CompletableFuture<String> future5 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "Result from Future 5"; }); CompletableFuture<String> future6 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return "Result from Future 6"; }); CompletableFuture<Void> voidFuture = future5.thenAcceptBoth(future6, (result5, result6) -> { System.out.println(result5 + " and " + result6); }); try { voidFuture.get(); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); }
在这个例子中,
future5
和future6
是两个异步任务。thenAcceptBoth
方法中的BiConsumer
直接消费future5
和future6
的结果,而不返回新的结果。最后通过get
方法等待任务完成。 -
原理分析
thenAcceptBoth
方法返回一个CompletableFuture<Void>
,当future5
和future6
都完成时,BiConsumer
会被执行。如果future5
或future6
发生异常,返回的CompletableFuture<Void>
也会以异常状态结束,get
方法会抛出相应的异常。
allOf 方法
allOf
方法用于等待所有的 CompletableFuture
都完成。它返回一个新的 CompletableFuture<Void>
,当所有传入的 CompletableFuture
都正常完成时,这个新的 CompletableFuture
也会正常完成;如果其中任何一个 CompletableFuture
发生异常,新的 CompletableFuture
会以异常状态结束。
-
示例代码
CompletableFuture<String> future7 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "Result from Future 7"; }); CompletableFuture<String> future8 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return "Result from Future 8"; }); CompletableFuture<Void> allOfFuture = CompletableFuture.allOf(future7, future8); allOfFuture.join(); try { String result7 = future7.get(); String result8 = future8.get(); System.out.println(result7 + " and " + result8); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); }
在上述代码中,
allOf
方法接受多个CompletableFuture
作为参数,并返回一个新的CompletableFuture<Void>
。通过join
方法等待所有任务完成,然后分别获取future7
和future8
的结果并打印。 -
原理分析
allOf
方法返回的CompletableFuture<Void>
会在所有传入的CompletableFuture
都完成时完成。它内部使用了一种计数机制,每当一个传入的CompletableFuture
完成时,计数减一。当计数为 0 时,表示所有任务都完成,此时返回的CompletableFuture<Void>
也会完成。如果在这个过程中任何一个CompletableFuture
发生异常,返回的CompletableFuture<Void>
会立即以异常状态结束。
处理异常情况
在处理多个 CompletableFuture
的 AND 组合关系时,异常处理是非常重要的。
单个 CompletableFuture 异常处理
- 使用
exceptionally
方法处理异常
在上述代码中,CompletableFuture<String> future9 = CompletableFuture.supplyAsync(() -> { if (Math.random() > 0.5) { throw new RuntimeException("Simulated exception"); } return "Normal result"; }).exceptionally(ex -> { System.out.println("Caught exception: " + ex.getMessage()); return "Default value"; }); try { String result = future9.get(); System.out.println(result); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); }
exceptionally
方法接受一个Function
作为参数,当CompletableFuture
发生异常时,这个Function
会被执行,返回一个默认值。
多个 CompletableFuture AND 组合时的异常处理
-
thenCombine
和thenAcceptBoth
方法的异常处理 如前文所述,当thenCombine
或thenAcceptBoth
中的任何一个CompletableFuture
发生异常时,返回的新CompletableFuture
也会以异常状态结束。可以通过exceptionally
方法处理这个异常。CompletableFuture<String> future10 = CompletableFuture.supplyAsync(() -> { if (Math.random() > 0.5) { throw new RuntimeException("Exception in Future 10"); } return "Result from Future 10"; }); CompletableFuture<String> future11 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return "Result from Future 11"; }); CompletableFuture<String> combinedFuture2 = future10.thenCombine(future11, (result10, result11) -> { return result10 + " and " + result11; }).exceptionally(ex -> { System.out.println("Caught exception: " + ex.getMessage()); return "Default combined result"; }); try { String finalResult = combinedFuture2.get(); System.out.println(finalResult); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); }
在这个例子中,
future10
有可能抛出异常,thenCombine
方法返回的combinedFuture2
可以通过exceptionally
方法处理异常并返回默认值。 -
allOf
方法的异常处理 对于allOf
方法返回的CompletableFuture<Void>
,同样可以通过exceptionally
方法处理异常。CompletableFuture<String> future12 = CompletableFuture.supplyAsync(() -> { if (Math.random() > 0.5) { throw new RuntimeException("Exception in Future 12"); } return "Result from Future 12"; }); CompletableFuture<String> future13 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return "Result from Future 13"; }); CompletableFuture<Void> allOfFuture2 = CompletableFuture.allOf(future12, future13).exceptionally(ex -> { System.out.println("Caught exception: " + ex.getMessage()); return null; }); allOfFuture2.join(); try { String result12 = future12.get(); String result13 = future13.get(); System.out.println(result12 + " and " + result13); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); }
在上述代码中,
allOfFuture2
是allOf
方法返回的CompletableFuture<Void>
,通过exceptionally
方法处理可能发生的异常。
应用场景
多个 CompletableFuture
的 AND 组合关系在实际开发中有很多应用场景。
微服务调用
在微服务架构中,一个业务操作可能需要调用多个微服务。例如,一个订单处理系统可能需要同时调用库存微服务和用户信息微服务。只有当这两个微服务都成功返回结果后,才能继续进行订单的处理。
CompletableFuture<InventoryInfo> inventoryFuture = CompletableFuture.supplyAsync(() -> {
// 调用库存微服务
return inventoryService.getInventoryInfo();
});
CompletableFuture<UserInfo> userFuture = CompletableFuture.supplyAsync(() -> {
// 调用用户信息微服务
return userService.getUserInfo();
});
CompletableFuture<Void> orderProcessFuture = inventoryFuture.thenAcceptBoth(userFuture, (inventoryInfo, userInfo) -> {
// 根据库存信息和用户信息处理订单
orderService.processOrder(inventoryInfo, userInfo);
});
try {
orderProcessFuture.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
数据聚合
在数据处理中,可能需要从多个数据源获取数据,然后将这些数据聚合起来。例如,从一个数据库获取用户基本信息,从另一个数据库获取用户的交易记录,只有当这两个数据源的数据都获取成功后,才能生成完整的用户报告。
CompletableFuture<UserBasicInfo> basicInfoFuture = CompletableFuture.supplyAsync(() -> {
// 从数据库获取用户基本信息
return userBasicInfoDao.getUserBasicInfo();
});
CompletableFuture<UserTransactionRecord> transactionFuture = CompletableFuture.supplyAsync(() -> {
// 从数据库获取用户交易记录
return userTransactionRecordDao.getUserTransactionRecord();
});
CompletableFuture<UserReport> reportFuture = basicInfoFuture.thenCombine(transactionFuture, (basicInfo, transactionRecord) -> {
// 生成用户报告
return reportGenerator.generateReport(basicInfo, transactionRecord);
});
try {
UserReport report = reportFuture.get();
System.out.println(report);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
性能优化与注意事项
-
线程池的使用 在使用
CompletableFuture
时,默认情况下,supplyAsync
和runAsync
方法会使用ForkJoinPool.commonPool()
。对于一些需要大量异步任务的场景,commonPool
可能会出现线程饥饿等性能问题。可以通过创建自定义的Executor
来解决这个问题。ExecutorService executor = Executors.newFixedThreadPool(10); CompletableFuture<String> future14 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "Result from Future 14"; }, executor);
在上述代码中,创建了一个固定大小为 10 的线程池,并将其作为参数传递给
supplyAsync
方法。 -
避免不必要的阻塞 虽然
CompletableFuture
提供了get
和join
等阻塞方法,但在实际应用中,应尽量避免在异步任务链中过早地使用这些方法,以免失去异步编程的优势。可以通过链式调用的方式,让CompletableFuture
自动处理任务的依赖关系。CompletableFuture<String> future15 = CompletableFuture.supplyAsync(() -> "Initial result") .thenApply(result -> result + " processed") .thenApply(finalResult -> { System.out.println(finalResult); return finalResult; });
在这个例子中,通过链式调用
thenApply
方法,避免了中间使用阻塞方法。 -
异常处理的一致性 在处理多个
CompletableFuture
的 AND 组合关系时,异常处理应该保持一致性。无论是单个CompletableFuture
还是组合后的CompletableFuture
,都应该有合理的异常处理机制,以确保程序的健壮性。 -
资源管理 在异步任务中,如果涉及到资源的获取和释放,如数据库连接、文件句柄等,要确保在任务完成后正确地释放资源。可以使用
try - finally
块或者 Java 7 引入的try - with - resources
语法来管理资源。CompletableFuture<String> future16 = CompletableFuture.supplyAsync(() -> { Connection connection = null; try { connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/mydb", "user", "password"); // 使用连接进行数据库操作 return "Database operation result"; } catch (SQLException e) { e.printStackTrace(); return "Error result"; } finally { if (connection != null) { try { connection.close(); } catch (SQLException e) { e.printStackTrace(); } } } });
在上述代码中,通过
try - finally
块确保数据库连接在任务完成后被正确关闭。
通过合理使用 CompletableFuture
的多个任务 AND 组合关系,可以大大提高 Java 程序的异步处理能力和性能,使代码更加简洁和易于维护。在实际应用中,要根据具体的业务场景和性能需求,选择合适的方法和策略来处理异步任务。同时,要注意异常处理、资源管理等方面的问题,以确保程序的健壮性和稳定性。
希望通过本文的介绍,你对 Java 中 CompletableFuture
的多个任务 AND 组合关系有了更深入的理解,并能够在实际项目中灵活运用。