MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java 中 CompletableFuture 自定义线程池饱和策略

2021-07-057.1k 阅读

一、Java 线程池与饱和策略基础

在深入探讨 CompletableFuture 自定义线程池饱和策略之前,我们先来回顾一下 Java 线程池以及其饱和策略的基础知识。

1.1 Java 线程池概述

Java 的线程池通过 ExecutorService 接口及其实现类(如 ThreadPoolExecutor)来管理和复用线程。线程池的核心优势在于避免频繁创建和销毁线程带来的开销,提高应用程序的性能和资源利用率。例如,在一个高并发的 Web 应用中,大量的请求需要被处理,如果每次请求都创建一个新线程,系统资源很快就会耗尽,而线程池可以有效地管理这些线程,重复利用已创建的线程来处理任务。

ThreadPoolExecutor 构造函数如下:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
  • corePoolSize:核心线程数,线程池在正常情况下保持的线程数量。即使这些线程处于空闲状态,也不会被销毁。
  • maximumPoolSize:最大线程数,线程池允许创建的最大线程数量。当任务队列已满且活跃线程数小于最大线程数时,线程池会创建新线程来处理任务。
  • keepAliveTime:线程存活时间,当线程池中的线程数量超过核心线程数时,多余的空闲线程在被销毁之前等待新任务的最长时间。
  • unitkeepAliveTime 的时间单位。
  • workQueue:任务队列,用于存储等待执行的任务。常见的任务队列实现有 ArrayBlockingQueueLinkedBlockingQueue 等。
  • threadFactory:线程工厂,用于创建新线程。可以通过自定义线程工厂来设置线程的名称、优先级等属性。
  • handler:饱和策略,当任务队列已满且线程池中的线程数达到最大线程数时,新任务到来时的处理策略。

1.2 内置饱和策略

Java 线程池提供了几种内置的饱和策略:

  • AbortPolicy:这是默认的饱和策略。当新任务无法提交时(任务队列已满且线程数达到最大线程数),会抛出 RejectedExecutionException 异常,拒绝执行该任务。
public static class AbortPolicy implements RejectedExecutionHandler {
    public AbortPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
    }
}
  • CallerRunsPolicy:当新任务无法提交时,该策略会让提交任务的线程(即调用 execute 方法的线程)直接执行该任务。这样做的好处是可以降低新任务提交的速度,同时利用调用线程的资源来处理任务。
public static class CallerRunsPolicy implements RejectedExecutionHandler {
    public CallerRunsPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}
  • DiscardPolicy:当新任务无法提交时,直接丢弃该任务,不做任何处理。这种策略适用于那些对任务执行结果不敏感,且任务数量过多可能导致系统资源耗尽的场景。
public static class DiscardPolicy implements RejectedExecutionHandler {
    public DiscardPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
}
  • DiscardOldestPolicy:当新任务无法提交时,该策略会丢弃任务队列中最老的一个任务(即队列头部的任务),然后尝试重新提交新任务。
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    public DiscardOldestPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
        }
    }
}

二、CompletableFuture 与线程池

2.1 CompletableFuture 简介

CompletableFuture 是 Java 8 引入的一个强大的异步编程工具,它实现了 Future 接口和 CompletionStage 接口。CompletableFuture 允许我们以一种更灵活、更易于组合的方式处理异步任务,包括链式调用、并行执行多个任务以及处理任务完成后的结果等。

例如,我们可以通过 CompletableFuture.supplyAsync 方法异步执行一个有返回值的任务:

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // 模拟耗时操作
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "Hello, CompletableFuture!";
});
future.thenAccept(System.out::println);

在上述代码中,supplyAsync 方法接收一个 Supplier 作为参数,并返回一个 CompletableFuture。任务在一个线程池中异步执行,当任务完成后,thenAccept 方法会处理任务的结果并打印输出。

2.2 CompletableFuture 与线程池的关系

CompletableFuture 在执行异步任务时,默认会使用 ForkJoinPool.commonPool() 作为线程池。ForkJoinPool 是 Java 7 引入的一种特殊的线程池,它采用工作窃取算法来提高并行处理能力。工作窃取算法允许空闲的线程从繁忙的线程队列中窃取任务来执行,从而提高系统的整体吞吐量。

然而,在某些情况下,默认的 ForkJoinPool.commonPool() 可能无法满足我们的需求。例如,在一个高并发的应用中,如果所有的异步任务都使用 ForkJoinPool.commonPool(),可能会导致线程资源竞争,影响任务的执行效率。此外,ForkJoinPool.commonPool() 的线程数量是根据 CPU 核心数动态调整的,可能无法精确控制线程池的大小。

为了更好地控制异步任务的执行,CompletableFuture 提供了一些方法来指定自定义的线程池。例如,supplyAsync(Supplier<U> supplier, Executor executor) 方法可以接收一个自定义的 Executor 作为参数,从而使用自定义线程池来执行异步任务。

ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // 模拟耗时操作
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "Hello, CompletableFuture with custom executor!";
}, executor);
future.thenAccept(System.out::println);
executor.shutdown();

