MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java 中 CompletableFuture 多个任务 OR 组合关系

2023-03-026.6k 阅读

CompletableFuture 简介

在 Java 8 引入 CompletableFuture 之前,处理异步任务相对繁琐。CompletableFuture 提供了一种更简洁、灵活的方式来处理异步计算,它实现了 FutureCompletionStage 接口。Future 接口主要用于获取异步任务的结果,但它缺乏对异步任务完成时的回调处理等功能。而 CompletionStage 接口则弥补了这一不足,提供了诸如任务完成时的回调、任务组合等丰富的操作。

CompletableFuture 允许我们以链式调用的方式编写异步代码,使得异步操作的逻辑更加清晰。例如,我们可以很方便地在一个异步任务完成后接着执行另一个任务,而无需手动管理线程和复杂的同步机制。

多个任务的 OR 组合关系概述

在实际开发中,我们经常会遇到这样的场景:多个异步任务,只要其中一个任务成功完成,整个组合任务就视为成功;只有当所有任务都失败时,组合任务才被认为失败。这种关系就是多个任务的 OR 组合关系。

CompletableFuture 中,有几种方式来实现这种 OR 组合关系,接下来我们将详细探讨。

使用 applyToEither 方法

applyToEither 方法用于当两个 CompletableFuture 中任意一个完成时,对其结果应用给定的函数。其方法签名如下:

<U> CompletableFuture<U> applyToEither(CompletableFuture<? extends T> other, Function<? super T,? extends U> fn);

这里 other 是另一个 CompletableFuturefn 是用于处理完成的 CompletableFuture 结果的函数。

下面通过一个简单的代码示例来展示 applyToEither 的用法:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class ApplyToEitherExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Result from Future 1";
        });

        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Result from Future 2";
        });

        CompletableFuture<String> combinedFuture = future1.applyToEither(future2, result -> {
            System.out.println("The first completed future result is: " + result);
            return result.toUpperCase();
        });

        System.out.println(combinedFuture.get());
    }
}

在上述代码中,future1future2 是两个异步任务,分别模拟了耗时 2 秒和 1 秒的操作。applyToEither 方法会等待 future1future2 其中一个先完成,然后对完成的结果应用 Function 函数。在这个例子中,future2 会先完成,所以输出结果为:

The first completed future result is: Result from Future 2
RESULT FROM FUTURE 2

使用 acceptEither 方法

acceptEither 方法与 applyToEither 类似,不同之处在于它不返回处理后的结果,而是直接消费完成的 CompletableFuture 的结果。其方法签名如下:

CompletableFuture<Void> acceptEither(CompletableFuture<? extends T> other, Consumer<? super T> action);

这里 action 是一个 Consumer,用于消费完成的 CompletableFuture 的结果。

以下是代码示例:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class AcceptEitherExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Result from Future 1";
        });

        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Result from Future 2";
        });

        CompletableFuture<Void> combinedFuture = future1.acceptEither(future2, result -> {
            System.out.println("The first completed future result is: " + result);
        });

        combinedFuture.get();
    }
}

在这个示例中,acceptEither 方法等待 future1future2 其中一个完成,然后将完成的结果传递给 Consumer。由于 future2 先完成,输出结果为:

The first completed future result is: Result from Future 2

使用 runAfterEither 方法

runAfterEither 方法在两个 CompletableFuture 中任意一个完成时,执行一个无参数的 Runnable 任务。其方法签名如下:

CompletableFuture<Void> runAfterEither(CompletableFuture<?> other, Runnable action);

下面是代码示例:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class RunAfterEitherExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Result from Future 1";
        });

        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Result from Future 2";
        });

        CompletableFuture<Void> combinedFuture = future1.runAfterEither(future2, () -> {
            System.out.println("One of the futures has completed.");
        });

        combinedFuture.get();
    }
}

在这个例子中,当 future1future2 任意一个完成时,就会执行 Runnable 任务,输出结果为:

One of the futures has completed.

处理多个 CompletableFuture 的 OR 关系

前面介绍的方法主要是针对两个 CompletableFuture 的情况。当我们需要处理多个 CompletableFuture 的 OR 关系时,可以通过递归或借助 Stream 来实现。

递归方式

通过递归的方式,我们可以将多个 CompletableFuture 逐步组合成 OR 关系。以下是一个简单的实现示例:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class MultipleFuturesOrRecursiveExample {
    public static CompletableFuture<String> orCombine(List<CompletableFuture<String>> futures) {
        if (futures.size() == 1) {
            return futures.get(0);
        }
        CompletableFuture<String> future1 = futures.get(0);
        CompletableFuture<String> future2 = orCombine(futures.subList(1, futures.size()));
        return future1.applyToEither(future2, result -> result);
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        List<CompletableFuture<String>> futures = new ArrayList<>();
        futures.add(CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Result from Future 1";
        }));
        futures.add(CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Result from Future 2";
        }));
        futures.add(CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Result from Future 3";
        }));

        CompletableFuture<String> combinedFuture = orCombine(futures);
        System.out.println(combinedFuture.get());
    }
}

