摩柯社区 (Moke Community) - a minimalist technical knowledge community

Tuning the Parameters of Java ThreadPoolExecutor

2021-02-21


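In Java multithreaded programming, ThreadPoolExecutor is a powerful and flexible thread pool implementation. Tuning its parameters well is essential for an application's performance, resource utilization, and stability. This article walks through how to tune each of them.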

ThreadPoolExecutor Constructor and Parameters

ThreadPoolExecutor has several constructors; the full one, which takes all seven parameters, is:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

Seven important parameters are involved:

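  1. corePoolSize: the number of core threads. By default, core threads stay alive even when idle (unless allowCoreThreadTimeOut(true) is called). When a new task is submitted and fewer than corePoolSize threads are running, a new thread is created to run it, even if other threads in the pool are idle. Core threads are created on demand as tasks arrive, unless prestartCoreThread() or prestartAllCoreThreads() is called.
  2. maximumPoolSize: the maximum number of threads the pool allows. When the work queue is full and fewer than maximumPoolSize threads are running, the pool creates a new thread to run the task.
  3. keepAliveTime: when there are more than corePoolSize threads, the maximum time an excess idle thread waits for a new task before it is terminated.
  4. unit: the time unit of keepAliveTime, such as TimeUnit.SECONDS or TimeUnit.MILLISECONDS.
  5. workQueue: the queue that holds tasks waiting to run. Once corePoolSize threads exist, newly submitted tasks are placed in this queue, rather than starting new threads, as long as the queue has room. Common implementations include ArrayBlockingQueue, LinkedBlockingQueue, and SynchronousQueue.
  6. threadFactory: the factory used to create new threads. A custom thread factory can set thread names, priorities, and other attributes.
  7. handler: the rejection policy. When the work queue is full and the thread count has reached maximumPoolSize (or the pool has been shut down), newly submitted tasks are rejected and passed to this handler. The built-in policies are ThreadPoolExecutor.AbortPolicy (the default; throws RejectedExecutionException), ThreadPoolExecutor.CallerRunsPolicy (runs the task in the caller's thread), ThreadPoolExecutor.DiscardPolicy (silently drops the task), and ThreadPoolExecutor.DiscardOldestPolicy (drops the oldest task in the queue, then retries submitting the new one).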
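Taken together, items 1, 2, 5 and 7 describe the order in which execute() handles a new task: start a core thread while fewer than corePoolSize exist; otherwise try the queue; otherwise start an extra thread up to maximumPoolSize; otherwise reject. The sketch below makes that order observable by submitting tasks that all block on a latch, so none finishes during submission. The class name SubmissionOrderDemo and the sizes (2 core threads, 4 maximum, a queue of 2) are arbitrary choices for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class SubmissionOrderDemo {

    /** Submits taskCount blocking tasks; returns {threads, queued tasks, rejected tasks}. */
    static int[] saturate(int taskCount) {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 10L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(2),
                new ThreadPoolExecutor.AbortPolicy());
        int rejected = 0;
        for (int i = 0; i < taskCount; i++) {
            try {
                // Every task blocks until released, so none finishes during submission
                pool.execute(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            } catch (RejectedExecutionException e) {
                rejected++;
            }
        }
        int[] snapshot = {pool.getPoolSize(), pool.getQueue().size(), rejected};
        release.countDown(); // let the blocked tasks finish
        pool.shutdown();
        return snapshot;
    }

    public static void main(String[] args) {
        for (int n = 1; n <= 7; n++) {
            int[] s = saturate(n);
            System.out.printf("tasks=%d -> threads=%d, queued=%d, rejected=%d%n",
                    n, s[0], s[1], s[2]);
        }
    }
}
```

With these sizes the queue starts filling at the 3rd task, extra threads appear only at the 5th, and the 7th task is the first to be rejected.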

Tuning the Core Pool Size (corePoolSize)

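  1. CPU-bound tasks: In CPU-bound tasks, threads spend most of their time computing and rarely block on I/O. A common rule of thumb is to set corePoolSize to the number of CPU cores plus one; on a 4-core CPU, for example, that gives 5. The extra thread can keep the CPU busy when another thread stalls briefly, for example on a page fault.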

Example code:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;

public class CpuIntensiveTask {
    public static void main(String[] args) {
        int corePoolSize = Runtime.getRuntime().availableProcessors() + 1;
        ExecutorService executorService = new ThreadPoolExecutor(
                corePoolSize,
                corePoolSize,
                0L,
                java.util.concurrent.TimeUnit.MILLISECONDS,
                new java.util.concurrent.LinkedBlockingQueue<>());

        for (int i = 0; i < 10; i++) {
            executorService.submit(() -> {
                // Simulate a CPU-bound task
                long result = 0;
                for (int j = 0; j < 1000000000; j++) {
                    result += j;
                }
                System.out.println(Thread.currentThread().getName() + " result: " + result);
            });
        }

        executorService.shutdown();
    }
}
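  2. I/O-bound tasks: In I/O-bound tasks, threads spend most of their time waiting for I/O to complete, such as reading files or making network requests. Here corePoolSize can be set to twice the number of CPU cores or more, because while threads wait on I/O the CPU sits idle and additional threads can make use of that time.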

Example code:

import java.io.File;
import java.io.FileReader;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;

public class IoIntensiveTask {
    public static void main(String[] args) {
        int corePoolSize = Runtime.getRuntime().availableProcessors() * 2;
        ExecutorService executorService = new ThreadPoolExecutor(
                corePoolSize,
                corePoolSize,
                0L,
                java.util.concurrent.TimeUnit.MILLISECONDS,
                new java.util.concurrent.LinkedBlockingQueue<>());

        File file = new File("example.txt"); // assumes example.txt exists in the working directory
        for (int i = 0; i < 10; i++) {
            executorService.submit(() -> {
                try (FileReader fileReader = new FileReader(file)) {
                    int data;
                    while ((data = fileReader.read()) != -1) {
                        // Simulated I/O work: the characters read are simply discarded
                    }
                    System.out.println(Thread.currentThread().getName() + " finished reading the file");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }

        executorService.shutdown();
    }
}
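  3. Mixed tasks: If tasks contain both CPU-bound and I/O-bound parts, the right value has to be found by testing. A reasonable approach is to start with the I/O-bound setting, then adjust step by step while watching performance metrics such as throughput and response time until you find the best value.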
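A more general starting point is the sizing heuristic from Java Concurrency in Practice: threads = N_cpu × U_cpu × (1 + W/C), where U_cpu is the target CPU utilization (between 0 and 1) and W/C is the ratio of the time a task spends waiting to the time it spends computing. With W = 0 it gives one thread per core, and with W/C = 1 it gives the 2× rule above. A minimal sketch, assuming W and C have been measured by profiling; the class and method names are hypothetical:

```java
final class PoolSizeEstimator {

    /**
     * Sizing heuristic: threads = cpus * targetUtilization * (1 + waitTime / computeTime).
     * waitTime and computeTime must use the same unit and should come from profiling.
     */
    static int estimate(int cpus, double targetUtilization, double waitTime, double computeTime) {
        if (cpus <= 0 || targetUtilization <= 0 || targetUtilization > 1
                || waitTime < 0 || computeTime <= 0) {
            throw new IllegalArgumentException("invalid sizing input");
        }
        double threads = cpus * targetUtilization * (1 + waitTime / computeTime);
        return Math.max(1, (int) Math.round(threads));
    }

    public static void main(String[] args) {
        int cpus = Runtime.getRuntime().availableProcessors();
        // Hypothetical profile: each task waits 50 ms on I/O for every 10 ms of CPU time
        System.out.println("I/O-heavy estimate: " + estimate(cpus, 1.0, 50, 10));
        // No waiting at all: roughly one thread per core
        System.out.println("CPU-bound estimate: " + estimate(cpus, 1.0, 0, 10));
    }
}
```

The result is only a starting value for the measure-and-adjust loop described for mixed tasks; W and C should be measured under realistic load.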

Tuning the Maximum Pool Size (maximumPoolSize)

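  1. Consider it together with the work queue: maximumPoolSize is closely tied to the queue's capacity, because the pool creates threads beyond corePoolSize only when the queue is full. The larger the queue, the later (and the less often) extra threads are created. With an unbounded LinkedBlockingQueue (created without a capacity, which means Integer.MAX_VALUE), the queue never fills, so the pool never grows beyond corePoolSize and maximumPoolSize has no effect at all. With a bounded ArrayBlockingQueue, on the other hand, extra threads are created once the queue is full, so maximumPoolSize matters a great deal.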

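Suppose the work queue is an ArrayBlockingQueue with a capacity of 100, tasks take a long time to run, and corePoolSize is 10. When a burst of tasks arrives, the queue fills quickly, and maximumPoolSize then determines how many more tasks can be accepted before rejection starts: at most maximumPoolSize tasks running plus 100 queued.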

Example code:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class MaximumPoolSizeExample {
    public static void main(String[] args) {
        int corePoolSize = 10;
        int maximumPoolSize = 50;
        int queueCapacity = 100;
        ExecutorService executorService = new ThreadPoolExecutor(
                corePoolSize,
                maximumPoolSize,
                10L,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(queueCapacity));

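        // At most maximumPoolSize + queueCapacity = 150 tasks fit at once (50 running, 100 queued).
        // Submitting 200 one-second tasks in a tight loop would make the default AbortPolicy
        // reject the 151st submission with a RejectedExecutionException.
        int taskCount = maximumPoolSize + queueCapacity;
        for (int i = 0; i < taskCount; i++) {
            executorService.submit(() -> {
                // Simulate a long-running task
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                System.out.println(Thread.currentThread().getName() + " task finished");
            });
        }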

        executorService.shutdown();
    }
}
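  2. System resource limits: When setting maximumPoolSize, also account for system resources such as memory and CPU. Every thread consumes memory (notably its stack) and adds context-switching overhead, so creating too many threads degrades performance and can even exhaust memory. Monitor metrics such as memory usage and CPU load to settle on a reasonable maximumPoolSize.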
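The effect of an unbounded queue on maximumPoolSize can be checked with getLargestPoolSize(), which reports the largest number of threads the pool has ever had. In this sketch (class name hypothetical), 1,000 blocked tasks are submitted to a pool with corePoolSize 2 and maximumPoolSize 50, backed by a LinkedBlockingQueue created without a capacity:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class UnboundedQueueDemo {

    /** Returns the largest number of threads the pool ever had while all tasks were blocked. */
    static int largestPoolSize(int core, int max, int tasks) {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                core, max, 10L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>()); // no capacity given: Integer.MAX_VALUE
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    release.await(); // keep every running thread busy
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int largest = pool.getLargestPoolSize();
        release.countDown();
        pool.shutdown();
        return largest;
    }

    public static void main(String[] args) {
        System.out.println("largest pool size: " + largestPoolSize(2, 50, 1_000));
    }
}
```

It reports 2: every task beyond the first two waits in the queue, and no thread beyond corePoolSize is ever created, however large maximumPoolSize is.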

Tuning keepAliveTime and Its Time Unit (unit)

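  1. Task submission frequency: If tasks arrive frequently and each finishes quickly, lean toward a longer keepAliveTime. Creating and destroying threads repeatedly has a cost, and keeping the extra threads alive across the short gaps between bursts lets them be reused instead of recreated. For example, in a high-concurrency web application handling a steady stream of short requests, a keepAliveTime of 1-5 seconds can be enough to span the pauses between requests.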

Example code:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class KeepAliveTimeExample {
    public static void main(String[] args) {
        int corePoolSize = 10;
        int maximumPoolSize = 20;
        long keepAliveTime = 2;
        TimeUnit unit = TimeUnit.SECONDS;
        ExecutorService executorService = new ThreadPoolExecutor(
                corePoolSize,
                maximumPoolSize,
                keepAliveTime,
                unit,
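                // Bounded queue: with an unbounded LinkedBlockingQueue the pool would never grow
                // past corePoolSize, and keepAliveTime would never come into play.
                // 10 core threads + 80 queued + 10 extra threads = room for all 100 tasks.
                new java.util.concurrent.ArrayBlockingQueue<>(80));

        for (int i = 0; i < 100; i++) {
            executorService.submit(() -> {
                // Simulate a short task
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + " task finished");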
            });
        }

        executorService.shutdown();
    }
}
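  2. Resource reclamation: If the system is sensitive to resource usage and surplus threads should be released soon after a burst, use a shorter keepAliveTime. For example, on a memory-constrained embedded system, keeping many idle threads around wastes scarce memory, so a short keepAliveTime lets surplus threads be reclaimed promptly once the work is done. If even core threads should be reclaimed when idle, call allowCoreThreadTimeOut(true).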
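The sketch below shows keepAliveTime at work, assuming a bounded queue so that the pool actually grows beyond corePoolSize. It briefly saturates a pool (2 core threads, 4 maximum, and a 100 ms keepAliveTime chosen only so the demo runs quickly), lets it go idle, and reads getPoolSize(): surplus threads are reclaimed down to corePoolSize, or down to zero when allowCoreThreadTimeOut(true) is set. The class name is hypothetical, and the one-second idle wait is a generous margin rather than an exact timing.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class KeepAliveDemo {

    /** Returns {peak pool size, pool size after the pool has been idle for idleMillis}. */
    static int[] peakAndAfterIdle(boolean coreThreadTimeOut, long idleMillis) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 100L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1));
        // When true, keepAliveTime also applies to core threads
        pool.allowCoreThreadTimeOut(coreThreadTimeOut);
        CountDownLatch release = new CountDownLatch(1);
        // 5 blocked tasks: 2 core threads + 1 queued + 2 extra threads = 4 threads
        for (int i = 0; i < 5; i++) {
            pool.execute(() -> awaitQuietly(release));
        }
        int peak = pool.getPoolSize();
        release.countDown(); // let all tasks finish, then leave the pool idle
        sleepQuietly(idleMillis);
        int afterIdle = pool.getPoolSize();
        pool.shutdown();
        return new int[] {peak, afterIdle};
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        int[] keepCore = peakAndAfterIdle(false, 1000);
        System.out.println("peak=" + keepCore[0] + ", after idle=" + keepCore[1]);
        int[] reclaimAll = peakAndAfterIdle(true, 1000);
        System.out.println("peak=" + reclaimAll[0] + ", after idle=" + reclaimAll[1]);
    }
}
```

On a typical run the pool peaks at 4 threads, shrinks back to 2 after idling, and shrinks to 0 when core threads are allowed to time out.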

Choosing and Tuning the Work Queue (workQueue)

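  1. ArrayBlockingQueue: a bounded, array-backed queue whose capacity is fixed at construction. It suits scenarios where the number of tasks is predictable and the queue size must be strictly controlled. For example, in an order-processing system, each order's processing task can be put into an ArrayBlockingQueue; if the queue fills up, orders are being processed more slowly than they are created, and the rejection policy decides what happens next.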

Example code:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ArrayBlockingQueueExample {
    public static void main(String[] args) {
        int corePoolSize = 5;
        int maximumPoolSize = 10;
        long keepAliveTime = 10;
        TimeUnit unit = TimeUnit.SECONDS;
        int queueCapacity = 20;
        ExecutorService executorService = new ThreadPoolExecutor(
                corePoolSize,
                maximumPoolSize,
                keepAliveTime,
                unit,
                new ArrayBlockingQueue<>(queueCapacity));

        for (int i = 0; i < 30; i++) {
            executorService.submit(() -> {
                System.out.println(Thread.currentThread().getName() + " processing task");
            });
        }

        executorService.shutdown();
    }
}
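  2. LinkedBlockingQueue: can be bounded or unbounded; created without a capacity argument, its capacity is Integer.MAX_VALUE, which is effectively unbounded. An unbounded queue never rejects tasks for being full, but under sustained load it can grow until memory runs out; a bounded queue avoids this. A bounded LinkedBlockingQueue suits scenarios that need high throughput but where the task volume will not grow without limit. For example, in a log-processing system, logging tasks can go into a bounded LinkedBlockingQueue, and when it is full a rejection policy such as DiscardPolicy can drop the newest log records.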

Example code (bounded LinkedBlockingQueue):

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class LinkedBlockingQueueExample {
    public static void main(String[] args) {
        int corePoolSize = 3;
        int maximumPoolSize = 6;
        long keepAliveTime = 5;
        TimeUnit unit = TimeUnit.SECONDS;
        int queueCapacity = 15;
        ExecutorService executorService = new ThreadPoolExecutor(
                corePoolSize,
                maximumPoolSize,
                keepAliveTime,
                unit,
                new LinkedBlockingQueue<>(queueCapacity));

        for (int i = 0; i < 20; i++) {
            executorService.submit(() -> {
                System.out.println(Thread.currentThread().getName() + " processing log task");
            });
        }

        executorService.shutdown();
    }
}
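  3. SynchronousQueue: has no capacity; it stores no tasks and instead hands each one directly to a thread. In ThreadPoolExecutor a submission does not wait for a thread: if no idle thread is already waiting to take the task, the handoff fails immediately, and the pool creates a new thread if it has fewer than maximumPoolSize threads, or otherwise rejects the task. This queue suits fast-running tasks that should be processed immediately instead of piling up in a queue, and it is usually combined with a large maximumPoolSize (Executors.newCachedThreadPool() uses a SynchronousQueue with maximumPoolSize set to Integer.MAX_VALUE). For example, in a real-time data processing system, each data-processing task can be handed directly to a thread through a SynchronousQueue.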

Example code:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class SynchronousQueueExample {
    public static void main(String[] args) {
        int corePoolSize = 4;
        int maximumPoolSize = 8;
        long keepAliveTime = 5;
        TimeUnit unit = TimeUnit.SECONDS;
        ExecutorService executorService = new ThreadPoolExecutor(
                corePoolSize,
                maximumPoolSize,
                keepAliveTime,
                unit,
                new SynchronousQueue<>());

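        // With a SynchronousQueue at most maximumPoolSize tasks can be in flight at once;
        // submitting more in a tight loop may be rejected if no thread is free yet
        for (int i = 0; i < maximumPoolSize; i++) {
            executorService.submit(() -> {
                System.out.println(Thread.currentThread().getName() + " processing real-time data task");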
            });
        }

        executorService.shutdown();
    }
}
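The handoff behaviour of SynchronousQueue can be checked directly. SynchronousQueue.offer() returns false when no consumer is waiting, and ThreadPoolExecutor submits through offer(), so a pool backed by a SynchronousQueue either hands the task to an idle thread, starts a new thread, or rejects the task. In this sketch (class name hypothetical), corePoolSize is 0 as in Executors.newCachedThreadPool(), but maximumPoolSize is small and every task blocks:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class HandoffDemo {

    /** offer() on a SynchronousQueue succeeds only if a consumer is already waiting. */
    static boolean offerWithoutConsumer() {
        SynchronousQueue<Runnable> queue = new SynchronousQueue<>();
        return queue.offer(() -> { });
    }

    /** Submits blocking tasks to a SynchronousQueue-backed pool and counts rejections. */
    static int rejectedCount(int maxThreads, int tasks) {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                0, maxThreads, 5L, TimeUnit.SECONDS, new SynchronousQueue<>());
        int rejected = 0;
        for (int i = 0; i < tasks; i++) {
            try {
                pool.execute(() -> {
                    try {
                        release.await(); // keep every thread busy
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            } catch (RejectedExecutionException e) {
                rejected++; // no idle thread to take it and no room to grow
            }
        }
        release.countDown();
        pool.shutdown();
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println("offer without consumer: " + offerWithoutConsumer());
        System.out.println("rejected: " + rejectedCount(3, 5));
    }
}
```

rejectedCount(3, 5) returns 2: a thread is started for each of the first three tasks, and the last two find neither an idle thread nor room to grow.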

Customizing the Thread Factory (threadFactory)

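A custom thread factory lets you give threads meaningful names, priorities, and other attributes, which makes them easy to tell apart when debugging and monitoring, for example in thread dumps.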

Example code:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CustomThreadFactoryExample {
    public static void main(String[] args) {
        int corePoolSize = 3;
        int maximumPoolSize = 6;
        long keepAliveTime = 5;
        TimeUnit unit = TimeUnit.SECONDS;

        ThreadFactory threadFactory = new ThreadFactory() {
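            // AtomicInteger: newThread may be called concurrently when tasks are submitted from several threads
            private final java.util.concurrent.atomic.AtomicInteger counter =
                    new java.util.concurrent.atomic.AtomicInteger(1);

            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("CustomThread-" + counter.getAndIncrement());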
                thread.setPriority(Thread.NORM_PRIORITY);
                return thread;
            }
        };

        ExecutorService executorService = new ThreadPoolExecutor(
                corePoolSize,
                maximumPoolSize,
                keepAliveTime,
                unit,
                new java.util.concurrent.LinkedBlockingQueue<>(),
                threadFactory);

        for (int i = 0; i < 5; i++) {
            executorService.submit(() -> {
                System.out.println(Thread.currentThread().getName() + " executing task");
            });
        }

        executorService.shutdown();
    }
}
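For reuse across pools, the anonymous factory above can be turned into a small class. This is a sketch, not a library API: NamedThreadFactory and its constructor parameters are hypothetical. It numbers threads with an AtomicInteger, lets the caller choose daemon status, and installs an uncaught-exception handler. That handler only sees exceptions from tasks passed to execute(); submit() captures a task's exception in the returned Future instead.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class NamedThreadFactory implements ThreadFactory {
    private final String prefix;
    private final boolean daemon;
    // Thread-safe counter: the pool may call newThread concurrently
    private final AtomicInteger counter = new AtomicInteger(1);

    NamedThreadFactory(String prefix, boolean daemon) {
        this.prefix = prefix;
        this.daemon = daemon;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread thread = new Thread(r, prefix + "-" + counter.getAndIncrement());
        thread.setDaemon(daemon);
        // Reached only by exceptions escaping tasks run via execute()
        thread.setUncaughtExceptionHandler((t, e) ->
                System.err.println(t.getName() + " terminated by " + e));
        return thread;
    }

    public static void main(String[] args) {
        ExecutorService pool = new ThreadPoolExecutor(
                2, 2, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(),
                new NamedThreadFactory("order-worker", false));
        for (int i = 0; i < 4; i++) {
            pool.execute(() -> System.out.println(Thread.currentThread().getName() + " running"));
        }
        pool.shutdown();
    }
}
```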

Choosing and Customizing the Rejection Policy (handler)

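  1. AbortPolicy: the default rejection policy; it throws RejectedExecutionException when a task is rejected. Because the rejection surfaces immediately in the submitting code, which can catch it and retry, persist, or report the task, this policy suits scenarios that are very sensitive to lost tasks. For example, in a financial trading system no trade task may be dropped silently, or data may become inconsistent.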

Example code:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class AbortPolicyExample {
    public static void main(String[] args) {
        int corePoolSize = 2;
        int maximumPoolSize = 4;
        long keepAliveTime = 5;
        TimeUnit unit = TimeUnit.SECONDS;
        ExecutorService executorService = new ThreadPoolExecutor(
                corePoolSize,
                maximumPoolSize,
                keepAliveTime,
                unit,
                new java.util.concurrent.LinkedBlockingQueue<>(2),
                new ThreadPoolExecutor.AbortPolicy());

        try {
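            // Room for at most 4 running + 2 queued = 6 tasks. Each task sleeps, so the pool is
            // still saturated when the 7th task arrives and AbortPolicy throws
            for (int i = 0; i < 8; i++) {
                executorService.submit(() -> {
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    System.out.println(Thread.currentThread().getName() + " processing task");
                });
            }
        } catch (RejectedExecutionException e) {
            System.err.println("Task rejected: " + e.getMessage());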
        }

        executorService.shutdown();
    }
}
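  2. CallerRunsPolicy: runs the rejected task in the thread that called execute() or submit(). This relieves pressure on the pool and, because the submitting thread is busy running the task, naturally slows down further submissions (a form of backpressure), but it can interfere with the caller's own work. For example, in a web application using CallerRunsPolicy, when the pool is saturated a new request-handling task runs in the server thread that submitted it, which can make the server respond more slowly. If the executor has already been shut down, the task is discarded instead.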

Example code:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CallerRunsPolicyExample {
    public static void main(String[] args) {
        int corePoolSize = 2;
        int maximumPoolSize = 4;
        long keepAliveTime = 5;
        TimeUnit unit = TimeUnit.SECONDS;
        ExecutorService executorService = new ThreadPoolExecutor(
                corePoolSize,
                maximumPoolSize,
                keepAliveTime,
                unit,
                new java.util.concurrent.LinkedBlockingQueue<>(2),
                new ThreadPoolExecutor.CallerRunsPolicy());

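        // Room for 4 running + 2 queued tasks; each task sleeps, so the 7th submission
        // (and possibly later ones) finds the pool saturated and runs in the main thread
        for (int i = 0; i < 8; i++) {
            executorService.submit(() -> {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                System.out.println(Thread.currentThread().getName() + " processing task");
            });
        }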

        executorService.shutdown();
    }
}
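  3. DiscardPolicy: silently drops the rejected task. It suits scenarios where losing tasks is acceptable and the tasks are not critical, such as some statistics-gathering tasks, where losing a few data points while the system is busy has little effect on the business. Note that if the task was submitted with submit(), the returned Future never completes, so calling get() on it without a timeout blocks forever.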

Example code:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class DiscardPolicyExample {
    public static void main(String[] args) {
        int corePoolSize = 2;
        int maximumPoolSize = 4;
        long keepAliveTime = 5;
        TimeUnit unit = TimeUnit.SECONDS;
        ExecutorService executorService = new ThreadPoolExecutor(
                corePoolSize,
                maximumPoolSize,
                keepAliveTime,
                unit,
                new java.util.concurrent.LinkedBlockingQueue<>(2),
                new ThreadPoolExecutor.DiscardPolicy());

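        // Room for 4 running + 2 queued tasks; each task sleeps, so tasks 7 and 8 are
        // silently dropped and only 6 lines are printed
        for (int i = 0; i < 8; i++) {
            executorService.submit(() -> {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                System.out.println(Thread.currentThread().getName() + " processing task");
            });
        }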

        executorService.shutdown();
    }
}
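  4. DiscardOldestPolicy: discards the oldest task waiting in the queue, then retries submitting the new task. It suits scenarios where newer tasks matter more than older ones. For example, in a real-time messaging system, when both the pool and the queue are full, old messages are dropped so that new ones get processed.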

Example code:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class DiscardOldestPolicyExample {
    public static void main(String[] args) {
        int corePoolSize = 2;
        int maximumPoolSize = 4;
        long keepAliveTime = 5;
        TimeUnit unit = TimeUnit.SECONDS;
        ExecutorService executorService = new ThreadPoolExecutor(
                corePoolSize,
                maximumPoolSize,
                keepAliveTime,
                unit,
                new java.util.concurrent.LinkedBlockingQueue<>(2),
                new ThreadPoolExecutor.DiscardOldestPolicy());

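        // Room for 4 running + 2 queued tasks; each task sleeps, so tasks 3 and 4, still
        // waiting in the queue, are discarded to make room for tasks 7 and 8
        for (int i = 0; i < 8; i++) {
            executorService.submit(() -> {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                System.out.println(Thread.currentThread().getName() + " processing task");
            });
        }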

        executorService.shutdown();
    }
}
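  5. Custom rejection policies: In some special scenarios none of the built-in policies fits, and you can implement RejectedExecutionHandler yourself. For example, in a distributed system you may want to forward rejected tasks to another node for processing.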

Example code:

import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        // Custom handling logic, e.g. forwarding the task to another node (not implemented here)
        System.err.println("Task rejected, trying to send it to another node: " + r.toString());
    }
}

Example using the custom rejection policy:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CustomRejectedExecutionHandlerExample {
    public static void main(String[] args) {
        int corePoolSize = 2;
        int maximumPoolSize = 4;
        long keepAliveTime = 5;
        TimeUnit unit = TimeUnit.SECONDS;
        ExecutorService executorService = new ThreadPoolExecutor(
                corePoolSize,
                maximumPoolSize,
                keepAliveTime,
                unit,
                new java.util.concurrent.LinkedBlockingQueue<>(2),
                new CustomRejectedExecutionHandler());

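        // Room for 4 running + 2 queued tasks; each task sleeps, so tasks 7 and 8 are
        // passed to CustomRejectedExecutionHandler
        for (int i = 0; i < 8; i++) {
            executorService.submit(() -> {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                System.out.println(Thread.currentThread().getName() + " processing task");
            });
        }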

        executorService.shutdown();
    }
}
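Custom handlers also compose well with the built-in ones. A common monitoring need is to count rejections, since a rising count signals that the pool or its queue is undersized, while keeping a built-in behaviour for the rejected task itself. A minimal sketch, assuming a hypothetical decorator class CountingRejectionHandler that delegates to any RejectedExecutionHandler:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical decorator: counts rejections, then delegates to another handler. */
final class CountingRejectionHandler implements RejectedExecutionHandler {
    private final AtomicLong rejected = new AtomicLong();
    private final RejectedExecutionHandler delegate;

    CountingRejectionHandler(RejectedExecutionHandler delegate) {
        this.delegate = delegate;
    }

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        rejected.incrementAndGet(); // e.g. export this value to a metrics system
        delegate.rejectedExecution(r, executor);
    }

    long rejectedCount() {
        return rejected.get();
    }

    /** Saturates a 1-thread pool with a 1-slot queue and returns the rejection count. */
    static long demo(int tasks) {
        CountingRejectionHandler handler =
                new CountingRejectionHandler(new ThreadPoolExecutor.DiscardPolicy());
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1), handler);
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    release.await(); // keep the single thread busy
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        release.countDown();
        pool.shutdown();
        return handler.rejectedCount();
    }

    public static void main(String[] args) {
        System.out.println("rejected: " + demo(4));
    }
}
```

demo(4) returns 2: with the only thread busy and the single queue slot taken, the 3rd and 4th tasks are counted and then dropped by the DiscardPolicy delegate.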

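With a solid understanding of each ThreadPoolExecutor parameter and careful tuning, an application can run more efficiently and stably in a multithreaded environment. In practice, take into account the specific business scenario, the available system resources, and the performance targets, and keep testing and adjusting the parameters to achieve the best results.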
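Testing and adjusting requires numbers to look at. ThreadPoolExecutor exposes getters such as getPoolSize(), getActiveCount(), getLargestPoolSize(), getQueue() and getCompletedTaskCount(), and setCorePoolSize()/setMaximumPoolSize() can change the sizes of a running pool. The sketch below (hypothetical class PoolMetrics) formats a snapshot for logging and resizes a pool, calling the two setters in an order that never leaves corePoolSize above maximumPoolSize, because since Java 9 setCorePoolSize() throws IllegalArgumentException in that case. The counts returned by these getters are approximate while tasks are running.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class PoolMetrics {

    /** One-line snapshot suitable for periodic logging or a metrics exporter. */
    static String snapshot(ThreadPoolExecutor pool) {
        return String.format(
                "core=%d max=%d pool=%d active=%d largest=%d queued=%d completed=%d",
                pool.getCorePoolSize(), pool.getMaximumPoolSize(), pool.getPoolSize(),
                pool.getActiveCount(), pool.getLargestPoolSize(),
                pool.getQueue().size(), pool.getCompletedTaskCount());
    }

    /** Changes both sizes without ever passing through corePoolSize > maximumPoolSize. */
    static void resize(ThreadPoolExecutor pool, int core, int max) {
        if (core > max) {
            throw new IllegalArgumentException("core must not exceed max");
        }
        if (core > pool.getMaximumPoolSize()) {
            pool.setMaximumPoolSize(max); // raise the ceiling first
            pool.setCorePoolSize(core);
        } else {
            pool.setCorePoolSize(core);   // lower or keep the floor first
            pool.setMaximumPoolSize(max);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 30L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100));
        for (int i = 0; i < 20; i++) {
            pool.execute(() -> {
                try {
                    Thread.sleep(10); // stand-in for real work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        System.out.println(snapshot(pool));
        resize(pool, 8, 16); // e.g. after observing a persistently long queue
        System.out.println(snapshot(pool));
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        System.out.println(snapshot(pool));
    }
}
```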