Java CompletableFuture whenComplete全面监控任务状态的应用
Java CompletableFuture whenComplete 全面监控任务状态的应用
1. CompletableFuture 简介
在 Java 中,CompletableFuture 是 Java 8 引入的一个强大的异步编程工具。它实现了 Future 接口和 CompletionStage 接口,不仅能够获取异步任务的执行结果,还提供了丰富的方法来处理异步任务的完成情况,比如链式调用、组合多个异步任务等。这使得异步编程在 Java 中变得更加简洁和高效。
Future 接口是 Java 早期用于异步计算的方式,它允许我们启动一个异步任务并在将来某个时间获取任务的结果。然而,Future 存在一些局限性,比如我们无法得知任务是否完成,只能通过 get()
方法阻塞等待结果,或者使用 isDone()
方法轮询检查任务状态。而且,Future 对于处理异步任务的完成情况缺乏灵活性,难以进行链式调用和组合多个异步任务。
CompletableFuture 则弥补了这些不足。它提供了一系列方法来处理异步任务的完成情况,比如 thenApply()
、thenAccept()
、thenRun()
等,这些方法允许我们在任务完成后执行相应的操作,并且可以链式调用。同时,CompletableFuture 还支持多个异步任务的组合,比如 allOf()
和 anyOf()
方法,使得异步编程更加灵活和强大。
2. whenComplete 方法详解
whenComplete
方法是 CompletableFuture 提供的一个用于处理任务完成情况的方法。它的定义如下:
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
这个方法接收一个 BiConsumer
类型的参数 action
,BiConsumer
是一个函数式接口,它有两个参数:第一个参数是异步任务的结果(如果任务成功完成),第二个参数是任务执行过程中抛出的异常(如果任务异常完成)。当异步任务完成(无论是正常完成还是异常完成)时,action
将会被执行。
需要注意的是,whenComplete
方法返回的是一个新的 CompletableFuture,这个新的 CompletableFuture 的结果和原始的 CompletableFuture 的结果是一样的。也就是说,whenComplete
方法不会改变原始 CompletableFuture 的结果,它只是在原始 CompletableFuture 完成时执行指定的操作。
下面是一个简单的示例代码,展示了 whenComplete
方法的基本用法:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class WhenCompleteExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 模拟异步任务
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "任务完成";
});
future.whenComplete((result, ex) -> {
if (ex == null) {
System.out.println("任务正常完成,结果是: " + result);
} else {
System.out.println("任务异常完成,异常信息是: " + ex.getMessage());
}
});
// 获取异步任务的结果
String result = future.get();
System.out.println("通过 get 方法获取的结果: " + result);
}
}
在这个示例中,我们使用 CompletableFuture.supplyAsync
方法创建了一个异步任务,这个任务会休眠 2 秒后返回一个字符串 "任务完成"。然后,我们调用 whenComplete
方法,在任务完成时打印任务的结果或者异常信息。最后,我们通过 get
方法获取异步任务的结果并打印。
3. whenComplete 方法的应用场景
3.1 日志记录
在实际开发中,我们经常需要记录异步任务的执行情况,以便在出现问题时能够快速定位。whenComplete
方法可以很方便地实现这一功能。
import java.util.concurrent.CompletableFuture;
import java.util.logging.Level;
import java.util.logging.Logger;
public class WhenCompleteLoggingExample {
private static final Logger LOGGER = Logger.getLogger(WhenCompleteLoggingExample.class.getName());
public static void main(String[] args) {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 模拟异步任务
if (Math.random() < 0.5) {
throw new RuntimeException("任务失败");
}
return "任务成功";
});
future.whenComplete((result, ex) -> {
if (ex == null) {
LOGGER.log(Level.INFO, "任务正常完成,结果是: " + result);
} else {
LOGGER.log(Level.SEVERE, "任务异常完成", ex);
}
});
}
}
在这个示例中,我们在异步任务中通过 Math.random()
方法模拟了任务失败的情况。然后,在 whenComplete
方法中,根据任务的完成情况记录不同级别的日志。如果任务正常完成,记录 INFO 级别的日志;如果任务异常完成,记录 SEVERE 级别的日志,并打印异常堆栈信息。
3.2 资源清理
在一些情况下,异步任务可能会占用一些资源,比如数据库连接、文件句柄等。当任务完成后,我们需要及时清理这些资源,以避免资源泄漏。whenComplete
方法可以帮助我们在任务完成时进行资源清理。
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.concurrent.CompletableFuture;
public class WhenCompleteResourceCleanupExample {
public static void main(String[] args) {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
Connection connection = null;
try {
connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "password");
// 执行数据库操作
System.out.println("执行数据库操作");
} catch (SQLException e) {
throw new RuntimeException(e);
} finally {
if (connection != null) {
try {
connection.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
});
future.whenComplete((v, ex) -> {
if (ex != null) {
System.out.println("任务异常,清理资源失败: " + ex.getMessage());
} else {
System.out.println("任务正常完成,资源已清理");
}
});
}
}
在这个示例中,我们在异步任务中获取了一个数据库连接,并执行了一些数据库操作。在 finally
块中,我们关闭了数据库连接。然后,在 whenComplete
方法中,根据任务的完成情况打印相应的信息。如果任务异常,打印清理资源失败的信息;如果任务正常完成,打印资源已清理的信息。
3.3 错误处理和重试
在异步任务执行过程中,可能会出现各种错误。我们可以使用 whenComplete
方法来捕获这些错误,并根据错误情况进行相应的处理,比如重试。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class WhenCompleteErrorHandlingAndRetryExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> future = performTaskWithRetry(3);
String result = future.get();
System.out.println("最终结果: " + result);
}
private static CompletableFuture<String> performTaskWithRetry(int maxRetries) {
CompletableFuture<String> future = new CompletableFuture<>();
performTask(future, maxRetries);
return future;
}
private static void performTask(CompletableFuture<String> future, int retries) {
CompletableFuture.supplyAsync(() -> {
if (Math.random() < 0.5) {
throw new RuntimeException("任务失败");
}
return "任务成功";
}).whenComplete((result, ex) -> {
if (ex == null) {
future.complete(result);
} else if (retries > 0) {
System.out.println("任务失败,重试次数: " + retries);
performTask(future, retries - 1);
} else {
future.completeExceptionally(ex);
}
});
}
}
在这个示例中,我们定义了一个 performTaskWithRetry
方法,该方法接受一个最大重试次数的参数。在 performTask
方法中,我们模拟了一个可能失败的异步任务。如果任务失败且重试次数大于 0,则进行重试;如果重试次数用完仍然失败,则将异常传递给 CompletableFuture
。通过 whenComplete
方法,我们可以根据任务的完成情况进行相应的处理,实现了错误处理和重试的功能。
4. whenComplete 与其他相关方法的对比
4.1 whenComplete 与 thenApply
thenApply
方法也是 CompletableFuture 提供的一个用于处理任务完成情况的方法。它的定义如下:
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
thenApply
方法接收一个 Function
类型的参数 fn
,Function
是一个函数式接口,它只有一个参数,即异步任务的结果(如果任务成功完成),并返回一个新的结果。当异步任务成功完成时,fn
将会被执行,并且 thenApply
方法返回的新的 CompletableFuture 的结果就是 fn
的返回值。
与 whenComplete
方法不同的是,thenApply
方法只能处理任务正常完成的情况,它无法处理任务异常完成的情况。而且,thenApply
方法会改变原始 CompletableFuture 的结果,返回一个新的 CompletableFuture,其结果是 fn
的返回值。
下面是一个示例代码,展示了 thenApply
方法的用法:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class ThenApplyExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "任务完成");
CompletableFuture<Integer> newFuture = future.thenApply(result -> result.length());
Integer length = newFuture.get();
System.out.println("字符串长度: " + length);
}
}
在这个示例中,我们使用 CompletableFuture.supplyAsync
方法创建了一个异步任务,返回一个字符串 "任务完成"。然后,我们调用 thenApply
方法,将字符串的长度作为新的结果返回,并通过 get
方法获取新的 CompletableFuture 的结果。
4.2 whenComplete 与 thenAccept
thenAccept
方法也是 CompletableFuture 提供的一个用于处理任务完成情况的方法。它的定义如下:
public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
thenAccept
方法接收一个 Consumer
类型的参数 action
,Consumer
是一个函数式接口,它只有一个参数,即异步任务的结果(如果任务成功完成),并且没有返回值。当异步任务成功完成时,action
将会被执行,并且 thenAccept
方法返回的新的 CompletableFuture 的结果是 null
。
与 whenComplete
方法不同的是,thenAccept
方法只能处理任务正常完成的情况,它无法处理任务异常完成的情况。而且,thenAccept
方法会改变原始 CompletableFuture 的结果,返回一个新的 CompletableFuture,其结果是 null
。
下面是一个示例代码,展示了 thenAccept
方法的用法:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class ThenAcceptExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "任务完成");
CompletableFuture<Void> newFuture = future.thenAccept(result -> System.out.println("任务结果: " + result));
newFuture.get();
System.out.println("新的 CompletableFuture 的结果: " + newFuture.join());
}
}
在这个示例中,我们使用 CompletableFuture.supplyAsync
方法创建了一个异步任务,返回一个字符串 "任务完成"。然后,我们调用 thenAccept
方法,打印任务的结果,并通过 get
方法获取新的 CompletableFuture 的结果(这里是 null
)。
4.3 whenComplete 与 handle
handle
方法也是 CompletableFuture 提供的一个用于处理任务完成情况的方法。它的定义如下:
public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable,? extends U> fn)
handle
方法接收一个 BiFunction
类型的参数 fn
,BiFunction
是一个函数式接口,它有两个参数:第一个参数是异步任务的结果(如果任务成功完成),第二个参数是任务执行过程中抛出的异常(如果任务异常完成),并返回一个新的结果。当异步任务完成(无论是正常完成还是异常完成)时,fn
将会被执行,并且 handle
方法返回的新的 CompletableFuture 的结果就是 fn
的返回值。
与 whenComplete
方法不同的是,handle
方法会改变原始 CompletableFuture 的结果,返回一个新的 CompletableFuture,其结果是 fn
的返回值。而 whenComplete
方法不会改变原始 CompletableFuture 的结果,它只是在原始 CompletableFuture 完成时执行指定的操作。
下面是一个示例代码,展示了 handle
方法的用法:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class HandleExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
if (Math.random() < 0.5) {
throw new RuntimeException("任务失败");
}
return "任务成功";
});
CompletableFuture<String> newFuture = future.handle((result, ex) -> {
if (ex == null) {
return "正常结果: " + result;
} else {
return "异常结果: " + ex.getMessage();
}
});
String newResult = newFuture.get();
System.out.println("新的结果: " + newResult);
}
}
在这个示例中,我们使用 CompletableFuture.supplyAsync
方法创建了一个异步任务,该任务可能会失败。然后,我们调用 handle
方法,根据任务的完成情况返回不同的结果,并通过 get
方法获取新的 CompletableFuture 的结果。
5. whenComplete 方法的注意事项
5.1 异常处理
在 whenComplete
方法中,我们可以通过 BiConsumer
的第二个参数来捕获异步任务执行过程中抛出的异常。但是需要注意的是,whenComplete
方法本身并不会抛出异常,即使 BiConsumer
中抛出了异常,也不会影响原始 CompletableFuture 的结果。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class WhenCompleteExceptionHandlingExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("任务失败");
});
future.whenComplete((result, ex) -> {
if (ex != null) {
System.out.println("捕获到异常: " + ex.getMessage());
// 这里抛出的异常不会影响原始 CompletableFuture 的结果
throw new RuntimeException("处理异常时抛出的异常");
}
});
try {
String result = future.get();
} catch (ExecutionException e) {
System.out.println("通过 get 方法捕获到的异常: " + e.getCause().getMessage());
}
}
}
在这个示例中,异步任务抛出了一个异常,我们在 whenComplete
方法中捕获到了这个异常,并在处理异常时又抛出了一个新的异常。但是,通过 get
方法获取异步任务的结果时,捕获到的仍然是原始异步任务抛出的异常,而不是 whenComplete
方法中处理异常时抛出的异常。
5.2 线程模型
whenComplete
方法默认是在执行异步任务的线程中执行 BiConsumer
。这意味着如果异步任务执行时间较长,BiConsumer
的执行也会被延迟。如果我们希望 BiConsumer
在另一个线程中执行,可以使用 whenCompleteAsync
方法。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class WhenCompleteThreadModelExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newSingleThreadExecutor();
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "任务完成";
});
future.whenCompleteAsync((result, ex) -> {
if (ex == null) {
System.out.println("任务正常完成,结果是: " + result + ",当前线程: " + Thread.currentThread().getName());
} else {
System.out.println("任务异常完成,异常信息是: " + ex.getMessage());
}
}, executor);
String result = future.get();
System.out.println("通过 get 方法获取的结果: " + result);
executor.shutdown();
}
}
在这个示例中,我们使用 whenCompleteAsync
方法,并传入一个自定义的线程池 executor
。这样,BiConsumer
将会在自定义线程池中执行,而不会阻塞执行异步任务的线程。
6. 总结
whenComplete
方法是 Java CompletableFuture 中一个非常实用的方法,它可以帮助我们全面监控异步任务的状态,无论是任务正常完成还是异常完成,都能进行相应的处理。通过 whenComplete
方法,我们可以实现日志记录、资源清理、错误处理和重试等功能,提高异步编程的可靠性和灵活性。
在使用 whenComplete
方法时,需要注意异常处理和线程模型等问题,避免出现意想不到的结果。同时,与 thenApply
、thenAccept
、handle
等相关方法进行对比,可以更好地理解 whenComplete
方法的特点和适用场景。
总之,熟练掌握 whenComplete
方法的用法,对于编写高效、可靠的异步应用程序非常重要。希望通过本文的介绍和示例代码,读者能够对 whenComplete
方法有更深入的理解和应用。