MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java 中 CompletableFuture 获取返回值与异常信息

2023-01-053.4k 阅读

Java 中 CompletableFuture 获取返回值与异常信息

CompletableFuture 概述

在 Java 并发编程领域,CompletableFuture 是 Java 8 引入的一个强大工具,它为异步编程提供了更便捷、灵活的方式。CompletableFuture 实现了 Future 接口和 CompletionStage 接口,这意味着它既可以像传统的 Future 那样获取异步操作的结果,又具备了更丰富的异步操作组合和处理能力。

Future 接口是 Java 早期用于异步计算的方式,但它存在一些局限性。例如,Future 只能通过 get() 方法阻塞等待异步任务完成来获取结果,无法在任务完成时得到通知,也不便于对多个异步任务进行组合操作。而 CompletableFuture 弥补了这些不足,它允许我们以更优雅的方式处理异步计算,包括异步任务的链式调用、并行执行多个任务以及处理任务中的异常。

获取返回值

常规获取方式

  1. 使用 get() 方法 CompletableFuture 继承自 Future 接口,因此可以使用 get() 方法来获取异步任务的返回值。这种方式会阻塞当前线程,直到异步任务完成并返回结果。

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    
    public class CompletableFutureExample {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
                // 模拟一个耗时操作
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "任务执行完成";
            });
    
            String result = future.get();
            System.out.println(result);
        }
    }
    

    在上述代码中,CompletableFuture.supplyAsync() 方法创建了一个异步任务,该任务会在后台线程中执行。future.get() 方法会阻塞主线程,直到异步任务完成并返回结果,然后将结果打印出来。

  2. 使用 get(long timeout, TimeUnit unit) 方法 有时候,我们不希望无限期地阻塞等待异步任务完成,可以使用 get(long timeout, TimeUnit unit) 方法,它会在指定的时间内等待任务完成。如果在指定时间内任务没有完成,会抛出 TimeoutException

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    
    public class CompletableFutureTimeoutExample {
        public static void main(String[] args) {
            CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
                // 模拟一个耗时操作
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "任务执行完成";
            });
    
            try {
                String result = future.get(2, TimeUnit.SECONDS);
                System.out.println(result);
            } catch (InterruptedException | ExecutionException | TimeoutException e) {
                e.printStackTrace();
            }
        }
    }
    

    在这个例子中,future.get(2, TimeUnit.SECONDS) 方法会等待 2 秒。由于异步任务需要 3 秒才能完成,所以会抛出 TimeoutException

异步获取方式

  1. 使用 thenApply() 方法 thenApply() 方法用于在异步任务完成后,对返回值进行进一步的处理。它接收一个 Function 作为参数,该 Function 的输入是异步任务的返回值,输出是处理后的结果。thenApply() 方法返回一个新的 CompletableFuture,这个新的 CompletableFuture 的结果就是 Function 处理后的结果。

    import java.util.concurrent.CompletableFuture;
    
    public class CompletableFutureThenApplyExample {
        public static void main(String[] args) {
            CompletableFuture.supplyAsync(() -> "Hello")
                   .thenApply(s -> s + ", World!")
                   .thenAccept(System.out::println);
        }
    }
    

    在上述代码中,CompletableFuture.supplyAsync(() -> "Hello") 创建了一个异步任务,返回字符串 "Hello"。thenApply(s -> s + ", World!") 对返回值进行处理,将其与 ", World!" 拼接。thenAccept(System.out::println) 用于消费最终的结果并打印输出。

  2. 使用 thenCompose() 方法 thenCompose() 方法用于将两个 CompletableFuture 串联起来,前一个 CompletableFuture 的结果作为后一个 CompletableFuture 的输入。它接收一个 Function 作为参数,该 Function 的输入是前一个 CompletableFuture 的返回值,输出是另一个 CompletableFuture

    import java.util.concurrent.CompletableFuture;
    
    public class CompletableFutureThenComposeExample {
        public static CompletableFuture<String> getMessage() {
            return CompletableFuture.supplyAsync(() -> "Hello");
        }
    
        public static CompletableFuture<String> appendMessage(String message) {
            return CompletableFuture.supplyAsync(() -> message + ", World!");
        }
    
        public static void main(String[] args) {
            getMessage()
                   .thenCompose(CompletableFutureThenComposeExample::appendMessage)
                   .thenAccept(System.out::println);
        }
    }
    

    在这个例子中,getMessage() 方法返回一个 CompletableFuture,其结果为 "Hello"。thenCompose(CompletableFutureThenComposeExample::appendMessage) 将这个结果作为参数传递给 appendMessage() 方法,appendMessage() 方法返回一个新的 CompletableFuture,最终输出 "Hello, World!"。

