MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java 中 CompletableFuture 异步任务执行错误恢复

2023-02-273.2k 阅读

CompletableFuture 基础回顾

在深入探讨 CompletableFuture 的错误恢复机制之前,我们先来简要回顾一下 CompletableFuture 的基本概念和用法。

CompletableFuture 是 Java 8 引入的一个强大的异步编程工具,它实现了 Future 和 CompletionStage 接口。通过 CompletableFuture,我们可以更方便地进行异步计算、处理异步结果以及组合多个异步操作。

创建 CompletableFuture

  1. 使用 supplyAsync 创建有返回值的异步任务
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // 模拟耗时操作
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "Hello, CompletableFuture!";
});

在上述代码中,supplyAsync 方法接受一个 Supplier 作为参数,该 Supplierget 方法定义了异步执行的任务。supplyAsync 方法会在一个默认的 Fork/Join 线程池中异步执行这个任务,并返回一个 CompletableFuture,其泛型类型与 Supplier 的返回类型一致。

  1. 使用 runAsync 创建无返回值的异步任务
CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
    // 模拟耗时操作
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("无返回值的异步任务执行完成");
});

runAsync 方法接受一个 Runnable 作为参数,同样在默认的 Fork/Join 线程池中异步执行任务。由于 Runnable 没有返回值,所以 CompletableFuture 的泛型类型为 Void

获取 CompletableFuture 的结果

  1. 使用 get 方法阻塞获取结果
try {
    String result = future.get();
    System.out.println(result);
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}

get 方法会阻塞当前线程,直到 CompletableFuture 完成并获取到结果。如果异步任务抛出异常,get 方法会将该异常包装成 ExecutionException 重新抛出,同时还可能抛出 InterruptedException 表示当前线程在等待过程中被中断。

  1. 使用 join 方法阻塞获取结果
String result2 = future.join();
System.out.println(result2);

join 方法与 get 方法类似,也是阻塞当前线程获取结果。不同之处在于,join 方法不会抛出 InterruptedExceptionExecutionException,而是直接抛出原始的异常(如果有)。

CompletableFuture 异步任务中的错误情况

在异步任务执行过程中,可能会出现各种错误情况。了解这些错误情况及其产生的原因,对于有效地进行错误恢复至关重要。

任务执行时抛出异常

当在 supplyAsyncrunAsync 定义的任务逻辑中抛出异常时,CompletableFuture 会进入异常完成状态。

CompletableFuture<String> futureWithException = CompletableFuture.supplyAsync(() -> {
    throw new RuntimeException("任务执行出错");
});
try {
    String result = futureWithException.get();
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}

在上述代码中,supplyAsync 定义的任务直接抛出了 RuntimeException。当调用 get 方法获取结果时,会捕获到 ExecutionException,其内部包含了原始抛出的 RuntimeException

依赖的 CompletableFuture 出错

在 CompletableFuture 的组合操作中,如果某个依赖的 CompletableFuture 出现异常,也会影响到后续的操作。

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "第一步结果");
CompletableFuture<String> future2 = future1.thenApplyAsync(result -> {
    if ("第一步结果".equals(result)) {
        throw new RuntimeException("第二步出错");
    }
    return "第二步处理后结果";
});
try {
    String finalResult = future2.get();
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}

在这个例子中,future2 依赖于 future1 的结果。future1 正常完成,但 future2thenApplyAsync 操作中抛出了异常。此时,当获取 future2 的结果时,同样会捕获到 ExecutionException

CompletableFuture 的错误恢复机制

Java 的 CompletableFuture 提供了一系列方法来处理异步任务执行过程中的错误,实现错误恢复。

exceptionally 方法

exceptionally 方法用于在 CompletableFuture 出现异常时提供一个替代结果。

CompletableFuture<String> futureWithException2 = CompletableFuture.supplyAsync(() -> {
    throw new RuntimeException("任务执行出错");
});
CompletableFuture<String> recoveredFuture = futureWithException2.exceptionally(ex -> {
    System.out.println("捕获到异常: " + ex.getMessage());
    return "默认结果";
});
try {
    String result = recoveredFuture.get();
    System.out.println(result);
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}

在上述代码中,exceptionally 方法接受一个 Function,该 Function 的参数为 Throwable,即捕获到的异常。在 Function 中,我们可以根据异常情况返回一个替代结果。这样,即使原始的 CompletableFuture 出现异常,通过 exceptionally 处理后,仍然可以得到一个结果,避免了因为异常而导致后续流程中断。

handle 方法

handle 方法可以同时处理正常完成和异常完成的情况。

