MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java CompletableFuture thenCompose解耦异步任务的方法

2023-02-133.6k 阅读

Java CompletableFuture thenCompose解耦异步任务的方法

CompletableFuture 基础概念

在Java中,CompletableFuture 是Java 8引入的一个强大的类,用于处理异步计算。它实现了 Future 接口和 CompletionStage 接口,允许我们以一种更灵活、更简洁的方式编写异步代码。Future 接口在Java 5就已经引入,它提供了一种异步执行任务并获取结果的方式。但是,Future 存在一些局限性,比如我们很难对多个 Future 进行链式操作,也难以处理异步操作完成后的结果。而 CompletableFuture 解决了这些问题。

CompletableFuture 可以通过多种方式创建,例如:

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // 模拟一个异步任务,这里可能是耗时的I/O操作、数据库查询等
    return "Hello, CompletableFuture!";
});

在上述代码中,supplyAsync 方法接收一个 Supplier 作为参数,并异步执行 Supplier 中的逻辑,返回一个 CompletableFuture 对象,该对象最终会包含 Supplier 返回的结果。

thenCompose 方法的作用

thenCompose 方法是 CompletableFuture 类中一个非常重要的方法,它用于将两个异步操作链式连接起来。更具体地说,thenCompose 允许我们在一个 CompletableFuture 完成后,使用其结果来触发另一个 CompletableFuture 的执行,并且将这两个异步操作解耦。

假设我们有两个异步任务,任务A和任务B,任务B的执行依赖于任务A的结果。在传统的 Future 中,我们可能需要手动获取任务A的结果,然后再启动任务B。而使用 CompletableFuturethenCompose 方法,我们可以更优雅地实现这种链式异步操作。

thenCompose 方法的签名如下:

public <U> CompletableFuture<U> thenCompose(Function<? super T,? extends CompletionStage<U>> fn)

其中,T 是当前 CompletableFuture 的结果类型,U 是新生成的 CompletableFuture 的结果类型。fn 是一个 Function,它接收当前 CompletableFuture 的结果,并返回一个新的 CompletionStage(通常是另一个 CompletableFuture)。

代码示例1:简单的链式异步操作

下面通过一个简单的示例来演示 thenCompose 的基本用法。假设我们有两个异步任务,第一个任务是从数据库中查询用户信息,第二个任务是根据用户信息获取用户的订单列表。

首先,定义两个模拟的异步方法:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

class UserService {
    // 模拟从数据库查询用户信息
    public static CompletableFuture<String> getUserInfo() {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟耗时操作
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "User: John Doe";
        });
    }

    // 模拟根据用户信息获取订单列表
    public static CompletableFuture<String> getOrderList(String userInfo) {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟耗时操作
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Orders for " + userInfo + ": Order1, Order2";
        });
    }
}

然后,使用 thenCompose 方法将这两个异步任务连接起来:

public class ThenComposeExample1 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> result = UserService.getUserInfo()
               .thenCompose(UserService::getOrderList);
        System.out.println(result.get());
    }
}

在上述代码中,UserService.getUserInfo() 方法返回一个 CompletableFuture<String>,表示获取用户信息的异步操作。thenCompose 方法接收 UserService::getOrderList 作为参数,这是一个方法引用,它将 getUserInfo 操作的结果作为参数传递给 getOrderList 方法,并返回一个新的 CompletableFuture<String>,表示获取订单列表的异步操作。最终,通过 result.get() 获取整个链式异步操作的结果。

thenCompose 与 thenApply 的区别

在使用 CompletableFuture 进行链式操作时,thenApply 也是一个常用的方法,它与 thenCompose 有一些相似之处,但也有重要的区别。

thenApply 方法的签名如下:

public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)

thenApply 接收一个 Function,该 Function 接收当前 CompletableFuture 的结果,并返回一个普通的结果(不是 CompletableFuture)。而 thenCompose 接收的 Function 返回的是一个 CompletableFuture

例如,假设我们有一个 CompletableFuture<Integer>,我们想对其结果进行平方操作,我们可以使用 thenApply

CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 5);
CompletableFuture<Integer> squaredFuture = future1.thenApply(i -> i * i);

