MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java PriorityBlockingQueue的优先级排序

2022-12-303.3k 阅读

Java PriorityBlockingQueue简介

在Java的并发编程领域中,PriorityBlockingQueue是一个重要的类,它实现了BlockingQueue接口。PriorityBlockingQueue是一个无界的阻塞队列,其中的元素按照其自然顺序(如果元素实现了Comparable接口)或者根据构造队列时提供的Comparator进行排序。与普通的队列不同,PriorityBlockingQueue每次出队操作返回的是队列中优先级最高的元素。

这个队列适用于需要处理具有优先级任务的场景,比如在一个任务调度系统中,某些任务可能具有更高的优先级,需要优先被处理。PriorityBlockingQueue会自动根据元素的优先级进行排序,使得高优先级的任务总是排在队列的前面,方便获取和处理。

自然顺序排序

当元素实现了Comparable接口时,PriorityBlockingQueue会按照元素的自然顺序进行排序。以一个简单的整数队列为例,来看代码实现:

import java.util.concurrent.PriorityBlockingQueue;

public class NaturalOrderExample {
    public static void main(String[] args) {
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
        queue.add(3);
        queue.add(1);
        queue.add(2);

        while (!queue.isEmpty()) {
            System.out.println(queue.poll());
        }
    }
}

在上述代码中,我们创建了一个PriorityBlockingQueue,并向其中添加了三个整数。由于Integer类已经实现了Comparable接口,所以队列会按照自然顺序(从小到大)对元素进行排序。在while循环中,通过poll()方法依次取出队列中的元素并打印,输出结果会是123

自定义比较器排序

有时候,元素的自然顺序可能不符合我们实际的优先级需求,这时候就需要使用自定义的Comparator。假设我们有一个表示任务的类Task,每个任务有一个优先级属性,我们希望根据优先级来对任务进行排序。

import java.util.concurrent.PriorityBlockingQueue;
import java.util.Comparator;

class Task {
    private int priority;
    private String taskName;

    public Task(int priority, String taskName) {
        this.priority = priority;
        this.taskName = taskName;
    }

    public int getPriority() {
        return priority;
    }

    public String getTaskName() {
        return taskName;
    }

    @Override
    public String toString() {
        return "Task{" +
                "priority=" + priority +
                ", taskName='" + taskName + '\'' +
                '}';
    }
}

class TaskComparator implements Comparator<Task> {
    @Override
    public int compare(Task task1, Task task2) {
        return Integer.compare(task1.getPriority(), task2.getPriority());
    }
}

public class CustomComparatorExample {
    public static void main(String[] args) {
        PriorityBlockingQueue<Task> queue = new PriorityBlockingQueue<>(10, new TaskComparator());
        queue.add(new Task(2, "Task B"));
        queue.add(new Task(1, "Task A"));
        queue.add(new Task(3, "Task C"));

        while (!queue.isEmpty()) {
            System.out.println(queue.poll());
        }
    }
}

在上述代码中,我们定义了一个Task类,它有prioritytaskName两个属性。然后创建了一个TaskComparator类,实现了Comparator接口,在compare方法中,根据任务的优先级进行比较。在main方法中,我们创建了一个PriorityBlockingQueue,并传入TaskComparator实例作为构造参数。接着向队列中添加了三个任务,最后通过poll()方法依次取出并打印任务,输出结果会按照优先级从小到大的顺序,即Task ATask BTask C

PriorityBlockingQueue的内部实现原理

PriorityBlockingQueue内部是基于堆数据结构来实现的。堆是一种特殊的树状数据结构,它满足堆性质:对于最大堆,父节点的值总是大于或等于子节点的值;对于最小堆,父节点的值总是小于或等于子节点的值。PriorityBlockingQueue使用的是最小堆,这样根节点(即队列的头部)就是优先级最低的元素(按照自然顺序或自定义比较器定义的优先级)。

当向队列中添加元素时,PriorityBlockingQueue会先将元素添加到堆的末尾,然后通过siftUp操作来调整堆,使其重新满足堆性质。siftUp操作会比较新添加元素与其父节点,如果新元素的优先级更高(根据自然顺序或比较器),则将其与父节点交换位置,不断重复这个过程,直到堆性质得到恢复。

private void siftUpComparable(int k, E x) {
    Comparable<? super E> key = (Comparable<? super E>) x;
    while (k > 0) {
        int parent = (k - 1) >>> 1;
        Object e = queue[parent];
        if (key.compareTo((E) e) >= 0)
            break;
        queue[k] = e;
        k = parent;
    }
    queue[k] = key;
}

上述代码是PriorityBlockingQueueComparable类型元素的 siftUp方法实现。这里k表示新元素在数组中的索引位置,x是新元素。通过循环比较新元素与父节点的优先级,进行交换和调整。

当从队列中取出元素时,PriorityBlockingQueue会先将堆顶元素(即优先级最低的元素)取出,然后将堆的最后一个元素移动到堆顶,再通过siftDown操作来调整堆。siftDown操作会比较堆顶元素与其子节点,选择优先级更低的子节点与堆顶元素交换位置,不断重复这个过程,直到堆性质得到恢复。