CompletableFuture<String> futureWithException3 = CompletableFuture.supplyAsync(() -> {
    throw new RuntimeException("任务执行出错");
});
CompletableFuture<String> handledFuture = futureWithException3.handle((result, ex) -> {
    if (ex != null) {
        System.out.println("捕获到异常: " + ex.getMessage());
        return "异常时的默认结果";
    } else {
        return "正常结果: " + result;
    }
});
try {
    String finalResult = handledFuture.get();
    System.out.println(finalResult);
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}

handle 方法接受一个 BiFunction,第一个参数为 CompletableFuture 正常完成时的结果(如果出现异常则为 null),第二个参数为异常(如果正常完成则为 null)。通过 handle 方法,我们可以更灵活地根据任务执行的不同情况进行处理。

whenComplete 方法

whenComplete 方法用于在 CompletableFuture 完成(无论是正常完成还是异常完成)时执行一个操作,但它不会改变 CompletableFuture 的结果。

CompletableFuture<String> futureWithException4 = CompletableFuture.supplyAsync(() -> {
    throw new RuntimeException("任务执行出错");
});
CompletableFuture<Void> whenCompleteFuture = futureWithException4.whenComplete((result, ex) -> {
    if (ex != null) {
        System.out.println("捕获到异常: " + ex.getMessage());
    } else {
        System.out.println("正常结果: " + result);
    }
});
try {
    whenCompleteFuture.get();
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}

whenComplete 方法接受一个 BiConsumer,同样,第一个参数为正常结果(异常时为 null),第二个参数为异常(正常时为 null)。与 handle 方法不同,whenComplete 方法不会返回一个新的 CompletableFuture 来替换原始的 CompletableFuture,它主要用于在任务完成时执行一些额外的操作,比如记录日志等。

复杂异步任务链中的错误恢复

在实际应用中,我们常常会构建复杂的异步任务链,涉及多个 CompletableFuture 的组合操作。在这种情况下,正确处理错误恢复显得尤为重要。

链式操作中的错误传递与恢复

CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> {
    throw new RuntimeException("A 任务出错");
});
CompletableFuture<String> futureB = futureA.thenApplyAsync(result -> {
    // 这里不会执行,因为 futureA 出错
    return "B 任务处理 " + result;
}).exceptionally(ex -> {
    System.out.println("捕获到 A 任务异常: " + ex.getMessage());
    return "A 任务异常时 B 任务的默认结果";
});
CompletableFuture<String> futureC = futureB.thenApplyAsync(result -> {
    return "C 任务处理 " + result;
});
try {
    String finalResult = futureC.get();
    System.out.println(finalResult);
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}

在上述代码中,futureA 抛出异常,futureB 通过 exceptionally 方法捕获到 futureA 的异常并提供了替代结果。futureC 依赖于 futureB 的结果,即使 futureA 出错,由于 futureB 的错误恢复,futureC 仍然可以继续执行。

多个 CompletableFuture 并行执行时的错误处理

当多个 CompletableFuture 并行执行时,我们需要处理其中某个或多个任务出错的情况。

CompletableFuture<String> futureX = CompletableFuture.supplyAsync(() -> {
    return "X 任务结果";
});
CompletableFuture<String> futureY = CompletableFuture.supplyAsync(() -> {
    throw new RuntimeException("Y 任务出错");
});
CompletableFuture<Void> allFutures = CompletableFuture.allOf(futureX, futureY);
CompletableFuture<List<String>> resultsFuture = CompletableFuture.supplyAsync(() -> {
    List<String> results = new ArrayList<>();
    try {
        results.add(futureX.get());
        results.add(futureY.get());
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
    return results;
});
resultsFuture.exceptionally(ex -> {
    System.out.println("捕获到并行任务中的异常: " + ex.getMessage());
    return Collections.singletonList("默认结果");
}).thenAccept(results -> {
    System.out.println("最终结果: " + results);
});

在这个例子中,futureXfutureY 并行执行,futureY 抛出异常。通过 CompletableFuture.allOf 方法等待所有任务完成,在获取结果时捕获异常,并通过 exceptionally 方法进行错误恢复。

自定义线程池与错误处理

在实际应用中,我们可能需要使用自定义的线程池来执行 CompletableFuture 任务,并且要确保在自定义线程池环境下的错误处理机制正常工作。

使用自定义线程池执行 CompletableFuture 任务

ExecutorService executor = Executors.newFixedThreadPool(5);
CompletableFuture<String> futureWithCustomExecutor = CompletableFuture.supplyAsync(() -> {
    throw new RuntimeException("任务执行出错");
}, executor);
futureWithCustomExecutor.exceptionally(ex -> {
    System.out.println("捕获到异常: " + ex.getMessage());
    return "默认结果";
}).thenAccept(result -> {
    System.out.println(result);
});
executor.shutdown();

在上述代码中,我们创建了一个固定大小为 5 的线程池 executor,并将其作为参数传递给 supplyAsync 方法。这样,异步任务将在自定义线程池中执行。在任务出现异常时,仍然可以通过 exceptionally 方法进行错误恢复。

自定义线程池中的异常处理策略

class CustomThreadFactory implements ThreadFactory {
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    CustomThreadFactory(String namePrefix) {
        this.namePrefix = namePrefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, namePrefix + "-" + threadNumber.getAndIncrement());
        t.setUncaughtExceptionHandler((thread, ex) -> {
            System.out.println("线程 " + thread.getName() + " 抛出异常: " + ex.getMessage());
        });
        return t;
    }
}
ExecutorService customExecutor = new ThreadPoolExecutor(
        2,
        5,
        10L,
        TimeUnit.SECONDS,
        new LinkedBlockingQueue<>(),
        new CustomThreadFactory("自定义线程池"));
