MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java 优先级队列在 Java 线程池中的运用

2024-06-177.1k 阅读

Java 优先级队列基础

什么是优先级队列

在 Java 中,PriorityQueue 是一个基于堆数据结构的无界队列。它根据元素的自然顺序或者通过一个 Comparator 来对元素进行排序,从而保证每次从队列中取出的元素都是当前队列中优先级最高的(在自然顺序下为最小元素,若使用 Comparator 则按其定义的顺序)。

优先级队列的特点

  1. 元素有序性PriorityQueue 并非像 Queue 接口的其他实现(如 LinkedList 实现的队列)那样按照元素的插入顺序进行处理。而是依据元素的优先级顺序,这使得每次获取的元素都是当前队列中优先级最高的。
  2. 无界性PriorityQueue 理论上可以存储无限数量的元素,不过在实际应用中,会受到系统内存的限制。
  3. 堆结构实现:底层基于堆数据结构实现。堆是一种特殊的完全二叉树,对于最小堆(PriorityQueue 默认实现),每个父节点的值都小于或等于其左右子节点的值。这种结构特点使得在获取优先级最高(最小)元素以及插入新元素时,都能保持较高的效率。

优先级队列的常用方法

  1. add(E e):将指定元素插入此优先级队列。如果插入成功则返回 true,如果由于容量限制而无法插入则抛出 IllegalStateException。例如:
PriorityQueue<Integer> pq = new PriorityQueue<>();
pq.add(3);
pq.add(1);
pq.add(2);
  1. offer(E e):将指定元素插入此优先级队列。如果插入成功则返回 true,如果由于容量限制而无法插入则返回 false。与 add 方法类似,但在无法插入时的处理方式不同。示例如下:
PriorityQueue<Integer> pq = new PriorityQueue<>();
boolean result = pq.offer(5);
  1. poll():获取并移除队列的头部元素(即优先级最高的元素)。如果队列为空,则返回 null
PriorityQueue<Integer> pq = new PriorityQueue<>();
pq.add(3);
pq.add(1);
pq.add(2);
Integer head = pq.poll(); // head 的值为 1
  1. peek():获取但不移除队列的头部元素。如果队列为空,则返回 null
PriorityQueue<Integer> pq = new PriorityQueue<>();
pq.add(3);
pq.add(1);
pq.add(2);
Integer head = pq.peek(); // head 的值为 1

自定义优先级

  1. 自然排序:如果元素类实现了 Comparable 接口,PriorityQueue 会按照 Comparable 接口定义的顺序来确定元素的优先级。例如,对于自定义的 Task 类:
class Task implements Comparable<Task> {
    private int priority;
    private String taskName;

    public Task(int priority, String taskName) {
        this.priority = priority;
        this.taskName = taskName;
    }

    @Override
    public int compareTo(Task other) {
        return this.priority - other.priority;
    }

    @Override
    public String toString() {
        return "Task{" +
                "priority=" + priority +
                ", taskName='" + taskName + '\'' +
                '}';
    }
}

使用该类创建 PriorityQueue 时:

PriorityQueue<Task> taskQueue = new PriorityQueue<>();
taskQueue.add(new Task(2, "Task2"));
taskQueue.add(new Task(1, "Task1"));
taskQueue.add(new Task(3, "Task3"));
Task task = taskQueue.poll();
System.out.println(task); // 输出: Task{priority=1, taskName='Task1'}
  1. 使用 Comparator:也可以在创建 PriorityQueue 时传入一个 Comparator 来定义元素的优先级。例如,对于上述 Task 类,我们可以通过 Comparator 来改变优先级规则:
PriorityQueue<Task> taskQueue = new PriorityQueue<>(Comparator.comparingInt(t -> t.priority));
taskQueue.add(new Task(2, "Task2"));
taskQueue.add(new Task(1, "Task1"));
taskQueue.add(new Task(3, "Task3"));
Task task = taskQueue.poll();
System.out.println(task); // 输出: Task{priority=1, taskName='Task1'}

Java 线程池概述

线程池的基本概念

