MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java Executors 的任务执行流程

2023-06-254.9k 阅读

Java Executors 的任务执行流程基础概念

在Java并发编程领域,Executors类是一个关键的工具类,它提供了一系列静态方法用于创建各种类型的ExecutorService,而ExecutorService负责管理和执行提交的任务。理解Executors背后的任务执行流程,对于高效地编写并发程序至关重要。

线程池概念与Executors的关系

线程池是一种管理和复用线程的机制,它可以避免频繁创建和销毁线程带来的开销。Executors类为创建不同配置的线程池提供了便捷的方式。通过Executors创建的线程池本质上是ThreadPoolExecutor的实例,只不过进行了不同参数的预配置。例如,Executors.newFixedThreadPool(int nThreads)方法创建一个固定大小的线程池,池中线程数量固定为nThreads

Executors创建的常见线程池类型及其执行流程

固定大小线程池(newFixedThreadPool

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FixedThreadPoolExample {
    public static void main(String[] args) {
        // 创建一个固定大小为3的线程池
        ExecutorService executorService = Executors.newFixedThreadPool(3);

        for (int i = 0; i < 5; i++) {
            int taskNumber = i;
            executorService.submit(() -> {
                System.out.println("Task " + taskNumber + " is running on thread " + Thread.currentThread().getName());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Task " + taskNumber + " completed");
            });
        }

        executorService.shutdown();
    }
}

在上述代码中,我们创建了一个固定大小为3的线程池。当提交5个任务时,首先3个任务会立即被线程池中的3个线程执行。这3个线程并行运行任务,每个任务模拟执行2秒。由于线程池大小固定为3,剩下的2个任务会进入任务队列等待。一旦正在执行的任务中有一个完成,线程池中的线程会从任务队列中取出一个任务继续执行,直到所有任务完成。

缓存线程池(newCachedThreadPool

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CachedThreadPoolExample {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();

        for (int i = 0; i < 5; i++) {
            int taskNumber = i;
            executorService.submit(() -> {
                System.out.println("Task " + taskNumber + " is running on thread " + Thread.currentThread().getName());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Task " + taskNumber + " completed");
            });
        }

        executorService.shutdown();
    }
}

缓存线程池的特点是它会根据需要创建新线程。当提交任务时,如果线程池中有空闲线程,就会复用空闲线程执行任务。如果没有空闲线程,就会创建一个新线程来执行任务。在上述代码中,提交5个任务时,由于一开始没有空闲线程,会创建5个新线程分别执行这5个任务。缓存线程池还有一个特性,线程如果在60秒内没有任务执行,就会被回收。所以在任务执行完一段时间后,如果没有新任务提交,线程池中的线程数量会逐渐减少。

单线程线程池(newSingleThreadExecutor

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SingleThreadExecutorExample {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();

        for (int i = 0; i < 5; i++) {
            int taskNumber = i;
            executorService.submit(() -> {
                System.out.println("Task " + taskNumber + " is running on thread " + Thread.currentThread().getName());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Task " + taskNumber + " completed");
            });
        }

        executorService.shutdown();
    }
}

单线程线程池只有一个线程来执行任务。当提交多个任务时,这些任务会按照提交顺序依次在这个唯一的线程中执行。在上述代码中,5个任务会逐个执行,前一个任务完成后,下一个任务才会开始执行,确保了任务执行的顺序性。

调度线程池(newScheduledThreadPool

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduledThreadPoolExample {
    public static void main(String[] args) {
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);

        scheduledExecutorService.schedule(() -> {
            System.out.println("Delayed task is running on thread " + Thread.currentThread().getName());
        }, 3, TimeUnit.SECONDS);

        scheduledExecutorService.scheduleAtFixedRate(() -> {
            System.out.println("Periodic task is running on thread " + Thread.currentThread().getName());
        }, 0, 2, TimeUnit.SECONDS);

        // 主线程睡眠10秒,确保任务有足够时间执行
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        scheduledExecutorService.shutdown();
    }
}

调度线程池可以执行延迟任务和周期性任务。在上述代码中,schedule方法用于提交一个延迟任务,该任务会在3秒后执行。scheduleAtFixedRate方法用于提交一个周期性任务,该任务会在初始延迟0秒后开始执行,然后每隔2秒执行一次。调度线程池中有2个线程,可以同时执行多个调度任务。

ThreadPoolExecutor内部执行流程深度剖析

Executors创建的线程池本质上是ThreadPoolExecutor的实例,深入理解ThreadPoolExecutor的执行流程对于掌握Executors至关重要。

核心线程与最大线程

ThreadPoolExecutor中有两个重要的线程数量参数:核心线程数(corePoolSize)和最大线程数(maximumPoolSize)。核心线程数是线程池中保持活动的最小线程数,即使这些线程暂时没有任务执行,它们也不会被销毁(除非设置了allowCoreThreadTimeOuttrue)。最大线程数是线程池能够容纳的最大线程数量。

当提交一个任务时,ThreadPoolExecutor首先会判断当前运行的线程数是否小于核心线程数。如果小于核心线程数,就会创建一个新的核心线程来执行任务。例如:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolExecutorExample {
    public static void main(String[] args) {
        BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>(10);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2, // 核心线程数
                4, // 最大线程数
                10, TimeUnit.SECONDS,
                taskQueue
        );

        for (int i = 0; i < 5; i++) {
            int taskNumber = i;
            executor.submit(() -> {
                System.out.println("Task " + taskNumber + " is running on thread " + Thread.currentThread().getName());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Task " + taskNumber + " completed");
            });
        }

        executor.shutdown();
    }
}