CompletableFuture<String> futureWithCustomExecutor2 = CompletableFuture.supplyAsync(() -> {
    throw new RuntimeException("任务执行出错");
}, customExecutor);
futureWithCustomExecutor2.exceptionally(ex -> {
    System.out.println("捕获到异常: " + ex.getMessage());
    return "默认结果";
}).thenAccept(result -> {
    System.out.println(result);
});
customExecutor.shutdown();

在这个例子中,我们自定义了一个 ThreadFactory,并在其中设置了 UncaughtExceptionHandler 来处理线程执行过程中未捕获的异常。通过这种方式,我们可以在自定义线程池环境下更好地监控和处理异常,同时结合 CompletableFuture 自身的错误恢复机制,确保异步任务的健壮性。

与其他异步编程模型结合时的错误恢复

在实际项目中,CompletableFuture 可能会与其他异步编程模型(如 RxJava、Guava 的 ListenableFuture 等)结合使用。在这种情况下,需要妥善处理不同模型之间的错误转换和恢复。

CompletableFuture 与 RxJava 结合时的错误处理

import io.reactivex.Completable;
import io.reactivex.CompletableObserver;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;

CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
    throw new RuntimeException("CompletableFuture 任务出错");
});
Completable.fromFuture(completableFuture)
      .subscribeOn(Schedulers.io())
      .subscribe(new CompletableObserver() {
            @Override
            public void onSubscribe(Disposable d) {
            }
            @Override
            public void onComplete() {
                System.out.println("RxJava 任务正常完成");
            }
            @Override
            public void onError(Throwable e) {
                System.out.println("捕获到 CompletableFuture 转换到 RxJava 的异常: " + e.getMessage());
            }
        });

在上述代码中,我们将 CompletableFuture 转换为 RxJava 的 Completable。通过 CompletableonError 方法捕获 CompletableFuture 抛出的异常,实现了从 CompletableFuture 到 RxJava 的错误处理衔接。

CompletableFuture 与 Guava 的 ListenableFuture 结合时的错误处理

import com.google.common.util.concurrent.*;

ExecutorService guavaExecutor = Executors.newFixedThreadPool(3);
ListenableFutureTask<String> listenableFutureTask = ListenableFutureTask.create(() -> {
    throw new RuntimeException("ListenableFutureTask 任务出错");
});
guavaExecutor.submit(listenableFutureTask);
Futures.addCallback(listenableFutureTask, new FutureCallback<String>() {
    @Override
    public void onSuccess(String result) {
        System.out.println("Guava ListenableFuture 任务正常完成: " + result);
    }
    @Override
    public void onFailure(Throwable t) {
        System.out.println("捕获到 Guava ListenableFuture 任务异常: " + t.getMessage());
    }
}, guavaExecutor);
guavaExecutor.shutdown();

在这个例子中,我们创建了一个 Guava 的 ListenableFutureTask,并将其提交到线程池执行。通过 Futures.addCallback 方法添加回调,在 onFailure 方法中捕获任务执行过程中的异常,实现了 Guava 异步任务的错误处理。如果要与 CompletableFuture 结合,需要进行适当的转换,并在转换过程中处理好错误传递和恢复。

通过上述详细的介绍和代码示例,相信你对 Java 中 CompletableFuture 的异步任务执行错误恢复有了全面而深入的理解。在实际开发中,合理运用这些错误恢复机制,可以大大提高异步程序的健壮性和稳定性。无论是简单的异步任务,还是复杂的异步任务链与并行任务,都能够有效地应对各种错误情况,确保程序的正常运行。