Java LinkedBlockingQueue 在线程池的特性
2023-08-228.0k 阅读
Java LinkedBlockingQueue 简介
在深入探讨 LinkedBlockingQueue
在线程池中的特性之前,先来了解一下 LinkedBlockingQueue
本身。LinkedBlockingQueue
是 Java 并发包 java.util.concurrent
中的一个重要类,它实现了 BlockingQueue
接口。BlockingQueue
提供了线程安全的队列操作,并且当队列满或空时,相关操作会阻塞调用线程。
LinkedBlockingQueue
基于链表结构实现,这使得它在存储元素时具有较好的灵活性,尤其适用于需要动态增长和收缩的队列场景。它有两个构造函数,一个可以指定容量,另一个不指定容量时默认容量为 Integer.MAX_VALUE
。
// 创建一个指定容量的 LinkedBlockingQueue
BlockingQueue<Integer> boundedQueue = new LinkedBlockingQueue<>(10);
// 创建一个无界的 LinkedBlockingQueue
BlockingQueue<Integer> unboundedQueue = new LinkedBlockingQueue<>();
LinkedBlockingQueue 的基本操作
- 添加元素
add(E e)
:将指定元素插入此队列(如果立即可行且不会超过队列容量),成功时返回true
,如果当前没有可用空间则抛出IllegalStateException
。offer(E e)
:将指定元素插入此队列(如果立即可行且不会超过队列容量),成功时返回true
,如果当前没有可用空间则返回false
。put(E e)
:将指定元素插入此队列,将等待可用的空间(如果有必要)。
BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(3);
try {
queue.put(1);
queue.put(2);
queue.put(3);
// 下面这行代码会阻塞,直到有空间可用
queue.put(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
- 移除元素
remove(Object o)
:从此队列中移除指定元素的单个实例(如果存在)。poll()
:检索并移除队列的头,如果队列为空,则返回null
。take()
:检索并移除队列的头,等待直到队列中有元素可用。
BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
queue.add(1);
queue.add(2);
try {
Integer removed = queue.take();
System.out.println("Removed: " + removed);
// 下面这行代码会阻塞,直到队列中有元素
removed = queue.take();
System.out.println("Removed: " + removed);
} catch (InterruptedException e) {
e.printStackTrace();
}
- 查询元素
element()
:检索但不移除队列的头,如果队列为空,则抛出NoSuchElementException
。peek()
:检索但不移除队列的头,如果队列为空,则返回null
。
BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
queue.add(1);
Integer head = queue.peek();
System.out.println("Head: " + head);
LinkedBlockingQueue 在线程池中的应用
- 线程池概述
线程池是一种管理和复用线程的机制,它可以避免频繁创建和销毁线程带来的开销。在 Java 中,
ThreadPoolExecutor
是线程池的核心实现类。ThreadPoolExecutor
的构造函数如下:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
// 构造函数实现
}
其中,workQueue
就是用于存放等待执行任务的队列,而 LinkedBlockingQueue
是常用的队列实现之一。
2. 使用 LinkedBlockingQueue 的线程池示例
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class LinkedBlockingQueueThreadPoolExample {
public static void main(String[] args) {
// 创建一个容量为 5 的 LinkedBlockingQueue
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(5);
// 创建线程池,核心线程数为 2,最大线程数为 4
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2,
4,
10,
TimeUnit.SECONDS,
workQueue
);
for (int i = 0; i < 10; i++) {
int taskNumber = i;
executor.submit(() -> {
System.out.println("Task " + taskNumber + " is running on thread " + Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Task " + taskNumber + " has finished");
});
}
executor.shutdown();
try {
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
executor.shutdownNow();
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
System.err.println("Pool did not terminate");
}
}
} catch (InterruptedException ie) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
在这个示例中,我们创建了一个容量为 5 的 LinkedBlockingQueue
作为线程池的工作队列。线程池的核心线程数为 2,最大线程数为 4。当提交 10 个任务时,首先会由 2 个核心线程执行任务,当任务数超过核心线程数且队列未满时,任务会进入队列等待。当队列满后,才会创建新的线程(直到最大线程数)来执行任务。
LinkedBlockingQueue 在线程池中的特性
- 队列容量对线程池行为的影响
- 有界队列:当
LinkedBlockingQueue
设置了固定容量时,例如上述示例中的容量为 5。当提交的任务数超过核心线程数且队列已满时,线程池会创建新的线程(直到最大线程数)来处理任务。如果任务数继续增加,超过最大线程数和队列容量之和,就会触发拒绝策略。 - 无界队列:如果使用无界的
LinkedBlockingQueue
(即不指定容量),那么线程池的最大线程数设置将失效。因为只要任务不断提交,队列就会不断增长,线程池始终只会使用核心线程数来处理任务,不会创建超过核心线程数的线程,直到系统资源耗尽。
- 有界队列:当
- 阻塞特性
LinkedBlockingQueue
的put
和take
方法是阻塞的。在线程池中,当任务提交到队列时,如果队列已满,put
方法会阻塞提交任务的线程,直到队列有空间可用。同样,当线程池中的线程从队列中获取任务时,如果队列为空,take
方法会阻塞线程,直到队列中有任务。- 这种阻塞特性保证了线程池中的任务处理的有序性和稳定性。例如,在高并发场景下,大量任务同时提交到线程池,如果没有阻塞机制,可能会导致任务丢失或混乱处理。
- 线程安全
LinkedBlockingQueue
是线程安全的,它内部使用了锁机制来保证多线程环境下的操作一致性。在put
、take
等方法中,通过ReentrantLock
来实现线程同步。- 例如,
put
方法的实现如下:
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<>(e);
final ReentrantLock putLock = this.putLock;
putLock.lockInterruptibly();
try {
while (count == capacity) {
notFull.await();
}
enqueue(node);
c = ++count;
if (c < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 1)
signalNotEmpty();
}
- 这里通过
putLock.lockInterruptibly()
来获取锁,并在操作完成后通过putLock.unlock()
释放锁,保证了在多线程环境下添加元素的线程安全性。
- 公平性
LinkedBlockingQueue
默认为非公平队列。非公平队列在竞争锁时,不保证等待时间最长的线程优先获取锁,可能会出现某些线程长时间等待的情况。- 可以通过自定义实现公平队列,在构造
LinkedBlockingQueue
时传入一个公平的ReentrantLock
。例如:
ReentrantLock fairLock = new ReentrantLock(true);
BlockingQueue<Integer> fairQueue = new LinkedBlockingQueue<>(10, fairLock);
- 但是公平队列通常会带来一定的性能开销,因为在竞争锁时需要额外的维护等待队列的操作。
- 内存使用
- 基于链表结构的
LinkedBlockingQueue
在内存使用上相对灵活。对于有界队列,它不会超过设定的容量所占用的内存(不考虑链表节点本身的一些额外开销)。而对于无界队列,随着任务的不断添加,内存占用会持续增长,直到达到系统资源的限制。 - 在实际应用中,需要根据系统的内存资源和任务负载来合理选择队列的容量。如果任务处理时间较长且任务数量较大,使用有界队列可以避免内存耗尽的风险,但可能会频繁触发拒绝策略;而使用无界队列虽然可以避免拒绝策略,但需要密切关注内存使用情况。
- 基于链表结构的
- 任务调度顺序
LinkedBlockingQueue
按照任务提交的顺序来调度任务。当线程从队列中获取任务时,会按照先进先出(FIFO)的原则取出任务。这种调度顺序在大多数场景下符合任务处理的逻辑,例如在处理一些需要顺序执行的业务逻辑时,如订单处理,先提交的订单先处理。- 然而,在某些特定场景下,可能需要按照其他顺序调度任务,比如按照任务的优先级。这时候就需要对
LinkedBlockingQueue
进行扩展或者使用其他支持优先级调度的队列实现,如PriorityBlockingQueue
。
- 与其他队列的对比
- ArrayBlockingQueue:
ArrayBlockingQueue
基于数组实现,而LinkedBlockingQueue
基于链表实现。ArrayBlockingQueue
在创建时必须指定容量,且容量不可变。它的内部使用一个数组来存储元素,在内存使用上相对紧凑。ArrayBlockingQueue
可以通过构造函数设置为公平队列,而LinkedBlockingQueue
默认为非公平队列。 - PriorityBlockingQueue:
PriorityBlockingQueue
是一个支持优先级的无界队列,它根据元素的自然顺序或者自定义的比较器来对元素进行排序。与LinkedBlockingQueue
不同,PriorityBlockingQueue
不保证同优先级元素的顺序,并且在队列为空时,take
方法不会阻塞,而是返回null
。在需要按照优先级处理任务的场景下,PriorityBlockingQueue
更为适用,而LinkedBlockingQueue
更适用于按顺序处理任务的场景。 - SynchronousQueue:
SynchronousQueue
是一个没有容量的队列,每个插入操作必须等待另一个线程的移除操作,反之亦然。它不存储元素,主要用于在线程之间传递数据。与LinkedBlockingQueue
相比,SynchronousQueue
更适合于需要高效传递任务且不希望任务在队列中等待的场景,而LinkedBlockingQueue
适合于任务需要在队列中暂存等待处理的场景。
- ArrayBlockingQueue:
实际应用场景
- 高并发任务处理
在 Web 服务器中,大量的 HTTP 请求可以作为任务提交到线程池,使用
LinkedBlockingQueue
作为工作队列。例如,一个电商网站在促销活动期间,会收到大量的订单请求。通过线程池和LinkedBlockingQueue
,可以将这些订单请求有序地处理,避免因为瞬间高并发导致系统崩溃。 - 消息处理系统
在消息队列系统中,生产者将消息发送到线程池的工作队列(
LinkedBlockingQueue
),消费者线程从队列中取出消息进行处理。例如,在一个日志收集系统中,各个应用程序将日志消息发送到线程池,线程池中的线程从LinkedBlockingQueue
中取出日志消息并进行存储或分析。 - 批处理任务
对于一些需要批量处理的任务,如数据导入。可以将数据分块作为任务提交到线程池,
LinkedBlockingQueue
可以暂存这些任务,线程池中的线程按照顺序依次处理这些任务,保证数据处理的准确性和顺序性。
性能优化与注意事项
- 队列容量的优化
- 根据任务的特点和系统资源来合理设置
LinkedBlockingQueue
的容量。如果任务处理时间短且数量大,可以适当增大队列容量,减少线程创建和销毁的开销;如果任务处理时间长,应设置较小的队列容量,避免任务长时间在队列中等待导致响应时间过长。 - 可以通过性能测试来确定最优的队列容量。例如,使用 JMeter 等工具模拟不同数量的并发任务,观察系统在不同队列容量下的吞吐量和响应时间,从而找到最佳配置。
- 根据任务的特点和系统资源来合理设置
- 线程池参数与队列的配合
- 线程池的核心线程数、最大线程数和
LinkedBlockingQueue
的容量需要相互配合。如果队列容量过大,而核心线程数过小,可能会导致任务长时间在队列中等待,降低系统的响应速度;如果队列容量过小,而最大线程数过大,可能会频繁创建和销毁线程,增加系统开销。 - 一般来说,可以根据任务的 I/O 特性来调整参数。对于 I/O 密集型任务,可以适当增加核心线程数,因为 I/O 操作会使线程阻塞,需要更多的线程来充分利用 CPU 资源;对于 CPU 密集型任务,核心线程数可以设置为 CPU 核心数或略小于 CPU 核心数,同时合理设置队列容量和最大线程数。
- 线程池的核心线程数、最大线程数和
- 避免死锁
在使用
LinkedBlockingQueue
在线程池中时,要注意避免死锁。死锁通常发生在多个线程相互等待对方释放资源的情况下。例如,如果一个线程从队列中取出任务后,在处理任务过程中又尝试向同一个队列中添加任务,并且添加操作因为队列满而阻塞,同时其他线程也在等待从该队列中取出任务,就可能会导致死锁。- 为了避免死锁,需要合理设计任务处理逻辑,确保任务在处理过程中不会产生循环依赖的资源获取操作。同时,可以使用
ThreadMXBean
等工具来检测死锁,及时发现和解决潜在的死锁问题。
- 为了避免死锁,需要合理设计任务处理逻辑,确保任务在处理过程中不会产生循环依赖的资源获取操作。同时,可以使用
- 内存管理
- 对于无界的
LinkedBlockingQueue
,要密切关注内存使用情况。可以通过 JVM 监控工具,如 VisualVM 或 JConsole,实时查看堆内存的使用情况。如果发现内存持续增长且没有稳定的趋势,可能需要调整任务处理逻辑,或者限制任务的提交速度,以避免内存耗尽。 - 对于有界队列,虽然不会出现无限制的内存增长,但也要注意队列满时的处理策略,避免因为频繁的拒绝策略导致任务丢失或系统异常。
- 对于无界的
总结 LinkedBlockingQueue 在线程池中的关键要点
- 队列容量决定线程池行为:有界队列在满时会促使线程池创建更多线程(直到最大线程数),无界队列会使最大线程数设置失效。
- 阻塞特性保证任务有序处理:
put
和take
方法的阻塞机制确保了任务在队列满或空时的正确处理,维持了线程池任务处理的稳定性。 - 线程安全是基础:内部锁机制保证了多线程环境下
LinkedBlockingQueue
操作的一致性和安全性。 - 公平性与性能权衡:默认非公平队列有较好性能,但在某些场景下可能需要考虑公平队列的实现。
- 内存使用需谨慎:根据任务负载合理选择有界或无界队列,避免内存问题。
- 任务调度按 FIFO 原则:符合大多数任务处理的顺序逻辑,特定场景下可考虑其他调度方式。
- 与其他队列各有适用场景:与
ArrayBlockingQueue
、PriorityBlockingQueue
和SynchronousQueue
等对比,根据需求选择合适的队列实现。 - 性能优化与注意事项:合理设置队列容量,配合线程池参数,避免死锁和关注内存管理是保证系统高效稳定运行的关键。
通过深入理解 LinkedBlockingQueue
在线程池中的这些特性和要点,并在实际应用中合理运用和优化,能够构建出高效、稳定的多线程应用程序,满足各种复杂的业务需求。无论是在高并发的 Web 应用、消息处理系统还是批处理任务场景中,LinkedBlockingQueue
与线程池的组合都能发挥重要作用,为系统的性能和可靠性提供有力保障。