Java 项目中避免使用 Executors 创建线程池的原因
一、线程池基础概述
1.1 线程池的概念与作用
在Java多线程编程中,线程池是一种重要的资源管理机制。线程的创建和销毁是相对昂贵的操作,涉及到操作系统内核态与用户态的切换等开销。线程池预先创建一定数量的线程,这些线程可以被重复使用来执行多个任务。当有新任务提交时,线程池会分配一个空闲线程来处理该任务,任务执行完毕后,线程并不会被销毁,而是返回线程池等待下一个任务。这种机制大大减少了线程创建和销毁的开销,提高了系统的性能和资源利用率。
例如,在一个Web服务器中,每一个HTTP请求都可以看作是一个任务。如果每次收到请求都创建一个新线程来处理,在高并发情况下,频繁的线程创建和销毁会导致系统性能急剧下降。而使用线程池,服务器可以预先创建一定数量的线程,将请求分配给这些线程处理,从而有效提升系统的并发处理能力。
1.2 Java线程池的核心类
Java通过java.util.concurrent
包提供了丰富的线程池相关类。其中,ThreadPoolExecutor
是线程池的核心实现类,它提供了灵活的线程池配置选项。通过构造函数,我们可以设置核心线程数、最大线程数、线程存活时间等关键参数。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
// 构造函数实现
}
corePoolSize
:核心线程数,线程池会一直维护这些线程,即使它们处于空闲状态。maximumPoolSize
:最大线程数,线程池允许创建的最大线程数量。keepAliveTime
:当线程数大于核心线程数时,多余的空闲线程的存活时间。unit
:keepAliveTime
的时间单位。workQueue
:任务队列,用于存放等待执行的任务。threadFactory
:线程工厂,用于创建线程。handler
:拒绝策略,当任务队列已满且线程数达到最大线程数时,新任务的处理策略。
Executors
类则是一个工具类,它提供了一些静态方法来创建不同类型的线程池,如newFixedThreadPool
、newCachedThreadPool
、newSingleThreadExecutor
等,这些方法底层实际上是通过ThreadPoolExecutor
来实现的。
二、Executors创建线程池的方式
2.1 newFixedThreadPool
Executors.newFixedThreadPool(int nThreads)
方法创建一个固定大小的线程池,该线程池中的线程数量始终保持不变,即核心线程数和最大线程数相等。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class FixedThreadPoolExample {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(3);
for (int i = 0; i < 5; i++) {
final int taskNumber = i;
executorService.submit(() -> {
System.out.println("Task " + taskNumber + " is running on thread " + Thread.currentThread().getName());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
executorService.shutdown();
}
}
在上述代码中,我们创建了一个固定大小为3的线程池,并提交了5个任务。由于线程池大小为3,所以一开始会有3个任务同时执行,另外2个任务会进入任务队列等待。当正在执行的任务完成后,线程池中的线程会从任务队列中取出任务继续执行。
2.2 newCachedThreadPool
Executors.newCachedThreadPool()
方法创建一个可缓存的线程池。该线程池的核心线程数为0,最大线程数为Integer.MAX_VALUE
,即理论上可以创建无限多个线程。当有新任务提交时,如果线程池中有空闲线程,则复用空闲线程;如果没有空闲线程,则创建一个新线程来处理任务。如果某个线程在60秒内没有被使用,它将被回收。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CachedThreadPoolExample {
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
final int taskNumber = i;
executorService.submit(() -> {
System.out.println("Task " + taskNumber + " is running on thread " + Thread.currentThread().getName());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
executorService.shutdown();
}
}
在这个例子中,由于线程池可动态创建线程,所以5个任务可能会同时由不同的线程执行。如果在一段时间内没有新任务提交,那些空闲的线程会在60秒后被回收。
2.3 newSingleThreadExecutor
Executors.newSingleThreadExecutor()
方法创建一个单线程的线程池,即核心线程数和最大线程数都为1。它保证所有任务按照提交的顺序依次在一个线程中执行。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class SingleThreadExecutorExample {
public static void main(String[] args) {
ExecutorService executorService = Executors.newSingleThreadExecutor();
for (int i = 0; i < 5; i++) {
final int taskNumber = i;
executorService.submit(() -> {
System.out.println("Task " + taskNumber + " is running on thread " + Thread.currentThread().getName());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
executorService.shutdown();
}
}
在上述代码中,5个任务会依次在同一个线程中执行,前一个任务执行完毕后,下一个任务才会开始。
三、避免使用Executors创建线程池的原因
3.1 任务队列可能导致OOM(OutOfMemoryError)
3.1.1 newFixedThreadPool和newSingleThreadExecutor的任务队列问题
newFixedThreadPool
和newSingleThreadExecutor
方法默认使用的任务队列是LinkedBlockingQueue
,这是一个无界队列。当任务提交速度大于线程池处理速度时,任务会不断堆积在任务队列中。由于队列理论上可以无限增长,最终可能会耗尽系统内存,导致OutOfMemoryError
。
例如,在一个电商系统的订单处理模块中,如果使用newFixedThreadPool
来处理订单任务,在促销活动期间,订单提交量可能会急剧增加。如果线程池处理订单的速度跟不上订单提交速度,订单任务就会在LinkedBlockingQueue
中不断堆积,最终导致内存溢出。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class OOMExample {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(1);
for (int i = 0; i < Integer.MAX_VALUE; i++) {
final int taskNumber = i;
executorService.submit(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Task " + taskNumber + " is processed.");
});
}
// executorService.shutdown(); 故意不关闭,让任务持续堆积
}
}
在这个代码示例中,我们创建了一个固定大小为1的线程池,并不断提交任务。由于任务处理速度较慢(每个任务睡眠1秒),而提交任务的速度很快,任务会在LinkedBlockingQueue
中不断堆积,最终可能导致OutOfMemoryError
。
3.1.2 解决方案
为了避免这种情况,我们应该使用有界队列来代替LinkedBlockingQueue
。例如,可以使用ArrayBlockingQueue
,它在创建时需要指定队列的容量。
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class FixedThreadPoolWithBoundedQueue {
public static void main(String[] args) {
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(10);
ThreadPoolExecutor executorService = new ThreadPoolExecutor(
3,
3,
0L,
TimeUnit.MILLISECONDS,
workQueue
);
for (int i = 0; i < 20; i++) {
final int taskNumber = i;
executorService.submit(() -> {
System.out.println("Task " + taskNumber + " is running on thread " + Thread.currentThread().getName());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
executorService.shutdown();
}
}
在上述代码中,我们创建了一个容量为10的ArrayBlockingQueue
作为任务队列。当任务队列满且线程池达到最大线程数时,后续提交的任务会根据拒绝策略进行处理,从而避免了任务无限堆积导致的内存溢出问题。
3.2 可能创建过多线程导致系统资源耗尽
3.2.1 newCachedThreadPool的线程创建问题
newCachedThreadPool
方法创建的线程池最大线程数为Integer.MAX_VALUE
,在高并发情况下,可能会创建大量的线程。过多的线程会占用大量的系统资源,如内存、CPU等,导致系统性能下降甚至崩溃。
例如,在一个分布式爬虫系统中,如果使用newCachedThreadPool
来处理网页抓取任务,当需要抓取的网页数量非常庞大时,可能会瞬间创建大量线程。每个线程都需要占用一定的内存空间,过多的线程会导致内存耗尽,同时大量线程竞争CPU资源,也会使系统的整体性能急剧下降。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadOverheadExample {
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 100000; i++) {
final int taskNumber = i;
executorService.submit(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Task " + taskNumber + " is running.");
});
}
// executorService.shutdown(); 故意不关闭,让线程持续创建
}
}
在这个代码示例中,我们尝试提交10万个任务到newCachedThreadPool
。由于线程池可以无限创建线程,在实际运行中,可能会因为创建过多线程而导致系统资源耗尽。
3.2.2 解决方案
我们应该根据系统的实际资源情况,设置合理的最大线程数。通过直接使用ThreadPoolExecutor
,我们可以精确控制线程池的参数。
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class CachedThreadPoolWithLimit {
public static void main(String[] args) {
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
ThreadPoolExecutor executorService = new ThreadPoolExecutor(
0,
100,
60L,
TimeUnit.SECONDS,
workQueue
);
for (int i = 0; i < 1000; i++) {
final int taskNumber = i;
executorService.submit(() -> {
System.out.println("Task " + taskNumber + " is running on thread " + Thread.currentThread().getName());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
executorService.shutdown();
}
}
在上述代码中,我们设置最大线程数为100,这样即使在高并发情况下,线程池最多也只会创建100个线程,从而有效避免了因创建过多线程导致的系统资源耗尽问题。
3.3 缺乏对线程池状态的精确控制
3.3.1 Executors创建线程池的局限性
通过Executors
创建的线程池,开发者对线程池的状态控制相对有限。例如,对于newFixedThreadPool
和newCachedThreadPool
,我们很难直接获取线程池当前的任务队列大小、活动线程数等详细信息,这在进行系统监控和性能调优时会带来不便。
假设我们正在开发一个实时数据分析系统,需要实时了解线程池的任务处理情况,以便及时调整系统参数。如果使用Executors.newFixedThreadPool
,我们无法直接获取任务队列中等待处理的任务数量,也就难以准确评估系统的负载情况。
3.3.2 解决方案
当我们直接使用ThreadPoolExecutor
时,可以方便地获取线程池的各种状态信息。
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPoolStatusExample {
public static void main(String[] args) {
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
ThreadPoolExecutor executorService = new ThreadPoolExecutor(
3,
5,
10L,
TimeUnit.SECONDS,
workQueue
);
for (int i = 0; i < 10; i++) {
final int taskNumber = i;
executorService.submit(() -> {
System.out.println("Task " + taskNumber + " is running on thread " + Thread.currentThread().getName());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
System.out.println("Active threads: " + executorService.getActiveCount());
System.out.println("Queue size: " + executorService.getQueue().size());
executorService.shutdown();
}
}
在上述代码中,我们通过ThreadPoolExecutor
的getActiveCount
方法获取当前活动线程数,通过getQueue().size()
方法获取任务队列的大小。这些信息对于系统的监控和调优非常重要,而通过Executors
创建的线程池获取这些信息相对困难。
3.4 拒绝策略的默认设置可能不符合业务需求
3.4.1 Executors创建线程池的默认拒绝策略
Executors
创建的线程池默认使用的拒绝策略是ThreadPoolExecutor.AbortPolicy
,当任务队列已满且线程数达到最大线程数时,新提交的任务会被拒绝,并抛出RejectedExecutionException
。
在一些业务场景中,这种默认的拒绝策略可能并不合适。例如,在一个金融交易系统中,交易订单任务如果被拒绝并抛出异常,可能会导致交易失败,给用户带来损失。
3.4.2 解决方案
我们可以根据业务需求选择合适的拒绝策略。Java提供了几种内置的拒绝策略,除了AbortPolicy
,还有CallerRunsPolicy
、DiscardPolicy
和DiscardOldestPolicy
,也可以自定义拒绝策略。
CallerRunsPolicy
:当任务被拒绝时,由提交任务的线程来执行该任务。这种策略可以降低新任务的提交速度。DiscardPolicy
:直接丢弃被拒绝的任务,不做任何处理。DiscardOldestPolicy
:丢弃任务队列中最老的任务,然后尝试提交新任务。
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("Task " + r + " is rejected. Executor status: " + executor.toString());
}
}
public class CustomRejectedPolicyExample {
public static void main(String[] args) {
BlockingQueue<Runnable> workQueue = new DelayQueue<>();
ThreadPoolExecutor executorService = new ThreadPoolExecutor(
2,
2,
0L,
TimeUnit.MILLISECONDS,
workQueue,
new CustomRejectedExecutionHandler()
);
for (int i = 0; i < 5; i++) {
final int taskNumber = i;
executorService.submit(() -> {
System.out.println("Task " + taskNumber + " is running on thread " + Thread.currentThread().getName());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
executorService.shutdown();
}
}
在上述代码中,我们自定义了一个拒绝策略CustomRejectedExecutionHandler
,当任务被拒绝时,会打印出任务和线程池的相关信息。通过这种方式,我们可以根据业务需求灵活处理被拒绝的任务,而不是简单地抛出异常。
四、正确创建和使用线程池的建议
4.1 根据业务场景选择合适的线程池参数
在创建线程池时,需要根据业务场景来确定核心线程数、最大线程数、任务队列等参数。如果任务是CPU密集型的,核心线程数可以设置为CPU核心数或略多一些,以充分利用CPU资源;如果任务是I/O密集型的,由于线程在I/O操作时会阻塞,核心线程数可以设置得相对多一些,以提高系统的并发处理能力。
例如,在一个图像识别系统中,图像识别任务通常是CPU密集型的。假设系统运行的服务器有8个CPU核心,我们可以将核心线程数设置为8或9,最大线程数也可以设置为相近的值,同时选择一个合适的有界任务队列,如ArrayBlockingQueue
。
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class CPUIntensiveThreadPool {
public static void main(String[] args) {
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(10);
ThreadPoolExecutor executorService = new ThreadPoolExecutor(
8,
9,
10L,
TimeUnit.SECONDS,
workQueue
);
// 提交CPU密集型任务
executorService.shutdown();
}
}
而在一个文件上传下载系统中,任务主要是I/O密集型的。我们可以将核心线程数设置为一个相对较大的值,如20,最大线程数可以设置为30,任务队列同样选择有界队列。
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class IOIntensiveThreadPool {
public static void main(String[] args) {
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(20);
ThreadPoolExecutor executorService = new ThreadPoolExecutor(
20,
30,
10L,
TimeUnit.SECONDS,
workQueue
);
// 提交I/O密集型任务
executorService.shutdown();
}
}
4.2 监控和调优线程池
为了确保线程池的性能和稳定性,需要对线程池进行监控。可以通过ThreadPoolExecutor
提供的方法获取线程池的各种状态信息,如活动线程数、任务队列大小、已完成任务数等。
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPoolMonitoring {
public static void main(String[] args) {
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
ThreadPoolExecutor executorService = new ThreadPoolExecutor(
3,
5,
10L,
TimeUnit.SECONDS,
workQueue
);
for (int i = 0; i < 10; i++) {
final int taskNumber = i;
executorService.submit(() -> {
System.out.println("Task " + taskNumber + " is running on thread " + Thread.currentThread().getName());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
System.out.println("Active threads: " + executorService.getActiveCount());
System.out.println("Queue size: " + executorService.getQueue().size());
System.out.println("Completed tasks: " + executorService.getCompletedTaskCount());
executorService.shutdown();
}
}
根据监控数据,我们可以对线程池的参数进行调优。如果发现任务队列经常满,可能需要增加线程数或调整任务队列的大小;如果发现线程池中有大量空闲线程,可能需要减少核心线程数。
4.3 合理处理线程池中的异常
在线程池执行任务过程中,可能会出现各种异常。对于未捕获的异常,ThreadPoolExecutor
默认会将其打印到标准错误输出。然而,在实际应用中,我们可能需要更优雅地处理这些异常,例如记录异常日志、进行重试等。
可以通过实现Thread.UncaughtExceptionHandler
接口来处理线程池中的未捕获异常。
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ExceptionHandlingInThreadPool {
public static void main(String[] args) {
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
ThreadFactory threadFactory = r -> {
Thread thread = new Thread(r);
thread.setUncaughtExceptionHandler((t, e) -> {
System.out.println("Uncaught exception in thread " + t.getName() + ": " + e.getMessage());
});
return thread;
};
ThreadPoolExecutor executorService = new ThreadPoolExecutor(
3,
5,
10L,
TimeUnit.SECONDS,
workQueue,
threadFactory
);
executorService.submit(() -> {
throw new RuntimeException("Simulated exception");
});
executorService.shutdown();
}
}
在上述代码中,我们通过自定义ThreadFactory
,为每个线程设置了UncaughtExceptionHandler
,当线程执行任务抛出未捕获异常时,会打印出异常信息。这样可以帮助我们更好地定位和处理线程池中的异常情况。
4.4 正确关闭线程池
当不再需要使用线程池时,应该正确关闭线程池,以释放资源。ThreadPoolExecutor
提供了shutdown
和shutdownNow
方法来关闭线程池。
shutdown
方法:启动一个有序关闭过程,不再接受新任务,但会继续执行已提交的任务。shutdownNow
方法:尝试停止所有正在执行的任务,停止等待任务的处理,并返回等待执行的任务列表。
在大多数情况下,建议先调用shutdown
方法,如果在一定时间内线程池没有正常关闭,可以再调用shutdownNow
方法。
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ShutdownThreadPoolExample {
public static void main(String[] args) {
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
ThreadPoolExecutor executorService = new ThreadPoolExecutor(
3,
5,
10L,
TimeUnit.SECONDS,
workQueue
);
for (int i = 0; i < 10; i++) {
final int taskNumber = i;
executorService.submit(() -> {
System.out.println("Task " + taskNumber + " is running on thread " + Thread.currentThread().getName());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
executorService.shutdown();
try {
if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
executorService.shutdownNow();
if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
System.err.println("Pool did not terminate");
}
}
} catch (InterruptedException ie) {
executorService.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
在上述代码中,我们先调用shutdown
方法启动关闭过程,然后通过awaitTermination
方法等待线程池在60秒内完成任务。如果60秒后线程池仍未关闭,我们调用shutdownNow
方法尝试强制停止线程池,并再次等待60秒。这样可以确保线程池在程序结束时能正确关闭,避免资源泄漏。
通过以上对Java项目中避免使用Executors
创建线程池原因的分析,以及正确创建和使用线程池的建议,开发者可以更好地利用线程池这一强大的工具,提升Java应用程序的性能和稳定性,避免因线程池使用不当而带来的各种问题。在实际开发中,应根据具体的业务场景,精心设计和调整线程池的参数,以充分发挥线程池的优势,满足系统的并发处理需求。