MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java CompletableFuture whenComplete的执行机制与应用

2022-03-126.4k 阅读

Java CompletableFuture whenComplete 的执行机制

在Java的异步编程中,CompletableFuture提供了强大的功能来处理异步操作的结果。whenComplete方法是CompletableFuture中用于处理异步任务完成(无论是正常完成还是异常完成)时的回调机制。

1. 基本定义与签名

whenComplete方法有两个重载形式:

public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
  • whenComplete(BiConsumer<? super T,? super Throwable> action):当CompletableFuture完成(正常或异常)时,会同步执行给定的BiConsumer动作。这里的BiConsumer接受两个参数,第一个是异步任务的结果(如果任务正常完成),第二个是任务抛出的异常(如果任务异常完成)。
  • whenCompleteAsync(BiConsumer<? super T,? super Throwable> action):与whenComplete不同,这个方法会异步执行给定的BiConsumer动作。具体来说,它会使用ForkJoinPool.commonPool()作为线程池来执行该动作。

2. 同步执行机制

当使用whenComplete方法时,回调动作会在触发该CompletableFuture完成的线程中同步执行。例如,假设我们有一个简单的异步任务,通过supplyAsync创建一个CompletableFuture,并在其完成时使用whenComplete进行处理:

import java.util.concurrent.CompletableFuture;

