MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java Executors 框架的深度解析

2024-08-091.9k 阅读

Java Executors 框架简介

在Java并发编程领域,Executors框架扮演着至关重要的角色。它为开发者提供了一种高效且便捷的方式来管理和执行线程任务。Executors框架是Java 5.0引入的,其核心目的是简化线程池的创建与使用,同时提供对异步任务执行的有效控制。

Executors框架主要由以下几个关键组件构成:

  1. Executor:这是一个顶层接口,它定义了一个简单的方法execute(Runnable task),该方法用于提交一个可运行的任务。其作用是将任务的提交和执行分离,使得代码的结构更加清晰,同时也为线程池等高级实现提供了基础。
  2. ExecutorService:它继承自Executor接口,扩展了一些用于管理任务执行和控制线程池生命周期的方法。例如,submit(Callable<T> task)方法可以提交一个返回值的任务,shutdown()方法用于启动线程池的关闭过程,isTerminated()方法用于判断线程池是否已经终止等。
  3. ScheduledExecutorService:进一步继承自ExecutorService,主要用于支持定时和周期性任务的执行。例如,可以安排一个任务在指定延迟后执行,或者按照固定的时间间隔重复执行。
  4. ThreadPoolExecutor:这是ExecutorService接口的一个具体实现类,是线程池的核心实现。它提供了高度可配置的线程池参数,如核心线程数、最大线程数、线程存活时间等,使得开发者能够根据不同的应用场景来优化线程池的性能。
  5. ScheduledThreadPoolExecutor:是ScheduledExecutorService接口的具体实现,用于实现定时和周期性任务的执行。

创建线程池的常见方式

  1. 通过Executors工厂方法创建固定大小线程池 使用Executors.newFixedThreadPool(int nThreads)方法可以创建一个固定大小的线程池。该线程池的核心线程数和最大线程数都等于传入的参数nThreads。一旦线程池创建完成,它会预先创建并启动nThreads个线程来等待任务的到来。
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class FixedThreadPoolExample {
        public static void main(String[] args) {
            // 创建一个固定大小为3的线程池
            ExecutorService executorService = Executors.newFixedThreadPool(3);
            for (int i = 0; i < 5; i++) {
                final int taskNumber = i;
                executorService.submit(() -> {
                    System.out.println("Task " + taskNumber + " is running on thread " + Thread.currentThread().getName());
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("Task " + taskNumber + " has finished.");
                });
            }
            executorService.shutdown();
        }
    }
    
    在上述代码中,我们创建了一个大小为3的固定线程池,并向其中提交了5个任务。由于线程池大小为3,所以在同一时间最多有3个任务可以并行执行。当一个任务执行完毕后,线程池中的线程会被复用去执行下一个任务。
  2. 创建单线程化的线程池 Executors.newSingleThreadExecutor()方法创建的线程池只有一个线程。这个线程会按照任务提交的顺序依次执行任务,这样可以保证任务的顺序性。
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class SingleThreadExecutorExample {
        public static void main(String[] args) {
            ExecutorService executorService = Executors.newSingleThreadExecutor();
            for (int i = 0; i < 5; i++) {
                final int taskNumber = i;
                executorService.submit(() -> {
                    System.out.println("Task " + taskNumber + " is running on thread " + Thread.currentThread().getName());
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("Task " + taskNumber + " has finished.");
                });
            }
            executorService.shutdown();
        }
    }
    
    上述代码中,所有的任务都会由同一个线程依次执行,不会出现并行执行的情况,这对于一些需要顺序执行的任务场景非常适用,比如数据库的某些事务操作,需要按顺序执行以保证数据的一致性。
  3. 创建可缓存线程池 Executors.newCachedThreadPool()方法创建的线程池是可缓存的。如果线程池中有空闲线程,就复用空闲线程执行任务;如果没有空闲线程,就创建新的线程来执行任务。而且,如果一个线程在60秒内没有被使用,就会被回收。
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class CachedThreadPoolExample {
        public static void main(String[] args) {
            ExecutorService executorService = Executors.newCachedThreadPool();
            for (int i = 0; i < 5; i++) {
                final int taskNumber = i;
                executorService.submit(() -> {
                    System.out.println("Task " + taskNumber + " is running on thread " + Thread.currentThread().getName());
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("Task " + taskNumber + " has finished.");
                });
            }
            executorService.shutdown();
        }
    }
    
    在这个例子中,如果提交任务的速度很快,线程池可能会创建较多的线程来处理任务。但当任务执行完毕,闲置线程在60秒后会被回收,这样可以有效地控制资源的使用。

