MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java 项目中避免使用 Executors 创建线程池的原因

2021-06-214.9k 阅读

一、线程池基础概述

1.1 线程池的概念与作用

在Java多线程编程中,线程池是一种重要的资源管理机制。线程的创建和销毁是相对昂贵的操作,涉及到操作系统内核态与用户态的切换等开销。线程池预先创建一定数量的线程,这些线程可以被重复使用来执行多个任务。当有新任务提交时,线程池会分配一个空闲线程来处理该任务,任务执行完毕后,线程并不会被销毁,而是返回线程池等待下一个任务。这种机制大大减少了线程创建和销毁的开销,提高了系统的性能和资源利用率。

例如,在一个Web服务器中,每一个HTTP请求都可以看作是一个任务。如果每次收到请求都创建一个新线程来处理,在高并发情况下,频繁的线程创建和销毁会导致系统性能急剧下降。而使用线程池,服务器可以预先创建一定数量的线程,将请求分配给这些线程处理,从而有效提升系统的并发处理能力。

1.2 Java线程池的核心类

Java通过java.util.concurrent包提供了丰富的线程池相关类。其中,ThreadPoolExecutor是线程池的核心实现类,它提供了灵活的线程池配置选项。通过构造函数,我们可以设置核心线程数、最大线程数、线程存活时间等关键参数。

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    // 构造函数实现
}
  • corePoolSize:核心线程数,线程池会一直维护这些线程,即使它们处于空闲状态。
  • maximumPoolSize:最大线程数,线程池允许创建的最大线程数量。
  • keepAliveTime:当线程数大于核心线程数时,多余的空闲线程的存活时间。
  • unitkeepAliveTime的时间单位。
  • workQueue:任务队列,用于存放等待执行的任务。
  • threadFactory:线程工厂,用于创建线程。
  • handler:拒绝策略,当任务队列已满且线程数达到最大线程数时,新任务的处理策略。

Executors类则是一个工具类,它提供了一些静态方法来创建不同类型的线程池,如newFixedThreadPoolnewCachedThreadPoolnewSingleThreadExecutor等,这些方法底层实际上是通过ThreadPoolExecutor来实现的。

二、Executors创建线程池的方式

2.1 newFixedThreadPool

