MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java 中 CompletableFuture 异步任务执行环境配置

2021-04-253.7k 阅读

Java 中 CompletableFuture 异步任务执行环境配置

CompletableFuture 基础概述

在 Java 并发编程领域,CompletableFuture 是一个强大的工具,它允许我们以异步方式执行任务,并处理任务完成后的结果。CompletableFuture 实现了 Future 接口和 CompletionStage 接口,这使得它既能够获取异步任务的结果(类似传统 Future),又能以链式调用的方式处理异步任务的结果(这是 CompletionStage 带来的特性)。

简单来说,CompletableFuture 代表一个异步计算的结果,它在计算完成时可以被获取。它提供了一系列方法,让我们可以在任务完成时执行回调,对结果进行转换、组合等操作。例如:

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // 模拟异步任务
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "Hello, CompletableFuture!";
});

future.thenAccept(System.out::println).join();

在上述代码中,supplyAsync 方法以异步方式执行一个返回结果的任务,thenAccept 方法在任务完成后接收任务的结果并进行处理(这里只是简单打印),join 方法用于阻塞当前线程,直到 CompletableFuture 完成。

异步任务执行环境基础

在深入探讨 CompletableFuture 的异步任务执行环境配置之前,我们需要先了解一些基础概念。

线程池

线程池是一种管理和复用线程的机制。在 Java 中,java.util.concurrent.ExecutorService 接口及其实现类(如 ThreadPoolExecutor)提供了线程池的功能。线程池中的线程可以被复用,避免了频繁创建和销毁线程带来的开销,提高了系统的性能和资源利用率。

例如,创建一个固定大小的线程池:

ExecutorService executorService = Executors.newFixedThreadPool(10);

这里创建了一个包含 10 个线程的固定大小线程池。

执行环境

执行环境是指异步任务实际执行的上下文,通常与线程池相关。当我们提交一个异步任务时,任务会在特定的执行环境中被线程池中的线程执行。对于 CompletableFuture,默认情况下,它使用 ForkJoinPool.commonPool() 作为执行环境。

CompletableFuture 默认执行环境

CompletableFuture 的很多静态方法,如 supplyAsyncrunAsync 等,如果不指定执行器(Executor),会使用 ForkJoinPool.commonPool() 作为默认的执行环境。

ForkJoinPool.commonPool() 是一个共享的 ForkJoinPool 实例,它的线程数量通常根据 CPU 核心数来动态调整。例如,在多核 CPU 系统中,它会尝试充分利用多核资源来并行执行任务。

下面通过一个示例来展示默认执行环境的使用:

CompletableFuture.supplyAsync(() -> {
    System.out.println("Running in default executor: " + Thread.currentThread().getName());
    return "Default Executor Result";
}).thenAccept(System.out::println);

在上述代码中,supplyAsync 方法没有指定执行器,所以任务会在 ForkJoinPool.commonPool() 的线程中执行。运行代码时,我们可以看到输出的线程名是 ForkJoinPool.commonPool-worker-* 格式,表明任务在默认的 ForkJoinPool 中执行。

自定义执行环境的必要性

虽然 ForkJoinPool.commonPool() 提供了一种方便的默认执行环境,但在实际应用中,我们可能需要自定义执行环境,原因如下:

资源隔离

不同类型的任务可能对资源有不同的需求。例如,一些 I/O 密集型任务可能会占用大量的线程资源,如果与 CPU 密集型任务共用同一个线程池,可能会导致 CPU 密集型任务得不到足够的资源而性能下降。通过自定义执行环境,我们可以为不同类型的任务创建独立的线程池,实现资源隔离。

线程池参数优化

默认的 ForkJoinPool.commonPool() 有其自身的线程数量和调度策略等参数。但在某些场景下,我们可能需要根据具体业务需求调整这些参数。比如,对于一些实时性要求较高的任务,我们可能需要增加线程池的线程数量,以减少任务的等待时间。

更好的控制和监控

自定义执行环境可以让我们更好地控制和监控异步任务的执行。我们可以实现自己的线程池监控逻辑,了解任务的执行情况、线程池的负载等信息,以便及时发现和解决潜在的性能问题。

自定义执行环境的方式

使用 ExecutorService 实例

