MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java 中 CompletableFuture 多个任务 AnyOf 关系

2023-06-032.2k 阅读

Java 中 CompletableFuture 的 AnyOf 关系详解

CompletableFuture 简介

在 Java 8 引入 CompletableFuture 之前,处理异步任务相对复杂,往往需要依赖诸如 Future 接口以及手动线程管理等方式。Future 接口提供了一种异步计算结果获取的方式,但它存在局限性,例如无法方便地对异步结果进行链式操作、组合多个异步任务等。

CompletableFuture 实现了 Future 接口,并在此基础上提供了丰富的方法来支持异步任务的链式调用、组合、异常处理等操作。它基于 Fork/Join 框架和异步回调机制,使得编写异步代码变得更加简洁和高效。CompletableFuture 可以在任务完成时自动触发后续操作,无需开发者手动轮询 Future 的状态。

AnyOf 关系的概念

CompletableFuture 的 anyOf 方法用于创建一个新的 CompletableFuture,这个新的 CompletableFuture 会在多个给定的 CompletableFuture 中 任意一个 完成时就完成,其结果为第一个完成的 CompletableFuture 的结果。如果任意一个任务因异常而完成,那么 anyOf 返回的 CompletableFuture 也会以该异常完成。

方法签名

public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

该方法接受一个可变参数,参数类型为 CompletableFuture<?>,返回一个新的 CompletableFuture,其泛型类型为 Object,因为它的结果可能是传入的任意一个 CompletableFuture 的结果类型。

简单示例代码

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureAnyOfExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Future 1 completed";
        });

        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Future 2 completed";
        });

        CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(future1, future2);

        System.out.println(anyOfFuture.get());
    }
}

在上述代码中,future1future2 是两个异步任务,分别模拟了耗时 2 秒和 1 秒的操作。CompletableFuture.anyOf(future1, future2) 创建了一个新的 CompletableFuture anyOfFutureanyOfFuture.get() 会阻塞直到 future1future2 任意一个完成。由于 future2 耗时更短,所以通常会输出 "Future 2 completed"

异常处理

当参与 anyOf 的 CompletableFuture 中有一个抛出异常时,anyOf 返回的 CompletableFuture 也会以该异常完成。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureAnyOfExceptionExample {
    public static void main(String[] args) {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            throw new RuntimeException("Future 1 exception");
        });

        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Future 2 completed";
        });

        CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(future1, future2);

        anyOfFuture.exceptionally(ex -> {
            System.out.println("Caught exception: " + ex.getMessage());
            return null;
        }).thenAccept(result -> {
            if (result != null) {
                System.out.println("Result: " + result);
            }
        });
    }
}

在这个例子中,future1 会抛出一个运行时异常。anyOfFuture 会以 future1 的异常完成,通过 exceptionally 方法可以捕获并处理这个异常。

与其他 CompletableFuture 方法组合使用

  1. 与 thenApply 组合 可以在 anyOf 返回的 CompletableFuture 上使用 thenApply 方法对结果进行进一步处理。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureAnyOfThenApplyExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Future 1 completed";
        });

        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Future 2 completed";
        });

        CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(future1, future2);

        CompletableFuture<String> processedFuture = anyOfFuture.thenApply(result -> {
            return "Processed: " + result;
        });

        System.out.println(processedFuture.get());
    }
}

这里,thenApply 方法对 anyOfFuture 的结果进行了处理,在结果前添加了 "Processed: "

  1. 与 thenAccept 组合 thenAccept 方法用于在 CompletableFuture 完成时接受结果并执行副作用操作。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureAnyOfThenAcceptExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Future 1 completed";
        });

        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Future 2 completed";
        });

        CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(future1, future2);

        anyOfFuture.thenAccept(result -> {
            System.out.println("Received result: " + result);
        });

        // 为了确保主线程不会提前退出
        Thread.sleep(3000);
    }
}

在这个示例中,thenAccept 方法在 anyOfFuture 完成时打印出结果。由于主线程可能会提前退出,这里使用 Thread.sleep 确保主线程等待异步任务完成。

AnyOf 关系的原理

从实现层面来看,anyOf 方法会为每个传入的 CompletableFuture 注册一个完成监听器。当任意一个 CompletableFuture 完成时,其完成监听器会被触发。这个监听器会尝试完成 anyOf 返回的 CompletableFuture,并将完成的 CompletableFuture 的结果或异常传递给它。

