MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java 线程池创建方法

2023-09-027.7k 阅读

线程池概述

在Java多线程编程中,线程池是一种非常重要的工具。它允许我们创建一个线程集合,这些线程可以被重复使用来执行各种任务。线程池的主要优点包括:

  1. 资源管理:通过复用已有的线程,减少了创建和销毁线程的开销。创建和销毁线程是相对昂贵的操作,涉及到操作系统内核的资源分配和回收。线程池可以避免频繁地进行这些操作,从而提高系统的性能和响应速度。
  2. 控制并发度:我们可以限制线程池中线程的数量,从而控制应用程序的并发度。这对于防止系统资源过度消耗,特别是在处理大量请求时非常重要。例如,在一个Web服务器中,如果没有限制并发线程数,过多的并发请求可能会导致服务器内存耗尽或CPU使用率过高,从而使系统崩溃。
  3. 任务队列:线程池通常会有一个任务队列,用于存储等待执行的任务。当线程池中的所有线程都在忙碌时,新的任务会被放入任务队列中等待执行。这有助于平滑处理任务的高峰和低谷,提高系统的稳定性。

Java线程池创建方法

在Java中,线程池的创建主要通过ExecutorService接口及其实现类来完成。ExecutorService接口继承自Executor接口,提供了更丰富的管理线程池生命周期和提交任务的方法。下面我们将详细介绍几种常见的创建线程池的方法。

使用Executors工厂类创建线程池

Java提供了Executors工厂类,它包含了一些静态方法来创建不同类型的线程池。

1. newFixedThreadPool(int nThreads) 这个方法创建一个固定大小的线程池,线程池中的线程数量始终保持为nThreads。当有新任务提交时,如果线程池中有空闲线程,则立即执行;如果所有线程都在忙碌,则任务会被放入任务队列中等待执行。

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FixedThreadPoolExample {
    public static void main(String[] args) {
        // 创建一个固定大小为3的线程池
        ExecutorService executorService = Executors.newFixedThreadPool(3);

        for (int i = 0; i < 5; i++) {
            final int taskNumber = i;
            executorService.submit(() -> {
                System.out.println("Task " + taskNumber + " is running on thread " + Thread.currentThread().getName());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Task " + taskNumber + " has finished.");
            });
        }

        // 关闭线程池
        executorService.shutdown();
    }
}

在上述代码中,我们创建了一个固定大小为3的线程池,并提交了5个任务。由于线程池大小为3,前3个任务会立即被执行,剩下的2个任务会被放入任务队列等待,当有线程完成任务后,会从任务队列中取出新的任务执行。最后,我们调用shutdown()方法关闭线程池,该方法不会立即终止线程池,而是不再接受新任务,并等待所有已提交任务执行完毕。

2. newSingleThreadExecutor() 此方法创建一个单线程的线程池,这个线程池只有一个线程在工作。所有任务会按照提交的顺序依次执行,保证任务执行的顺序性。

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SingleThreadExecutorExample {
    public static void main(String[] args) {
        // 创建一个单线程的线程池
        ExecutorService executorService = Executors.newSingleThreadExecutor();

        for (int i = 0; i < 5; i++) {
            final int taskNumber = i;
            executorService.submit(() -> {
                System.out.println("Task " + taskNumber + " is running on thread " + Thread.currentThread().getName());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Task " + taskNumber + " has finished.");
            });
        }

        // 关闭线程池
        executorService.shutdown();
    }
}

在这个例子中,5个任务会依次在唯一的线程中执行,每个任务执行完毕后,下一个任务才会开始。

3. newCachedThreadPool() 该方法创建一个可缓存的线程池。如果线程池中有空闲线程,则复用空闲线程执行任务;如果没有空闲线程,则创建新的线程来执行任务。线程池中的线程如果60秒内没有被使用,将会被回收。

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CachedThreadPoolExample {
    public static void main(String[] args) {
        // 创建一个可缓存的线程池
        ExecutorService executorService = Executors.newCachedThreadPool();

        for (int i = 0; i < 5; i++) {
            final int taskNumber = i;
            executorService.submit(() -> {
                System.out.println("Task " + taskNumber + " is running on thread " + Thread.currentThread().getName());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Task " + taskNumber + " has finished.");
            });
        }

        // 关闭线程池
        executorService.shutdown();
    }
}