在上述代码中,我们创建了一个固定大小为 10 的线程池 executor,并将其作为参数传递给 supplyAsync 方法,这样异步任务就会在这个自定义线程池中执行。

三、CompletableFuture 自定义线程池饱和策略实现

3.1 自定义饱和策略类

当我们为 CompletableFuture 使用自定义线程池时,同样需要考虑线程池的饱和策略。如果不设置饱和策略,线程池将使用默认的 AbortPolicy。下面我们通过实现 RejectedExecutionHandler 接口来自定义一个饱和策略。

假设我们希望在任务被拒绝时,将任务信息记录到日志中,并尝试在一段时间后重新提交任务。我们可以创建如下自定义饱和策略类:

import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
    private static final Logger LOGGER = Logger.getLogger(CustomRejectedExecutionHandler.class.getName());

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        LOGGER.log(Level.WARNING, "Task {0} rejected from {1}", new Object[]{r, e});
        try {
            // 等待一段时间后尝试重新提交任务
            TimeUnit.SECONDS.sleep(5);
            e.submit(r);
        } catch (InterruptedException | RejectedExecutionException ex) {
            LOGGER.log(Level.SEVERE, "Failed to resubmit task", ex);
        }
    }
}

在上述代码中,CustomRejectedExecutionHandler 类实现了 RejectedExecutionHandler 接口的 rejectedExecution 方法。当任务被拒绝时,首先记录任务被拒绝的日志信息,然后线程等待 5 秒,之后尝试重新提交任务。如果重新提交任务时再次出现异常(如线程池已关闭),则记录严重错误日志。

3.2 使用自定义线程池和饱和策略

接下来,我们将自定义的饱和策略应用到 CompletableFuture 使用的自定义线程池中。

import java.util.concurrent.*;
import java.util.logging.Level;
import java.util.logging.Logger;

public class CompletableFutureCustomThreadPoolExample {
    private static final Logger LOGGER = Logger.getLogger(CompletableFutureCustomThreadPoolExample.class.getName());

    public static void main(String[] args) {
        // 创建自定义线程池
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                5,
                10,
                10,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(5),
                Executors.defaultThreadFactory(),
                new CustomRejectedExecutionHandler()
        );

        for (int i = 0; i < 20; i++) {
            int taskNumber = i;
            CompletableFuture.supplyAsync(() -> {
                LOGGER.log(Level.INFO, "Task {0} is running", taskNumber);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "Task " + taskNumber + " completed";
            }, executor)
           .thenAccept(result -> LOGGER.log(Level.INFO, result));
        }

        // 关闭线程池
        executor.shutdown();
        try {
            if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                executor.shutdownNow();
                if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                    LOGGER.log(Level.SEVERE, "Pool did not terminate");
                }
            }
        } catch (InterruptedException ie) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}

在上述代码中,我们创建了一个 ThreadPoolExecutor 线程池,核心线程数为 5,最大线程数为 10,任务队列容量为 5,并使用了我们自定义的 CustomRejectedExecutionHandler 饱和策略。然后,通过循环提交 20 个异步任务,每个任务模拟耗时 2 秒的操作。当任务提交数量超过线程池和任务队列的承载能力时,自定义的饱和策略将被触发。

最后,在程序结束时,我们通过调用 executor.shutdown() 方法来启动线程池的关闭过程,并使用 awaitTermination 方法等待所有任务执行完毕或超时。如果超时仍有任务未完成,则调用 executor.shutdownNow() 尝试中断正在执行的任务。

四、实际应用场景分析

4.1 高并发 Web 应用

在高并发的 Web 应用中,大量的请求需要被异步处理。例如,一个电商网站在促销活动期间,可能会有大量的订单处理请求、库存更新请求等。如果使用默认的 ForkJoinPool.commonPool(),可能会导致线程资源竞争,影响用户体验。通过为 CompletableFuture 使用自定义线程池,并设置合适的饱和策略,可以更好地控制任务的执行。

假设我们的电商应用有一个订单处理模块,当用户下单后,需要异步处理订单,包括扣除库存、更新订单状态等操作。我们可以创建一个专门的线程池来处理订单相关的异步任务,并设置自定义饱和策略。如果订单处理任务被拒绝,我们可以记录日志并尝试重新提交,以确保订单的正常处理。

4.2 大数据处理

在大数据处理场景中,常常需要对大量的数据进行并行计算。例如,在一个数据分析系统中,需要对海量的用户行为数据进行清洗、统计和分析。使用 CompletableFuture 结合自定义线程池可以有效地提高数据处理的效率。

假设我们有一个任务是对用户行为数据进行实时分析,计算每个用户的活跃度。我们可以将数据分割成多个小块,使用 CompletableFuture 并行处理这些小块数据。由于数据量可能非常大,线程池可能会面临饱和的情况。此时,我们可以设置一个合适的饱和策略,如 DiscardOldestPolicy,丢弃较早的数据块处理任务,以保证最新的数据能够及时得到处理。

4.3 分布式系统

在分布式系统中,各个节点之间需要进行异步通信和任务协作。例如,一个微服务架构的系统中,不同的微服务之间可能需要异步调用对方的接口来完成业务逻辑。使用 CompletableFuture 可以方便地处理这些异步调用。

