MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java 线程池创建线程处理任务的过程

2022-12-154.5k 阅读

Java 线程池基础概念

在深入探讨 Java 线程池创建线程处理任务的过程之前,我们先来了解一些线程池的基础概念。

线程池是一种管理和复用线程的机制,它避免了频繁创建和销毁线程带来的开销。线程池维护着一个线程队列,当有任务到来时,线程池从队列中取出一个空闲线程来执行任务,任务执行完毕后,线程并不会被销毁,而是重新回到线程池中等待下一个任务,这样就实现了线程的复用。

Java 中的线程池主要由 java.util.concurrent.Executor 框架来实现,其中核心的接口和类包括 ExecutorExecutorServiceThreadPoolExecutor 以及一些工具类如 Executors

  • Executor 接口:这是一个基础接口,它只定义了一个方法 execute(Runnable task),用于提交一个任务。这个接口并没有提供对线程池生命周期的管理,也没有返回任务执行结果的机制。
public interface Executor {
    void execute(Runnable task);
}
  • ExecutorService 接口:继承自 Executor 接口,提供了更丰富的方法,比如关闭线程池的方法 shutdown()shutdownNow(),以及提交任务并获取执行结果的方法 submit(Callable<T> task) 等。
public interface ExecutorService extends Executor {
    void shutdown();
    List<Runnable> shutdownNow();
    boolean isShutdown();
    boolean isTerminated();
    <T> Future<T> submit(Callable<T> task);
    // 还有其他重载的 submit 方法
}
  • ThreadPoolExecutor 类:是线程池的核心实现类,它提供了丰富的构造函数,可以灵活地配置线程池的参数,如核心线程数、最大线程数、存活时间等。
public class ThreadPoolExecutor extends AbstractExecutorService {
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        // 构造函数实现
    }
    // 还有其他重载的构造函数
}
  • Executors 工具类:提供了一些静态方法来方便地创建不同类型的线程池,比如 newFixedThreadPool(int nThreads) 创建固定大小的线程池,newCachedThreadPool() 创建可缓存的线程池,newSingleThreadExecutor() 创建单线程的线程池等。但需要注意的是,这些方法创建的线程池在某些场景下可能存在风险,比如 newFixedThreadPoolnewSingleThreadExecutor 创建的线程池使用的是无界队列 LinkedBlockingQueue,可能会导致内存溢出问题;newCachedThreadPool 创建的线程池最大线程数为 Integer.MAX_VALUE,可能会创建过多线程导致系统资源耗尽。

线程池参数详解

  1. 核心线程数(corePoolSize):线程池中会一直存活的线程数量,即使这些线程处于空闲状态,也不会被销毁(除非设置了 allowCoreThreadTimeOuttrue)。当有新任务提交时,如果当前线程池中的线程数小于核心线程数,会直接创建新线程来执行任务。

  2. 最大线程数(maximumPoolSize):线程池允许创建的最大线程数量。当任务队列已满,且当前线程池中的线程数小于最大线程数时,会创建新线程来执行任务。

  3. 存活时间(keepAliveTime):当线程池中的线程数量超过核心线程数时,多余的空闲线程在等待新任务到来的时间超过这个存活时间后,会被销毁。

  4. 时间单位(unit):存活时间的时间单位,是 TimeUnit 枚举类型,比如 TimeUnit.SECONDSTimeUnit.MILLISECONDS 等。

  5. 任务队列(workQueue):用于存放等待执行的任务的队列。当核心线程都在忙碌时,新提交的任务会被放入这个队列中。常见的任务队列类型有:

    • ArrayBlockingQueue:基于数组的有界阻塞队列,按 FIFO(先进先出)原则对元素进行排序。
    • LinkedBlockingQueue:基于链表的无界阻塞队列,同样按 FIFO 原则对元素进行排序。使用这个队列时要注意,如果任务提交速度过快,可能会导致队列无限增长,从而耗尽内存。
    • SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等待另一个线程的移除操作,反之亦然。这种队列适合处理任务速度非常快的场景,因为它不会存储任务,所以不会造成任务积压。
    • PriorityBlockingQueue:具有优先级的无界阻塞队列,元素按照自然顺序或者自定义的比较器进行排序。
  6. 线程工厂(ThreadFactory):用于创建线程的工厂。通过自定义线程工厂,可以设置线程的名称、优先级、是否为守护线程等属性。

  7. 拒绝策略(RejectedExecutionHandler):当线程池和任务队列都已满,无法再接受新任务时,会触发拒绝策略。常见的拒绝策略有:

    • AbortPolicy:默认的拒绝策略,直接抛出 RejectedExecutionException 异常。
    • CallerRunsPolicy:将任务回退给调用者(提交任务的线程)来执行,这样可以降低新任务的提交速度。
    • DiscardPolicy:直接丢弃新提交的任务,不做任何处理。
    • DiscardOldestPolicy:丢弃任务队列中最老的任务(队首的任务),然后尝试将新任务放入队列。