在上述代码中,thenApply 接收一个 Function,该 FunctionCompletableFuture<Integer> 的结果(这里是5)进行平方操作,并返回一个新的 CompletableFuture<Integer>,其结果为25。

而如果我们想在结果基础上再启动一个异步操作,比如根据平方后的结果进行数据库查询,就需要使用 thenCompose。例如:

class DatabaseService {
    public static CompletableFuture<String> queryByNumber(int number) {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟数据库查询
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Result for number " + number;
        });
    }
}

CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 5);
CompletableFuture<String> queryResult = future2.thenCompose(i -> DatabaseService.queryByNumber(i * i));

在这个例子中,thenComposeCompletableFuture<Integer> 的结果平方后,作为参数传递给 DatabaseService.queryByNumber 方法,该方法返回一个 CompletableFuture<String>,表示数据库查询的异步操作。

代码示例2:处理复杂业务逻辑

在实际应用中,我们可能会遇到更复杂的业务逻辑,需要多个异步任务链式执行,并且每个任务都可能有不同的返回类型。下面通过一个更复杂的示例来展示 thenCompose 的应用。

假设我们要开发一个电商系统,需要从库存系统查询商品库存,然后根据库存判断是否有足够的商品可以发货,如果有则更新订单状态并通知用户。

首先,定义相关的服务类和方法:

import java.util.concurrent.CompletableFuture;

class InventoryService {
    // 模拟查询商品库存
    public static CompletableFuture<Integer> checkStock(String productId) {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟耗时操作
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 假设返回库存数量
            return 10;
        });
    }
}

class OrderService {
    // 模拟更新订单状态
    public static CompletableFuture<String> updateOrderStatus(String orderId, int stock) {
        if (stock > 0) {
            return CompletableFuture.supplyAsync(() -> {
                // 模拟耗时操作
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "Order " + orderId + " status updated to shipped";
            });
        } else {
            return CompletableFuture.supplyAsync(() -> "Not enough stock for order " + orderId);
        }
    }
}

class NotificationService {
    // 模拟通知用户
    public static CompletableFuture<String> notifyUser(String message) {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟耗时操作
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "User notified: " + message;
        });
    }
}

然后,使用 thenCompose 来实现整个业务流程:

public class ThenComposeExample2 {
    public static void main(String[] args) throws Exception {
        String productId = "P001";
        String orderId = "O001";

        CompletableFuture<String> result = InventoryService.checkStock(productId)
               .thenCompose(stock -> OrderService.updateOrderStatus(orderId, stock))
               .thenCompose(OrderService::notifyUser);

        System.out.println(result.get());
    }
}

在上述代码中,首先通过 InventoryService.checkStock 查询商品库存,然后使用 thenCompose 将库存结果传递给 OrderService.updateOrderStatus 方法来更新订单状态,最后再使用 thenCompose 将更新订单状态的结果传递给 NotificationService.notifyUser 方法来通知用户。整个过程通过 thenCompose 实现了异步任务的链式调用和解耦。

异常处理

在使用 thenCompose 进行异步操作时,异常处理是非常重要的。CompletableFuture 提供了几种处理异常的方法,比如 exceptionallyhandle 等。

exceptionally 方法接收一个 Function,当 CompletableFuture 执行过程中出现异常时,该 Function 会被调用,返回一个默认值或处理异常后的结果。例如:

CompletableFuture<Integer> futureWithException = CompletableFuture.supplyAsync(() -> {
    throw new RuntimeException("Simulated exception");
});

CompletableFuture<Integer> resultWithException = futureWithException.exceptionally(ex -> {
    System.out.println("Caught exception: " + ex.getMessage());
    return -1;
});

try {
    System.out.println(resultWithException.get());
} catch (Exception e) {
    e.printStackTrace();
}

在上述代码中,futureWithException 故意抛出一个运行时异常。exceptionally 方法捕获到这个异常,并返回 -1 作为默认值。

当使用 thenCompose 进行链式异步操作时,异常会沿着链式传递。例如:

CompletableFuture<String> futureChainWithException = CompletableFuture.supplyAsync(() -> {
    throw new RuntimeException("First step exception");
})
       .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " appended"));