ThreadPoolExecutor的深入剖析

  1. ThreadPoolExecutor的构造函数 ThreadPoolExecutor提供了多个构造函数,其中最常用的一个构造函数如下:
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)
    
    • corePoolSize:核心线程数,线程池会尽量保持这个数量的线程处于活动状态。即使这些线程暂时没有任务执行,它们也不会被销毁,除非设置了allowCoreThreadTimeOuttrue
    • maximumPoolSize:线程池允许的最大线程数。当任务队列已满,且活跃线程数小于最大线程数时,线程池会创建新的线程来处理任务。
    • keepAliveTime:当线程数大于核心线程数时,多余的空闲线程的存活时间。即如果一个线程空闲了keepAliveTime这么长时间,且当前线程数大于核心线程数,那么这个线程将会被终止。
    • unitkeepAliveTime的时间单位,如TimeUnit.SECONDSTimeUnit.MILLISECONDS等。
    • workQueue:任务队列,用于存放等待执行的任务。常见的任务队列有ArrayBlockingQueueLinkedBlockingQueueSynchronousQueue等。
    • threadFactory:线程工厂,用于创建新的线程。通过自定义线程工厂,可以设置线程的名称、优先级、是否为守护线程等属性。
    • handler:拒绝策略,当任务队列已满且线程数达到最大线程数时,新提交的任务会被拒绝,此时就会使用这个拒绝策略来处理被拒绝的任务。常见的拒绝策略有AbortPolicy(默认策略,直接抛出RejectedExecutionException异常)、CallerRunsPolicy(将任务交给调用者线程执行)、DiscardPolicy(直接丢弃任务)、DiscardOldestPolicy(丢弃队列中最老的任务,然后尝试提交新任务)。
  2. 任务执行流程 当一个任务提交到ThreadPoolExecutor时,其执行流程如下:
    • 如果当前线程数小于核心线程数,无论核心线程是否有空闲,都会创建一个新的线程来执行任务。
    • 如果当前线程数等于核心线程数,任务会被放入任务队列中等待执行。
    • 如果任务队列已满,且当前线程数小于最大线程数,会创建新的线程来执行任务。
    • 如果任务队列已满,且当前线程数达到最大线程数,新提交的任务会根据拒绝策略进行处理。 下面通过一个示例代码来演示ThreadPoolExecutor的任务执行流程:
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    public class ThreadPoolExecutorExample {
        public static void main(String[] args) {
            BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(2);
            ThreadPoolExecutor executor = new ThreadPoolExecutor(
                    2,
                    4,
                    10,
                    TimeUnit.SECONDS,
                    workQueue,
                    new ThreadPoolExecutor.CallerRunsPolicy());
            for (int i = 0; i < 6; i++) {
                final int taskNumber = i;
                executor.submit(() -> {
                    System.out.println("Task " + taskNumber + " is running on thread " + Thread.currentThread().getName());
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("Task " + taskNumber + " has finished.");
                });
            }
            executor.shutdown();
        }
    }
    
    在上述代码中,核心线程数为2,最大线程数为4,任务队列容量为2。当提交6个任务时,前2个任务会立即由核心线程执行,接下来2个任务会被放入任务队列,再接下来2个任务由于任务队列已满且当前线程数小于最大线程数,会创建新的线程来执行。如果继续提交任务,由于达到最大线程数且任务队列已满,会根据CallerRunsPolicy拒绝策略,由调用者线程来执行任务。

ScheduledExecutorService的使用

  1. 延迟任务的执行 ScheduledExecutorService可以用于安排一个任务在指定的延迟时间后执行。通过schedule(Callable<V> callable, long delay, TimeUnit unit)方法可以提交一个有返回值的任务,通过schedule(Runnable command, long delay, TimeUnit unit)方法可以提交一个无返回值的任务。
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    
    public class DelayedTaskExample {
        public static void main(String[] args) {
            ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
            executorService.schedule(() -> {
                System.out.println("Delayed task is running on thread " + Thread.currentThread().getName());
            }, 5, TimeUnit.SECONDS);
            executorService.shutdown();
        }
    }
    
    在上述代码中,我们创建了一个单线程的ScheduledExecutorService,并安排了一个任务在5秒后执行。
  2. 周期性任务的执行 ScheduledExecutorService还支持周期性任务的执行。有两种方式来实现周期性任务:scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
    • scheduleAtFixedRate:该方法会按照固定的速率执行任务。即从任务开始执行的时间点起,每隔period时间就执行一次任务,无论上一次任务执行的时间长短。
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    
    public class FixedRateTaskExample {
        public static void main(String[] args) {
            ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
            executorService.scheduleAtFixedRate(() -> {
                System.out.println("Fixed - rate task is running on thread " + Thread.currentThread().getName());
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }, 0, 2, TimeUnit.SECONDS);
            // 这里不调用shutdown,任务会持续执行
        }
    }
    
    在上述代码中,任务会立即开始执行,然后每隔2秒执行一次。由于任务执行时间为3秒,大于间隔时间2秒,所以实际执行间隔会以任务执行完毕的时间点为起点,每3秒执行一次。
    • scheduleWithFixedDelay:该方法会按照固定的延迟时间执行任务。即上一次任务执行完毕后,等待delay时间再执行下一次任务。
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    
    public class FixedDelayTaskExample {
        public static void main(String[] args) {
            ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
            executorService.scheduleWithFixedDelay(() -> {
                System.out.println("Fixed - delay task is running on thread " + Thread.currentThread().getName());
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }, 0, 2, TimeUnit.SECONDS);
            // 这里不调用shutdown,任务会持续执行
        }
    }
    
    在这个例子中,任务同样立即开始执行,上一次任务执行完毕(耗时3秒)后,等待2秒再执行下一次任务,所以实际执行间隔为5秒。

