MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java 线程池的阻塞队列特点

2021-02-184.8k 阅读

Java 线程池阻塞队列简介

在Java线程池的体系中,阻塞队列扮演着至关重要的角色。线程池中的任务提交后,并非立即执行,而是会被放入一个队列中进行暂存,这个队列就是阻塞队列。阻塞队列的主要特点在于,当队列满时,继续往队列中添加元素的操作会被阻塞,直到队列有空间可用;当队列空时,从队列中获取元素的操作也会被阻塞,直到队列中有元素可获取。这种特性保证了线程池任务处理的有序性与稳定性,避免了资源竞争和任务丢失等问题。

常见阻塞队列实现

ArrayBlockingQueue

  1. 基于数组实现ArrayBlockingQueue是基于数组的有界阻塞队列,它在创建时需要指定队列的容量大小。一旦容量确定,在运行期间不能改变。例如:
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ArrayBlockingQueueExample {
    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(3);
        try {
            queue.put(1);
            queue.put(2);
            queue.put(3);
            // 下面这行代码会阻塞,因为队列已满
            queue.put(4);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
  1. 公平性ArrayBlockingQueue支持公平和非公平两种模式。公平模式下,等待时间最长的线程会优先获取队列操作的机会,而非公平模式下,线程获取操作机会是随机的。默认情况下,ArrayBlockingQueue采用非公平模式。例如,通过构造函数设置公平模式:
BlockingQueue<Integer> fairQueue = new ArrayBlockingQueue<>(3, true);
  1. 性能:非公平模式下,ArrayBlockingQueue的性能通常比公平模式要好,因为公平模式需要维护一个等待线程的顺序,这增加了额外的开销。

LinkedBlockingQueue

  1. 基于链表实现LinkedBlockingQueue是基于链表的阻塞队列,它可以是有界的,也可以是无界的。如果不指定容量,默认容量为Integer.MAX_VALUE,即无界。例如,创建一个有界的LinkedBlockingQueue
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class LinkedBlockingQueueExample {
    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(3);
        try {
            queue.put(1);
            queue.put(2);
            queue.put(3);
            // 下面这行代码会阻塞,因为队列已满
            queue.put(4);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
  1. 入队和出队操作LinkedBlockingQueue的入队和出队操作分别由puttake方法实现。当队列满时,put方法会阻塞;当队列空时,take方法会阻塞。例如:
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class LinkedBlockingQueueOperations {
    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        Thread producer = new Thread(() -> {
            try {
                queue.put(1);
                Thread.sleep(1000);
                queue.put(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        Thread consumer = new Thread(() -> {
            try {
                System.out.println(queue.take());
                System.out.println(queue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
  1. 性能:由于是基于链表实现,LinkedBlockingQueue在处理大量元素时,内存分配相对灵活,但在频繁插入和删除操作时,由于链表节点的创建和销毁,可能会有一定的性能开销。

PriorityBlockingQueue

  1. 基于堆实现的优先队列PriorityBlockingQueue是一个无界的基于堆结构的优先队列。队列中的元素按照自然顺序或者自定义的比较器顺序进行排序。例如,创建一个PriorityBlockingQueue并添加元素:
import java.util.concurrent.PriorityBlockingQueue;

public class PriorityBlockingQueueExample {
    public static void main(String[] args) {
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
        queue.add(3);
        queue.add(1);
        queue.add(2);
        while (!queue.isEmpty()) {
            System.out.println(queue.poll());
        }
    }
}

在上述代码中,输出结果会是1、2、3,因为PriorityBlockingQueue会按照元素的自然顺序进行排序。 2. 获取元素PriorityBlockingQueuetakepoll方法会返回队列中优先级最高的元素。如果队列中没有元素,take方法会阻塞,poll方法会返回null。例如:

import java.util.concurrent.PriorityBlockingQueue;

public class PriorityBlockingQueueTakePoll {
    public static void main(String[] args) {
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
        Thread producer = new Thread(() -> {
            try {
                queue.put(1);
                Thread.sleep(1000);
                queue.put(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        Thread consumer = new Thread(() -> {
            try {
                System.out.println(queue.take());
                System.out.println(queue.take());
                // 下面这行代码会阻塞,因为队列已空
                System.out.println(queue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
  1. 注意事项:由于PriorityBlockingQueue是无界的,在使用时需要注意内存的消耗,尤其是在大量元素不断加入队列时。

DelayQueue

  1. 基于时间延迟的队列DelayQueue是一个无界阻塞队列,只有在延迟期满时才能从中获取元素。队列中的元素必须实现Delayed接口,该接口继承自Comparable接口。例如,创建一个自定义的Delayed任务类:
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayedTask implements Delayed {
    private long delayTime;
    private long startTime;

    public DelayedTask(long delayTime) {
        this.delayTime = delayTime;
        this.startTime = System.currentTimeMillis();
    }

    @Override
    public long getDelay(TimeUnit unit) {
        long elapsedTime = System.currentTimeMillis() - startTime;
        return unit.convert(delayTime - elapsedTime, TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
    }
}
  1. 使用示例
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayQueueExample {
    public static void main(String[] args) {
        DelayQueue<DelayedTask> queue = new DelayQueue<>();
        queue.add(new DelayedTask(3000));
        queue.add(new DelayedTask(1000));
        try {
            while (!queue.isEmpty()) {
                System.out.println(queue.take());
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

在上述代码中,先添加的延迟3秒的任务会在延迟1秒的任务之后被取出,因为DelayQueue会按照延迟时间的先后顺序来处理任务。 3. 应用场景:常用于实现定时任务,如缓存过期清理、定时消息发送等场景。

SynchronousQueue

  1. 无容量的队列SynchronousQueue是一个没有容量的阻塞队列,它不存储元素,每一个插入操作都必须等待另一个线程的移除操作,反之亦然。例如:
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.BlockingQueue;

public class SynchronousQueueExample {
    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new SynchronousQueue<>();
        Thread producer = new Thread(() -> {
            try {
                System.out.println("Producer is trying to put 1");
                queue.put(1);
                System.out.println("Producer put 1 successfully");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        Thread consumer = new Thread(() -> {
            try {
                System.out.println("Consumer is trying to take");
                Integer num = queue.take();
                System.out.println("Consumer took " + num);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
  1. 性能特点:由于没有存储元素的空间,SynchronousQueue在高并发场景下,能够避免队列的维护开销,适用于传递性场景,如在线程之间传递数据。

线程池与阻塞队列的关系

  1. 任务缓冲:线程池使用阻塞队列来缓冲提交的任务。当线程池中的线程都在忙碌时,新提交的任务会被放入阻塞队列中等待执行。例如,使用ThreadPoolExecutor创建一个线程池,并设置LinkedBlockingQueue作为任务队列:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolWithBlockingQueue {
    public static void main(String[] args) {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(3);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2, 4, 10, TimeUnit.SECONDS, queue);
        for (int i = 0; i < 6; i++) {
            int taskNum = i;
            executor.submit(() -> {
                System.out.println("Task " + taskNum + " is running");
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        executor.shutdown();
    }
}

在上述代码中,线程池的核心线程数为2,最大线程数为4,任务队列容量为3。当提交6个任务时,前2个任务会立即被核心线程执行,接下来3个任务会被放入队列,最后1个任务会触发线程池创建新的线程(因为队列已满且未达到最大线程数)。 2. 影响线程池行为:阻塞队列的类型和容量会影响线程池的整体行为。例如,使用SynchronousQueue作为任务队列时,线程池会倾向于创建更多的线程来处理任务,因为SynchronousQueue没有容量,任务不能在队列中等待。而使用有界队列(如ArrayBlockingQueue)时,当队列满且达到最大线程数后,新的任务可能会被拒绝。

阻塞队列的选择策略

  1. 任务类型:如果任务是时间敏感型的,比如需要按照特定顺序执行或者有时间限制,PriorityBlockingQueueDelayQueue可能是较好的选择。例如,在一个订单处理系统中,如果订单有优先级之分,PriorityBlockingQueue可以保证高优先级订单先被处理。
  2. 系统资源:如果系统内存有限,应避免使用无界队列(如LinkedBlockingQueue不指定容量),以免造成内存溢出。有界队列(如ArrayBlockingQueue)可以更好地控制内存使用。例如,在一个嵌入式系统中,内存资源紧张,使用有界队列可以确保系统稳定运行。
  3. 并发性能:在高并发场景下,如果任务传递性较强,SynchronousQueue可以减少队列维护开销,提高性能。而如果任务处理速度相对稳定,LinkedBlockingQueue可能更适合,因为它可以在一定程度上缓冲任务,减少线程创建和销毁的频率。

总结阻塞队列特点对线程池的意义

  1. 提高稳定性:阻塞队列的阻塞特性保证了在任务过多或过少时,线程池不会出现任务丢失或资源过度消耗的问题。例如,有界队列可以防止任务无限堆积导致内存溢出,从而提高了线程池的稳定性。
  2. 优化性能:合理选择阻塞队列类型可以优化线程池的性能。比如,PriorityBlockingQueue可以按照任务优先级分配资源,提高关键任务的处理效率;SynchronousQueue在高并发传递性任务场景下,可以减少队列维护开销,提升整体性能。
  3. 增强灵活性:不同的阻塞队列适用于不同的业务场景,使得线程池可以根据具体需求进行灵活配置。例如,在定时任务场景下,DelayQueue的使用为线程池提供了处理延迟任务的能力。

通过深入理解Java线程池阻塞队列的特点,开发者能够更加合理地设计和优化线程池,从而提高多线程应用程序的性能、稳定性和可扩展性。在实际开发中,应根据具体的业务需求和系统资源情况,精心选择合适的阻塞队列,以实现高效稳定的多线程处理。