MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java 线程池执行流程剖析

2022-09-081.2k 阅读

Java 线程池基础概念

在多线程编程中,线程池是一种非常重要的工具。它可以帮助我们管理和复用线程,从而提高系统的性能和资源利用率。Java 提供了 java.util.concurrent 包来支持线程池的实现。

线程池的定义与作用

线程池是一种池化技术,它维护着一组线程。当有任务需要执行时,线程池会从池中取出一个空闲线程来执行任务,任务执行完毕后,线程不会被销毁,而是返回到线程池中等待下一个任务。这样可以避免频繁创建和销毁线程带来的开销。

线程池的主要作用包括:

  1. 提高性能:减少线程创建和销毁的开销,提高任务执行效率。
  2. 资源控制:可以限制线程的数量,避免线程过多导致系统资源耗尽。
  3. 管理方便:统一管理线程,便于监控和调优。

线程池相关类和接口

  1. Executor 接口:这是一个顶层接口,定义了一个方法 execute(Runnable task),用于提交任务。它只提供了最基本的任务提交功能。
public interface Executor {
    void execute(Runnable task);
}
  1. ExecutorService 接口:继承自 Executor 接口,扩展了一些生命周期管理方法,如 shutdown()shutdownNow(),以及提交任务并返回结果的方法,如 submit(Callable<T> task)
public interface ExecutorService extends Executor {
    void shutdown();
    List<Runnable> shutdownNow();
    boolean isShutdown();
    boolean isTerminated();
    <T> Future<T> submit(Callable<T> task);
    // 其他方法
}
  1. ScheduledExecutorService 接口:继承自 ExecutorService 接口,提供了任务调度功能,如 schedule(Runnable task, long delay, TimeUnit unit) 用于延迟执行任务,scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit unit) 用于按固定速率执行任务。
public interface ScheduledExecutorService extends ExecutorService {
    public ScheduledFuture<?> schedule(Runnable task, long delay, TimeUnit unit);
    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit unit);
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, long initialDelay, long delay, TimeUnit unit);
}
  1. ThreadPoolExecutor:这是线程池的核心实现类,实现了 ExecutorService 接口。它提供了丰富的构造函数和方法来配置和管理线程池,如设置核心线程数、最大线程数、存活时间等。
public class ThreadPoolExecutor extends AbstractExecutorService {
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }
    // 其他构造函数和方法
}
  1. ScheduledThreadPoolExecutor:继承自 ThreadPoolExecutor 类,实现了 ScheduledExecutorService 接口,用于实现任务的调度。

线程池的创建与配置

使用 Executors 工具类创建线程池

Executors 类提供了一些静态方法来方便地创建不同类型的线程池:

  1. newFixedThreadPool(int nThreads):创建一个固定大小的线程池,核心线程数和最大线程数都为 nThreads,线程池中的线程一直存活。
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
  1. newCachedThreadPool():创建一个可缓存的线程池,核心线程数为 0,最大线程数为 Integer.MAX_VALUE。如果线程池中有空闲线程,则复用空闲线程;如果没有,则创建新线程。线程如果 60 秒内没有被使用,则会被回收。
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
  1. newSingleThreadExecutor():创建一个单线程的线程池,核心线程数和最大线程数都为 1,保证所有任务按照顺序执行。
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
  1. newScheduledThreadPool(int corePoolSize):创建一个支持任务调度的线程池,核心线程数为 corePoolSize,最大线程数为 Integer.MAX_VALUE
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(3);

虽然 Executors 工具类创建线程池很方便,但在实际生产环境中,不推荐使用 newFixedThreadPoolnewCachedThreadPoolnewSingleThreadExecutor,因为它们可能会导致 OOM(OutOfMemoryError)问题。例如,newFixedThreadPoolnewSingleThreadExecutor 使用的是无界队列 LinkedBlockingQueue,当任务提交速度大于任务处理速度时,队列会不断增长,最终导致内存溢出。newCachedThreadPool 由于最大线程数为 Integer.MAX_VALUE,当请求过多时,可能会创建大量线程,耗尽系统资源。

使用 ThreadPoolExecutor 构造函数创建线程池

推荐使用 ThreadPoolExecutor 的构造函数来创建线程池,这样可以根据实际需求进行更细粒度的配置。

BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(10);
ThreadPoolExecutor executor = new ThreadPoolExecutor(
    3, // 核心线程数
    5, // 最大线程数
    10, // 存活时间
    TimeUnit.SECONDS,
    workQueue,
    Executors.defaultThreadFactory(),
    new ThreadPoolExecutor.AbortPolicy()
);
  1. 核心线程数(corePoolSize:线程池中保持存活的最小线程数。即使这些线程处于空闲状态,也不会被销毁,除非设置了 allowCoreThreadTimeOuttrue
  2. 最大线程数(maximumPoolSize:线程池中允许的最大线程数。当任务队列已满且核心线程都在忙碌时,会创建新的线程,直到线程数达到 maximumPoolSize
  3. 存活时间(keepAliveTime:当线程数大于核心线程数时,多余的空闲线程的存活时间。超过这个时间,空闲线程会被销毁。
  4. 时间单位(unit:存活时间的时间单位,如 TimeUnit.SECONDSTimeUnit.MINUTES 等。
  5. 任务队列(workQueue:用于存储等待执行的任务的队列。常见的队列有 ArrayBlockingQueue(有界队列)、LinkedBlockingQueue(无界队列)、SynchronousQueue(不存储任务,直接提交给线程执行)等。
  6. 线程工厂(threadFactory:用于创建线程的工厂。可以通过自定义线程工厂来设置线程的名称、优先级等属性。
  7. 拒绝策略(handler:当任务队列已满且线程数达到最大线程数时,新任务的处理策略。常见的拒绝策略有:
    • AbortPolicy:默认策略,直接抛出 RejectedExecutionException 异常。
    • CallerRunsPolicy:将任务交给调用者所在的线程执行。
    • DiscardPolicy:直接丢弃任务,不做任何处理。
    • DiscardOldestPolicy:丢弃队列中最老的任务,然后尝试提交新任务。

线程池执行流程剖析

当一个任务提交到线程池时,其执行流程如下:

  1. 判断核心线程是否已满:线程池会首先检查核心线程是否都在忙碌。如果核心线程数小于 corePoolSize,则创建一个新的核心线程来执行任务。
  2. 核心线程已满,判断任务队列是否已满:如果核心线程都在忙碌,即核心线程数达到了 corePoolSize,则任务会被放入任务队列 workQueue 中等待执行。如果任务队列还有空间,则任务成功进入队列等待。
  3. 任务队列已满,判断最大线程数:如果任务队列已满,即 workQueue 无法再容纳新任务,线程池会检查当前线程数是否小于 maximumPoolSize。如果小于 maximumPoolSize,则创建一个新的非核心线程来执行任务。
  4. 线程数达到最大线程数,执行拒绝策略:如果线程数已经达到 maximumPoolSize,即所有核心线程和非核心线程都在忙碌,且任务队列已满,此时线程池会根据设置的拒绝策略来处理新任务。

下面通过一个简单的代码示例来演示线程池的执行流程:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolExecutionFlowExample {
    public static void main(String[] args) {
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(2);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            2, // 核心线程数
            4, // 最大线程数
            10, // 存活时间
            TimeUnit.SECONDS,
            workQueue,
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy()
        );

        for (int i = 0; i < 6; i++) {
            int taskNumber = i;
            executor.submit(() -> {
                System.out.println("Task " + taskNumber + " is being executed by " + Thread.currentThread().getName());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Task " + taskNumber + " finished");
            });
        }

        executor.shutdown();
    }
}

在上述代码中,我们创建了一个线程池,核心线程数为 2,最大线程数为 4,任务队列容量为 2。然后提交 6 个任务。根据线程池的执行流程,前 2 个任务会由核心线程执行,接下来 2 个任务会放入任务队列,再接下来 2 个任务由于任务队列已满,会创建新的非核心线程来执行。

线程池的生命周期管理

线程池有以下几种状态:

  1. RUNNING:运行状态,线程池可以接受新任务并处理任务队列中的任务。
  2. SHUTDOWN:关闭状态,线程池不再接受新任务,但会继续处理任务队列中的任务。调用 shutdown() 方法会使线程池进入此状态。
  3. STOP:停止状态,线程池不再接受新任务,也不再处理任务队列中的任务,并且会中断正在执行的任务。调用 shutdownNow() 方法会使线程池进入此状态。
  4. TERMINATED:终止状态,线程池中的所有任务都已执行完毕,线程池已完全终止。

可以通过 isShutdown() 方法判断线程池是否已关闭,通过 isTerminated() 方法判断线程池是否已终止。

ExecutorService executor = Executors.newFixedThreadPool(5);
// 提交任务
executor.submit(() -> System.out.println("Task is running"));
// 关闭线程池
executor.shutdown();
while (!executor.isTerminated()) {
    // 等待线程池终止
}
System.out.println("ThreadPool is terminated");

线程池的监控与调优

线程池监控指标

  1. 任务提交数量:可以统计提交到线程池的任务总数,了解系统的负载情况。
  2. 任务执行完成数量:统计已经执行完成的任务数量,与任务提交数量对比,可以了解任务的执行效率。
  3. 活跃线程数:当前正在执行任务的线程数量,反映线程池的繁忙程度。
  4. 队列任务数量:任务队列中等待执行的任务数量,通过监控队列任务数量,可以判断任务提交速度和处理速度是否匹配。

ThreadPoolExecutor 类提供了一些方法来获取这些监控指标,如 getTaskCount() 获取任务提交总数,getCompletedTaskCount() 获取已完成任务数,getActiveCount() 获取活跃线程数,getQueue().size() 获取队列任务数量。

ThreadPoolExecutor executor = new ThreadPoolExecutor(
    3, 5, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>()
);
for (int i = 0; i < 10; i++) {
    executor.submit(() -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });
}
System.out.println("Task count: " + executor.getTaskCount());
System.out.println("Completed task count: " + executor.getCompletedTaskCount());
System.out.println("Active thread count: " + executor.getActiveCount());
System.out.println("Queue size: " + executor.getQueue().size());

线程池调优

  1. 核心线程数的调整:核心线程数应该根据任务的类型和系统资源来确定。如果任务是 CPU 密集型的,核心线程数一般设置为 CPU 核心数;如果任务是 I/O 密集型的,可以适当增加核心线程数,一般为 CPU 核心数的 2 倍左右。
  2. 最大线程数的调整:最大线程数需要考虑系统资源,如内存、CPU 等。如果设置过大,可能会导致系统资源耗尽;如果设置过小,可能无法充分利用系统资源。可以通过压测来确定合适的最大线程数。
  3. 任务队列的选择:根据任务的特点选择合适的任务队列。如果任务执行时间较短且任务量较大,可以选择有界队列,如 ArrayBlockingQueue,以避免队列无限增长导致内存溢出;如果任务执行时间较长且任务量相对较小,可以选择无界队列,如 LinkedBlockingQueue
  4. 拒绝策略的选择:根据业务需求选择合适的拒绝策略。如果希望在任务被拒绝时抛出异常,可以选择 AbortPolicy;如果希望将任务交给调用者执行,可以选择 CallerRunsPolicy;如果希望丢弃任务,可以选择 DiscardPolicyDiscardOldestPolicy

线程池在实际项目中的应用场景

  1. Web 服务器:处理大量的 HTTP 请求,通过线程池可以提高请求处理效率,避免频繁创建和销毁线程。
  2. 异步任务处理:如发送邮件、生成报表等异步任务,可以使用线程池来管理这些任务,提高系统的响应速度。
  3. 大数据处理:在大数据计算中,需要处理大量的数据,可以使用线程池来并行处理数据,提高处理速度。

以一个简单的 Web 服务器示例来说明:

import java.io.IOException;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class WebServer {
    private static final int PORT = 8080;
    private static final ExecutorService executor = Executors.newFixedThreadPool(10);

    public static void main(String[] args) {
        try (ServerSocket serverSocket = new ServerSocket(PORT)) {
            System.out.println("Server started on port " + PORT);
            while (true) {
                Socket clientSocket = serverSocket.accept();
                executor.submit(() -> {
                    try (PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true)) {
                        out.println("HTTP/1.1 200 OK");
                        out.println("Content-Type: text/html");
                        out.println();
                        out.println("<html><body>Hello, World!</body></html>");
                    } catch (IOException e) {
                        e.printStackTrace();
                    } finally {
                        try {
                            clientSocket.close();
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                });
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

在上述代码中,使用线程池来处理每个客户端的 HTTP 请求,提高了服务器的并发处理能力。

线程池与并发编程的注意事项

  1. 线程安全问题:在多线程环境下,要注意共享资源的线程安全。可以使用 synchronized 关键字、Lock 接口等方式来保证线程安全。
  2. 任务异常处理:当任务在执行过程中抛出异常时,默认情况下线程池不会将异常直接抛出。可以通过 Future 接口来获取任务执行结果并处理异常,或者自定义线程工厂,在创建线程时设置 UncaughtExceptionHandler 来处理异常。
ExecutorService executor = Executors.newFixedThreadPool(5);
Future<?> future = executor.submit(() -> {
    throw new RuntimeException("Task exception");
});
try {
    future.get();
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}
  1. 资源泄漏:如果在线程池中使用了一些需要手动关闭的资源,如数据库连接、文件句柄等,要确保在任务执行完毕后及时关闭这些资源,以避免资源泄漏。

总结

线程池是 Java 并发编程中非常重要的工具,通过合理配置和使用线程池,可以显著提高系统的性能和资源利用率。了解线程池的执行流程、生命周期管理、监控与调优等方面的知识,对于编写高效、稳定的多线程应用程序至关重要。在实际项目中,需要根据具体的业务需求和系统环境来选择合适的线程池配置和任务处理策略,以达到最佳的性能和可靠性。同时,要注意线程安全、任务异常处理和资源泄漏等问题,确保多线程程序的正确性和稳定性。通过不断地实践和优化,能够更好地发挥线程池在并发编程中的优势。

以上就是关于 Java 线程池执行流程的详细剖析,希望对你理解和使用线程池有所帮助。在实际应用中,要根据具体场景灵活运用线程池的各种特性,以实现高效、稳定的多线程编程。