Java 线程池状态的关闭过程
Java 线程池状态概述
在深入探讨 Java 线程池状态的关闭过程之前,我们先来了解一下线程池的状态概念。Java 线程池中的状态是用于标识线程池当前所处的运行阶段,它对线程池的管理和调度起着至关重要的作用。
线程池状态被定义在 ThreadPoolExecutor
类中,通过一个 32 位的整数来表示,其中高 3 位用于存储线程池状态,低 29 位用于存储线程池中当前活动线程的数量。这种设计方式使得线程池的状态和活动线程数可以通过一个变量进行原子性的操作和维护。
线程池的基本状态
- RUNNING:这是线程池的初始状态。处于
RUNNING
状态时,线程池可以接受新的任务,并处理阻塞队列中的任务。它的数值为一个负数,高 3 位二进制表示为111
。 - SHUTDOWN:当调用线程池的
shutdown()
方法时,线程池会进入SHUTDOWN
状态。此时,线程池不再接受新的任务,但会继续处理阻塞队列中已有的任务。它的数值为0
,高 3 位二进制表示为000
。 - STOP:当调用线程池的
shutdownNow()
方法时,线程池会进入STOP
状态。在这个状态下,线程池不仅不再接受新任务,还会中断正在执行的任务,并尝试清空阻塞队列。它的数值为一个正数,高 3 位二进制表示为001
。 - TIDYING:当所有任务都已终止(包括正在执行的任务被中断和阻塞队列中的任务被处理完),并且线程池中活动线程数为
0
时,线程池会进入TIDYING
状态。在这个状态下,会调用terminated()
方法,该方法默认是空实现,用户可以根据需求进行重写。它的数值为0
,高 3 位二进制表示为010
。 - TERMINATED:当
terminated()
方法执行完毕后,线程池会进入TERMINATED
状态,这表示线程池已经彻底终止。它的数值为0
,高 3 位二进制表示为011
。
Java 线程池状态的关闭过程分析
调用 shutdown() 方法
- 方法定义
在
ThreadPoolExecutor
类中,shutdown()
方法的定义如下:
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // 扩展点,默认空实现
} finally {
mainLock.unlock();
}
tryTerminate();
}
- 过程解析
- 获取锁:首先获取
mainLock
,这是一个可重入锁,用于保证对线程池状态和相关操作的线程安全。 - 权限检查:调用
checkShutdownAccess()
方法,该方法用于检查当前线程是否有权限关闭线程池。如果没有权限,会抛出SecurityException
。 - 更新状态:通过
advanceRunState(SHUTDOWN)
方法将线程池状态更新为SHUTDOWN
。这个方法会将线程池状态的高 3 位更新为000
,同时保持低 29 位的活动线程数不变。 - 中断空闲线程:调用
interruptIdleWorkers()
方法,该方法会遍历线程池中的所有工作线程,并中断那些处于空闲状态(即正在等待从阻塞队列中获取任务)的线程。这样做是为了尽快停止那些不再有新任务可执行的线程。 - 扩展点:
onShutdown()
是一个扩展点,默认是空实现。用户可以继承ThreadPoolExecutor
类并覆盖这个方法,在关闭线程池时执行一些自定义的逻辑。 - 尝试终止:最后调用
tryTerminate()
方法,该方法会尝试将线程池状态从SHUTDOWN
推进到TIDYING
。如果所有任务都已完成且活动线程数为0
,则会成功推进到TIDYING
状态,并调用terminated()
方法。
- 获取锁:首先获取
调用 shutdownNow() 方法
- 方法定义
在
ThreadPoolExecutor
类中,shutdownNow()
方法的定义如下:
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
- 过程解析
- 获取锁:同样先获取
mainLock
锁,确保线程安全。 - 权限检查:调用
checkShutdownAccess()
方法检查关闭权限。 - 更新状态:通过
advanceRunState(STOP)
方法将线程池状态更新为STOP
,高 3 位变为001
。 - 中断所有线程:调用
interruptWorkers()
方法,与shutdown()
方法中中断空闲线程不同,这里会中断所有的工作线程,无论它们是正在执行任务还是处于空闲状态。 - 清空队列:调用
drainQueue()
方法,该方法会将阻塞队列中的任务全部移除,并返回这些任务的列表。这样做可以防止这些任务继续被执行。 - 尝试终止:最后调用
tryTerminate()
方法,尝试将线程池状态从STOP
推进到TIDYING
。如果所有任务都已终止(包括被中断)且活动线程数为0
,则会进入TIDYING
状态,并调用terminated()
方法。
- 获取锁:同样先获取
tryTerminate() 方法的作用
tryTerminate()
方法在 shutdown()
和 shutdownNow()
方法中都有调用,它的作用是尝试将线程池状态从 SHUTDOWN
或 STOP
推进到 TIDYING
。
- 方法定义
final void tryTerminate() {
for (;;) {
int c = ctl.get();
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN &&!workQueue.isEmpty()))
return;
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
- 过程解析
- 条件判断:首先检查当前线程池状态,如果是
RUNNING
状态,或者已经处于TIDYING
或TERMINATED
状态,或者是SHUTDOWN
状态且阻塞队列不为空,则直接返回,不进行状态推进。 - 中断空闲线程:如果活动线程数不为
0
,则调用interruptIdleWorkers(ONLY_ONE)
方法中断一个空闲线程,然后返回。这是为了尽快减少活动线程数,以便满足进入TIDYING
状态的条件。 - 尝试更新状态:获取
mainLock
锁后,使用ctl.compareAndSet(c, ctlOf(TIDYING, 0))
尝试将线程池状态更新为TIDYING
,并将活动线程数设置为0
。如果更新成功,则调用terminated()
方法,然后将状态更新为TERMINATED
。如果更新失败,则继续循环重试。
- 条件判断:首先检查当前线程池状态,如果是
terminated() 方法
terminated()
方法在 ThreadPoolExecutor
类中是一个空实现,它是一个扩展点,用户可以继承 ThreadPoolExecutor
类并覆盖这个方法,在其中执行一些线程池终止时的清理或通知逻辑。
protected void terminated() { }
例如,在一个自定义的线程池中,可以在 terminated()
方法中关闭一些资源:
class CustomThreadPool extends ThreadPoolExecutor {
@Override
protected void terminated() {
// 关闭数据库连接等资源
System.out.println("线程池已终止,执行资源清理操作");
}
}
代码示例
基本线程池创建与关闭示例
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPoolShutdownExample {
public static void main(String[] args) {
// 创建一个阻塞队列
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(10);
// 创建线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, // 核心线程数
4, // 最大线程数
10, // 存活时间
TimeUnit.SECONDS,
workQueue);
// 提交任务
for (int i = 0; i < 15; i++) {
int taskNumber = i;
executor.submit(() -> {
System.out.println("任务 " + taskNumber + " 开始执行");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("任务 " + taskNumber + " 执行完毕");
});
}
// 调用 shutdown() 方法
executor.shutdown();
try {
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
executor.shutdownNow();
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
System.err.println("线程池未能在规定时间内终止");
}
}
} catch (InterruptedException ie) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
在这个示例中,我们创建了一个线程池,并提交了 15 个任务。然后调用 shutdown()
方法关闭线程池,并通过 awaitTermination()
方法等待线程池终止。如果在 60 秒内线程池没有终止,则调用 shutdownNow()
方法尝试强制终止。
自定义线程池及 terminated() 方法示例
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
class CustomThreadPool extends ThreadPoolExecutor {
public CustomThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
@Override
protected void terminated() {
System.out.println("自定义线程池已终止,执行自定义清理操作");
}
}
public class CustomThreadPoolShutdownExample {
public static void main(String[] args) {
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(10);
CustomThreadPool executor = new CustomThreadPool(
2,
4,
10,
TimeUnit.SECONDS,
workQueue);
for (int i = 0; i < 15; i++) {
int taskNumber = i;
executor.submit(() -> {
System.out.println("任务 " + taskNumber + " 开始执行");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("任务 " + taskNumber + " 执行完毕");
});
}
executor.shutdown();
try {
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
executor.shutdownNow();
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
System.err.println("线程池未能在规定时间内终止");
}
}
} catch (InterruptedException ie) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
在这个示例中,我们自定义了一个线程池 CustomThreadPool
,并重写了 terminated()
方法。当线程池终止时,会执行 terminated()
方法中的自定义逻辑。
总结线程池关闭过程中的注意事项
- 任务处理情况:在调用
shutdown()
方法后,线程池会继续处理阻塞队列中的任务,但不再接受新任务。而调用shutdownNow()
方法会中断正在执行的任务并清空队列,可能导致部分任务无法正常完成。因此,在选择关闭方法时,需要根据业务需求来决定是否允许任务被中断。 - 等待终止:调用关闭方法后,通常需要调用
awaitTermination()
方法等待线程池终止,以确保所有任务都能得到妥善处理。如果不等待,可能会在任务还未完成时就退出程序。 - 异常处理:在等待线程池终止的过程中,可能会抛出
InterruptedException
。此时,需要根据情况进行适当的处理,例如再次调用关闭方法并中断当前线程。 - 资源清理:如果线程池在运行过程中持有一些外部资源(如数据库连接、文件句柄等),需要在
terminated()
方法中进行清理,以避免资源泄漏。
通过深入理解 Java 线程池状态的关闭过程以及合理使用相关方法,我们可以更好地管理线程池,确保应用程序的性能和稳定性。在实际应用中,根据具体业务场景选择合适的关闭策略,并注意资源清理和异常处理等问题,是编写健壮多线程程序的关键。