MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java 线程池执行流程的详细解析

2023-10-291.2k 阅读

Java 线程池执行流程的详细解析

在Java并发编程领域,线程池是一种强大且常用的工具,它有效地管理和复用线程,提升系统性能和资源利用率。理解线程池的执行流程对于编写高效、稳定的并发程序至关重要。本文将深入剖析Java线程池的执行流程,并通过代码示例辅助理解。

线程池基础概念

在深入探讨执行流程之前,先了解一些线程池相关的基础概念。

线程池:是一个管理线程的池化技术,它维护着一组线程,这组线程可以被重复使用来执行多个任务。使用线程池可以避免频繁创建和销毁线程带来的开销,提高系统的响应速度和吞吐量。

任务:在线程池中,任务指的是那些需要被线程执行的工作单元。在Java中,任务通常以RunnableCallable接口的实现类的形式存在。Runnable接口的实现类代表一个没有返回值的任务,而Callable接口的实现类代表一个有返回值的任务。

核心线程和非核心线程:线程池中有核心线程和非核心线程的概念。核心线程是线程池中始终保持存活的线程数量(即使它们处于空闲状态),除非设置了allowCoreThreadTimeOuttrue。非核心线程是在核心线程都处于繁忙状态,且任务队列已满时,为了处理新任务而临时创建的线程。当这些非核心线程空闲时间超过keepAliveTime时,会被销毁。

任务队列:线程池使用一个任务队列来存放等待执行的任务。常见的任务队列有ArrayBlockingQueueLinkedBlockingQueueSynchronousQueue等。不同的任务队列特性会影响线程池的行为。

线程池的创建

在Java中,通过ThreadPoolExecutor类来创建线程池。ThreadPoolExecutor的构造函数有多个重载版本,其最完整的构造函数如下:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
  • corePoolSize:核心线程数。
  • maximumPoolSize:线程池允许的最大线程数,包括核心线程和非核心线程。
  • keepAliveTime:非核心线程的存活时间,当非核心线程空闲时间超过这个值时,会被销毁。
  • unitkeepAliveTime的时间单位。
  • workQueue:任务队列,用于存放等待执行的任务。
  • threadFactory:线程工厂,用于创建线程。通过线程工厂可以定制线程的名称、优先级等属性。
  • handler:拒绝策略,当线程池无法接受新任务时(线程数达到maximumPoolSize且任务队列已满),会调用拒绝策略来处理该任务。

除了通过ThreadPoolExecutor构造函数创建线程池,Java还提供了一些工具类如Executors来创建常见类型的线程池,例如:

  • newFixedThreadPool(int nThreads):创建一个固定大小的线程池,核心线程数和最大线程数都为nThreads,任务队列是LinkedBlockingQueue
  • newCachedThreadPool():创建一个可缓存的线程池,核心线程数为0,最大线程数为Integer.MAX_VALUE,任务队列是SynchronousQueue
  • newSingleThreadExecutor():创建一个单线程的线程池,核心线程数和最大线程数都为1,任务队列是LinkedBlockingQueue

虽然Executors工具类创建线程池很方便,但在实际生产环境中,建议直接使用ThreadPoolExecutor构造函数创建线程池,这样可以更明确地控制线程池的参数,避免因使用默认参数而带来的性能问题。

线程池执行流程详细分析

当一个新任务提交到线程池时,其执行流程如下:

  1. 判断核心线程是否已满:线程池首先检查核心线程是否都在执行任务。如果核心线程数小于corePoolSize,则创建一个新的核心线程来执行该任务。
  2. 核心线程已满,判断任务队列是否已满:如果核心线程都在执行任务(即核心线程数达到corePoolSize),则将任务放入任务队列中。如果任务队列未满,任务将在队列中等待,直到有核心线程空闲来执行它。
  3. 任务队列已满,判断是否可以创建非核心线程:如果任务队列已满,线程池会检查当前线程数是否小于maximumPoolSize。如果是,则创建一个新的非核心线程来执行该任务。
  4. 线程数达到最大,执行拒绝策略:如果当前线程数已经达到maximumPoolSize且任务队列已满,线程池将无法接受新任务,此时会执行拒绝策略。

下面通过代码示例来演示线程池的执行流程:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolExecutionFlowExample {
    public static void main(String[] args) {
        // 创建任务队列
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(3);
        // 创建线程池
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2, // 核心线程数
                4, // 最大线程数
                10, // 非核心线程存活时间
                TimeUnit.SECONDS,
                workQueue,
                r -> {
                    Thread t = new Thread(r);
                    t.setName("CustomThread-" + t.getId());
                    return t;
                },
                new ThreadPoolExecutor.AbortPolicy()
        );

        // 提交任务
        for (int i = 0; i < 10; i++) {
            int taskNumber = i;
            executor.submit(() -> {
                System.out.println(Thread.currentThread().getName() + " is executing task " + taskNumber);
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                System.out.println(Thread.currentThread().getName() + " has finished task " + taskNumber);
            });
        }

        // 关闭线程池
        executor.shutdown();
    }
}

