MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java 中 CompletableFuture 任务完成回调 handle 方法

2022-05-297.0k 阅读

Java 中 CompletableFuture 任务完成回调 handle 方法

在 Java 的异步编程领域,CompletableFuture 是一个强大的工具,它提供了丰富的方法来处理异步任务的结果、错误以及组合多个异步操作。其中,handle 方法是 CompletableFuture 中一个非常有用的任务完成回调方法,它允许我们在异步任务完成(无论是正常完成还是发生异常)时执行特定的操作,并返回一个新的 CompletableFuture 包含处理结果。

handle 方法的定义

CompletableFuture 类中 handle 方法有两个重载版本:

public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable,? extends U> fn)
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable,? extends U> fn)
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable,? extends U> fn, Executor executor)
  • 第一个版本 handle(BiFunction<? super T, Throwable,? extends U> fn):同步地执行回调函数 fn。当异步任务完成(正常完成或出现异常)时,fn 会在调用 handle 方法的线程中执行。fn 是一个 BiFunction,它接受两个参数:异步任务的结果(如果任务正常完成)和任务执行过程中抛出的异常(如果任务出现异常),并返回一个类型为 U 的结果,这个结果会被包装在新的 CompletableFuture 中返回。

  • 第二个版本 handleAsync(BiFunction<? super T, Throwable,? extends U> fn):异步地执行回调函数 fn。当异步任务完成时,fn 会被提交到 ForkJoinPool.commonPool() 线程池中执行。同样,fn 返回的结果会被包装在新的 CompletableFuture 中返回。

  • 第三个版本 handleAsync(BiFunction<? super T, Throwable,? extends U> fn, Executor executor):与第二个版本类似,也是异步执行回调函数 fn,但不同的是,这里使用传入的 Executor 来执行 fn,而不是使用 ForkJoinPool.commonPool()

基本使用示例

下面通过一个简单的示例来展示 handle 方法的基本用法。假设我们有一个异步任务,它模拟从数据库中获取用户信息:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureHandleExample {

    public static CompletableFuture<String> getUserInfoAsync() {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟从数据库获取用户信息的耗时操作
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "User Information";
        });
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = getUserInfoAsync();

        CompletableFuture<String> handleFuture = future.handle((result, ex) -> {
            if (ex != null) {
                // 处理异常情况
                System.err.println("获取用户信息时发生异常: " + ex.getMessage());
                return "默认用户信息";
            } else {
                // 处理正常结果
                System.out.println("获取到的用户信息: " + result);
                return result + " - 处理后的信息";
            }
        });

        System.out.println("最终结果: " + handleFuture.get());
    }
}

在上述示例中:

  1. getUserInfoAsync 方法返回一个 CompletableFuture,它异步地模拟从数据库获取用户信息,这里使用 Thread.sleep 模拟了一个耗时操作。
  2. 调用 future.handle 方法,传入一个 BiFunction。在 BiFunction 中,通过判断 ex 是否为 null 来确定任务是否正常完成。如果 ex 不为 null,说明任务出现异常,打印异常信息并返回默认用户信息;如果 exnull,则处理正常的结果,并返回处理后的信息。
  3. 最后通过 handleFuture.get() 获取最终的处理结果并打印。

异常处理与结果转换

handle 方法的一个重要作用是在异步任务完成时,无论是正常完成还是出现异常,都能进行统一的处理,并将结果转换为另一种类型。这在很多实际场景中非常有用,比如在微服务架构中,当调用远程服务获取数据时,可能会出现网络异常等情况,我们需要在出现异常时返回一个默认值,并且将结果转换为适合当前业务逻辑的格式。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureHandleExceptionAndTransformExample {

    public static CompletableFuture<Integer> divideAsync(int a, int b) {
        return CompletableFuture.supplyAsync(() -> {
            if (b == 0) {
                throw new IllegalArgumentException("除数不能为零");
            }
            return a / b;
        });
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future = divideAsync(10, 2);

        CompletableFuture<String> handleFuture = future.handle((result, ex) -> {
            if (ex != null) {
                // 处理异常情况
                System.err.println("除法运算发生异常: " + ex.getMessage());
                return "运算失败";
            } else {
                // 处理正常结果
                System.out.println("除法运算结果: " + result);
                return "运算成功,结果为: " + result;
            }
        });

        System.out.println("最终结果: " + handleFuture.get());
    }
}

在这个例子中,divideAsync 方法异步地执行除法运算。如果除数为零,会抛出 IllegalArgumentException。通过 handle 方法,我们可以在异常发生时打印异常信息并返回一个表示运算失败的字符串,而在正常情况下,返回表示运算成功及结果的字符串。

链式调用与组合异步操作

