MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java中ThreadPoolExecutor的核心原理

2024-01-316.3k 阅读

Java线程池基础概念

在深入探讨ThreadPoolExecutor核心原理之前,先简单回顾一下线程池的基本概念。线程池是一种基于池化思想管理线程的工具,它避免了在处理任务时频繁创建和销毁线程带来的开销。通过预先创建一定数量的线程,将任务提交到线程池中,由线程池中的线程负责执行,这样可以有效提高系统性能和资源利用率。

ThreadPoolExecutor类结构与构造函数

ThreadPoolExecutor是Java并发包java.util.concurrent中用于创建线程池的核心类。它继承自AbstractExecutorService,实现了ExecutorService接口。其构造函数如下:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
  • corePoolSize:核心线程数,线程池中会一直存活的线程数量,即使这些线程处于空闲状态。除非设置了allowCoreThreadTimeOuttrue,否则核心线程不会因为超时而被终止。
  • maximumPoolSize:线程池允许的最大线程数。当任务队列已满且活动线程数小于最大线程数时,线程池会创建新的线程来处理任务。
  • keepAliveTime:非核心线程在空闲状态下的存活时间。当线程池中的线程数量超过核心线程数,并且有线程处于空闲状态超过这个时间,这些空闲的非核心线程将会被终止。
  • unitkeepAliveTime的时间单位,是TimeUnit枚举类型,例如TimeUnit.SECONDSTimeUnit.MILLISECONDS等。
  • workQueue:任务队列,用于存放等待执行的任务。当线程池中的所有核心线程都在忙碌时,新提交的任务会被放入这个队列中。常用的任务队列有ArrayBlockingQueueLinkedBlockingQueueSynchronousQueue等。
  • threadFactory:线程工厂,用于创建新的线程。通过自定义线程工厂,可以对线程的名称、优先级等属性进行设置。
  • handler:拒绝策略,当任务队列已满且线程池中的线程数量达到最大线程数时,新提交的任务会被拒绝。RejectedExecutionHandler接口提供了几种常见的拒绝策略实现,如AbortPolicy(默认策略,直接抛出异常)、CallerRunsPolicy(将任务回退给调用者执行)、DiscardPolicy(直接丢弃任务)、DiscardOldestPolicy(丢弃队列中最老的任务,然后尝试重新提交当前任务)。

任务提交与执行流程

当调用ThreadPoolExecutorexecute(Runnable task)方法提交一个任务时,其执行流程如下:

  1. 核心线程判断:如果当前线程池中的线程数量小于corePoolSize,则会创建一个新的核心线程来执行任务。
  2. 任务队列判断:如果当前线程池中的线程数量已经达到corePoolSize,则会将任务放入任务队列workQueue中。
  3. 最大线程数判断:如果任务队列已满,且当前线程池中的线程数量小于maximumPoolSize,则会创建一个新的非核心线程来执行任务。
  4. 拒绝策略执行:如果任务队列已满,且当前线程池中的线程数量已经达到maximumPoolSize,则会根据设置的拒绝策略handler来处理该任务。

