Java ThreadPoolExecutor 核心技术
Java ThreadPoolExecutor 核心技术
1. 线程池概述
在Java多线程编程中,线程池是一种重要的工具,它可以有效地管理和复用线程,提高系统的性能和资源利用率。创建和销毁线程是有一定开销的,如果在高并发场景下频繁地创建和销毁线程,会极大地消耗系统资源,降低系统的响应速度。线程池通过预先创建一定数量的线程,并将这些线程保存在池中,当有任务需要执行时,从线程池中取出一个线程来执行任务,任务执行完毕后,线程不会被销毁,而是返回线程池供下次使用。
2. ThreadPoolExecutor 类
ThreadPoolExecutor 是Java提供的线程池实现类,它实现了ExecutorService接口。通过ThreadPoolExecutor,我们可以灵活地控制线程池的各种参数,以适应不同的应用场景。
ThreadPoolExecutor 的构造函数如下:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
- corePoolSize:核心线程数,线程池中会一直存活的线程数量,即使这些线程处于空闲状态。当有新任务提交且线程池中的线程数小于 corePoolSize 时,会创建新的线程来执行任务。
- maximumPoolSize:线程池允许的最大线程数。当任务队列已满且线程池中的线程数小于 maximumPoolSize 时,会创建新的线程来处理任务。
- keepAliveTime:非核心线程的存活时间。当线程池中的线程数超过 corePoolSize 时,多余的非核心线程如果在 keepAliveTime 时间内没有任务可执行,就会被销毁。
- unit:keepAliveTime 的时间单位,例如 TimeUnit.SECONDS(秒)、TimeUnit.MILLISECONDS(毫秒)等。
- workQueue:任务队列,用于存放暂时无法被执行的任务。常用的任务队列有 ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue 等。
- threadFactory:线程工厂,用于创建新的线程。通过线程工厂,我们可以定制线程的名称、优先级等属性。
- handler:拒绝策略,当线程池和任务队列都已满,无法处理新的任务时,会采用拒绝策略来处理新任务。常见的拒绝策略有 AbortPolicy、CallerRunsPolicy、DiscardPolicy、DiscardOldestPolicy 等。
3. 任务执行流程
当一个新任务提交到 ThreadPoolExecutor 时,其执行流程如下:
- 步骤一:核心线程处理 如果当前线程池中的线程数小于 corePoolSize,会创建一个新的线程来执行任务。
- 步骤二:任务队列存储 如果当前线程池中的线程数等于 corePoolSize,任务会被放入任务队列 workQueue 中等待执行。
- 步骤三:最大线程数处理 如果任务队列已满,且当前线程池中的线程数小于 maximumPoolSize,会创建新的线程来执行任务。
- 步骤四:拒绝策略处理 如果任务队列已满,且当前线程池中的线程数达到 maximumPoolSize,新任务会根据拒绝策略进行处理。
4. 任务队列
ThreadPoolExecutor 支持多种类型的任务队列,不同的任务队列具有不同的特性,适用于不同的场景。
- ArrayBlockingQueue:基于数组的有界阻塞队列。它在创建时需要指定容量大小,当队列已满时,新的任务无法再添加进来。这种队列的优点是在高并发场景下性能较好,因为它内部使用数组存储任务,访问速度快。缺点是容量固定,当任务量突然增加时,如果容量设置不合理,可能会导致任务无法添加。 示例代码:
BlockingQueue<Runnable> arrayQueue = new ArrayBlockingQueue<>(10);
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, 10, 10, TimeUnit.SECONDS, arrayQueue);
- LinkedBlockingQueue:基于链表的无界阻塞队列(也可以指定容量变为有界队列)。它的优点是可以动态扩展容量,适用于任务量不确定的场景。但由于它使用链表存储任务,在高并发场景下,链表的插入和删除操作可能会带来一定的性能开销。 示例代码:
BlockingQueue<Runnable> linkedQueue = new LinkedBlockingQueue<>();
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, 10, 10, TimeUnit.SECONDS, linkedQueue);
- SynchronousQueue:同步队列,它不存储任务,每个插入操作必须等待另一个线程的移除操作,反之亦然。这种队列适用于任务处理速度较快,不希望任务在队列中等待的场景。由于它没有容量,当有新任务提交时,如果没有空闲线程,就会创建新的线程,直到达到 maximumPoolSize。 示例代码:
BlockingQueue<Runnable> syncQueue = new SynchronousQueue<>();
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, 10, 10, TimeUnit.SECONDS, syncQueue);
5. 拒绝策略
当线程池和任务队列都已满,无法处理新的任务时,ThreadPoolExecutor 会采用拒绝策略来处理新任务。
- AbortPolicy:默认的拒绝策略,当任务无法处理时,会抛出 RejectedExecutionException 异常。这种策略适用于需要立即知道任务是否被成功处理的场景。 示例代码:
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, 10, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5),
new ThreadPoolExecutor.AbortPolicy());
- CallerRunsPolicy:当任务无法处理时,会将任务返回给调用者,由调用者所在的线程来执行任务。这种策略可以降低新任务的提交速度,适用于对任务执行时间要求不高,且希望在系统负载过高时降低任务提交频率的场景。 示例代码:
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, 10, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5),
new ThreadPoolExecutor.CallerRunsPolicy());
- DiscardPolicy:当任务无法处理时,直接丢弃该任务,不做任何处理。这种策略适用于对任务执行结果不关心,且任务量较大的场景。 示例代码:
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, 10, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5),
new ThreadPoolExecutor.DiscardPolicy());
- DiscardOldestPolicy:当任务无法处理时,会丢弃任务队列中最老的一个任务,然后尝试将新任务添加到任务队列中。这种策略适用于希望优先处理新任务的场景。 示例代码:
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, 10, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5),
new ThreadPoolExecutor.DiscardOldestPolicy());
6. 线程池状态
ThreadPoolExecutor 有几种不同的状态,用于表示线程池的运行情况。
- RUNNING:线程池处于运行状态,可以接受新任务并处理任务队列中的任务。
- SHUTDOWN:线程池处于关闭状态,不再接受新任务,但会继续处理任务队列中的任务。
- STOP:线程池处于停止状态,不再接受新任务,并且会中断正在执行的任务,清空任务队列。
- TIDYING:所有任务都已终止,线程池中的线程数为0,即将进入TERMINATED状态。
- TERMINATED:线程池已完全终止。
可以通过调用 getState() 方法获取线程池的当前状态。例如:
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, 10, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5));
int state = executor.getState();
7. 线程池的关闭
ThreadPoolExecutor 提供了两种关闭线程池的方法:shutdown() 和 shutdownNow()。
- shutdown():启动一个有序关闭过程,不再接受新任务,但会继续处理任务队列中的任务。当所有任务都处理完毕后,线程池会进入TERMINATED状态。 示例代码:
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, 10, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5));
executor.shutdown();
while (!executor.isTerminated()) {
// 等待线程池完全关闭
}
- shutdownNow():尝试停止所有正在执行的任务,停止处理任务队列中的任务,并返回等待执行的任务列表。线程池会进入STOP状态,最终进入TERMINATED状态。 示例代码:
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, 10, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5));
List<Runnable> tasks = executor.shutdownNow();
while (!executor.isTerminated()) {
// 等待线程池完全关闭
}
8. 线程池监控
为了更好地了解线程池的运行情况,我们可以对线程池进行监控。ThreadPoolExecutor 提供了一些方法来获取线程池的相关信息。
- getTaskCount():返回已提交到线程池的任务总数,包括正在执行的和等待执行的任务。
- getCompletedTaskCount():返回已完成的任务数。
- getPoolSize():返回当前线程池中的线程数。
- getActiveCount():返回当前正在执行任务的线程数。 示例代码:
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, 10, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5));
long taskCount = executor.getTaskCount();
long completedTaskCount = executor.getCompletedTaskCount();
int poolSize = executor.getPoolSize();
int activeCount = executor.getActiveCount();
9. 自定义线程池
在实际应用中,我们可能需要根据具体的业务需求自定义线程池。例如,自定义线程工厂来设置线程的名称、优先级等属性,或者自定义拒绝策略来满足特殊的业务逻辑。
- 自定义线程工厂:
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
public class CustomThreadFactory implements ThreadFactory {
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
public CustomThreadFactory(String namePrefix) {
this.namePrefix = namePrefix;
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, namePrefix + "-Thread-" + threadNumber.getAndIncrement());
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
使用自定义线程工厂创建线程池:
CustomThreadFactory factory = new CustomThreadFactory("MyThreadPool");
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, 10, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5), factory);
- 自定义拒绝策略:
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("Task " + r + " rejected, executor state: " + executor.getState());
// 自定义处理逻辑,例如记录日志、尝试重新提交任务等
}
}
使用自定义拒绝策略创建线程池:
CustomRejectedExecutionHandler handler = new CustomRejectedExecutionHandler();
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, 10, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5), handler);
10. 合理配置线程池参数
合理配置线程池参数对于提高系统性能至关重要。在配置线程池参数时,需要考虑以下几个因素:
- 任务类型:如果任务是CPU密集型的,线程数不宜过多,一般设置为 CPU 核心数 + 1,因为过多的线程会导致频繁的上下文切换,降低性能。如果任务是I/O密集型的,线程数可以适当增加,因为I/O操作时线程会处于等待状态,不会占用CPU资源。
- 任务执行时间:如果任务执行时间较短,可以适当增加线程数,以充分利用系统资源。如果任务执行时间较长,需要控制线程数,避免线程过多导致系统资源耗尽。
- 系统资源:需要考虑系统的内存、CPU等资源限制,避免线程过多导致系统崩溃。
例如,对于CPU密集型任务:
int cpuCoreCount = Runtime.getRuntime().availableProcessors();
ThreadPoolExecutor executor = new ThreadPoolExecutor(
cpuCoreCount, cpuCoreCount + 1, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5));
对于I/O密集型任务:
int cpuCoreCount = Runtime.getRuntime().availableProcessors();
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2 * cpuCoreCount, 2 * cpuCoreCount + 1, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5));
11. 总结
ThreadPoolExecutor 是Java多线程编程中一个强大而灵活的工具,通过合理配置其参数,选择合适的任务队列和拒绝策略,可以有效地提高系统的性能和资源利用率。在实际应用中,需要根据具体的业务场景和系统资源情况,对线程池进行优化和调整,以达到最佳的运行效果。同时,对线程池进行监控和管理,可以及时发现和解决潜在的问题,保证系统的稳定性和可靠性。希望通过本文的介绍,读者对 ThreadPoolExecutor 的核心技术有更深入的理解和掌握,能够在实际项目中灵活运用线程池来提升系统性能。