在这个示例中,如果同时提交大量任务,线程池可能会创建很多线程来处理任务,但当任务执行完毕后,空闲的线程会在60秒后被回收。

4. newScheduledThreadPool(int corePoolSize) 此方法创建一个大小可配置的线程池,用于执行定时任务或周期性任务。corePoolSize指定了线程池中的核心线程数。

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduledThreadPoolExample {
    public static void main(String[] args) {
        // 创建一个核心线程数为3的定时线程池
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);

        // 延迟3秒后执行任务
        scheduledExecutorService.schedule(() -> {
            System.out.println("Delayed task is running on thread " + Thread.currentThread().getName());
        }, 3, TimeUnit.SECONDS);

        // 延迟1秒后开始执行任务,之后每2秒执行一次
        scheduledExecutorService.scheduleAtFixedRate(() -> {
            System.out.println("Periodic task is running on thread " + Thread.currentThread().getName());
        }, 1, 2, TimeUnit.SECONDS);

        // 关闭线程池
        scheduledExecutorService.shutdown();
    }
}

在上述代码中,我们展示了两种定时任务的执行方式:schedule方法用于延迟执行一次任务,scheduleAtFixedRate方法用于延迟开始并周期性执行任务。

使用ThreadPoolExecutor类创建线程池

虽然Executors工厂类提供了方便的线程池创建方法,但在实际生产环境中,更推荐使用ThreadPoolExecutor类直接创建线程池,因为它提供了更细粒度的控制。ThreadPoolExecutor的构造函数如下:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)
  1. corePoolSize:线程池中的核心线程数。即使这些线程处于空闲状态,也不会被回收(除非设置了allowCoreThreadTimeOuttrue)。
  2. maximumPoolSize:线程池允许的最大线程数。当任务队列已满且所有核心线程都在忙碌时,线程池会创建新的线程,直到线程数达到maximumPoolSize
  3. keepAliveTime:非核心线程在空闲状态下的存活时间。当线程数超过corePoolSize时,多余的非核心线程如果在keepAliveTime时间内没有任务执行,将会被回收。
  4. unitkeepAliveTime的时间单位。
  5. workQueue:任务队列,用于存储等待执行的任务。常见的任务队列实现有ArrayBlockingQueueLinkedBlockingQueueSynchronousQueue等。
  6. threadFactory:线程工厂,用于创建新的线程。可以通过自定义线程工厂来设置线程的名称、优先级等属性。
  7. handler:拒绝策略,当任务队列已满且线程数达到maximumPoolSize时,新提交的任务会被拒绝,由拒绝策略来处理。常见的拒绝策略有AbortPolicy(默认策略,直接抛出异常)、CallerRunsPolicy(将任务返回给调用者执行)、DiscardPolicy(直接丢弃任务)、DiscardOldestPolicy(丢弃任务队列中最老的任务,然后尝试提交新任务)。

以下是一个使用ThreadPoolExecutor创建线程池的示例:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CustomThreadPoolExecutorExample {
    public static void main(String[] args) {
        // 创建任务队列
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(5);

        // 创建线程池
        ThreadPoolExecutor executorService = new ThreadPoolExecutor(
                2, // 核心线程数
                4, // 最大线程数
                10, // 非核心线程存活时间
                TimeUnit.SECONDS,
                workQueue,
                r -> {
                    Thread thread = new Thread(r);
                    thread.setName("CustomThread-" + thread.getId());
                    return thread;
                },
                new ThreadPoolExecutor.CallerRunsPolicy()
        );

        for (int i = 0; i < 10; i++) {
            final int taskNumber = i;
            executorService.submit(() -> {
                System.out.println("Task " + taskNumber + " is running on thread " + Thread.currentThread().getName());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Task " + taskNumber + " has finished.");
            });
        }

        // 关闭线程池
        executorService.shutdown();
    }
}

