MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java 阻塞队列概念及常见实现类分析

2024-01-064.0k 阅读

Java 阻塞队列概念

在多线程编程中,阻塞队列(BlockingQueue)是一种特殊的队列,它提供了线程安全的特性,并且在某些情况下会阻塞线程。阻塞队列常用于生产者 - 消费者模式,是实现线程间协作的重要工具。

阻塞队列有以下几个关键特性:

  1. 线程安全:多个线程可以安全地访问阻塞队列,不需要额外的同步机制来保证数据的一致性。这是因为阻塞队列内部已经实现了必要的同步逻辑。
  2. 阻塞操作:当队列满时,往队列中添加元素的操作会被阻塞,直到队列中有空间可用。反之,当队列空时,从队列中获取元素的操作会被阻塞,直到队列中有元素可供获取。这种阻塞机制使得生产者和消费者线程能够协调工作,避免了数据竞争和空指针等问题。

阻塞队列在Java并发包(java.util.concurrent)中被定义为BlockingQueue接口,该接口继承自Queue接口。BlockingQueue接口定义了一系列用于操作队列的方法,这些方法可以分为以下几类:

  1. 添加元素

    • boolean add(E e):将指定元素插入此队列(如果立即可行且不会违反容量限制),成功时返回 true,如果当前没有可用的空间,则抛出 IllegalStateException
    • boolean offer(E e):将指定元素插入此队列(如果立即可行且不会违反容量限制),成功时返回 true,如果当前没有可用的空间,则返回 false
    • void put(E e) throws InterruptedException:将指定元素插入此队列,将等待(如果必要)直到有空间可用。此方法会响应中断,如果在等待过程中线程被中断,会抛出 InterruptedException
    • boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException:将指定元素插入此队列,将等待(如果必要)直到有空间可用或者指定的等待时间过去。同样会响应中断,如果在等待过程中线程被中断,会抛出 InterruptedException
  2. 获取元素

    • E remove():获取并移除此队列的头部,此方法在队列为空时会抛出 NoSuchElementException
    • E poll():获取并移除此队列的头部,如果此队列为空,则返回 null
    • E take() throws InterruptedException:获取并移除此队列的头部,将等待(如果必要)直到有元素可用。会响应中断,如果在等待过程中线程被中断,会抛出 InterruptedException
    • E poll(long timeout, TimeUnit unit) throws InterruptedException:获取并移除此队列的头部,将等待(如果必要)直到有元素可用或者指定的等待时间过去。同样会响应中断,如果在等待过程中线程被中断,会抛出 InterruptedException
  3. 检查元素

    • E element():获取但不移除此队列的头部,如果此队列为空,则抛出 NoSuchElementException
    • E peek():获取但不移除此队列的头部,如果此队列为空,则返回 null

Java 阻塞队列常见实现类分析

ArrayBlockingQueue

