MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java ThreadPoolExecutor 的拒绝策略

2022-09-215.2k 阅读

Java ThreadPoolExecutor 的拒绝策略基础概念

在Java的多线程编程中,ThreadPoolExecutor是一个非常强大且常用的线程池实现类。线程池允许我们复用已有的线程,而不是每次执行任务都创建新线程,这大大提高了系统的性能和资源利用率。然而,当线程池的任务队列已满,并且线程池中的线程数量也达到了最大限制时,新提交的任务就需要有相应的处理方式,这就是拒绝策略发挥作用的地方。

ThreadPoolExecutor提供了四种内置的拒绝策略,这些策略定义在RejectedExecutionHandler接口中,当ThreadPoolExecutor无法接受新任务时,就会调用RejectedExecutionHandlerrejectedExecution方法来处理拒绝的任务。

AbortPolicy(默认拒绝策略)

  1. 策略描述 AbortPolicyThreadPoolExecutor的默认拒绝策略。当任务无法被执行时,它会抛出RejectedExecutionException异常,阻止系统正常运行。这种策略比较“激进”,适用于对任务执行失败非常敏感的场景,一旦任务无法执行,系统就认为出现了严重问题,不能继续运行。

  2. 代码示例

import java.util.concurrent.*;

public class AbortPolicyExample {
    public static void main(String[] args) {
        // 创建一个线程池,核心线程数为2,最大线程数为4,队列容量为3
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2,
                4,
                10,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(3),
                new ThreadPoolExecutor.AbortPolicy()
        );

