MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java CompletableFuture在多线程环境下的正确使用姿势

2024-07-192.3k 阅读

1. Java 多线程编程概述

在现代软件开发中,多线程编程是提高应用程序性能和响应性的关键技术之一。Java 作为一种广泛应用的编程语言,提供了丰富的多线程编程支持。传统的多线程编程方式,如使用 Thread 类和 Runnable 接口,虽然能够实现基本的多线程功能,但在处理复杂的异步任务和并发操作时,会面临诸多挑战,例如代码复杂度过高、线程同步问题以及难以处理异步任务的结果等。

2. CompletableFuture 简介

CompletableFuture 是 Java 8 引入的一个强大的类,用于支持异步计算。它实现了 Future 接口和 CompletionStage 接口,不仅提供了异步任务的执行和结果获取功能,还具备更强大的异步任务组合、链式调用以及错误处理能力。CompletableFuture 的设计理念使得在多线程环境下编写异步代码变得更加简洁和高效。

3. 创建 CompletableFuture 对象

3.1 直接创建已完成的 CompletableFuture

通过 CompletableFuture.completedFuture 方法可以直接创建一个已完成的 CompletableFuture 对象,其结果已经确定。例如:

CompletableFuture<String> completedFuture = CompletableFuture.completedFuture("Hello, CompletableFuture!");
completedFuture.thenAccept(System.out::println);

上述代码中,completedFuture 已经是完成状态,调用 thenAccept 方法会立即执行传入的消费者函数,打印出字符串 "Hello, CompletableFuture!"。

3.2 通过异步任务创建 CompletableFuture

使用 CompletableFuture.supplyAsync 方法可以提交一个异步任务并返回一个 CompletableFuture 对象,该对象在任务完成时会包含任务的返回结果。例如:

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // 模拟耗时操作
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "Task completed";
});
future.thenAccept(System.out::println);

在这个例子中,supplyAsync 方法接收一个 Supplier 接口的实现,在新的线程中执行该 Supplier 中的代码。任务完成后,CompletableFuture 对象的结果被设置为 Supplier 的返回值,thenAccept 方法用于处理这个结果。

4. 获取 CompletableFuture 的结果

4.1 使用 get 方法

get 方法用于获取 CompletableFuture 的结果,如果任务尚未完成,调用该方法的线程会阻塞,直到任务完成。例如:

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Result");
try {
    String result = future.get();
    System.out.println(result);
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}

这里通过 future.get() 获取异步任务的结果,如果任务还没完成,主线程会一直等待,直到任务完成并返回结果。InterruptedException 是因为等待过程中线程可能被中断,ExecutionException 是因为任务执行过程中可能抛出异常。

4.2 使用 join 方法

join 方法也用于获取结果,与 get 方法类似,但它不会抛出 InterruptedExceptionExecutionException,而是将异常包装成 CompletionExceptionUndeclaredThrowableException。例如:

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Result");
String result = future.join();
System.out.println(result);

join 方法在一些情况下使用起来更加简洁,不需要显式地捕获 InterruptedExceptionExecutionException,但需要注意异常的处理方式有所不同。

5. 处理 CompletableFuture 的完成事件

5.1 thenAccept 方法

thenAccept 方法在 CompletableFuture 完成时执行一个消费者函数,该函数接收任务的结果作为参数,但不返回新的结果。例如:

CompletableFuture.supplyAsync(() -> "Hello")
      .thenAccept(s -> System.out.println("The result is: " + s));

在这个例子中,当异步任务返回 "Hello" 时,thenAccept 中的消费者函数会被执行,打印出 "The result is: Hello"。

5.2 thenApply 方法

thenApply 方法在 CompletableFuture 完成时执行一个函数,该函数接收任务的结果作为参数,并返回一个新的结果,从而生成一个新的 CompletableFuture 对象。例如:

CompletableFuture.supplyAsync(() -> "Hello")
      .thenApply(String::toUpperCase)
      .thenAccept(System.out::println);

这里,首先异步任务返回 "Hello",thenApply 方法将其转换为大写 "HELLO",并返回一个新的 CompletableFuture,接着 thenAccept 打印出 "HELLO"。

5.3 thenRun 方法

thenRun 方法在 CompletableFuture 完成时执行一个 Runnable,它不接收任务的结果,也不返回新的结果。例如:

CompletableFuture.supplyAsync(() -> "Hello")
      .thenRun(() -> System.out.println("Task completed"));

当异步任务完成返回 "Hello" 后,thenRun 中的 Runnable 会被执行,打印出 "Task completed",但它不关心任务的具体返回值。

6. 组合 CompletableFuture

6.1 thenCompose 方法

