MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java 中 ThreadPoolExecutor 核心参数的深入剖析

2023-09-275.2k 阅读

Java 线程池基础概念

在深入探讨 ThreadPoolExecutor 的核心参数之前,我们先来回顾一下线程池的基本概念。线程池是一种管理和复用线程的机制,它能够有效地控制线程的创建和销毁,从而避免频繁创建和销毁线程带来的开销,提高系统的性能和稳定性。

在 Java 中,ThreadPoolExecutorExecutorService 接口的一个实现类,它提供了丰富的构造函数和方法来灵活配置和管理线程池。ThreadPoolExecutor 管理着一组工作线程,这些线程从任务队列中获取任务并执行。

ThreadPoolExecutor 构造函数

ThreadPoolExecutor 有多个构造函数,其中最常用的构造函数如下:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

接下来我们逐个剖析这个构造函数中的核心参数。

核心参数剖析

corePoolSize(核心线程数)

核心线程数是线程池中一直保持存活的线程数量,即使这些线程处于空闲状态。当有新任务提交到线程池时,如果当前线程池中的线程数量小于 corePoolSize,线程池会创建新的线程来处理任务,而不会将任务放入任务队列。

代码示例

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CorePoolSizeExample {
    public static void main(String[] args) {
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2, // corePoolSize
                4,
                10,
                TimeUnit.SECONDS,
                workQueue);

        for (int i = 0; i < 3; i++) {
            executor.submit(() -> {
                System.out.println(Thread.currentThread().getName() + " is working.");
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        executor.shutdown();
    }
}

在上述代码中,corePoolSize 被设置为 2。当提交 3 个任务时,前 2 个任务会立即由新创建的核心线程执行,第 3 个任务会被放入任务队列 workQueue 中等待核心线程空闲来执行。

maximumPoolSize(最大线程数)

最大线程数定义了线程池中允许存在的最大线程数量。当任务队列已满,并且当前线程池中的线程数量小于 maximumPoolSize 时,线程池会创建新的非核心线程来处理任务。

代码示例

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class MaximumPoolSizeExample {
    public static void main(String[] args) {
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(2);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2,
                4, // maximumPoolSize
                10,
                TimeUnit.SECONDS,
                workQueue);

        for (int i = 0; i < 6; i++) {
            executor.submit(() -> {
                System.out.println(Thread.currentThread().getName() + " is working.");
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        executor.shutdown();
    }
}

在这个例子中,corePoolSize 为 2,任务队列容量为 2,maximumPoolSize 为 4。当提交 6 个任务时,前 2 个任务由核心线程执行,接下来 2 个任务放入任务队列,最后 2 个任务会创建新的非核心线程来执行,因为任务队列已满且总线程数未达到 maximumPoolSize

keepAliveTime(存活时间)

存活时间定义了非核心线程在空闲状态下能够保持存活的最长时间。当非核心线程在 keepAliveTime 时间内没有任务可执行时,它们会被销毁。

代码示例

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class KeepAliveTimeExample {
    public static void main(String[] args) {
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2,
                4,
                5, // keepAliveTime
                TimeUnit.SECONDS,
                workQueue);

        for (int i = 0; i < 4; i++) {
            executor.submit(() -> {
                System.out.println(Thread.currentThread().getName() + " is working.");
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

        try {
            Thread.sleep(8000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("After 8 seconds, the number of active threads: " + executor.getActiveCount());

        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("After 13 seconds, the number of active threads: " + executor.getActiveCount());

        executor.shutdown();
    }
}

在上述代码中,keepAliveTime 为 5 秒。前 4 个任务执行完后,非核心线程会在空闲 5 秒后被销毁。8 秒时,非核心线程还在,13 秒时,非核心线程已被销毁,因此活跃线程数会发生变化。

unit(时间单位)

unit 参数指定了 keepAliveTime 的时间单位,它是 TimeUnit 枚举类型的一个实例。TimeUnit 提供了多种时间单位,如 SECONDS(秒)、MINUTES(分钟)、HOURS(小时)等。

workQueue(任务队列)

任务队列用于存放等待执行的任务。当线程池中的核心线程都在忙碌且任务队列未满时,新提交的任务会被放入任务队列。ThreadPoolExecutor 支持多种类型的任务队列,常见的有以下几种:

  • ArrayBlockingQueue:基于数组的有界阻塞队列,按 FIFO(先进先出)原则对元素进行排序。
  • LinkedBlockingQueue:基于链表的无界阻塞队列(也可以指定容量变为有界),按 FIFO 原则对元素进行排序。
  • SynchronousQueue:一个不存储元素的阻塞队列,每个插入操作必须等待另一个线程的移除操作,反之亦然。
  • PriorityBlockingQueue:具有优先级的无界阻塞队列。

代码示例

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class WorkQueueExample {
    public static void main(String[] args) {
        BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2,
                4,
                10,
                TimeUnit.SECONDS,
                workQueue);

        for (int i = 0; i < 4; i++) {
            executor.submit(() -> {
                System.out.println(Thread.currentThread().getName() + " is working.");
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

        try {
            Thread.sleep(6000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        executor.shutdown();
    }
}

在这个例子中,使用了 ArrayBlockingQueue 作为任务队列,容量为 2。当提交 4 个任务时,前 2 个任务由核心线程执行,接下来 2 个任务放入任务队列等待执行。

threadFactory(线程工厂)

threadFactory 用于创建新的线程。通过自定义线程工厂,我们可以对线程进行一些定制化操作,如设置线程名称、线程优先级、是否为守护线程等。

代码示例

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadFactoryExample {
    public static void main(String[] args) {
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
        ThreadFactory threadFactory = r -> {
            Thread thread = new Thread(r);
            thread.setName("CustomThread-" + thread.getId());
            thread.setPriority(Thread.NORM_PRIORITY);
            return thread;
        };

        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2,
                4,
                10,
                TimeUnit.SECONDS,
                workQueue,
                threadFactory);

        for (int i = 0; i < 3; i++) {
            executor.submit(() -> {
                System.out.println(Thread.currentThread().getName() + " is working.");
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

        try {
            Thread.sleep(6000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        executor.shutdown();
    }
}

在上述代码中,通过自定义 threadFactory,每个新创建的线程都有一个以 CustomThread- 开头的名称,并且优先级为普通优先级。

RejectedExecutionHandler(拒绝策略)

当线程池已经达到 maximumPoolSize 且任务队列已满时,新提交的任务将被拒绝。RejectedExecutionHandler 接口定义了任务被拒绝时的处理策略。ThreadPoolExecutor 提供了几种内置的拒绝策略:

  • AbortPolicy:默认策略,直接抛出 RejectedExecutionException 异常。
  • CallerRunsPolicy:将被拒绝的任务交给提交任务的线程来执行,即调用 execute 方法的线程来执行。
  • DiscardPolicy:直接丢弃被拒绝的任务,不做任何处理。
  • DiscardOldestPolicy:丢弃任务队列中最老的任务(队首任务),然后尝试将新任务放入任务队列。

代码示例

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RejectedExecutionHandlerExample {
    public static void main(String[] args) {
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(2);
        RejectedExecutionHandler handler = new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                System.out.println(r.toString() + " is rejected.");
            }
        };

        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2,
                4,
                10,
                TimeUnit.SECONDS,
                workQueue,
                handler);

        for (int i = 0; i < 7; i++) {
            executor.submit(() -> {
                System.out.println(Thread.currentThread().getName() + " is working.");
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        executor.shutdown();
    }
}

在这个例子中,自定义了一个拒绝策略,当任务被拒绝时,打印任务被拒绝的信息。当提交 7 个任务时,由于任务队列容量为 2,maximumPoolSize 为 4,会有一个任务被拒绝并执行自定义的拒绝策略。

核心参数的动态调整

在实际应用中,有时需要根据系统的负载情况动态调整线程池的核心参数。ThreadPoolExecutor 提供了一些方法来实现这一点。

setCorePoolSize(int corePoolSize)

该方法用于动态设置核心线程数。如果当前线程数大于新的核心线程数,多余的核心线程会在空闲时被销毁;如果当前线程数小于新的核心线程数,会创建新的核心线程。

代码示例

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class SetCorePoolSizeExample {
    public static void main(String[] args) {
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2,
                4,
                10,
                TimeUnit.SECONDS,
                workQueue);

        for (int i = 0; i < 3; i++) {
            executor.submit(() -> {
                System.out.println(Thread.currentThread().getName() + " is working.");
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        executor.setCorePoolSize(1);
        System.out.println("Core pool size changed to: " + executor.getCorePoolSize());

        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        executor.shutdown();
    }
}

在上述代码中,先提交 3 个任务,线程池中有 2 个核心线程和 1 个任务在队列中。5 秒后,将核心线程数调整为 1,多余的核心线程在空闲时会被销毁。

setMaximumPoolSize(int maximumPoolSize)

此方法用于动态设置最大线程数。如果新的最大线程数大于当前最大线程数,线程池可能会创建新的线程来处理任务;如果新的最大线程数小于当前最大线程数,多余的线程会在空闲时被销毁。

代码示例

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class SetMaximumPoolSizeExample {
    public static void main(String[] args) {
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(2);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2,
                4,
                10,
                TimeUnit.SECONDS,
                workQueue);

        for (int i = 0; i < 6; i++) {
            executor.submit(() -> {
                System.out.println(Thread.currentThread().getName() + " is working.");
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        executor.setMaximumPoolSize(3);
        System.out.println("Maximum pool size changed to: " + executor.getMaximumPoolSize());

        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        executor.shutdown();
    }
}

在这个例子中,提交 6 个任务,线程池达到最大线程数 4。5 秒后,将最大线程数调整为 3,多余的非核心线程在空闲时会被销毁。

合理设置核心参数的策略

合理设置 ThreadPoolExecutor 的核心参数对于提高系统性能和稳定性至关重要。以下是一些设置参数的策略建议:

  1. 核心线程数:根据任务类型和系统资源来确定。对于 CPU 密集型任务,核心线程数可设置为 CPU 核心数 + 1;对于 I/O 密集型任务,核心线程数可适当增大,如 CPU 核心数 * 2。
  2. 最大线程数:考虑系统资源的限制,避免设置过大导致系统资源耗尽。一般可以根据经验值和实际测试来调整。
  3. 任务队列:根据任务的性质选择合适的任务队列。如果任务执行时间短且数量多,可选择有界队列;如果任务执行时间长且数量少,可选择无界队列,但要注意内存使用情况。
  4. 存活时间:根据系统的负载情况和任务的频率来设置。如果系统负载波动较大,可适当设置较长的存活时间;如果负载相对稳定,可设置较短的存活时间。

通过合理设置这些核心参数,并结合动态调整机制,能够使线程池在不同的工作负载下都能保持高效运行。同时,在实际应用中还需要对线程池进行监控和调优,以确保系统的整体性能和稳定性。

综上所述,深入理解 ThreadPoolExecutor 的核心参数,并根据实际需求合理设置和动态调整这些参数,是使用线程池实现高效并发编程的关键。通过本文的详细剖析和代码示例,希望能帮助读者更好地掌握这一重要的 Java 并发工具。