Java 线程池创建线程处理任务的详细过程

  1. 任务提交:当调用 execute(Runnable task)submit(Callable<T> task) 方法提交任务时,线程池开始处理任务。

  2. 核心线程检查:线程池首先检查当前运行的线程数是否小于核心线程数 corePoolSize。如果小于,会创建一个新的核心线程来执行任务。

  3. 任务队列处理:如果当前运行的线程数已经达到核心线程数,那么新提交的任务会被放入任务队列 workQueue 中等待执行。

  4. 最大线程数检查:如果任务队列已满,且当前运行的线程数小于最大线程数 maximumPoolSize,线程池会创建一个新的非核心线程来执行任务。

  5. 拒绝策略触发:如果任务队列已满,且当前运行的线程数已经达到最大线程数,那么线程池会根据设置的拒绝策略来处理新提交的任务,比如抛出异常、将任务回退给调用者等。

  6. 线程复用:当一个线程执行完任务后,它不会被立即销毁(除非是多余的空闲线程且超过了存活时间),而是会回到线程池中,等待下一个任务到来。这样就实现了线程的复用,减少了线程创建和销毁的开销。

代码示例

下面通过一个简单的代码示例来演示线程池创建线程处理任务的过程。

import java.util.concurrent.*;

public class ThreadPoolExample {
    public static void main(String[] args) {
        // 创建一个固定大小的线程池,核心线程数和最大线程数都为 3
        ExecutorService executorService = Executors.newFixedThreadPool(3);

        // 定义任务
        for (int i = 0; i < 5; i++) {
            int taskNumber = i;
            Runnable task = () -> {
                System.out.println("Task " + taskNumber + " is being processed by " + Thread.currentThread().getName());
                try {
                    Thread.sleep(2000); // 模拟任务执行时间
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Task " + taskNumber + " is completed.");
            };

            // 提交任务
            executorService.submit(task);
        }

        // 关闭线程池
        executorService.shutdown();
    }
}

在这个示例中,我们创建了一个固定大小为 3 的线程池。首先提交的 3 个任务会分别由 3 个核心线程来执行,当提交第 4 个和第 5 个任务时,由于核心线程都在忙碌,这两个任务会被放入任务队列(LinkedBlockingQueue,因为 newFixedThreadPool 使用的是这个队列)中等待执行。当某个核心线程执行完任务后,会从任务队列中取出一个任务继续执行。

我们也可以通过自定义 ThreadPoolExecutor 来更灵活地配置线程池参数。

import java.util.concurrent.*;

public class CustomThreadPoolExample {
    public static void main(String[] args) {
        // 自定义线程池参数
        int corePoolSize = 2;
        int maximumPoolSize = 4;
        long keepAliveTime = 10;
        TimeUnit unit = TimeUnit.SECONDS;
        BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(3);
        ThreadFactory threadFactory = Executors.defaultThreadFactory();
        RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();

        // 创建自定义线程池
        ThreadPoolExecutor executorService = new ThreadPoolExecutor(
                corePoolSize,
                maximumPoolSize,
                keepAliveTime,
                unit,
                workQueue,
                threadFactory,
                handler);

        // 定义任务
        for (int i = 0; i < 7; i++) {
            int taskNumber = i;
            Runnable task = () -> {
                System.out.println("Task " + taskNumber + " is being processed by " + Thread.currentThread().getName());
                try {
                    Thread.sleep(2000); // 模拟任务执行时间
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Task " + taskNumber + " is completed.");
            };

            // 提交任务
            executorService.submit(task);
        }

        // 关闭线程池
        executorService.shutdown();
    }
}

在这个自定义线程池的示例中,核心线程数为 2,最大线程数为 4,任务队列大小为 3。当提交前 2 个任务时,会创建 2 个核心线程来执行。提交第 3、4、5 个任务时,会将任务放入任务队列。当提交第 6 个任务时,由于任务队列已满且当前线程数小于最大线程数,会创建一个新的非核心线程来执行。当提交第 7 个任务时,由于任务队列已满且线程数已达到最大线程数,会触发拒绝策略(这里是 AbortPolicy,会抛出 RejectedExecutionException 异常)。

线程池的生命周期管理

线程池有几个重要的生命周期状态,分别是 RUNNINGSHUTDOWNSTOPTIDYINGTERMINATED

