Java Executors 的任务执行流程
Java Executors 的任务执行流程基础概念
在Java并发编程领域,Executors
类是一个关键的工具类,它提供了一系列静态方法用于创建各种类型的ExecutorService
,而ExecutorService
负责管理和执行提交的任务。理解Executors
背后的任务执行流程,对于高效地编写并发程序至关重要。
线程池概念与Executors
的关系
线程池是一种管理和复用线程的机制,它可以避免频繁创建和销毁线程带来的开销。Executors
类为创建不同配置的线程池提供了便捷的方式。通过Executors
创建的线程池本质上是ThreadPoolExecutor
的实例,只不过进行了不同参数的预配置。例如,Executors.newFixedThreadPool(int nThreads)
方法创建一个固定大小的线程池,池中线程数量固定为nThreads
。
Executors
创建的常见线程池类型及其执行流程
固定大小线程池(newFixedThreadPool
)
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class FixedThreadPoolExample {
public static void main(String[] args) {
// 创建一个固定大小为3的线程池
ExecutorService executorService = Executors.newFixedThreadPool(3);
for (int i = 0; i < 5; i++) {
int taskNumber = i;
executorService.submit(() -> {
System.out.println("Task " + taskNumber + " is running on thread " + Thread.currentThread().getName());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Task " + taskNumber + " completed");
});
}
executorService.shutdown();
}
}
在上述代码中,我们创建了一个固定大小为3的线程池。当提交5个任务时,首先3个任务会立即被线程池中的3个线程执行。这3个线程并行运行任务,每个任务模拟执行2秒。由于线程池大小固定为3,剩下的2个任务会进入任务队列等待。一旦正在执行的任务中有一个完成,线程池中的线程会从任务队列中取出一个任务继续执行,直到所有任务完成。
缓存线程池(newCachedThreadPool
)
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CachedThreadPoolExample {
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
int taskNumber = i;
executorService.submit(() -> {
System.out.println("Task " + taskNumber + " is running on thread " + Thread.currentThread().getName());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Task " + taskNumber + " completed");
});
}
executorService.shutdown();
}
}
缓存线程池的特点是它会根据需要创建新线程。当提交任务时,如果线程池中有空闲线程,就会复用空闲线程执行任务。如果没有空闲线程,就会创建一个新线程来执行任务。在上述代码中,提交5个任务时,由于一开始没有空闲线程,会创建5个新线程分别执行这5个任务。缓存线程池还有一个特性,线程如果在60秒内没有任务执行,就会被回收。所以在任务执行完一段时间后,如果没有新任务提交,线程池中的线程数量会逐渐减少。
单线程线程池(newSingleThreadExecutor
)
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class SingleThreadExecutorExample {
public static void main(String[] args) {
ExecutorService executorService = Executors.newSingleThreadExecutor();
for (int i = 0; i < 5; i++) {
int taskNumber = i;
executorService.submit(() -> {
System.out.println("Task " + taskNumber + " is running on thread " + Thread.currentThread().getName());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Task " + taskNumber + " completed");
});
}
executorService.shutdown();
}
}
单线程线程池只有一个线程来执行任务。当提交多个任务时,这些任务会按照提交顺序依次在这个唯一的线程中执行。在上述代码中,5个任务会逐个执行,前一个任务完成后,下一个任务才会开始执行,确保了任务执行的顺序性。
调度线程池(newScheduledThreadPool
)
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class ScheduledThreadPoolExample {
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);
scheduledExecutorService.schedule(() -> {
System.out.println("Delayed task is running on thread " + Thread.currentThread().getName());
}, 3, TimeUnit.SECONDS);
scheduledExecutorService.scheduleAtFixedRate(() -> {
System.out.println("Periodic task is running on thread " + Thread.currentThread().getName());
}, 0, 2, TimeUnit.SECONDS);
// 主线程睡眠10秒,确保任务有足够时间执行
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
scheduledExecutorService.shutdown();
}
}
调度线程池可以执行延迟任务和周期性任务。在上述代码中,schedule
方法用于提交一个延迟任务,该任务会在3秒后执行。scheduleAtFixedRate
方法用于提交一个周期性任务,该任务会在初始延迟0秒后开始执行,然后每隔2秒执行一次。调度线程池中有2个线程,可以同时执行多个调度任务。
ThreadPoolExecutor
内部执行流程深度剖析
Executors
创建的线程池本质上是ThreadPoolExecutor
的实例,深入理解ThreadPoolExecutor
的执行流程对于掌握Executors
至关重要。
核心线程与最大线程
ThreadPoolExecutor
中有两个重要的线程数量参数:核心线程数(corePoolSize
)和最大线程数(maximumPoolSize
)。核心线程数是线程池中保持活动的最小线程数,即使这些线程暂时没有任务执行,它们也不会被销毁(除非设置了allowCoreThreadTimeOut
为true
)。最大线程数是线程池能够容纳的最大线程数量。
当提交一个任务时,ThreadPoolExecutor
首先会判断当前运行的线程数是否小于核心线程数。如果小于核心线程数,就会创建一个新的核心线程来执行任务。例如:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPoolExecutorExample {
public static void main(String[] args) {
BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>(10);
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, // 核心线程数
4, // 最大线程数
10, TimeUnit.SECONDS,
taskQueue
);
for (int i = 0; i < 5; i++) {
int taskNumber = i;
executor.submit(() -> {
System.out.println("Task " + taskNumber + " is running on thread " + Thread.currentThread().getName());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Task " + taskNumber + " completed");
});
}
executor.shutdown();
}
}
在上述代码中,核心线程数为2。当提交5个任务时,前2个任务会分别由新创建的2个核心线程执行。
任务队列
如果当前运行的线程数达到了核心线程数,那么新提交的任务会被放入任务队列。ThreadPoolExecutor
支持多种类型的任务队列,如ArrayBlockingQueue
、LinkedBlockingQueue
等。在上述代码中,我们使用了LinkedBlockingQueue
,它是一个无界队列(这里设置了容量为10,实际也可设置为无界)。当核心线程都在忙碌时,后续任务会进入任务队列等待。如果任务队列也满了,并且当前运行的线程数小于最大线程数,ThreadPoolExecutor
会创建新的非核心线程来执行任务。
拒绝策略
当任务队列已满,且线程数达到了最大线程数,此时再提交任务,ThreadPoolExecutor
会采用拒绝策略来处理。常见的拒绝策略有以下几种:
- AbortPolicy:这是默认的拒绝策略。当任务被拒绝时,会抛出
RejectedExecutionException
异常。 - CallerRunsPolicy:当任务被拒绝时,会在调用
execute
方法的线程中直接执行被拒绝的任务。 - DiscardPolicy:直接丢弃被拒绝的任务,不做任何处理。
- DiscardOldestPolicy:丢弃任务队列中最老的任务,然后尝试将新任务加入任务队列。
例如,我们可以在创建ThreadPoolExecutor
时指定拒绝策略:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.RejectedExecutionHandler;
public class ThreadPoolExecutorRejectionExample {
public static void main(String[] args) {
BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>(2);
RejectedExecutionHandler rejectionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2,
4,
10, TimeUnit.SECONDS,
taskQueue,
rejectionHandler
);
for (int i = 0; i < 7; i++) {
int taskNumber = i;
executor.submit(() -> {
System.out.println("Task " + taskNumber + " is running on thread " + Thread.currentThread().getName());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Task " + taskNumber + " completed");
});
}
executor.shutdown();
}
}
在上述代码中,我们设置了任务队列容量为2,当提交7个任务时,由于核心线程数为2,任务队列容量为2,最大线程数为4,前4个任务会被核心线程和新创建的非核心线程执行,任务队列会满。第5个任务会触发拒绝策略,这里采用了CallerRunsPolicy
,所以第5个任务会在主线程中执行。
任务执行过程中的线程生命周期管理
在ThreadPoolExecutor
中,线程的生命周期管理是任务执行流程的重要部分。
线程创建与启动
当需要创建新线程时,ThreadPoolExecutor
会通过ThreadFactory
来创建线程。默认的ThreadFactory
会创建一个新的线程,并设置线程的名称。创建好的线程会调用start
方法启动,然后进入RUNNABLE
状态,开始执行任务。
线程空闲与回收
当一个线程执行完任务后,它会尝试从任务队列中获取新的任务。如果任务队列中有任务,线程会继续执行任务。如果任务队列中没有任务,且当前运行的线程数大于核心线程数(当allowCoreThreadTimeOut
为false
时),那么这个空闲线程会在keepAliveTime
时间后被回收。例如,在我们之前创建的ThreadPoolExecutor
中,设置了keepAliveTime
为10秒,当一个非核心线程空闲10秒后,就会被回收。
Executors
在实际项目中的应用场景
Web服务器中的请求处理
在Web服务器中,大量的客户端请求需要处理。使用Executors
创建的线程池可以高效地处理这些请求。例如,使用固定大小线程池可以控制并发处理请求的数量,避免资源耗尽。缓存线程池可以根据请求的数量动态调整线程数量,适用于请求量波动较大的场景。
数据处理与计算任务
在数据处理和计算密集型任务中,如大数据分析、机器学习模型训练等,Executors
可以用于并行处理任务。调度线程池可以用于定时执行数据采集任务,或者周期性地运行模型评估任务。
分布式系统中的任务调度
在分布式系统中,Executors
同样发挥着重要作用。例如,在一个分布式文件系统中,Executors
可以用于调度文件的上传、下载和备份任务。单线程线程池可以确保某些关键任务(如元数据更新)按顺序执行,避免数据一致性问题。
优化Executors
任务执行流程的技巧
合理设置线程池参数
- 核心线程数:应根据任务的类型和系统资源来设置。对于CPU密集型任务,核心线程数一般设置为CPU核心数;对于I/O密集型任务,可以适当增加核心线程数,以充分利用CPU资源等待I/O操作完成。
- 最大线程数:要考虑系统的最大资源限制,避免创建过多线程导致系统资源耗尽。
- 任务队列容量:根据任务的数量和处理速度来设置。如果任务队列容量过小,可能会频繁触发拒绝策略;如果容量过大,可能会导致任务长时间等待。
选择合适的线程池类型
- 如果任务执行时间短且数量多,缓存线程池可能是一个不错的选择。
- 对于需要严格控制并发数量的场景,固定大小线程池更为合适。
- 当需要按顺序执行任务时,单线程线程池是首选。
- 对于有延迟或周期性执行需求的任务,调度线程池是最佳选择。
监控与调优
在实际应用中,应该对线程池的运行状态进行监控,如线程池中的线程数量、任务队列的大小、任务的执行时间等。通过监控数据,可以及时发现性能瓶颈,并对线程池参数进行调整,以优化任务执行流程。例如,可以使用Java的ManagementFactory
获取线程池的相关指标:
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPoolMonitoringExample {
public static void main(String[] args) {
BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>(10);
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2,
4,
10, TimeUnit.SECONDS,
taskQueue
);
for (int i = 0; i < 5; i++) {
int taskNumber = i;
executor.submit(() -> {
System.out.println("Task " + taskNumber + " is running on thread " + Thread.currentThread().getName());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Task " + taskNumber + " completed");
});
}
// 监控线程池状态
Thread monitoringThread = new Thread(() -> {
while (true) {
ThreadInfo threadInfo = ManagementFactory.getThreadMXBean().getThreadInfo(executor.getActiveCount());
System.out.println("Active threads: " + executor.getActiveCount());
System.out.println("Task queue size: " + executor.getQueue().size());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
monitoringThread.setDaemon(true);
monitoringThread.start();
executor.shutdown();
}
}
在上述代码中,我们启动了一个监控线程,每隔5秒打印一次线程池中的活动线程数和任务队列大小,通过这些信息可以进一步优化线程池的配置。
并发任务执行中的异常处理
在Executors
执行任务过程中,异常处理是不可忽视的环节。
submit
方法与异常处理
当使用ExecutorService
的submit
方法提交任务时,任务执行过程中抛出的异常不会直接在主线程中显示。例如:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class SubmitExceptionExample {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(1);
Future<?> future = executorService.submit(() -> {
throw new RuntimeException("Task failed");
});
try {
future.get();
} catch (Exception e) {
System.out.println("Caught exception: " + e.getMessage());
}
executorService.shutdown();
}
}
在上述代码中,任务抛出了一个RuntimeException
。如果不调用future.get()
方法,这个异常不会被捕获。调用future.get()
方法时,如果任务已经完成并抛出异常,get
方法会重新抛出这个异常,从而可以在try - catch
块中捕获并处理。
execute
方法与异常处理
使用ExecutorService
的execute
方法提交任务时,如果任务在执行过程中抛出异常,默认情况下这个异常会导致执行任务的线程终止,但不会在主线程中直接显示。可以通过自定义Thread.UncaughtExceptionHandler
来捕获这种异常:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ExecuteExceptionExample {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(1);
Thread.setDefaultUncaughtExceptionHandler((t, e) -> {
System.out.println("Caught exception in thread " + t.getName() + ": " + e.getMessage());
});
executorService.execute(() -> {
throw new RuntimeException("Task failed");
});
executorService.shutdown();
}
}
在上述代码中,我们设置了默认的UncaughtExceptionHandler
,当执行任务的线程抛出未捕获的异常时,会在UncaughtExceptionHandler
中进行处理。
总结Executors
任务执行流程的要点
- 线程池类型选择:根据任务特性选择合适的线程池类型,如固定大小线程池用于控制并发量,缓存线程池用于动态调整线程数量等。
- 参数设置:合理设置核心线程数、最大线程数、任务队列容量等参数,以优化任务执行效率和系统资源利用。
- 异常处理:了解
submit
和execute
方法在异常处理上的差异,采取相应的异常捕获和处理机制。 - 监控与调优:通过监控线程池的运行状态,如线程数、任务队列大小等指标,对线程池进行动态调优,确保任务执行流程的高效性和稳定性。
通过深入理解Executors
的任务执行流程,开发者可以在并发编程中充分发挥Java多线程的优势,编写高效、稳定的应用程序。无论是在小型应用还是大型分布式系统中,合理运用Executors
都能显著提升系统性能。