在上述代码中,核心线程数为2。当提交5个任务时,前2个任务会分别由新创建的2个核心线程执行。

任务队列

如果当前运行的线程数达到了核心线程数,那么新提交的任务会被放入任务队列。ThreadPoolExecutor支持多种类型的任务队列,如ArrayBlockingQueueLinkedBlockingQueue等。在上述代码中,我们使用了LinkedBlockingQueue,它是一个无界队列(这里设置了容量为10,实际也可设置为无界)。当核心线程都在忙碌时,后续任务会进入任务队列等待。如果任务队列也满了,并且当前运行的线程数小于最大线程数,ThreadPoolExecutor会创建新的非核心线程来执行任务。

拒绝策略

当任务队列已满,且线程数达到了最大线程数,此时再提交任务,ThreadPoolExecutor会采用拒绝策略来处理。常见的拒绝策略有以下几种:

  1. AbortPolicy:这是默认的拒绝策略。当任务被拒绝时,会抛出RejectedExecutionException异常。
  2. CallerRunsPolicy:当任务被拒绝时,会在调用execute方法的线程中直接执行被拒绝的任务。
  3. DiscardPolicy:直接丢弃被拒绝的任务,不做任何处理。
  4. DiscardOldestPolicy:丢弃任务队列中最老的任务,然后尝试将新任务加入任务队列。

例如,我们可以在创建ThreadPoolExecutor时指定拒绝策略:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.RejectedExecutionHandler;

public class ThreadPoolExecutorRejectionExample {
    public static void main(String[] args) {
        BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>(2);
        RejectedExecutionHandler rejectionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2,
                4,
                10, TimeUnit.SECONDS,
                taskQueue,
                rejectionHandler
        );

        for (int i = 0; i < 7; i++) {
            int taskNumber = i;
            executor.submit(() -> {
                System.out.println("Task " + taskNumber + " is running on thread " + Thread.currentThread().getName());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Task " + taskNumber + " completed");
            });
        }

        executor.shutdown();
    }
}

在上述代码中,我们设置了任务队列容量为2,当提交7个任务时,由于核心线程数为2,任务队列容量为2,最大线程数为4,前4个任务会被核心线程和新创建的非核心线程执行,任务队列会满。第5个任务会触发拒绝策略,这里采用了CallerRunsPolicy,所以第5个任务会在主线程中执行。

任务执行过程中的线程生命周期管理

ThreadPoolExecutor中,线程的生命周期管理是任务执行流程的重要部分。

线程创建与启动

当需要创建新线程时,ThreadPoolExecutor会通过ThreadFactory来创建线程。默认的ThreadFactory会创建一个新的线程,并设置线程的名称。创建好的线程会调用start方法启动,然后进入RUNNABLE状态,开始执行任务。

线程空闲与回收

当一个线程执行完任务后,它会尝试从任务队列中获取新的任务。如果任务队列中有任务,线程会继续执行任务。如果任务队列中没有任务,且当前运行的线程数大于核心线程数(当allowCoreThreadTimeOutfalse时),那么这个空闲线程会在keepAliveTime时间后被回收。例如,在我们之前创建的ThreadPoolExecutor中,设置了keepAliveTime为10秒,当一个非核心线程空闲10秒后,就会被回收。

Executors在实际项目中的应用场景

Web服务器中的请求处理

在Web服务器中,大量的客户端请求需要处理。使用Executors创建的线程池可以高效地处理这些请求。例如,使用固定大小线程池可以控制并发处理请求的数量,避免资源耗尽。缓存线程池可以根据请求的数量动态调整线程数量,适用于请求量波动较大的场景。

数据处理与计算任务

在数据处理和计算密集型任务中,如大数据分析、机器学习模型训练等,Executors可以用于并行处理任务。调度线程池可以用于定时执行数据采集任务,或者周期性地运行模型评估任务。

分布式系统中的任务调度

在分布式系统中,Executors同样发挥着重要作用。例如,在一个分布式文件系统中,Executors可以用于调度文件的上传、下载和备份任务。单线程线程池可以确保某些关键任务(如元数据更新)按顺序执行,避免数据一致性问题。