  1. RUNNING:线程池的初始状态,能够接受新任务并处理任务队列中的任务。

  2. SHUTDOWN:调用 shutdown() 方法后,线程池进入此状态。此时线程池不再接受新任务,但会继续处理任务队列中已有的任务。

  3. STOP:调用 shutdownNow() 方法后,线程池进入此状态。线程池会停止所有正在执行的任务,清空任务队列,并尝试中断所有等待任务的线程。

  4. TIDYING:当所有任务都执行完毕,且任务队列也为空,线程池进入此状态。在这个状态下,会调用 terminated() 方法,这个方法可以被子类重写,用于执行一些资源清理等操作。

  5. TERMINATEDterminated() 方法执行完毕后,线程池进入此状态,表示线程池已完全终止。

我们可以通过 isShutdown() 方法判断线程池是否已经调用了 shutdown()shutdownNow() 方法,通过 isTerminated() 方法判断线程池是否已经完全终止。

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadPoolLifeCycleExample {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(2);

        // 提交任务
        executorService.submit(() -> {
            System.out.println("Task 1 is being processed by " + Thread.currentThread().getName());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("Task 1 is completed.");
        });

        // 关闭线程池
        executorService.shutdown();

        // 检查线程池状态
        while (!executorService.isTerminated()) {
            System.out.println("ThreadPool is still running...");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        System.out.println("ThreadPool has terminated.");
    }
}

在这个示例中,我们提交一个任务后调用 shutdown() 方法关闭线程池,然后通过循环检查 isTerminated() 方法的返回值,直到线程池完全终止。

线程池的性能优化

  1. 合理设置线程池参数:根据任务的类型(CPU 密集型、IO 密集型等)和系统资源来合理设置核心线程数、最大线程数和任务队列大小等参数。对于 CPU 密集型任务,核心线程数可以设置为 CPU 核心数;对于 IO 密集型任务,可以适当增加核心线程数,因为线程在等待 IO 操作时会处于空闲状态,增加核心线程数可以提高系统利用率。

  2. 选择合适的任务队列:如果任务执行速度较快,可以选择 SynchronousQueue,避免任务积压;如果任务执行速度较慢,且需要控制任务数量,可以选择有界队列,如 ArrayBlockingQueue,防止内存溢出。

  3. 使用合适的拒绝策略:根据业务需求选择合适的拒绝策略。如果任务非常重要,不能丢弃,可以选择 CallerRunsPolicy 或自定义拒绝策略,将任务回退给调用者处理。

  4. 监控和调优:通过 ThreadPoolExecutor 提供的一些方法,如 getTaskCount()getCompletedTaskCount()getActiveCount() 等,来监控线程池的运行状态,根据监控结果进行参数调优。

线程池在实际项目中的应用场景

  1. Web 服务器:处理大量的 HTTP 请求,每个请求可以作为一个任务提交到线程池中处理,避免为每个请求创建新线程带来的开销。

  2. 分布式计算:在分布式系统中,将计算任务分发到不同的节点上,每个节点可以使用线程池来处理任务,提高计算效率。

  3. 消息队列消费:在消息队列系统中,消费者可以使用线程池来处理接收到的消息,提高消息处理的并发能力。

  4. 定时任务:通过线程池来执行定时任务,如定期的数据清理、报表生成等任务。

通过深入理解 Java 线程池创建线程处理任务的过程以及合理应用线程池,可以显著提高程序的性能和并发处理能力,在实际项目开发中发挥重要作用。在使用线程池时,要根据具体的业务场景和系统资源进行合理配置和优化,以达到最佳的效果。同时,要注意线程安全问题,避免出现数据竞争和死锁等情况。通过不断地实践和总结经验,能够更好地利用线程池这一强大的工具来构建高效、稳定的应用程序。在实际应用中,还可以结合 AOP(面向切面编程)等技术,对线程池的使用进行统一的监控和管理,进一步提高系统的可维护性和性能。例如,可以通过 AOP 切面来记录线程池的任务提交、执行时间、异常等信息,方便进行性能分析和问题排查。此外,在多线程编程中,还需要关注线程上下文切换的开销,尽量减少不必要的上下文切换,提高系统的整体性能。在设计线程池时,也要考虑到系统的扩展性,以便在业务增长时能够方便地对线程池进行调整和优化。总之,Java 线程池是一个功能强大且复杂的工具,需要开发者深入理解并灵活运用,才能充分发挥其优势,构建出高性能、高并发的应用程序。