线程池的性能调优

  1. 合理设置核心线程数和最大线程数 核心线程数和最大线程数的设置需要根据应用程序的类型和硬件环境来决定。
    • CPU密集型任务:对于CPU密集型任务,线程数过多会导致频繁的上下文切换,降低性能。一般来说,核心线程数可以设置为CPU核心数 + 1。例如,在一个4核的CPU上,核心线程数可以设置为5。这样可以充分利用CPU资源,同时在某个线程因为偶尔的I/O操作等原因阻塞时,还有额外的线程可以继续使用CPU。
    • I/O密集型任务:I/O密集型任务大部分时间都在等待I/O操作完成,所以可以设置较多的线程数。通常可以将核心线程数设置为CPU核心数的2倍甚至更多。比如,在4核CPU上,可以将核心线程数设置为8或10,以便在等待I/O的同时,有足够的线程去处理其他任务。
  2. 选择合适的任务队列
    • ArrayBlockingQueue:是一个有界队列,其容量在创建时就确定。它适合任务数量可以预估的场景,并且可以通过设置合适的容量来避免队列过大导致内存占用过高。例如,在一个已知任务量不会太大的系统中,可以使用ArrayBlockingQueue来控制任务队列的大小,防止任务堆积过多。
    • LinkedBlockingQueue:可以是有界或无界的(默认无界)。无界队列在任务较多时可能会导致内存溢出问题,所以在使用无界队列时需要谨慎。有界的LinkedBlockingQueue则可以在一定程度上控制任务队列的大小。它适用于任务量波动较大,但又希望避免任务被立即拒绝的场景。
    • SynchronousQueue:这个队列没有容量,每个插入操作必须等待另一个线程的移除操作,反之亦然。它适合处理非常高并发的任务,因为它不会缓存任务,而是直接将任务交给线程执行。但如果线程池中的线程数不足,可能会导致任务提交时阻塞。
  3. 优化拒绝策略 选择合适的拒绝策略对于系统的稳定性和可靠性非常重要。
    • AbortPolicy:默认策略,适用于一些对任务执行成功率要求较高,不允许轻易丢弃任务的场景。当任务被拒绝时,会抛出异常,开发者可以通过捕获异常来进行相应的处理,比如记录日志、尝试重新提交任务等。
    • CallerRunsPolicy:该策略将任务交给调用者线程执行,这样可以减轻线程池的压力,同时保证任务不会被丢弃。适合于对任务执行时间要求不高,且调用者线程有空闲资源来处理任务的场景,例如在一些后台管理系统中,偶尔的任务执行可以由调用者线程处理。
    • DiscardPolicy:直接丢弃任务,适用于一些对任务执行结果不敏感,且任务量较大的场景,比如一些日志记录任务,如果因为系统繁忙导致部分日志任务被丢弃,对系统整体功能影响不大。
    • DiscardOldestPolicy:丢弃队列中最老的任务,然后尝试提交新任务。适用于希望优先处理新任务,且对任务顺序要求不是特别严格的场景。例如在一些实时数据处理系统中,新的数据可能比旧数据更有价值,所以可以丢弃旧任务以处理新任务。

