MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java 中 CompletableFuture 多个任务 AllOf 关系

2021-03-152.8k 阅读

Java 中 CompletableFuture 多个任务 AllOf 关系

在 Java 的异步编程领域,CompletableFuture 提供了强大且灵活的工具来处理异步任务。其中,allOf 方法是用于处理多个异步任务之间关系的重要手段,它表示所有任务都完成时才进行后续操作,这种关系在很多实际应用场景中非常关键。

CompletableFuture 基础回顾

在深入探讨 allOf 之前,先简要回顾一下 CompletableFuture 的基本概念。CompletableFuture 是 Java 8 引入的类,用于支持异步计算。它实现了 Future 接口和 CompletionStage 接口,使得异步任务的处理更加便捷。

CompletableFuture 可以通过多种方式创建异步任务,例如 supplyAsync 用于返回一个有返回值的异步任务,runAsync 用于执行没有返回值的异步任务。

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
    // 模拟一些耗时操作
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "Result of future1";
});

CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
    // 模拟一些耗时操作
    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("future2 completed");
});

allOf 方法概述

CompletableFutureallOf 方法用于创建一个新的 CompletableFuture,该 CompletableFuture 在作为参数传入的所有 CompletableFuture 都完成时才会完成。其方法签名如下:

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)

这里的 cfs 是一个可变参数,即可以传入多个 CompletableFuture 对象。返回的 CompletableFuture<Void> 没有实际的返回值,因为它只关注所有任务是否都完成。

allOf 的应用场景

  1. 批处理任务:在需要同时执行多个独立的任务,并在所有任务都完成后进行汇总操作时,allOf 非常有用。例如,在一个电商系统中,可能需要同时查询商品的库存、价格和评论信息,只有当这三个查询任务都完成后,才能完整地展示商品详情给用户。
  2. 数据校验:假设需要对用户输入的多个数据字段进行异步校验,只有当所有字段的校验都通过后,才能进行下一步操作,如保存用户信息到数据库。

代码示例 - 简单批处理任务

下面通过一个简单的示例来展示 allOf 的使用。假设有三个任务,分别模拟获取商品库存、价格和评论信息。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureAllOfExample {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<String> inventoryFuture = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Inventory: 100 units";
        });

        CompletableFuture<String> priceFuture = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Price: $10.99";
        });

        CompletableFuture<String> reviewFuture = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Reviews: 4.5 out of 5 stars";
        });

        CompletableFuture<Void> allFutures = CompletableFuture.allOf(inventoryFuture, priceFuture, reviewFuture);

        allFutures.join();

        try {
            System.out.println(inventoryFuture.get());
            System.out.println(priceFuture.get());
            System.out.println(reviewFuture.get());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}

在上述代码中:

  1. 首先创建了三个 CompletableFuture 任务,分别模拟获取商品库存、价格和评论信息,每个任务都有不同的模拟耗时。
  2. 然后使用 CompletableFuture.allOf 方法将这三个任务组合起来,返回一个新的 CompletableFuture<Void>
  3. 通过调用 allFutures.join() 方法等待所有任务完成。
  4. 最后分别获取每个任务的结果并打印。

allOf 的异常处理

当使用 allOf 时,如果其中任何一个 CompletableFuture 任务抛出异常,allOf 返回的 CompletableFuture 也会以异常状态完成。可以通过 exceptionally 方法来处理异常。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureAllOfExceptionExample {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Result of future1";
        });

        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            throw new RuntimeException("Simulated exception in future2");
        });

        CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Result of future3";
        });

        CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2, future3);

        allFutures.exceptionally(ex -> {
            System.out.println("An exception occurred: " + ex.getMessage());
            return null;
        }).join();

        try {
            System.out.println(future1.get());
            System.out.println(future2.get());
            System.out.println(future3.get());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}

在这个示例中,future2 故意抛出一个运行时异常。通过 allFutures.exceptionally 方法捕获异常并进行处理,打印出异常信息。

allOf 与并行执行

虽然 allOf 本身并不直接控制任务的并行性,但它与 CompletableFuture 的异步执行机制相结合,可以实现并行任务的管理。当使用 supplyAsyncrunAsync 创建 CompletableFuture 任务时,这些任务会在默认的 ForkJoinPool.commonPool() 线程池中并行执行(除非指定了其他线程池)。

