Java 线程池的任务提交
Java 线程池的任务提交基础概念
在Java多线程编程中,线程池是一种非常重要的工具,它可以有效地管理和复用线程,提高程序的性能和资源利用率。而任务提交则是线程池使用过程中的关键操作,即将需要执行的任务交给线程池来处理。
线程池的核心是一个线程队列和一组工作线程。当我们提交任务时,任务首先被放入任务队列中,然后工作线程从任务队列中取出任务并执行。这种机制实现了任务的异步执行,减少了线程创建和销毁的开销。
Java提供了java.util.concurrent.Executor
框架来支持线程池的使用。其中,ExecutorService
接口扩展了Executor
接口,提供了更多控制线程池生命周期和任务提交的方法。ThreadPoolExecutor
类是ExecutorService
接口的一个实现类,它提供了灵活的线程池配置和任务提交策略。
任务提交的基本方法
在ExecutorService
接口中,定义了两种主要的任务提交方法:execute(Runnable task)
和submit(Callable<T> task)
。
execute(Runnable task)
方法:该方法用于提交一个Runnable
任务。Runnable
接口只有一个run()
方法,该方法没有返回值。当任务执行完毕后,我们无法获取任务的执行结果。示例代码如下:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ExecuteExample {
public static void main(String[] args) {
// 创建一个固定大小的线程池
ExecutorService executorService = Executors.newFixedThreadPool(2);
// 提交Runnable任务
executorService.execute(() -> {
System.out.println("Task executed by thread: " + Thread.currentThread().getName());
});
// 关闭线程池
executorService.shutdown();
}
}
在上述代码中,我们创建了一个固定大小为2的线程池,并使用execute
方法提交了一个Runnable
任务。任务在一个线程池中线程上执行,并打印出执行任务的线程名称。
submit(Callable<T> task)
方法:该方法用于提交一个Callable
任务。Callable
接口有一个call()
方法,该方法可以返回一个值。submit
方法返回一个Future<T>
对象,通过这个对象我们可以获取任务的执行结果,还可以取消任务的执行。示例代码如下:
import java.util.concurrent.*;
public class SubmitExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 创建一个固定大小的线程池
ExecutorService executorService = Executors.newFixedThreadPool(2);
// 提交Callable任务
Future<Integer> future = executorService.submit(() -> {
System.out.println("Task executed by thread: " + Thread.currentThread().getName());
return 42;
});
try {
// 获取任务执行结果
Integer result = future.get();
System.out.println("Task result: " + result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
// 关闭线程池
executorService.shutdown();
}
}
在上述代码中,我们提交了一个Callable
任务,该任务返回一个整数值42。通过Future
对象的get()
方法,我们可以获取任务的执行结果。
任务提交与线程池状态
线程池有几种不同的状态,如RUNNING
、SHUTDOWN
、STOP
、TIDYING
和TERMINATED
。任务提交的行为会受到线程池状态的影响。
-
RUNNING
状态:在线程池处于RUNNING
状态时,可以正常提交任务。新提交的任务会被放入任务队列或直接分配给空闲线程执行。 -
SHUTDOWN
状态:当调用ExecutorService
的shutdown()
方法后,线程池进入SHUTDOWN
状态。在这个状态下,不再接受新的任务提交,但会继续执行已提交到任务队列中的任务。示例代码如下:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class ShutdownExample {
public static void main(String[] args) throws InterruptedException {
// 创建一个固定大小的线程池
ExecutorService executorService = Executors.newFixedThreadPool(2);
// 提交任务
executorService.execute(() -> {
System.out.println("Task 1 executed by thread: " + Thread.currentThread().getName());
});
// 关闭线程池
executorService.shutdown();
// 尝试提交新任务,会被拒绝
executorService.execute(() -> {
System.out.println("Task 2 (should not execute)");
});
// 等待已提交任务执行完毕
executorService.awaitTermination(5, TimeUnit.SECONDS);
}
}
在上述代码中,调用shutdown()
方法后,再提交新任务会被拒绝,线程池会继续执行已提交的任务,直到所有任务完成或超时。
-
STOP
状态:当调用ExecutorService
的shutdownNow()
方法后,线程池进入STOP
状态。在这个状态下,不再接受新的任务提交,并且会尝试停止正在执行的任务,清空任务队列。 -
TIDYING
和TERMINATED
状态:当所有任务执行完毕,且线程池中的工作线程都已关闭时,线程池进入TIDYING
状态,然后很快进入TERMINATED
状态。在这两个状态下,无法再提交任务。
任务提交策略
ThreadPoolExecutor
类提供了几种任务提交策略,这些策略决定了当任务队列已满且线程池中的线程都在忙碌时,新提交的任务如何处理。
- AbortPolicy(默认策略):当任务无法执行时,直接抛出
RejectedExecutionException
异常。示例代码如下:
import java.util.concurrent.*;
public class AbortPolicyExample {
public static void main(String[] args) {
// 创建一个固定大小为2,任务队列容量为1的线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2,
2,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1),
new ThreadPoolExecutor.AbortPolicy());
// 提交任务
executor.execute(() -> {
System.out.println("Task 1 executed by thread: " + Thread.currentThread().getName());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
executor.execute(() -> {
System.out.println("Task 2 executed by thread: " + Thread.currentThread().getName());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
executor.execute(() -> {
System.out.println("Task 3 (should throw exception)");
});
// 关闭线程池
executor.shutdown();
}
}
在上述代码中,由于任务队列已满且线程池中的线程都在忙碌,提交第三个任务时会抛出RejectedExecutionException
异常。
- CallerRunsPolicy:当任务无法执行时,由提交任务的线程来执行该任务。这样可以降低新任务的提交速度,示例代码如下:
import java.util.concurrent.*;
public class CallerRunsPolicyExample {
public static void main(String[] args) {
// 创建一个固定大小为2,任务队列容量为1的线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2,
2,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1),
new ThreadPoolExecutor.CallerRunsPolicy());
// 提交任务
executor.execute(() -> {
System.out.println("Task 1 executed by thread: " + Thread.currentThread().getName());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
executor.execute(() -> {
System.out.println("Task 2 executed by thread: " + Thread.currentThread().getName());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
executor.execute(() -> {
System.out.println("Task 3 executed by main thread: " + Thread.currentThread().getName());
});
// 关闭线程池
executor.shutdown();
}
}
在上述代码中,当任务队列已满且线程池中的线程都在忙碌时,第三个任务由主线程执行。
- DiscardPolicy:当任务无法执行时,直接丢弃该任务,不抛出任何异常。示例代码如下:
import java.util.concurrent.*;
public class DiscardPolicyExample {
public static void main(String[] args) {
// 创建一个固定大小为2,任务队列容量为1的线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2,
2,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1),
new ThreadPoolExecutor.DiscardPolicy());
// 提交任务
executor.execute(() -> {
System.out.println("Task 1 executed by thread: " + Thread.currentThread().getName());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
executor.execute(() -> {
System.out.println("Task 2 executed by thread: " + Thread.currentThread().getName());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
executor.execute(() -> {
System.out.println("Task 3 (should be discarded)");
});
// 关闭线程池
executor.shutdown();
}
}
在上述代码中,当任务队列已满且线程池中的线程都在忙碌时,第三个任务被直接丢弃,不会执行也不会抛出异常。
- DiscardOldestPolicy:当任务无法执行时,丢弃任务队列中最老的任务,然后尝试提交新任务。示例代码如下:
import java.util.concurrent.*;
public class DiscardOldestPolicyExample {
public static void main(String[] args) {
// 创建一个固定大小为2,任务队列容量为1的线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2,
2,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1),
new ThreadPoolExecutor.DiscardOldestPolicy());
// 提交任务
executor.execute(() -> {
System.out.println("Task 1 executed by thread: " + Thread.currentThread().getName());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
executor.execute(() -> {
System.out.println("Task 2 (should be discarded)");
});
executor.execute(() -> {
System.out.println("Task 3 executed by thread: " + Thread.currentThread().getName());
});
// 关闭线程池
executor.shutdown();
}
}
在上述代码中,当任务队列已满且线程池中的线程都在忙碌时,第二个任务被丢弃,第三个任务被提交执行。
批量任务提交
在实际应用中,我们经常需要提交一批任务并等待所有任务执行完毕。Java提供了invokeAll
方法来实现这一功能。invokeAll
方法接受一个Collection
集合,集合中包含多个Callable
任务。该方法会等待所有任务执行完毕,并返回一个包含每个任务执行结果的List<Future<T>>
。示例代码如下:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
public class InvokeAllExample {
public static void main(String[] args) throws InterruptedException {
// 创建一个固定大小的线程池
ExecutorService executorService = Executors.newFixedThreadPool(2);
// 创建多个Callable任务
List<Callable<Integer>> tasks = new ArrayList<>();
tasks.add(() -> {
System.out.println("Task 1 executed by thread: " + Thread.currentThread().getName());
return 10;
});
tasks.add(() -> {
System.out.println("Task 2 executed by thread: " + Thread.currentThread().getName());
return 20;
});
// 提交并等待所有任务执行完毕
try {
List<Future<Integer>> futures = executorService.invokeAll(tasks);
for (Future<Integer> future : futures) {
try {
System.out.println("Task result: " + future.get());
} catch (ExecutionException e) {
e.printStackTrace();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
// 关闭线程池
executorService.shutdown();
}
}
在上述代码中,我们创建了两个Callable
任务,并使用invokeAll
方法提交这两个任务。线程池会并行执行这些任务,当所有任务执行完毕后,我们可以通过Future
对象获取每个任务的执行结果。
异常处理与任务提交
当使用execute
方法提交Runnable
任务时,如果任务执行过程中抛出异常,默认情况下这个异常会被线程池捕获并打印到标准错误输出,但不会被上层调用者直接捕获。如果我们希望在任务执行出现异常时能够进行自定义处理,可以通过Thread.UncaughtExceptionHandler
来实现。示例代码如下:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ExceptionHandlingExecuteExample {
public static void main(String[] args) {
// 创建一个固定大小的线程池
ExecutorService executorService = Executors.newFixedThreadPool(2);
// 设置线程的UncaughtExceptionHandler
Thread.setDefaultUncaughtExceptionHandler((thread, exception) -> {
System.out.println("Uncaught exception in thread " + thread.getName() + ": " + exception.getMessage());
});
// 提交可能抛出异常的任务
executorService.execute(() -> {
throw new RuntimeException("Task execution failed");
});
// 关闭线程池
executorService.shutdown();
}
}
在上述代码中,我们通过Thread.setDefaultUncaughtExceptionHandler
方法设置了全局的未捕获异常处理器,当任务执行抛出异常时,会调用这个处理器进行处理。
当使用submit
方法提交Callable
任务时,任务执行过程中抛出的异常会被封装在Future
对象的get()
方法中。我们可以通过try-catch
块捕获ExecutionException
和InterruptedException
来处理异常。示例代码如下:
import java.util.concurrent.*;
public class ExceptionHandlingSubmitExample {
public static void main(String[] args) {
// 创建一个固定大小的线程池
ExecutorService executorService = Executors.newFixedThreadPool(2);
// 提交可能抛出异常的Callable任务
Future<Integer> future = executorService.submit(() -> {
throw new RuntimeException("Task execution failed");
});
try {
Integer result = future.get();
} catch (InterruptedException | ExecutionException e) {
System.out.println("Exception caught: " + e.getMessage());
}
// 关闭线程池
executorService.shutdown();
}
}
在上述代码中,通过try-catch
块捕获get()
方法抛出的异常,从而处理任务执行过程中抛出的异常。
任务提交与性能优化
合理的任务提交策略和线程池配置对于提高程序性能至关重要。
-
线程池大小的选择:线程池大小的选择要根据任务的类型(CPU密集型或I/O密集型)来决定。对于CPU密集型任务,线程池大小一般设置为CPU核心数加1,以充分利用CPU资源并避免线程上下文切换的开销。对于I/O密集型任务,由于I/O操作会使线程阻塞,线程池大小可以适当增大,以提高系统的并发处理能力。例如,可以根据经验公式
线程数 = CPU核心数 * (1 + 平均I/O等待时间 / 平均CPU计算时间)
来估算线程池大小。 -
任务队列的选择:任务队列的类型和容量也会影响性能。如果任务执行时间较短且任务数量较多,可以选择无界队列(如
LinkedBlockingQueue
),这样可以避免任务被拒绝,但可能会导致内存消耗过大。如果任务执行时间较长且希望控制内存使用,可以选择有界队列(如ArrayBlockingQueue
),并根据实际情况设置合适的队列容量。 -
任务提交频率:如果任务提交频率过高,可能会导致任务队列迅速填满,进而触发任务拒绝策略。因此,需要根据线程池的处理能力合理控制任务提交频率,可以通过流量控制等手段来实现。
-
使用合适的线程池类型:除了
ThreadPoolExecutor
,Java还提供了一些预定义的线程池,如Executors.newFixedThreadPool
(固定大小线程池)、Executors.newCachedThreadPool
(可缓存线程池)、Executors.newSingleThreadExecutor
(单线程线程池)等。不同类型的线程池适用于不同的场景,选择合适的线程池类型可以提高性能。例如,newFixedThreadPool
适用于需要控制并发线程数的场景,newCachedThreadPool
适用于任务执行时间短且提交频率高的场景,newSingleThreadExecutor
适用于需要顺序执行任务的场景。
总结任务提交在Java线程池中的要点
任务提交是Java线程池使用过程中的核心操作,涉及到任务提交方法的选择、线程池状态对任务提交的影响、任务提交策略的应用、批量任务提交、异常处理以及性能优化等多个方面。深入理解这些要点并根据实际应用场景合理配置和使用线程池,可以有效地提高程序的性能和稳定性,充分发挥多线程编程的优势。在实际开发中,需要根据具体的业务需求和系统资源情况,精心调整线程池的参数和任务提交策略,以达到最佳的运行效果。同时,要注意处理任务执行过程中可能出现的异常,确保程序的健壮性。通过合理运用线程池的任务提交机制,可以使Java应用程序在多线程环境下高效、稳定地运行。