Java 中 CompletableFuture 自定义线程池 supplyAsync 方法
Java 中 CompletableFuture 自定义线程池 supplyAsync 方法
在 Java 的并发编程领域,CompletableFuture
是一个强大的工具,它提供了异步编程的能力,使我们能够更轻松地处理异步任务。supplyAsync
方法是 CompletableFuture
中用于异步执行有返回值任务的重要方法之一。而当涉及到自定义线程池时,supplyAsync
方法能与自定义线程池相结合,为我们提供更灵活和高效的异步任务执行方式。
CompletableFuture
基础回顾
CompletableFuture
是 Java 8 引入的类,它实现了 Future
和 CompletionStage
接口。Future
接口在 Java 5 时就已出现,主要用于获取异步任务的结果,但它有一些局限性,例如获取结果时需要阻塞线程,缺乏对异步任务链的支持等。而 CompletableFuture
弥补了这些不足,它允许我们以链式调用的方式处理异步任务的结果,并且可以在任务完成时自动触发后续操作。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 模拟一个耗时任务
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Hello, CompletableFuture!";
});
future.thenAccept(System.out::println);
在上述代码中,supplyAsync
方法接受一个 Supplier
作为参数,Supplier
是一个函数式接口,它没有参数但会返回一个结果。这里 supplyAsync
方法会异步执行 Supplier
中的任务,并返回一个 CompletableFuture
对象。thenAccept
方法是 CompletableFuture
的一个后续操作,当异步任务完成时,它会接受任务的结果并执行相应的消费操作,这里就是将结果打印出来。
supplyAsync
方法重载形式
CompletableFuture
类提供了两个重载版本的 supplyAsync
方法。
-
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
这个版本的supplyAsync
方法使用ForkJoinPool.commonPool()
作为默认的线程池来执行异步任务。ForkJoinPool.commonPool()
是一个公共的ForkJoinPool
实例,它会使用Runtime.getRuntime().availableProcessors()
- 1 个线程(除非有特殊配置)。如果应用程序中有大量的异步任务,并且任务执行时间较长,可能会导致commonPool
中的线程被长时间占用,影响其他任务的执行。 -
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
这个版本允许我们传入一个自定义的Executor
来执行异步任务。Executor
是一个接口,它定义了任务执行的策略。我们可以通过实现这个接口或者使用 Java 提供的一些Executor
实现类,如ThreadPoolExecutor
来创建自定义线程池。通过使用自定义线程池,我们可以根据应用程序的需求,灵活地配置线程池的参数,如线程数量、任务队列大小等,从而更好地控制异步任务的执行。
自定义线程池的重要性
在实际应用中,使用默认的 ForkJoinPool.commonPool()
可能无法满足所有场景的需求。以下是一些使用自定义线程池的好处:
-
资源控制:自定义线程池可以根据应用程序的负载和硬件资源,精确地控制线程的数量。例如,如果应用程序在一个单核 CPU 的服务器上运行,过多的线程可能会导致上下文切换频繁,降低性能。通过自定义线程池,我们可以将线程数量限制在一个合理的范围内,提高系统的整体性能。
-
任务隔离:不同类型的任务可能有不同的优先级和资源需求。通过使用自定义线程池,我们可以将不同类型的任务分配到不同的线程池中执行,实现任务的隔离。这样可以避免高优先级任务被低优先级任务阻塞,提高系统的响应速度。
-
优化性能:对于一些特定类型的任务,如 I/O 密集型任务或 CPU 密集型任务,我们可以根据任务的特性来调整线程池的参数,以达到最佳的性能。例如,对于 I/O 密集型任务,我们可以适当增加线程数量,因为线程在等待 I/O 操作完成时可以被其他任务复用。
创建自定义线程池
在 Java 中,我们可以通过 ThreadPoolExecutor
类来创建自定义线程池。ThreadPoolExecutor
提供了丰富的构造函数,允许我们设置线程池的各种参数。
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class CustomThreadPoolExample {
public static void main(String[] args) {
// 创建一个有界队列
BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>(10);
// 创建自定义线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, // 核心线程数
4, // 最大线程数
10, // 线程存活时间
TimeUnit.SECONDS,
taskQueue,
new ThreadPoolExecutor.CallerRunsPolicy()
);
// 提交任务到线程池
for (int i = 0; i < 15; i++) {
int taskNumber = i;
executor.submit(() -> {
System.out.println("Task " + taskNumber + " is running on thread " + Thread.currentThread().getName());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Task " + taskNumber + " has completed.");
});
}
// 关闭线程池
executor.shutdown();
}
}
在上述代码中,我们创建了一个 ThreadPoolExecutor
实例。构造函数的参数含义如下:
- 核心线程数(corePoolSize):线程池中始终保持活动的线程数量。即使这些线程处于空闲状态,它们也不会被销毁,除非设置了
allowCoreThreadTimeOut
为true
。 - 最大线程数(maximumPoolSize):线程池允许创建的最大线程数量。当任务队列已满且活动线程数小于最大线程数时,线程池会创建新的线程来执行任务。
- 线程存活时间(keepAliveTime):当线程池中的线程数量超过核心线程数时,多余的空闲线程在被销毁之前等待新任务的最长时间。
- 时间单位(unit):
keepAliveTime
的时间单位。 - 任务队列(workQueue):用于存储等待执行的任务的队列。这里我们使用了
LinkedBlockingQueue
,它是一个有界队列,容量为 10。 - 拒绝策略(handler):当任务队列已满且线程池中的线程数量达到最大线程数时,新提交的任务将被拒绝。这里我们使用了
CallerRunsPolicy
,它会让提交任务的线程来执行该任务,这样可以避免任务被丢弃,但可能会影响提交任务的线程的性能。
使用自定义线程池与 supplyAsync
方法结合
接下来,我们将自定义线程池与 CompletableFuture
的 supplyAsync
方法结合使用。
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class CompletableFutureWithCustomThreadPool {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 创建一个有界队列
BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>(10);
// 创建自定义线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, // 核心线程数
4, // 最大线程数
10, // 线程存活时间
TimeUnit.SECONDS,
taskQueue,
new ThreadPoolExecutor.CallerRunsPolicy()
);
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 模拟一个耗时任务
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Task completed in custom thread pool.";
}, executor);
// 获取异步任务的结果
String result = future.get();
System.out.println(result);
// 关闭线程池
executor.shutdown();
}
}
在上述代码中,我们将自定义的 ThreadPoolExecutor
实例作为参数传递给 supplyAsync
方法。这样,supplyAsync
方法中的异步任务将在我们自定义的线程池中执行。通过 future.get()
方法,我们可以获取异步任务的执行结果,这里会阻塞当前线程,直到异步任务完成。
supplyAsync
方法执行流程
当我们调用 CompletableFuture.supplyAsync(Supplier<U> supplier, Executor executor)
方法时,其执行流程如下:
-
任务提交:
supplyAsync
方法将Supplier
任务提交给指定的Executor
。如果Executor
是一个线程池,任务会被添加到任务队列中或者直接分配给一个线程执行,具体取决于线程池的状态和当前活动线程数。 -
任务执行:线程池中的线程从任务队列中取出任务并执行
Supplier
中的逻辑。在执行过程中,如果任务需要等待某些资源(如 I/O 操作),线程可能会被阻塞,直到资源可用。 -
任务完成:当
Supplier
任务执行完毕,会返回一个结果。CompletableFuture
会将这个结果保存起来,并标记任务为完成状态。 -
后续操作:如果
CompletableFuture
有后续的thenApply
、thenAccept
等操作,这些操作会在任务完成后被触发。CompletableFuture
使用一种基于事件驱动的机制来处理后续操作,避免了传统Future
需要阻塞线程获取结果的问题。
异常处理
在异步任务执行过程中,可能会出现异常。CompletableFuture
提供了多种方式来处理异常。
exceptionally
方法:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
if (Math.random() < 0.5) {
throw new RuntimeException("Simulated exception");
}
return "Task completed successfully.";
}, executor).exceptionally(ex -> {
System.out.println("Caught exception: " + ex.getMessage());
return "Default value";
});
String result = future.get();
System.out.println(result);
在上述代码中,exceptionally
方法接受一个 Function
作为参数,当异步任务抛出异常时,这个 Function
会被执行,它会处理异常并返回一个默认值。
whenComplete
方法:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
if (Math.random() < 0.5) {
throw new RuntimeException("Simulated exception");
}
return "Task completed successfully.";
}, executor).whenComplete((res, ex) -> {
if (ex != null) {
System.out.println("Caught exception: " + ex.getMessage());
} else {
System.out.println("Result: " + res);
}
});
whenComplete
方法接受一个 BiConsumer
作为参数,当异步任务完成(无论成功还是失败)时,这个 BiConsumer
会被执行。它的第一个参数是任务的结果(如果任务成功),第二个参数是任务抛出的异常(如果任务失败)。
注意事项
- 线程池资源管理:使用自定义线程池时,需要注意合理管理线程池的资源。如果线程池创建过多的线程,可能会导致系统资源耗尽,出现
OutOfMemoryError
等问题。同时,要及时关闭线程池,避免资源泄漏。 - 任务依赖和顺序:在使用
CompletableFuture
的链式操作时,要注意任务之间的依赖关系和执行顺序。确保后续操作依赖的任务已经完成,避免出现空指针异常等问题。 - 异常处理的完整性:在处理异步任务的异常时,要确保异常处理的完整性。不仅要处理任务执行过程中抛出的异常,还要处理在获取结果或执行后续操作时可能出现的异常。
应用场景
- Web 应用中的异步请求处理:在 Web 应用中,处理一些耗时的请求(如数据库查询、文件上传等)时,可以使用
CompletableFuture
结合自定义线程池来异步处理这些请求,提高应用的响应速度。 - 大数据处理:在大数据处理场景中,可能需要并行处理大量的数据。通过自定义线程池和
CompletableFuture
,可以将数据处理任务分配到多个线程中并行执行,提高处理效率。 - 分布式系统中的异步调用:在分布式系统中,不同服务之间的调用可能会有一定的延迟。使用
CompletableFuture
可以异步发起调用,并在调用完成时进行后续处理,避免线程阻塞,提高系统的并发性能。
通过深入理解 CompletableFuture
的 supplyAsync
方法与自定义线程池的结合使用,我们可以在 Java 并发编程中更加灵活和高效地处理异步任务,提升应用程序的性能和响应能力。无论是小型应用还是大型分布式系统,这种技术都有着广泛的应用场景和重要的价值。在实际应用中,需要根据具体的业务需求和系统资源情况,合理地配置和使用自定义线程池,以达到最佳的效果。同时,要注意处理好异步任务中的异常和资源管理问题,确保系统的稳定性和可靠性。