获取异常信息

异常处理方式

  1. 使用 try - catch 块包裹 get() 方法 当使用 get() 方法获取异步任务的返回值时,如果异步任务在执行过程中抛出异常,get() 方法会将该异常包装成 ExecutionExceptionInterruptedException 重新抛出。我们可以使用 try - catch 块来捕获并处理这些异常。

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    
    public class CompletableFutureExceptionGetExample {
        public static void main(String[] args) {
            CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
                throw new RuntimeException("任务执行出错");
            });
    
            try {
                String result = future.get();
                System.out.println(result);
            } catch (InterruptedException | ExecutionException e) {
                System.out.println("捕获到异常: " + e.getCause().getMessage());
            }
        }
    }
    

    在上述代码中,异步任务抛出了一个 RuntimeException。当调用 future.get() 时,get() 方法会抛出 ExecutionException,其内部包含了原始的 RuntimeException。通过 e.getCause() 可以获取到原始异常,并打印异常信息。

  2. 使用 exceptionally() 方法 exceptionally() 方法用于在异步任务出现异常时,提供一个默认的处理逻辑。它接收一个 Function 作为参数,该 Function 的输入是异常对象,输出是一个替代的结果。exceptionally() 方法返回一个新的 CompletableFuture,如果异步任务正常完成,这个新的 CompletableFuture 的结果就是原异步任务的结果;如果异步任务出现异常,这个新的 CompletableFuture 的结果就是 Function 处理后的结果。

    import java.util.concurrent.CompletableFuture;
    
    public class CompletableFutureExceptionallyExample {
        public static void main(String[] args) {
            CompletableFuture.supplyAsync(() -> {
                throw new RuntimeException("任务执行出错");
            })
                   .exceptionally(e -> {
                        System.out.println("捕获到异常: " + e.getMessage());
                        return "默认结果";
                    })
                   .thenAccept(System.out::println);
        }
    }
    

    在这个例子中,当异步任务抛出 RuntimeException 时,exceptionally() 方法中的 Function 会被调用,打印异常信息并返回 "默认结果"。

  3. 使用 whenComplete() 方法 whenComplete() 方法用于在异步任务完成(无论是正常完成还是出现异常)时执行一个回调函数。它接收两个参数,第一个参数是异步任务的返回值(如果任务正常完成),第二个参数是异常对象(如果任务出现异常)。

    import java.util.concurrent.CompletableFuture;
    
    public class CompletableFutureWhenCompleteExample {
        public static void main(String[] args) {
            CompletableFuture.supplyAsync(() -> {
                throw new RuntimeException("任务执行出错");
            })
                   .whenComplete((result, e) -> {
                        if (e != null) {
                            System.out.println("捕获到异常: " + e.getMessage());
                        } else {
                            System.out.println("任务正常完成,结果: " + result);
                        }
                    });
    
            // 防止主线程退出
            try {
                Thread.sleep(2000);
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        }
    }
    

    在上述代码中,whenComplete() 方法中的回调函数会根据任务的执行情况判断是否出现异常,并打印相应的信息。需要注意的是,whenComplete() 方法不会改变 CompletableFuture 的结果,也不会中断异常的传播。如果需要对异常进行处理并返回一个新的结果,可以结合 exceptionally() 方法使用。

  4. 使用 handle() 方法 handle() 方法结合了 thenApply()whenComplete() 的功能,它在异步任务完成(正常或异常)时执行一个回调函数,并返回一个新的 CompletableFuture。回调函数接收两个参数,第一个参数是异步任务的返回值(如果任务正常完成),第二个参数是异常对象(如果任务出现异常)。回调函数的返回值会作为新的 CompletableFuture 的结果。

    import java.util.concurrent.CompletableFuture;
    
    public class CompletableFutureHandleExample {
        public static void main(String[] args) {
            CompletableFuture.supplyAsync(() -> {
                throw new RuntimeException("任务执行出错");
            })
                   .handle((result, e) -> {
                        if (e != null) {
                            System.out.println("捕获到异常: " + e.getMessage());
                            return "默认结果";
                        } else {
                            return result;
                        }
                    })
                   .thenAccept(System.out::println);
        }
    }
    

    在这个例子中,当异步任务出现异常时,handle() 方法中的回调函数会捕获异常,打印异常信息并返回 "默认结果"。如果任务正常完成,回调函数会直接返回任务的结果。

多个 CompletableFuture 的异常处理与返回值获取

并行执行多个 CompletableFuture

  1. 使用 CompletableFuture.allOf() 方法 CompletableFuture.allOf() 方法用于等待所有给定的 CompletableFuture 都完成。它接收多个 CompletableFuture 作为参数,并返回一个新的 CompletableFuture。这个新的 CompletableFuture 在所有传入的 CompletableFuture 都完成时才会完成。

    import java.util.concurrent.CompletableFuture;
    
    public class CompletableFutureAllOfExample {
        public static void main(String[] args) {
            CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "任务1完成";
            });
    
            CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "任务2完成";
            });
    
            CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2);
    
            allFutures.thenRun(() -> {
                try {
                    System.out.println(future1.get());
                    System.out.println(future2.get());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).join();
        }
    }
    

    在上述代码中,future1future2 是两个并行执行的异步任务。CompletableFuture.allOf(future1, future2) 返回一个新的 CompletableFuture,当 future1future2 都完成时,allFutures 才会完成。thenRun() 方法在 allFutures 完成后执行,通过 future1.get()future2.get() 获取各自的返回值。

    对于异常处理,如果其中任何一个 CompletableFuture 抛出异常,allOf() 返回的 CompletableFuture 也会以异常完成。我们可以通过在 thenRun() 中捕获异常来处理,如上述代码所示。

  2. 使用 CompletableFuture.anyOf() 方法 CompletableFuture.anyOf() 方法用于等待任何一个给定的 CompletableFuture 完成。它接收多个 CompletableFuture 作为参数,并返回一个新的 CompletableFuture。这个新的 CompletableFuture 在任何一个传入的 CompletableFuture 完成时就会完成,其结果就是第一个完成的 CompletableFuture 的结果。

    import java.util.concurrent.CompletableFuture;
    
    public class CompletableFutureAnyOfExample {
        public static void main(String[] args) {
            CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "任务1完成";
            });
    
            CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "任务2完成";
            });
    
            CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(future1, future2);
    
            anyFuture.thenAccept(result -> System.out.println("第一个完成的任务结果: " + result)).join();
        }
    }
    

    在这个例子中,future1future2 并行执行。CompletableFuture.anyOf(future1, future2) 返回的 anyFuturefuture1future2 任何一个完成时就会完成,thenAccept() 方法打印出第一个完成的任务的结果。

    对于异常处理,如果所有的 CompletableFuture 都以异常完成,anyOf() 返回的 CompletableFuture 也会以异常完成,并且异常类型是第一个抛出异常的 CompletableFuture 的异常类型。我们可以通过在 thenAccept() 或后续的处理方法中捕获异常来处理。