优化Executors任务执行流程的技巧

合理设置线程池参数

  1. 核心线程数:应根据任务的类型和系统资源来设置。对于CPU密集型任务,核心线程数一般设置为CPU核心数;对于I/O密集型任务,可以适当增加核心线程数,以充分利用CPU资源等待I/O操作完成。
  2. 最大线程数:要考虑系统的最大资源限制,避免创建过多线程导致系统资源耗尽。
  3. 任务队列容量:根据任务的数量和处理速度来设置。如果任务队列容量过小,可能会频繁触发拒绝策略;如果容量过大,可能会导致任务长时间等待。

选择合适的线程池类型

  1. 如果任务执行时间短且数量多,缓存线程池可能是一个不错的选择。
  2. 对于需要严格控制并发数量的场景,固定大小线程池更为合适。
  3. 当需要按顺序执行任务时,单线程线程池是首选。
  4. 对于有延迟或周期性执行需求的任务,调度线程池是最佳选择。

监控与调优

在实际应用中,应该对线程池的运行状态进行监控,如线程池中的线程数量、任务队列的大小、任务的执行时间等。通过监控数据,可以及时发现性能瓶颈,并对线程池参数进行调整,以优化任务执行流程。例如,可以使用Java的ManagementFactory获取线程池的相关指标:

import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolMonitoringExample {
    public static void main(String[] args) {
        BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>(10);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2,
                4,
                10, TimeUnit.SECONDS,
                taskQueue
        );

        for (int i = 0; i < 5; i++) {
            int taskNumber = i;
            executor.submit(() -> {
                System.out.println("Task " + taskNumber + " is running on thread " + Thread.currentThread().getName());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Task " + taskNumber + " completed");
            });
        }

        // 监控线程池状态
        Thread monitoringThread = new Thread(() -> {
            while (true) {
                ThreadInfo threadInfo = ManagementFactory.getThreadMXBean().getThreadInfo(executor.getActiveCount());
                System.out.println("Active threads: " + executor.getActiveCount());
                System.out.println("Task queue size: " + executor.getQueue().size());
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        monitoringThread.setDaemon(true);
        monitoringThread.start();

        executor.shutdown();
    }
}

在上述代码中,我们启动了一个监控线程,每隔5秒打印一次线程池中的活动线程数和任务队列大小,通过这些信息可以进一步优化线程池的配置。

并发任务执行中的异常处理

Executors执行任务过程中,异常处理是不可忽视的环节。

submit方法与异常处理

当使用ExecutorServicesubmit方法提交任务时,任务执行过程中抛出的异常不会直接在主线程中显示。例如:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SubmitExceptionExample {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(1);
        Future<?> future = executorService.submit(() -> {
            throw new RuntimeException("Task failed");
        });

        try {
            future.get();
        } catch (Exception e) {
            System.out.println("Caught exception: " + e.getMessage());
        }

        executorService.shutdown();
    }
}

在上述代码中,任务抛出了一个RuntimeException。如果不调用future.get()方法,这个异常不会被捕获。调用future.get()方法时,如果任务已经完成并抛出异常,get方法会重新抛出这个异常,从而可以在try - catch块中捕获并处理。

execute方法与异常处理

使用ExecutorServiceexecute方法提交任务时,如果任务在执行过程中抛出异常,默认情况下这个异常会导致执行任务的线程终止,但不会在主线程中直接显示。可以通过自定义Thread.UncaughtExceptionHandler来捕获这种异常:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExecuteExceptionExample {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(1);

        Thread.setDefaultUncaughtExceptionHandler((t, e) -> {
            System.out.println("Caught exception in thread " + t.getName() + ": " + e.getMessage());
        });

        executorService.execute(() -> {
            throw new RuntimeException("Task failed");
        });

        executorService.shutdown();
    }
}

在上述代码中,我们设置了默认的UncaughtExceptionHandler,当执行任务的线程抛出未捕获的异常时,会在UncaughtExceptionHandler中进行处理。

总结Executors任务执行流程的要点

  1. 线程池类型选择:根据任务特性选择合适的线程池类型,如固定大小线程池用于控制并发量,缓存线程池用于动态调整线程数量等。
  2. 参数设置:合理设置核心线程数、最大线程数、任务队列容量等参数,以优化任务执行效率和系统资源利用。
  3. 异常处理:了解submitexecute方法在异常处理上的差异,采取相应的异常捕获和处理机制。
  4. 监控与调优:通过监控线程池的运行状态,如线程数、任务队列大小等指标,对线程池进行动态调优,确保任务执行流程的高效性和稳定性。

通过深入理解Executors的任务执行流程,开发者可以在并发编程中充分发挥Java多线程的优势,编写高效、稳定的应用程序。无论是在小型应用还是大型分布式系统中,合理运用Executors都能显著提升系统性能。