MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java 中 CompletableFuture 异步任务执行效率提升

2022-12-123.0k 阅读

Java 中 CompletableFuture 基础概述

在现代 Java 开发中,异步编程变得越来越重要,尤其是在处理高并发、I/O 密集型任务时,能够显著提升系统的性能和响应能力。CompletableFuture 是 Java 8 引入的一个强大的类,用于支持异步计算,它实现了 Future 接口和 CompletionStage 接口。

Future 接口是 Java 早期用于异步计算的工具,它允许我们启动一个异步任务,并在稍后获取任务的结果。但是,Future 存在一些局限性,例如我们无法在任务完成时得到通知,只能通过轮询或者阻塞的方式获取结果,这在一定程度上限制了它在复杂异步场景中的应用。

CompletableFuture 则弥补了这些不足。它不仅提供了更灵活的异步任务处理方式,还支持链式调用、组合多个异步任务等功能。例如,我们可以通过 CompletableFuture.supplyAsync 方法创建一个异步任务:

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // 模拟一个耗时任务
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "任务执行完成";
});
future.thenAccept(System.out::println);

在上述代码中,supplyAsync 方法接受一个 Supplier 作为参数,这个 Supplier 中的代码会在一个新的线程中异步执行。thenAccept 方法则是在任务完成后执行一个消费操作,这里是将任务的结果打印出来。

CompletableFuture 的异步任务执行机制

CompletableFuture 的异步任务执行依赖于 ForkJoinPoolForkJoinPool 是 Java 7 引入的一种线程池,它特别适用于分治算法的任务执行。CompletableFuture 默认使用 ForkJoinPool.commonPool() 来执行异步任务。

ForkJoinPool 内部维护了多个工作线程,这些线程会从任务队列中获取任务并执行。当一个 CompletableFuture 任务被提交时,它会被添加到 ForkJoinPool 的任务队列中。工作线程会不断地从队列中取出任务并执行。

例如,我们可以通过以下方式获取 ForkJoinPool.commonPool() 的一些信息:

ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
System.out.println("并行度: " + forkJoinPool.getParallelism());
System.out.println("活动线程数: " + forkJoinPool.getActiveThreadCount());

getParallelism() 方法返回 ForkJoinPool 的并行度,即同时可以执行的最大任务数。getActiveThreadCount() 方法返回当前活动的线程数。

影响 CompletableFuture 异步任务执行效率的因素

  1. 任务本身的复杂度:如果任务本身是 CPU 密集型的,例如进行大量的数值计算,那么任务的执行时间主要取决于 CPU 的性能和任务的计算量。对于这种情况,单纯增加线程数可能并不能显著提升效率,因为 CPU 的核心数是有限的,过多的线程反而会增加线程切换的开销。
CompletableFuture<Long> cpuIntensiveTask = CompletableFuture.supplyAsync(() -> {
    long sum = 0;
    for (long i = 0; i < 1000000000L; i++) {
        sum += i;
    }
    return sum;
});
  1. 线程池的配置:如前面提到的,CompletableFuture 默认使用 ForkJoinPool.commonPool(),它的并行度是根据 CPU 核心数来设置的。如果我们的应用场景有特殊的需求,例如处理大量 I/O 密集型任务,可能需要调整线程池的参数。我们可以创建自己的 ForkJoinPool 并设置合适的并行度。
ForkJoinPool customPool = new ForkJoinPool(10);
CompletableFuture<String> futureWithCustomPool = CompletableFuture.supplyAsync(() -> {
    // 模拟任务
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "任务执行完成";
}, customPool);
  1. 任务之间的依赖关系:当多个 CompletableFuture 任务之间存在依赖关系时,例如一个任务的结果作为另一个任务的输入,任务的执行顺序和等待时间会影响整体效率。如果不合理地设置依赖关系,可能会导致不必要的等待和线程阻塞。
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 10);
CompletableFuture<Integer> future2 = future1.thenApplyAsync(i -> i * 2);
CompletableFuture<Integer> future3 = future2.thenApplyAsync(i -> i + 5);

在上述代码中,future2 依赖于 future1 的结果,future3 依赖于 future2 的结果。如果 future1 执行时间较长,future2future3 就需要等待。

提升 CompletableFuture 异步任务执行效率的策略

  1. 优化任务本身:对于 CPU 密集型任务,可以考虑使用并行算法,例如 Java 8 中的并行流。并行流会利用多个线程来处理数据,从而提高计算效率。
long parallelSum = LongStream.range(0, 1000000000L)
      .parallel()
      .sum();

对于 I/O 密集型任务,可以采用异步 I/O 操作。例如,在进行文件读取时,可以使用 AsynchronousSocketChannel 或者 AsynchronousFileChannel 来进行异步操作,避免线程阻塞。 2. 合理配置线程池:根据任务的类型(CPU 密集型或 I/O 密集型)来调整线程池的并行度。对于 I/O 密集型任务,可以适当增加并行度,因为在 I/O 操作等待时,线程可以去执行其他任务。而对于 CPU 密集型任务,并行度应该接近 CPU 的核心数。 我们还可以设置线程池的其他参数,例如 ForkJoinPoolasyncModeasyncMode 设置为 true 时,ForkJoinPool 会采用异步模式,更适合处理大量短时间的异步任务。