在这个示例中,我们创建了一个核心线程数为2,最大线程数为4,任务队列容量为5的线程池。我们还自定义了线程工厂来设置线程名称,并使用了CallerRunsPolicy拒绝策略。当提交10个任务时,前2个任务会由核心线程执行,接下来5个任务会被放入任务队列,再接下来2个任务会创建新的线程执行,最后1个任务由于任务队列已满且线程数达到最大,会由调用者线程执行(因为使用了CallerRunsPolicy拒绝策略)。

线程池参数调优

  1. 核心线程数和最大线程数的选择
    • CPU密集型任务:对于CPU密集型任务,线程数不宜过多,一般设置为CPU核心数 + 1。因为CPU密集型任务主要消耗CPU资源,过多的线程会导致线程上下文切换开销增加,反而降低性能。例如,在进行复杂的数学计算、加密解密等任务时,我们可以通过Runtime.getRuntime().availableProcessors()获取CPU核心数,然后设置核心线程数和最大线程数为availableProcessors() + 1
    • I/O密集型任务:I/O密集型任务在等待I/O操作完成时,线程会处于空闲状态,此时可以设置较多的线程数来充分利用CPU资源。一般可以设置为2 * CPU核心数。比如在进行文件读写、网络请求等I/O操作时,由于I/O操作相对较慢,需要更多的线程来处理其他任务,避免CPU空闲。
  2. 任务队列的选择
    • ArrayBlockingQueue:是一个有界队列,它的容量在创建时就被固定。使用ArrayBlockingQueue可以有效地控制任务队列的大小,防止任务队列无限增长导致内存溢出。适用于需要严格控制任务数量的场景。
    • LinkedBlockingQueue:是一个无界队列(也可以创建有界的),如果不指定容量,它可以容纳无限个任务。使用LinkedBlockingQueue时要注意,如果任务提交速度过快,可能会导致内存占用不断增加,甚至内存溢出。适用于任务处理速度相对稳定,且对任务队列大小没有严格限制的场景。
    • SynchronousQueue:它不是一个真正的队列,而是一个直接提交的队列。每个插入操作必须等待另一个线程的移除操作,反之亦然。使用SynchronousQueue时,线程池会倾向于创建新的线程来处理任务,而不是将任务放入队列。适用于对实时性要求较高,不希望任务在队列中等待的场景。
  3. 拒绝策略的选择
    • AbortPolicy:默认的拒绝策略,当任务被拒绝时,直接抛出RejectedExecutionException异常。适用于希望在任务被拒绝时立即得到通知,以便进行相应处理的场景。
    • CallerRunsPolicy:将被拒绝的任务返回给调用者线程执行。这种策略可以降低新任务的提交速度,因为调用者线程在执行任务时,无法继续提交新任务。适用于对任务处理的实时性要求不高,且希望降低任务提交频率的场景。
    • DiscardPolicy:直接丢弃被拒绝的任务,不做任何处理。适用于对任务丢失不敏感,且希望尽可能快速处理新任务的场景。
    • DiscardOldestPolicy:丢弃任务队列中最老的任务,然后尝试提交新任务。适用于希望优先处理新任务,且对任务队列中的任务顺序不太敏感的场景。

