Java ThreadPoolExecutor 的参数调优
2021-02-212.8k 阅读
Java ThreadPoolExecutor 的参数调优
在Java多线程编程中,ThreadPoolExecutor
是一个功能强大且灵活的线程池实现类。合理地调优ThreadPoolExecutor
的参数,对于提升应用程序的性能、资源利用率以及稳定性至关重要。接下来我们将深入探讨如何对其参数进行调优。
ThreadPoolExecutor
的构造函数与参数
ThreadPoolExecutor
有多个构造函数,最常用的构造函数如下:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
这里涉及到7个重要参数:
corePoolSize
:核心线程数,线程池中会一直存活的线程数量,即使这些线程处于空闲状态。当提交新任务时,如果当前线程数小于corePoolSize
,则会创建新的线程来处理任务,即使线程池中有空闲线程。maximumPoolSize
:线程池允许的最大线程数。当任务队列已满,并且当前线程数小于maximumPoolSize
时,线程池会创建新的线程来处理任务。keepAliveTime
:当线程数大于corePoolSize
时,多余的空闲线程等待新任务的最长时间,超过这个时间,多余的线程将被终止。unit
:keepAliveTime
的时间单位,如TimeUnit.SECONDS
、TimeUnit.MILLISECONDS
等。workQueue
:任务队列,用于存放等待执行的任务。当线程池中的线程都在忙碌,且当前线程数达到corePoolSize
时,新提交的任务将被放入任务队列中。常见的任务队列实现有ArrayBlockingQueue
、LinkedBlockingQueue
、SynchronousQueue
等。threadFactory
:线程工厂,用于创建新线程。通过自定义线程工厂,可以设置线程的名称、优先级等属性。handler
:拒绝策略,当任务队列已满且线程数达到maximumPoolSize
时,新提交的任务将被拒绝,由拒绝策略来处理这些被拒绝的任务。常见的拒绝策略有ThreadPoolExecutor.AbortPolicy
(默认策略,直接抛出异常)、ThreadPoolExecutor.CallerRunsPolicy
(由调用者线程来执行任务)、ThreadPoolExecutor.DiscardPolicy
(直接丢弃任务)、ThreadPoolExecutor.DiscardOldestPolicy
(丢弃队列中最老的任务,然后尝试提交新任务)。
核心线程数 corePoolSize
的调优
- CPU 密集型任务
对于CPU密集型任务,线程主要在执行计算,很少有I/O操作等阻塞情况。一般来说,
corePoolSize
可以设置为CPU核心数加1。例如,在一个4核的CPU上,corePoolSize
可以设置为5。这样在某个线程因为页缺失等原因阻塞时,还有一个额外的线程可以利用CPU资源。
示例代码:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
public class CpuIntensiveTask {
public static void main(String[] args) {
int corePoolSize = Runtime.getRuntime().availableProcessors() + 1;
ExecutorService executorService = new ThreadPoolExecutor(
corePoolSize,
corePoolSize,
0L,
java.util.concurrent.TimeUnit.MILLISECONDS,
new java.util.concurrent.LinkedBlockingQueue<>());
for (int i = 0; i < 10; i++) {
executorService.submit(() -> {
// 模拟CPU密集型任务
long result = 0;
for (int j = 0; j < 1000000000; j++) {
result += j;
}
System.out.println(Thread.currentThread().getName() + " 计算结果: " + result);
});
}
executorService.shutdown();
}
}
- I/O 密集型任务
I/O密集型任务中,线程大部分时间在等待I/O操作完成,如读取文件、网络请求等。这种情况下,
corePoolSize
可以设置为CPU核心数的2倍甚至更多。因为在I/O等待期间,CPU处于空闲状态,可以让更多的线程利用这些空闲时间。
示例代码:
import java.io.File;
import java.io.FileReader;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
public class IoIntensiveTask {
public static void main(String[] args) {
int corePoolSize = Runtime.getRuntime().availableProcessors() * 2;
ExecutorService executorService = new ThreadPoolExecutor(
corePoolSize,
corePoolSize,
0L,
java.util.concurrent.TimeUnit.MILLISECONDS,
new java.util.concurrent.LinkedBlockingQueue<>());
File file = new File("example.txt");
for (int i = 0; i < 10; i++) {
executorService.submit(() -> {
try (FileReader fileReader = new FileReader(file)) {
int data;
while ((data = fileReader.read()) != -1) {
// 模拟I/O操作
}
System.out.println(Thread.currentThread().getName() + " 完成文件读取");
} catch (Exception e) {
e.printStackTrace();
}
});
}
executorService.shutdown();
}
}
- 混合型任务
如果任务中既有CPU密集型部分,又有I/O密集型部分,需要根据实际情况进行测试和调整。可以先按照I/O密集型任务的方式设置
corePoolSize
,然后逐步调整,观察系统的性能指标,如吞吐量、响应时间等,找到一个最优值。
最大线程数 maximumPoolSize
的调优
- 结合任务队列分析
maximumPoolSize
与任务队列的容量密切相关。如果任务队列容量较大,并且可以长时间容纳任务,那么maximumPoolSize
可以适当减小。例如,使用LinkedBlockingQueue
(无界队列)时,maximumPoolSize
的设置可能就不是那么关键,因为任务可以无限期地在队列中等待。但如果使用ArrayBlockingQueue
(有界队列),当队列满了之后,就需要考虑创建更多线程来处理任务,此时maximumPoolSize
的设置就很重要。
假设我们有一个任务队列ArrayBlockingQueue
,容量为100,且任务处理时间较长。如果corePoolSize
设置为10,当有大量任务涌入,队列很快就会满。这时候就需要合理设置maximumPoolSize
,以避免任务被拒绝。
示例代码:
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class MaximumPoolSizeExample {
public static void main(String[] args) {
int corePoolSize = 10;
int maximumPoolSize = 50;
int queueCapacity = 100;
ExecutorService executorService = new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
10L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(queueCapacity));
for (int i = 0; i < 200; i++) {
executorService.submit(() -> {
// 模拟长时间运行任务
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " 任务完成");
});
}
executorService.shutdown();
}
}
- 系统资源限制
在设置
maximumPoolSize
时,还需要考虑系统的资源限制,如内存、CPU使用率等。创建过多的线程会消耗大量的系统资源,可能导致系统性能下降甚至崩溃。可以通过监控系统的内存使用情况、CPU负载等指标,来确定一个合理的maximumPoolSize
。
线程存活时间 keepAliveTime
与时间单位 unit
的调优
- 任务提交频率
如果任务提交频率较高,且任务执行时间较短,那么可以适当减小
keepAliveTime
。因为频繁创建和销毁线程会带来额外的开销,而保持一定数量的空闲线程可以提高任务处理的效率。例如,在一个高并发的Web应用中,不断有短时间的请求任务,keepAliveTime
可以设置为1 - 5秒。
示例代码:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class KeepAliveTimeExample {
public static void main(String[] args) {
int corePoolSize = 10;
int maximumPoolSize = 20;
long keepAliveTime = 2;
TimeUnit unit = TimeUnit.SECONDS;
ExecutorService executorService = new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
unit,
new java.util.concurrent.LinkedBlockingQueue<>());
for (int i = 0; i < 100; i++) {
executorService.submit(() -> {
// 模拟短时间任务
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " 任务完成");
});
}
executorService.shutdown();
}
}
- 资源回收需求
如果系统对资源回收比较敏感,希望在任务处理完后尽快释放多余的线程资源,那么可以适当增大
keepAliveTime
。例如,在一个内存有限的嵌入式系统中,长时间保持大量空闲线程会占用宝贵的内存资源,此时可以将keepAliveTime
设置得较长,以便在任务处理完后及时回收线程。
任务队列 workQueue
的选择与调优
ArrayBlockingQueue
ArrayBlockingQueue
是一个有界队列,它的容量在创建时就确定。这种队列适合在任务数量可预测,且希望对任务队列大小进行严格控制的场景。例如,在一个订单处理系统中,每个订单的处理任务可以放入ArrayBlockingQueue
,如果队列满了,说明订单处理速度跟不上订单生成速度,此时可以根据拒绝策略进行处理。
示例代码:
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ArrayBlockingQueueExample {
public static void main(String[] args) {
int corePoolSize = 5;
int maximumPoolSize = 10;
long keepAliveTime = 10;
TimeUnit unit = TimeUnit.SECONDS;
int queueCapacity = 20;
ExecutorService executorService = new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
unit,
new ArrayBlockingQueue<>(queueCapacity));
for (int i = 0; i < 30; i++) {
executorService.submit(() -> {
System.out.println(Thread.currentThread().getName() + " 处理任务");
});
}
executorService.shutdown();
}
}
LinkedBlockingQueue
LinkedBlockingQueue
可以是有界的,也可以是无界的(默认是无界的)。无界队列在任务量较大时,不会因为队列满而拒绝任务,但可能会导致内存耗尽。有界队列则可以避免这种情况。在一些对任务处理能力要求较高,且任务量不会无限增长的场景中,可以使用有界的LinkedBlockingQueue
。例如,在一个日志处理系统中,日志记录任务可以放入有界的LinkedBlockingQueue
,当队列满时,可以根据拒绝策略丢弃较新的日志记录。
示例代码(有界LinkedBlockingQueue
):
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class LinkedBlockingQueueExample {
public static void main(String[] args) {
int corePoolSize = 3;
int maximumPoolSize = 6;
long keepAliveTime = 5;
TimeUnit unit = TimeUnit.SECONDS;
int queueCapacity = 15;
ExecutorService executorService = new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
unit,
new LinkedBlockingQueue<>(queueCapacity));
for (int i = 0; i < 20; i++) {
executorService.submit(() -> {
System.out.println(Thread.currentThread().getName() + " 处理日志任务");
});
}
executorService.shutdown();
}
}
SynchronousQueue
SynchronousQueue
没有容量,它不存储任务,而是直接将任务传递给线程处理。如果没有空闲线程,新提交的任务会等待,直到有线程准备好接收任务。这种队列适用于任务处理速度较快,且希望尽快处理任务,避免任务在队列中积压的场景。例如,在一个实时数据处理系统中,每个数据处理任务可以通过SynchronousQueue
直接交给线程处理。
示例代码:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class SynchronousQueueExample {
public static void main(String[] args) {
int corePoolSize = 4;
int maximumPoolSize = 8;
long keepAliveTime = 5;
TimeUnit unit = TimeUnit.SECONDS;
ExecutorService executorService = new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
unit,
new SynchronousQueue<>());
for (int i = 0; i < 10; i++) {
executorService.submit(() -> {
System.out.println(Thread.currentThread().getName() + " 处理实时数据任务");
});
}
executorService.shutdown();
}
}
线程工厂 threadFactory
的定制
通过自定义线程工厂,可以为线程设置有意义的名称、优先级等属性,方便在调试和监控时进行区分。
示例代码:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class CustomThreadFactoryExample {
public static void main(String[] args) {
int corePoolSize = 3;
int maximumPoolSize = 6;
long keepAliveTime = 5;
TimeUnit unit = TimeUnit.SECONDS;
ThreadFactory threadFactory = new ThreadFactory() {
private int counter = 1;
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("CustomThread-" + counter++);
thread.setPriority(Thread.NORM_PRIORITY);
return thread;
}
};
ExecutorService executorService = new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
unit,
new java.util.concurrent.LinkedBlockingQueue<>(),
threadFactory);
for (int i = 0; i < 5; i++) {
executorService.submit(() -> {
System.out.println(Thread.currentThread().getName() + " 执行任务");
});
}
executorService.shutdown();
}
}
拒绝策略 handler
的选择与定制
AbortPolicy
AbortPolicy
是默认的拒绝策略,当任务被拒绝时,它会直接抛出RejectedExecutionException
。这种策略适用于对任务丢失非常敏感的场景,例如在金融交易系统中,任何一个交易任务都不能被丢弃,否则可能导致数据不一致等问题。
示例代码:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class AbortPolicyExample {
public static void main(String[] args) {
int corePoolSize = 2;
int maximumPoolSize = 4;
long keepAliveTime = 5;
TimeUnit unit = TimeUnit.SECONDS;
ExecutorService executorService = new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
unit,
new java.util.concurrent.LinkedBlockingQueue<>(2),
new ThreadPoolExecutor.AbortPolicy());
try {
for (int i = 0; i < 6; i++) {
executorService.submit(() -> {
System.out.println(Thread.currentThread().getName() + " 处理任务");
});
}
} catch (RejectedExecutionException e) {
System.err.println("任务被拒绝: " + e.getMessage());
}
executorService.shutdown();
}
}
CallerRunsPolicy
CallerRunsPolicy
会将被拒绝的任务交给调用者线程来执行。这种策略可以减轻线程池的压力,但可能会影响调用者线程的正常工作。例如,在一个Web应用中,如果使用CallerRunsPolicy
,当线程池满时,新的HTTP请求任务会在Web服务器的主线程中执行,可能会导致服务器响应变慢。
示例代码:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class CallerRunsPolicyExample {
public static void main(String[] args) {
int corePoolSize = 2;
int maximumPoolSize = 4;
long keepAliveTime = 5;
TimeUnit unit = TimeUnit.SECONDS;
ExecutorService executorService = new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
unit,
new java.util.concurrent.LinkedBlockingQueue<>(2),
new ThreadPoolExecutor.CallerRunsPolicy());
for (int i = 0; i < 6; i++) {
executorService.submit(() -> {
System.out.println(Thread.currentThread().getName() + " 处理任务");
});
}
executorService.shutdown();
}
}
DiscardPolicy
DiscardPolicy
直接丢弃被拒绝的任务,不做任何处理。这种策略适用于对任务丢失不敏感,且任务处理不重要的场景,例如一些统计类的任务,如果因为系统繁忙而丢失一些统计数据,对整体业务影响不大。
示例代码:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class DiscardPolicyExample {
public static void main(String[] args) {
int corePoolSize = 2;
int maximumPoolSize = 4;
long keepAliveTime = 5;
TimeUnit unit = TimeUnit.SECONDS;
ExecutorService executorService = new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
unit,
new java.util.concurrent.LinkedBlockingQueue<>(2),
new ThreadPoolExecutor.DiscardPolicy());
for (int i = 0; i < 6; i++) {
executorService.submit(() -> {
System.out.println(Thread.currentThread().getName() + " 处理任务");
});
}
executorService.shutdown();
}
}
DiscardOldestPolicy
DiscardOldestPolicy
会丢弃任务队列中最老的任务,然后尝试提交新任务。这种策略适用于希望优先处理新任务的场景,例如在一个实时消息处理系统中,新的消息可能比旧的消息更重要,当线程池和任务队列满时,丢弃旧消息,处理新消息。
示例代码:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class DiscardOldestPolicyExample {
public static void main(String[] args) {
int corePoolSize = 2;
int maximumPoolSize = 4;
long keepAliveTime = 5;
TimeUnit unit = TimeUnit.SECONDS;
ExecutorService executorService = new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
unit,
new java.util.concurrent.LinkedBlockingQueue<>(2),
new ThreadPoolExecutor.DiscardOldestPolicy());
for (int i = 0; i < 6; i++) {
executorService.submit(() -> {
System.out.println(Thread.currentThread().getName() + " 处理任务");
});
}
executorService.shutdown();
}
}
- 自定义拒绝策略 在某些特殊场景下,上述内置的拒绝策略可能都不满足需求,此时可以自定义拒绝策略。例如,在一个分布式系统中,当任务被拒绝时,可能希望将任务发送到其他节点进行处理。
示例代码:
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// 自定义处理逻辑,例如将任务发送到其他节点
System.err.println("任务被拒绝,尝试发送到其他节点: " + r.toString());
}
}
使用自定义拒绝策略的示例:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class CustomRejectedExecutionHandlerExample {
public static void main(String[] args) {
int corePoolSize = 2;
int maximumPoolSize = 4;
long keepAliveTime = 5;
TimeUnit unit = TimeUnit.SECONDS;
ExecutorService executorService = new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
unit,
new java.util.concurrent.LinkedBlockingQueue<>(2),
new CustomRejectedExecutionHandler());
for (int i = 0; i < 6; i++) {
executorService.submit(() -> {
System.out.println(Thread.currentThread().getName() + " 处理任务");
});
}
executorService.shutdown();
}
}
通过对ThreadPoolExecutor
各个参数的深入理解和合理调优,可以使应用程序在多线程环境下更高效、稳定地运行。在实际应用中,需要结合具体的业务场景、系统资源情况以及性能指标要求,不断测试和调整参数,以达到最佳的效果。