我们可以创建一个 ExecutorService 实例,并将其作为参数传递给 CompletableFuture 的相关方法。例如,使用 ThreadPoolExecutor 创建一个自定义线程池:

ExecutorService executorService = new ThreadPoolExecutor(
    5, // 核心线程数
    10, // 最大线程数
    10L, // 线程存活时间
    TimeUnit.SECONDS,
    new LinkedBlockingQueue<>(100) // 任务队列
);

CompletableFuture.supplyAsync(() -> {
    System.out.println("Running in custom executor: " + Thread.currentThread().getName());
    return "Custom Executor Result";
}, executorService).thenAccept(System.out::println);

在上述代码中,我们创建了一个 ThreadPoolExecutor,并将其传递给 supplyAsync 方法。这样,异步任务就会在我们自定义的线程池中执行。

使用 CompletableFuture 的静态方法 withExecutor

CompletableFuture 提供了 withExecutor 静态方法,允许我们为一组 CompletableFuture 操作指定执行器。例如:

ExecutorService executorService = Executors.newSingleThreadExecutor();

CompletableFuture.withExecutor(executorService)
               .supplyAsync(() -> {
                    System.out.println("Running in custom executor: " + Thread.currentThread().getName());
                    return "Custom Executor Result";
                })
               .thenApply(result -> result + " - Transformed")
               .thenAccept(System.out::println);

在这个示例中,withExecutor 方法为后续的 supplyAsyncthenApplythenAccept 操作指定了执行器。

配置线程池参数对异步任务的影响

核心线程数和最大线程数

核心线程数决定了线程池中始终保持活动的线程数量。当任务提交到线程池时,如果当前线程池中的线程数量小于核心线程数,会立即创建新的线程来执行任务。最大线程数则限制了线程池能够容纳的最大线程数量。

例如,在一个 I/O 密集型任务场景中,如果核心线程数设置过小,可能会导致任务长时间等待线程资源;而如果最大线程数设置过大,可能会消耗过多的系统资源。

ExecutorService executorService = new ThreadPoolExecutor(
    2, // 核心线程数
    5, // 最大线程数
    10L, // 线程存活时间
    TimeUnit.SECONDS,
    new LinkedBlockingQueue<>(100) // 任务队列
);

在上述代码中,核心线程数为 2,最大线程数为 5。当任务提交时,如果当前线程池中的线程数小于 2,会立即创建新线程;如果任务队列已满且线程数小于 5,也会创建新线程。

线程存活时间

线程存活时间是指当线程池中的线程数量超过核心线程数时,多余的线程在没有任务执行的情况下,能够保持存活的时间。如果这个时间设置过长,可能会导致线程资源长时间占用;如果设置过短,可能会导致频繁创建和销毁线程,增加系统开销。

在前面的 ThreadPoolExecutor 示例中,线程存活时间设置为 10 秒,意味着当线程数量超过核心线程数且该线程 10 秒内没有任务执行时,该线程会被销毁。

任务队列

任务队列用于存放等待执行的任务。常见的任务队列类型有 ArrayBlockingQueueLinkedBlockingQueue 等。ArrayBlockingQueue 是一个有界队列,而 LinkedBlockingQueue 可以是有界或无界的(默认无界)。

选择合适的任务队列类型和容量对系统性能有重要影响。例如,如果任务队列容量过小,可能会导致任务无法及时入队,进而触发创建更多线程;如果任务队列容量过大,可能会导致任务在队列中等待时间过长,影响实时性。

异常处理与执行环境

在异步任务执行过程中,可能会出现各种异常。CompletableFuture 提供了丰富的异常处理机制,并且这些异常处理与执行环境也有一定的关联。

未捕获异常处理

当异步任务抛出未捕获的异常时,如果没有显式的异常处理,默认情况下,异常会被 ForkJoinPool 或自定义线程池的未捕获异常处理器处理。

例如,在使用默认执行环境时:

CompletableFuture.supplyAsync(() -> {
    throw new RuntimeException("Task failed");
});

运行上述代码,我们会在控制台看到类似如下的错误信息:

Exception in thread "ForkJoinPool.commonPool-worker-1" java.lang.RuntimeException: Task failed
	at com.example.CompletableFutureExample.lambda$main$0(CompletableFutureExample.java:12)
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
	at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1600)
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)