CompletableFuture<String> resultChainWithException = futureChainWithException.exceptionally(ex -> {
    System.out.println("Caught exception in chain: " + ex.getMessage());
    return "Default value";
});

try {
    System.out.println(resultChainWithException.get());
} catch (Exception e) {
    e.printStackTrace();
}

在这个例子中,第一个 CompletableFuture 抛出异常,这个异常会传递给 thenCompose 链中的后续操作。exceptionally 方法会捕获到这个异常,并返回默认值。

handle 方法与 exceptionally 类似,但它可以同时处理正常结果和异常情况。handle 方法接收一个 BiFunction,第一个参数是正常结果,第二个参数是异常。例如:

CompletableFuture<Integer> futureHandleExample = CompletableFuture.supplyAsync(() -> {
    throw new RuntimeException("Simulated exception");
});

CompletableFuture<Integer> resultHandleExample = futureHandleExample.handle((result, ex) -> {
    if (ex != null) {
        System.out.println("Caught exception: " + ex.getMessage());
        return -1;
    } else {
        return result * 2;
    }
});

try {
    System.out.println(resultHandleExample.get());
} catch (Exception e) {
    e.printStackTrace();
}

在上述代码中,handle 方法根据是否有异常来返回不同的结果。如果有异常,返回 -1;如果没有异常,返回结果的两倍。

并行执行与 thenCompose

有时候,我们可能希望在链式异步操作中,某些部分能够并行执行,以提高整体的执行效率。虽然 thenCompose 本身主要用于链式连接异步操作,但结合 CompletableFuture 的其他方法,可以实现并行与链式操作的结合。

例如,假设我们有两个独立的异步任务A和任务B,它们可以并行执行,然后任务C依赖于任务A和任务B的结果。我们可以这样实现:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

class ParallelService {
    // 模拟异步任务A
    public static CompletableFuture<String> taskA() {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟耗时操作
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Result of task A";
        });
    }

    // 模拟异步任务B
    public static CompletableFuture<String> taskB() {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟耗时操作
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Result of task B";
        });
    }

    // 模拟依赖于任务A和任务B结果的任务C
    public static CompletableFuture<String> taskC(String resultA, String resultB) {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟耗时操作
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return resultA + " and " + resultB + " combined";
        });
    }
}

public class ParallelThenComposeExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> resultA = ParallelService.taskA();
        CompletableFuture<String> resultB = ParallelService.taskB();

        CompletableFuture<String> combinedResult = CompletableFuture.allOf(resultA, resultB)
               .thenApply(v -> {
                    try {
                        return taskC(resultA.get(), resultB.get()).get();
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                });

        System.out.println(combinedResult.get());
    }
}

在上述代码中,taskAtaskB 并行执行。CompletableFuture.allOf(resultA, resultB) 等待 resultAresultB 都完成。然后,thenApply 方法获取 taskAtaskB 的结果,并调用 taskC 方法,taskC 方法返回一个新的 CompletableFuture,最终获取整个操作的结果。

总结 thenCompose 的优势

  1. 解耦异步任务thenCompose 使得我们可以将复杂的异步业务逻辑拆分成多个独立的异步任务,每个任务专注于自己的功能,提高代码的可维护性和可读性。例如在电商系统的例子中,库存查询、订单状态更新和用户通知这几个任务被清晰地分开,通过 thenCompose 连接起来。
  2. 链式调用:它提供了一种优雅的链式调用方式,避免了传统 Future 中手动获取结果再启动下一个任务的繁琐操作。使得异步代码看起来更像是同步代码,降低了编写和理解异步代码的难度。
  3. 异常处理方便:结合 CompletableFuture 的异常处理方法,如 exceptionallyhandle,可以方便地在链式异步操作中处理异常,确保整个异步流程的健壮性。

通过深入理解和灵活运用 CompletableFuturethenCompose 方法,我们可以更高效地编写异步代码,提升应用程序的性能和响应性,尤其是在处理复杂的异步业务逻辑时。无论是简单的链式操作还是涉及并行执行和异常处理的复杂场景,thenCompose 都为我们提供了强大的工具。