CompletableFuture 的强大之处在于它支持链式调用,handle 方法也不例外。我们可以将多个 CompletableFuture 操作串联起来,每个操作基于前一个操作的结果(或异常)进行处理。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureChainingWithHandleExample {

    public static CompletableFuture<String> step1() {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("执行步骤1");
            return "步骤1的结果";
        });
    }

    public static CompletableFuture<String> step2(String resultFromStep1) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("执行步骤2,输入为: " + resultFromStep1);
            return resultFromStep1 + " -> 步骤2处理后";
        });
    }

    public static CompletableFuture<String> step3(String resultFromStep2) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("执行步骤3,输入为: " + resultFromStep2);
            return resultFromStep2 + " -> 步骤3处理后";
        });
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = step1()
               .handle((result, ex) -> {
                    if (ex != null) {
                        System.err.println("步骤1发生异常: " + ex.getMessage());
                        return "步骤1异常时的默认值";
                    } else {
                        System.out.println("步骤1正常完成,结果为: " + result);
                        return result;
                    }
                })
               .thenApplyAsync(step2::apply)
               .handle((result, ex) -> {
                    if (ex != null) {
                        System.err.println("步骤2发生异常: " + ex.getMessage());
                        return "步骤2异常时的默认值";
                    } else {
                        System.out.println("步骤2正常完成,结果为: " + result);
                        return result;
                    }
                })
               .thenApplyAsync(step3::apply);

        System.out.println("最终结果: " + future.get());
    }
}

在这个示例中:

  1. step1 方法返回一个 CompletableFuture,它异步执行并返回一个字符串结果。
  2. step1 的结果使用 handle 方法进行处理,在正常和异常情况下分别进行相应操作,并返回处理后的结果。
  3. 使用 thenApplyAsync 将处理后的结果传递给 step2 方法进行下一步处理。
  4. 再次使用 handle 方法处理 step2 的结果,同样区分正常和异常情况。
  5. 最后通过 thenApplyAsync 将结果传递给 step3 方法进行最终处理。

通过这种链式调用的方式,我们可以构建复杂的异步处理流程,并且在每个步骤中都能对异常进行灵活处理。

handle 与其他 CompletableFuture 方法的对比

  1. 与 thenApply 对比thenApply 方法只在异步任务正常完成时执行回调函数,它接受一个 Function,该 Function 只接受异步任务的正常结果作为参数,并返回一个新的结果。而 handle 方法无论是任务正常完成还是出现异常都会执行回调函数,其回调函数 BiFunction 接受任务结果和异常作为参数。例如:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureHandleVsThenApplyExample {

    public static CompletableFuture<Integer> divideAsync(int a, int b) {
        return CompletableFuture.supplyAsync(() -> {
            if (b == 0) {
                throw new IllegalArgumentException("除数不能为零");
            }
            return a / b;
        });
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future = divideAsync(10, 2);

        CompletableFuture<String> thenApplyFuture = future.thenApply(result -> {
            System.out.println("thenApply 处理正常结果: " + result);
            return "运算成功,结果为: " + result;
        });

        CompletableFuture<String> handleFuture = future.handle((result, ex) -> {
            if (ex != null) {
                System.err.println("handle 处理异常: " + ex.getMessage());
                return "运算失败";
            } else {
                System.out.println("handle 处理正常结果: " + result);
                return "运算成功,结果为: " + result;
            }
        });

        System.out.println("thenApply 最终结果: " + thenApplyFuture.get());
        System.out.println("handle 最终结果: " + handleFuture.get());
    }
}

在上述代码中,如果 divideAsync 方法抛出异常,thenApply 不会执行回调函数,而 handle 会执行并处理异常。

  1. 与 whenComplete 对比whenComplete 方法也会在异步任务完成(正常或异常)时执行回调函数,它接受一个 BiConsumer,该 BiConsumer 接受任务结果和异常作为参数,但它不会返回新的 CompletableFuture。而 handle 方法会返回一个新的 CompletableFuture,其中包含回调函数的返回值。例如:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureHandleVsWhenCompleteExample {

    public static CompletableFuture<Integer> divideAsync(int a, int b) {
        return CompletableFuture.supplyAsync(() -> {
            if (b == 0) {
                throw new IllegalArgumentException("除数不能为零");
            }
            return a / b;
        });
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future = divideAsync(10, 2);

        CompletableFuture<Void> whenCompleteFuture = future.whenComplete((result, ex) -> {
            if (ex != null) {
                System.err.println("whenComplete 处理异常: " + ex.getMessage());
            } else {
                System.out.println("whenComplete 处理正常结果: " + result);
            }
        });

        CompletableFuture<String> handleFuture = future.handle((result, ex) -> {
            if (ex != null) {
                System.err.println("handle 处理异常: " + ex.getMessage());
                return "运算失败";
            } else {
                System.out.println("handle 处理正常结果: " + result);
                return "运算成功,结果为: " + result;
            }
        });

        whenCompleteFuture.get();
        System.out.println("handle 最终结果: " + handleFuture.get());
    }
}

在这个例子中,whenComplete 只是在任务完成时执行一些操作,不会改变任务的返回结果,而 handle 可以根据任务结果和异常情况返回一个新的结果包装在新的 CompletableFuture 中。

