MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java中BlockingQueue的线程安全机制详解

2023-07-104.0k 阅读

Java中BlockingQueue的线程安全机制详解

在Java并发编程领域,BlockingQueue 是一个非常重要的接口,它提供了线程安全的队列操作,并且在多线程环境下能有效协调生产者 - 消费者模型。本文将深入探讨 BlockingQueue 的线程安全机制,包括其核心原理、不同实现类的特点以及如何在实际项目中合理应用。

1. BlockingQueue简介

BlockingQueue 继承自 Queue 接口,它定义了一组在多线程环境下阻塞操作的方法。主要有以下两类阻塞方法:

  • 插入操作:当队列满时,put(E e) 方法会阻塞,直到队列有空间可用。
  • 移除操作:当队列空时,take() 方法会阻塞,直到队列中有元素可移除。

这种阻塞特性使得 BlockingQueue 非常适合用于实现生产者 - 消费者模式,生产者线程向队列中放入元素,消费者线程从队列中取出元素,在队列满或空时,相应线程会自动阻塞,避免了忙等待和数据竞争问题。

2. 核心原理 - 锁与条件变量

BlockingQueue 的线程安全机制主要依赖于Java的锁(Lock)和条件变量(Condition)。以 ArrayBlockingQueue 为例,这是一个基于数组实现的有界阻塞队列。

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    // 存储元素的数组
    final Object[] items;
    // 下一个要取出元素的索引
    int takeIndex;
    // 下一个要插入元素的索引
    int putIndex;
    // 队列中元素的数量
    int count;
    // 可重入锁
    final ReentrantLock lock;
    // 非空条件变量
    private final Condition notEmpty;
    // 非满条件变量
    private final Condition notFull;

    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }

    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

    private void enqueue(E x) {
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

    private E dequeue() {
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();
        return x;
    }
}

在上述代码中,ReentrantLock 用于保证对队列状态(如 counttakeIndexputIndex)的访问是线程安全的。notEmptynotFull 是两个条件变量,分别用于表示队列非空和非满的状态。

put 方法调用时,如果队列已满(count == items.length),当前线程会调用 notFull.await() 方法进入等待状态,并释放锁。当队列有空间时(例如有元素被取出),dequeue 方法会调用 notFull.signal() 唤醒等待在 notFull 条件变量上的线程。

类似地,take 方法在队列空时(count == 0),会调用 notEmpty.await() 进入等待状态,当有元素插入队列时,enqueue 方法会调用 notEmpty.signal() 唤醒等待在 notEmpty 条件变量上的线程。

3. 不同实现类的特点

  • ArrayBlockingQueue

    • 基于数组实现:内部使用数组存储元素,因此是有界队列。队列的容量在创建时就固定下来,后续无法动态扩展。
    • 公平性可选:构造函数中可以通过 fair 参数指定是否使用公平锁。公平锁保证等待时间最长的线程优先获取锁,但会带来一定的性能开销;非公平锁则允许线程在锁可用时直接竞争,性能相对较高。
    • 性能特性:由于是基于数组,在遍历和查找元素时效率较高,但插入和删除元素可能涉及数组元素的移动,尤其是在队列头部或尾部操作时。
  • LinkedBlockingQueue

    • 基于链表实现:可以是有界队列也可以是无界队列(构造函数中不传容量参数则为无界)。无界队列理论上可以容纳无限个元素,但在实际应用中可能会因为内存限制而受到约束。
    • 锁分离机制:与 ArrayBlockingQueue 使用一个锁不同,LinkedBlockingQueue 使用了两把锁,一把用于入队操作(putLock),一把用于出队操作(takeLock)。这种设计使得入队和出队操作可以并行执行,在高并发场景下性能更好。
    • 性能特性:由于基于链表,插入和删除操作在链表头部或尾部进行,不需要移动大量元素,效率较高。但遍历和查找元素时需要从头遍历链表,效率相对较低。
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    // 链表头节点
    transient Node<E> head;
    // 链表尾节点
    private transient Node<E> last;
    // 队列元素数量
    private final AtomicInteger count = new AtomicInteger();
    // 队列容量,若为0则表示无界
    private final int capacity;
    // 入队锁
    private final ReentrantLock putLock = new ReentrantLock();
    // 非满条件变量
    private final Condition notFull = putLock.newCondition();
    // 出队锁
    private final ReentrantLock takeLock = new ReentrantLock();
    // 非空条件变量
    private final Condition notEmpty = takeLock.newCondition();

    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }

    private void enqueue(Node<E> node) {
        last = last.next = node;
    }

    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                notEmpty.await();
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

    private E dequeue() {
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }
}
  • PriorityBlockingQueue
    • 优先级队列:元素按照自然顺序或自定义比较器进行排序,每次取出的元素是队列中优先级最高的元素。
    • 无界队列:理论上可以存储无限个元素,但同样受内存限制。
    • 实现原理:内部使用堆数据结构来维护元素的顺序。插入操作时,新元素会被添加到堆的合适位置以保持堆序;取出操作则从堆顶取出优先级最高的元素。
    • 线程安全机制:使用一把锁(lock)来保证对队列操作的线程安全。在插入和删除元素时,需要调整堆结构,这一过程通过锁来保护。