public class WhenCompleteSyncExample {
    public static void main(String[] args) {
        CompletableFuture.supplyAsync(() -> {
            System.out.println("Async task is running in thread: " + Thread.currentThread().getName());
            return "Hello, CompletableFuture!";
        }).whenComplete((result, exception) -> {
            if (exception == null) {
                System.out.println("Task completed successfully. Result: " + result);
            } else {
                System.out.println("Task completed with an exception: " + exception);
            }
            System.out.println("whenComplete is running in thread: " + Thread.currentThread().getName());
        });

        // 防止主线程提前退出
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

在上述代码中,supplyAsync创建了一个异步任务,该任务在一个线程池中线程运行。当这个任务完成后,whenComplete的回调动作会在触发任务完成的同一个线程中执行。从输出结果可以看到,Async task is running in thread:whenComplete is running in thread:打印的线程名是相同的(在默认情况下,supplyAsync使用ForkJoinPool.commonPool()线程池,whenComplete同步执行也在该线程池中的线程)。

3. 异步执行机制

whenCompleteAsync方法会将回调动作提交到ForkJoinPool.commonPool()线程池异步执行。这意味着回调动作不会阻塞触发CompletableFuture完成的线程。下面是一个示例:

import java.util.concurrent.CompletableFuture;

public class WhenCompleteAsyncExample {
    public static void main(String[] args) {
        CompletableFuture.supplyAsync(() -> {
            System.out.println("Async task is running in thread: " + Thread.currentThread().getName());
            return "Hello, CompletableFuture!";
        }).whenCompleteAsync((result, exception) -> {
            if (exception == null) {
                System.out.println("Task completed successfully. Result: " + result);
            } else {
                System.out.println("Task completed with an exception: " + exception);
            }
            System.out.println("whenCompleteAsync is running in thread: " + Thread.currentThread().getName());
        });

        // 防止主线程提前退出
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

在这个例子中,supplyAsync创建的异步任务和whenCompleteAsync执行的回调动作会在不同的线程中运行。输出结果中,Async task is running in thread:whenCompleteAsync is running in thread:打印的线程名通常是不同的,因为whenCompleteAsync将回调动作提交到了ForkJoinPool.commonPool()线程池中的其他线程执行。

Java CompletableFuture whenComplete 的应用场景

1. 异常处理与结果处理统一

在异步编程中,通常需要分别处理任务成功和失败的情况。whenComplete方法提供了一个统一的入口来处理这两种情况。例如,在进行网络请求的异步操作时,可能会遇到网络异常或者请求成功但返回数据不符合预期的情况。

import java.util.concurrent.CompletableFuture;

public class NetworkRequestExample {
    public static CompletableFuture<String> sendNetworkRequest() {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟网络请求
            if (Math.random() > 0.5) {
                return "Successfully fetched data";
            } else {
                throw new RuntimeException("Network error");
            }
        });
    }

    public static void main(String[] args) {
        sendNetworkRequest().whenComplete((result, exception) -> {
            if (exception == null) {
                System.out.println("Network request success: " + result);
            } else {
                System.out.println("Network request failed: " + exception);
            }
        });

        // 防止主线程提前退出
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

在上述代码中,sendNetworkRequest方法模拟了一个网络请求的异步操作。whenComplete方法统一处理了请求成功时的结果和请求失败时的异常,使得代码结构更加清晰,无需在不同的地方分别处理成功和失败的逻辑。

2. 链式异步操作中的中间结果处理

在复杂的异步操作链中,whenComplete可以用于处理中间结果,同时不影响后续的异步操作。例如,假设有一系列的异步数据处理步骤,我们可能需要在某一步完成后记录一些日志或者进行一些简单的验证。

import java.util.concurrent.CompletableFuture;

public class ChainedAsyncOperationExample {
    public static CompletableFuture<Integer> step1() {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("Step 1 is running in thread: " + Thread.currentThread().getName());
            return 10;
        });
    }

    public static CompletableFuture<Integer> step2(int input) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("Step 2 is running in thread: " + Thread.currentThread().getName());
            return input * 2;
        });
    }

    public static CompletableFuture<Integer> step3(int input) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("Step 3 is running in thread: " + Thread.currentThread().getName());
            return input + 5;
        });
    }

    public static void main(String[] args) {
        step1()
              .whenComplete((result, exception) -> {
                    if (exception == null) {
                        System.out.println("Step 1 result: " + result);
                    } else {
                        System.out.println("Step 1 failed: " + exception);
                    }
                })
              .thenApply(ChainedAsyncOperationExample::step2)
              .whenComplete((result, exception) -> {
                    if (exception == null) {
                        System.out.println("Step 2 result: " + result.join());
                    } else {
                        System.out.println("Step 2 failed: " + exception);
                    }
                })
              .thenApply(ChainedAsyncOperationExample::step3)
              .whenComplete((result, exception) -> {
                    if (exception == null) {
                        System.out.println("Final result: " + result.join());
                    } else {
                        System.out.println("Final step failed: " + exception);
                    }
                });

        // 防止主线程提前退出
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

在这个例子中,step1step2step3构成了一个异步操作链。通过在每一步之后使用whenComplete,我们可以在不中断操作链的情况下,对每一步的结果进行处理(如打印日志或验证结果)。

3. 资源清理

在异步任务完成后,可能需要清理相关的资源,如关闭文件、释放数据库连接等。whenComplete可以方便地在任务完成时执行这些资源清理操作。

import java.io.FileWriter;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;

public class ResourceCleanupExample {
    public static CompletableFuture<Void> writeToFileAsync(String content, String filePath) {
        return CompletableFuture.runAsync(() -> {
            try (FileWriter writer = new FileWriter(filePath)) {
                writer.write(content);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }).whenComplete((result, exception) -> {
            if (exception != null) {
                System.out.println("Writing to file failed: " + exception);
            }
            // 这里可以进行其他资源清理操作,例如关闭数据库连接等
            System.out.println("Resource cleanup completed.");
        });
    }

    public static void main(String[] args) {
        writeToFileAsync("This is some content", "test.txt");

        // 防止主线程提前退出
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

在上述代码中,writeToFileAsync方法异步地将内容写入文件。whenComplete方法在任务完成后,无论成功与否,都会执行资源清理相关的操作(这里简单打印了清理完成的信息,实际应用中可以进行真正的资源释放操作)。

4. 并发任务汇总结果处理

当有多个并发的异步任务,并且需要在所有任务完成后对结果进行汇总处理时,whenComplete可以与CompletableFuture.allOf结合使用。例如,假设有多个异步任务分别计算不同部分的数据,最后需要将这些数据汇总成一个结果。

import java.util.concurrent.CompletableFuture;

public class ConcurrentTaskAggregationExample {
    public static CompletableFuture<Integer> task1() {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("Task 1 is running in thread: " + Thread.currentThread().getName());
            return 10;
        });
    }

    public static CompletableFuture<Integer> task2() {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("Task 2 is running in thread: " + Thread.currentThread().getName());
            return 20;
        });
    }

    public static CompletableFuture<Integer> task3() {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("Task 3 is running in thread: " + Thread.currentThread().getName());
            return 30;
        });
    }

    public static void main(String[] args) {
        CompletableFuture[] tasks = {task1(), task2(), task3()};
        CompletableFuture.allOf(tasks).whenComplete((result, exception) -> {
            if (exception == null) {
                int total = 0;
                for (CompletableFuture<Integer> task : tasks) {
                    total += task.join();
                }
                System.out.println("Total result: " + total);
            } else {
                System.out.println("Tasks failed: " + exception);
            }
        });

        // 防止主线程提前退出
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

在这个例子中,task1task2task3是并发执行的异步任务。CompletableFuture.allOf等待所有任务完成,然后whenComplete方法在所有任务完成后,对各个任务的结果进行汇总计算,并处理可能出现的异常。

深入理解 whenComplete 的细节与注意事项

1. 异常传递与处理

CompletableFuture在执行过程中发生异常时,异常会传递给whenComplete的回调函数。如果在whenComplete中没有对异常进行适当处理,异常不会被自动抛出到上层调用栈。例如:

import java.util.concurrent.CompletableFuture;

public class ExceptionPropagationExample {
    public static CompletableFuture<String> faultyTask() {
        return CompletableFuture.supplyAsync(() -> {
            throw new RuntimeException("Task failed");
        });
    }

    public static void main(String[] args) {
        faultyTask().whenComplete((result, exception) -> {
            // 这里只是打印异常信息,没有进一步处理
            if (exception != null) {
                System.out.println("Exception in whenComplete: " + exception);
            }
        });

        // 防止主线程提前退出
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

在上述代码中,faultyTask抛出了一个运行时异常。whenComplete捕获到这个异常并打印了信息,但异常并没有向上层调用栈传播。如果希望异常继续向上传播,可以在whenComplete中再次抛出异常,或者使用exceptionally方法进行更全面的异常处理。

import java.util.concurrent.CompletableFuture;

public class ExceptionHandlingExample {
    public static CompletableFuture<String> faultyTask() {
        return CompletableFuture.supplyAsync(() -> {
            throw new RuntimeException("Task failed");
        });
    }

    public static void main(String[] args) {
        faultyTask().whenComplete((result, exception) -> {
            if (exception != null) {
                // 再次抛出异常
                throw new RuntimeException(exception);
            }
        }).exceptionally(exception -> {
            System.out.println("Caught exception in exceptionally: " + exception);
            return "Default value";
        });

        // 防止主线程提前退出
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

在这个改进的代码中,whenComplete再次抛出异常,然后exceptionally方法捕获并处理了这个异常,同时返回了一个默认值。

2. 线程安全性

whenComplete的回调函数中,如果访问和修改共享资源,需要注意线程安全问题。由于whenCompleteAsync可能在不同线程中执行回调,共享资源的并发访问可能导致数据竞争和不一致。例如:

import java.util.concurrent.CompletableFuture;

public class ThreadSafetyExample {
    private static int sharedValue = 0;

    public static void main(String[] args) {
        CompletableFuture.supplyAsync(() -> {
            System.out.println("Async task is running in thread: " + Thread.currentThread().getName());
            return 10;
        }).whenCompleteAsync((result, exception) -> {
            if (exception == null) {
                // 这里访问和修改共享资源
                sharedValue += result;
                System.out.println("Shared value updated to: " + sharedValue);
            }
        });

        CompletableFuture.supplyAsync(() -> {
            System.out.println("Another async task is running in thread: " + Thread.currentThread().getName());
            return 20;
        }).whenCompleteAsync((result, exception) -> {
            if (exception == null) {
                // 这里访问和修改共享资源
                sharedValue += result;
                System.out.println("Shared value updated to: " + sharedValue);
            }
        });

        // 防止主线程提前退出
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

在上述代码中,两个异步任务都在whenCompleteAsync回调中访问和修改sharedValue。由于这两个回调可能在不同线程中执行,可能会出现数据竞争问题,导致最终的sharedValue值不符合预期。为了解决这个问题,可以使用线程安全的类(如AtomicInteger)或者使用同步机制(如synchronized关键字)。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

public class ThreadSafetyFixedExample {
    private static AtomicInteger sharedValue = new AtomicInteger(0);

    public static void main(String[] args) {
        CompletableFuture.supplyAsync(() -> {
            System.out.println("Async task is running in thread: " + Thread.currentThread().getName());
            return 10;
        }).whenCompleteAsync((result, exception) -> {
            if (exception == null) {
                // 使用AtomicInteger保证线程安全
                sharedValue.addAndGet(result);
                System.out.println("Shared value updated to: " + sharedValue.get());
            }
        });

        CompletableFuture.supplyAsync(() -> {
            System.out.println("Another async task is running in thread: " + Thread.currentThread().getName());
            return 20;
        }).whenCompleteAsync((result, exception) -> {
            if (exception == null) {
                // 使用AtomicInteger保证线程安全
                sharedValue.addAndGet(result);
                System.out.println("Shared value updated to: " + sharedValue.get());
            }
        });

        // 防止主线程提前退出
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

在改进后的代码中,使用AtomicInteger代替普通的int,通过其原子操作方法addAndGet保证了对sharedValue的线程安全访问和修改。

3. 与其他 CompletableFuture 方法的组合使用

whenComplete通常与其他CompletableFuture方法(如thenApplythenAcceptthenRun等)组合使用,以构建复杂的异步操作链。例如,thenApply用于对异步任务的结果进行转换,whenComplete可以在转换完成后进行结果验证或日志记录。

import java.util.concurrent.CompletableFuture;

public class MethodCombinationExample {
    public static CompletableFuture<String> fetchData() {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("Fetching data in thread: " + Thread.currentThread().getName());
            return "原始数据";
        });
    }

    public static String processData(String data) {
        System.out.println("Processing data in thread: " + Thread.currentThread().getName());
        return "处理后的数据: " + data;
    }

    public static void main(String[] args) {
        fetchData()
              .thenApply(MethodCombinationExample::processData)
              .whenComplete((result, exception) -> {
                    if (exception == null) {
                        System.out.println("Final result: " + result);
                    } else {
                        System.out.println("Task failed: " + exception);
                    }
                });

        // 防止主线程提前退出
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

在这个例子中,fetchData异步获取数据,thenApply对获取到的数据进行处理,whenComplete在处理完成后对最终结果进行处理(打印结果或处理异常)。这种组合使用方式使得异步操作的流程更加清晰和灵活。

4. 性能考虑

虽然whenCompleteAsync提供了异步执行回调的功能,但频繁地使用whenCompleteAsync可能会对性能产生一定影响。因为每次调用whenCompleteAsync都会将回调任务提交到ForkJoinPool.commonPool()线程池,这会带来线程调度和上下文切换的开销。在性能敏感的场景中,需要权衡是否真正需要异步执行回调。如果回调任务执行时间较短,同步执行(即使用whenComplete)可能是更好的选择,以减少线程池的压力和上下文切换开销。例如,在一个高并发的实时数据处理系统中,如果每个异步任务的whenComplete回调只是简单地记录日志,同步执行可能更高效,因为记录日志的操作通常执行时间较短,不会阻塞触发任务完成的线程太长时间。但如果回调任务涉及到复杂的计算或I/O操作,异步执行可以避免阻塞其他任务,提高系统的整体并发性能。

通过深入理解Java CompletableFuture whenComplete的执行机制和应用场景,并注意其细节与注意事项,开发者可以更加灵活和高效地使用这一强大的异步编程工具,构建出健壮、高性能的异步应用程序。无论是在处理网络请求、进行复杂的异步任务链,还是在资源清理和并发任务汇总等方面,whenComplete都能发挥重要作用,帮助开发者更好地应对现代应用程序中对异步处理的需求。同时,合理地使用whenComplete与其他CompletableFuture方法的组合,以及关注线程安全性和性能问题,将有助于编写高质量的异步代码。