MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java 中 CompletableFuture 自定义线程池 supplyAsync 方法

2022-07-032.9k 阅读

Java 中 CompletableFuture 自定义线程池 supplyAsync 方法

在 Java 的并发编程领域,CompletableFuture 是一个强大的工具,它提供了异步编程的能力,使我们能够更轻松地处理异步任务。supplyAsync 方法是 CompletableFuture 中用于异步执行有返回值任务的重要方法之一。而当涉及到自定义线程池时,supplyAsync 方法能与自定义线程池相结合,为我们提供更灵活和高效的异步任务执行方式。

CompletableFuture 基础回顾

CompletableFuture 是 Java 8 引入的类,它实现了 FutureCompletionStage 接口。Future 接口在 Java 5 时就已出现,主要用于获取异步任务的结果,但它有一些局限性,例如获取结果时需要阻塞线程,缺乏对异步任务链的支持等。而 CompletableFuture 弥补了这些不足,它允许我们以链式调用的方式处理异步任务的结果,并且可以在任务完成时自动触发后续操作。

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // 模拟一个耗时任务
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "Hello, CompletableFuture!";
});

future.thenAccept(System.out::println);

在上述代码中,supplyAsync 方法接受一个 Supplier 作为参数,Supplier 是一个函数式接口,它没有参数但会返回一个结果。这里 supplyAsync 方法会异步执行 Supplier 中的任务,并返回一个 CompletableFuture 对象。thenAccept 方法是 CompletableFuture 的一个后续操作,当异步任务完成时,它会接受任务的结果并执行相应的消费操作,这里就是将结果打印出来。

supplyAsync 方法重载形式

CompletableFuture 类提供了两个重载版本的 supplyAsync 方法。

  1. public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) 这个版本的 supplyAsync 方法使用 ForkJoinPool.commonPool() 作为默认的线程池来执行异步任务。ForkJoinPool.commonPool() 是一个公共的 ForkJoinPool 实例,它会使用 Runtime.getRuntime().availableProcessors() - 1 个线程(除非有特殊配置)。如果应用程序中有大量的异步任务,并且任务执行时间较长,可能会导致 commonPool 中的线程被长时间占用,影响其他任务的执行。

  2. public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) 这个版本允许我们传入一个自定义的 Executor 来执行异步任务。Executor 是一个接口,它定义了任务执行的策略。我们可以通过实现这个接口或者使用 Java 提供的一些 Executor 实现类,如 ThreadPoolExecutor 来创建自定义线程池。通过使用自定义线程池,我们可以根据应用程序的需求,灵活地配置线程池的参数,如线程数量、任务队列大小等,从而更好地控制异步任务的执行。

自定义线程池的重要性

在实际应用中,使用默认的 ForkJoinPool.commonPool() 可能无法满足所有场景的需求。以下是一些使用自定义线程池的好处:

  1. 资源控制:自定义线程池可以根据应用程序的负载和硬件资源,精确地控制线程的数量。例如,如果应用程序在一个单核 CPU 的服务器上运行,过多的线程可能会导致上下文切换频繁,降低性能。通过自定义线程池,我们可以将线程数量限制在一个合理的范围内,提高系统的整体性能。

  2. 任务隔离:不同类型的任务可能有不同的优先级和资源需求。通过使用自定义线程池,我们可以将不同类型的任务分配到不同的线程池中执行,实现任务的隔离。这样可以避免高优先级任务被低优先级任务阻塞,提高系统的响应速度。

  3. 优化性能:对于一些特定类型的任务,如 I/O 密集型任务或 CPU 密集型任务,我们可以根据任务的特性来调整线程池的参数,以达到最佳的性能。例如,对于 I/O 密集型任务,我们可以适当增加线程数量,因为线程在等待 I/O 操作完成时可以被其他任务复用。

创建自定义线程池

在 Java 中,我们可以通过 ThreadPoolExecutor 类来创建自定义线程池。ThreadPoolExecutor 提供了丰富的构造函数,允许我们设置线程池的各种参数。

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CustomThreadPoolExample {
    public static void main(String[] args) {
        // 创建一个有界队列
        BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>(10);

        // 创建自定义线程池
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2,      // 核心线程数
                4,      // 最大线程数
                10,     // 线程存活时间
                TimeUnit.SECONDS,
                taskQueue,
                new ThreadPoolExecutor.CallerRunsPolicy()
        );

        // 提交任务到线程池
        for (int i = 0; i < 15; i++) {
            int taskNumber = i;
            executor.submit(() -> {
                System.out.println("Task " + taskNumber + " is running on thread " + Thread.currentThread().getName());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Task " + taskNumber + " has completed.");
            });
        }

        // 关闭线程池
        executor.shutdown();
    }
}

在上述代码中,我们创建了一个 ThreadPoolExecutor 实例。构造函数的参数含义如下:

  • 核心线程数(corePoolSize):线程池中始终保持活动的线程数量。即使这些线程处于空闲状态,它们也不会被销毁,除非设置了 allowCoreThreadTimeOuttrue
  • 最大线程数(maximumPoolSize):线程池允许创建的最大线程数量。当任务队列已满且活动线程数小于最大线程数时,线程池会创建新的线程来执行任务。
  • 线程存活时间(keepAliveTime):当线程池中的线程数量超过核心线程数时,多余的空闲线程在被销毁之前等待新任务的最长时间。
  • 时间单位(unit)keepAliveTime 的时间单位。
  • 任务队列(workQueue):用于存储等待执行的任务的队列。这里我们使用了 LinkedBlockingQueue,它是一个有界队列,容量为 10。
  • 拒绝策略(handler):当任务队列已满且线程池中的线程数量达到最大线程数时,新提交的任务将被拒绝。这里我们使用了 CallerRunsPolicy,它会让提交任务的线程来执行该任务,这样可以避免任务被丢弃,但可能会影响提交任务的线程的性能。