这表明异常被 ForkJoinPool.commonPool() 的默认机制处理。

显式异常处理

CompletableFuture 提供了 exceptionallyhandle 等方法来显式处理异常。例如:

CompletableFuture.supplyAsync(() -> {
    throw new RuntimeException("Task failed");
}).exceptionally(ex -> {
    System.out.println("Caught exception: " + ex.getMessage());
    return "Default value";
}).thenAccept(System.out::println);

在上述代码中,exceptionally 方法捕获到任务抛出的异常,并返回一个默认值。这样,即使任务执行失败,后续的处理也能继续进行。

异常处理与自定义执行环境

当使用自定义执行环境时,异常处理机制同样适用。例如,在自定义线程池的情况下:

ExecutorService executorService = Executors.newSingleThreadExecutor();

CompletableFuture.supplyAsync(() -> {
    throw new RuntimeException("Task failed");
}, executorService).exceptionally(ex -> {
    System.out.println("Caught exception: " + ex.getMessage());
    return "Default value";
}).thenAccept(System.out::println);

这里,即使任务在自定义线程池中执行,exceptionally 方法依然能够捕获并处理异常。

异步任务执行环境的性能调优

线程池大小的优化

确定合适的线程池大小是性能调优的关键。对于 CPU 密集型任务,线程池大小一般设置为 CPU 核心数加 1,这样可以充分利用 CPU 资源,同时避免由于线程上下文切换带来的额外开销。

例如,获取 CPU 核心数并设置线程池大小:

int cpuCores = Runtime.getRuntime().availableProcessors();
ExecutorService executorService = new ThreadPoolExecutor(
    cpuCores + 1,
    cpuCores + 1,
    0L,
    TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<>()
);

对于 I/O 密集型任务,由于线程在等待 I/O 操作时会处于空闲状态,所以线程池大小可以适当增大,一般可以设置为 CPU 核心数的 2 到 3 倍。

任务队列的优化

选择合适的任务队列类型和容量对性能有显著影响。对于实时性要求较高的任务,建议使用有界队列,并将队列容量设置得较小,这样可以避免任务在队列中长时间等待。

例如,使用 ArrayBlockingQueue 作为任务队列:

ExecutorService executorService = new ThreadPoolExecutor(
    5,
    10,
    10L,
    TimeUnit.SECONDS,
    new ArrayBlockingQueue<>(50)
);

这样可以限制任务队列的长度,当队列满时,会触发线程池创建新线程或拒绝任务,从而保证任务的实时性。

监控与调优

为了更好地优化异步任务执行环境的性能,我们需要对线程池和任务执行情况进行监控。可以通过实现 Thread.UncaughtExceptionHandler 接口来监控线程执行过程中的异常,通过 ThreadPoolExecutor 的一些方法(如 getActiveCountgetQueue 等)来获取线程池的运行状态。

例如,监控线程池的活动线程数:

ExecutorService executorService = new ThreadPoolExecutor(
    5,
    10,
    10L,
    TimeUnit.SECONDS,
    new LinkedBlockingQueue<>()
);

executorService.submit(() -> {
    // 任务逻辑
});

System.out.println("Active threads: " + ((ThreadPoolExecutor) executorService).getActiveCount());

通过不断收集和分析这些监控数据,我们可以根据实际情况调整线程池参数,实现性能的优化。

与其他并发框架的结合使用

与 Spring Framework 的结合

在 Spring 应用中,我们可以很方便地将 CompletableFuture 与 Spring 的依赖注入、事务管理等特性结合使用。例如,通过 Spring 的配置文件或注解来创建和管理自定义线程池,并将其注入到需要使用 CompletableFuture 的服务中。

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;

@Configuration
public class ThreadPoolConfig {

    @Bean
    public Executor customExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(100);
        executor.initialize();
        return executor;
    }
}

然后在服务类中使用:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

@Service
public class MyService {

    private final Executor customExecutor;

    @Autowired
    public MyService(Executor customExecutor) {
        this.customExecutor = customExecutor;
    }

    public CompletableFuture<String> asyncTask() {
        return CompletableFuture.supplyAsync(() -> {
            // 异步任务逻辑
            return "Async Task Result";
        }, customExecutor);
    }
}

