Java并发包中的Executor框架
Java并发包中的Executor框架概述
在Java的并发编程领域中,Executor
框架是一个极为重要的工具集,它为我们管理和执行任务提供了一种优雅且高效的方式。在传统的Java线程使用中,我们通常会通过创建Thread
对象,然后调用start()
方法来启动线程。然而,随着并发任务数量的增加以及对线程管理复杂度的提升,这种方式变得难以维护和扩展。Executor
框架应运而生,它将任务的提交与执行分离,使得我们能够更方便地管理线程资源,提高程序的性能和可维护性。
Executor
框架主要由三个核心部分组成:Executor
接口、ExecutorService
接口及其实现类,以及ThreadPoolExecutor
类等相关工具类。Executor
接口是整个框架的基础,它定义了一个简单的方法execute(Runnable task)
,用于提交一个Runnable
任务。ExecutorService
接口继承自Executor
接口,在其基础上增加了生命周期管理和任务提交的更多方法,比如submit(Callable<T> task)
,它可以返回一个Future
对象,用于获取任务的执行结果。ThreadPoolExecutor
类则是ExecutorService
接口的一个重要实现类,它提供了灵活的线程池管理功能,我们可以通过配置参数来控制线程池的大小、任务队列的容量等。
Executor接口
Executor
接口是Executor
框架的最基础部分,其定义非常简单:
public interface Executor {
void execute(Runnable task);
}
这个接口只有一个方法execute
,该方法接收一个Runnable
任务,并异步执行这个任务。这里所谓的“异步执行”意味着调用execute
方法后,调用线程不会等待任务执行完成,而是继续执行后续的代码。
下面是一个简单的使用Executor
接口的示例:
import java.util.concurrent.Executor;
class MyExecutor implements Executor {
@Override
public void execute(Runnable task) {
new Thread(task).start();
}
}
public class ExecutorExample {
public static void main(String[] args) {
Executor executor = new MyExecutor();
executor.execute(() -> System.out.println("Task executed by custom executor"));
}
}
在上述代码中,我们定义了一个MyExecutor
类实现了Executor
接口,在execute
方法中,我们简单地创建了一个新线程并启动它来执行传入的Runnable
任务。在main
方法中,我们创建了MyExecutor
的实例,并提交了一个任务。虽然这是一个简单的实现,但它展示了Executor
接口的基本用法。
ExecutorService接口
ExecutorService
接口继承自Executor
接口,为我们提供了更多的功能,主要包括生命周期管理和任务提交的扩展方法。
生命周期管理方法
shutdown()
:启动一个有序关闭,不再接受新任务,但会继续执行已提交的任务。shutdownNow()
:尝试停止所有正在执行的任务,停止等待任务的处理,并返回等待执行的任务列表。isShutdown()
:如果已经调用了shutdown()
或shutdownNow()
,则返回true
。isTerminated()
:如果所有任务都已完成关闭,则返回true
。
任务提交扩展方法
submit(Callable<T> task)
:提交一个返回值的任务,并返回一个表示任务执行结果的Future
对象。submit(Runnable task)
:提交一个Runnable
任务,并返回一个表示任务执行结果的Future
对象,Future.get()
方法将返回null
。submit(Runnable task, T result)
:提交一个Runnable
任务,并返回一个表示任务执行结果的Future
对象,Future.get()
方法将返回传入的result
对象。
下面是一个使用ExecutorService
接口的示例:
import java.util.concurrent.*;
public class ExecutorServiceExample {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(2);
Future<String> future1 = executorService.submit(() -> {
TimeUnit.SECONDS.sleep(2);
return "Task 1 completed";
});
Future<String> future2 = executorService.submit(() -> {
TimeUnit.SECONDS.sleep(1);
return "Task 2 completed";
});
try {
System.out.println(future1.get());
System.out.println(future2.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
executorService.shutdown();
}
}
在上述代码中,我们通过Executors.newFixedThreadPool(2)
创建了一个固定大小为2的线程池。然后提交了两个Callable
任务,这两个任务分别模拟了不同的执行时间。通过Future.get()
方法,我们可以获取任务的执行结果。最后,调用shutdown()
方法关闭线程池。
ThreadPoolExecutor类
ThreadPoolExecutor
类是ExecutorService
接口的核心实现类,它提供了高度可定制的线程池管理功能。通过构造函数,我们可以设置线程池的各种参数,以满足不同的应用场景需求。
ThreadPoolExecutor的构造函数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
corePoolSize
:核心线程数,线程池中会一直存活的线程数量,即使它们处于空闲状态,除非设置了allowCoreThreadTimeOut
。maximumPoolSize
:线程池允许创建的最大线程数。当任务队列已满且活动线程数小于最大线程数时,线程池会创建新的线程来执行任务。keepAliveTime
:当线程数大于核心线程数时,多余的空闲线程的存活时间。即如果一个线程空闲时间超过了keepAliveTime
,且当前线程数大于核心线程数,那么这个线程将会被终止。unit
:keepAliveTime
的时间单位。workQueue
:任务队列,用于存放等待执行的任务。常用的任务队列有ArrayBlockingQueue
、LinkedBlockingQueue
、SynchronousQueue
等。threadFactory
:线程工厂,用于创建新的线程。通过线程工厂,我们可以对创建的线程进行一些定制,比如设置线程名称、线程优先级等。handler
:拒绝策略,当任务队列已满且线程数达到最大线程数时,新提交的任务将被拒绝,此时会调用拒绝策略来处理被拒绝的任务。常用的拒绝策略有AbortPolicy
(默认策略,直接抛出异常)、CallerRunsPolicy
(将任务交回给调用者执行)、DiscardPolicy
(直接丢弃任务)和DiscardOldestPolicy
(丢弃队列中最老的任务,然后尝试提交新任务)。
示例:自定义ThreadPoolExecutor
import java.util.concurrent.*;
public class ThreadPoolExecutorExample {
public static void main(String[] args) {
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2);
ThreadFactory threadFactory = Executors.defaultThreadFactory();
RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy();
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2,
4,
10,
TimeUnit.SECONDS,
workQueue,
threadFactory,
handler
);
for (int i = 0; i < 6; i++) {
int taskNumber = i;
executor.submit(() -> {
System.out.println("Task " + taskNumber + " is running on thread " + Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
executor.shutdown();
}
}
在上述代码中,我们创建了一个ThreadPoolExecutor
实例,核心线程数为2,最大线程数为4,任务队列容量为2,线程存活时间为10秒。我们提交了6个任务,由于任务队列只能容纳2个任务,且核心线程数为2,所以一开始会有2个任务在核心线程中执行,2个任务进入任务队列,另外2个任务会导致线程池创建新的线程(因为最大线程数为4)。当所有任务提交完毕后,我们调用shutdown()
方法关闭线程池。
常用的线程池创建方式
除了直接使用ThreadPoolExecutor
类来创建线程池外,Executors
类还提供了一些静态工厂方法来创建不同类型的线程池,这些线程池适用于不同的场景。
newFixedThreadPool(int nThreads)
创建一个固定大小的线程池,线程池中的线程数量始终保持为nThreads
。当有新任务提交时,如果线程池中有空闲线程,则立即执行任务;如果没有空闲线程,则任务会被放入任务队列中等待。
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
这种线程池适用于对线程数量有明确限制,且任务执行时间相对较长的场景,比如数据库连接池的管理。
newCachedThreadPool()
创建一个可缓存的线程池,线程池的大小会根据任务的数量自动调整。如果线程池中的线程空闲时间超过60秒,那么这些线程将会被回收。当有新任务提交时,如果线程池中有空闲线程,则立即执行任务;如果没有空闲线程,则会创建新的线程来执行任务。
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
这种线程池适用于任务执行时间较短,但任务数量较多且提交频率不稳定的场景,比如处理大量的短期异步任务。
newSingleThreadExecutor()
创建一个单线程的线程池,线程池中只有一个线程。所有任务会按照提交的顺序依次在这个线程中执行。
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
这种线程池适用于需要顺序执行任务,且不希望多个任务并发执行的场景,比如对文件的顺序读写操作。
newScheduledThreadPool(int corePoolSize)
创建一个固定大小的线程池,支持定时及周期性任务执行。线程池中的核心线程数为corePoolSize
。
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(2);
scheduledThreadPool.schedule(() -> System.out.println("Delayed task executed"), 5, TimeUnit.SECONDS);
scheduledThreadPool.scheduleAtFixedRate(() -> System.out.println("Periodic task executed"), 0, 2, TimeUnit.SECONDS);
在上述代码中,schedule
方法用于延迟执行任务,scheduleAtFixedRate
方法用于周期性执行任务。这种线程池适用于需要定时执行任务的场景,比如定时备份数据、定时清理缓存等。
Future和Callable接口
在Executor
框架中,Future
和Callable
接口是非常重要的组成部分,它们为我们提供了获取任务执行结果以及处理异步任务的能力。
Callable接口
Callable
接口类似于Runnable
接口,不同之处在于Callable
接口的call
方法可以返回一个值,并且可以抛出异常。
public interface Callable<V> {
V call() throws Exception;
}
下面是一个简单的Callable
示例:
import java.util.concurrent.Callable;
class MyCallable implements Callable<Integer> {
@Override
public Integer call() throws Exception {
Thread.sleep(2000);
return 42;
}
}
在上述代码中,MyCallable
类实现了Callable
接口,call
方法模拟了一个耗时2秒的任务,并返回一个整数值42。
Future接口
Future
接口用于表示异步任务的执行结果,我们可以通过Future
对象获取任务的执行结果、取消任务、判断任务是否完成等。
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}
cancel(boolean mayInterruptIfRunning)
:尝试取消任务的执行。如果任务已经完成、已经被取消或者由于某些原因无法取消,则返回false
;如果任务还未开始执行,则任务将被取消且返回true
;如果任务正在执行,且mayInterruptIfRunning
为true
,则会尝试中断执行任务的线程来取消任务。isCancelled()
:判断任务是否在完成之前被取消。isDone()
:判断任务是否已经完成,任务完成包括正常结束、异常结束或被取消。get()
:获取任务的执行结果,如果任务还未完成,则调用线程会被阻塞,直到任务完成。get(long timeout, TimeUnit unit)
:在指定的时间内获取任务的执行结果,如果在指定时间内任务未完成,则抛出TimeoutException
。
下面是一个使用Future
和Callable
的示例:
import java.util.concurrent.*;
public class FutureAndCallableExample {
public static void main(String[] args) {
ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<Integer> future = executorService.submit(new MyCallable());
try {
System.out.println("Task result: " + future.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
executorService.shutdown();
}
}
class MyCallable implements Callable<Integer> {
@Override
public Integer call() throws Exception {
Thread.sleep(2000);
return 42;
}
}
在上述代码中,我们通过ExecutorService
提交了一个MyCallable
任务,并获取了对应的Future
对象。通过future.get()
方法,我们获取了任务的执行结果。
CompletionService接口
CompletionService
接口结合了Executor
和BlockingQueue
的功能,它允许我们在任务完成时获取任务的结果,而不需要按照任务提交的顺序获取。
CompletionService的使用
import java.util.concurrent.*;
public class CompletionServiceExample {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(3);
CompletionService<Integer> completionService = new ExecutorCompletionService<>(executorService);
for (int i = 0; i < 5; i++) {
int taskNumber = i;
completionService.submit(() -> {
Thread.sleep((long) (Math.random() * 3000));
return taskNumber * taskNumber;
});
}
for (int i = 0; i < 5; i++) {
try {
Future<Integer> future = completionService.take();
System.out.println("Task result: " + future.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
executorService.shutdown();
}
}
在上述代码中,我们创建了一个ExecutorCompletionService
实例,并提交了5个任务。通过completionService.take()
方法,我们可以按照任务完成的顺序获取任务的结果。这种方式在处理多个异步任务,且希望尽快获取已完成任务的结果时非常有用。
Fork/Join框架
Fork/Join
框架是Executor
框架的一个扩展,它专门用于处理可以被分解为多个小任务的大任务,通过递归分解任务并并行执行这些小任务,最后合并小任务的结果来得到大任务的结果。
Fork/Join框架的核心类
ForkJoinPool
:Fork/Join
框架的线程池,它负责管理和调度ForkJoinTask
任务。ForkJoinTask
:表示一个可以被分解和并行执行的任务,它是一个抽象类,有两个主要的实现类:RecursiveAction
(用于没有返回值的任务)和RecursiveTask
(用于有返回值的任务)。
示例:使用Fork/Join框架计算数组元素之和
import java.util.concurrent.*;
public class ForkJoinSumExample {
public static void main(String[] args) {
long[] numbers = new long[1000000];
for (int i = 0; i < numbers.length; i++) {
numbers[i] = i + 1;
}
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask<Long> task = new SumTask(numbers, 0, numbers.length);
long result = forkJoinPool.invoke(task);
System.out.println("Sum of array elements: " + result);
}
}
class SumTask extends RecursiveTask<Long> {
private static final int THRESHOLD = 10000;
private long[] numbers;
private int start;
private int end;
public SumTask(long[] numbers, int start, int end) {
this.numbers = numbers;
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
if (end - start <= THRESHOLD) {
long sum = 0;
for (int i = start; i < end; i++) {
sum += numbers[i];
}
return sum;
} else {
int mid = (start + end) / 2;
SumTask leftTask = new SumTask(numbers, start, mid);
SumTask rightTask = new SumTask(numbers, mid, end);
leftTask.fork();
long rightResult = rightTask.compute();
long leftResult = leftTask.join();
return leftResult + rightResult;
}
}
}
在上述代码中,我们定义了一个SumTask
类继承自RecursiveTask
,用于计算数组指定区间内元素的和。通过ForkJoinPool
提交任务,并最终获取计算结果。Fork/Join
框架通过递归分解任务,利用多线程并行计算,提高了计算效率。
总结
Executor
框架是Java并发编程中的一个强大工具集,它涵盖了从简单的任务执行到复杂的线程池管理、异步任务处理以及任务分解并行计算等多个方面。通过合理使用Executor
框架,我们可以有效地提高程序的性能、可维护性和资源利用率。无论是处理大量的短期任务,还是执行长时间运行的任务,Executor
框架都提供了丰富的功能和灵活的配置选项来满足不同的需求。同时,结合Future
、Callable
、CompletionService
以及Fork/Join
框架等相关组件,我们能够更全面地掌控并发任务的执行和结果处理,从而编写出高效、健壮的并发程序。在实际开发中,深入理解并熟练运用Executor
框架是每个Java开发者必备的技能之一。
希望通过本文的介绍和示例代码,读者能够对Java并发包中的Executor框架
有一个全面且深入的了解,并能够在自己的项目中灵活运用这些知识来优化并发程序的性能。