public class PriorityBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    // 存储元素的数组,使用堆结构
    private transient Object[] queue;
    // 队列元素数量
    private transient int size;
    // 可重入锁
    private final ReentrantLock lock = new ReentrantLock();
    // 非空条件变量
    private final Condition notEmpty = lock.newCondition();
    // 比较器,用于元素排序
    private final Comparator<? super E> comparator;

    public PriorityBlockingQueue() {
        this(DEFAULT_INITIAL_CAPACITY, null);
    }

    public PriorityBlockingQueue(int initialCapacity,
                                 Comparator<? super E> comparator) {
        if (initialCapacity < 1)
            throw new IllegalArgumentException();
        this.queue = new Object[initialCapacity];
        this.comparator = comparator;
    }

    public void put(E e) {
        offer(e);
    }

    public boolean offer(E e) {
        if (e == null)
            throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        int n, cap;
        Object[] array;
        while ((n = size) >= (cap = (array = queue).length))
            tryGrow(array, cap);
        try {
            Comparator<? super E> cmp = comparator;
            if (cmp == null)
                siftUpComparable(n, e, array);
            else
                siftUpUsingComparator(n, e, array, cmp);
            size = n + 1;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
        return true;
    }

    private void siftUpComparable(int k, E x, Object[] array) {
        Comparable<? super E> key = (Comparable<? super E>)x;
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            Object e = array[parent];
            if (key.compareTo((E) e) >= 0)
                break;
            array[k] = e;
            k = parent;
        }
        array[k] = key;
    }

    private void siftUpUsingComparator(int k, E x, Object[] array,
                                       Comparator<? super E> cmp) {
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            Object e = array[parent];
            if (cmp.compare(x, (E) e) >= 0)
                break;
            array[k] = e;
            k = parent;
        }
        array[k] = x;
    }

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        E result;
        try {
            while ( (result = dequeue()) == null)
                notEmpty.await();
        } finally {
            lock.unlock();
        }
        return result;
    }

    private E dequeue() {
        int n = size - 1;
        if (n < 0)
            return null;
        Object[] array = queue;
        @SuppressWarnings("unchecked")
        E result = (E) array[0];
        E x = (E) array[n];
        array[n] = null;
        if (n > 0)
            siftDownComparable(0, x, array, n, comparator);
        size = n;
        return result;
    }

    private void siftDownComparable(int k, E x, Object[] array, int n,
                                    Comparator<? super E> cmp) {
        if (cmp == null) {
            siftDownComparable(k, x, array, n);
        } else {
            siftDownUsingComparator(k, x, array, n, cmp);
        }
    }

    private void siftDownComparable(int k, E x, Object[] array, int n) {
        Comparable<? super E> key = (Comparable<? super E>)x;
        int half = n >>> 1;
        while (k < half) {
            int child = (k << 1) + 1;
            Object c = array[child];
            int right = child + 1;
            if (right < n &&
                ((Comparable<? super E>) c).compareTo((E) array[right]) > 0)
                c = array[child = right];
            if (key.compareTo((E) c) <= 0)
                break;
            array[k] = c;
            k = child;
        }
        array[k] = key;
    }

    private void siftDownUsingComparator(int k, E x, Object[] array, int n,
                                         Comparator<? super E> cmp) {
        int half = n >>> 1;
        while (k < half) {
            int child = (k << 1) + 1;
            Object c = array[child];
            int right = child + 1;
            if (right < n &&
                cmp.compare((E) c, (E) array[right]) > 0)
                c = array[child = right];
            if (cmp.compare(x, (E) c) <= 0)
                break;
            array[k] = c;
            k = child;
        }
        array[k] = x;
    }
}
  • DelayQueue
    • 延迟队列:队列中的元素只有在延迟时间到期后才能被取出。元素必须实现 Delayed 接口,该接口继承自 Comparable 接口,用于定义延迟时间和比较逻辑。
    • 无界队列:同样理论上无界,但受内存限制。
    • 实现原理:内部使用 PriorityQueue 存储元素,并通过 ReentrantLockCondition 实现线程安全和阻塞操作。在 take 方法中,会检查队首元素的延迟时间是否到期,未到期则等待。
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
        implements BlockingQueue<E> {
    // 存储元素的优先级队列
    private final transient PriorityQueue<E> q = new PriorityQueue<E>();
    // 用于控制线程等待的锁
    private final transient ReentrantLock lock = new ReentrantLock();
    // 用于等待延迟到期的条件变量
    private final Condition available = lock.newCondition();
    // 用于跟踪最近一次获取到延迟到期元素的线程
    private transient Thread leader = null;

    public DelayQueue() {}

    public void put(E e) {
        offer(e);
    }

    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            boolean success = q.offer(e);
            if (success && q.peek() == e) {
                leader = null;
                available.signal();
            }
            return success;
        } finally {
            lock.unlock();
        }
    }

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null)
                    available.await();
                else {
                    long delay = first.getDelay(TimeUnit.NANOSECONDS);
                    if (delay <= 0)
                        return q.poll();
                    first = null;
                    if (leader != null)
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            lock.unlock();
        }
    }
}

