MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java 中 CompletableFuture 异步任务回调参数处理

2021-01-094.8k 阅读

CompletableFuture 基础概述

在 Java 编程领域,异步编程已经成为提升程序性能和响应性的重要手段。随着多核处理器的普及,充分利用多核资源进行并行处理变得尤为关键。CompletableFuture 正是 Java 8 引入的一个强大工具,用于支持异步编程,它提供了一种更灵活、更高效的方式来处理异步任务的结果,包括处理回调参数。

CompletableFuture 实现了 FutureCompletionStage 接口。Future 接口在 Java 5 就已引入,它允许我们异步执行任务并获取任务的结果。然而,Future 存在一些局限性,例如它缺乏对异步任务完成后的链式操作支持,并且获取结果时可能会导致阻塞。而 CompletionStage 接口弥补了这些不足,CompletableFuture 结合两者的优势,使得异步编程更加流畅和强大。

CompletableFuture 可以通过多种方式创建。最基本的方式是使用 CompletableFuture.supplyAsync 方法来创建一个异步任务,该任务返回一个结果。例如:

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // 模拟耗时操作
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "任务完成";
});

在上述代码中,supplyAsync 方法接收一个 Supplier 作为参数,这个 Supplier 会在一个新的线程中执行,模拟了一个耗时操作,最后返回一个字符串结果。

CompletableFuture 回调方法分类

基于任务完成结果的回调

  1. thenApply 方法thenApply 方法用于在异步任务完成后,对任务的结果进行转换。它接收一个 Function 作为参数,这个 Function 的输入是异步任务的结果,输出是转换后的结果。例如:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "原始结果");
CompletableFuture<String> transformedFuture = future.thenApply(result -> result + " 已转换");
transformedFuture.thenAccept(System.out::println);

在这段代码中,thenApply 方法将异步任务返回的 “原始结果” 字符串,转换为 “原始结果 已转换”。thenAccept 方法则用于消费最终的结果,这里是将结果打印到控制台。

  1. thenApplyAsync 方法:与 thenApply 类似,thenApplyAsync 方法也是用于对任务结果进行转换,但它会在一个新的线程中执行 Function。这在某些需要更细粒度控制线程执行环境的场景下非常有用。例如:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "原始结果");
CompletableFuture<String> transformedFuture = future.thenApplyAsync(result -> {
    // 在新线程中执行转换操作
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return result + " 异步转换";
});
transformedFuture.thenAccept(System.out::println);

这里,thenApplyAsync 中的转换操作会在一个新线程中执行,并且额外模拟了一秒的耗时操作。

消费任务完成结果的回调

  1. thenAccept 方法thenAccept 方法用于在异步任务完成后,消费任务的结果,但不返回新的结果。它接收一个 Consumer 作为参数,这个 Consumer 会处理异步任务的结果。例如:
CompletableFuture.supplyAsync(() -> "任务结果").thenAccept(result -> System.out.println("消费结果: " + result));

在上述代码中,thenAccept 方法中的 Consumer 简单地将异步任务的结果打印到控制台。

  1. thenAcceptAsync 方法:和 thenApplyAsync 类似,thenAcceptAsync 方法会在一个新的线程中执行 Consumer 来消费异步任务的结果。例如:
CompletableFuture.supplyAsync(() -> "任务结果").thenAcceptAsync(result -> {
    // 在新线程中消费结果
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("异步消费结果: " + result);
});
try {
    Thread.sleep(2000);
} catch (InterruptedException e) {
    e.printStackTrace();
}

这里,thenAcceptAsync 中的 Consumer 会在新线程中执行,并且模拟了一秒的耗时操作。为了确保主线程不会提前结束,主线程也进行了两秒的睡眠。

处理任务完成或异常的回调

  1. whenComplete 方法whenComplete 方法用于在异步任务完成(无论是正常完成还是异常完成)时执行一个 BiConsumer。这个 BiConsumer 的第一个参数是任务的结果(如果任务正常完成),第二个参数是任务抛出的异常(如果任务异常完成)。例如:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    if (Math.random() > 0.5) {
        throw new RuntimeException("任务异常");
    }
    return "任务正常完成";
});
future.whenComplete((result, exception) -> {
    if (exception != null) {
        System.out.println("任务异常: " + exception.getMessage());
    } else {
        System.out.println("任务结果: " + result);
    }
});

在上述代码中,通过 Math.random() 模拟了任务可能会异常完成的情况。whenComplete 方法中的 BiConsumer 根据任务完成的状态进行相应的处理。

  1. whenCompleteAsync 方法whenCompleteAsync 方法与 whenComplete 类似,但它会在一个新的线程中执行 BiConsumer。例如:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    if (Math.random() > 0.5) {
        throw new RuntimeException("任务异常");
    }
    return "任务正常完成";
});
future.whenCompleteAsync((result, exception) -> {
    if (exception != null) {
        System.out.println("异步任务异常: " + exception.getMessage());
    } else {
        System.out.println("异步任务结果: " + result);
    }
});
try {
    Thread.sleep(2000);
} catch (InterruptedException e) {
    e.printStackTrace();
}