这样,我们就可以在 Spring 环境中更好地管理和使用 CompletableFuture 的异步任务执行环境。

与 Akka 的结合

Akka 是一个基于 Actor 模型的高性能并发框架。我们可以将 CompletableFuture 与 Akka 结合,利用 Akka 的分布式计算、容错等特性。例如,在 Akka 中发送异步消息,并使用 CompletableFuture 来处理消息处理结果。

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.pattern.PatternsCS;
import akka.util.Timeout;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class AkkaCompletableFutureExample {

    public static void main(String[] args) throws Exception {
        ActorSystem system = ActorSystem.create("MySystem");
        ActorRef actorRef = system.actorOf(Props.create(MyActor.class), "myActor");

        Timeout timeout = new Timeout(Duration.create(5, TimeUnit.SECONDS));
        Future<Object> future = PatternsCS.ask(actorRef, "Hello", timeout);

        CompletableFuture<String> completableFuture = new CompletableFuture<>();
        future.onComplete((result, ex) -> {
            if (ex == null) {
                completableFuture.complete((String) result);
            } else {
                completableFuture.completeExceptionally(ex);
            }
        }, system.dispatcher());

        String response = completableFuture.get();
        System.out.println("Response: " + response);

        system.terminate();
    }
}

在上述代码中,我们使用 Akka 发送异步消息,并通过 CompletableFuture 来处理消息处理结果,实现了两个框架的结合使用。

不同场景下的最佳实践

高并发读场景

在高并发读场景中,如缓存读取、数据库查询等,我们可以创建一个较大线程数的线程池,以充分利用系统资源,提高并发处理能力。同时,使用有界队列来避免任务队列无限增长导致的内存问题。

ExecutorService executorService = new ThreadPoolExecutor(
    20, // 核心线程数
    50, // 最大线程数
    10L, // 线程存活时间
    TimeUnit.SECONDS,
    new ArrayBlockingQueue<>(1000) // 任务队列
);

然后在 CompletableFuture 中使用这个线程池执行读任务:

CompletableFuture.supplyAsync(() -> {
    // 读任务逻辑,如数据库查询
    return "Read Result";
}, executorService).thenAccept(System.out::println);

批处理任务场景

对于批处理任务,如数据导入、文件处理等,我们可以根据任务的性质和系统资源情况来调整线程池参数。如果任务是 CPU 密集型,可以设置较小的核心线程数;如果是 I/O 密集型,可以适当增大核心线程数。

例如,对于一个文件处理的批处理任务:

ExecutorService executorService = new ThreadPoolExecutor(
    5, // 核心线程数,根据实际情况调整
    10, // 最大线程数
    10L, // 线程存活时间
    TimeUnit.SECONDS,
    new LinkedBlockingQueue<>()
);

for (int i = 0; i < 100; i++) {
    int taskIndex = i;
    CompletableFuture.runAsync(() -> {
        // 文件处理任务逻辑
        System.out.println("Processing task " + taskIndex);
    }, executorService);
}

在这个示例中,我们通过 CompletableFuture.runAsync 方法提交多个批处理任务到自定义线程池执行。

实时性要求高的场景

在实时性要求高的场景中,如实时数据处理、即时通讯等,我们需要确保任务能够及时执行,避免任务在队列中长时间等待。可以使用较小容量的有界队列,并适当增大线程池的核心线程数和最大线程数。

ExecutorService executorService = new ThreadPoolExecutor(
    10, // 核心线程数
    20, // 最大线程数
    5L, // 线程存活时间
    TimeUnit.SECONDS,
    new ArrayBlockingQueue<>(100) // 任务队列
);

然后在 CompletableFuture 中使用这个线程池执行实时任务:

CompletableFuture.supplyAsync(() -> {
    // 实时任务逻辑
    return "Real - time Result";
}, executorService).thenAccept(System.out::println);

通过这样的配置,可以尽量减少任务的等待时间,满足实时性要求。

通过以上对 CompletableFuture 异步任务执行环境配置的深入探讨,我们了解了其基础概念、默认执行环境、自定义执行环境的方式、异常处理、性能调优以及与其他框架的结合使用等方面的知识。在实际应用中,我们需要根据具体的业务场景和需求,合理配置异步任务执行环境,以提高系统的性能和稳定性。