并发与性能考虑

当使用 handleAsync 方法并指定自定义的 Executor 时,需要谨慎考虑并发策略和性能问题。如果使用的线程池过小,可能会导致任务排队等待执行,从而影响整体的响应时间;而如果线程池过大,可能会导致资源浪费和上下文切换开销增加。

例如,假设我们有一个高并发的场景,需要处理大量的异步任务,并且每个任务的 handle 回调函数都比较耗时:

import java.util.concurrent.*;

public class CompletableFutureHandleAsyncPerformanceExample {

    public static CompletableFuture<Integer> asyncTask() {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟异步任务
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 42;
        });
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(10);

        CompletableFuture[] futures = new CompletableFuture[100];
        for (int i = 0; i < 100; i++) {
            futures[i] = asyncTask().handleAsync((result, ex) -> {
                if (ex != null) {
                    System.err.println("任务发生异常: " + ex.getMessage());
                    return -1;
                } else {
                    // 模拟耗时处理
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("处理结果: " + result);
                    return result * 2;
                }
            }, executor);
        }

        for (CompletableFuture future : futures) {
            future.get();
        }

        executor.shutdown();
    }
}

在这个示例中,我们创建了一个固定大小为 10 的线程池来执行 handleAsync 回调函数。由于每个回调函数都有 1 秒的模拟耗时操作,并且有 100 个任务,线程池的大小会影响任务的执行效率。如果线程池大小设置不合理,可能会导致部分任务长时间等待执行,从而影响整体性能。

为了优化性能,我们可以根据任务的特性和系统资源情况,选择合适的线程池类型(如 CachedThreadPoolScheduledThreadPool 等),并动态调整线程池的大小。

实际应用场景

  1. 微服务调用链中的错误处理与结果转换:在微服务架构中,一个业务流程可能涉及多个微服务的调用。当调用某个微服务出现异常时,我们可以使用 handle 方法在调用链的中间环节对异常进行处理,并返回一个适合后续微服务处理的默认结果。例如,在一个电商系统中,订单服务调用库存服务获取商品库存信息,如果库存服务调用失败,订单服务可以使用 handle 方法返回一个默认的库存值,并继续处理订单流程。

  2. 异步数据处理流水线:在大数据处理领域,经常需要对异步获取的数据进行一系列的处理步骤。CompletableFuturehandle 方法可以在每个处理步骤完成后,对结果进行检查和转换,确保整个流水线的稳定性和可靠性。比如,从分布式文件系统中异步读取数据块,然后对每个数据块进行解析、清洗和聚合操作,在每个步骤中都可以使用 handle 方法处理可能出现的异常,并将结果传递到下一个步骤。

  3. 异步任务的重试机制:结合 handle 方法和循环,可以实现异步任务的重试逻辑。当任务执行失败时,handle 方法捕获异常并决定是否进行重试。例如,在网络请求场景中,如果请求失败,handle 方法可以根据异常类型判断是否是网络临时故障,如果是,则进行重试,直到达到最大重试次数或请求成功。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class CompletableFutureRetryWithHandleExample {

    public static CompletableFuture<String> asyncNetworkRequest() {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟网络请求,这里随机抛出异常
            if (Math.random() < 0.5) {
                throw new RuntimeException("网络请求失败");
            }
            return "请求成功,返回数据";
        });
    }

    public static CompletableFuture<String> retryAsyncRequest(int maxRetries) {
        CompletableFuture<String> future = asyncNetworkRequest();
        for (int i = 0; i < maxRetries; i++) {
            future = future.handle((result, ex) -> {
                if (ex != null) {
                    System.err.println("第 " + (i + 1) + " 次请求失败,重试...");
                    return null;
                } else {
                    return result;
                }
            }).thenApplyAsync(result -> {
                if (result == null) {
                    return asyncNetworkRequest();
                } else {
                    return CompletableFuture.completedFuture(result);
                }
            }).thenCompose(f -> f);
        }
        return future;
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = retryAsyncRequest(3);
        System.out.println("最终结果: " + future.get(5, TimeUnit.SECONDS));
    }
}

在这个示例中,asyncNetworkRequest 方法模拟一个可能失败的网络请求。retryAsyncRequest 方法使用 handle 方法在每次请求失败时进行重试,最多重试 3 次。

通过以上详细的介绍、示例和分析,我们对 CompletableFuturehandle 方法有了更深入的理解。它在异步编程中提供了强大的异常处理和结果转换能力,能够帮助我们构建健壮、高效的异步应用程序。无论是在小型的单线程应用还是大型的分布式系统中,handle 方法都能发挥重要作用。在实际应用中,需要根据具体的业务需求和场景,合理地使用 handle 方法及其相关的异步操作,以达到最佳的性能和用户体验。同时,要注意并发控制和资源管理,避免出现性能瓶颈和资源浪费的问题。