使用自定义线程池与 supplyAsync 方法结合

接下来,我们将自定义线程池与 CompletableFuturesupplyAsync 方法结合使用。

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CompletableFutureWithCustomThreadPool {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 创建一个有界队列
        BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>(10);

        // 创建自定义线程池
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2,      // 核心线程数
                4,      // 最大线程数
                10,     // 线程存活时间
                TimeUnit.SECONDS,
                taskQueue,
                new ThreadPoolExecutor.CallerRunsPolicy()
        );

        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            // 模拟一个耗时任务
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Task completed in custom thread pool.";
        }, executor);

        // 获取异步任务的结果
        String result = future.get();
        System.out.println(result);

        // 关闭线程池
        executor.shutdown();
    }
}

在上述代码中,我们将自定义的 ThreadPoolExecutor 实例作为参数传递给 supplyAsync 方法。这样,supplyAsync 方法中的异步任务将在我们自定义的线程池中执行。通过 future.get() 方法,我们可以获取异步任务的执行结果,这里会阻塞当前线程,直到异步任务完成。

supplyAsync 方法执行流程

当我们调用 CompletableFuture.supplyAsync(Supplier<U> supplier, Executor executor) 方法时,其执行流程如下:

  1. 任务提交supplyAsync 方法将 Supplier 任务提交给指定的 Executor。如果 Executor 是一个线程池,任务会被添加到任务队列中或者直接分配给一个线程执行,具体取决于线程池的状态和当前活动线程数。

  2. 任务执行:线程池中的线程从任务队列中取出任务并执行 Supplier 中的逻辑。在执行过程中,如果任务需要等待某些资源(如 I/O 操作),线程可能会被阻塞,直到资源可用。

  3. 任务完成:当 Supplier 任务执行完毕,会返回一个结果。CompletableFuture 会将这个结果保存起来,并标记任务为完成状态。

  4. 后续操作:如果 CompletableFuture 有后续的 thenApplythenAccept 等操作,这些操作会在任务完成后被触发。CompletableFuture 使用一种基于事件驱动的机制来处理后续操作,避免了传统 Future 需要阻塞线程获取结果的问题。

异常处理

在异步任务执行过程中,可能会出现异常。CompletableFuture 提供了多种方式来处理异常。

  1. exceptionally 方法
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    if (Math.random() < 0.5) {
        throw new RuntimeException("Simulated exception");
    }
    return "Task completed successfully.";
}, executor).exceptionally(ex -> {
    System.out.println("Caught exception: " + ex.getMessage());
    return "Default value";
});

String result = future.get();
System.out.println(result);

在上述代码中,exceptionally 方法接受一个 Function 作为参数,当异步任务抛出异常时,这个 Function 会被执行,它会处理异常并返回一个默认值。

  1. whenComplete 方法
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    if (Math.random() < 0.5) {
        throw new RuntimeException("Simulated exception");
    }
    return "Task completed successfully.";
}, executor).whenComplete((res, ex) -> {
    if (ex != null) {
        System.out.println("Caught exception: " + ex.getMessage());
    } else {
        System.out.println("Result: " + res);
    }
});

whenComplete 方法接受一个 BiConsumer 作为参数,当异步任务完成(无论成功还是失败)时,这个 BiConsumer 会被执行。它的第一个参数是任务的结果(如果任务成功),第二个参数是任务抛出的异常(如果任务失败)。

注意事项

  1. 线程池资源管理:使用自定义线程池时,需要注意合理管理线程池的资源。如果线程池创建过多的线程,可能会导致系统资源耗尽,出现 OutOfMemoryError 等问题。同时,要及时关闭线程池,避免资源泄漏。
  2. 任务依赖和顺序:在使用 CompletableFuture 的链式操作时,要注意任务之间的依赖关系和执行顺序。确保后续操作依赖的任务已经完成,避免出现空指针异常等问题。
  3. 异常处理的完整性:在处理异步任务的异常时,要确保异常处理的完整性。不仅要处理任务执行过程中抛出的异常,还要处理在获取结果或执行后续操作时可能出现的异常。

应用场景

  1. Web 应用中的异步请求处理:在 Web 应用中,处理一些耗时的请求(如数据库查询、文件上传等)时,可以使用 CompletableFuture 结合自定义线程池来异步处理这些请求,提高应用的响应速度。
  2. 大数据处理:在大数据处理场景中,可能需要并行处理大量的数据。通过自定义线程池和 CompletableFuture,可以将数据处理任务分配到多个线程中并行执行,提高处理效率。
  3. 分布式系统中的异步调用:在分布式系统中,不同服务之间的调用可能会有一定的延迟。使用 CompletableFuture 可以异步发起调用,并在调用完成时进行后续处理,避免线程阻塞,提高系统的并发性能。

通过深入理解 CompletableFuturesupplyAsync 方法与自定义线程池的结合使用,我们可以在 Java 并发编程中更加灵活和高效地处理异步任务,提升应用程序的性能和响应能力。无论是小型应用还是大型分布式系统,这种技术都有着广泛的应用场景和重要的价值。在实际应用中,需要根据具体的业务需求和系统资源情况,合理地配置和使用自定义线程池,以达到最佳的效果。同时,要注意处理好异步任务中的异常和资源管理问题,确保系统的稳定性和可靠性。