MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java 中 CompletableFuture 任务组合 thenCompose 方法

2021-02-111.7k 阅读

1. CompletableFuture 概述

在 Java 中,CompletableFuture 是 Java 8 引入的一个强大工具,用于异步编程。它使得处理异步任务变得更加容易和灵活。CompletableFuture 实现了 Future 接口,并且扩展了其功能,提供了很多用于处理异步操作结果的方法。通过 CompletableFuture,我们可以更方便地进行异步任务的创建、组合、处理结果以及异常处理等操作。

2. thenCompose 方法的作用

thenCompose 方法是 CompletableFuture 中用于任务组合的重要方法之一。它的主要作用是将两个异步操作进行组合,使得第一个异步操作完成后,其结果作为参数传递给第二个异步操作,并且返回一个新的 CompletableFuture,这个新的 CompletableFuture 代表了整个组合操作的结果。

thenCompose 方法的签名如下:

public <U> CompletableFuture<U> thenCompose(Function<? super T,? extends CompletionStage<U>> fn)

其中,T 是当前 CompletableFuture 的结果类型,U 是组合后新的 CompletableFuture 的结果类型。fn 是一个函数,它接收当前 CompletableFuture 的结果,并返回一个新的 CompletionStage(通常是一个 CompletableFuture)。

3. 简单示例

让我们通过一个简单的例子来理解 thenCompose 的基本用法。假设我们有两个异步任务:第一个任务模拟从数据库中查询用户信息,第二个任务根据用户信息查询用户的订单信息。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

class User {
    private String name;

    public User(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }
}

class Order {
    private String orderInfo;

    public Order(String orderInfo) {
        this.orderInfo = orderInfo;
    }

    public String getOrderInfo() {
        return orderInfo;
    }
}

public class ComposeExample {
    public static CompletableFuture<User> getUserFromDB() {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟数据库查询
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return new User("John");
        });
    }

    public static CompletableFuture<Order> getOrderByUser(User user) {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟根据用户查询订单
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return new Order("Order of " + user.getName());
        });
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Order> result = getUserFromDB()
              .thenCompose(ComposeExample::getOrderByUser);
        System.out.println(result.get().getOrderInfo());
    }
}

在上述代码中,getUserFromDB 方法返回一个 CompletableFuture<User>,模拟从数据库获取用户信息。getOrderByUser 方法接收一个 User 对象,返回一个 CompletableFuture<Order>,模拟根据用户获取订单信息。通过 thenCompose 方法,我们将这两个异步操作组合起来,先获取用户,然后根据用户获取订单,最终得到包含订单信息的 CompletableFuture<Order>

4. 与 thenApply 的区别

在理解 thenCompose 时,很容易与 thenApply 方法混淆。thenApply 方法也用于处理 CompletableFuture 的结果,但它返回的是一个直接包含处理后结果的 CompletableFuture,而不是像 thenCompose 那样返回一个由另一个 CompletableFuture 构成的新 CompletableFuture

thenApply 方法的签名如下:

public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)

它接收一个函数 fn,该函数接收当前 CompletableFuture 的结果 T,并返回一个新的结果 U,返回的 CompletableFuture<U> 直接包含这个新结果。

例如,假设我们只想获取用户的名字并转换为大写:

public class ApplyExample {
    public static CompletableFuture<User> getUserFromDB() {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟数据库查询
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return new User("John");
        });
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> result = getUserFromDB()
              .thenApply(user -> user.getName().toUpperCase());
        System.out.println(result.get());
    }
}

在这个例子中,thenApply 直接处理 User 对象,返回一个包含大写用户名的 CompletableFuture<String>

5. 链式调用与复杂任务组合

thenCompose 方法非常适合链式调用,用于构建复杂的异步任务链。例如,假设我们有一系列任务:从数据库获取用户,根据用户获取订单,根据订单获取订单详情,最后处理订单详情。

class OrderDetail {
    private String detailInfo;

    public OrderDetail(String detailInfo) {
        this.detailInfo = detailInfo;
    }

    public String getDetailInfo() {
        return detailInfo;
    }
}

public class ChainComposeExample {
    public static CompletableFuture<User> getUserFromDB() {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟数据库查询
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return new User("John");
        });
    }

    public static CompletableFuture<Order> getOrderByUser(User user) {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟根据用户查询订单
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return new Order("Order of " + user.getName());
        });
    }

    public static CompletableFuture<OrderDetail> getOrderDetailByOrder(Order order) {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟根据订单查询订单详情
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return new OrderDetail("Detail of " + order.getOrderInfo());
        });
    }

    public static String processOrderDetail(OrderDetail orderDetail) {
        return "Processed: " + orderDetail.getDetailInfo();
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> result = getUserFromDB()
              .thenCompose(ChainComposeExample::getOrderByUser)
              .thenCompose(ChainComposeExample::getOrderDetailByOrder)
              .thenApply(ChainComposeExample::processOrderDetail);
        System.out.println(result.get());
    }
}

在这个例子中,我们通过多次调用 thenComposethenApply,构建了一个复杂的异步任务链。每个任务依赖前一个任务的结果,最终得到处理后的订单详情。

6. 异常处理

在使用 thenCompose 进行任务组合时,异常处理非常重要。如果任何一个异步任务抛出异常,整个任务链都会受到影响。CompletableFuture 提供了多种异常处理方法,例如 exceptionally

