MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java 中 CompletableFuture 任务异步回调 thenAcceptAsync 方法

2022-04-215.2k 阅读

CompletableFuture 概述

在Java中,CompletableFuture 是Java 8引入的一个强大工具,用于处理异步计算。它实现了 Future 接口和 CompletionStage 接口,这使得它既可以像传统的 Future 那样获取异步操作的结果,又能以更灵活的方式对异步操作的结果进行处理,包括链式调用、组合多个异步操作等。CompletableFuture 提供了丰富的方法来处理异步任务的各种场景,其中 thenAcceptAsync 方法在处理异步任务的回调时非常有用。

thenAcceptAsync 方法基础

thenAcceptAsync 方法是 CompletableFuture 类中的一个方法,它的作用是在 CompletableFuture 完成(无论是正常完成还是异常完成)时,异步执行一个消费者操作。这个消费者操作会接收 CompletableFuture 的结果作为参数,但不会返回任何值。

thenAcceptAsync 方法有两个重载版本:

  1. public CompletableFuture<Void> thenAcceptAsync(Consumer<? super U> action)
  2. public CompletableFuture<Void> thenAcceptAsync(Consumer<? super U> action, Executor executor)

第一个版本使用默认的 ForkJoinPool.commonPool() 作为执行异步任务的线程池。而第二个版本允许我们传入一个自定义的 Executor,这样就可以在我们指定的线程池中执行异步任务。

代码示例 - 简单使用 thenAcceptAsync

下面通过一个简单的示例来展示 thenAcceptAsync 的基本用法:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture.supplyAsync(() -> {
            // 模拟一个耗时操作
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Hello, CompletableFuture!";
        }).thenAcceptAsync(result -> {
            System.out.println("Received result: " + result);
        });

        // 主线程等待一段时间,确保异步任务有机会执行
        Thread.sleep(3000);
    }
}

在这个示例中,我们使用 CompletableFuture.supplyAsync 方法创建了一个异步任务,这个任务会睡眠2秒钟,然后返回一个字符串。接着,我们调用 thenAcceptAsync 方法,当异步任务完成时,它会异步打印出接收到的结果。由于 thenAcceptAsync 默认使用 ForkJoinPool.commonPool(),所以这个打印操作会在一个异步线程中执行。

thenAcceptAsync 与 thenAccept 的区别

thenAccept 方法也用于处理 CompletableFuture 的结果,但它是同步执行的。也就是说,当 CompletableFuture 完成时,thenAccept 中的消费者操作会在当前线程中立即执行,而 thenAcceptAsync 则会在另一个线程中执行。

以下是 thenAccept 的示例代码:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureThenAcceptExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Hello, CompletableFuture!";
        }).thenAccept(result -> {
            System.out.println("Received result: " + result);
        });

        // 主线程等待一段时间,确保异步任务有机会执行
        Thread.sleep(3000);
    }
}

在这个示例中,thenAccept 中的打印操作会在 supplyAsync 任务完成后,在当前线程中立即执行。如果 supplyAsync 任务耗时较长,那么 thenAccept 的执行就会阻塞当前线程。而 thenAcceptAsync 不会阻塞当前线程,因为它在另一个线程中执行。

自定义 Executor 使用 thenAcceptAsync

如前文所述,thenAcceptAsync 方法的第二个重载版本允许我们传入一个自定义的 Executor。这在很多场景下非常有用,比如我们希望使用一个特定大小的线程池来控制并发度。

下面是一个使用自定义线程池的示例:

import java.util.concurrent.*;

public class CompletableFutureCustomExecutorExample {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(3);

        CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Hello, CompletableFuture!";
        }).thenAcceptAsync(result -> {
            System.out.println("Received result: " + result);
        }, executorService);

        // 关闭线程池
        executorService.shutdown();
        try {
            if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
                executorService.shutdownNow();
                if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
                    System.err.println("Pool did not terminate");
                }
            }
        } catch (InterruptedException ie) {
            executorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}

在这个示例中,我们创建了一个固定大小为3的线程池 executorService。然后,我们将这个线程池作为参数传递给 thenAcceptAsync 方法。这样,当 CompletableFuture 完成时,thenAcceptAsync 中的消费者操作会在我们自定义的线程池中执行。最后,我们需要注意在程序结束时正确关闭线程池,以避免资源泄漏。

