Java 中 CompletableFuture 异步任务执行错误恢复
CompletableFuture 基础回顾
在深入探讨 CompletableFuture 的错误恢复机制之前,我们先来简要回顾一下 CompletableFuture 的基本概念和用法。
CompletableFuture 是 Java 8 引入的一个强大的异步编程工具,它实现了 Future 和 CompletionStage 接口。通过 CompletableFuture,我们可以更方便地进行异步计算、处理异步结果以及组合多个异步操作。
创建 CompletableFuture
- 使用 supplyAsync 创建有返回值的异步任务
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 模拟耗时操作
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Hello, CompletableFuture!";
});
在上述代码中,supplyAsync
方法接受一个 Supplier
作为参数,该 Supplier
的 get
方法定义了异步执行的任务。supplyAsync
方法会在一个默认的 Fork/Join 线程池中异步执行这个任务,并返回一个 CompletableFuture
,其泛型类型与 Supplier
的返回类型一致。
- 使用 runAsync 创建无返回值的异步任务
CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
// 模拟耗时操作
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("无返回值的异步任务执行完成");
});
runAsync
方法接受一个 Runnable
作为参数,同样在默认的 Fork/Join 线程池中异步执行任务。由于 Runnable
没有返回值,所以 CompletableFuture
的泛型类型为 Void
。
获取 CompletableFuture 的结果
- 使用 get 方法阻塞获取结果
try {
String result = future.get();
System.out.println(result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
get
方法会阻塞当前线程,直到 CompletableFuture
完成并获取到结果。如果异步任务抛出异常,get
方法会将该异常包装成 ExecutionException
重新抛出,同时还可能抛出 InterruptedException
表示当前线程在等待过程中被中断。
- 使用 join 方法阻塞获取结果
String result2 = future.join();
System.out.println(result2);
join
方法与 get
方法类似,也是阻塞当前线程获取结果。不同之处在于,join
方法不会抛出 InterruptedException
和 ExecutionException
,而是直接抛出原始的异常(如果有)。
CompletableFuture 异步任务中的错误情况
在异步任务执行过程中,可能会出现各种错误情况。了解这些错误情况及其产生的原因,对于有效地进行错误恢复至关重要。
任务执行时抛出异常
当在 supplyAsync
或 runAsync
定义的任务逻辑中抛出异常时,CompletableFuture
会进入异常完成状态。
CompletableFuture<String> futureWithException = CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("任务执行出错");
});
try {
String result = futureWithException.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
在上述代码中,supplyAsync
定义的任务直接抛出了 RuntimeException
。当调用 get
方法获取结果时,会捕获到 ExecutionException
,其内部包含了原始抛出的 RuntimeException
。
依赖的 CompletableFuture 出错
在 CompletableFuture 的组合操作中,如果某个依赖的 CompletableFuture
出现异常,也会影响到后续的操作。
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "第一步结果");
CompletableFuture<String> future2 = future1.thenApplyAsync(result -> {
if ("第一步结果".equals(result)) {
throw new RuntimeException("第二步出错");
}
return "第二步处理后结果";
});
try {
String finalResult = future2.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
在这个例子中,future2
依赖于 future1
的结果。future1
正常完成,但 future2
在 thenApplyAsync
操作中抛出了异常。此时,当获取 future2
的结果时,同样会捕获到 ExecutionException
。
CompletableFuture 的错误恢复机制
Java 的 CompletableFuture 提供了一系列方法来处理异步任务执行过程中的错误,实现错误恢复。
exceptionally 方法
exceptionally
方法用于在 CompletableFuture
出现异常时提供一个替代结果。
CompletableFuture<String> futureWithException2 = CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("任务执行出错");
});
CompletableFuture<String> recoveredFuture = futureWithException2.exceptionally(ex -> {
System.out.println("捕获到异常: " + ex.getMessage());
return "默认结果";
});
try {
String result = recoveredFuture.get();
System.out.println(result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
在上述代码中,exceptionally
方法接受一个 Function
,该 Function
的参数为 Throwable
,即捕获到的异常。在 Function
中,我们可以根据异常情况返回一个替代结果。这样,即使原始的 CompletableFuture
出现异常,通过 exceptionally
处理后,仍然可以得到一个结果,避免了因为异常而导致后续流程中断。
handle 方法
handle
方法可以同时处理正常完成和异常完成的情况。
CompletableFuture<String> futureWithException3 = CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("任务执行出错");
});
CompletableFuture<String> handledFuture = futureWithException3.handle((result, ex) -> {
if (ex != null) {
System.out.println("捕获到异常: " + ex.getMessage());
return "异常时的默认结果";
} else {
return "正常结果: " + result;
}
});
try {
String finalResult = handledFuture.get();
System.out.println(finalResult);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
handle
方法接受一个 BiFunction
,第一个参数为 CompletableFuture
正常完成时的结果(如果出现异常则为 null
),第二个参数为异常(如果正常完成则为 null
)。通过 handle
方法,我们可以更灵活地根据任务执行的不同情况进行处理。
whenComplete 方法
whenComplete
方法用于在 CompletableFuture
完成(无论是正常完成还是异常完成)时执行一个操作,但它不会改变 CompletableFuture
的结果。
CompletableFuture<String> futureWithException4 = CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("任务执行出错");
});
CompletableFuture<Void> whenCompleteFuture = futureWithException4.whenComplete((result, ex) -> {
if (ex != null) {
System.out.println("捕获到异常: " + ex.getMessage());
} else {
System.out.println("正常结果: " + result);
}
});
try {
whenCompleteFuture.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
whenComplete
方法接受一个 BiConsumer
,同样,第一个参数为正常结果(异常时为 null
),第二个参数为异常(正常时为 null
)。与 handle
方法不同,whenComplete
方法不会返回一个新的 CompletableFuture
来替换原始的 CompletableFuture
,它主要用于在任务完成时执行一些额外的操作,比如记录日志等。
复杂异步任务链中的错误恢复
在实际应用中,我们常常会构建复杂的异步任务链,涉及多个 CompletableFuture
的组合操作。在这种情况下,正确处理错误恢复显得尤为重要。
链式操作中的错误传递与恢复
CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("A 任务出错");
});
CompletableFuture<String> futureB = futureA.thenApplyAsync(result -> {
// 这里不会执行,因为 futureA 出错
return "B 任务处理 " + result;
}).exceptionally(ex -> {
System.out.println("捕获到 A 任务异常: " + ex.getMessage());
return "A 任务异常时 B 任务的默认结果";
});
CompletableFuture<String> futureC = futureB.thenApplyAsync(result -> {
return "C 任务处理 " + result;
});
try {
String finalResult = futureC.get();
System.out.println(finalResult);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
在上述代码中,futureA
抛出异常,futureB
通过 exceptionally
方法捕获到 futureA
的异常并提供了替代结果。futureC
依赖于 futureB
的结果,即使 futureA
出错,由于 futureB
的错误恢复,futureC
仍然可以继续执行。
多个 CompletableFuture 并行执行时的错误处理
当多个 CompletableFuture
并行执行时,我们需要处理其中某个或多个任务出错的情况。
CompletableFuture<String> futureX = CompletableFuture.supplyAsync(() -> {
return "X 任务结果";
});
CompletableFuture<String> futureY = CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("Y 任务出错");
});
CompletableFuture<Void> allFutures = CompletableFuture.allOf(futureX, futureY);
CompletableFuture<List<String>> resultsFuture = CompletableFuture.supplyAsync(() -> {
List<String> results = new ArrayList<>();
try {
results.add(futureX.get());
results.add(futureY.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
return results;
});
resultsFuture.exceptionally(ex -> {
System.out.println("捕获到并行任务中的异常: " + ex.getMessage());
return Collections.singletonList("默认结果");
}).thenAccept(results -> {
System.out.println("最终结果: " + results);
});
在这个例子中,futureX
和 futureY
并行执行,futureY
抛出异常。通过 CompletableFuture.allOf
方法等待所有任务完成,在获取结果时捕获异常,并通过 exceptionally
方法进行错误恢复。
自定义线程池与错误处理
在实际应用中,我们可能需要使用自定义的线程池来执行 CompletableFuture 任务,并且要确保在自定义线程池环境下的错误处理机制正常工作。
使用自定义线程池执行 CompletableFuture 任务
ExecutorService executor = Executors.newFixedThreadPool(5);
CompletableFuture<String> futureWithCustomExecutor = CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("任务执行出错");
}, executor);
futureWithCustomExecutor.exceptionally(ex -> {
System.out.println("捕获到异常: " + ex.getMessage());
return "默认结果";
}).thenAccept(result -> {
System.out.println(result);
});
executor.shutdown();
在上述代码中,我们创建了一个固定大小为 5 的线程池 executor
,并将其作为参数传递给 supplyAsync
方法。这样,异步任务将在自定义线程池中执行。在任务出现异常时,仍然可以通过 exceptionally
方法进行错误恢复。
自定义线程池中的异常处理策略
class CustomThreadFactory implements ThreadFactory {
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
CustomThreadFactory(String namePrefix) {
this.namePrefix = namePrefix;
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, namePrefix + "-" + threadNumber.getAndIncrement());
t.setUncaughtExceptionHandler((thread, ex) -> {
System.out.println("线程 " + thread.getName() + " 抛出异常: " + ex.getMessage());
});
return t;
}
}
ExecutorService customExecutor = new ThreadPoolExecutor(
2,
5,
10L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
new CustomThreadFactory("自定义线程池"));
CompletableFuture<String> futureWithCustomExecutor2 = CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("任务执行出错");
}, customExecutor);
futureWithCustomExecutor2.exceptionally(ex -> {
System.out.println("捕获到异常: " + ex.getMessage());
return "默认结果";
}).thenAccept(result -> {
System.out.println(result);
});
customExecutor.shutdown();
在这个例子中,我们自定义了一个 ThreadFactory
,并在其中设置了 UncaughtExceptionHandler
来处理线程执行过程中未捕获的异常。通过这种方式,我们可以在自定义线程池环境下更好地监控和处理异常,同时结合 CompletableFuture 自身的错误恢复机制,确保异步任务的健壮性。
与其他异步编程模型结合时的错误恢复
在实际项目中,CompletableFuture 可能会与其他异步编程模型(如 RxJava、Guava 的 ListenableFuture 等)结合使用。在这种情况下,需要妥善处理不同模型之间的错误转换和恢复。
CompletableFuture 与 RxJava 结合时的错误处理
import io.reactivex.Completable;
import io.reactivex.CompletableObserver;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
throw new RuntimeException("CompletableFuture 任务出错");
});
Completable.fromFuture(completableFuture)
.subscribeOn(Schedulers.io())
.subscribe(new CompletableObserver() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onComplete() {
System.out.println("RxJava 任务正常完成");
}
@Override
public void onError(Throwable e) {
System.out.println("捕获到 CompletableFuture 转换到 RxJava 的异常: " + e.getMessage());
}
});
在上述代码中,我们将 CompletableFuture
转换为 RxJava 的 Completable
。通过 Completable
的 onError
方法捕获 CompletableFuture
抛出的异常,实现了从 CompletableFuture 到 RxJava 的错误处理衔接。
CompletableFuture 与 Guava 的 ListenableFuture 结合时的错误处理
import com.google.common.util.concurrent.*;
ExecutorService guavaExecutor = Executors.newFixedThreadPool(3);
ListenableFutureTask<String> listenableFutureTask = ListenableFutureTask.create(() -> {
throw new RuntimeException("ListenableFutureTask 任务出错");
});
guavaExecutor.submit(listenableFutureTask);
Futures.addCallback(listenableFutureTask, new FutureCallback<String>() {
@Override
public void onSuccess(String result) {
System.out.println("Guava ListenableFuture 任务正常完成: " + result);
}
@Override
public void onFailure(Throwable t) {
System.out.println("捕获到 Guava ListenableFuture 任务异常: " + t.getMessage());
}
}, guavaExecutor);
guavaExecutor.shutdown();
在这个例子中,我们创建了一个 Guava 的 ListenableFutureTask
,并将其提交到线程池执行。通过 Futures.addCallback
方法添加回调,在 onFailure
方法中捕获任务执行过程中的异常,实现了 Guava 异步任务的错误处理。如果要与 CompletableFuture 结合,需要进行适当的转换,并在转换过程中处理好错误传递和恢复。
通过上述详细的介绍和代码示例,相信你对 Java 中 CompletableFuture 的异步任务执行错误恢复有了全面而深入的理解。在实际开发中,合理运用这些错误恢复机制,可以大大提高异步程序的健壮性和稳定性。无论是简单的异步任务,还是复杂的异步任务链与并行任务,都能够有效地应对各种错误情况,确保程序的正常运行。