Java 中 CompletableFuture 异步任务结果传递
Java 中 CompletableFuture 异步任务结果传递
CompletableFuture 简介
在 Java 并发编程领域,CompletableFuture
是 Java 8 引入的一个强大工具,它提供了一种异步处理任务并获取结果的机制。CompletableFuture
实现了 Future
接口和 CompletionStage
接口,不仅能像传统 Future
那样获取异步任务的结果,还支持更丰富的异步操作组合与结果传递方式,使得异步编程更加灵活和高效。
CompletableFuture
的设计理念是让开发者能够以一种链式调用的方式处理异步任务,这种方式与传统的基于回调或者轮询获取 Future
结果的方式相比,代码结构更加清晰,可读性更强。
创建 CompletableFuture
-
使用
CompletableFuture.supplyAsync
创建有返回值的异步任务CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { // 模拟一个耗时操作 try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "任务执行完成"; });
在上述代码中,
supplyAsync
方法接受一个Supplier
类型的参数,该参数定义了异步任务的具体逻辑。supplyAsync
方法会在一个默认的ForkJoinPool.commonPool()
线程池中异步执行任务,并返回一个CompletableFuture
对象,通过这个对象可以获取任务的执行结果。 -
使用
CompletableFuture.runAsync
创建无返回值的异步任务CompletableFuture<Void> futureVoid = CompletableFuture.runAsync(() -> { // 模拟一个耗时操作 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("无返回值任务执行完成"); });
runAsync
方法接受一个Runnable
类型的参数,用于定义异步执行的任务。由于Runnable
没有返回值,所以runAsync
返回的CompletableFuture
的泛型类型为Void
。
获取 CompletableFuture 的结果
-
使用
get
方法阻塞获取结果CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "任务执行完成"; }); try { String result = future.get(); System.out.println("获取到的结果: " + result); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); }
get
方法会阻塞当前线程,直到CompletableFuture
完成任务并返回结果。如果任务执行过程中抛出异常,get
方法会将异常包装成ExecutionException
重新抛出,同时还可能抛出InterruptedException
表示当前线程在等待过程中被中断。 -
使用
get(long timeout, TimeUnit unit)
方法设置超时获取结果CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } return "任务执行完成"; }); try { String result = future.get(2, TimeUnit.SECONDS); System.out.println("获取到的结果: " + result); } catch (InterruptedException | ExecutionException | TimeoutException e) { if (e instanceof TimeoutException) { System.out.println("获取结果超时"); } else { e.printStackTrace(); } }
此方法在指定的时间内等待
CompletableFuture
完成任务并返回结果。如果超过指定时间任务仍未完成,会抛出TimeoutException
。 -
使用
join
方法阻塞获取结果(不抛出受检异常)CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "任务执行完成"; }); String result = future.join(); System.out.println("获取到的结果: " + result);
join
方法与get
方法类似,都会阻塞当前线程直到获取到结果。但不同的是,join
方法不会抛出InterruptedException
和ExecutionException
这两个受检异常,而是将ExecutionException
中的原始异常直接抛出,将InterruptedException
包装成CompletionException
抛出。
异步任务结果传递
-
使用
thenApply
方法对结果进行转换CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "原始结果") .thenApply(result -> result + " 转换后"); try { String finalResult = future.get(); System.out.println(finalResult); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); }
thenApply
方法接受一个Function
类型的参数,该函数会在CompletableFuture
完成任务后被调用,传入任务的结果,并返回一个新的结果。thenApply
方法返回一个新的CompletableFuture
,其结果为函数转换后的结果。 -
使用
thenAccept
方法消费结果CompletableFuture.supplyAsync(() -> "任务结果") .thenAccept(result -> System.out.println("消费结果: " + result)); // 为了确保主线程不退出,添加如下代码 try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); }
thenAccept
方法接受一个Consumer
类型的参数,该消费者会在CompletableFuture
完成任务后被调用,传入任务的结果,但不返回新的结果。thenAccept
方法返回的CompletableFuture
的结果为Void
。 -
使用
thenRun
方法执行后续无结果任务CompletableFuture.supplyAsync(() -> "任务完成") .thenRun(() -> System.out.println("后续无结果任务执行")); // 为了确保主线程不退出,添加如下代码 try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); }
thenRun
方法接受一个Runnable
类型的参数,该任务会在CompletableFuture
完成任务后被调用,不接受任务的结果,也不返回新的结果。thenRun
方法返回的CompletableFuture
的结果为Void
。
多个 CompletableFuture 组合与结果传递
-
使用
thenCompose
方法组合两个 CompletableFutureCompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "第一个任务结果"); CompletableFuture<String> future2 = future1.thenCompose(result -> CompletableFuture.supplyAsync(() -> result + " 组合后")); try { String finalResult = future2.get(); System.out.println(finalResult); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); }
thenCompose
方法接受一个函数,该函数接受前一个CompletableFuture
的结果,并返回一个新的CompletableFuture
。它会将前一个CompletableFuture
的结果传递给函数,然后将函数返回的CompletableFuture
与前一个CompletableFuture
进行组合,最终返回组合后的CompletableFuture
。 -
使用
thenCombine
方法合并两个 CompletableFuture 的结果CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "结果1"); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "结果2"); CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (r1, r2) -> r1 + " 和 " + r2); try { String result = combinedFuture.get(); System.out.println(result); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); }
thenCombine
方法接受另一个CompletableFuture
和一个BiFunction
。当两个CompletableFuture
都完成时,BiFunction
会被调用,传入两个CompletableFuture
的结果,并返回一个新的结果。thenCombine
方法返回一个新的CompletableFuture
,其结果为BiFunction
合并后的结果。 -
使用
allOf
方法等待所有 CompletableFuture 完成CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return "任务1完成"; }); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "任务2完成"; }); CompletableFuture<Void> allFuture = CompletableFuture.allOf(future1, future2); allFuture.join(); try { String result1 = future1.get(); String result2 = future2.get(); System.out.println(result1); System.out.println(result2); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); }
allOf
方法接受多个CompletableFuture
作为参数,返回一个新的CompletableFuture
。这个新的CompletableFuture
会在所有传入的CompletableFuture
都完成时完成,其结果为Void
。通过join
方法等待所有任务完成后,可以分别获取每个CompletableFuture
的结果。 -
使用
anyOf
方法等待任意一个 CompletableFuture 完成CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "任务1完成"; }); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return "任务2完成"; }); CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(future1, future2); try { Object result = anyFuture.get(); System.out.println(result); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); }
anyOf
方法接受多个CompletableFuture
作为参数,返回一个新的CompletableFuture
。这个新的CompletableFuture
会在任意一个传入的CompletableFuture
完成时完成,其结果为第一个完成的CompletableFuture
的结果。
CompletableFuture 的异常处理
-
使用
exceptionally
方法处理异常CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { if (Math.random() < 0.5) { throw new RuntimeException("模拟异常"); } return "正常结果"; }).exceptionally(ex -> { System.out.println("捕获到异常: " + ex.getMessage()); return "异常处理结果"; }); try { String result = future.get(); System.out.println(result); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); }
exceptionally
方法接受一个Function
类型的参数,该函数会在CompletableFuture
执行过程中抛出异常时被调用,传入异常对象,并返回一个替代结果。这样可以在不影响其他异步操作的情况下,对异常进行处理并提供一个默认结果。 -
使用
handle
方法同时处理正常结果和异常CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { if (Math.random() < 0.5) { throw new RuntimeException("模拟异常"); } return "正常结果"; }).handle((result, ex) -> { if (ex != null) { System.out.println("捕获到异常: " + ex.getMessage()); return "异常处理结果"; } return result + " 处理后"; }); try { String finalResult = future.get(); System.out.println(finalResult); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); }
handle
方法接受一个BiFunction
类型的参数,无论CompletableFuture
是正常完成还是抛出异常,该函数都会被调用。函数的第一个参数为正常完成时的结果(如果有异常则为null
),第二个参数为异常对象(如果正常完成则为null
)。通过判断ex
是否为null
,可以分别处理正常结果和异常情况,并返回最终结果。
CompletableFuture 的执行线程控制
-
使用自定义线程池
ExecutorService executor = Executors.newFixedThreadPool(3); CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "任务执行完成"; }, executor); try { String result = future.get(); System.out.println("获取到的结果: " + result); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } finally { executor.shutdown(); }
在
supplyAsync
方法的第二个参数中传入自定义的Executor
,可以指定CompletableFuture
异步任务在自定义的线程池中执行。这样可以更好地控制线程资源,例如设置线程池的大小、线程的优先级等。 -
使用
thenApplyAsync
等方法指定后续任务执行线程ExecutorService executor = Executors.newFixedThreadPool(3); CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "原始结果") .thenApplyAsync(result -> result + " 转换后", executor); try { String finalResult = future.get(); System.out.println(finalResult); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } finally { executor.shutdown(); }
thenApplyAsync
方法与thenApply
方法类似,但它接受一个Executor
参数,可以指定后续转换任务在特定的线程池中执行。同样,thenAcceptAsync
、thenRunAsync
等方法也支持这种方式来指定后续任务的执行线程。
CompletableFuture 在实际项目中的应用场景
-
高并发数据获取与处理 在大型电商系统中,可能需要同时从多个数据源获取商品信息、库存信息、价格信息等,然后对这些数据进行整合和处理。使用
CompletableFuture
可以并发地发起这些异步请求,并在所有请求完成后统一处理结果,大大提高系统的响应速度。 -
异步任务流水线处理 在一些数据处理流程中,可能需要按照一定的顺序依次执行多个异步任务,例如数据采集、数据清洗、数据分析等。
CompletableFuture
的链式调用和结果传递机制可以很好地满足这种需求,使得代码结构清晰,易于维护。 -
分布式系统中的异步通信 在分布式系统中,不同服务之间的调用可能是异步的。
CompletableFuture
可以用于处理这些异步调用的结果,例如在微服务架构中,一个服务调用多个其他服务获取数据,然后对这些数据进行合并和返回。通过CompletableFuture
可以方便地管理这些异步调用,提高系统的整体性能和可靠性。
通过深入理解和灵活运用 CompletableFuture
的异步任务结果传递机制,开发者能够编写出更高效、更灵活的并发程序,提升系统的性能和响应能力。无论是在小型应用还是大型分布式系统中,CompletableFuture
都为异步编程提供了强大而便捷的工具。在实际开发中,需要根据具体的业务需求和场景,合理选择 CompletableFuture
的各种方法,以实现最优的异步处理逻辑。同时,注意线程资源的管理和异常处理,确保程序的稳定性和可靠性。