MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java ThreadPoolExecutor 核心技术

2024-05-315.9k 阅读

Java ThreadPoolExecutor 核心技术

1. 线程池概述

在Java多线程编程中,线程池是一种重要的工具,它可以有效地管理和复用线程,提高系统的性能和资源利用率。创建和销毁线程是有一定开销的,如果在高并发场景下频繁地创建和销毁线程,会极大地消耗系统资源,降低系统的响应速度。线程池通过预先创建一定数量的线程,并将这些线程保存在池中,当有任务需要执行时,从线程池中取出一个线程来执行任务,任务执行完毕后,线程不会被销毁,而是返回线程池供下次使用。

2. ThreadPoolExecutor 类

ThreadPoolExecutor 是Java提供的线程池实现类,它实现了ExecutorService接口。通过ThreadPoolExecutor,我们可以灵活地控制线程池的各种参数,以适应不同的应用场景。

ThreadPoolExecutor 的构造函数如下:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)
  • corePoolSize:核心线程数,线程池中会一直存活的线程数量,即使这些线程处于空闲状态。当有新任务提交且线程池中的线程数小于 corePoolSize 时,会创建新的线程来执行任务。
  • maximumPoolSize:线程池允许的最大线程数。当任务队列已满且线程池中的线程数小于 maximumPoolSize 时,会创建新的线程来处理任务。
  • keepAliveTime:非核心线程的存活时间。当线程池中的线程数超过 corePoolSize 时,多余的非核心线程如果在 keepAliveTime 时间内没有任务可执行,就会被销毁。
  • unit:keepAliveTime 的时间单位,例如 TimeUnit.SECONDS(秒)、TimeUnit.MILLISECONDS(毫秒)等。
  • workQueue:任务队列,用于存放暂时无法被执行的任务。常用的任务队列有 ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue 等。
  • threadFactory:线程工厂,用于创建新的线程。通过线程工厂,我们可以定制线程的名称、优先级等属性。
  • handler:拒绝策略,当线程池和任务队列都已满,无法处理新的任务时,会采用拒绝策略来处理新任务。常见的拒绝策略有 AbortPolicy、CallerRunsPolicy、DiscardPolicy、DiscardOldestPolicy 等。

3. 任务执行流程

当一个新任务提交到 ThreadPoolExecutor 时,其执行流程如下:

  1. 步骤一:核心线程处理 如果当前线程池中的线程数小于 corePoolSize,会创建一个新的线程来执行任务。
  2. 步骤二:任务队列存储 如果当前线程池中的线程数等于 corePoolSize,任务会被放入任务队列 workQueue 中等待执行。
  3. 步骤三:最大线程数处理 如果任务队列已满,且当前线程池中的线程数小于 maximumPoolSize,会创建新的线程来执行任务。
  4. 步骤四:拒绝策略处理 如果任务队列已满,且当前线程池中的线程数达到 maximumPoolSize,新任务会根据拒绝策略进行处理。

4. 任务队列

ThreadPoolExecutor 支持多种类型的任务队列,不同的任务队列具有不同的特性,适用于不同的场景。

  • ArrayBlockingQueue:基于数组的有界阻塞队列。它在创建时需要指定容量大小,当队列已满时,新的任务无法再添加进来。这种队列的优点是在高并发场景下性能较好,因为它内部使用数组存储任务,访问速度快。缺点是容量固定,当任务量突然增加时,如果容量设置不合理,可能会导致任务无法添加。 示例代码:
BlockingQueue<Runnable> arrayQueue = new ArrayBlockingQueue<>(10);
ThreadPoolExecutor executor = new ThreadPoolExecutor(
    5, 10, 10, TimeUnit.SECONDS, arrayQueue);
  • LinkedBlockingQueue:基于链表的无界阻塞队列(也可以指定容量变为有界队列)。它的优点是可以动态扩展容量,适用于任务量不确定的场景。但由于它使用链表存储任务,在高并发场景下,链表的插入和删除操作可能会带来一定的性能开销。 示例代码:
BlockingQueue<Runnable> linkedQueue = new LinkedBlockingQueue<>();
ThreadPoolExecutor executor = new ThreadPoolExecutor(
    5, 10, 10, TimeUnit.SECONDS, linkedQueue);
  • SynchronousQueue:同步队列,它不存储任务,每个插入操作必须等待另一个线程的移除操作,反之亦然。这种队列适用于任务处理速度较快,不希望任务在队列中等待的场景。由于它没有容量,当有新任务提交时,如果没有空闲线程,就会创建新的线程,直到达到 maximumPoolSize。 示例代码:
BlockingQueue<Runnable> syncQueue = new SynchronousQueue<>();
ThreadPoolExecutor executor = new ThreadPoolExecutor(
    5, 10, 10, TimeUnit.SECONDS, syncQueue);

5. 拒绝策略

当线程池和任务队列都已满,无法处理新的任务时,ThreadPoolExecutor 会采用拒绝策略来处理新任务。

  • AbortPolicy:默认的拒绝策略,当任务无法处理时,会抛出 RejectedExecutionException 异常。这种策略适用于需要立即知道任务是否被成功处理的场景。 示例代码:
ThreadPoolExecutor executor = new ThreadPoolExecutor(
    5, 10, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5),
    new ThreadPoolExecutor.AbortPolicy());
  • CallerRunsPolicy:当任务无法处理时,会将任务返回给调用者,由调用者所在的线程来执行任务。这种策略可以降低新任务的提交速度,适用于对任务执行时间要求不高,且希望在系统负载过高时降低任务提交频率的场景。 示例代码:
ThreadPoolExecutor executor = new ThreadPoolExecutor(
    5, 10, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5),
    new ThreadPoolExecutor.CallerRunsPolicy());
  • DiscardPolicy:当任务无法处理时,直接丢弃该任务,不做任何处理。这种策略适用于对任务执行结果不关心,且任务量较大的场景。 示例代码:
ThreadPoolExecutor executor = new ThreadPoolExecutor(
    5, 10, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5),
    new ThreadPoolExecutor.DiscardPolicy());
  • DiscardOldestPolicy:当任务无法处理时,会丢弃任务队列中最老的一个任务,然后尝试将新任务添加到任务队列中。这种策略适用于希望优先处理新任务的场景。 示例代码:
ThreadPoolExecutor executor = new ThreadPoolExecutor(
    5, 10, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5),
    new ThreadPoolExecutor.DiscardOldestPolicy());

6. 线程池状态

ThreadPoolExecutor 有几种不同的状态,用于表示线程池的运行情况。

  • RUNNING:线程池处于运行状态,可以接受新任务并处理任务队列中的任务。
  • SHUTDOWN:线程池处于关闭状态,不再接受新任务,但会继续处理任务队列中的任务。
  • STOP:线程池处于停止状态,不再接受新任务,并且会中断正在执行的任务,清空任务队列。
  • TIDYING:所有任务都已终止,线程池中的线程数为0,即将进入TERMINATED状态。
  • TERMINATED:线程池已完全终止。

可以通过调用 getState() 方法获取线程池的当前状态。例如:

ThreadPoolExecutor executor = new ThreadPoolExecutor(
    5, 10, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5));
int state = executor.getState();

7. 线程池的关闭

ThreadPoolExecutor 提供了两种关闭线程池的方法:shutdown() 和 shutdownNow()。

  • shutdown():启动一个有序关闭过程,不再接受新任务,但会继续处理任务队列中的任务。当所有任务都处理完毕后,线程池会进入TERMINATED状态。 示例代码:
ThreadPoolExecutor executor = new ThreadPoolExecutor(
    5, 10, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5));
executor.shutdown();
while (!executor.isTerminated()) {
    // 等待线程池完全关闭
}
  • shutdownNow():尝试停止所有正在执行的任务,停止处理任务队列中的任务,并返回等待执行的任务列表。线程池会进入STOP状态,最终进入TERMINATED状态。 示例代码:
ThreadPoolExecutor executor = new ThreadPoolExecutor(
    5, 10, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5));
List<Runnable> tasks = executor.shutdownNow();
while (!executor.isTerminated()) {
    // 等待线程池完全关闭
}

8. 线程池监控

为了更好地了解线程池的运行情况,我们可以对线程池进行监控。ThreadPoolExecutor 提供了一些方法来获取线程池的相关信息。

  • getTaskCount():返回已提交到线程池的任务总数,包括正在执行的和等待执行的任务。
  • getCompletedTaskCount():返回已完成的任务数。
  • getPoolSize():返回当前线程池中的线程数。
  • getActiveCount():返回当前正在执行任务的线程数。 示例代码:
ThreadPoolExecutor executor = new ThreadPoolExecutor(
    5, 10, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5));
long taskCount = executor.getTaskCount();
long completedTaskCount = executor.getCompletedTaskCount();
int poolSize = executor.getPoolSize();
int activeCount = executor.getActiveCount();

9. 自定义线程池

在实际应用中,我们可能需要根据具体的业务需求自定义线程池。例如,自定义线程工厂来设置线程的名称、优先级等属性,或者自定义拒绝策略来满足特殊的业务逻辑。

  • 自定义线程工厂
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class CustomThreadFactory implements ThreadFactory {
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    public CustomThreadFactory(String namePrefix) {
        this.namePrefix = namePrefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, namePrefix + "-Thread-" + threadNumber.getAndIncrement());
        t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}

使用自定义线程工厂创建线程池:

CustomThreadFactory factory = new CustomThreadFactory("MyThreadPool");
ThreadPoolExecutor executor = new ThreadPoolExecutor(
    5, 10, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5), factory);
  • 自定义拒绝策略
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.out.println("Task " + r + " rejected, executor state: " + executor.getState());
        // 自定义处理逻辑,例如记录日志、尝试重新提交任务等
    }
}

使用自定义拒绝策略创建线程池:

CustomRejectedExecutionHandler handler = new CustomRejectedExecutionHandler();
ThreadPoolExecutor executor = new ThreadPoolExecutor(
    5, 10, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5), handler);

10. 合理配置线程池参数

合理配置线程池参数对于提高系统性能至关重要。在配置线程池参数时,需要考虑以下几个因素:

  • 任务类型:如果任务是CPU密集型的,线程数不宜过多,一般设置为 CPU 核心数 + 1,因为过多的线程会导致频繁的上下文切换,降低性能。如果任务是I/O密集型的,线程数可以适当增加,因为I/O操作时线程会处于等待状态,不会占用CPU资源。
  • 任务执行时间:如果任务执行时间较短,可以适当增加线程数,以充分利用系统资源。如果任务执行时间较长,需要控制线程数,避免线程过多导致系统资源耗尽。
  • 系统资源:需要考虑系统的内存、CPU等资源限制,避免线程过多导致系统崩溃。

例如,对于CPU密集型任务:

int cpuCoreCount = Runtime.getRuntime().availableProcessors();
ThreadPoolExecutor executor = new ThreadPoolExecutor(
    cpuCoreCount, cpuCoreCount + 1, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5));

对于I/O密集型任务:

int cpuCoreCount = Runtime.getRuntime().availableProcessors();
ThreadPoolExecutor executor = new ThreadPoolExecutor(
    2 * cpuCoreCount, 2 * cpuCoreCount + 1, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5));

11. 总结

ThreadPoolExecutor 是Java多线程编程中一个强大而灵活的工具,通过合理配置其参数,选择合适的任务队列和拒绝策略,可以有效地提高系统的性能和资源利用率。在实际应用中,需要根据具体的业务场景和系统资源情况,对线程池进行优化和调整,以达到最佳的运行效果。同时,对线程池进行监控和管理,可以及时发现和解决潜在的问题,保证系统的稳定性和可靠性。希望通过本文的介绍,读者对 ThreadPoolExecutor 的核心技术有更深入的理解和掌握,能够在实际项目中灵活运用线程池来提升系统性能。