private void siftDownComparable(int k, E x) {
    int half = size >>> 1;
    while (k < half) {
        int child = (k << 1) + 1;
        Object c = queue[child];
        int right = child + 1;
        if (right < size &&
                ((Comparable<? super E>) c).compareTo((E) queue[right]) > 0)
            c = queue[child = right];
        if (((Comparable<? super E>) x).compareTo((E) c) <= 0)
            break;
        queue[k] = c;
        k = child;
    }
    queue[k] = x;
}

上述代码是PriorityBlockingQueueComparable类型元素的siftDown方法实现。这里k表示堆顶元素在数组中的索引位置,x是原来堆顶元素。通过循环比较堆顶元素与子节点的优先级,进行交换和调整。

PriorityBlockingQueue的线程安全性

PriorityBlockingQueue是线程安全的,这是因为它的所有关键操作,如addoffertake等,都使用了内部的锁机制。在PriorityBlockingQueue的实现中,使用了ReentrantLock来保证线程安全。

private final ReentrantLock lock = new ReentrantLock();

put方法为例,其实现如下:

public void put(E e) {
    offer(e); // never need to block
}

public boolean offer(E e) {
    if (e == null)
        throw new NullPointerException();
    final ReentrantLock lock = this.lock;
    lock.lock();
    int n, cap;
    Object[] array;
    while ((n = size) >= (cap = (array = queue).length))
        tryGrow(array, cap);
    try {
        Comparator<? super E> cmp = comparator;
        if (cmp == null)
            siftUpComparable(n, e);
        else
            siftUpUsingComparator(n, e, cmp);
        size = n + 1;
        notEmpty.signal();
    } finally {
        lock.unlock();
    }
    return true;
}

offer方法中,首先获取ReentrantLock锁,在完成添加元素和调整堆的操作后,最后释放锁。这样可以确保在多线程环境下,对PriorityBlockingQueue的操作是线程安全的。多个线程同时访问PriorityBlockingQueue时,只有获取到锁的线程才能进行队列的操作,其他线程会被阻塞,直到锁被释放。

PriorityBlockingQueue的应用场景

  1. 任务调度系统:在一个多任务的应用程序中,不同的任务可能具有不同的优先级。例如,在一个服务器中,处理系统监控任务可能比处理普通用户请求具有更高的优先级。PriorityBlockingQueue可以用来存储这些任务,调度器可以从队列中取出优先级最高的任务进行处理。
  2. 实时系统:在实时系统中,数据的处理顺序可能非常关键。比如在一个音频处理系统中,高优先级的音频数据需要优先处理,以保证音频的流畅播放。PriorityBlockingQueue可以按照优先级存储音频数据块,处理模块从队列中获取数据时,总是能得到优先级最高的数据块。
  3. 资源分配:在资源有限的情况下,需要优先分配给优先级高的请求。例如,在一个云计算平台中,某些高优先级的用户可能需要优先获得计算资源。PriorityBlockingQueue可以存储资源请求,资源分配模块从队列中取出优先级最高的请求进行处理,确保高优先级用户的需求得到及时满足。

与其他队列的比较

  1. PriorityQueue的比较PriorityQueue是一个非阻塞的优先队列,而PriorityBlockingQueue是阻塞队列。这意味着在PriorityQueue中,如果队列为空时调用poll方法,会返回null;而在PriorityBlockingQueue中,如果队列为空时调用take方法,调用线程会被阻塞,直到队列中有元素可供取出。此外,PriorityBlockingQueue是线程安全的,而PriorityQueue不是线程安全的,在多线程环境下需要额外的同步措施。
  2. LinkedBlockingQueue的比较LinkedBlockingQueue是一个基于链表的阻塞队列,它按照元素进入队列的顺序(FIFO)进行处理。而PriorityBlockingQueue是按照元素的优先级进行处理。LinkedBlockingQueue适用于需要按照顺序处理任务的场景,而PriorityBlockingQueue适用于需要根据优先级处理任务的场景。

注意事项

  1. 元素不可变:由于PriorityBlockingQueue是根据元素的优先级进行排序的,一旦元素被添加到队列中,其优先级不应改变。如果改变了元素的优先级,可能会导致队列的排序混乱,从而影响程序的正确性。因此,建议在设计元素类时,将优先级相关的属性设置为不可变。
  2. 性能问题:虽然PriorityBlockingQueue在处理优先级任务方面非常有用,但在高并发环境下,由于其内部使用锁机制来保证线程安全,可能会导致性能瓶颈。在某些场景下,可以考虑使用无锁的数据结构或其他并发队列来提高性能。另外,在队列元素数量较多时,堆的调整操作可能会比较耗时,需要根据实际应用场景进行性能测试和优化。

通过深入了解PriorityBlockingQueue的优先级排序机制、内部实现原理、线程安全性、应用场景以及与其他队列的比较,开发人员可以更好地在并发编程中使用它来满足不同的需求,构建高效、可靠的系统。在实际应用中,要根据具体场景选择合适的队列,并注意相关的注意事项,以确保程序的正确性和性能。