MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java LinkedBlockingQueue的链表结构及性能特点

2022-05-308.0k 阅读

Java LinkedBlockingQueue 的链表结构

  1. 基本结构概述
    • LinkedBlockingQueue是Java并发包中提供的一种线程安全的阻塞队列,它基于链表结构实现。这种链表结构使得LinkedBlockingQueue在处理大量元素时具有较好的扩展性,尤其适用于需要动态添加和移除元素的场景。
    • 从本质上来说,LinkedBlockingQueue使用的是一个单向链表。链表中的每个节点(Node)存储了队列中的一个元素,并且每个节点都指向下一个节点,从而形成了一个链式结构。
    • 以Java源码来看,LinkedBlockingQueue内部定义了一个静态内部类Node,代码如下:
static class Node<E> {
    E item;
    Node<E> next;
    Node(E x) { item = x; }
}
  • 这里item用于存储队列中的元素,next则指向下一个节点。
  1. 链表的头节点和尾节点
    • LinkedBlockingQueue维护了两个重要的节点引用,即头节点(head)和尾节点(tail)。头节点用于出队操作,尾节点用于入队操作。
    • 当队列初始化时,headtail都指向一个空节点(这个空节点的itemnull)。例如,在LinkedBlockingQueue的构造函数中:
public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    last = head = new Node<E>(null);
}
  • 这里last就是tail的别名。入队操作时,新元素会被添加到tail节点之后,并将tail更新为新添加的节点。出队操作则是从head节点之后取出元素,并将head更新为下一个节点。
  • 例如入队操作的部分核心代码:
public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
    if (count.get() >= capacity)
        return false;
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        if (count.get() < capacity) {
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        }
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
    return c >= 0;
}
private void enqueue(Node<E> node) {
    tail = tail.next = node;
}
  • enqueue方法中,新节点node被添加到tail之后,并更新tail
  • 出队操作的部分核心代码:
public E poll() {
    final AtomicInteger count = this.count;
    if (count.get() == 0)
        return null;
    E x = null;
    int c = -1;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        if (count.get() > 0) {
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        }
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}
private E dequeue() {
    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h; // help GC
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}
  • dequeue方法中,从head的下一个节点取出元素,并更新head
  1. 链表结构与容量限制
    • LinkedBlockingQueue可以是有界的,也可以是无界的。当创建LinkedBlockingQueue时,如果指定了容量(capacity),则队列在达到该容量时,入队操作会被阻塞(如果使用put方法)或者返回false(如果使用offer方法)。
    • 有界队列的容量限制在链表结构中的体现主要是通过count变量来控制。count记录了当前队列中的元素数量,当count达到capacity时,入队操作就会受到限制。例如,在入队操作的offer方法中,首先检查count是否大于等于capacity
public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
    if (count.get() >= capacity)
        return false;
    // 后续入队操作代码
}
  • 而对于无界队列,LinkedBlockingQueue在创建时会将capacity设置为Integer.MAX_VALUE,理论上可以无限添加元素。但在实际应用中,由于内存等资源的限制,不可能真正无限制添加。

Java LinkedBlockingQueue 的性能特点

  1. 线程安全与锁机制
    • LinkedBlockingQueue是线程安全的,这得益于它内部使用的锁机制。它使用了两把锁,一把是putLock用于入队操作,另一把是takeLock用于出队操作。
    • 这种分离锁的设计使得入队和出队操作可以同时进行,在高并发场景下,提高了队列的吞吐量。例如,当一个线程在执行入队操作(持有putLock)时,另一个线程可以同时执行出队操作(持有takeLock),而不会相互阻塞。
    • 以入队操作的put方法为例:
public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        while (count.get() == capacity) {
            notFull.await();
        }
        enqueue(node);
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
}
  • put方法中,首先获取putLock,如果队列已满,则通过notFull.await()等待,直到有空间可用。出队操作类似,获取takeLock,如果队列为空,则通过notEmpty.await()等待,直到有元素可用。
  • 同时,LinkedBlockingQueue还使用了AtomicInteger类型的count变量来记录队列中的元素数量,保证了在多线程环境下对元素数量统计的原子性。
  1. 阻塞与非阻塞操作
    • LinkedBlockingQueue提供了阻塞和非阻塞两种操作方式。阻塞操作主要包括puttake方法。
      • put方法:当队列已满时,调用put方法的线程会被阻塞,直到队列有空间可用。例如:
LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(2);
queue.put(1);
queue.put(2);
// 此时队列已满,下面这行代码会阻塞当前线程
queue.put(3); 
 - `take`方法:当队列为空时,调用`take`方法的线程会被阻塞,直到队列中有元素可用。例如:
LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(2);
queue.put(1);
queue.put(2);
new Thread(() -> {
    try {
        Integer num = queue.take();
        System.out.println("取出元素: " + num);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}).start();
// 主线程睡眠2秒模拟其他操作
try {
    Thread.sleep(2000);
} catch (InterruptedException e) {
    e.printStackTrace();
}
queue.take();
// 此时队列已空,下面这行代码会阻塞当前线程
queue.take(); 
  • 非阻塞操作主要包括offerpoll方法。
    • offer方法:尝试将元素添加到队列中,如果队列已满,立即返回false,不会阻塞线程。例如:
LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(2);
boolean result1 = queue.offer(1);
boolean result2 = queue.offer(2);
// 此时队列已满,返回false
boolean result3 = queue.offer(3); 
System.out.println("result1: " + result1 + ", result2: " + result2 + ", result3: " + result3);
 - `poll`方法:尝试从队列中取出元素,如果队列为空,立即返回`null`,不会阻塞线程。例如:
LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(2);
queue.offer(1);
queue.offer(2);
Integer num1 = queue.poll();
Integer num2 = queue.poll();
// 此时队列已空,返回null
Integer num3 = queue.poll(); 
System.out.println("num1: " + num1 + ", num2: " + num2 + ", num3: " + num3);
  1. 性能与应用场景
    • 性能方面:由于LinkedBlockingQueue采用链表结构,在插入和删除元素时,不需要移动大量元素,时间复杂度为O(1)。并且通过分离锁机制,在高并发场景下能提供较好的吞吐量。
    • 应用场景:适用于生产者 - 消费者模型。例如,在一个多线程的任务处理系统中,生产者线程将任务添加到LinkedBlockingQueue中,消费者线程从队列中取出任务并处理。由于LinkedBlockingQueue的线程安全和阻塞特性,生产者和消费者可以高效且安全地协同工作。
    • 再比如,在一些消息队列系统的简单实现中,LinkedBlockingQueue可以作为消息的暂存队列,生产者发送消息到队列,消费者从队列获取消息进行处理,保证了消息的顺序性和线程安全。
    • 然而,LinkedBlockingQueue也有一些局限性。由于链表结构需要额外的内存空间来存储节点的引用,在存储大量小数据时,可能会消耗较多的内存。并且,虽然分离锁机制提高了并发性能,但在一些极端高并发场景下,锁竞争仍然可能成为性能瓶颈。

代码示例综合展示

  1. 基本操作示例
import java.util.concurrent.LinkedBlockingQueue;

public class LinkedBlockingQueueExample {
    public static void main(String[] args) {
        // 创建一个容量为2的LinkedBlockingQueue
        LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(2);

        try {
            // 尝试添加元素
            boolean offerResult1 = queue.offer(1);
            boolean offerResult2 = queue.offer(2);
            boolean offerResult3 = queue.offer(3);
            System.out.println("offer 1 result: " + offerResult1);
            System.out.println("offer 2 result: " + offerResult2);
            System.out.println("offer 3 result: " + offerResult3);

            // 阻塞式添加元素
            queue.put(3);
            System.out.println("元素3已通过put方法添加到队列");

            // 尝试取出元素
            Integer pollResult1 = queue.poll();
            Integer pollResult2 = queue.poll();
            Integer pollResult3 = queue.poll();
            System.out.println("poll 1 result: " + pollResult1);
            System.out.println("poll 2 result: " + pollResult2);
            System.out.println("poll 3 result: " + pollResult3);

            // 阻塞式取出元素
            Integer takeResult = queue.take();
            System.out.println("通过take方法取出的元素: " + takeResult);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

在这个示例中,首先创建了一个容量为2的LinkedBlockingQueue。然后通过offer方法尝试添加元素,展示了非阻塞添加的行为。接着使用put方法进行阻塞式添加,当队列满时,put方法会阻塞直到有空间可用。之后通过poll方法尝试非阻塞取出元素,最后使用take方法进行阻塞式取出,当队列为空时,take方法会阻塞直到有元素可用。 2. 生产者 - 消费者模型示例

import java.util.concurrent.LinkedBlockingQueue;

class Producer implements Runnable {
    private final LinkedBlockingQueue<Integer> queue;

    public Producer(LinkedBlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        for (int i = 1; i <= 5; i++) {
            try {
                System.out.println("生产者生产: " + i);
                queue.put(i);
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

class Consumer implements Runnable {
    private final LinkedBlockingQueue<Integer> queue;

    public Consumer(LinkedBlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        while (true) {
            try {
                Integer num = queue.take();
                System.out.println("消费者消费: " + num);
                Thread.sleep(1500);
            } catch (InterruptedException e) {
                e.printStackTrace();
                break;
            }
        }
    }
}

public class ProducerConsumerExample {
    public static void main(String[] args) {
        LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(3);
        Thread producerThread = new Thread(new Producer(queue));
        Thread consumerThread = new Thread(new Consumer(queue));

        producerThread.start();
        consumerThread.start();

        try {
            producerThread.join();
            Thread.sleep(2000);
            consumerThread.interrupt();
            consumerThread.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

在这个生产者 - 消费者模型示例中,Producer类实现了Runnable接口,负责向LinkedBlockingQueue中添加元素。Consumer类同样实现了Runnable接口,负责从队列中取出元素。LinkedBlockingQueue的容量设置为3,生产者和消费者线程通过队列进行数据交互,展示了LinkedBlockingQueue在实际多线程场景中的应用。

通过以上对LinkedBlockingQueue链表结构和性能特点的深入分析以及代码示例,希望能帮助读者更好地理解和使用这一重要的Java并发工具。在实际应用中,应根据具体的业务需求和场景,合理选择和使用LinkedBlockingQueue,以充分发挥其优势,避免潜在的性能问题。