Executors.newFixedThreadPool(int nThreads)方法创建一个固定大小的线程池,该线程池中的线程数量始终保持不变,即核心线程数和最大线程数相等。

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FixedThreadPoolExample {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        for (int i = 0; i < 5; i++) {
            final int taskNumber = i;
            executorService.submit(() -> {
                System.out.println("Task " + taskNumber + " is running on thread " + Thread.currentThread().getName());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        executorService.shutdown();
    }
}

在上述代码中,我们创建了一个固定大小为3的线程池,并提交了5个任务。由于线程池大小为3,所以一开始会有3个任务同时执行,另外2个任务会进入任务队列等待。当正在执行的任务完成后,线程池中的线程会从任务队列中取出任务继续执行。

2.2 newCachedThreadPool

Executors.newCachedThreadPool()方法创建一个可缓存的线程池。该线程池的核心线程数为0,最大线程数为Integer.MAX_VALUE,即理论上可以创建无限多个线程。当有新任务提交时,如果线程池中有空闲线程,则复用空闲线程;如果没有空闲线程,则创建一个新线程来处理任务。如果某个线程在60秒内没有被使用,它将被回收。

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CachedThreadPoolExample {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 5; i++) {
            final int taskNumber = i;
            executorService.submit(() -> {
                System.out.println("Task " + taskNumber + " is running on thread " + Thread.currentThread().getName());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        executorService.shutdown();
    }
}

在这个例子中,由于线程池可动态创建线程,所以5个任务可能会同时由不同的线程执行。如果在一段时间内没有新任务提交,那些空闲的线程会在60秒后被回收。

2.3 newSingleThreadExecutor

Executors.newSingleThreadExecutor()方法创建一个单线程的线程池,即核心线程数和最大线程数都为1。它保证所有任务按照提交的顺序依次在一个线程中执行。

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SingleThreadExecutorExample {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 5; i++) {
            final int taskNumber = i;
            executorService.submit(() -> {
                System.out.println("Task " + taskNumber + " is running on thread " + Thread.currentThread().getName());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        executorService.shutdown();
    }
}

在上述代码中,5个任务会依次在同一个线程中执行,前一个任务执行完毕后,下一个任务才会开始。

三、避免使用Executors创建线程池的原因

3.1 任务队列可能导致OOM(OutOfMemoryError)

3.1.1 newFixedThreadPool和newSingleThreadExecutor的任务队列问题

newFixedThreadPoolnewSingleThreadExecutor方法默认使用的任务队列是LinkedBlockingQueue,这是一个无界队列。当任务提交速度大于线程池处理速度时,任务会不断堆积在任务队列中。由于队列理论上可以无限增长,最终可能会耗尽系统内存,导致OutOfMemoryError

例如,在一个电商系统的订单处理模块中,如果使用newFixedThreadPool来处理订单任务,在促销活动期间,订单提交量可能会急剧增加。如果线程池处理订单的速度跟不上订单提交速度,订单任务就会在LinkedBlockingQueue中不断堆积,最终导致内存溢出。

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class OOMExample {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(1);
        for (int i = 0; i < Integer.MAX_VALUE; i++) {
            final int taskNumber = i;
            executorService.submit(() -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Task " + taskNumber + " is processed.");
            });
        }
        // executorService.shutdown(); 故意不关闭,让任务持续堆积
    }
}

在这个代码示例中,我们创建了一个固定大小为1的线程池,并不断提交任务。由于任务处理速度较慢(每个任务睡眠1秒),而提交任务的速度很快,任务会在LinkedBlockingQueue中不断堆积,最终可能导致OutOfMemoryError

3.1.2 解决方案

为了避免这种情况,我们应该使用有界队列来代替LinkedBlockingQueue。例如,可以使用ArrayBlockingQueue,它在创建时需要指定队列的容量。

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class FixedThreadPoolWithBoundedQueue {
    public static void main(String[] args) {
        BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(10);
        ThreadPoolExecutor executorService = new ThreadPoolExecutor(
                3,
                3,
                0L,
                TimeUnit.MILLISECONDS,
                workQueue
        );
        for (int i = 0; i < 20; i++) {
            final int taskNumber = i;
            executorService.submit(() -> {
                System.out.println("Task " + taskNumber + " is running on thread " + Thread.currentThread().getName());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        executorService.shutdown();
    }
}

在上述代码中,我们创建了一个容量为10的ArrayBlockingQueue作为任务队列。当任务队列满且线程池达到最大线程数时,后续提交的任务会根据拒绝策略进行处理,从而避免了任务无限堆积导致的内存溢出问题。

3.2 可能创建过多线程导致系统资源耗尽

3.2.1 newCachedThreadPool的线程创建问题

newCachedThreadPool方法创建的线程池最大线程数为Integer.MAX_VALUE,在高并发情况下,可能会创建大量的线程。过多的线程会占用大量的系统资源,如内存、CPU等,导致系统性能下降甚至崩溃。

例如,在一个分布式爬虫系统中,如果使用newCachedThreadPool来处理网页抓取任务,当需要抓取的网页数量非常庞大时,可能会瞬间创建大量线程。每个线程都需要占用一定的内存空间,过多的线程会导致内存耗尽,同时大量线程竞争CPU资源,也会使系统的整体性能急剧下降。

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadOverheadExample {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 100000; i++) {
            final int taskNumber = i;
            executorService.submit(() -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Task " + taskNumber + " is running.");
            });
        }
        // executorService.shutdown(); 故意不关闭,让线程持续创建
    }
}

在这个代码示例中,我们尝试提交10万个任务到newCachedThreadPool。由于线程池可以无限创建线程,在实际运行中,可能会因为创建过多线程而导致系统资源耗尽。

3.2.2 解决方案

我们应该根据系统的实际资源情况,设置合理的最大线程数。通过直接使用ThreadPoolExecutor,我们可以精确控制线程池的参数。

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CachedThreadPoolWithLimit {
    public static void main(String[] args) {
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
        ThreadPoolExecutor executorService = new ThreadPoolExecutor(
                0,
                100,
                60L,
                TimeUnit.SECONDS,
                workQueue
        );
        for (int i = 0; i < 1000; i++) {
            final int taskNumber = i;
            executorService.submit(() -> {
                System.out.println("Task " + taskNumber + " is running on thread " + Thread.currentThread().getName());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        executorService.shutdown();
    }
}

在上述代码中,我们设置最大线程数为100,这样即使在高并发情况下,线程池最多也只会创建100个线程,从而有效避免了因创建过多线程导致的系统资源耗尽问题。

3.3 缺乏对线程池状态的精确控制

3.3.1 Executors创建线程池的局限性

通过Executors创建的线程池,开发者对线程池的状态控制相对有限。例如,对于newFixedThreadPoolnewCachedThreadPool,我们很难直接获取线程池当前的任务队列大小、活动线程数等详细信息,这在进行系统监控和性能调优时会带来不便。

假设我们正在开发一个实时数据分析系统,需要实时了解线程池的任务处理情况,以便及时调整系统参数。如果使用Executors.newFixedThreadPool,我们无法直接获取任务队列中等待处理的任务数量,也就难以准确评估系统的负载情况。

3.3.2 解决方案

当我们直接使用ThreadPoolExecutor时,可以方便地获取线程池的各种状态信息。

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolStatusExample {
    public static void main(String[] args) {
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
        ThreadPoolExecutor executorService = new ThreadPoolExecutor(
                3,
                5,
                10L,
                TimeUnit.SECONDS,
                workQueue
        );
        for (int i = 0; i < 10; i++) {
            final int taskNumber = i;
            executorService.submit(() -> {
                System.out.println("Task " + taskNumber + " is running on thread " + Thread.currentThread().getName());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        System.out.println("Active threads: " + executorService.getActiveCount());
        System.out.println("Queue size: " + executorService.getQueue().size());
        executorService.shutdown();
    }
}

在上述代码中,我们通过ThreadPoolExecutorgetActiveCount方法获取当前活动线程数,通过getQueue().size()方法获取任务队列的大小。这些信息对于系统的监控和调优非常重要,而通过Executors创建的线程池获取这些信息相对困难。

3.4 拒绝策略的默认设置可能不符合业务需求

3.4.1 Executors创建线程池的默认拒绝策略

Executors创建的线程池默认使用的拒绝策略是ThreadPoolExecutor.AbortPolicy,当任务队列已满且线程数达到最大线程数时,新提交的任务会被拒绝,并抛出RejectedExecutionException

在一些业务场景中,这种默认的拒绝策略可能并不合适。例如,在一个金融交易系统中,交易订单任务如果被拒绝并抛出异常,可能会导致交易失败,给用户带来损失。

3.4.2 解决方案

我们可以根据业务需求选择合适的拒绝策略。Java提供了几种内置的拒绝策略,除了AbortPolicy,还有CallerRunsPolicyDiscardPolicyDiscardOldestPolicy,也可以自定义拒绝策略。

  • CallerRunsPolicy:当任务被拒绝时,由提交任务的线程来执行该任务。这种策略可以降低新任务的提交速度。
  • DiscardPolicy:直接丢弃被拒绝的任务,不做任何处理。
  • DiscardOldestPolicy:丢弃任务队列中最老的任务,然后尝试提交新任务。
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.out.println("Task " + r + " is rejected. Executor status: " + executor.toString());
    }
}

public class CustomRejectedPolicyExample {
    public static void main(String[] args) {
        BlockingQueue<Runnable> workQueue = new DelayQueue<>();
        ThreadPoolExecutor executorService = new ThreadPoolExecutor(
                2,
                2,
                0L,
                TimeUnit.MILLISECONDS,
                workQueue,
                new CustomRejectedExecutionHandler()
        );
        for (int i = 0; i < 5; i++) {
            final int taskNumber = i;
            executorService.submit(() -> {
                System.out.println("Task " + taskNumber + " is running on thread " + Thread.currentThread().getName());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        executorService.shutdown();
    }
}

在上述代码中,我们自定义了一个拒绝策略CustomRejectedExecutionHandler,当任务被拒绝时,会打印出任务和线程池的相关信息。通过这种方式,我们可以根据业务需求灵活处理被拒绝的任务,而不是简单地抛出异常。

四、正确创建和使用线程池的建议

4.1 根据业务场景选择合适的线程池参数

在创建线程池时,需要根据业务场景来确定核心线程数、最大线程数、任务队列等参数。如果任务是CPU密集型的,核心线程数可以设置为CPU核心数或略多一些,以充分利用CPU资源;如果任务是I/O密集型的,由于线程在I/O操作时会阻塞,核心线程数可以设置得相对多一些,以提高系统的并发处理能力。

例如,在一个图像识别系统中,图像识别任务通常是CPU密集型的。假设系统运行的服务器有8个CPU核心,我们可以将核心线程数设置为8或9,最大线程数也可以设置为相近的值,同时选择一个合适的有界任务队列,如ArrayBlockingQueue

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CPUIntensiveThreadPool {
    public static void main(String[] args) {
        BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(10);
        ThreadPoolExecutor executorService = new ThreadPoolExecutor(
                8,
                9,
                10L,
                TimeUnit.SECONDS,
                workQueue
        );
        // 提交CPU密集型任务
        executorService.shutdown();
    }
}

而在一个文件上传下载系统中,任务主要是I/O密集型的。我们可以将核心线程数设置为一个相对较大的值,如20,最大线程数可以设置为30,任务队列同样选择有界队列。

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class IOIntensiveThreadPool {
    public static void main(String[] args) {
        BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(20);
        ThreadPoolExecutor executorService = new ThreadPoolExecutor(
                20,
                30,
                10L,
                TimeUnit.SECONDS,
                workQueue
        );
        // 提交I/O密集型任务
        executorService.shutdown();
    }
}

4.2 监控和调优线程池

为了确保线程池的性能和稳定性,需要对线程池进行监控。可以通过ThreadPoolExecutor提供的方法获取线程池的各种状态信息,如活动线程数、任务队列大小、已完成任务数等。

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolMonitoring {
    public static void main(String[] args) {
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
        ThreadPoolExecutor executorService = new ThreadPoolExecutor(
                3,
                5,
                10L,
                TimeUnit.SECONDS,
                workQueue
        );
        for (int i = 0; i < 10; i++) {
            final int taskNumber = i;
            executorService.submit(() -> {
                System.out.println("Task " + taskNumber + " is running on thread " + Thread.currentThread().getName());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        System.out.println("Active threads: " + executorService.getActiveCount());
        System.out.println("Queue size: " + executorService.getQueue().size());
        System.out.println("Completed tasks: " + executorService.getCompletedTaskCount());
        executorService.shutdown();
    }
}

根据监控数据,我们可以对线程池的参数进行调优。如果发现任务队列经常满,可能需要增加线程数或调整任务队列的大小;如果发现线程池中有大量空闲线程,可能需要减少核心线程数。

4.3 合理处理线程池中的异常

在线程池执行任务过程中,可能会出现各种异常。对于未捕获的异常,ThreadPoolExecutor默认会将其打印到标准错误输出。然而,在实际应用中,我们可能需要更优雅地处理这些异常,例如记录异常日志、进行重试等。

可以通过实现Thread.UncaughtExceptionHandler接口来处理线程池中的未捕获异常。

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ExceptionHandlingInThreadPool {
    public static void main(String[] args) {
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
        ThreadFactory threadFactory = r -> {
            Thread thread = new Thread(r);
            thread.setUncaughtExceptionHandler((t, e) -> {
                System.out.println("Uncaught exception in thread " + t.getName() + ": " + e.getMessage());
            });
            return thread;
        };
        ThreadPoolExecutor executorService = new ThreadPoolExecutor(
                3,
                5,
                10L,
                TimeUnit.SECONDS,
                workQueue,
                threadFactory
        );
        executorService.submit(() -> {
            throw new RuntimeException("Simulated exception");
        });
        executorService.shutdown();
    }
}

在上述代码中,我们通过自定义ThreadFactory,为每个线程设置了UncaughtExceptionHandler,当线程执行任务抛出未捕获异常时,会打印出异常信息。这样可以帮助我们更好地定位和处理线程池中的异常情况。

4.4 正确关闭线程池

当不再需要使用线程池时,应该正确关闭线程池,以释放资源。ThreadPoolExecutor提供了shutdownshutdownNow方法来关闭线程池。

  • shutdown方法:启动一个有序关闭过程,不再接受新任务,但会继续执行已提交的任务。
  • shutdownNow方法:尝试停止所有正在执行的任务,停止等待任务的处理,并返回等待执行的任务列表。

在大多数情况下,建议先调用shutdown方法,如果在一定时间内线程池没有正常关闭,可以再调用shutdownNow方法。

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ShutdownThreadPoolExample {
    public static void main(String[] args) {
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
        ThreadPoolExecutor executorService = new ThreadPoolExecutor(
                3,
                5,
                10L,
                TimeUnit.SECONDS,
                workQueue
        );
        for (int i = 0; i < 10; i++) {
            final int taskNumber = i;
            executorService.submit(() -> {
                System.out.println("Task " + taskNumber + " is running on thread " + Thread.currentThread().getName());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        executorService.shutdown();
        try {
            if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
                executorService.shutdownNow();
                if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
                    System.err.println("Pool did not terminate");
                }
            }
        } catch (InterruptedException ie) {
            executorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}

在上述代码中,我们先调用shutdown方法启动关闭过程,然后通过awaitTermination方法等待线程池在60秒内完成任务。如果60秒后线程池仍未关闭,我们调用shutdownNow方法尝试强制停止线程池,并再次等待60秒。这样可以确保线程池在程序结束时能正确关闭,避免资源泄漏。

通过以上对Java项目中避免使用Executors创建线程池原因的分析,以及正确创建和使用线程池的建议,开发者可以更好地利用线程池这一强大的工具,提升Java应用程序的性能和稳定性,避免因线程池使用不当而带来的各种问题。在实际开发中,应根据具体的业务场景,精心设计和调整线程池的参数,以充分发挥线程池的优势,满足系统的并发处理需求。