ForkJoinPool asyncPool = new ForkJoinPool(10,
      ForkJoinPool.defaultForkJoinWorkerThreadFactory,
      null, true);
  1. 优化任务依赖关系:尽量减少不必要的任务依赖,将可以并行执行的任务并行化。例如,如果有多个任务需要独立计算一些数据,然后再进行汇总,可以先并行执行这些独立任务,最后再进行汇总操作。
CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(() -> 10);
CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> 20);
CompletableFuture<Integer> combinedFuture = CompletableFuture.allOf(task1, task2)
      .thenApply(v -> task1.join() + task2.join());

在上述代码中,task1task2 可以并行执行,最后通过 allOf 方法等待两个任务都完成后,再进行结果的汇总。

利用 CompletableFuture 的组合操作提升效率

CompletableFuture 提供了丰富的组合操作方法,如 thenApplythenComposethenCombine 等,合理利用这些方法可以优化异步任务的执行流程,从而提升效率。

  1. thenApplythenApply 方法用于在任务完成后对结果进行转换。它接收一个 Function 作为参数,该 Function 会在任务结果可用时被调用。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<Integer> lengthFuture = future.thenApply(String::length);
lengthFuture.thenAccept(System.out::println);

在上述代码中,supplyAsync 方法创建的异步任务返回一个字符串 "Hello",thenApply 方法将这个字符串转换为它的长度,最后通过 thenAccept 打印出长度。

  1. thenComposethenCompose 方法用于将一个 CompletableFuture 的结果作为另一个 CompletableFuture 的输入,并返回一个新的 CompletableFuture。与 thenApply 不同的是,thenApply 返回的 Function 直接返回一个值,而 thenCompose 返回的 Function 返回一个 CompletableFuture
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "World");
CompletableFuture<String> future2 = future1.thenCompose(s -> CompletableFuture.supplyAsync(() -> "Hello, " + s));
future2.thenAccept(System.out::println);

在上述代码中,future1 的结果 "World" 作为 future2 异步任务的输入,future2 会在新的异步任务中拼接字符串并返回。

  1. thenCombinethenCombine 方法用于将两个 CompletableFuture 的结果合并。它接收另一个 CompletableFuture 和一个 BiFunction 作为参数,当两个 CompletableFuture 都完成时,BiFunction 会被调用,将两个任务的结果合并。
CompletableFuture<Integer> futureA = CompletableFuture.supplyAsync(() -> 10);
CompletableFuture<Integer> futureB = CompletableFuture.supplyAsync(() -> 20);
CompletableFuture<Integer> combinedFuture = futureA.thenCombine(futureB, (a, b) -> a + b);
combinedFuture.thenAccept(System.out::println);

在上述代码中,futureAfutureB 并行执行,当它们都完成后,thenCombine 方法将两个任务的结果相加并返回新的 CompletableFuture

异常处理对 CompletableFuture 执行效率的影响及优化

在异步任务执行过程中,异常处理是必不可少的。CompletableFuture 提供了多种异常处理方法,如 exceptionallyhandle 等,合理的异常处理策略也能对执行效率产生影响。

  1. exceptionallyexceptionally 方法用于在任务发生异常时提供一个替代结果。它接收一个 Function 作为参数,当任务抛出异常时,这个 Function 会被调用,传入异常对象,返回一个替代结果。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    if (Math.random() < 0.5) {
        throw new RuntimeException("模拟异常");
    }
    return "任务成功";
}).exceptionally(ex -> {
    System.out.println("捕获到异常: " + ex.getMessage());
    return "默认结果";
});
future.thenAccept(System.out::println);

在上述代码中,如果异步任务抛出异常,exceptionally 方法中的 Function 会捕获异常并返回 "默认结果"。

  1. handlehandle 方法既可以处理正常结果,也可以处理异常。它接收一个 BiFunction 作为参数,第一个参数是任务的结果(如果任务成功),第二个参数是异常对象(如果任务失败)。
CompletableFuture<String> futureHandle = CompletableFuture.supplyAsync(() -> {
    if (Math.random() < 0.5) {
        throw new RuntimeException("模拟异常");
    }
    return "任务成功";
}).handle((result, ex) -> {
    if (ex != null) {
        System.out.println("捕获到异常: " + ex.getMessage());
        return "默认结果";
    }
    return result;
});
futureHandle.thenAccept(System.out::println);

在异常处理时,要避免过于复杂的异常处理逻辑,因为这可能会增加额外的开销。尽量采用简洁的异常处理方式,快速返回合适的结果或者进行必要的清理操作,以减少对异步任务执行效率的影响。

实际应用场景中的 CompletableFuture 效率优化案例