ArrayBlockingQueue是基于数组实现的有界阻塞队列。它在创建时需要指定队列的容量,一旦创建,容量不能改变。

  1. 特性

    • 有界性:队列的容量是固定的,创建时通过构造函数指定。这意味着当队列满时,继续添加元素会导致阻塞(使用put方法)或返回false(使用offer方法)。
    • 公平性ArrayBlockingQueue可以通过构造函数设置为公平或非公平的。公平模式下,线程获取锁的顺序是按照等待顺序,而非公平模式下,线程可能会插队获取锁,非公平模式的性能通常比公平模式更好。
  2. 代码示例

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ArrayBlockingQueueExample {
    public static void main(String[] args) {
        // 创建一个容量为 3 的 ArrayBlockingQueue
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(3);

        Thread producer = new Thread(() -> {
            try {
                queue.put(1);
                System.out.println("Produced: 1");
                queue.put(2);
                System.out.println("Produced: 2");
                queue.put(3);
                System.out.println("Produced: 3");
                // 尝试添加第 4 个元素,队列已满,会阻塞
                queue.put(4);
                System.out.println("Produced: 4");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                Thread.sleep(2000); // 等待生产者添加元素
                System.out.println("Consumed: " + queue.take());
                System.out.println("Consumed: " + queue.take());
                System.out.println("Consumed: " + queue.take());
                // 尝试获取第 4 个元素,队列已空,会阻塞
                System.out.println("Consumed: " + queue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        producer.start();
        consumer.start();
    }
}

在上述代码中,producer线程向ArrayBlockingQueue中添加元素,consumer线程从队列中取出元素。当producer添加到第4个元素时,由于队列容量为3,put方法会阻塞。consumer线程启动后,先睡眠2秒等待producer添加元素,然后开始消费。当消费完3个元素后,再调用take方法会阻塞,因为队列已空。

LinkedBlockingQueue

LinkedBlockingQueue是基于链表实现的阻塞队列,它可以是有界的,也可以是无界的(默认是无界的,即容量为Integer.MAX_VALUE)。

  1. 特性

    • 有界性:可以通过构造函数指定队列的容量,如果不指定,则为无界队列。有界队列在满时会阻塞添加操作,无界队列理论上不会满,因此添加操作不会因为队列满而阻塞。
    • 高效性:由于是链表结构,在并发环境下,LinkedBlockingQueue的性能通常优于ArrayBlockingQueue,尤其是在高并发读写的场景下。这是因为链表结构的插入和删除操作不需要移动大量元素,而数组结构在添加和删除元素时可能需要进行数组的复制和移动。
  2. 代码示例

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class LinkedBlockingQueueExample {
    public static void main(String[] args) {
        // 创建一个容量为 3 的 LinkedBlockingQueue
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(3);

        Thread producer = new Thread(() -> {
            try {
                queue.put(1);
                System.out.println("Produced: 1");
                queue.put(2);
                System.out.println("Produced: 2");
                queue.put(3);
                System.out.println("Produced: 3");
                // 尝试添加第 4 个元素,队列已满,会阻塞
                queue.put(4);
                System.out.println("Produced: 4");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                Thread.sleep(2000); // 等待生产者添加元素
                System.out.println("Consumed: " + queue.take());
                System.out.println("Consumed: " + queue.take());
                System.out.println("Consumed: " + queue.take());
                // 尝试获取第 4 个元素,队列已空,会阻塞
                System.out.println("Consumed: " + queue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        producer.start();
        consumer.start();
    }
}

此代码与ArrayBlockingQueue的示例类似,只是将队列类型改为LinkedBlockingQueue。同样展示了生产者 - 消费者模式下,队列满时添加操作阻塞,队列空时获取操作阻塞的特性。

PriorityBlockingQueue

PriorityBlockingQueue是一个无界的阻塞队列,它基于堆数据结构实现,元素按照自然顺序或者自定义的比较器顺序进行排序。

  1. 特性

    • 无界性:队列理论上没有容量限制,只要系统资源足够,就可以不断添加元素。
    • 优先级排序:队列中的元素按照其优先级顺序出队,优先级的定义可以是元素的自然顺序(如果元素实现了Comparable接口),也可以通过构造函数传入的Comparator来定义。
  2. 代码示例

import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.BlockingQueue;

class PriorityElement implements Comparable<PriorityElement> {
    private int value;
    private int priority;

    public PriorityElement(int value, int priority) {
        this.value = value;
        this.priority = priority;
    }

    @Override
    public int compareTo(PriorityElement other) {
        return this.priority - other.priority;
    }

    @Override
    public String toString() {
        return "Value: " + value + ", Priority: " + priority;
    }
}

public class PriorityBlockingQueueExample {
    public static void main(String[] args) {
        BlockingQueue<PriorityElement> queue = new PriorityBlockingQueue<>();

        Thread producer = new Thread(() -> {
            try {
                queue.put(new PriorityElement(10, 2));
                System.out.println("Produced: Value 10, Priority 2");
                queue.put(new PriorityElement(20, 1));
                System.out.println("Produced: Value 20, Priority 1");
                queue.put(new PriorityElement(30, 3));
                System.out.println("Produced: Value 30, Priority 3");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                Thread.sleep(2000); // 等待生产者添加元素
                System.out.println("Consumed: " + queue.take());
                System.out.println("Consumed: " + queue.take());
                System.out.println("Consumed: " + queue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        producer.start();
        consumer.start();
    }
}

在上述代码中,PriorityElement类实现了Comparable接口,通过priority字段定义了元素的优先级。producer线程向PriorityBlockingQueue中添加元素,consumer线程从队列中取出元素。由于优先级的定义,元素会按照优先级顺序出队,即使添加顺序不同,也会先取出优先级高的元素。

DelayQueue

DelayQueue是一个无界阻塞队列,只有在延迟期满时才能从中提取元素。队列中的元素必须实现Delayed接口,该接口继承自Comparable接口。

  1. 特性

    • 无界性:与PriorityBlockingQueue类似,DelayQueue理论上没有容量限制。
    • 延迟特性:元素只有在延迟时间到达后才能被取出。这使得DelayQueue非常适合实现定时任务等功能。
  2. 代码示例

import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class DelayedElement implements Delayed {
    private String name;
    private long delayTime;
    private long startTime;

    public DelayedElement(String name, long delayTime) {
        this.name = name;
        this.delayTime = delayTime;
        this.startTime = System.currentTimeMillis();
    }

    @Override
    public long getDelay(TimeUnit unit) {
        long diff = delayTime - (System.currentTimeMillis() - startTime);
        return unit.convert(diff, TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return (int) (this.getDelay(TimeUnit.MILLISECONDS) - other.getDelay(TimeUnit.MILLISECONDS));
    }

    @Override
    public String toString() {
        return "Name: " + name + ", Delay: " + delayTime + "ms";
    }
}

public class DelayQueueExample {
    public static void main(String[] args) {
        DelayQueue<DelayedElement> queue = new DelayQueue<>();

        Thread producer = new Thread(() -> {
            try {
                queue.put(new DelayedElement("Element1", 3000));
                System.out.println("Produced: Element1 with 3s delay");
                queue.put(new DelayedElement("Element2", 1000));
                System.out.println("Produced: Element2 with 1s delay");
                queue.put(new DelayedElement("Element3", 2000));
                System.out.println("Produced: Element3 with 2s delay");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                System.out.println("Consumed: " + queue.take());
                System.out.println("Consumed: " + queue.take());
                System.out.println("Consumed: " + queue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        producer.start();
        consumer.start();
    }
}

在这个示例中,DelayedElement类实现了Delayed接口,通过getDelay方法计算元素的剩余延迟时间。producer线程向DelayQueue中添加具有不同延迟时间的元素,consumer线程从队列中取出元素。元素会按照延迟时间的先后顺序被取出,即延迟时间短的元素先被取出。

SynchronousQueue

SynchronousQueue是一个特殊的阻塞队列,它没有容量,每个插入操作必须等待另一个线程的移除操作,反之亦然。

  1. 特性

    • 无容量:队列不存储任何元素,直接在生产者和消费者之间传递数据。这使得SynchronousQueue非常适合传递性的场景,例如线程间的数据交换。
    • 高并发性能:由于不需要存储元素,SynchronousQueue在高并发场景下具有较好的性能。
  2. 代码示例

import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.BlockingQueue;

public class SynchronousQueueExample {
    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new SynchronousQueue<>();

        Thread producer = new Thread(() -> {
            try {
                System.out.println("Producer trying to put 10");
                queue.put(10);
                System.out.println("Producer put 10");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                Thread.sleep(2000); // 等待生产者准备好
                System.out.println("Consumer trying to take");
                Integer value = queue.take();
                System.out.println("Consumer took: " + value);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        producer.start();
        consumer.start();
    }
}

在上述代码中,producer线程尝试向SynchronousQueue中添加元素10,由于队列没有容量,put方法会阻塞,直到consumer线程调用take方法。consumer线程启动后,先睡眠2秒,然后尝试从队列中取出元素。当consumer调用take方法时,producerput操作才会继续执行,从而完成数据的传递。

不同阻塞队列的应用场景选择

  1. ArrayBlockingQueue:适用于需要固定容量,并且对公平性有要求的场景。例如,在一些资源有限且需要公平分配的系统中,ArrayBlockingQueue可以保证线程按照顺序获取资源。
  2. LinkedBlockingQueue:适用于高并发读写场景,尤其是队列长度不确定的情况。如果需要一个有界队列,可以通过构造函数指定容量;如果不需要限制队列长度,默认的无界队列可以满足需求。
  3. PriorityBlockingQueue:适用于需要按照元素优先级进行处理的场景,如任务调度系统,优先级高的任务优先执行。
  4. DelayQueue:主要用于实现定时任务,例如定时消息发送、缓存过期清理等功能。
  5. SynchronousQueue:适用于线程间直接传递数据的场景,如生产者 - 消费者模式中,生产者和消费者之间的数据传递不需要中间存储,直接进行交互。

通过对不同阻塞队列的深入了解,开发者可以根据具体的业务需求和场景选择最合适的阻塞队列,从而优化多线程程序的性能和稳定性。在实际应用中,还需要考虑队列的容量、元素类型、并发访问频率等因素,以确保选择的阻塞队列能够满足系统的要求。