MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java 中 CompletableFuture 自定义线程池 runAsync 方法

2021-10-053.1k 阅读

CompletableFuture 的概述

在 Java 编程领域,随着多核处理器的广泛应用以及对异步编程需求的不断增长,高效的异步任务处理成为了关键。CompletableFuture 作为 Java 8 引入的一个强大工具,极大地简化了异步编程。它不仅实现了 Future 接口,还提供了大量用于异步任务编排和组合的方法,使得处理复杂的异步操作变得更加直观和便捷。

CompletableFuture 代表一个异步计算的结果,它可以在计算完成后被获取,并且支持链式调用和组合操作。与传统的 Future 相比,CompletableFuture 具有更高的灵活性,能够处理异步任务的完成、失败和取消等各种情况,同时还支持异步任务之间的依赖关系和结果传递。

runAsync 方法的基本概念

CompletableFuture 类提供了两个 runAsync 方法,用于异步执行一个无返回值的任务。这两个方法的定义如下:

public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)

第一个方法 runAsync(Runnable runnable) 使用 ForkJoinPool.commonPool() 作为默认的线程池来异步执行任务。ForkJoinPool.commonPool() 是一个共享的通用线程池,适用于大多数计算密集型任务。然而,在某些特定场景下,比如 I/O 密集型任务或者对线程池有特殊配置需求时,使用默认线程池可能无法满足要求。

第二个方法 runAsync(Runnable runnable, Executor executor) 则允许我们传入一个自定义的 Executor,从而使用我们自己配置的线程池来执行异步任务。这种灵活性使得我们能够根据应用程序的具体需求来优化线程资源的使用。

使用默认线程池的 runAsync 方法

下面通过一个简单的示例来展示使用默认线程池的 runAsync 方法的用法:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureRunAsyncDefaultExample {
    public static void main(String[] args) {
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            // 模拟一个耗时操作
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("任务在默认线程池中执行完成");
        });

        try {
            future.get();
            System.out.println("主线程获取到任务完成的结果");
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}

在上述代码中,我们调用 CompletableFuture.runAsync(() -> {...}) 方法,传入一个 Runnable 任务。该任务在默认的 ForkJoinPool.commonPool() 线程池中异步执行。主线程通过调用 future.get() 方法等待异步任务的完成,然后打印相应的信息。

自定义线程池的必要性

虽然 ForkJoinPool.commonPool() 适用于许多场景,但在以下情况下,自定义线程池是非常必要的:

  1. 资源隔离:不同类型的任务可能对线程资源有不同的需求。例如,一些 I/O 密集型任务可能会占用大量的线程资源,影响其他计算密集型任务的执行。通过自定义线程池,可以将不同类型的任务分配到不同的线程池中,实现资源隔离。

  2. 线程池参数优化:默认线程池的参数可能无法满足特定应用程序的需求。例如,我们可能需要调整线程池的核心线程数、最大线程数、队列容量等参数,以优化系统性能。

  3. 线程上下文管理:在某些情况下,我们可能需要在每个线程执行任务时设置特定的上下文信息,如日志记录、安全认证等。通过自定义线程池,可以更好地管理线程上下文。

创建自定义线程池

在 Java 中,我们可以使用 ThreadPoolExecutor 类来创建自定义线程池。ThreadPoolExecutor 提供了丰富的构造函数,允许我们设置线程池的各种参数。下面是一个创建自定义线程池的示例:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CustomThreadPoolExample {
    public static void main(String[] args) {
        // 创建一个阻塞队列
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(10);

        // 创建自定义线程池
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2, // 核心线程数
                4, // 最大线程数
                10, // 线程存活时间
                TimeUnit.SECONDS,
                workQueue);

        // 使用自定义线程池执行任务
        executor.submit(() -> {
            System.out.println("任务在自定义线程池中执行");
        });

        // 关闭线程池
        executor.shutdown();
    }
}