线程池的生命周期管理

  1. shutdown()
    • 调用shutdown()方法后,线程池不再接受新的任务,但会继续执行已提交到任务队列中的任务。所有任务执行完毕后,线程池中的线程会逐渐终止。这个过程是平缓的,不会立即终止线程池。
    • 在调用shutdown()方法后,可以通过isShutdown()方法判断线程池是否已经开始关闭过程,通过isTerminated()方法判断线程池是否已经完全终止(所有任务已执行完毕且所有线程已终止)。
  2. shutdownNow()
    • shutdownNow()方法会尝试停止所有正在执行的任务,停止处理等待任务队列中的任务,并返回等待执行的任务列表。它会给正在执行任务的线程发送interrupt信号,线程可以通过Thread.currentThread().isInterrupted()方法来判断是否收到中断信号,并进行相应的处理(例如提前结束任务)。
    • 调用shutdownNow()后,线程池也会进入关闭状态,同样可以使用isShutdown()isTerminated()方法来判断线程池的状态。
  3. awaitTermination(long timeout, TimeUnit unit)
    • 这个方法用于等待线程池终止。它会阻塞当前线程,直到线程池完全终止,或者达到指定的等待时间。如果在等待时间内线程池终止,则返回true;否则返回false
    • 通常在调用shutdown()shutdownNow()后,使用awaitTermination()方法来确保线程池中的所有任务都执行完毕,然后再进行后续的清理操作。例如:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ThreadPoolLifeCycleExample {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(3);

        for (int i = 0; i < 5; i++) {
            final int taskNumber = i;
            executorService.submit(() -> {
                System.out.println("Task " + taskNumber + " is running on thread " + Thread.currentThread().getName());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Task " + taskNumber + " has finished.");
            });
        }

        // 关闭线程池
        executorService.shutdown();

        try {
            if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
                executorService.shutdownNow();
                if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
                    System.err.println("Pool did not terminate");
                }
            }
        } catch (InterruptedException ie) {
            executorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}

在上述代码中,我们首先调用shutdown()方法关闭线程池,然后使用awaitTermination()方法等待5秒,看线程池是否能在5秒内完全终止。如果不能,则调用shutdownNow()方法尝试立即停止线程池,并再次等待5秒。如果最终线程池仍未终止,则输出错误信息。这样可以有效地管理线程池的生命周期,确保任务的正确执行和资源的合理释放。

总结与最佳实践

  1. 避免使用Executors工厂类的默认方法Executors工厂类提供的newFixedThreadPoolnewSingleThreadExecutornewCachedThreadPool等方法在某些情况下可能会导致性能问题或资源耗尽。例如,newFixedThreadPoolnewSingleThreadExecutor使用的是无界的LinkedBlockingQueue,如果任务提交速度过快,可能会导致内存溢出;newCachedThreadPool允许创建无限个线程,可能会耗尽系统资源。因此,在生产环境中,建议使用ThreadPoolExecutor类直接创建线程池,以便更好地控制线程池的参数。
  2. 合理设置线程池参数:根据任务的类型(CPU密集型或I/O密集型)合理设置核心线程数和最大线程数。同时,选择合适的任务队列和拒绝策略,以满足应用程序的性能和稳定性要求。在实际应用中,可以通过性能测试和监控来调整线程池参数,找到最优配置。
  3. 正确管理线程池的生命周期:在应用程序结束时,要正确关闭线程池,避免线程泄漏。使用shutdown()shutdownNow()方法时,要结合awaitTermination()方法来确保线程池中的所有任务都执行完毕,同时处理好可能的中断异常。
  4. 监控和日志记录:在生产环境中,对线程池的运行状态进行监控和日志记录是非常重要的。可以通过ThreadPoolExecutor类提供的一些方法,如getActiveCount(获取当前活动线程数)、getTaskCount(获取已提交的任务总数)、getCompletedTaskCount(获取已完成的任务数)等,来了解线程池的运行情况。同时,记录线程池相关的日志,如任务提交、任务执行、线程池关闭等事件,以便在出现问题时能够快速定位和解决。

通过深入理解Java线程池的创建方法、参数调优、生命周期管理以及遵循最佳实践,我们可以在多线程编程中充分发挥线程池的优势,提高应用程序的性能、稳定性和可维护性。在实际开发中,根据具体的业务需求和系统环境,灵活选择和配置线程池,是实现高效并发编程的关键。