Java 中 CompletableFuture 自定义线程池饱和策略
一、Java 线程池与饱和策略基础
在深入探讨 CompletableFuture 自定义线程池饱和策略之前,我们先来回顾一下 Java 线程池以及其饱和策略的基础知识。
1.1 Java 线程池概述
Java 的线程池通过 ExecutorService
接口及其实现类(如 ThreadPoolExecutor
)来管理和复用线程。线程池的核心优势在于避免频繁创建和销毁线程带来的开销,提高应用程序的性能和资源利用率。例如,在一个高并发的 Web 应用中,大量的请求需要被处理,如果每次请求都创建一个新线程,系统资源很快就会耗尽,而线程池可以有效地管理这些线程,重复利用已创建的线程来处理任务。
ThreadPoolExecutor
构造函数如下:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
corePoolSize
:核心线程数,线程池在正常情况下保持的线程数量。即使这些线程处于空闲状态,也不会被销毁。maximumPoolSize
:最大线程数,线程池允许创建的最大线程数量。当任务队列已满且活跃线程数小于最大线程数时,线程池会创建新线程来处理任务。keepAliveTime
:线程存活时间,当线程池中的线程数量超过核心线程数时,多余的空闲线程在被销毁之前等待新任务的最长时间。unit
:keepAliveTime
的时间单位。workQueue
:任务队列,用于存储等待执行的任务。常见的任务队列实现有ArrayBlockingQueue
、LinkedBlockingQueue
等。threadFactory
:线程工厂,用于创建新线程。可以通过自定义线程工厂来设置线程的名称、优先级等属性。handler
:饱和策略,当任务队列已满且线程池中的线程数达到最大线程数时,新任务到来时的处理策略。
1.2 内置饱和策略
Java 线程池提供了几种内置的饱和策略:
- AbortPolicy:这是默认的饱和策略。当新任务无法提交时(任务队列已满且线程数达到最大线程数),会抛出
RejectedExecutionException
异常,拒绝执行该任务。
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
- CallerRunsPolicy:当新任务无法提交时,该策略会让提交任务的线程(即调用
execute
方法的线程)直接执行该任务。这样做的好处是可以降低新任务提交的速度,同时利用调用线程的资源来处理任务。
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
- DiscardPolicy:当新任务无法提交时,直接丢弃该任务,不做任何处理。这种策略适用于那些对任务执行结果不敏感,且任务数量过多可能导致系统资源耗尽的场景。
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
- DiscardOldestPolicy:当新任务无法提交时,该策略会丢弃任务队列中最老的一个任务(即队列头部的任务),然后尝试重新提交新任务。
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
二、CompletableFuture 与线程池
2.1 CompletableFuture 简介
CompletableFuture
是 Java 8 引入的一个强大的异步编程工具,它实现了 Future
接口和 CompletionStage
接口。CompletableFuture
允许我们以一种更灵活、更易于组合的方式处理异步任务,包括链式调用、并行执行多个任务以及处理任务完成后的结果等。
例如,我们可以通过 CompletableFuture.supplyAsync
方法异步执行一个有返回值的任务:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 模拟耗时操作
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Hello, CompletableFuture!";
});
future.thenAccept(System.out::println);
在上述代码中,supplyAsync
方法接收一个 Supplier
作为参数,并返回一个 CompletableFuture
。任务在一个线程池中异步执行,当任务完成后,thenAccept
方法会处理任务的结果并打印输出。
2.2 CompletableFuture 与线程池的关系
CompletableFuture
在执行异步任务时,默认会使用 ForkJoinPool.commonPool()
作为线程池。ForkJoinPool
是 Java 7 引入的一种特殊的线程池,它采用工作窃取算法来提高并行处理能力。工作窃取算法允许空闲的线程从繁忙的线程队列中窃取任务来执行,从而提高系统的整体吞吐量。
然而,在某些情况下,默认的 ForkJoinPool.commonPool()
可能无法满足我们的需求。例如,在一个高并发的应用中,如果所有的异步任务都使用 ForkJoinPool.commonPool()
,可能会导致线程资源竞争,影响任务的执行效率。此外,ForkJoinPool.commonPool()
的线程数量是根据 CPU 核心数动态调整的,可能无法精确控制线程池的大小。
为了更好地控制异步任务的执行,CompletableFuture
提供了一些方法来指定自定义的线程池。例如,supplyAsync(Supplier<U> supplier, Executor executor)
方法可以接收一个自定义的 Executor
作为参数,从而使用自定义线程池来执行异步任务。
ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 模拟耗时操作
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Hello, CompletableFuture with custom executor!";
}, executor);
future.thenAccept(System.out::println);
executor.shutdown();
在上述代码中,我们创建了一个固定大小为 10 的线程池 executor
,并将其作为参数传递给 supplyAsync
方法,这样异步任务就会在这个自定义线程池中执行。
三、CompletableFuture 自定义线程池饱和策略实现
3.1 自定义饱和策略类
当我们为 CompletableFuture
使用自定义线程池时,同样需要考虑线程池的饱和策略。如果不设置饱和策略,线程池将使用默认的 AbortPolicy
。下面我们通过实现 RejectedExecutionHandler
接口来自定义一个饱和策略。
假设我们希望在任务被拒绝时,将任务信息记录到日志中,并尝试在一段时间后重新提交任务。我们可以创建如下自定义饱和策略类:
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
private static final Logger LOGGER = Logger.getLogger(CustomRejectedExecutionHandler.class.getName());
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
LOGGER.log(Level.WARNING, "Task {0} rejected from {1}", new Object[]{r, e});
try {
// 等待一段时间后尝试重新提交任务
TimeUnit.SECONDS.sleep(5);
e.submit(r);
} catch (InterruptedException | RejectedExecutionException ex) {
LOGGER.log(Level.SEVERE, "Failed to resubmit task", ex);
}
}
}
在上述代码中,CustomRejectedExecutionHandler
类实现了 RejectedExecutionHandler
接口的 rejectedExecution
方法。当任务被拒绝时,首先记录任务被拒绝的日志信息,然后线程等待 5 秒,之后尝试重新提交任务。如果重新提交任务时再次出现异常(如线程池已关闭),则记录严重错误日志。
3.2 使用自定义线程池和饱和策略
接下来,我们将自定义的饱和策略应用到 CompletableFuture
使用的自定义线程池中。
import java.util.concurrent.*;
import java.util.logging.Level;
import java.util.logging.Logger;
public class CompletableFutureCustomThreadPoolExample {
private static final Logger LOGGER = Logger.getLogger(CompletableFutureCustomThreadPoolExample.class.getName());
public static void main(String[] args) {
// 创建自定义线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5,
10,
10,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(5),
Executors.defaultThreadFactory(),
new CustomRejectedExecutionHandler()
);
for (int i = 0; i < 20; i++) {
int taskNumber = i;
CompletableFuture.supplyAsync(() -> {
LOGGER.log(Level.INFO, "Task {0} is running", taskNumber);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Task " + taskNumber + " completed";
}, executor)
.thenAccept(result -> LOGGER.log(Level.INFO, result));
}
// 关闭线程池
executor.shutdown();
try {
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
executor.shutdownNow();
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
LOGGER.log(Level.SEVERE, "Pool did not terminate");
}
}
} catch (InterruptedException ie) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
在上述代码中,我们创建了一个 ThreadPoolExecutor
线程池,核心线程数为 5,最大线程数为 10,任务队列容量为 5,并使用了我们自定义的 CustomRejectedExecutionHandler
饱和策略。然后,通过循环提交 20 个异步任务,每个任务模拟耗时 2 秒的操作。当任务提交数量超过线程池和任务队列的承载能力时,自定义的饱和策略将被触发。
最后,在程序结束时,我们通过调用 executor.shutdown()
方法来启动线程池的关闭过程,并使用 awaitTermination
方法等待所有任务执行完毕或超时。如果超时仍有任务未完成,则调用 executor.shutdownNow()
尝试中断正在执行的任务。
四、实际应用场景分析
4.1 高并发 Web 应用
在高并发的 Web 应用中,大量的请求需要被异步处理。例如,一个电商网站在促销活动期间,可能会有大量的订单处理请求、库存更新请求等。如果使用默认的 ForkJoinPool.commonPool()
,可能会导致线程资源竞争,影响用户体验。通过为 CompletableFuture
使用自定义线程池,并设置合适的饱和策略,可以更好地控制任务的执行。
假设我们的电商应用有一个订单处理模块,当用户下单后,需要异步处理订单,包括扣除库存、更新订单状态等操作。我们可以创建一个专门的线程池来处理订单相关的异步任务,并设置自定义饱和策略。如果订单处理任务被拒绝,我们可以记录日志并尝试重新提交,以确保订单的正常处理。
4.2 大数据处理
在大数据处理场景中,常常需要对大量的数据进行并行计算。例如,在一个数据分析系统中,需要对海量的用户行为数据进行清洗、统计和分析。使用 CompletableFuture
结合自定义线程池可以有效地提高数据处理的效率。
假设我们有一个任务是对用户行为数据进行实时分析,计算每个用户的活跃度。我们可以将数据分割成多个小块,使用 CompletableFuture
并行处理这些小块数据。由于数据量可能非常大,线程池可能会面临饱和的情况。此时,我们可以设置一个合适的饱和策略,如 DiscardOldestPolicy
,丢弃较早的数据块处理任务,以保证最新的数据能够及时得到处理。
4.3 分布式系统
在分布式系统中,各个节点之间需要进行异步通信和任务协作。例如,一个微服务架构的系统中,不同的微服务之间可能需要异步调用对方的接口来完成业务逻辑。使用 CompletableFuture
可以方便地处理这些异步调用。
假设我们有一个订单微服务和一个支付微服务,订单微服务在接收到订单后,需要异步调用支付微服务进行支付处理。我们可以使用 CompletableFuture
来处理这个异步调用,并为其使用自定义线程池。在分布式系统中,网络波动等原因可能导致任务处理时间变长,线程池更容易饱和。通过设置合适的饱和策略,如 CallerRunsPolicy
,可以在一定程度上缓解系统压力,确保关键任务能够得到执行。
五、性能优化与注意事项
5.1 线程池参数调优
在使用自定义线程池时,合理调整线程池的参数对于性能优化至关重要。
- 核心线程数:应根据任务的类型和系统的资源情况来确定。如果任务是 CPU 密集型的,核心线程数可以设置为 CPU 核心数;如果是 I/O 密集型的,核心线程数可以适当增大,以充分利用 CPU 资源等待 I/O 操作完成。
- 最大线程数:不宜设置过大,否则可能导致系统资源耗尽。一般来说,可以根据系统的内存、CPU 等资源情况来估算最大线程数。例如,在一个内存有限的服务器上,如果每个线程占用一定的内存空间,过多的线程可能会导致内存溢出。
- 任务队列:选择合适的任务队列类型和容量也很重要。对于有界队列(如
ArrayBlockingQueue
),需要根据任务的流量和处理速度来设置合适的容量,避免队列溢出。而无界队列(如LinkedBlockingQueue
)虽然不会溢出,但可能会导致任务堆积,占用大量内存。
5.2 饱和策略选择
不同的饱和策略适用于不同的场景,需要根据业务需求来选择。
- AbortPolicy:适用于对任务执行失败敏感的场景,如金融交易系统,任何一笔交易任务的失败都可能导致严重后果,此时抛出异常可以及时发现问题。
- CallerRunsPolicy:适用于希望降低新任务提交速度的场景,同时可以利用调用线程的资源来处理任务。例如,在一个单机应用中,当线程池饱和时,让调用线程执行任务可以避免任务丢失。
- DiscardPolicy:适用于对任务执行结果不敏感,且任务数量过多可能导致系统资源耗尽的场景。例如,在一些日志记录任务中,如果日志记录任务过多导致线程池饱和,丢弃部分任务可能不会对系统造成严重影响。
- DiscardOldestPolicy:适用于希望优先处理新任务的场景。例如,在实时数据分析系统中,新的数据可能更有价值,丢弃较老的数据处理任务可以保证系统对最新数据的响应速度。
5.3 异常处理
在使用 CompletableFuture
和自定义线程池时,需要注意异常处理。当任务在异步执行过程中抛出异常时,如果不进行处理,可能会导致程序出现难以排查的问题。CompletableFuture
提供了 exceptionally
方法来处理异常。
CompletableFuture.supplyAsync(() -> {
if (Math.random() < 0.5) {
throw new RuntimeException("Simulated exception");
}
return "Task completed successfully";
}, executor)
.exceptionally(ex -> {
LOGGER.log(Level.SEVERE, "Task failed", ex);
return "Task failed: " + ex.getMessage();
})
.thenAccept(System.out::println);
在上述代码中,exceptionally
方法接收一个 Function
作为参数,当任务抛出异常时,该 Function
会被执行,我们可以在其中记录异常信息并返回一个默认结果。
此外,在线程池饱和策略中,如果重新提交任务时抛出异常,也需要进行适当的处理,如我们在 CustomRejectedExecutionHandler
类中记录了重新提交任务失败的异常日志。
5.4 资源管理
在使用自定义线程池时,要注意资源的管理和释放。在程序结束时,一定要正确关闭线程池,避免线程泄漏。如我们在前面的示例中,通过调用 executor.shutdown()
和 awaitTermination
方法来确保线程池的正确关闭。
同时,对于自定义的线程工厂,如果创建了一些需要释放的资源(如文件句柄、数据库连接等),也需要在适当的时候进行释放。例如,可以在自定义线程工厂的 newThread
方法中创建资源,在 Thread
的 run
方法结束时释放资源。
六、总结与展望
通过深入了解 Java 线程池的饱和策略以及 CompletableFuture 与线程池的关系,我们能够为 CompletableFuture 定制合适的线程池饱和策略,以满足不同应用场景的需求。在实际开发中,合理地选择和配置线程池参数、饱和策略,并做好异常处理和资源管理,能够显著提高应用程序的性能和稳定性。
随着技术的不断发展,异步编程和并发处理在软件开发中的重要性日益凸显。未来,我们可能会看到更多针对不同场景优化的线程池实现和饱和策略,以及更便捷的异步编程工具和框架。作为开发者,我们需要不断学习和掌握这些新技术,以提升我们的编程能力和解决复杂问题的能力。
希望本文对您理解和应用 Java 中 CompletableFuture 自定义线程池饱和策略有所帮助。在实际项目中,根据具体业务需求灵活运用这些知识,能够打造出更高效、更稳定的应用程序。