在上述代码中,我们创建了一个 LinkedBlockingQueue 作为工作队列,并使用 ThreadPoolExecutor 构造函数设置了核心线程数为 2,最大线程数为 4,线程存活时间为 10 秒。然后,我们使用该线程池提交了一个任务,并在最后调用 executor.shutdown() 方法关闭线程池。

使用自定义线程池的 runAsync 方法

接下来,我们将自定义线程池与 CompletableFuture.runAsync 方法结合使用。以下是一个示例代码:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CompletableFutureRunAsyncCustomExecutorExample {
    public static void main(String[] args) {
        // 创建一个阻塞队列
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(10);

        // 创建自定义线程池
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2, // 核心线程数
                4, // 最大线程数
                10, // 线程存活时间
                TimeUnit.SECONDS,
                workQueue);

        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            // 模拟一个耗时操作
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("任务在自定义线程池中执行完成");
        }, executor);

        try {
            future.get();
            System.out.println("主线程获取到任务完成的结果");
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }

        // 关闭线程池
        executor.shutdown();
    }
}

在这个示例中,我们首先创建了一个自定义线程池 executor。然后,通过调用 CompletableFuture.runAsync(Runnable runnable, Executor executor) 方法,将任务提交到自定义线程池中执行。主线程通过 future.get() 方法等待任务完成,并在最后关闭自定义线程池。

自定义线程池参数对 runAsync 方法的影响

  1. 核心线程数:核心线程数决定了线程池在正常情况下保持活动的线程数量。如果任务数量小于核心线程数,新任务会立即分配到空闲的核心线程执行。例如,在上面的示例中,如果我们将核心线程数设置为 1,当有多个任务同时提交时,只有一个任务会立即执行,其他任务需要等待核心线程空闲。

  2. 最大线程数:最大线程数限制了线程池能够创建的最大线程数量。当任务队列已满且所有核心线程都在忙碌时,线程池会创建新的线程,直到达到最大线程数。如果任务数量超过了最大线程数和队列容量之和,新任务将被拒绝。例如,如果我们将最大线程数设置为 2,而队列容量为 10,当有 13 个任务提交时,前 2 个任务会立即分配到核心线程执行,接下来 10 个任务会进入队列,最后 1 个任务会因为线程池已满而被拒绝。

  3. 线程存活时间:线程存活时间定义了非核心线程在空闲状态下能够存活的最长时间。当线程池中的线程数量超过核心线程数,并且有线程空闲时间达到线程存活时间时,这些非核心线程会被销毁。例如,如果我们将线程存活时间设置为 5 秒,当一个非核心线程空闲 5 秒后,它会被销毁,以释放资源。

  4. 工作队列:工作队列用于存储等待执行的任务。不同类型的工作队列具有不同的特性,如 LinkedBlockingQueue 是一个无界队列,理论上可以存储无限数量的任务,而 ArrayBlockingQueue 是一个有界队列,需要在创建时指定容量。选择合适的工作队列对于线程池的性能和稳定性至关重要。例如,使用无界队列可能会导致内存耗尽的问题,而使用有界队列可以避免这种情况,但可能会导致任务被拒绝的情况增多。

异常处理

在使用 CompletableFuture.runAsync 方法时,异步任务可能会抛出异常。CompletableFuture 提供了几种处理异常的方式。

  1. 使用 exceptionally 方法exceptionally 方法用于在异步任务抛出异常时提供一个默认值或执行一个备用逻辑。以下是一个示例:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureExceptionHandlingExample {
    public static void main(String[] args) {
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            // 模拟一个抛出异常的任务
            throw new RuntimeException("任务执行过程中发生异常");
        });

        CompletableFuture<Void> result = future.exceptionally(ex -> {
            System.out.println("捕获到异常: " + ex.getMessage());
            return null;
        });

        try {
            result.get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}