线程池是一种管理和复用线程的机制,它维护着一组预先创建好的线程。当有任务需要执行时,线程池会从线程池中取出一个空闲线程来执行该任务,任务执行完毕后,线程不会被销毁,而是返回线程池等待下一个任务。这样可以避免频繁地创建和销毁线程带来的开销,提高系统的性能和响应速度。

线程池的优势

  1. 降低资源消耗:通过复用已有的线程,减少了线程创建和销毁的开销。线程的创建和销毁需要与操作系统进行交互,涉及到内核态和用户态的切换,开销较大。而线程池可以在任务执行完毕后将线程保留,下次有任务时直接使用,从而降低了系统资源的消耗。
  2. 提高响应速度:当有新任务到来时,无需等待线程的创建,直接从线程池中获取空闲线程执行任务,大大提高了系统的响应速度。特别是在高并发场景下,这种优势更加明显。
  3. 提高线程的可管理性:线程池可以对线程进行统一的管理,如线程的数量控制、线程的生命周期管理等。可以设置线程池的最大线程数、核心线程数等参数,避免线程过多导致系统资源耗尽,或者线程过少导致任务处理不及时。

Java 线程池的实现类

在 Java 中,线程池的主要实现类是 ThreadPoolExecutor。它提供了丰富的构造函数和方法来配置和管理线程池。其构造函数如下:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)
  1. corePoolSize:核心线程数,线程池中会一直存活的线程数量,即使这些线程处于空闲状态。当有新任务到来时,如果线程池中的线程数小于 corePoolSize,会创建新的线程来执行任务。
  2. maximumPoolSize:最大线程数,线程池中允许存在的最大线程数量。当任务队列已满,并且线程池中的线程数小于 maximumPoolSize 时,会创建新的线程来执行任务。
  3. keepAliveTime:线程的存活时间,当线程池中的线程数量超过 corePoolSize 时,多余的空闲线程在等待新任务到来的时间超过 keepAliveTime 后,会被销毁。
  4. unitkeepAliveTime 的时间单位,如 TimeUnit.SECONDSTimeUnit.MILLISECONDS 等。
  5. workQueue:任务队列,用于存储等待执行的任务。当线程池中的线程都在忙碌时,新的任务会被放入任务队列中等待执行。
  6. threadFactory:线程工厂,用于创建新的线程。可以通过自定义线程工厂来设置线程的名称、优先级等属性。
  7. handler:拒绝策略,当任务队列已满且线程池中的线程数达到 maximumPoolSize 时,新的任务会被拒绝。RejectedExecutionHandler 接口提供了几种默认的拒绝策略,如 AbortPolicy(抛出异常)、CallerRunsPolicy(在调用者线程中执行任务)、DiscardPolicy(丢弃任务)、DiscardOldestPolicy(丢弃队列中最老的任务)。

线程池的工作流程

  1. 当有新任务提交到线程池时,首先检查线程池中的线程数是否小于 corePoolSize。如果小于,则创建一个新的线程来执行任务。
  2. 如果线程池中的线程数已经达到 corePoolSize,则将任务放入任务队列 workQueue 中等待执行。
  3. 如果任务队列已满,再检查线程池中的线程数是否小于 maximumPoolSize。如果小于,则创建一个新的线程来执行任务。
  4. 如果线程池中的线程数已经达到 maximumPoolSize,则根据设置的拒绝策略来处理新提交的任务。

Java 优先级队列在 Java 线程池中的运用

为什么在线程池中使用优先级队列

在多任务处理的场景下,不同的任务可能具有不同的优先级。例如,在一个服务器应用中,处理系统监控相关的任务可能比处理普通用户请求的任务优先级更高。如果使用普通的任务队列,任务将按照提交的顺序依次执行,无法满足对高优先级任务优先处理的需求。而将优先级队列作为线程池的任务队列,可以让线程池优先处理高优先级的任务,从而提高系统的整体性能和响应能力。

如何在线程池中使用优先级队列

  1. 创建优先级队列作为任务队列:首先,需要创建一个 PriorityQueue 来存储任务。由于 PriorityQueue 实现了 BlockingQueue 接口,而 ThreadPoolExecutor 的构造函数中需要一个 BlockingQueue 类型的任务队列,所以可以直接将 PriorityQueue 作为参数传入。例如:
PriorityQueue<Runnable> priorityQueue = new PriorityQueue<>(Comparator.comparingInt(task -> ((PriorityTask) task).getPriority()));
ThreadPoolExecutor executor = new ThreadPoolExecutor(
        2,
        4,
        10,
        TimeUnit.SECONDS,
        priorityQueue
);
  1. 定义具有优先级的任务类:为了让 PriorityQueue 能够正确地对任务进行排序,需要定义一个实现了 Runnable 接口且具有优先级属性的任务类。例如:
class PriorityTask implements Runnable {
    private int priority;
    private String taskName;

    public PriorityTask(int priority, String taskName) {
        this.priority = priority;
        this.taskName = taskName;
    }

    public int getPriority() {
        return priority;
    }

    @Override
    public void run() {
        System.out.println("Executing task: " + taskName + " with priority " + priority);
    }
}
  1. 提交任务到线程池:创建好任务类和线程池后,就可以将任务提交到线程池中执行。例如:
executor.submit(new PriorityTask(2, "Task2"));
executor.submit(new PriorityTask(1, "Task1"));
executor.submit(new PriorityTask(3, "Task3"));

在上述代码中,PriorityTask 类实现了 Runnable 接口,并且具有一个 priority 属性来表示任务的优先级。PriorityQueue 通过 Comparator 根据任务的优先级对任务进行排序。当任务提交到线程池后,线程池会从优先级队列中取出优先级最高的任务来执行。

示例代码完整实现

import java.util.Comparator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PriorityTask implements Runnable {
    private int priority;
    private String taskName;

    public PriorityTask(int priority, String taskName) {
        this.priority = priority;
        this.taskName = taskName;
    }

    public int getPriority() {
        return priority;
    }

    @Override
    public void run() {
        System.out.println("Executing task: " + taskName + " with priority " + priority);
    }
}

public class ThreadPoolWithPriorityQueue {
    public static void main(String[] args) {
        BlockingQueue<Runnable> priorityQueue = new PriorityBlockingQueue<>(Comparator.comparingInt(task -> ((PriorityTask) task).getPriority()));
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2,
                4,
                10,
                TimeUnit.SECONDS,
                priorityQueue
        );

        executor.submit(new PriorityTask(2, "Task2"));
        executor.submit(new PriorityTask(1, "Task1"));
        executor.submit(new PriorityTask(3, "Task3"));

        executor.shutdown();
    }
}

在上述完整示例中,PriorityBlockingQueuePriorityQueue 的一个线程安全版本,适用于多线程环境。ThreadPoolExecutor 使用 PriorityBlockingQueue 作为任务队列,当任务提交到线程池后,会按照任务的优先级顺序依次执行。

注意事项

  1. 任务优先级的合理设置:在设置任务优先级时,需要根据实际业务需求进行合理的规划。如果优先级设置不合理,可能导致某些任务长时间得不到执行,出现饿死现象。例如,如果所有任务都设置为高优先级,那么优先级队列的作用就无法体现。
  2. 线程池参数的调整:当使用优先级队列作为线程池的任务队列时,线程池的其他参数(如 corePoolSizemaximumPoolSize 等)也需要根据任务的特点和系统资源进行合理的调整。如果线程池的线程数量过少,可能导致高优先级任务也无法及时得到执行;如果线程数量过多,可能会消耗过多的系统资源。
  3. 线程安全问题:如果在多线程环境中使用优先级队列,需要确保队列的操作是线程安全的。PriorityBlockingQueuePriorityQueue 的线程安全版本,可以直接在多线程环境中使用。如果使用 PriorityQueue,需要自行处理线程同步问题,否则可能会出现数据不一致等问题。

通过在 Java 线程池中合理运用优先级队列,可以有效地提高系统对不同优先级任务的处理能力,优化系统性能和响应速度,满足复杂业务场景下的需求。在实际应用中,需要根据具体的业务需求和系统环境,仔细调整线程池和优先级队列的相关参数,以达到最佳的效果。