thenCompose 方法用于将两个 CompletableFuture 组合起来,第一个 CompletableFuture 的结果作为参数传递给第二个 CompletableFuture 的生成函数。例如:

CompletableFuture.supplyAsync(() -> "Hello")
      .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + ", World"))
      .thenAccept(System.out::println);

在这个例子中,第一个异步任务返回 "Hello",thenCompose 方法将 "Hello" 作为参数传递给第二个 CompletableFuture 的生成函数,第二个异步任务返回 "Hello, World",最后 thenAccept 打印出结果。

6.2 thenCombine 方法

thenCombine 方法用于将两个 CompletableFuture 的结果组合起来,通过一个函数将两个结果合并为一个新的结果。例如:

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World");
future1.thenCombine(future2, (s1, s2) -> s1 + ", " + s2)
      .thenAccept(System.out::println);

这里,future1future2 两个异步任务并行执行,当它们都完成后,thenCombine 方法将两个任务的结果合并为 "Hello, World",并通过 thenAccept 打印出来。

7. 处理 CompletableFuture 的异常

7.1 exceptionally 方法

exceptionally 方法用于处理 CompletableFuture 执行过程中抛出的异常,它返回一个新的 CompletableFuture,如果原 CompletableFuture 正常完成,则返回原结果,否则返回异常处理函数的结果。例如:

CompletableFuture.supplyAsync(() -> {
    if (Math.random() < 0.5) {
        throw new RuntimeException("Simulated exception");
    }
    return "Success";
})
      .exceptionally(e -> {
            System.out.println("Caught exception: " + e.getMessage());
            return "Default value";
        })
      .thenAccept(System.out::println);

在这个例子中,异步任务有 50% 的概率抛出异常。如果抛出异常,exceptionally 方法中的异常处理函数会被执行,打印出异常信息并返回 "Default value";如果任务正常完成,则返回任务的正常结果。

7.2 handle 方法

handle 方法既可以处理正常结果,也可以处理异常情况。它接收一个 BiFunction,第一个参数是正常结果,第二个参数是异常(如果有)。例如:

CompletableFuture.supplyAsync(() -> {
    if (Math.random() < 0.5) {
        throw new RuntimeException("Simulated exception");
    }
    return "Success";
})
      .handle((result, e) -> {
            if (e != null) {
                System.out.println("Caught exception: " + e.getMessage());
                return "Default value";
            }
            return result;
        })
      .thenAccept(System.out::println);

handle 方法在任务正常完成时,result 为任务的返回值,enull;当任务抛出异常时,resultnulle 为异常对象。通过这种方式可以灵活地处理正常结果和异常情况。

8. CompletableFuture 的线程池使用

在默认情况下,CompletableFuture 使用 ForkJoinPool.commonPool() 来执行异步任务。然而,在一些场景下,我们可能需要使用自定义的线程池,以更好地控制线程资源和性能。例如:

ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture.supplyAsync(() -> "Result", executor)
      .thenAccept(System.out::println)
      .whenComplete((r, e) -> executor.shutdown());

这里通过 Executors.newFixedThreadPool 创建了一个固定大小为 10 的线程池,并将其作为参数传递给 supplyAsync 方法,这样异步任务就会在这个自定义线程池中执行。任务完成后,通过 whenComplete 方法关闭线程池。

9. 实战案例:多任务并行处理与结果汇总