假设我们有一个订单微服务和一个支付微服务,订单微服务在接收到订单后,需要异步调用支付微服务进行支付处理。我们可以使用 CompletableFuture 来处理这个异步调用,并为其使用自定义线程池。在分布式系统中,网络波动等原因可能导致任务处理时间变长,线程池更容易饱和。通过设置合适的饱和策略,如 CallerRunsPolicy,可以在一定程度上缓解系统压力,确保关键任务能够得到执行。

五、性能优化与注意事项

5.1 线程池参数调优

在使用自定义线程池时,合理调整线程池的参数对于性能优化至关重要。

  • 核心线程数:应根据任务的类型和系统的资源情况来确定。如果任务是 CPU 密集型的,核心线程数可以设置为 CPU 核心数;如果是 I/O 密集型的,核心线程数可以适当增大,以充分利用 CPU 资源等待 I/O 操作完成。
  • 最大线程数:不宜设置过大,否则可能导致系统资源耗尽。一般来说,可以根据系统的内存、CPU 等资源情况来估算最大线程数。例如,在一个内存有限的服务器上,如果每个线程占用一定的内存空间,过多的线程可能会导致内存溢出。
  • 任务队列:选择合适的任务队列类型和容量也很重要。对于有界队列(如 ArrayBlockingQueue),需要根据任务的流量和处理速度来设置合适的容量,避免队列溢出。而无界队列(如 LinkedBlockingQueue)虽然不会溢出,但可能会导致任务堆积,占用大量内存。

5.2 饱和策略选择

不同的饱和策略适用于不同的场景,需要根据业务需求来选择。

  • AbortPolicy:适用于对任务执行失败敏感的场景,如金融交易系统,任何一笔交易任务的失败都可能导致严重后果,此时抛出异常可以及时发现问题。
  • CallerRunsPolicy:适用于希望降低新任务提交速度的场景,同时可以利用调用线程的资源来处理任务。例如,在一个单机应用中,当线程池饱和时,让调用线程执行任务可以避免任务丢失。
  • DiscardPolicy:适用于对任务执行结果不敏感,且任务数量过多可能导致系统资源耗尽的场景。例如,在一些日志记录任务中,如果日志记录任务过多导致线程池饱和,丢弃部分任务可能不会对系统造成严重影响。
  • DiscardOldestPolicy:适用于希望优先处理新任务的场景。例如,在实时数据分析系统中,新的数据可能更有价值,丢弃较老的数据处理任务可以保证系统对最新数据的响应速度。

5.3 异常处理

在使用 CompletableFuture 和自定义线程池时,需要注意异常处理。当任务在异步执行过程中抛出异常时,如果不进行处理,可能会导致程序出现难以排查的问题。CompletableFuture 提供了 exceptionally 方法来处理异常。

CompletableFuture.supplyAsync(() -> {
    if (Math.random() < 0.5) {
        throw new RuntimeException("Simulated exception");
    }
    return "Task completed successfully";
}, executor)
.exceptionally(ex -> {
    LOGGER.log(Level.SEVERE, "Task failed", ex);
    return "Task failed: " + ex.getMessage();
})
.thenAccept(System.out::println);

在上述代码中,exceptionally 方法接收一个 Function 作为参数,当任务抛出异常时,该 Function 会被执行,我们可以在其中记录异常信息并返回一个默认结果。

此外,在线程池饱和策略中,如果重新提交任务时抛出异常,也需要进行适当的处理,如我们在 CustomRejectedExecutionHandler 类中记录了重新提交任务失败的异常日志。

5.4 资源管理

在使用自定义线程池时,要注意资源的管理和释放。在程序结束时,一定要正确关闭线程池,避免线程泄漏。如我们在前面的示例中,通过调用 executor.shutdown()awaitTermination 方法来确保线程池的正确关闭。

同时,对于自定义的线程工厂,如果创建了一些需要释放的资源(如文件句柄、数据库连接等),也需要在适当的时候进行释放。例如,可以在自定义线程工厂的 newThread 方法中创建资源,在 Threadrun 方法结束时释放资源。

六、总结与展望

通过深入了解 Java 线程池的饱和策略以及 CompletableFuture 与线程池的关系,我们能够为 CompletableFuture 定制合适的线程池饱和策略,以满足不同应用场景的需求。在实际开发中,合理地选择和配置线程池参数、饱和策略,并做好异常处理和资源管理,能够显著提高应用程序的性能和稳定性。

随着技术的不断发展,异步编程和并发处理在软件开发中的重要性日益凸显。未来,我们可能会看到更多针对不同场景优化的线程池实现和饱和策略,以及更便捷的异步编程工具和框架。作为开发者,我们需要不断学习和掌握这些新技术,以提升我们的编程能力和解决复杂问题的能力。

希望本文对您理解和应用 Java 中 CompletableFuture 自定义线程池饱和策略有所帮助。在实际项目中,根据具体业务需求灵活运用这些知识,能够打造出更高效、更稳定的应用程序。