线程池与资源管理

  1. 线程池与内存管理 线程池中的线程本身会占用一定的内存空间,包括线程栈等。如果线程池设置不合理,创建过多的线程,可能会导致内存溢出问题。特别是在使用可缓存线程池或者没有合理设置最大线程数的情况下,当任务提交速度过快时,线程池可能会不断创建新线程,耗尽内存资源。 例如,在一个内存有限的服务器上,如果使用Executors.newCachedThreadPool()且任务提交非常频繁,可能在短时间内创建大量线程,最终导致内存不足。因此,在使用线程池时,需要根据系统的内存情况来合理设置线程池的参数,避免内存过度消耗。
  2. 线程池与CPU资源管理 线程池的线程数量与CPU资源的利用密切相关。如果线程数过少,CPU资源可能无法充分利用;而线程数过多,又会导致频繁的上下文切换,增加CPU的额外开销。例如,在一个CPU密集型任务的应用中,如果核心线程数设置远小于CPU核心数,那么CPU的部分核心可能处于空闲状态,降低了整体性能;相反,如果设置的线程数过多,每个线程获得的CPU时间片会减少,上下文切换频繁,也会影响性能。 为了优化CPU资源的利用,需要根据任务类型(CPU密集型还是I/O密集型)来合理调整线程池的线程数量,同时可以通过操作系统的性能监控工具(如Linux下的top命令、Windows下的任务管理器等)来观察CPU的使用情况,进一步调整线程池参数。
  3. 线程池与I/O资源管理 在涉及I/O操作的任务中,线程池的管理也很重要。如果线程池中的线程在执行I/O操作时没有合理的资源分配和管理,可能会导致I/O瓶颈。例如,在一个多线程读取文件的应用中,如果所有线程同时竞争文件资源,可能会导致文件读取效率低下。 可以通过使用连接池等技术来管理I/O资源,例如数据库连接池。在多线程访问数据库时,使用连接池可以避免每个线程都创建新的数据库连接,从而提高数据库访问的效率。同时,在线程执行I/O操作时,可以采用异步I/O等方式,让线程在等待I/O完成时可以执行其他任务,提高线程的利用率。

实战案例:使用Java Executors框架实现高性能任务处理

  1. 案例背景 假设我们有一个电商系统,需要处理大量的订单任务,包括订单的创建、支付处理、库存更新等。这些任务可以并行处理,以提高系统的处理效率。同时,为了保证系统的稳定性,需要对任务执行进行有效的管理和控制。
  2. 实现方案
    • 线程池的选择与配置:由于订单处理任务属于I/O密集型任务(涉及数据库操作、网络通信等I/O操作),我们可以选择创建一个固定大小的线程池,并根据服务器的CPU核心数和内存情况进行配置。假设服务器是8核CPU,我们可以将核心线程数设置为16,最大线程数设置为20,任务队列选择LinkedBlockingQueue,容量设置为100。这样可以充分利用系统资源,同时避免任务堆积过多。
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    public class OrderProcessingSystem {
        private static final int CORE_POOL_SIZE = 16;
        private static final int MAX_POOL_SIZE = 20;
        private static final int QUEUE_CAPACITY = 100;
        private static final long KEEP_ALIVE_TIME = 10;
    
        private static ThreadPoolExecutor executor;
    
        static {
            BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
            executor = new ThreadPoolExecutor(
                    CORE_POOL_SIZE,
                    MAX_POOL_SIZE,
                    KEEP_ALIVE_TIME,
                    TimeUnit.SECONDS,
                    workQueue,
                    new ThreadPoolExecutor.CallerRunsPolicy());
        }
    
        public static void processOrder(Order order) {
            executor.submit(() -> {
                // 模拟订单处理逻辑
                System.out.println("Processing order " + order.getOrderId() + " on thread " + Thread.currentThread().getName());
                try {
                    Thread.sleep(2000);
                    System.out.println("Order " + order.getOrderId() + " processed successfully.");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
    
        public static void shutdown() {
            executor.shutdown();
            try {
                if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                    executor.shutdownNow();
                    if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                        System.err.println("Pool did not terminate");
                    }
                }
            } catch (InterruptedException ie) {
                executor.shutdownNow();
                Thread.currentThread().interrupt();
            }
        }
    }
    
    • 任务提交与执行:在系统中,当有新订单到来时,调用processOrder方法将订单任务提交到线程池。
    public class Main {
        public static void main(String[] args) {
            for (int i = 0; i < 200; i++) {
                Order order = new Order(i);
                OrderProcessingSystem.processOrder(order);
            }
            OrderProcessingSystem.shutdown();
        }
    }
    
    class Order {
        private int orderId;
    
        public Order(int orderId) {
            this.orderId = orderId;
        }
    
        public int getOrderId() {
            return orderId;
        }
    }
    
    在上述代码中,我们模拟了200个订单的提交和处理。线程池会根据配置的参数来管理任务的执行,当任务队列已满且线程数达到最大线程数时,新的订单任务会根据CallerRunsPolicy拒绝策略由调用者线程处理,保证订单不会被丢弃。同时,在系统关闭时,通过调用shutdown方法来优雅地关闭线程池,等待所有任务执行完毕或超时强制终止。

通过以上对Java Executors框架的深度解析,从框架的基本概念、线程池的创建与使用、核心类的剖析、性能调优到实战案例,我们全面地了解了Executors框架在Java并发编程中的重要性和强大功能。合理使用Executors框架可以显著提高应用程序的性能和稳定性,充分利用系统资源来处理大量的并发任务。