在上述代码中,我们创建了一个线程池,核心线程数为2,最大线程数为4,任务队列容量为3。然后提交10个任务,每个任务执行2秒。

  • 初始阶段:前两个任务提交时,因为核心线程数未满(核心线程数为2),所以会创建两个核心线程来执行这两个任务。
  • 任务队列填充阶段:继续提交任务,当提交到第3、4、5个任务时,核心线程已满,这三个任务会被放入任务队列中等待执行。
  • 非核心线程创建阶段:再提交任务,当提交到第6、7、8个任务时,任务队列已满,且当前线程数(2个核心线程)小于最大线程数(4),所以会创建两个非核心线程来执行这三个任务中的两个(因为任务队列中有3个任务,此时线程池中有2个核心线程 + 2个非核心线程,共4个线程同时执行任务)。
  • 拒绝策略阶段:当提交第9、10个任务时,线程数已达到最大线程数(4)且任务队列已满,此时会执行拒绝策略(这里使用的是AbortPolicy,会抛出RejectedExecutionException异常)。

线程池的拒绝策略

当线程池无法接受新任务时(线程数达到maximumPoolSize且任务队列已满),会执行拒绝策略。Java提供了以下几种内置的拒绝策略:

  • AbortPolicy:默认的拒绝策略,当任务被拒绝时,直接抛出RejectedExecutionException异常。
  • CallerRunsPolicy:当任务被拒绝时,会在调用execute方法的线程中直接执行该任务。这样做的好处是可以降低新任务的提交速度,减轻线程池的压力。
  • DiscardPolicy:当任务被拒绝时,直接丢弃该任务,不做任何处理。
  • DiscardOldestPolicy:当任务被拒绝时,会丢弃任务队列中最老的一个任务(即最先进入队列的任务),然后尝试将新任务放入任务队列。

可以通过实现RejectedExecutionHandler接口来自定义拒绝策略。例如:

import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.out.println("Task " + r + " is rejected, custom handling...");
        // 这里可以添加自定义的处理逻辑,比如记录日志、将任务放入其他队列等
        throw new RejectedExecutionException("Task rejected from " + executor);
    }
}

然后在创建线程池时使用自定义的拒绝策略:

ThreadPoolExecutor executor = new ThreadPoolExecutor(
        2,
        4,
        10,
        TimeUnit.SECONDS,
        workQueue,
        new CustomThreadFactory(),
        new CustomRejectedExecutionHandler()
);

线程池的关闭

线程池使用完毕后,需要进行关闭操作,以释放资源。线程池提供了两个关闭方法:

  • shutdown():启动一个有序关闭过程,不再接受新任务,但会继续执行已提交到任务队列中的任务。调用该方法后,线程池的状态会变为SHUTDOWN
  • shutdownNow():尝试停止所有正在执行的任务,停止处理等待任务队列中的任务,并返回等待执行的任务列表。调用该方法后,线程池的状态会变为STOP

通常情况下,建议先调用shutdown()方法,如果在指定时间内线程池没有正常关闭,可以再调用shutdownNow()方法。例如:

executor.shutdown();
try {
    if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
        executor.shutdownNow();
        if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
            System.err.println("Pool did not terminate");
        }
    }
} catch (InterruptedException ie) {
    executor.shutdownNow();
    Thread.currentThread().interrupt();
}

在上述代码中,首先调用shutdown()方法,然后等待60秒,如果线程池在60秒内没有关闭,则调用shutdownNow()方法,再等待60秒,如果仍然没有关闭,则输出错误信息。

线程池的监控与调优

为了确保线程池在应用程序中高效运行,需要对其进行监控和调优。

监控指标

  • 线程池状态:可以通过getPoolSize()获取当前线程池中的线程数,getActiveCount()获取正在执行任务的线程数,getQueue().size()获取任务队列中的任务数量等。
  • 任务执行情况:通过getCompletedTaskCount()获取已完成的任务数量,getTaskCount()获取已提交的任务总数。

调优策略

  • 调整核心线程数和最大线程数:如果任务执行时间短且数量多,可以适当增加核心线程数,减少任务在队列中的等待时间。如果任务执行时间长且数量少,可以适当减小最大线程数,避免过多线程占用系统资源。
  • 选择合适的任务队列:根据任务的特性选择合适的任务队列。如果任务优先级较高,可以使用PriorityBlockingQueue;如果需要快速处理任务,可以使用SynchronousQueue

通过合理地监控和调优线程池,可以提高系统的并发性能和稳定性。

线程池在实际项目中的应用场景

线程池在实际项目中有广泛的应用场景:

  • Web服务器:处理大量的HTTP请求,通过线程池复用线程,提高服务器的响应速度和吞吐量。
  • 数据处理:如批量数据的导入、导出,数据的计算等任务,可以使用线程池并行处理,提高处理效率。
  • 消息队列消费:在消息队列系统中,消费者可以使用线程池来处理接收到的消息,提高消息处理的并发能力。

总结

Java线程池是一种强大的并发编程工具,理解其执行流程、拒绝策略、关闭方法以及监控调优对于编写高效、稳定的并发程序至关重要。通过合理配置线程池参数和选择合适的任务队列、拒绝策略等,可以充分发挥线程池的优势,提升系统的性能和资源利用率。在实际项目中,应根据具体的业务场景和需求来灵活运用线程池技术。希望通过本文的介绍和代码示例,读者能对Java线程池的执行流程有更深入的理解,并能在实际开发中熟练运用线程池解决并发问题。