假设我们有一个需求,需要从多个数据源获取数据并汇总结果。可以使用 CompletableFuture 来实现这个功能。例如:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class CompletableFutureExample {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(3);

        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> fetchDataFromSource1(), executor);
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> fetchDataFromSource2(), executor);
        CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> fetchDataFromSource3(), executor);

        CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2, future3);

        CompletableFuture<List<String>> combinedFuture = allFutures.thenApply(v -> {
            List<String> results = new ArrayList<>();
            try {
                results.add(future1.get());
                results.add(future2.get());
                results.add(future3.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
            return results;
        });

        combinedFuture.thenAccept(System.out::println).whenComplete((r, e) -> executor.shutdown());
    }

    private static String fetchDataFromSource1() {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "Data from source 1";
    }

    private static String fetchDataFromSource2() {
        try {
            Thread.sleep(1500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "Data from source 2";
    }

    private static String fetchDataFromSource3() {
        try {
            Thread.sleep(2500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "Data from source 3";
    }
}

在这个例子中,我们创建了三个异步任务,分别从不同数据源获取数据。CompletableFuture.allOf 方法用于等待所有任务完成,然后通过 thenApply 方法将所有任务的结果汇总到一个列表中,最后打印出汇总结果。在任务完成后关闭线程池。

10. CompletableFuture 的性能优化

10.1 合理设置线程池大小

线程池大小的设置对 CompletableFuture 的性能至关重要。如果线程池过大,会增加线程上下文切换的开销;如果线程池过小,会导致任务排队等待,降低系统的并发处理能力。一般来说,可以根据任务的类型(CPU 密集型或 I/O 密集型)来设置线程池大小。对于 CPU 密集型任务,线程池大小可以设置为 CPU 核心数加 1;对于 I/O 密集型任务,线程池大小可以适当增大,例如 CPU 核心数的 2 到 3 倍。

10.2 减少不必要的异步任务嵌套

在编写 CompletableFuture 代码时,应尽量避免不必要的异步任务嵌套。过多的嵌套会导致代码逻辑复杂,增加调试难度,同时也可能影响性能。可以通过合理使用 thenComposethenCombine 等方法来扁平化异步任务,使代码结构更加清晰,性能更优。

10.3 及时处理异常

CompletableFuture 执行过程中,及时处理异常可以避免异常在异步任务链中传播,导致难以定位问题。使用 exceptionallyhandle 等方法可以有效地处理异常,确保程序的稳定性和可靠性。

11. 避免 CompletableFuture 使用中的常见错误

11.1 忘记处理异常

在使用 CompletableFuture 时,很容易忘记处理异步任务中抛出的异常。如果不处理异常,异常可能会在异步线程中被忽略,导致程序出现难以排查的错误。因此,在编写 CompletableFuture 代码时,一定要确保对可能出现的异常进行妥善处理,使用 exceptionallyhandle 方法来捕获和处理异常。

11.2 错误的线程池使用

如前文所述,合理使用线程池对 CompletableFuture 的性能至关重要。如果错误地设置线程池大小,或者在不恰当的时机创建和销毁线程池,都可能导致性能问题。在使用自定义线程池时,要根据具体的业务场景和任务类型来设置线程池参数,并确保线程池的生命周期管理正确。

11.3 过度依赖阻塞方法

虽然 getjoin 方法可以获取 CompletableFuture 的结果,但过度依赖这些阻塞方法会破坏异步编程的优势,导致程序性能下降。应尽量使用 thenAcceptthenApply 等非阻塞方法来处理异步任务的结果,以充分发挥 CompletableFuture 的异步特性。

12. 与其他异步编程模型的比较

12.1 与 Future 的比较

Future 是 Java 早期提供的异步编程接口,它允许我们异步执行任务并获取任务的结果。然而,Future 存在一些局限性,例如获取结果时需要阻塞线程,缺乏对异步任务组合和错误处理的支持等。相比之下,CompletableFuture 不仅实现了 Future 接口,还提供了更强大的异步任务组合、链式调用以及错误处理功能,使得异步编程更加灵活和高效。

12.2 与 RxJava 的比较

RxJava 是一个基于观察者模式的异步编程库,它提供了丰富的操作符来处理异步数据流。与 CompletableFuture 相比,RxJava 的功能更加强大,适用于处理复杂的异步事件流场景。但 RxJava 的学习曲线相对较陡,代码复杂度较高。而 CompletableFuture 作为 Java 标准库的一部分,具有更好的集成性和简单性,对于大多数常规的异步编程场景已经足够。

13. 总结 CompletableFuture 的适用场景

CompletableFuture 适用于多种多线程异步编程场景,例如:

  1. 并行任务执行与结果汇总:当需要同时执行多个独立的任务,并在所有任务完成后汇总结果时,CompletableFutureallOf 方法和结果处理方法可以很方便地实现这个功能。
  2. 异步任务的链式调用:如果异步任务之间存在依赖关系,需要一个任务的结果作为另一个任务的输入,可以使用 thenCompose 等方法实现链式调用,使代码更加简洁和易读。
  3. 异步任务的错误处理CompletableFuture 提供了强大的错误处理机制,exceptionallyhandle 等方法可以有效地捕获和处理异步任务中的异常,确保程序的稳定性。
  4. 多数据源数据获取:在从多个数据源获取数据并进行合并处理的场景中,CompletableFuture 可以并行执行数据获取任务,并在所有数据获取完成后进行合并操作,提高数据获取的效率。

通过合理使用 CompletableFuture,开发人员可以在多线程环境下编写更加高效、简洁和健壮的异步代码,提升应用程序的性能和响应性。在实际项目中,应根据具体的业务需求和场景,选择合适的异步编程方式,充分发挥 CompletableFuture 的优势。同时,要注意避免常见的错误,合理优化性能,以确保异步代码的质量和稳定性。在不断的实践和探索中,深入理解 CompletableFuture 的特性和用法,将有助于我们更好地应对复杂的多线程编程挑战。