假设我们有一个电商系统,需要在用户下单后同时执行多个异步任务,如库存扣减、订单记录写入数据库、发送订单确认邮件等。

  1. 未优化的实现
public class OrderService {
    public void placeOrder(Order order) {
        CompletableFuture.runAsync(() -> {
            // 库存扣减
            inventoryService.decreaseInventory(order.getProductId(), order.getQuantity());
        });
        CompletableFuture.runAsync(() -> {
            // 订单记录写入数据库
            orderRepository.save(order);
        });
        CompletableFuture.runAsync(() -> {
            // 发送订单确认邮件
            emailService.sendOrderConfirmationEmail(order.getCustomerEmail(), order);
        });
    }
}

在这个实现中,虽然三个任务是异步执行的,但它们之间没有进行有效的协调和优化。例如,库存扣减失败时,没有进行统一的异常处理,可能会导致订单状态不一致等问题。

  1. 优化后的实现
public class OptimizedOrderService {
    public CompletableFuture<Void> placeOrder(Order order) {
        CompletableFuture<Void> inventoryFuture = CompletableFuture.runAsync(() -> {
            inventoryService.decreaseInventory(order.getProductId(), order.getQuantity());
        });
        CompletableFuture<Void> saveOrderFuture = CompletableFuture.runAsync(() -> {
            orderRepository.save(order);
        });
        CompletableFuture<Void> emailFuture = CompletableFuture.runAsync(() -> {
            emailService.sendOrderConfirmationEmail(order.getCustomerEmail(), order);
        });

        return CompletableFuture.allOf(inventoryFuture, saveOrderFuture, emailFuture)
              .exceptionally(ex -> {
                    // 统一的异常处理
                    System.out.println("下单过程中出现异常: " + ex.getMessage());
                    // 进行必要的回滚操作
                    inventoryService.rollbackInventory(order.getProductId(), order.getQuantity());
                    orderRepository.delete(order);
                    return null;
                });
    }
}

在优化后的实现中,使用 allOf 方法等待所有任务完成,并通过 exceptionally 方法进行统一的异常处理。如果任何一个任务失败,都会进行库存回滚和订单删除等操作,保证了系统的一致性。同时,通过合理的异常处理和任务协调,提升了整个下单流程的可靠性和执行效率。

CompletableFuture 与其他异步编程模型的比较及优势

  1. 与 Future 的比较:如前文所述,Future 只能通过轮询或阻塞的方式获取任务结果,而 CompletableFuture 支持异步回调、链式调用等功能。CompletableFuture 提供了更丰富的方法来处理异步任务的完成、失败和结果转换,使得异步编程更加灵活和高效。
// Future 的使用示例
ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<String> futureOld = executorService.submit(() -> {
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "任务完成";
});
try {
    String result = futureOld.get();
    System.out.println(result);
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}
executorService.shutdown();

// CompletableFuture 的使用示例
CompletableFuture<String> futureNew = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "任务完成";
});
futureNew.thenAccept(System.out::println);

从上述代码可以看出,CompletableFuture 不需要手动阻塞获取结果,而是通过回调的方式处理结果,代码更加简洁和高效。

  1. 与 RxJava 的比较:RxJava 是一个基于观察者模式的异步编程框架,它提供了强大的数据流处理能力。CompletableFuture 则是 Java 标准库中的异步编程工具。CompletableFuture 更轻量级,与 Java 生态系统的集成度更高,对于简单的异步任务处理更加方便。而 RxJava 适用于处理复杂的异步数据流,例如事件驱动的 UI 编程、实时数据处理等场景。 例如,在处理多个异步任务的合并时,CompletableFuture 可以通过 allOf 等方法简单实现,而 RxJava 则需要使用 Observable.zip 等操作符,语法相对复杂一些。但在处理数据流的变换、过滤等方面,RxJava 提供了更丰富的操作符。

总结 CompletableFuture 异步任务执行效率提升要点

  1. 任务优化:针对 CPU 密集型任务采用并行算法,I/O 密集型任务采用异步 I/O 操作。
  2. 线程池配置:根据任务类型合理调整线程池的并行度等参数,必要时创建自定义线程池。
  3. 任务依赖管理:减少不必要的任务依赖,并行化可并行的任务,合理利用组合操作优化任务流程。
  4. 异常处理:采用简洁有效的异常处理方式,避免复杂逻辑增加开销。
  5. 与其他模型比较:根据实际场景选择合适的异步编程模型,充分发挥 CompletableFuture 轻量级、与 Java 集成度高的优势。

通过以上对 CompletableFuture 异步任务执行效率提升的深入探讨和实践,我们可以在 Java 开发中更高效地利用异步编程,提升系统的性能和响应能力,满足现代应用对高并发和快速响应的需求。在实际项目中,需要根据具体的业务场景和需求,灵活运用这些优化策略,以达到最佳的执行效率。同时,不断关注 Java 异步编程的新特性和发展趋势,持续优化和改进代码,以适应不断变化的业务需求和技术环境。