        // 提交10个任务
        for (int i = 0; i < 10; i++) {
            int taskNumber = i;
            executor.submit(() -> {
                System.out.println("Task " + taskNumber + " is running on thread " + Thread.currentThread().getName());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        executor.shutdown();
    }
}

在上述代码中,线程池的核心线程数为2,最大线程数为4,任务队列容量为3。当提交10个任务时,前2 + 3 = 5个任务会被正常处理,第6个任务开始会触发拒绝策略,抛出RejectedExecutionException异常。

CallerRunsPolicy

  1. 策略描述 CallerRunsPolicy策略比较特殊,当任务被拒绝时,它不会抛出异常,也不会抛弃任务,而是将被拒绝的任务在调用execute方法的线程中直接执行。这种策略可以在一定程度上减轻线程池的压力,因为它让提交任务的线程来分担一部分任务执行工作。同时,它也可以保证所有提交的任务最终都会被执行,适用于对任务执行顺序和完整性要求较高的场景。

  2. 代码示例

import java.util.concurrent.*;

public class CallerRunsPolicyExample {
    public static void main(String[] args) {
        // 创建一个线程池,核心线程数为2,最大线程数为4,队列容量为3
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2,
                4,
                10,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(3),
                new ThreadPoolExecutor.CallerRunsPolicy()
        );

        // 提交10个任务
        for (int i = 0; i < 10; i++) {
            int taskNumber = i;
            executor.submit(() -> {
                System.out.println("Task " + taskNumber + " is running on thread " + Thread.currentThread().getName());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        executor.shutdown();
    }
}

在这个示例中,当任务超过线程池和队列的承载能力时,被拒绝的任务会在主线程(调用execute方法的线程)中执行。我们可以从打印的线程名称中看到,当任务被拒绝时,会出现主线程执行任务的情况。

DiscardPolicy

  1. 策略描述 DiscardPolicy是一种比较“温和”的拒绝策略。当任务被拒绝时,它会直接丢弃该任务,不会抛出任何异常,也不会对任务进行任何处理。这种策略适用于那些对任务执行与否不是特别敏感,允许部分任务丢失的场景,例如一些统计性质的任务,偶尔丢失几个任务对整体统计结果影响不大。

  2. 代码示例

import java.util.concurrent.*;

public class DiscardPolicyExample {
    public static void main(String[] args) {
        // 创建一个线程池,核心线程数为2,最大线程数为4,队列容量为3
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2,
                4,
                10,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(3),
                new ThreadPoolExecutor.DiscardPolicy()
        );

        // 提交10个任务
        for (int i = 0; i < 10; i++) {
            int taskNumber = i;
            executor.submit(() -> {
                System.out.println("Task " + taskNumber + " is running on thread " + Thread.currentThread().getName());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        executor.shutdown();
    }
}

在上述代码中,当任务超过线程池和队列的承载能力时,被拒绝的任务会被默默丢弃,不会有任何异常或提示。通过观察任务的执行情况,可以发现部分任务没有被执行。

DiscardOldestPolicy

  1. 策略描述 DiscardOldestPolicy策略在任务被拒绝时,会丢弃任务队列中最老的一个任务(即最先进入队列的任务),然后尝试将新任务加入队列。如果再次失败,会继续重复这个过程,直到成功将新任务加入队列或者最终无法加入(例如队列已满且线程池已达到最大线程数)。这种策略适用于希望优先处理新任务的场景,通过牺牲旧任务来为新任务腾出空间。

  2. 代码示例

import java.util.concurrent.*;

public class DiscardOldestPolicyExample {
    public static void main(String[] args) {
        // 创建一个线程池,核心线程数为2,最大线程数为4,队列容量为3
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2,
                4,
                10,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(3),
                new ThreadPoolExecutor.DiscardOldestPolicy()
        );

        // 提交10个任务
        for (int i = 0; i < 10; i++) {
            int taskNumber = i;
            executor.submit(() -> {
                System.out.println("Task " + taskNumber + " is running on thread " + Thread.currentThread().getName());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        executor.shutdown();
    }
}

在这个示例中,当任务超过线程池和队列的承载能力时,会不断丢弃队列中最老的任务,尝试让新任务进入队列。通过观察任务的执行顺序,可以发现较早提交的任务可能会被丢弃,而新提交的任务有更多机会被执行。

自定义拒绝策略

除了上述四种内置的拒绝策略,ThreadPoolExecutor还允许我们自定义拒绝策略。通过实现RejectedExecutionHandler接口,我们可以根据业务需求灵活地定义任务被拒绝时的处理逻辑。

  1. 实现步骤

    • 实现RejectedExecutionHandler接口,重写rejectedExecution方法。
    • rejectedExecution方法中编写自定义的处理逻辑,例如记录日志、将任务存储到其他地方等。
    • 将自定义的拒绝策略实例传递给ThreadPoolExecutor的构造函数。
  2. 代码示例

import java.util.concurrent.*;

class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.out.println("Task " + r + " is rejected. Executor status: " + executor.toString());
        // 这里可以添加自定义处理逻辑,例如记录日志到文件
    }
}

public class CustomRejectionPolicyExample {
    public static void main(String[] args) {
        // 创建一个线程池,核心线程数为2,最大线程数为4,队列容量为3
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2,
                4,
                10,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(3),
                new CustomRejectedExecutionHandler()
        );

        // 提交10个任务
        for (int i = 0; i < 10; i++) {
            int taskNumber = i;
            executor.submit(() -> {
                System.out.println("Task " + taskNumber + " is running on thread " + Thread.currentThread().getName());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        executor.shutdown();
    }
}

在上述代码中,我们实现了一个自定义的拒绝策略CustomRejectedExecutionHandler,当任务被拒绝时,它会打印任务被拒绝的信息以及线程池的当前状态。在实际应用中,我们可以根据业务需求在rejectedExecution方法中添加更复杂的处理逻辑,如将被拒绝的任务持久化到数据库,以便后续重试等。

选择合适的拒绝策略

选择合适的拒绝策略对于线程池的性能和系统的稳定性至关重要。以下是一些在选择拒绝策略时需要考虑的因素:

  1. 任务的重要性
    • 如果任务非常重要,不能丢失,那么AbortPolicyCallerRunsPolicy可能更合适。AbortPolicy可以立即发现任务执行失败的问题,而CallerRunsPolicy可以保证任务最终被执行。
    • 对于一些相对不那么重要的任务,如统计任务、日志记录任务等,可以考虑使用DiscardPolicyDiscardOldestPolicy,允许部分任务丢失。
  2. 系统的性能要求
    • CallerRunsPolicy会让提交任务的线程执行被拒绝的任务,这可能会影响主线程的性能,因此在对主线程性能要求较高的场景下,需要谨慎使用。
    • DiscardPolicyDiscardOldestPolicy虽然不会对系统性能产生额外的压力,但可能会导致部分任务丢失,需要根据业务对任务丢失的容忍度来选择。
  3. 任务的执行顺序
    • 如果希望优先处理新任务,可以选择DiscardOldestPolicy,它会丢弃队列中最老的任务,为新任务腾出空间。
    • 如果对任务的执行顺序有严格要求,那么CallerRunsPolicy可能更合适,因为它会在调用线程中执行被拒绝的任务,保证任务的提交顺序与执行顺序一致。

拒绝策略在实际项目中的应用场景

  1. 高并发订单处理系统 在电商的高并发订单处理系统中,订单的处理任务非常重要,不能轻易丢弃。可以使用AbortPolicy作为拒绝策略,当线程池无法处理新订单任务时,抛出异常,系统可以通过捕获异常进行相应的处理,如提示用户稍后重试,或者将订单任务记录到日志中,以便后续人工处理。同时,也可以考虑结合CallerRunsPolicy,在系统负载不是特别高的情况下,让提交订单的线程暂时处理被拒绝的任务,保证订单能够及时处理。
  2. 日志收集系统 在日志收集系统中,日志记录任务相对来说没有那么高的重要性,允许部分日志丢失。可以使用DiscardPolicy作为拒绝策略,当线程池和队列满时,直接丢弃新的日志记录任务,不会对系统造成太大影响。这样可以保证系统在高负载情况下仍然能够正常运行,不会因为处理过多的日志任务而导致性能下降。
  3. 实时数据分析系统 在实时数据分析系统中,通常希望优先处理最新的数据,以保证分析结果的实时性。可以使用DiscardOldestPolicy作为拒绝策略,当线程池和队列满时,丢弃最早进入队列的数据处理任务,为新的数据处理任务腾出空间。这样可以确保系统始终处理最新的数据,提高分析结果的时效性。

总结

ThreadPoolExecutor的拒绝策略是多线程编程中一个非常重要的概念,合理选择拒绝策略可以使线程池在面对高负载任务时更加稳定和高效。通过了解AbortPolicyCallerRunsPolicyDiscardPolicyDiscardOldestPolicy这四种内置拒绝策略以及自定义拒绝策略的原理和应用场景,开发者可以根据具体的业务需求,灵活选择和定制适合的拒绝策略,从而优化系统的性能和稳定性。在实际项目中,需要综合考虑任务的重要性、系统性能要求以及任务执行顺序等因素,选择最合适的拒绝策略,确保多线程系统能够稳定、高效地运行。

以上就是关于Java ThreadPoolExecutor拒绝策略的详细介绍,希望对大家在多线程编程中有所帮助。在实际应用中,需要不断实践和调整,以找到最适合业务场景的线程池配置和拒绝策略。