这里,whenCompleteAsync 中的 BiConsumer 在新线程中执行,同样为了确保主线程不提前结束,主线程进行了两秒的睡眠。

  1. exceptionally 方法exceptionally 方法专门用于处理异步任务中的异常情况。它接收一个 Function 作为参数,这个 Function 的输入是任务抛出的异常,输出是一个替代结果。例如:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    if (Math.random() > 0.5) {
        throw new RuntimeException("任务异常");
    }
    return "任务正常完成";
});
CompletableFuture<String> handledFuture = future.exceptionally(exception -> {
    System.out.println("处理异常: " + exception.getMessage());
    return "默认结果";
});
handledFuture.thenAccept(System.out::println);

在上述代码中,如果异步任务抛出异常,exceptionally 方法中的 Function 会处理异常并返回一个默认结果,这个默认结果会被后续的 thenAccept 消费。

复杂场景下的回调参数处理

多个 CompletableFuture 组合的回调处理

  1. thenCombine 方法thenCombine 方法用于将两个 CompletableFuture 的结果合并。它接收另一个 CompletableFuture 和一个 BiFunction 作为参数,BiFunction 的两个输入分别是两个 CompletableFuture 的结果,输出是合并后的结果。例如:
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "结果1");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "结果2");
CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (result1, result2) -> result1 + " 和 " + result2);
combinedFuture.thenAccept(System.out::println);

在这段代码中,thenCombine 方法将 future1future2 的结果合并为 “结果1 和 结果2”,并通过 thenAccept 打印出来。

  1. thenAcceptBoth 方法thenAcceptBoth 方法与 thenCombine 类似,但它只消费两个 CompletableFuture 的结果,不返回新的结果。它接收另一个 CompletableFuture 和一个 BiConsumer 作为参数。例如:
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "结果1");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "结果2");
future1.thenAcceptBoth(future2, (result1, result2) -> System.out.println("组合结果: " + result1 + " 和 " + result2));

这里,thenAcceptBoth 方法中的 BiConsumer 直接消费了两个 CompletableFuture 的结果并打印。

  1. applyToEither 方法applyToEither 方法用于在两个 CompletableFuture 中任意一个完成时,对其结果进行转换。它接收另一个 CompletableFuture 和一个 Function 作为参数,Function 的输入是先完成的 CompletableFuture 的结果,输出是转换后的结果。例如:
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "结果1";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "结果2";
});
CompletableFuture<String> transformedFuture = future1.applyToEither(future2, result -> result + " 已转换");
transformedFuture.thenAccept(System.out::println);

在上述代码中,future2 会先完成,applyToEither 方法会对 future2 的结果进行转换并打印。

  1. acceptEither 方法acceptEither 方法与 applyToEither 类似,但它只消费先完成的 CompletableFuture 的结果,不返回新的结果。它接收另一个 CompletableFuture 和一个 Consumer 作为参数。例如:
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "结果1";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "结果2";
});
future1.acceptEither(future2, result -> System.out.println("消费结果: " + result));

这里,acceptEither 方法中的 Consumer 消费了先完成的 future2 的结果并打印。

异步任务流水线中的回调参数传递

在实际应用中,我们常常需要构建异步任务的流水线,即前一个异步任务的结果作为后一个异步任务的输入。CompletableFuture 提供了很好的支持来处理这种场景下的回调参数传递。

例如,假设有一个用户注册的场景,首先需要异步验证用户名是否可用,然后根据验证结果异步创建用户。代码如下:

CompletableFuture<Boolean> validateFuture = CompletableFuture.supplyAsync(() -> {
    // 模拟用户名验证
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return true;
});
CompletableFuture<String> createUserFuture = validateFuture.thenApplyAsync(isValid -> {
    if (isValid) {
        // 模拟创建用户
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "用户创建成功";
    } else {
        return "用户名不可用,创建失败";
    }
});
createUserFuture.thenAccept(System.out::println);

在上述代码中,validateFuture 异步验证用户名,其结果通过 thenApplyAsync 传递给创建用户的异步任务 createUserFuture,最终创建用户的结果通过 thenAccept 打印出来。

深入理解回调参数处理的原理

CompletionStage 接口的作用

CompletableFuture 实现了 CompletionStage 接口,这个接口定义了一系列方法来支持异步任务的链式操作和回调处理。CompletionStage 接口的核心思想是将异步任务的完成看作一个阶段,每个阶段可以依赖前一个阶段的结果进行后续操作。

例如,thenApply 方法在 CompletionStage 接口中的定义如下:

<U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);

这里,T 是当前阶段的结果类型,U 是转换后的结果类型。Function 用于将当前阶段的结果 T 转换为新的结果 UCompletableFuture 在实现这个方法时,会将 Function 封装成一个 Completion 对象,并将其加入到任务完成的回调队列中。当异步任务完成时,会触发这个 Completion 对象的执行,从而实现结果的转换。

线程模型与回调执行

CompletableFuture 的回调执行涉及到线程模型。在默认情况下,CompletableFuture 使用 ForkJoinPool.commonPool() 来执行异步任务和回调。这个线程池是一个共享的线程池,适用于大多数场景。