组合多个 CompletableFuture 的返回值

  1. 使用 thenCombine() 方法 thenCombine() 方法用于将两个 CompletableFuture 的结果进行合并。它接收两个参数,第一个是另一个 CompletableFuture,第二个是一个 BiFunction。当两个 CompletableFuture 都完成时,BiFunction 会被调用,其输入是两个 CompletableFuture 的返回值,输出是合并后的结果。thenCombine() 方法返回一个新的 CompletableFuture,其结果就是 BiFunction 处理后的结果。

    import java.util.concurrent.CompletableFuture;
    
    public class CompletableFutureThenCombineExample {
        public static void main(String[] args) {
            CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 10);
            CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 20);
    
            CompletableFuture<Integer> combinedFuture = future1.thenCombine(future2, (a, b) -> a + b);
    
            combinedFuture.thenAccept(result -> System.out.println("合并结果: " + result)).join();
        }
    }
    

    在上述代码中,future1future2 分别返回 10 和 20。thenCombine(future2, (a, b) -> a + b) 将两个结果相加,combinedFuture 的结果就是 30,最后通过 thenAccept() 打印出来。

    如果 future1future2 任何一个出现异常,thenCombine() 返回的 CompletableFuture 也会以异常完成。我们可以通过在后续的处理方法(如 exceptionally()handle() 等)中捕获异常来处理。

  2. 使用 thenAcceptBoth() 方法 thenAcceptBoth() 方法与 thenCombine() 类似,但它不返回合并后的结果,而是直接消费两个 CompletableFuture 的结果。它接收两个参数,第一个是另一个 CompletableFuture,第二个是一个 BiConsumer。当两个 CompletableFuture 都完成时,BiConsumer 会被调用,其输入是两个 CompletableFuture 的返回值。

    import java.util.concurrent.CompletableFuture;
    
    public class CompletableFutureThenAcceptBothExample {
        public static void main(String[] args) {
            CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
            CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World");
    
            future1.thenAcceptBoth(future2, (a, b) -> System.out.println(a + " " + b)).join();
        }
    }
    

    在这个例子中,future1future2 分别返回 "Hello" 和 "World"。thenAcceptBoth(future2, (a, b) -> System.out.println(a + " " + b)) 直接将两个结果拼接并打印出来。

    同样,如果 future1future2 任何一个出现异常,thenAcceptBoth() 会使后续的处理以异常完成,我们可以通过异常处理方法来处理这种情况。