下面通过一个简单的代码示例来演示任务提交与执行流程:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolExecutorExample {
    public static void main(String[] args) {
        // 创建任务队列
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(2);
        // 创建线程池
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2, // corePoolSize
                4, // maximumPoolSize
                10, // keepAliveTime
                TimeUnit.SECONDS,
                workQueue,
                r -> {
                    Thread t = new Thread(r);
                    t.setName("CustomThread-" + t.getId());
                    return t;
                },
                new ThreadPoolExecutor.CallerRunsPolicy()
        );

        // 提交任务
        for (int i = 0; i < 6; i++) {
            int taskNumber = i;
            executor.execute(() -> {
                System.out.println(Thread.currentThread().getName() + " is processing task " + taskNumber);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

        // 关闭线程池
        executor.shutdown();
    }
}

在上述示例中,我们创建了一个核心线程数为2,最大线程数为4,任务队列容量为2的线程池。然后提交6个任务,观察任务的执行情况。根据任务提交与执行流程,前2个任务会由核心线程执行,接下来2个任务会被放入任务队列,再接下来2个任务会创建新的非核心线程执行。如果提交更多任务,由于任务队列已满且线程数达到最大线程数,会按照CallerRunsPolicy策略,由调用者线程来执行任务。

线程池状态与控制

ThreadPoolExecutor通过一个AtomicInteger类型的变量ctl来维护线程池的状态和线程数量。ctl的高3位表示线程池状态,低29位表示线程数量。线程池有以下几种状态:

  • RUNNING:运行状态,线程池可以接受新任务并处理任务队列中的任务。
  • SHUTDOWN:关闭状态,线程池不再接受新任务,但会继续处理任务队列中的任务。
  • STOP:停止状态,线程池不再接受新任务,并且会中断正在执行的任务,同时清空任务队列。
  • TIDYING:整理状态,所有任务都已终止,线程数量为0,即将进入TERMINATED状态。
  • TERMINATED:终止状态,线程池已完全终止。

ThreadPoolExecutor提供了一系列方法来控制线程池状态,例如:

  • shutdown():启动一个有序关闭,不再接受新任务,但会继续执行任务队列中的任务。
  • shutdownNow():尝试停止所有正在执行的任务,停止处理等待任务的处理,并返回等待执行的任务列表。
  • isShutdown():判断线程池是否已经启动关闭过程。
  • isTerminated():判断线程池是否已经终止。

线程池中的线程生命周期

线程池中的线程在执行任务过程中有其特定的生命周期。当一个线程被创建后,它会不断从任务队列中获取任务并执行,直到线程池关闭或者线程被终止。

  1. 线程创建:通过threadFactory创建新的线程。在ThreadPoolExecutoraddWorker方法中会调用threadFactory.newThread来创建线程。
  2. 任务获取:线程通过getTask方法从任务队列workQueue中获取任务。如果任务队列中没有任务,并且线程数量超过核心线程数,且线程处于空闲状态超过keepAliveTime,则该线程可能会被终止。
  3. 任务执行:获取到任务后,线程会执行task.run()方法来执行具体的任务逻辑。
  4. 线程终止:当线程执行完任务或者在获取任务时返回null(表示任务队列已空且线程池正在关闭),线程会执行一些清理操作,然后终止。

任务队列的选择与影响

ThreadPoolExecutor中任务队列的选择对线程池的性能和行为有重要影响。常见的任务队列类型有:

  • ArrayBlockingQueue:基于数组的有界阻塞队列,在创建时需要指定队列的容量。它按照FIFO(先进先出)的顺序对任务进行排序。由于其有界性,当队列已满且线程池达到最大线程数时,新任务会触发拒绝策略。例如:
BlockingQueue<Runnable> arrayQueue = new ArrayBlockingQueue<>(10);
ThreadPoolExecutor executorWithArrayQueue = new ThreadPoolExecutor(
        3, 5, 10, TimeUnit.SECONDS, arrayQueue);
  • LinkedBlockingQueue:基于链表的无界阻塞队列(也可以通过构造函数指定容量变为有界队列)。它同样按照FIFO的顺序对任务进行排序。由于其无界性,在使用时需要注意如果任务提交速度过快,可能会导致内存耗尽。例如:
BlockingQueue<Runnable> linkedQueue = new LinkedBlockingQueue<>();
ThreadPoolExecutor executorWithLinkedQueue = new ThreadPoolExecutor(
        3, 5, 10, TimeUnit.SECONDS, linkedQueue);
  • SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等待另一个线程的移除操作,反之亦然。它适用于需要快速处理任务,并且不希望任务在队列中等待的场景。当使用SynchronousQueue作为任务队列时,线程池的corePoolSize通常应该设置为0,因为任务不会在队列中等待,而是直接交给线程处理,如果没有空闲线程则会创建新线程,直到达到maximumPoolSize。例如:
BlockingQueue<Runnable> syncQueue = new SynchronousQueue<>();
ThreadPoolExecutor executorWithSyncQueue = new ThreadPoolExecutor(
        0, 10, 10, TimeUnit.SECONDS, syncQueue);
  • PriorityBlockingQueue:基于堆的无界阻塞队列,它根据任务的优先级对任务进行排序。任务需要实现Comparable接口来定义优先级。在实际应用中,如果任务有不同的优先级需求,可以使用该队列。例如:
BlockingQueue<Runnable> priorityQueue = new PriorityBlockingQueue<>();
ThreadPoolExecutor executorWithPriorityQueue = new ThreadPoolExecutor(
        3, 5, 10, TimeUnit.SECONDS, priorityQueue);

拒绝策略的应用场景

  1. AbortPolicy:这是默认的拒绝策略,当任务无法提交时,直接抛出RejectedExecutionException异常。适用于对任务处理失败非常敏感的场景,例如在一些关键业务流程中,如果任务不能及时处理会导致严重后果,此时抛出异常可以让上层调用者及时知晓并采取相应措施。
ThreadPoolExecutor executorWithAbortPolicy = new ThreadPoolExecutor(
        3, 5, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(2),
        new ThreadPoolExecutor.AbortPolicy());
  1. CallerRunsPolicy:将任务回退给调用者执行。这种策略可以降低新任务的提交速度,因为调用者在执行任务时会阻塞,从而减少任务的涌入。适用于对响应时间要求不高,且希望降低系统负载的场景。例如在一些后台批处理任务中,即使任务执行稍有延迟也不会影响整体业务。
ThreadPoolExecutor executorWithCallerRunsPolicy = new ThreadPoolExecutor(
        3, 5, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(2),
        new ThreadPoolExecutor.CallerRunsPolicy());
  1. DiscardPolicy:直接丢弃任务,不做任何处理。适用于那些对任务丢失不太敏感,且任务执行并非至关重要的场景。例如在一些日志记录任务或者统计信息收集任务中,如果由于系统资源紧张导致部分任务丢失,对整体业务影响较小。
ThreadPoolExecutor executorWithDiscardPolicy = new ThreadPoolExecutor(
        3, 5, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(2),
        new ThreadPoolExecutor.DiscardPolicy());
  1. DiscardOldestPolicy:丢弃队列中最老的任务,然后尝试重新提交当前任务。这种策略适用于希望优先处理最新提交的任务,并且对任务丢失有一定容忍度的场景。例如在一些实时数据处理场景中,新的数据可能比旧数据更有价值,丢弃旧任务以保证新任务能及时处理。
ThreadPoolExecutor executorWithDiscardOldestPolicy = new ThreadPoolExecutor(
        3, 5, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(2),
        new ThreadPoolExecutor.DiscardOldestPolicy());

线程池的监控与调优

  1. 线程池监控指标
    • 任务提交数量:可以通过记录execute方法的调用次数来统计任务提交数量,了解系统的负载情况。
    • 任务执行完成数量:在任务执行完成时(例如在afterExecute方法中)进行计数,对比任务提交数量和执行完成数量,可以判断是否有任务积压或者丢失。
    • 活动线程数:通过getActiveCount方法获取当前正在执行任务的线程数量,了解线程池的繁忙程度。
    • 队列大小:通过getQueue().size()获取任务队列中等待执行的任务数量,判断任务队列是否已满或者是否有任务积压。
    • 线程池状态:通过getPoolState等相关方法了解线程池当前处于RUNNINGSHUTDOWN等哪种状态。
  2. 调优思路
    • 核心线程数调整:如果任务执行时间短且任务提交频率高,适当增加核心线程数可以提高任务处理速度,减少任务在队列中的等待时间。如果任务执行时间长,核心线程数过多可能会导致系统资源浪费,需要根据实际情况进行调整。
    • 任务队列大小调整:根据任务的特性和系统资源情况,选择合适的任务队列类型和大小。如果任务执行时间较长,有界队列的大小需要设置合理,避免队列满而触发拒绝策略。对于无界队列,要注意内存使用情况,防止内存溢出。
    • 最大线程数调整:最大线程数的设置需要考虑系统的资源限制,如CPU、内存等。如果设置过大,可能会导致系统资源耗尽;设置过小,则无法充分利用系统资源处理任务。
    • 拒绝策略优化:根据业务需求选择合适的拒绝策略。如果对任务丢失非常敏感,避免使用DiscardPolicy;如果希望对任务进行补偿处理,可以选择CallerRunsPolicy等。

通过合理监控和调优线程池,可以使其在不同的业务场景下发挥最佳性能,提高系统的稳定性和效率。例如,在一个高并发的Web应用中,通过对线程池的监控和调优,可以有效提升用户请求的处理速度,减少响应时间,提升用户体验。

与其他线程池实现的对比

Java并发包中除了ThreadPoolExecutor,还提供了一些预定义的线程池实现,如Executors工具类创建的线程池,它们与ThreadPoolExecutor有一些区别。

  1. FixedThreadPool:通过Executors.newFixedThreadPool(int nThreads)创建,它的核心线程数和最大线程数相等,任务队列是LinkedBlockingQueue(无界队列)。适用于处理固定数量并发任务的场景,由于线程数固定,不会因为任务过多而创建过多线程导致系统资源耗尽,但如果任务执行时间较长且提交速度过快,可能会导致任务在队列中无限堆积,从而占用大量内存。
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
  1. CachedThreadPool:通过Executors.newCachedThreadPool()创建,它的核心线程数为0,最大线程数为Integer.MAX_VALUE,任务队列是SynchronousQueue。适用于处理大量短时间任务的场景,线程池会根据任务数量动态创建和销毁线程。但如果任务持续大量提交,可能会创建过多线程导致系统资源耗尽。
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
  1. SingleThreadExecutor:通过Executors.newSingleThreadExecutor()创建,它只有一个核心线程,任务队列是LinkedBlockingQueue(无界队列)。适用于需要顺序执行任务,且不希望多个线程并发执行的场景,保证任务按顺序依次执行。
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();

与这些预定义线程池相比,ThreadPoolExecutor更加灵活,可以根据具体业务场景自定义核心线程数、最大线程数、任务队列、线程工厂和拒绝策略等参数,以满足不同的性能和功能需求。例如,在一个对内存使用非常敏感的应用中,使用ThreadPoolExecutor可以通过设置合适的任务队列大小和最大线程数,避免内存溢出问题,而FixedThreadPoolCachedThreadPool由于使用无界队列,可能会在高负载下导致内存问题。

总结

ThreadPoolExecutor作为Java并发包中线程池的核心实现,提供了丰富的功能和灵活的配置选项。深入理解其核心原理,包括任务提交与执行流程、线程池状态控制、任务队列和拒绝策略的选择等,对于编写高效、稳定的多线程应用至关重要。通过合理监控和调优线程池,并与其他线程池实现进行对比分析,能够根据不同的业务需求选择最合适的线程池解决方案,从而提升系统的整体性能和可靠性。在实际应用中,要充分考虑任务的特性、系统资源限制等因素,灵活运用ThreadPoolExecutor的各项功能,以达到最佳的多线程处理效果。