例如,在前面的商品信息查询示例中,获取库存、价格和评论信息的任务会并行执行,而 allOf 则负责等待所有任务都完成后再进行后续操作。

allOf 的性能考量

在使用 allOf 时,需要考虑性能方面的因素。如果有大量的任务通过 allOf 组合,可能会对系统资源造成一定压力。一方面,每个 CompletableFuture 任务都会占用线程池中的一个线程,过多的任务可能导致线程池饱和。另一方面,等待所有任务完成可能会增加整体的响应时间。

为了优化性能,可以考虑以下几点:

  1. 合理设置线程池:根据系统的负载和任务特性,设置合适大小的线程池,避免线程过多或过少。
  2. 任务拆分与合并:对于复杂的任务,可以进一步拆分成更小的子任务,以提高并行度,但也要注意任务拆分带来的额外开销。
  3. 异步结果缓存:如果某些任务的结果是重复使用的,可以考虑缓存这些结果,避免重复计算。

allOf 与其他 CompletionStage 方法的结合使用

CompletableFuture 实现了 CompletionStage 接口,提供了丰富的方法来处理异步任务的结果和流程控制。allOf 可以与其他 CompletionStage 方法结合使用,以实现更复杂的异步逻辑。

例如,可以在 allOf 完成后,使用 thenApply 方法对所有任务的结果进行汇总处理。

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureAllOfCombinationExample {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Result of future1";
        });

        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Result of future2";
        });

        CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Result of future3";
        });

        CompletableFuture<List<String>> combinedFuture = CompletableFuture.allOf(future1, future2, future3)
               .thenApply(v -> {
                    List<String> results = new ArrayList<>();
                    try {
                        results.add(future1.get());
                        results.add(future2.get());
                        results.add(future3.get());
                    } catch (InterruptedException | ExecutionException e) {
                        e.printStackTrace();
                    }
                    return results;
                });

        List<String> results = combinedFuture.get();
        System.out.println(results);
    }
}

在这个示例中,allOf 等待所有任务完成后,通过 thenApply 方法将所有任务的结果收集到一个 List 中并返回。

allOf 在实际项目中的应用案例

  1. 微服务架构中的数据聚合:在微服务架构中,一个业务请求可能需要调用多个不同的微服务来获取相关数据。例如,一个用户资料展示页面可能需要从用户服务获取基本信息,从订单服务获取订单历史,从积分服务获取用户积分。可以使用 CompletableFutureallOf 方法并行调用这些微服务,并在所有调用都完成后进行数据聚合和展示。
  2. 分布式系统中的批量操作:在分布式系统中,可能需要对多个节点执行相同的操作,如批量更新数据、批量查询等。通过 allOf 可以并行发起这些操作,并等待所有操作完成后进行统一的结果处理。

总结 allOf 的要点

  1. allOf 用于创建一个新的 CompletableFuture,该 CompletableFuture 在所有传入的 CompletableFuture 任务都完成时才会完成。
  2. 它适用于批处理任务、数据校验等场景,能有效管理多个异步任务之间的关系。
  3. 注意异常处理,当任何一个任务抛出异常时,allOf 返回的 CompletableFuture 也会以异常状态完成。
  4. 结合 CompletableFuture 的其他方法,可以实现更复杂的异步逻辑。
  5. 在性能方面,要合理设置线程池、优化任务拆分与合并以及考虑异步结果缓存等。

通过深入理解和掌握 CompletableFutureallOf 方法,开发者可以更加高效地编写异步代码,提升系统的性能和响应能力,特别是在处理多个异步任务之间的关系时,能够更加优雅地实现复杂的业务逻辑。无论是在小型应用还是大型分布式系统中,allOf 都为异步编程提供了强大的支持。

希望通过本文的介绍和示例,能帮助读者更好地理解和应用 CompletableFutureallOf 方法在多个任务关系处理中的作用。在实际开发中,根据具体的业务需求和系统架构,灵活运用这一特性,将为程序带来更高的并发性能和更好的用户体验。