在 CompletableFuture 的内部实现中,使用了基于状态的机制。每个 CompletableFuture 有一个状态字段,用于表示其当前状态,如未开始、进行中、正常完成、异常完成等。当一个 CompletableFuture 完成时,它会更新自己的状态,并触发所有注册的监听器。anyOf 方法注册的监听器会检查 anyOf 返回的 CompletableFuture 的状态,如果它还未完成,就会根据完成的 CompletableFuture 的结果或异常来完成 anyOf 返回的 CompletableFuture。

在实际应用场景中的使用

  1. 资源获取 假设一个应用程序需要从多个数据源获取数据,但只要获取到其中一个数据源的数据就可以继续后续操作。例如,应用程序可以同时从本地缓存和远程服务器获取用户数据,只要其中一个数据源返回数据,就可以使用该数据进行处理。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureAnyOfResourceExample {
    public static CompletableFuture<String> getFromLocalCache() {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟从本地缓存获取数据
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Data from local cache";
        });
    }

    public static CompletableFuture<String> getFromRemoteServer() {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟从远程服务器获取数据
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Data from remote server";
        });
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(getFromLocalCache(), getFromRemoteServer());

        System.out.println(anyOfFuture.get());
    }
}

在这个例子中,getFromLocalCachegetFromRemoteServer 分别模拟从本地缓存和远程服务器获取数据。anyOf 方法确保只要有一个数据源返回数据,就可以继续后续操作。

  1. 服务调用优化 在微服务架构中,可能存在多个服务提供相似的功能。可以同时调用这些服务,只要其中一个服务成功响应,就可以使用其结果,从而提高系统的响应速度。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureAnyOfMicroserviceExample {
    public static CompletableFuture<String> callService1() {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟服务1调用
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Response from service 1";
        });
    }

    public static CompletableFuture<String> callService2() {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟服务2调用
            try {
                Thread.sleep(1500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Response from service 2";
        });
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(callService1(), callService2());

        System.out.println(anyOfFuture.get());
    }
}

这里,callService1callService2 模拟两个提供相似功能的微服务调用。anyOf 方法使得只要有一个服务响应,就可以获取结果并继续处理。

性能考虑

虽然 anyOf 提供了一种高效的方式来组合多个异步任务,但在使用时也需要考虑性能问题。由于它会同时启动所有传入的 CompletableFuture,可能会消耗较多的系统资源,尤其是当任务数量较多或任务本身比较耗时的情况下。

为了优化性能,可以根据具体场景对任务进行合理的筛选和分组。例如,如果知道某些任务通常会更快完成,可以将它们优先放在 anyOf 的参数列表中。另外,对于资源消耗较大的任务,可以考虑使用线程池来控制并发度,避免系统资源过度消耗。

与 AllOf 关系的对比

  1. 完成条件
    • anyOf:只要其中一个 CompletableFuture 完成,返回的 CompletableFuture 就完成。
    • allOf:所有传入的 CompletableFuture 都完成,返回的 CompletableFuture 才完成。如果其中任何一个 CompletableFuture 抛出异常,allOf 返回的 CompletableFuture 也会以该异常完成。
  2. 返回结果
    • anyOf:返回第一个完成的 CompletableFuture 的结果。
    • allOf:返回的 CompletableFuture 的结果为 null,主要用于等待所有任务完成,通常用于执行一些所有任务完成后才需要的操作,如汇总所有任务的结果等。

总结与注意事项

CompletableFuture 的 anyOf 方法为处理多个异步任务的“或”关系提供了便捷的方式。它允许我们在多个异步任务中只要有一个完成就可以继续后续操作,适用于多种实际应用场景,如资源获取、服务调用优化等。

在使用 anyOf 时,需要注意异常处理,确保能够正确捕获和处理任务中抛出的异常。同时,要考虑性能问题,合理安排任务和控制并发度,避免过度消耗系统资源。与 allOf 等其他 CompletableFuture 方法相比,要清楚它们之间的区别,根据具体需求选择合适的方法来组合异步任务。

通过深入理解和合理使用 CompletableFuture 的 anyOf 关系,开发者可以编写出更加高效、灵活的异步代码,提升应用程序的性能和响应能力。

希望通过本文的详细介绍和示例代码,你对 Java 中 CompletableFuture 的 AnyOf 关系有了更深入的理解和掌握。在实际开发中,根据具体需求灵活运用这一特性,能够显著提升代码的异步处理能力。