让我们修改前面的例子,在 getUserFromDB 方法中添加可能抛出异常的情况:

public class ExceptionComposeExample {
    public static CompletableFuture<User> getUserFromDB() {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟数据库查询
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            if (Math.random() > 0.5) {
                throw new RuntimeException("Database error");
            }
            return new User("John");
        });
    }

    public static CompletableFuture<Order> getOrderByUser(User user) {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟根据用户查询订单
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return new Order("Order of " + user.getName());
        });
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Order> result = getUserFromDB()
              .thenCompose(ExceptionComposeExample::getOrderByUser)
              .exceptionally(ex -> {
                    System.out.println("Caught exception: " + ex.getMessage());
                    return null;
                });
        System.out.println(result.get());
    }
}

在上述代码中,如果 getUserFromDB 方法抛出异常,exceptionally 方法会捕获异常并进行处理,返回一个默认值(这里是 null),避免整个任务链因异常而终止。

7. 与其他 CompletableFuture 方法的协同使用

thenCompose 方法可以与其他 CompletableFuture 方法协同使用,以实现更复杂的异步操作。例如,whenComplete 方法可以在 CompletableFuture 完成(无论是正常完成还是异常完成)时执行一个动作。

public class CoordinationExample {
    public static CompletableFuture<User> getUserFromDB() {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟数据库查询
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return new User("John");
        });
    }

    public static CompletableFuture<Order> getOrderByUser(User user) {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟根据用户查询订单
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return new Order("Order of " + user.getName());
        });
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Order> result = getUserFromDB()
              .thenCompose(CoordinationExample::getOrderByUser)
              .whenComplete((order, ex) -> {
                    if (ex == null) {
                        System.out.println("Order retrieved: " + order.getOrderInfo());
                    } else {
                        System.out.println("Exception occurred: " + ex.getMessage());
                    }
                });
        System.out.println(result.get());
    }
}

在这个例子中,whenComplete 方法在 thenCompose 组合的任务完成后执行,根据是否有异常打印相应的信息。

8. 实际应用场景

  • 微服务调用链:在微服务架构中,一个业务操作可能涉及多个微服务的调用,并且后一个微服务的调用依赖前一个微服务的结果。例如,一个电商系统中,先调用用户服务获取用户信息,再根据用户信息调用订单服务获取订单,最后根据订单调用物流服务获取物流信息。通过 thenCompose 可以方便地组合这些异步微服务调用。
  • 数据处理流水线:在数据处理场景中,数据可能需要经过多个步骤的处理,每个步骤都可能是异步的。例如,从文件系统读取数据,然后对数据进行清洗,再进行分析,最后生成报告。每个步骤的结果作为下一个步骤的输入,thenCompose 可以很好地管理这个数据处理流水线。

9. 性能考虑

虽然 thenCompose 提供了强大的异步任务组合能力,但在使用时也需要考虑性能问题。过多的异步任务组合和链式调用可能会导致线程上下文切换频繁,增加系统开销。此外,如果异步任务本身执行时间较长,需要合理调整线程池大小等参数,以提高系统的整体性能。

例如,在前面的例子中,如果每个异步任务的执行时间都很长,我们可以考虑使用自定义的线程池来执行这些任务,而不是使用默认的 ForkJoinPool.commonPool()

import java.util.concurrent.*;

public class PerformanceExample {
    private static final ExecutorService executor = Executors.newFixedThreadPool(10);

    public static CompletableFuture<User> getUserFromDB() {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟数据库查询
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return new User("John");
        }, executor);
    }

    public static CompletableFuture<Order> getOrderByUser(User user) {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟根据用户查询订单
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return new Order("Order of " + user.getName());
        }, executor);
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Order> result = getUserFromDB()
              .thenCompose(PerformanceExample::getOrderByUser);
        System.out.println(result.get());
        executor.shutdown();
    }
}

在这个例子中,我们创建了一个固定大小为 10 的线程池 executor,并在 CompletableFuture.supplyAsync 方法中指定使用这个线程池,以更好地控制异步任务的执行和性能。

10. 总结 thenCompose 的特点和优势

  • 任务依赖处理thenCompose 能够很好地处理异步任务之间的依赖关系,确保任务按照顺序依次执行,前一个任务的结果作为后一个任务的输入。
  • 代码简洁:通过链式调用 thenCompose,可以将复杂的异步任务组合写得更加简洁明了,提高代码的可读性和可维护性。
  • 异常处理方便:结合 exceptionally 等异常处理方法,可以方便地处理异步任务链中可能出现的异常,避免因一个任务失败而导致整个流程崩溃。
  • 灵活性高:与其他 CompletableFuture 方法协同使用,能够满足各种复杂的异步编程需求,适用于多种实际应用场景。

通过深入理解和合理使用 thenCompose 方法,Java 开发者可以更高效地进行异步编程,充分发挥多核处理器的性能优势,提升应用程序的响应速度和吞吐量。在实际开发中,需要根据具体的业务需求和性能要求,灵活运用 thenCompose 及其相关方法,构建出健壮、高效的异步应用程序。

希望通过以上内容,读者对 JavaCompletableFuturethenCompose 方法有了更全面、深入的理解和掌握,能够在实际项目中熟练运用该方法解决异步任务组合的问题。