thenAcceptAsync 在复杂异步流程中的应用

在实际开发中,我们经常会遇到多个异步任务相互依赖、组合的情况。thenAcceptAsync 在这种复杂的异步流程中可以发挥重要作用。

例如,假设我们有两个异步任务,第一个任务获取用户信息,第二个任务根据用户信息获取用户的订单列表。我们可以这样实现:

import java.util.concurrent.*;

public class ComplexAsyncFlowExample {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(3);

        CompletableFuture<String> getUserInfoFuture = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "User1";
        });

        CompletableFuture<Void> getOrderListFuture = getUserInfoFuture.thenAcceptAsync(userInfo -> {
            CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Fetching order list for user: " + userInfo);
                return "Order1, Order2";
            }).thenAcceptAsync(orderList -> {
                System.out.println("Received order list: " + orderList);
            }, executorService);
        }, executorService);

        getOrderListFuture.join();

        // 关闭线程池
        executorService.shutdown();
        try {
            if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
                executorService.shutdownNow();
                if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
                    System.err.println("Pool did not terminate");
                }
            }
        } catch (InterruptedException ie) {
            executorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}

在这个示例中,getUserInfoFuture 异步获取用户信息。然后,通过 thenAcceptAsync 方法,当获取到用户信息后,又发起了一个异步任务 getOrderListFuture 来获取用户的订单列表。这里展示了如何在复杂的异步流程中使用 thenAcceptAsync 来连接不同的异步任务。

异常处理与 thenAcceptAsync

在异步任务执行过程中,可能会出现异常。CompletableFuture 提供了多种方式来处理异常,与 thenAcceptAsync 相关的异常处理也有一些要点。

CompletableFuture 执行过程中抛出异常时,thenAcceptAsync 中的消费者操作不会被执行。我们可以通过 exceptionally 方法来处理异常。

以下是一个示例:

import java.util.concurrent.*;

public class ExceptionHandlingWithThenAcceptAsyncExample {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(3);

        CompletableFuture.supplyAsync(() -> {
            if (Math.random() < 0.5) {
                throw new RuntimeException("Simulated exception");
            }
            return "Success result";
        }).thenAcceptAsync(result -> {
            System.out.println("Received result: " + result);
        }, executorService).exceptionally(ex -> {
            System.err.println("Caught exception: " + ex.getMessage());
            return null;
        });

        // 关闭线程池
        executorService.shutdown();
        try {
            if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
                executorService.shutdownNow();
                if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
                    System.err.println("Pool did not terminate");
                }
            }
        } catch (InterruptedException ie) {
            executorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}

在这个示例中,supplyAsync 任务有50% 的概率抛出一个运行时异常。如果异常发生,thenAcceptAsync 中的消费者操作不会执行,而是会执行 exceptionally 方法中的代码块,打印出异常信息。

thenAcceptAsync 与其他 CompletableFuture 方法的组合使用

CompletableFuture 提供了众多方法,thenAcceptAsync 可以与其他方法组合使用,以实现更强大的异步处理逻辑。

例如,thenApplyAsync 方法会在 CompletableFuture 完成时异步执行一个函数,并返回一个新的 CompletableFuture。我们可以将 thenApplyAsyncthenAcceptAsync 组合使用。

import java.util.concurrent.*;

public class MethodCombinationExample {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(3);

        CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Hello";
        }).thenApplyAsync(result -> {
            return result + ", World!";
        }, executorService).thenAcceptAsync(finalResult -> {
            System.out.println("Final result: " + finalResult);
        }, executorService);

        // 关闭线程池
        executorService.shutdown();
        try {
            if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
                executorService.shutdownNow();
                if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
                    System.err.println("Pool did not terminate");
                }
            }
        } catch (InterruptedException ie) {
            executorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}

在这个示例中,supplyAsync 任务返回一个字符串 Hello。然后,通过 thenApplyAsync 方法,将这个字符串转换为 Hello, World!。最后,thenAcceptAsync 方法打印出最终的结果。这种组合使用可以实现复杂的异步数据处理流程。