4. 实际应用场景

  • 生产者 - 消费者模式:这是 BlockingQueue 最典型的应用场景。例如,在一个日志系统中,生产者线程负责将日志信息写入 BlockingQueue,消费者线程从队列中取出日志并进行持久化存储或分析。这样可以有效解耦生产者和消费者的速度差异,提高系统的稳定性和性能。
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class Producer implements Runnable {
    private BlockingQueue<String> queue;

    public Producer(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            for (int i = 0; i < 10; i++) {
                String message = "Message " + i;
                queue.put(message);
                System.out.println("Produced: " + message);
                Thread.sleep((long) (Math.random() * 1000));
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

class Consumer implements Runnable {
    private BlockingQueue<String> queue;

    public Consumer(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                String message = queue.take();
                System.out.println("Consumed: " + message);
                Thread.sleep((long) (Math.random() * 1000));
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

public class ProducerConsumerExample {
    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(5);
        Thread producerThread = new Thread(new Producer(queue));
        Thread consumerThread = new Thread(new Consumer(queue));

        producerThread.start();
        consumerThread.start();

        try {
            producerThread.join();
            consumerThread.interrupt();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
  • 任务调度PriorityBlockingQueueDelayQueue 可以用于实现任务调度系统。例如,在一个定时任务系统中,任务可以按照优先级或延迟时间放入相应的队列,调度线程从队列中取出任务并执行。
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class DelayedTask implements Delayed {
    private String taskName;
    private long delayTime;
    private long startTime;

    public DelayedTask(String taskName, long delayTime) {
        this.taskName = taskName;
        this.delayTime = delayTime;
        this.startTime = System.currentTimeMillis();
    }

    @Override
    public long getDelay(TimeUnit unit) {
        long elapsedTime = System.currentTimeMillis() - startTime;
        long remainingDelay = delayTime - elapsedTime;
        return unit.convert(remainingDelay, TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
    }

    @Override
    public String toString() {
        return "DelayedTask{" +
                "taskName='" + taskName + '\'' +
                ", delayTime=" + delayTime +
                '}';
    }
}

public class TaskScheduler {
    public static void main(String[] args) {
        DelayQueue<DelayedTask> delayQueue = new DelayQueue<>();
        delayQueue.add(new DelayedTask("Task 1", 3000));
        delayQueue.add(new DelayedTask("Task 2", 1000));
        delayQueue.add(new DelayedTask("Task 3", 2000));

        Thread schedulerThread = new Thread(() -> {
            try {
                while (true) {
                    DelayedTask task = delayQueue.take();
                    System.out.println("Executing task: " + task);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        schedulerThread.start();

        try {
            Thread.sleep(5000);
            schedulerThread.interrupt();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

5. 性能优化与注意事项

  • 选择合适的实现类:根据实际需求选择 BlockingQueue 的实现类。如果需要固定容量且对遍历性能有要求,ArrayBlockingQueue 可能是较好的选择;如果是高并发场景且对插入和删除性能要求高,LinkedBlockingQueue 更合适;对于需要按优先级或延迟处理任务的场景,则应选择 PriorityBlockingQueueDelayQueue
  • 锁的使用与性能:虽然 BlockingQueue 内部的锁机制保证了线程安全,但过多的锁竞争会影响性能。例如,ArrayBlockingQueue 使用一把锁可能导致在高并发下的性能瓶颈,而 LinkedBlockingQueue 的锁分离机制可以在一定程度上缓解这一问题。在设计系统时,应尽量减少不必要的锁竞争,例如通过合理的任务划分和资源分配。
  • 队列容量与内存管理:对于有界队列,要合理设置队列容量。过小的容量可能导致生产者线程频繁阻塞,降低系统吞吐量;过大的容量则可能占用过多内存,尤其是在队列元素较大时。对于无界队列,要注意内存使用情况,避免内存溢出。
  • 异常处理:在使用 BlockingQueue 的阻塞方法(如 puttake)时,要正确处理 InterruptedException。当线程在等待队列操作时被中断,应及时响应中断,清理资源并正确处理业务逻辑。

通过深入理解 BlockingQueue 的线程安全机制、不同实现类的特点以及在实际应用中的注意事项,开发者可以更好地利用这一强大的工具,构建高效、稳定的多线程应用程序。无论是在企业级开发、大数据处理还是分布式系统中,BlockingQueue 都能发挥重要作用,帮助解决多线程间的数据共享和协调问题。