Java 中 CompletableFuture 异步任务执行顺序控制
2021-08-135.3k 阅读
Java 中 CompletableFuture 异步任务执行顺序控制
CompletableFuture 简介
在 Java 编程中,随着应用程序复杂性的增加,处理异步任务变得越来越重要。CompletableFuture
是 Java 8 引入的一个强大工具,用于处理异步计算。它实现了 Future
接口和 CompletionStage
接口,不仅能够表示一个异步操作的结果,还提供了丰富的方法来控制异步任务的执行顺序、组合多个异步任务等。
CompletableFuture
的核心优势在于它的灵活性和易用性。与传统的 Future
相比,CompletableFuture
可以在任务完成时自动触发后续操作,而不需要像 Future
那样通过轮询或阻塞的方式获取结果。这使得编写异步代码变得更加简洁和高效。
创建 CompletableFuture
- 使用
supplyAsync
创建有返回值的异步任务
在上述代码中,CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> { // 模拟一个耗时操作 try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "Task 1 completed"; });
supplyAsync
方法接受一个Supplier
接口的实现,这个Supplier
接口的get
方法中定义了异步任务的具体逻辑。supplyAsync
方法会在一个默认的ForkJoinPool.commonPool()
线程池中执行这个任务,并返回一个CompletableFuture
对象,该对象最终会包含任务的返回结果。 - 使用
runAsync
创建无返回值的异步任务CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> { // 模拟一个耗时操作 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Task 2 completed"); });
runAsync
方法接受一个Runnable
接口的实现,它适用于不需要返回值的异步任务。同样,这个任务会在默认的线程池中执行,返回的CompletableFuture
对象在任务完成时会包含null
值。
异步任务执行顺序控制基础
- thenApply 方法
thenApply
方法用于在当前CompletableFuture
完成后,对其结果进行处理并返回一个新的CompletableFuture
。
在这里,首先通过CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "Hello") .thenApply(s -> s + ", World"); future3.join(); // "Hello, World"
supplyAsync
创建了一个异步任务,该任务返回字符串"Hello"
。然后使用thenApply
方法,对这个结果进行处理,将其与", World"
拼接,返回一个新的CompletableFuture
,其结果为"Hello, World"
。thenApply
方法接受一个Function
接口的实现,该Function
接口的apply
方法定义了对结果的处理逻辑。 - thenAccept 方法
thenAccept
方法用于在当前CompletableFuture
完成后,消费其结果,但不返回新的结果。
上述代码中,异步任务返回CompletableFuture.supplyAsync(() -> "Message") .thenAccept(System.out::println);
"Message"
,thenAccept
方法接受这个结果并通过System.out::println
进行打印。thenAccept
方法接受一个Consumer
接口的实现,Consumer
接口的accept
方法定义了对结果的消费逻辑。 - thenRun 方法
thenRun
方法用于在当前CompletableFuture
完成后,执行一个无参数的Runnable
。
异步任务先返回CompletableFuture.supplyAsync(() -> "Result") .thenRun(() -> System.out.println("Task is completed"));
"Result"
,然后thenRun
方法执行定义的Runnable
,打印出"Task is completed"
。thenRun
方法接受一个Runnable
接口的实现,它不关心之前任务的结果。
顺序执行多个异步任务
- 串行任务链
可以通过多次调用
thenApply
、thenAccept
或thenRun
方法来创建一个串行的任务链。
上述代码中,第一个异步任务返回CompletableFuture.supplyAsync(() -> "Step 1") .thenApply(s -> s + " -> Step 2") .thenApply(s -> s + " -> Step 3") .thenAccept(System.out::println);
"Step 1"
,然后依次通过thenApply
方法对结果进行处理,添加" -> Step 2"
和" -> Step 3"
,最后通过thenAccept
方法打印最终结果"Step 1 -> Step 2 -> Step 3"
。这样就实现了多个异步任务的顺序执行,每个任务依赖前一个任务的结果。 - 复杂任务链中的异常处理
在任务链执行过程中,如果某个任务出现异常,整个任务链会中断。可以使用
exceptionally
方法来处理异常。
在这个例子中,CompletableFuture.supplyAsync(() -> { if (Math.random() < 0.5) { throw new RuntimeException("Simulated error"); } return "Success"; }) .thenApply(s -> s + " -> Processed") .exceptionally(ex -> { System.err.println("Caught exception: " + ex.getMessage()); return "Default value"; }) .thenAccept(System.out::println);
supplyAsync
中的任务有 50% 的概率抛出异常。如果抛出异常,exceptionally
方法会捕获异常,打印异常信息,并返回一个默认值"Default value"
。如果没有异常,任务链会正常执行,返回"Success -> Processed"
。
并行执行异步任务并合并结果
- allOf 方法
allOf
方法用于等待所有给定的CompletableFuture
都完成。它返回一个新的CompletableFuture
,当所有输入的CompletableFuture
都完成时,这个新的CompletableFuture
也完成。
这里创建了两个异步任务CompletableFuture<String> future4 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "Future 4 result"; }); CompletableFuture<String> future5 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return "Future 5 result"; }); CompletableFuture<Void> allFutures = CompletableFuture.allOf(future4, future5); allFutures.join(); try { String result4 = future4.get(); String result5 = future5.get(); System.out.println(result4 + " and " + result5); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); }
future4
和future5
,它们会并行执行。allOf
方法返回的CompletableFuture
allFutures
会在future4
和future5
都完成时完成。通过调用join
方法等待所有任务完成,然后通过get
方法获取每个任务的结果。 - anyOf 方法
anyOf
方法用于等待任何一个给定的CompletableFuture
完成。它返回一个新的CompletableFuture
,当任何一个输入的CompletableFuture
完成时,这个新的CompletableFuture
也完成,其结果为第一个完成的CompletableFuture
的结果。
在这个例子中,CompletableFuture<String> future6 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } return "Future 6 result"; }); CompletableFuture<String> future7 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1500); } catch (InterruptedException e) { e.printStackTrace(); } return "Future 7 result"; }); CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(future6, future7); try { Object result = anyFuture.get(); System.out.println("First completed result: " + result); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); }
future6
和future7
并行执行,anyOf
方法返回的anyFuture
会在future6
或future7
任何一个完成时完成。通过get
方法获取第一个完成的任务的结果,这里由于future7
耗时较短,会先完成,所以打印出"First completed result: Future 7 result"
。
基于依赖关系的异步任务执行顺序控制
- thenCompose 方法
thenCompose
方法用于将一个CompletableFuture
的结果作为另一个CompletableFuture
的输入,并返回一个新的CompletableFuture
。它与thenApply
的区别在于,thenApply
返回的是一个已经包含处理结果的CompletableFuture
,而thenCompose
返回的是一个由另一个CompletableFuture
构成的新CompletableFuture
。
这里第一个异步任务返回CompletableFuture<String> future8 = CompletableFuture.supplyAsync(() -> "Input") .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " -> Processed")); future8.join(); // "Input -> Processed"
"Input"
,thenCompose
方法接受这个结果,并将其作为新的CompletableFuture
的输入,这个新的CompletableFuture
会返回"Input -> Processed"
。 - handle 方法
handle
方法用于在CompletableFuture
完成(无论是正常完成还是异常完成)时,对结果或异常进行处理,并返回一个新的CompletableFuture
。
与CompletableFuture.supplyAsync(() -> { if (Math.random() < 0.5) { throw new RuntimeException("Simulated error"); } return "Success"; }) .handle((result, ex) -> { if (ex != null) { System.err.println("Caught exception: " + ex.getMessage()); return "Default value"; } return result + " -> Processed"; }) .thenAccept(System.out::println);
exceptionally
方法类似,handle
方法可以处理异常,但它同时也能处理正常完成的情况。在上述代码中,如果任务正常完成,会在结果后添加" -> Processed"
;如果任务抛出异常,会捕获异常并返回默认值。
CompletableFuture 执行顺序控制在实际项目中的应用场景
- 微服务调用链
在微服务架构中,一个业务操作可能需要调用多个微服务。例如,一个电商应用中,获取商品详情可能需要调用商品服务获取基本信息,再调用库存服务获取库存信息,最后调用评论服务获取评论信息。可以使用
CompletableFuture
来并行调用这些微服务,然后合并结果。
通过这种方式,可以显著提高系统的响应速度,因为各个微服务调用是并行进行的,而不是串行依次调用。CompletableFuture<String> productInfoFuture = CompletableFuture.supplyAsync(() -> { // 模拟调用商品服务 try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "Product details"; }); CompletableFuture<String> stockInfoFuture = CompletableFuture.supplyAsync(() -> { // 模拟调用库存服务 try { Thread.sleep(1500); } catch (InterruptedException e) { e.printStackTrace(); } return "Stock available"; }); CompletableFuture<String> reviewInfoFuture = CompletableFuture.supplyAsync(() -> { // 模拟调用评论服务 try { Thread.sleep(2500); } catch (InterruptedException e) { e.printStackTrace(); } return "Positive reviews"; }); CompletableFuture<Void> allServices = CompletableFuture.allOf(productInfoFuture, stockInfoFuture, reviewInfoFuture); allServices.join(); try { String productInfo = productInfoFuture.get(); String stockInfo = stockInfoFuture.get(); String reviewInfo = reviewInfoFuture.get(); System.out.println("Combined result: " + productInfo + ", " + stockInfo + ", " + reviewInfo); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); }
- 数据预处理和后处理流水线
在大数据处理场景中,可能需要对数据进行一系列的预处理操作,如清洗、转换,然后进行计算,最后进行后处理,如结果格式化。可以使用
CompletableFuture
来构建一个异步处理流水线。
这样,数据在不同的异步任务中依次进行清洗、处理和格式化,每个任务依赖前一个任务的结果,实现了高效的异步数据处理流水线。CompletableFuture<String> dataFuture = CompletableFuture.supplyAsync(() -> "Dirty data"); CompletableFuture<String> cleanedDataFuture = dataFuture.thenApply(s -> { // 模拟数据清洗 return s.replace("Dirty", "Cleaned"); }); CompletableFuture<String> processedDataFuture = cleanedDataFuture.thenApply(s -> { // 模拟数据处理 return s + " -> Processed"; }); CompletableFuture<String> formattedDataFuture = processedDataFuture.thenApply(s -> { // 模拟结果格式化 return "Formatted: " + s; }); formattedDataFuture.thenAccept(System.out::println);
高级技巧:自定义线程池
- 使用自定义线程池执行 CompletableFuture 任务
默认情况下,
CompletableFuture
使用ForkJoinPool.commonPool()
线程池来执行任务。但在一些场景下,可能需要使用自定义线程池来满足特定的需求,比如限制并发数、设置线程优先级等。
在上述代码中,通过ExecutorService executor = Executors.newFixedThreadPool(3); CompletableFuture<String> future9 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "Task result"; }, executor); future9.join(); executor.shutdown();
Executors.newFixedThreadPool(3)
创建了一个固定大小为 3 的线程池executor
。然后使用supplyAsync
方法的第二个参数,将这个线程池传递进去,这样异步任务就会在这个自定义线程池中执行。最后,任务完成后关闭线程池。 - 线程池对任务执行顺序的影响
自定义线程池的配置会影响
CompletableFuture
任务的执行顺序。例如,如果使用一个单线程的线程池,那么多个CompletableFuture
任务会依次执行,而不是并行执行。
在这个例子中,由于使用了单线程的线程池,ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor(); CompletableFuture<String> future10 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "Task 10 result"; }, singleThreadExecutor); CompletableFuture<String> future11 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return "Task 11 result"; }, singleThreadExecutor); CompletableFuture<Void> allTasks = CompletableFuture.allOf(future10, future11); allTasks.join(); try { String result10 = future10.get(); String result11 = future11.get(); System.out.println(result10 + " and " + result11); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } singleThreadExecutor.shutdown();
future10
和future11
会依次执行,先执行future10
,再执行future11
,最后打印出两个任务的结果。
注意事项
- 内存泄漏风险
如果在
CompletableFuture
中创建了资源(如数据库连接、文件句柄等),并且没有正确释放,可能会导致内存泄漏。确保在任务完成后及时关闭或释放这些资源。CompletableFuture<Void> future12 = CompletableFuture.runAsync(() -> { // 模拟打开一个数据库连接 Connection connection = null; try { connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/mydb", "user", "password"); // 执行数据库操作 } catch (SQLException e) { e.printStackTrace(); } finally { if (connection != null) { try { connection.close(); } catch (SQLException e) { e.printStackTrace(); } } } });
- 异常处理的完整性
在使用
CompletableFuture
时,要确保异常处理的完整性。如果没有正确处理异常,可能会导致程序出现未捕获的异常,影响系统的稳定性。不仅要在任务执行过程中处理异常,还要在任务链的各个环节进行适当的异常处理。
上述代码在任务执行和结果处理过程中都考虑了异常情况,保证了程序的健壮性。CompletableFuture.supplyAsync(() -> { if (Math.random() < 0.5) { throw new RuntimeException("Simulated error"); } return "Success"; }) .thenApply(s -> s + " -> Processed") .exceptionally(ex -> { System.err.println("Caught exception: " + ex.getMessage()); return "Default value"; }) .thenAccept(System.out::println);
通过深入理解和合理运用 CompletableFuture
的各种方法,开发者可以在 Java 中高效地控制异步任务的执行顺序,提高程序的性能和响应能力,使其在各种复杂的应用场景中发挥重要作用。无论是在微服务架构、大数据处理还是其他异步编程场景中,CompletableFuture
都是一个不可或缺的工具。