MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java 线程池任务放入阻塞队列的逻辑

2021-07-147.7k 阅读

Java 线程池任务放入阻塞队列的逻辑

线程池与阻塞队列基础概念

在深入探讨任务放入阻塞队列的逻辑之前,我们先来回顾一下线程池和阻塞队列的基本概念。

线程池

线程池是一种管理和复用线程的机制,它维护着一组线程,用于执行提交的任务。通过线程池,我们可以避免频繁创建和销毁线程带来的开销,提高系统的性能和稳定性。Java 中提供了 ExecutorService 接口及其实现类(如 ThreadPoolExecutor)来实现线程池功能。

阻塞队列

阻塞队列是一种特殊的队列,当队列满时,往队列中添加元素的操作会被阻塞,直到队列有空闲位置;当队列空时,从队列中获取元素的操作会被阻塞,直到队列中有元素。Java 提供了多种阻塞队列的实现,如 ArrayBlockingQueueLinkedBlockingQueuePriorityBlockingQueue 等,这些阻塞队列在 java.util.concurrent 包中。

线程池与阻塞队列的关联

线程池中的任务提交过程与阻塞队列紧密相关。当我们向线程池提交一个任务时,线程池会按照一定的策略来决定如何处理这个任务,其中一个重要的环节就是将任务放入阻塞队列。

Java 线程池任务提交流程

ThreadPoolExecutor 为例,来看任务提交的整体流程。ThreadPoolExecutor 类提供了 execute 方法用于提交任务,其大致逻辑如下:

  1. 核心线程检查:首先检查当前运行的线程数是否小于核心线程数。如果小于,就创建一个新的线程来执行任务。
  2. 阻塞队列尝试添加任务:如果当前运行的线程数达到了核心线程数,那么就尝试将任务添加到阻塞队列中。
  3. 最大线程数检查:如果阻塞队列已满,再检查当前运行的线程数是否小于最大线程数。如果小于,就创建一个新的非核心线程来执行任务。
  4. 拒绝策略执行:如果当前运行的线程数已经达到最大线程数,且阻塞队列也已满,那么就会执行拒绝策略,拒绝该任务。