在上述代码中,exceptionally 方法捕获到异步任务抛出的异常,并打印异常信息,然后返回一个 null 值。

  1. 使用 whenComplete 方法whenComplete 方法用于在异步任务完成(无论是正常完成还是异常完成)时执行一个回调函数。以下是一个示例:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureWhenCompleteExample {
    public static void main(String[] args) {
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            // 模拟一个抛出异常的任务
            throw new RuntimeException("任务执行过程中发生异常");
        });

        future.whenComplete((v, ex) -> {
            if (ex != null) {
                System.out.println("捕获到异常: " + ex.getMessage());
            } else {
                System.out.println("任务正常完成");
            }
        });

        try {
            future.get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}

在这个示例中,whenComplete 方法通过检查 ex 参数是否为 null 来判断任务是否正常完成。如果 ex 不为 null,则表示任务抛出了异常,我们可以在回调函数中处理异常。

线程池的监控与调优

  1. 线程池监控:为了确保线程池的性能和稳定性,我们需要对线程池进行监控。ThreadPoolExecutor 类提供了一些方法来获取线程池的运行状态信息,例如:
    • getPoolSize():返回当前线程池中的线程数量。
    • getActiveCount():返回当前正在执行任务的线程数量。
    • getQueue().size():返回工作队列中的任务数量。
    • getCompletedTaskCount():返回线程池已经完成的任务数量。

我们可以定期获取这些信息,并通过日志记录或者监控工具进行分析,以便及时发现线程池的性能问题。

  1. 线程池调优:根据监控数据,我们可以对线程池的参数进行调优。例如,如果发现工作队列经常满,可能需要增加队列容量或者调整核心线程数和最大线程数,以提高线程池的处理能力。另外,如果发现线程池中的线程经常处于空闲状态,可能需要减少核心线程数,以避免资源浪费。

在进行线程池调优时,需要综合考虑应用程序的业务特点、硬件资源等因素,通过不断的测试和调整,找到最优的线程池配置。

与其他异步编程模型的比较

  1. 与传统 Future 的比较:传统的 Future 接口只能获取异步任务的结果,并且在获取结果时需要阻塞主线程。而 CompletableFuture 不仅可以获取结果,还支持异步任务的链式调用、组合操作以及异常处理等功能,使得异步编程更加灵活和便捷。

  2. 与 RxJava 的比较:RxJava 是一个基于观察者模式的异步编程框架,提供了丰富的操作符来处理异步数据流。CompletableFuture 则更侧重于异步任务的执行和结果处理。虽然两者都可以实现异步编程,但使用场景和编程模型有所不同。CompletableFuture 更适合简单的异步任务处理,而 RxJava 则在处理复杂的异步数据流和事件驱动场景下具有更大的优势。

应用场景

  1. 并行计算:在进行并行计算任务时,可以使用 CompletableFuture.runAsync 方法将不同的计算任务分配到不同的线程中执行,从而充分利用多核处理器的性能,提高计算效率。

  2. I/O 操作:对于 I/O 密集型任务,如文件读取、网络请求等,使用自定义线程池的 runAsync 方法可以避免阻塞主线程,提高应用程序的响应性。

  3. 任务编排CompletableFuture 的链式调用和组合操作使得我们可以方便地编排多个异步任务的执行顺序,实现复杂的业务逻辑。例如,在一个电商系统中,我们可以先异步查询商品信息,然后异步计算商品价格,最后异步生成订单,通过 CompletableFuture 可以很方便地将这些任务组合起来。

总结

CompletableFuturerunAsync 方法为我们提供了一种简单而强大的异步任务执行方式。通过使用自定义线程池,我们可以根据应用程序的需求优化线程资源的使用,提高系统性能。在实际应用中,我们需要充分理解线程池的参数设置、异常处理以及监控调优等方面的知识,以确保异步任务的高效、稳定执行。同时,与其他异步编程模型的比较也有助于我们根据具体场景选择最合适的技术方案。希望通过本文的介绍,读者能够对 JavaCompletableFuturerunAsync 方法以及自定义线程池的使用有更深入的理解和掌握。