例如,thenApplyAsync 方法会将 Function 提交到 ForkJoinPool.commonPool() 中执行。如果需要使用自定义的线程池,可以通过 CompletableFuture.supplyAsync(Supplier<T> supplier, Executor executor) 方法来指定线程池。

在处理回调时,CompletableFuture 会根据任务的完成状态(正常完成或异常完成)来决定执行哪个回调方法。如果任务正常完成,会执行 thenApplythenAccept 等基于正常结果的回调方法;如果任务异常完成,会执行 whenComplete(带有异常参数)、exceptionally 等处理异常的回调方法。

回调参数的内存管理与生命周期

CompletableFuture 中,回调参数(如 FunctionConsumerBiConsumer 等)的内存管理和生命周期与异步任务的执行密切相关。当一个 CompletableFuture 被创建并启动异步任务时,回调参数会被封装并存储在相关的数据结构中。

例如,thenApply 方法中的 Function 会被存储在 CompletableFuture 的内部队列中。当异步任务完成时,会从队列中取出 Function 并执行。在执行完成后,Function 对象如果没有其他引用,会符合垃圾回收的条件,从而被垃圾回收器回收。

对于复杂的异步任务组合,如 thenCombine 等方法,多个回调参数会被依次处理。在这个过程中,每个回调参数的生命周期也是遵循上述原则,即在任务完成并处理完后,等待垃圾回收。

优化与最佳实践

合理使用线程池

如前所述,CompletableFuture 默认使用 ForkJoinPool.commonPool()。在一些高并发场景下,这个共享线程池可能会出现资源竞争等问题。因此,根据具体业务场景,合理创建和使用自定义线程池是一个重要的优化点。

例如,对于 I/O 密集型任务,可以创建一个较大线程数的线程池:

ExecutorService ioExecutor = Executors.newFixedThreadPool(10);
CompletableFuture.supplyAsync(() -> {
    // I/O 操作
    return "I/O 任务结果";
}, ioExecutor);

而对于 CPU 密集型任务,线程数不宜过多,以免过度竞争 CPU 资源:

ExecutorService cpuExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
CompletableFuture.supplyAsync(() -> {
    // CPU 计算操作
    return "CPU 任务结果";
}, cpuExecutor);

避免不必要的回调嵌套

在处理复杂异步任务时,要避免出现回调地狱(Callback Hell),即过多的回调嵌套导致代码可读性和维护性变差。可以通过合理使用 CompletableFuture 的链式操作来扁平化回调结构。

例如,以下是一个不好的回调嵌套示例:

CompletableFuture.supplyAsync(() -> "结果1").thenApply(result1 -> {
    // 处理结果1
    return result1 + " 处理后";
}).thenApplyAsync(result2 -> {
    // 进一步处理结果2
    return result2 + " 再次处理";
}).thenAccept(result3 -> {
    // 最终消费结果
    System.out.println(result3);
});

虽然上述代码没有过度嵌套,但如果有更多的处理步骤,嵌套会逐渐加深。可以通过方法提取来优化:

CompletableFuture.supplyAsync(() -> "结果1")
       .thenApply(CompletableFutureCallbacks::processResult1)
       .thenApplyAsync(CompletableFutureCallbacks::processResult2)
       .thenAccept(CompletableFutureCallbacks::consumeFinalResult);
class CompletableFutureCallbacks {
    static String processResult1(String result) {
        return result + " 处理后";
    }
    static String processResult2(String result) {
        return result + " 再次处理";
    }
    static void consumeFinalResult(String result) {
        System.out.println(result);
    }
}

这样,代码的逻辑更加清晰,也便于维护。

异常处理的最佳实践

在异步任务中,良好的异常处理是保证程序健壮性的关键。对于 CompletableFuture,要确保在每个可能出现异常的地方进行适当的处理。

例如,在 exceptionally 方法中,不仅要返回默认结果,还可以记录异常信息,以便后续排查问题:

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    if (Math.random() > 0.5) {
        throw new RuntimeException("任务异常");
    }
    return "任务正常完成";
});
CompletableFuture<String> handledFuture = future.exceptionally(exception -> {
    System.err.println("捕获异常: " + exception.getMessage());
    return "默认结果";
});
handledFuture.thenAccept(System.out::println);

同时,结合 whenComplete 方法,可以更全面地处理任务完成和异常情况,确保程序的稳定性。

总结 CompletableFuture 回调参数处理的要点

通过以上对 CompletableFuture 异步任务回调参数处理的详细介绍,我们了解到它在异步编程中的强大功能和灵活性。从基础的回调方法,到复杂场景下的组合处理,再到深入的原理分析和优化实践,CompletableFuture 为 Java 开发者提供了丰富的工具来构建高效、健壮的异步应用程序。

在实际开发中,要根据具体业务需求合理选择回调方法,注意线程池的使用和异常处理,避免常见的代码陷阱,从而充分发挥 CompletableFuture 的优势,提升程序的性能和响应性。掌握 CompletableFuture 的回调参数处理技巧,对于编写高质量的 Java 异步代码至关重要。