下面通过代码示例来具体说明:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolExample {
    public static void main(String[] args) {
        // 创建一个阻塞队列
        BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>(5);
        // 创建线程池,核心线程数为 2,最大线程数为 4,空闲线程存活时间为 10 秒
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2,
                4,
                10,
                TimeUnit.SECONDS,
                taskQueue);

        // 提交任务
        for (int i = 0; i < 10; i++) {
            int taskNumber = i;
            executor.execute(() -> {
                System.out.println("Task " + taskNumber + " is running on thread " + Thread.currentThread().getName());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

        // 关闭线程池
        executor.shutdown();
    }
}

在上述代码中,我们创建了一个 ThreadPoolExecutor,核心线程数为 2,最大线程数为 4,阻塞队列为 LinkedBlockingQueue,容量为 5。当我们提交 10 个任务时,前 2 个任务会立即由核心线程执行,接下来 5 个任务会被放入阻塞队列,再接下来 2 个任务会由新创建的非核心线程执行,最后 1 个任务由于线程数达到最大且队列已满,会根据拒绝策略进行处理(默认的拒绝策略是 AbortPolicy,会抛出 RejectedExecutionException)。

阻塞队列在任务提交中的细节

阻塞队列的添加操作

当线程池尝试将任务添加到阻塞队列时,具体的操作取决于阻塞队列的类型。以 LinkedBlockingQueue 为例,其 offer 方法(非阻塞添加)和 put 方法(阻塞添加)有着不同的行为。

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BlockingQueueExample {
    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(3);

        try {
            // 尝试非阻塞添加元素
            boolean added = queue.offer(1);
            System.out.println("Offer 1: " + added);

            added = queue.offer(2);
            System.out.println("Offer 2: " + added);

            added = queue.offer(3);
            System.out.println("Offer 3: " + added);

            // 队列已满,非阻塞添加失败
            added = queue.offer(4);
            System.out.println("Offer 4: " + added);

            // 阻塞添加元素
            queue.put(4);
            System.out.println("Put 4: Element added after blocking");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

在上述代码中,offer 方法尝试将元素添加到队列,如果队列已满则返回 false,不会阻塞。而 put 方法会在队列已满时阻塞,直到有空闲位置可以添加元素。

阻塞队列的获取操作

在线程池的工作线程从阻塞队列获取任务时,同样有非阻塞和阻塞的获取方法。以 LinkedBlockingQueue 为例,poll 方法(非阻塞获取)和 take 方法(阻塞获取)。

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BlockingQueueGetExample {
    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(3);
        try {
            queue.put(1);
            queue.put(2);

            // 非阻塞获取元素
            Integer element = queue.poll();
            System.out.println("Poll: " + element);

            element = queue.poll();
            System.out.println("Poll: " + element);

            // 队列已空,非阻塞获取返回 null
            element = queue.poll();
            System.out.println("Poll: " + element);

            // 阻塞获取元素
            element = queue.take();
            System.out.println("Take: " + element);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

在这个代码示例中,poll 方法尝试从队列获取元素,如果队列为空则返回 null,不会阻塞。而 take 方法会在队列为空时阻塞,直到队列中有元素可以获取。

不同类型阻塞队列对任务放入逻辑的影响

ArrayBlockingQueue

ArrayBlockingQueue 是基于数组实现的有界阻塞队列。它在创建时需要指定容量,一旦创建,容量不可改变。当线程池使用 ArrayBlockingQueue 时,任务放入队列的逻辑如下:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ArrayBlockingQueueThreadPoolExample {
    public static void main(String[] args) {
        BlockingQueue<Runnable> taskQueue = new ArrayBlockingQueue<>(3);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2,
                4,
                10,
                TimeUnit.SECONDS,
                taskQueue);

        for (int i = 0; i < 10; i++) {
            int taskNumber = i;
            executor.execute(() -> {
                System.out.println("Task " + taskNumber + " is running on thread " + Thread.currentThread().getName());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

        executor.shutdown();
    }
}

由于 ArrayBlockingQueue 容量固定,当队列满时,后续任务添加操作会阻塞(如果使用 put 方法)或返回 false(如果使用 offer 方法)。这会影响线程池处理任务的节奏,当队列满且线程数未达到最大线程数时,线程池会创建新的线程来执行任务。

LinkedBlockingQueue

LinkedBlockingQueue 可以是有界的,也可以是无界的(默认是无界的,即容量为 Integer.MAX_VALUE)。当线程池使用有界的 LinkedBlockingQueue 时,任务放入逻辑与 ArrayBlockingQueue 类似,只是它是基于链表实现,在内存使用上相对更灵活。

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class LinkedBlockingQueueThreadPoolExample {
    public static void main(String[] args) {
        BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>(3);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2,
                4,
                10,
                TimeUnit.SECONDS,
                taskQueue);

        for (int i = 0; i < 10; i++) {
            int taskNumber = i;
            executor.execute(() -> {
                System.out.println("Task " + taskNumber + " is running on thread " + Thread.currentThread().getName());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

        executor.shutdown();
    }
}

当使用无界的 LinkedBlockingQueue 时,由于队列理论上不会满,所以任务总是可以成功添加到队列中(不会因为队列满而触发创建新的非核心线程,除非核心线程都在忙碌),这可能导致线程池长时间保持核心线程数运行,而不会根据负载动态调整线程数量。

PriorityBlockingQueue

PriorityBlockingQueue 是一个无界的阻塞队列,它按照元素的自然顺序或者自定义的比较器顺序进行排序。当线程池使用 PriorityBlockingQueue 时,任务放入队列的逻辑会根据任务的优先级来决定。

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PriorityTask implements Runnable, Comparable<PriorityTask> {
    private int priority;
    private String taskName;

    public PriorityTask(int priority, String taskName) {
        this.priority = priority;
        this.taskName = taskName;
    }

    @Override
    public void run() {
        System.out.println("Task " + taskName + " with priority " + priority + " is running on thread " + Thread.currentThread().getName());
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public int compareTo(PriorityTask other) {
        return this.priority - other.priority;
    }
}

public class PriorityBlockingQueueThreadPoolExample {
    public static void main(String[] args) {
        BlockingQueue<Runnable> taskQueue = new PriorityBlockingQueue<>();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2,
                4,
                10,
                TimeUnit.SECONDS,
                taskQueue);

        executor.execute(new PriorityTask(3, "Task 1"));
        executor.execute(new PriorityTask(1, "Task 2"));
        executor.execute(new PriorityTask(2, "Task 3"));

        executor.shutdown();
    }
}

在上述代码中,PriorityTask 实现了 Comparable 接口,根据优先级进行排序。线程池从 PriorityBlockingQueue 中获取任务时,会按照优先级顺序执行任务,这与普通的先进先出队列有所不同。

线程池任务放入阻塞队列的异常处理

在任务放入阻塞队列的过程中,可能会出现各种异常情况。例如,当使用 put 方法添加任务到阻塞队列时,如果当前线程在阻塞等待过程中被中断,会抛出 InterruptedException

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class InterruptedExceptionExample {
    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(3);

        Thread producerThread = new Thread(() -> {
            try {
                queue.put(1);
                System.out.println("Produced 1");
                queue.put(2);
                System.out.println("Produced 2");
                queue.put(3);
                System.out.println("Produced 3");
                // 模拟中断
                Thread.currentThread().interrupt();
                queue.put(4);
                System.out.println("Produced 4");
            } catch (InterruptedException e) {
                System.out.println("Producer thread interrupted");
                Thread.currentThread().interrupt();
            }
        });

        producerThread.start();

        try {
            Thread.sleep(2000);
            producerThread.interrupt();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

在上述代码中,当 producerThread 在执行 queue.put(4) 时被中断,会捕获 InterruptedException 并进行相应处理。在实际的线程池应用中,对于这种异常需要妥善处理,通常可以选择重新尝试添加任务、终止当前任务处理流程或者进行其他合适的操作。

另外,当使用拒绝策略时,如果任务被拒绝,不同的拒绝策略会有不同的处理方式。例如,默认的 AbortPolicy 会抛出 RejectedExecutionExceptionCallerRunsPolicy 会在调用者线程中执行任务,DiscardPolicy 会直接丢弃任务,DiscardOldestPolicy 会丢弃队列中最老的任务并尝试重新提交当前任务。

线程池任务放入阻塞队列的性能优化

合理设置队列容量

根据应用的负载和任务特性,合理设置阻塞队列的容量是性能优化的关键。如果队列容量过小,可能导致任务频繁被拒绝或者线程池频繁创建新线程,增加系统开销;如果队列容量过大,可能导致任务在队列中长时间等待,影响响应时间。

例如,对于 I/O 密集型任务,可以适当设置较大的队列容量,因为 I/O 操作相对耗时,线程在执行 I/O 时会释放 CPU,不会导致线程池长时间繁忙。而对于 CPU 密集型任务,队列容量应该相对较小,以避免任务堆积过多影响系统性能。

选择合适的阻塞队列类型

不同类型的阻塞队列适用于不同的场景。ArrayBlockingQueue 适用于需要严格控制队列大小且对内存使用较为敏感的场景;LinkedBlockingQueue 在有界和无界场景下都有较好的适用性,尤其适用于需要动态调整队列大小的情况;PriorityBlockingQueue 则适用于任务有优先级之分的场景。

在实际应用中,需要根据任务的特点和系统的需求来选择合适的阻塞队列类型,以提高线程池的整体性能。

优化线程池参数

除了阻塞队列相关的优化,合理调整线程池的核心线程数、最大线程数、线程存活时间等参数也能对任务放入阻塞队列的逻辑产生积极影响。例如,适当增加核心线程数可以减少任务在队列中的等待时间,但也会占用更多的系统资源;合理设置线程存活时间可以避免线程频繁创建和销毁,提高线程复用率。

多线程环境下阻塞队列的并发问题及解决

在多线程环境下,多个线程同时操作阻塞队列可能会引发并发问题。例如,多个线程同时尝试添加任务到已满的队列或者同时获取任务时,可能会出现竞争条件。

Java 的阻塞队列实现(如 ArrayBlockingQueueLinkedBlockingQueue 等)内部已经通过锁机制(如 ReentrantLock)来保证线程安全。然而,在一些特殊情况下,如自定义阻塞队列或者对阻塞队列进行复杂操作时,仍然需要注意并发问题。

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class CustomBlockingQueue<T> {
    private T[] queue;
    private int head;
    private int tail;
    private int size;
    private final Lock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    public CustomBlockingQueue(int capacity) {
        queue = (T[]) new Object[capacity];
    }

    public void put(T item) throws InterruptedException {
        lock.lock();
        try {
            while (size == queue.length) {
                notFull.await();
            }
            queue[tail] = item;
            tail = (tail + 1) % queue.length;
            size++;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    public T take() throws InterruptedException {
        lock.lock();
        try {
            while (size == 0) {
                notEmpty.await();
            }
            T item = queue[head];
            head = (head + 1) % queue.length;
            size--;
            notFull.signal();
            return item;
        } finally {
            lock.unlock();
        }
    }
}

在上述自定义阻塞队列的实现中,通过 ReentrantLockCondition 来实现线程安全的阻塞和唤醒操作。在实际应用中,如果需要自定义阻塞队列或者对现有阻塞队列进行扩展,要充分考虑并发安全问题,确保多线程环境下的正确性和稳定性。

线程池任务放入阻塞队列在实际项目中的应用案例

Web 服务器中的请求处理

在 Web 服务器中,通常会使用线程池来处理客户端的请求。当客户端发送请求时,这些请求会被封装成任务提交到线程池。阻塞队列在这里起到缓冲请求的作用,避免瞬间大量请求导致服务器过载。

例如,在一个基于 Servlet 的 Web 应用中,可以使用 ThreadPoolExecutorLinkedBlockingQueue 来处理 HTTP 请求:

import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

@WebServlet("/")
public class RequestHandlerServlet extends HttpServlet {
    private static final int CORE_POOL_SIZE = 5;
    private static final int MAX_POOL_SIZE = 10;
    private static final int QUEUE_CAPACITY = 20;
    private static final long KEEP_ALIVE_TIME = 10;

    private ThreadPoolExecutor executor;

    @Override
    public void init() throws ServletException {
        BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
        executor = new ThreadPoolExecutor(
                CORE_POOL_SIZE,
                MAX_POOL_SIZE,
                KEEP_ALIVE_TIME,
                TimeUnit.SECONDS,
                taskQueue);
    }

    @Override
    protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
        executor.execute(() -> {
            // 处理请求逻辑
            try {
                // 模拟请求处理时间
                Thread.sleep(2000);
                response.getWriter().println("Request processed successfully");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
    }

    @Override
    public void destroy() {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                executor.shutdownNow();
                if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                    System.err.println("Pool did not terminate");
                }
            }
        } catch (InterruptedException ie) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}

在这个案例中,LinkedBlockingQueue 作为请求任务的缓冲区,线程池根据队列中的任务数量和线程池的状态来动态调整线程数量,从而高效地处理客户端请求。

大数据处理中的任务调度

在大数据处理场景中,如 MapReduce 任务,通常会有大量的计算任务需要执行。可以使用线程池和阻塞队列来进行任务调度。

假设我们有一个简单的大数据计算任务,将一组数据进行并行处理:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class DataProcessor implements Runnable {
    private int data;

    public DataProcessor(int data) {
        this.data = data;
    }

    @Override
    public void run() {
        // 模拟数据处理
        int result = data * data;
        System.out.println("Processed data " + data + " result: " + result);
    }
}

public class BigDataProcessingExample {
    public static void main(String[] args) {
        BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                3,
                5,
                10,
                TimeUnit.SECONDS,
                taskQueue);

        List<Integer> dataList = new ArrayList<>();
        for (int i = 0; i < 20; i++) {
            dataList.add(i);
        }

        for (int data : dataList) {
            executor.execute(new DataProcessor(data));
        }

        executor.shutdown();
    }
}

在这个案例中,任务被放入阻塞队列,线程池中的线程从队列中获取任务并执行数据处理操作。通过合理设置线程池参数和阻塞队列,可以有效地管理大数据处理任务的并发执行,提高处理效率。

通过以上对 Java 线程池任务放入阻塞队列逻辑的详细分析、代码示例以及实际应用案例,相信你对这一重要机制有了更深入的理解。在实际开发中,根据具体的业务需求和系统特点,合理运用线程池和阻塞队列,能够显著提升系统的性能和稳定性。