CompletableFuture 获取返回值与异常信息的底层原理

CompletableFuture 的内部结构

CompletableFuture 内部使用了一个复杂的状态机来管理异步任务的执行状态。它包含了以下几个关键部分:

  1. 状态变量CompletableFuture 使用一个 int 类型的变量来表示其状态,常见的状态包括未完成、正常完成、异常完成等。这些状态的变化决定了 CompletableFuture 的行为和对各种操作的响应。
  2. 结果存储:当异步任务正常完成时,结果会被存储在 CompletableFuture 的一个字段中;当任务出现异常时,异常对象也会被存储在相应的字段中。
  3. 回调队列CompletableFuture 维护了一个回调队列,用于存储在任务完成时需要执行的回调函数。这些回调函数包括 thenApply()thenAccept()whenComplete() 等方法中传入的函数。

获取返回值的底层实现

  1. get() 方法 get() 方法的底层实现是通过 awaitDone() 方法来等待任务完成。awaitDone() 方法会不断检查 CompletableFuture 的状态,如果任务未完成,会阻塞当前线程。当任务完成后,get() 方法会根据任务的状态获取结果或抛出异常。如果任务正常完成,会直接返回存储的结果;如果任务出现异常,会将异常包装成 ExecutionException 抛出。
  2. 异步获取方法(如 thenApply() 等) 对于 thenApply()thenCompose() 等异步获取结果的方法,它们会在 CompletableFuture 完成时,将传入的回调函数添加到回调队列中。当任务完成时,CompletableFuture 会依次执行回调队列中的回调函数,从而实现对结果的异步处理。每个回调函数的执行结果会作为下一个 CompletableFuture 的输入或结果,以此实现链式调用和异步操作的组合。

异常处理的底层实现

  1. 异常的传播:当异步任务抛出异常时,CompletableFuture 会将异常存储在内部的异常字段中,并将状态设置为异常完成。后续对 CompletableFuture 的操作(如 get() 方法)会检测到异常状态并抛出相应的异常。
  2. 异常处理方法(如 exceptionally() 等)exceptionally()handle() 等异常处理方法会在 CompletableFuture 出现异常时,将异常处理函数添加到回调队列中。当任务以异常完成时,CompletableFuture 会执行异常处理函数,从而实现对异常的处理。这些异常处理函数可以返回一个替代结果,以改变 CompletableFuture 的最终结果。

实际应用场景

  1. 网络请求:在进行多个网络请求时,可以使用 CompletableFuture 并行发起请求,并在所有请求完成后处理结果。例如,从多个不同的 API 获取数据,然后将这些数据进行合并处理。
  2. 数据库操作:在进行复杂的数据库查询和更新操作时,CompletableFuture 可以帮助我们异步执行多个数据库事务,提高系统的并发性能。例如,在更新多个表的数据时,可以将每个表的更新操作封装成一个 CompletableFuture,并行执行这些操作,然后在所有操作完成后进行结果校验。
  3. 计算密集型任务:对于一些计算密集型的任务,如数据处理、模型训练等,可以使用 CompletableFuture 将任务分解为多个子任务并行执行,加快整体的计算速度。例如,在对大规模数据集进行数据分析时,可以将数据集分成多个部分,每个部分的分析任务作为一个 CompletableFuture 并行执行,最后合并分析结果。

通过深入理解 CompletableFuture 获取返回值与异常信息的方法和原理,开发者可以在 Java 并发编程中更高效、更灵活地处理异步任务,提升系统的性能和响应能力。在实际应用中,根据不同的业务场景选择合适的方法来获取返回值和处理异常,能够使代码更加健壮和可维护。