Java 中 CompletableFuture 获取返回值与异常信息
Java 中 CompletableFuture 获取返回值与异常信息
CompletableFuture 概述
在 Java 并发编程领域,CompletableFuture
是 Java 8 引入的一个强大工具,它为异步编程提供了更便捷、灵活的方式。CompletableFuture
实现了 Future
接口和 CompletionStage
接口,这意味着它既可以像传统的 Future
那样获取异步操作的结果,又具备了更丰富的异步操作组合和处理能力。
Future
接口是 Java 早期用于异步计算的方式,但它存在一些局限性。例如,Future
只能通过 get()
方法阻塞等待异步任务完成来获取结果,无法在任务完成时得到通知,也不便于对多个异步任务进行组合操作。而 CompletableFuture
弥补了这些不足,它允许我们以更优雅的方式处理异步计算,包括异步任务的链式调用、并行执行多个任务以及处理任务中的异常。
获取返回值
常规获取方式
-
使用 get() 方法
CompletableFuture
继承自Future
接口,因此可以使用get()
方法来获取异步任务的返回值。这种方式会阻塞当前线程,直到异步任务完成并返回结果。import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; public class CompletableFutureExample { public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { // 模拟一个耗时操作 try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "任务执行完成"; }); String result = future.get(); System.out.println(result); } }
在上述代码中,
CompletableFuture.supplyAsync()
方法创建了一个异步任务,该任务会在后台线程中执行。future.get()
方法会阻塞主线程,直到异步任务完成并返回结果,然后将结果打印出来。 -
使用 get(long timeout, TimeUnit unit) 方法 有时候,我们不希望无限期地阻塞等待异步任务完成,可以使用
get(long timeout, TimeUnit unit)
方法,它会在指定的时间内等待任务完成。如果在指定时间内任务没有完成,会抛出TimeoutException
。import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public class CompletableFutureTimeoutExample { public static void main(String[] args) { CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { // 模拟一个耗时操作 try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } return "任务执行完成"; }); try { String result = future.get(2, TimeUnit.SECONDS); System.out.println(result); } catch (InterruptedException | ExecutionException | TimeoutException e) { e.printStackTrace(); } } }
在这个例子中,
future.get(2, TimeUnit.SECONDS)
方法会等待 2 秒。由于异步任务需要 3 秒才能完成,所以会抛出TimeoutException
。
异步获取方式
-
使用 thenApply() 方法
thenApply()
方法用于在异步任务完成后,对返回值进行进一步的处理。它接收一个Function
作为参数,该Function
的输入是异步任务的返回值,输出是处理后的结果。thenApply()
方法返回一个新的CompletableFuture
,这个新的CompletableFuture
的结果就是Function
处理后的结果。import java.util.concurrent.CompletableFuture; public class CompletableFutureThenApplyExample { public static void main(String[] args) { CompletableFuture.supplyAsync(() -> "Hello") .thenApply(s -> s + ", World!") .thenAccept(System.out::println); } }
在上述代码中,
CompletableFuture.supplyAsync(() -> "Hello")
创建了一个异步任务,返回字符串 "Hello"。thenApply(s -> s + ", World!")
对返回值进行处理,将其与 ", World!" 拼接。thenAccept(System.out::println)
用于消费最终的结果并打印输出。 -
使用 thenCompose() 方法
thenCompose()
方法用于将两个CompletableFuture
串联起来,前一个CompletableFuture
的结果作为后一个CompletableFuture
的输入。它接收一个Function
作为参数,该Function
的输入是前一个CompletableFuture
的返回值,输出是另一个CompletableFuture
。import java.util.concurrent.CompletableFuture; public class CompletableFutureThenComposeExample { public static CompletableFuture<String> getMessage() { return CompletableFuture.supplyAsync(() -> "Hello"); } public static CompletableFuture<String> appendMessage(String message) { return CompletableFuture.supplyAsync(() -> message + ", World!"); } public static void main(String[] args) { getMessage() .thenCompose(CompletableFutureThenComposeExample::appendMessage) .thenAccept(System.out::println); } }
在这个例子中,
getMessage()
方法返回一个CompletableFuture
,其结果为 "Hello"。thenCompose(CompletableFutureThenComposeExample::appendMessage)
将这个结果作为参数传递给appendMessage()
方法,appendMessage()
方法返回一个新的CompletableFuture
,最终输出 "Hello, World!"。
获取异常信息
异常处理方式
-
使用 try - catch 块包裹 get() 方法 当使用
get()
方法获取异步任务的返回值时,如果异步任务在执行过程中抛出异常,get()
方法会将该异常包装成ExecutionException
或InterruptedException
重新抛出。我们可以使用try - catch
块来捕获并处理这些异常。import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; public class CompletableFutureExceptionGetExample { public static void main(String[] args) { CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { throw new RuntimeException("任务执行出错"); }); try { String result = future.get(); System.out.println(result); } catch (InterruptedException | ExecutionException e) { System.out.println("捕获到异常: " + e.getCause().getMessage()); } } }
在上述代码中,异步任务抛出了一个
RuntimeException
。当调用future.get()
时,get()
方法会抛出ExecutionException
,其内部包含了原始的RuntimeException
。通过e.getCause()
可以获取到原始异常,并打印异常信息。 -
使用 exceptionally() 方法
exceptionally()
方法用于在异步任务出现异常时,提供一个默认的处理逻辑。它接收一个Function
作为参数,该Function
的输入是异常对象,输出是一个替代的结果。exceptionally()
方法返回一个新的CompletableFuture
,如果异步任务正常完成,这个新的CompletableFuture
的结果就是原异步任务的结果;如果异步任务出现异常,这个新的CompletableFuture
的结果就是Function
处理后的结果。import java.util.concurrent.CompletableFuture; public class CompletableFutureExceptionallyExample { public static void main(String[] args) { CompletableFuture.supplyAsync(() -> { throw new RuntimeException("任务执行出错"); }) .exceptionally(e -> { System.out.println("捕获到异常: " + e.getMessage()); return "默认结果"; }) .thenAccept(System.out::println); } }
在这个例子中,当异步任务抛出
RuntimeException
时,exceptionally()
方法中的Function
会被调用,打印异常信息并返回 "默认结果"。 -
使用 whenComplete() 方法
whenComplete()
方法用于在异步任务完成(无论是正常完成还是出现异常)时执行一个回调函数。它接收两个参数,第一个参数是异步任务的返回值(如果任务正常完成),第二个参数是异常对象(如果任务出现异常)。import java.util.concurrent.CompletableFuture; public class CompletableFutureWhenCompleteExample { public static void main(String[] args) { CompletableFuture.supplyAsync(() -> { throw new RuntimeException("任务执行出错"); }) .whenComplete((result, e) -> { if (e != null) { System.out.println("捕获到异常: " + e.getMessage()); } else { System.out.println("任务正常完成,结果: " + result); } }); // 防止主线程退出 try { Thread.sleep(2000); } catch (InterruptedException ex) { ex.printStackTrace(); } } }
在上述代码中,
whenComplete()
方法中的回调函数会根据任务的执行情况判断是否出现异常,并打印相应的信息。需要注意的是,whenComplete()
方法不会改变CompletableFuture
的结果,也不会中断异常的传播。如果需要对异常进行处理并返回一个新的结果,可以结合exceptionally()
方法使用。 -
使用 handle() 方法
handle()
方法结合了thenApply()
和whenComplete()
的功能,它在异步任务完成(正常或异常)时执行一个回调函数,并返回一个新的CompletableFuture
。回调函数接收两个参数,第一个参数是异步任务的返回值(如果任务正常完成),第二个参数是异常对象(如果任务出现异常)。回调函数的返回值会作为新的CompletableFuture
的结果。import java.util.concurrent.CompletableFuture; public class CompletableFutureHandleExample { public static void main(String[] args) { CompletableFuture.supplyAsync(() -> { throw new RuntimeException("任务执行出错"); }) .handle((result, e) -> { if (e != null) { System.out.println("捕获到异常: " + e.getMessage()); return "默认结果"; } else { return result; } }) .thenAccept(System.out::println); } }
在这个例子中,当异步任务出现异常时,
handle()
方法中的回调函数会捕获异常,打印异常信息并返回 "默认结果"。如果任务正常完成,回调函数会直接返回任务的结果。
多个 CompletableFuture 的异常处理与返回值获取
并行执行多个 CompletableFuture
-
使用 CompletableFuture.allOf() 方法
CompletableFuture.allOf()
方法用于等待所有给定的CompletableFuture
都完成。它接收多个CompletableFuture
作为参数,并返回一个新的CompletableFuture
。这个新的CompletableFuture
在所有传入的CompletableFuture
都完成时才会完成。import java.util.concurrent.CompletableFuture; public class CompletableFutureAllOfExample { public static void main(String[] args) { CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return "任务1完成"; }); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "任务2完成"; }); CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2); allFutures.thenRun(() -> { try { System.out.println(future1.get()); System.out.println(future2.get()); } catch (Exception e) { e.printStackTrace(); } }).join(); } }
在上述代码中,
future1
和future2
是两个并行执行的异步任务。CompletableFuture.allOf(future1, future2)
返回一个新的CompletableFuture
,当future1
和future2
都完成时,allFutures
才会完成。thenRun()
方法在allFutures
完成后执行,通过future1.get()
和future2.get()
获取各自的返回值。对于异常处理,如果其中任何一个
CompletableFuture
抛出异常,allOf()
返回的CompletableFuture
也会以异常完成。我们可以通过在thenRun()
中捕获异常来处理,如上述代码所示。 -
使用 CompletableFuture.anyOf() 方法
CompletableFuture.anyOf()
方法用于等待任何一个给定的CompletableFuture
完成。它接收多个CompletableFuture
作为参数,并返回一个新的CompletableFuture
。这个新的CompletableFuture
在任何一个传入的CompletableFuture
完成时就会完成,其结果就是第一个完成的CompletableFuture
的结果。import java.util.concurrent.CompletableFuture; public class CompletableFutureAnyOfExample { public static void main(String[] args) { CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "任务1完成"; }); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return "任务2完成"; }); CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(future1, future2); anyFuture.thenAccept(result -> System.out.println("第一个完成的任务结果: " + result)).join(); } }
在这个例子中,
future1
和future2
并行执行。CompletableFuture.anyOf(future1, future2)
返回的anyFuture
在future1
或future2
任何一个完成时就会完成,thenAccept()
方法打印出第一个完成的任务的结果。对于异常处理,如果所有的
CompletableFuture
都以异常完成,anyOf()
返回的CompletableFuture
也会以异常完成,并且异常类型是第一个抛出异常的CompletableFuture
的异常类型。我们可以通过在thenAccept()
或后续的处理方法中捕获异常来处理。
组合多个 CompletableFuture 的返回值
-
使用 thenCombine() 方法
thenCombine()
方法用于将两个CompletableFuture
的结果进行合并。它接收两个参数,第一个是另一个CompletableFuture
,第二个是一个BiFunction
。当两个CompletableFuture
都完成时,BiFunction
会被调用,其输入是两个CompletableFuture
的返回值,输出是合并后的结果。thenCombine()
方法返回一个新的CompletableFuture
,其结果就是BiFunction
处理后的结果。import java.util.concurrent.CompletableFuture; public class CompletableFutureThenCombineExample { public static void main(String[] args) { CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 10); CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 20); CompletableFuture<Integer> combinedFuture = future1.thenCombine(future2, (a, b) -> a + b); combinedFuture.thenAccept(result -> System.out.println("合并结果: " + result)).join(); } }
在上述代码中,
future1
和future2
分别返回 10 和 20。thenCombine(future2, (a, b) -> a + b)
将两个结果相加,combinedFuture
的结果就是 30,最后通过thenAccept()
打印出来。如果
future1
或future2
任何一个出现异常,thenCombine()
返回的CompletableFuture
也会以异常完成。我们可以通过在后续的处理方法(如exceptionally()
、handle()
等)中捕获异常来处理。 -
使用 thenAcceptBoth() 方法
thenAcceptBoth()
方法与thenCombine()
类似,但它不返回合并后的结果,而是直接消费两个CompletableFuture
的结果。它接收两个参数,第一个是另一个CompletableFuture
,第二个是一个BiConsumer
。当两个CompletableFuture
都完成时,BiConsumer
会被调用,其输入是两个CompletableFuture
的返回值。import java.util.concurrent.CompletableFuture; public class CompletableFutureThenAcceptBothExample { public static void main(String[] args) { CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello"); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World"); future1.thenAcceptBoth(future2, (a, b) -> System.out.println(a + " " + b)).join(); } }
在这个例子中,
future1
和future2
分别返回 "Hello" 和 "World"。thenAcceptBoth(future2, (a, b) -> System.out.println(a + " " + b))
直接将两个结果拼接并打印出来。同样,如果
future1
或future2
任何一个出现异常,thenAcceptBoth()
会使后续的处理以异常完成,我们可以通过异常处理方法来处理这种情况。
CompletableFuture 获取返回值与异常信息的底层原理
CompletableFuture 的内部结构
CompletableFuture
内部使用了一个复杂的状态机来管理异步任务的执行状态。它包含了以下几个关键部分:
- 状态变量:
CompletableFuture
使用一个int
类型的变量来表示其状态,常见的状态包括未完成、正常完成、异常完成等。这些状态的变化决定了CompletableFuture
的行为和对各种操作的响应。 - 结果存储:当异步任务正常完成时,结果会被存储在
CompletableFuture
的一个字段中;当任务出现异常时,异常对象也会被存储在相应的字段中。 - 回调队列:
CompletableFuture
维护了一个回调队列,用于存储在任务完成时需要执行的回调函数。这些回调函数包括thenApply()
、thenAccept()
、whenComplete()
等方法中传入的函数。
获取返回值的底层实现
- get() 方法
get()
方法的底层实现是通过awaitDone()
方法来等待任务完成。awaitDone()
方法会不断检查CompletableFuture
的状态,如果任务未完成,会阻塞当前线程。当任务完成后,get()
方法会根据任务的状态获取结果或抛出异常。如果任务正常完成,会直接返回存储的结果;如果任务出现异常,会将异常包装成ExecutionException
抛出。 - 异步获取方法(如 thenApply() 等)
对于
thenApply()
、thenCompose()
等异步获取结果的方法,它们会在CompletableFuture
完成时,将传入的回调函数添加到回调队列中。当任务完成时,CompletableFuture
会依次执行回调队列中的回调函数,从而实现对结果的异步处理。每个回调函数的执行结果会作为下一个CompletableFuture
的输入或结果,以此实现链式调用和异步操作的组合。
异常处理的底层实现
- 异常的传播:当异步任务抛出异常时,
CompletableFuture
会将异常存储在内部的异常字段中,并将状态设置为异常完成。后续对CompletableFuture
的操作(如get()
方法)会检测到异常状态并抛出相应的异常。 - 异常处理方法(如 exceptionally() 等):
exceptionally()
、handle()
等异常处理方法会在CompletableFuture
出现异常时,将异常处理函数添加到回调队列中。当任务以异常完成时,CompletableFuture
会执行异常处理函数,从而实现对异常的处理。这些异常处理函数可以返回一个替代结果,以改变CompletableFuture
的最终结果。
实际应用场景
- 网络请求:在进行多个网络请求时,可以使用
CompletableFuture
并行发起请求,并在所有请求完成后处理结果。例如,从多个不同的 API 获取数据,然后将这些数据进行合并处理。 - 数据库操作:在进行复杂的数据库查询和更新操作时,
CompletableFuture
可以帮助我们异步执行多个数据库事务,提高系统的并发性能。例如,在更新多个表的数据时,可以将每个表的更新操作封装成一个CompletableFuture
,并行执行这些操作,然后在所有操作完成后进行结果校验。 - 计算密集型任务:对于一些计算密集型的任务,如数据处理、模型训练等,可以使用
CompletableFuture
将任务分解为多个子任务并行执行,加快整体的计算速度。例如,在对大规模数据集进行数据分析时,可以将数据集分成多个部分,每个部分的分析任务作为一个CompletableFuture
并行执行,最后合并分析结果。
通过深入理解 CompletableFuture
获取返回值与异常信息的方法和原理,开发者可以在 Java 并发编程中更高效、更灵活地处理异步任务,提升系统的性能和响应能力。在实际应用中,根据不同的业务场景选择合适的方法来获取返回值和处理异常,能够使代码更加健壮和可维护。