Java 中 CompletableFuture 任务完成回调 whenComplete 方法
CompletableFuture 概述
在Java 8引入的CompletableFuture
类为异步编程提供了强大的支持。它不仅允许我们以异步方式执行任务,还提供了丰富的方法来处理任务的完成、错误处理以及结果组合等。CompletableFuture
代表一个异步计算的结果,这意味着我们可以在任务完成后获取其结果,而无需阻塞当前线程等待任务执行完毕。
CompletableFuture
实现了Future
接口,这是Java早期用于异步计算的接口。与传统的Future
相比,CompletableFuture
更加灵活和强大,它允许我们以链式调用的方式编写异步代码,使得异步编程更加直观和可读。例如,我们可以轻松地将多个异步任务串联起来,一个任务的结果作为另一个任务的输入,而不必像传统方式那样手动管理线程和同步。
whenComplete 方法的基本概念
whenComplete
方法是CompletableFuture
类提供的用于任务完成回调的方法之一。当CompletableFuture
代表的异步任务完成(无论是正常完成还是因异常而完成)时,whenComplete
方法中指定的回调函数将会被执行。它的基本签名如下:
CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
这里的action
是一个BiConsumer
,它接收两个参数:第一个参数是异步任务正常完成时的结果(如果任务正常完成,result
不为null
,exception
为null
),第二个参数是任务执行过程中抛出的异常(如果任务因异常而完成,exception
不为null
,result
为null
)。无论任务是成功还是失败,whenComplete
的回调都会被执行。
whenComplete 方法的返回值
whenComplete
方法返回一个新的CompletableFuture
,这个新的CompletableFuture
在原始CompletableFuture
完成时也会完成,并且其结果与原始CompletableFuture
相同。也就是说,我们可以在链式调用中继续对这个返回的CompletableFuture
进行操作。例如:
CompletableFuture.supplyAsync(() -> "Hello")
.whenComplete((result, exception) -> {
if (exception == null) {
System.out.println("Task completed successfully: " + result);
} else {
System.out.println("Task failed: " + exception.getMessage());
}
})
.thenApply(String::toUpperCase)
.thenAccept(System.out::println);
在上述代码中,首先通过supplyAsync
创建一个异步任务,返回字符串“Hello”。whenComplete
回调在任务完成时打印任务的完成状态。然后通过thenApply
将结果转换为大写,最后通过thenAccept
打印最终结果。
whenComplete 方法的使用场景
结果处理与日志记录
whenComplete
方法非常适合用于记录任务的执行结果或错误信息。例如,在一个Web应用中,我们可能有一个异步任务来查询数据库获取用户信息。当任务完成时,我们可以使用whenComplete
记录任务是否成功以及获取到的用户信息。
CompletableFuture<User> getUserFuture = CompletableFuture.supplyAsync(() -> {
// 模拟数据库查询
if (Math.random() > 0.5) {
return new User("John", 30);
} else {
throw new RuntimeException("Database query failed");
}
});
getUserFuture.whenComplete((user, exception) -> {
if (exception == null) {
System.out.println("User retrieved successfully: " + user);
} else {
System.out.println("Error retrieving user: " + exception.getMessage());
}
});
在这个例子中,supplyAsync
模拟了一个可能成功或失败的数据库查询任务。whenComplete
回调根据任务的完成状态记录相应的信息。
资源清理
在异步任务执行过程中,可能会涉及到一些资源的获取,比如数据库连接、文件句柄等。当任务完成后,无论成功与否,都需要清理这些资源。whenComplete
方法可以方便地实现这一点。
CompletableFuture<Void> task = CompletableFuture.runAsync(() -> {
Connection connection = null;
try {
connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/mydb", "user", "password");
// 执行数据库操作
Statement statement = connection.createStatement();
statement.executeUpdate("INSERT INTO users (name, age) VALUES ('Jane', 25)");
} catch (SQLException e) {
throw new RuntimeException(e);
} finally {
if (connection != null) {
try {
connection.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
});
task.whenComplete((result, exception) -> {
if (exception != null) {
System.out.println("Database operation failed: " + exception.getMessage());
} else {
System.out.println("Database operation completed successfully");
}
});
在上述代码中,runAsync
方法执行一个数据库插入操作。在任务执行过程中获取数据库连接,任务完成后,无论是否成功,whenComplete
回调都会打印相应的状态信息。同时,在任务内部通过finally
块确保数据库连接被关闭。
异步任务链的中间处理
在构建复杂的异步任务链时,whenComplete
可以用于在任务链的中间阶段对结果进行处理或记录,而不影响任务链的后续执行。
CompletableFuture.supplyAsync(() -> "Hello")
.thenApply(String::toUpperCase)
.whenComplete((result, exception) -> {
System.out.println("Intermediate result: " + result);
})
.thenApply(result -> result + " World")
.thenAccept(System.out::println);
在这个例子中,whenComplete
回调在thenApply
将字符串转换为大写后执行,打印中间结果。然后任务链继续执行,将结果与“ World”拼接并最终打印。
whenComplete 方法与异常处理
捕获任务中的异常
whenComplete
方法的回调函数中可以捕获异步任务执行过程中抛出的异常。这使得我们可以在任务完成后统一处理异常,而不必在任务执行的代码块中分散处理异常。
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5) {
return 10;
} else {
throw new RuntimeException("Task failed");
}
});
future.whenComplete((result, exception) -> {
if (exception != null) {
System.out.println("Caught exception: " + exception.getMessage());
} else {
System.out.println("Result: " + result);
}
});
在上述代码中,supplyAsync
创建的异步任务有一定概率抛出异常。whenComplete
回调能够捕获并处理这个异常,打印相应的错误信息。
异常处理与任务链的延续
当whenComplete
捕获到异常时,我们仍然可以通过返回的CompletableFuture
继续构建任务链。例如,我们可以在异常发生时执行一些备用操作。
CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5) {
return 10;
} else {
throw new RuntimeException("Task failed");
}
})
.whenComplete((result, exception) -> {
if (exception != null) {
System.out.println("Task failed, performing fallback");
}
})
.exceptionally(ex -> {
// 备用操作,返回一个默认值
return -1;
})
.thenAccept(System.out::println);
在这个例子中,whenComplete
回调在任务失败时打印提示信息。exceptionally
方法在捕获到异常时执行备用操作,返回一个默认值-1
,并继续任务链的执行,最终将结果打印出来。
whenComplete 方法的线程模型
回调执行的线程
whenComplete
方法的回调函数默认在执行完成CompletableFuture
任务的线程中执行。例如,如果CompletableFuture
是通过supplyAsync
创建的,并且任务在ForkJoinPool.commonPool()
线程池中执行,那么whenComplete
的回调也会在这个线程池中执行。
CompletableFuture.supplyAsync(() -> {
System.out.println("Task is running in thread: " + Thread.currentThread().getName());
return "Result";
})
.whenComplete((result, exception) -> {
System.out.println("Callback is running in thread: " + Thread.currentThread().getName());
});
在上述代码中,任务和回调都打印当前执行的线程名。运行代码可以发现,任务和回调通常在同一个线程中执行(除非线程池中的线程被复用或其他特殊情况)。
使用自定义线程池
如果我们希望whenComplete
的回调在特定的线程池中执行,可以使用whenCompleteAsync
方法。whenCompleteAsync
方法有两个重载版本,一个接受Executor
参数,另一个不接受参数(不接受参数时默认使用ForkJoinPool.commonPool()
)。
ExecutorService executor = Executors.newFixedThreadPool(2);
CompletableFuture.supplyAsync(() -> {
System.out.println("Task is running in thread: " + Thread.currentThread().getName());
return "Result";
})
.whenCompleteAsync((result, exception) -> {
System.out.println("Callback is running in thread: " + Thread.currentThread().getName());
}, executor);
executor.shutdown();
在这个例子中,whenCompleteAsync
方法的回调在我们自定义的Executor
线程池中执行。通过打印线程名可以看到任务和回调在不同的线程中执行。
whenComplete 方法与其他 CompletableFuture 方法的组合使用
与 thenApply 组合
whenComplete
可以与thenApply
方法组合使用,先通过whenComplete
对任务结果进行记录或简单处理,然后通过thenApply
对结果进行进一步的转换。
CompletableFuture.supplyAsync(() -> "hello")
.whenComplete((result, exception) -> {
System.out.println("Original result: " + result);
})
.thenApply(String::toUpperCase)
.thenAccept(System.out::println);
在这个例子中,whenComplete
先打印原始结果,然后thenApply
将结果转换为大写并打印。
与 thenAccept 组合
whenComplete
和thenAccept
也可以组合使用。whenComplete
用于处理任务完成的通用逻辑,thenAccept
用于对最终结果进行消费。
CompletableFuture.supplyAsync(() -> 10)
.whenComplete((result, exception) -> {
if (exception == null) {
System.out.println("Task completed with result: " + result);
} else {
System.out.println("Task failed: " + exception.getMessage());
}
})
.thenAccept(result -> System.out.println("Final result: " + result));
在这个例子中,whenComplete
打印任务完成状态,thenAccept
打印最终结果。
与 thenCompose 组合
whenComplete
与thenCompose
组合可以用于构建复杂的异步任务链,其中一个任务的结果作为另一个异步任务的输入,并且在中间过程中可以使用whenComplete
进行结果记录或处理。
CompletableFuture.supplyAsync(() -> "input")
.whenComplete((result, exception) -> {
System.out.println("First task result: " + result);
})
.thenCompose(input -> CompletableFuture.supplyAsync(() -> {
System.out.println("Processing input: " + input);
return input + " processed";
}))
.thenAccept(System.out::println);
在这个例子中,第一个任务返回“input”,whenComplete
打印第一个任务的结果。然后thenCompose
将这个结果作为输入启动另一个异步任务,对输入进行处理并返回新的结果,最后打印最终结果。
实际应用案例分析
电商系统中的库存查询与订单处理
假设我们正在开发一个电商系统,当用户下单时,需要先查询库存是否足够。库存查询是一个异步操作,因为它可能涉及到与外部库存系统的交互。如果库存足够,则创建订单。
CompletableFuture<Boolean> checkStockFuture = CompletableFuture.supplyAsync(() -> {
// 模拟库存查询
return Math.random() > 0.5;
});
CompletableFuture<Void> orderFuture = checkStockFuture
.whenComplete((stockAvailable, exception) -> {
if (exception != null) {
System.out.println("Stock check failed: " + exception.getMessage());
} else {
System.out.println("Stock check result: " + stockAvailable);
}
})
.thenCompose(stockAvailable -> {
if (stockAvailable) {
return CompletableFuture.runAsync(() -> {
System.out.println("Creating order...");
// 实际的订单创建逻辑
});
} else {
return CompletableFuture.runAsync(() -> {
System.out.println("Insufficient stock, cannot create order");
});
}
});
orderFuture.join();
在这个例子中,checkStockFuture
模拟库存查询。whenComplete
在库存查询完成时打印查询结果或错误信息。然后thenCompose
根据库存查询结果决定是否创建订单,并通过runAsync
异步执行相应的操作。
数据分析系统中的数据聚合与报告生成
在一个数据分析系统中,我们可能需要从多个数据源获取数据,然后对这些数据进行聚合,最后生成报告。每个步骤都可以是异步操作。
CompletableFuture<List<Integer>> dataSource1Future = CompletableFuture.supplyAsync(() -> {
// 模拟从数据源1获取数据
return Arrays.asList(1, 2, 3);
});
CompletableFuture<List<Integer>> dataSource2Future = CompletableFuture.supplyAsync(() -> {
// 模拟从数据源2获取数据
return Arrays.asList(4, 5, 6);
});
CompletableFuture<Integer> aggregateFuture = CompletableFuture.allOf(dataSource1Future, dataSource2Future)
.thenApply(v -> {
List<Integer> combinedData = new ArrayList<>();
combinedData.addAll(dataSource1Future.join());
combinedData.addAll(dataSource2Future.join());
return combinedData.stream().mapToInt(Integer::intValue).sum();
})
.whenComplete((sum, exception) -> {
if (exception == null) {
System.out.println("Aggregated sum: " + sum);
} else {
System.out.println("Aggregation failed: " + exception.getMessage());
}
});
CompletableFuture<Void> reportFuture = aggregateFuture
.thenAccept(sum -> {
System.out.println("Generating report with sum: " + sum);
// 实际的报告生成逻辑
});
reportFuture.join();
在这个例子中,dataSource1Future
和dataSource2Future
分别模拟从两个数据源获取数据。allOf
方法等待两个数据源的数据都获取完成。然后thenApply
对数据进行聚合,whenComplete
在聚合完成时打印结果或错误信息。最后thenAccept
根据聚合结果生成报告。
注意事项
避免阻塞回调线程
在whenComplete
回调中,应避免执行长时间阻塞的操作,因为这可能会影响线程池的性能。如果需要执行阻塞操作,建议将其放在另一个异步任务中。
CompletableFuture.supplyAsync(() -> "result")
.whenComplete((result, exception) -> {
// 错误做法,阻塞线程
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Callback with result: " + result);
});
上述代码中在whenComplete
回调中使用Thread.sleep
阻塞线程5秒,这是不合适的。可以将阻塞操作放在另一个异步任务中:
CompletableFuture.supplyAsync(() -> "result")
.whenComplete((result, exception) -> {
CompletableFuture.runAsync(() -> {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Callback with result: " + result);
});
});
内存泄漏风险
如果在whenComplete
回调中持有对外部资源(如数据库连接、文件句柄等)的强引用,并且没有正确释放这些资源,可能会导致内存泄漏。确保在任务完成后及时清理这些资源。
class ResourceHolder {
private Connection connection;
public ResourceHolder() {
try {
connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/mydb", "user", "password");
} catch (SQLException e) {
e.printStackTrace();
}
}
public void doTask() {
CompletableFuture.runAsync(() -> {
// 执行任务
})
.whenComplete((result, exception) -> {
// 没有释放connection,可能导致内存泄漏
if (exception != null) {
System.out.println("Task failed: " + exception.getMessage());
} else {
System.out.println("Task completed");
}
});
}
public void close() {
if (connection != null) {
try {
connection.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
}
在上述代码中,ResourceHolder
类在whenComplete
回调中没有释放数据库连接,可能导致内存泄漏。应该在whenComplete
回调中或者在合适的地方调用close
方法释放连接。
异常处理的完备性
虽然whenComplete
可以捕获任务执行过程中的异常,但在复杂的任务链中,确保所有可能的异常都被妥善处理是很重要的。特别是在与其他CompletableFuture
方法组合使用时,要注意异常处理的连续性。
CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5) {
return "result";
} else {
throw new RuntimeException("Task failed");
}
})
.whenComplete((result, exception) -> {
if (exception != null) {
System.out.println("Caught in whenComplete: " + exception.getMessage());
}
})
.thenApply(String::toUpperCase)
.thenAccept(System.out::println);
在上述代码中,如果任务在supplyAsync
中抛出异常,whenComplete
可以捕获并打印异常信息。但是,由于thenApply
没有处理异常,任务链可能会在thenApply
处中断,而没有进一步的异常处理。可以通过exceptionally
方法来确保异常在整个任务链中得到妥善处理:
CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5) {
return "result";
} else {
throw new RuntimeException("Task failed");
}
})
.whenComplete((result, exception) -> {
if (exception != null) {
System.out.println("Caught in whenComplete: " + exception.getMessage());
}
})
.exceptionally(ex -> {
System.out.println("Caught in exceptionally: " + ex.getMessage());
return "default";
})
.thenApply(String::toUpperCase)
.thenAccept(System.out::println);
在这个改进后的代码中,exceptionally
方法在whenComplete
之后捕获异常,并返回一个默认值,确保任务链可以继续执行。
通过深入理解CompletableFuture
的whenComplete
方法及其在不同场景下的应用,我们可以更加高效地编写异步代码,充分利用多核处理器的性能,提升应用程序的响应速度和并发处理能力。同时,注意避免常见的问题,如阻塞线程、内存泄漏和异常处理不当等,以确保异步代码的稳定性和可靠性。