thenAcceptAsync 在并发编程中的性能考量

在并发编程中,使用 thenAcceptAsync 时需要考虑性能问题。虽然它提供了异步处理的能力,但如果不合理使用线程池,可能会导致性能瓶颈。

例如,如果使用默认的 ForkJoinPool.commonPool(),当任务量较大时,可能会出现线程饥饿的情况。因为 ForkJoinPool.commonPool() 的线程数量是根据 CPU 核心数动态调整的,在高并发场景下可能无法满足需求。

在这种情况下,我们可以根据实际情况调整线程池的大小。如果任务是 I/O 密集型的,我们可以适当增加线程池的大小,以充分利用系统资源。如果任务是 CPU 密集型的,过多的线程可能会导致上下文切换开销增大,反而降低性能。

另外,在使用自定义线程池时,需要注意线程池的饱和策略。例如,当线程池中的线程都在忙碌,并且任务队列也已满时,新的任务该如何处理。常见的饱和策略有 AbortPolicy(抛出异常)、CallerRunsPolicy(在调用者线程中执行任务)、DiscardPolicy(丢弃任务)和 DiscardOldestPolicy(丢弃队列中最老的任务)。

thenAcceptAsync 在分布式系统中的应用

在分布式系统中,thenAcceptAsync 也有一定的应用场景。例如,在微服务架构中,一个服务可能需要调用多个其他服务来完成一个业务逻辑。

假设我们有一个用户服务,它需要调用订单服务获取用户的订单信息,调用库存服务获取订单商品的库存信息。我们可以使用 CompletableFuturethenAcceptAsync 来实现异步调用,提高系统的响应性能。

import java.util.concurrent.*;

public class DistributedSystemExample {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(3);

        CompletableFuture<String> orderInfoFuture = CompletableFuture.supplyAsync(() -> {
            // 模拟调用订单服务
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Order1, Order2";
        });

        CompletableFuture<String> inventoryInfoFuture = CompletableFuture.supplyAsync(() -> {
            // 模拟调用库存服务
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Inventory for Order1: 10, Inventory for Order2: 5";
        });

        CompletableFuture.allOf(orderInfoFuture, inventoryInfoFuture).thenAcceptAsync(() -> {
            try {
                System.out.println("Order info: " + orderInfoFuture.get());
                System.out.println("Inventory info: " + inventoryInfoFuture.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }, executorService);

        // 关闭线程池
        executorService.shutdown();
        try {
            if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
                executorService.shutdownNow();
                if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
                    System.err.println("Pool did not terminate");
                }
            }
        } catch (InterruptedException ie) {
            executorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}

在这个示例中,我们使用 CompletableFuture.supplyAsync 分别模拟调用订单服务和库存服务。然后,通过 CompletableFuture.allOf 方法等待两个异步任务都完成。最后,使用 thenAcceptAsync 来处理两个服务返回的结果。这样可以在分布式系统中实现异步调用,提高系统的整体性能。

thenAcceptAsync 的最佳实践

  1. 合理选择线程池:根据任务的类型(CPU 密集型或 I/O 密集型)选择合适的线程池。如果使用默认的 ForkJoinPool.commonPool(),要注意其在高并发场景下的局限性。
  2. 异常处理:始终使用 exceptionally 等方法来处理异步任务中的异常,避免异常丢失导致程序出现难以排查的问题。
  3. 链式调用与组合:充分利用 CompletableFuture 的链式调用和方法组合特性,将复杂的异步逻辑分解为多个简单的步骤,提高代码的可读性和可维护性。
  4. 资源管理:在使用自定义线程池时,要正确管理线程池的生命周期,确保在程序结束时关闭线程池,避免资源泄漏。

总结

CompletableFuturethenAcceptAsync 方法是Java异步编程中的一个强大工具,它允许我们在异步任务完成时异步执行消费者操作。通过合理使用 thenAcceptAsync,我们可以实现复杂的异步流程,提高程序的性能和响应性。在使用过程中,我们需要注意线程池的选择、异常处理、方法组合以及资源管理等方面,以确保代码的正确性和高效性。无论是在单机应用还是分布式系统中,thenAcceptAsync 都能在异步处理场景中发挥重要作用。