在上述代码中,orCombine 方法通过递归将多个 CompletableFuture 组合成 OR 关系。在 main 方法中,创建了三个异步任务,分别模拟不同的耗时操作。最终输出的是最先完成的任务的结果,在这个例子中,future3 会最先完成,输出结果为:

Result from Future 3

使用 Stream 方式

借助 Java 8 的 Stream API,我们可以更简洁地处理多个 CompletableFuture 的 OR 关系。以下是代码示例:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class MultipleFuturesOrStreamExample {
    public static CompletableFuture<String> orCombine(List<CompletableFuture<String>> futures) {
        return futures.stream()
               .reduce(CompletableFuture::applyToEither)
               .orElse(CompletableFuture.completedFuture(null));
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        List<CompletableFuture<String>> futures = new ArrayList<>();
        futures.add(CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Result from Future 1";
        }));
        futures.add(CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Result from Future 2";
        }));
        futures.add(CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Result from Future 3";
        }));

        CompletableFuture<String> combinedFuture = orCombine(futures);
        System.out.println(combinedFuture.get());
    }
}

在这个示例中,Streamreduce 方法将多个 CompletableFuture 通过 applyToEither 方法进行组合。最终输出的也是最先完成的任务的结果,同样,future3 最先完成,输出结果为:

Result from Future 3

异常处理

在处理多个任务的 OR 组合关系时,异常处理是非常重要的。当所有任务都失败时,我们需要捕获并处理异常。

CompletableFuture 中,我们可以使用 exceptionally 方法来处理异常。例如:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class ExceptionHandlingExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            throw new RuntimeException("Future 1 failed");
        });

        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            throw new RuntimeException("Future 2 failed");
        });

        CompletableFuture<String> combinedFuture = future1.applyToEither(future2, result -> result)
               .exceptionally(ex -> {
                    System.out.println("An exception occurred: " + ex.getMessage());
                    return "Default result";
                });

        System.out.println(combinedFuture.get());
    }
}

在上述代码中,future1future2 都抛出了异常。通过 exceptionally 方法,我们捕获了异常并返回了一个默认结果,输出结果为:

An exception occurred: Future 1 failed
Default result

实际应用场景

  1. 数据获取与缓存:在从多个数据源获取数据时,如果其中一个数据源成功获取到数据,就可以直接使用该数据,而不需要等待其他数据源。例如,先尝试从缓存中获取数据,如果缓存中没有,则从数据库中获取。只要缓存或数据库中有一个成功返回数据,就可以继续后续操作。
  2. 服务调用:当调用多个微服务获取数据时,只要其中一个微服务成功返回所需数据,就可以满足业务需求。例如,在一个电商应用中,查询商品信息时,可以同时调用库存服务和价格服务,只要其中一个服务成功返回信息,就可以展示给用户相关内容。
  3. 并行计算优化:在进行一些复杂的计算任务时,可以将任务分解为多个子任务并行执行,只要有一个子任务成功计算出满足条件的结果,就可以停止其他子任务,提高计算效率。

性能考虑

在使用 CompletableFuture 进行多个任务的 OR 组合时,性能是一个需要考虑的因素。虽然异步操作可以提高整体的并发性能,但过多的任务并行执行可能会导致资源消耗过大,例如线程池资源耗尽等问题。

  1. 线程池管理:合理配置线程池的大小,根据系统的硬件资源和任务的特性来调整线程池参数。如果任务是 I/O 密集型的,可以适当增加线程池的大小;如果是 CPU 密集型的,则需要谨慎调整,避免过多线程导致 CPU 上下文切换开销增大。
  2. 任务优先级:可以根据任务的重要性或预期执行时间为任务设置优先级。例如,对于一些耗时较短且重要的任务,可以优先执行,以提高整体的响应速度。
  3. 资源监控与调优:使用工具如 Java 自带的 jconsoleVisualVM 等监控系统的资源使用情况,包括 CPU、内存、线程池状态等,根据监控结果进行相应的调优。

总结与注意事项

通过 CompletableFutureapplyToEitheracceptEitherrunAfterEither 等方法,我们可以方便地实现多个任务的 OR 组合关系。在处理多个任务时,可以通过递归或 Stream 方式进行组合。同时,异常处理和性能优化也是不容忽视的方面。

在实际应用中,需要根据具体的业务场景和系统资源情况,合理选择实现方式,以达到最佳的性能和稳定性。并且要注意线程安全问题,避免在异步操作中出现数据竞争等问题。

希望通过本文的介绍,你对 JavaCompletableFuture 多个任务的 OR 组合关系有了更深入的理解和掌握,能够在实际项目中灵活运用这些知识,提升代码的质量和性能。

以上就是关于 JavaCompletableFuture 多个任务 OR 组合关系的详细内容,希望对你有所帮助。如果你还有其他疑问或需要进一步了解的内容,请随时提问。