Java 中 CompletableFuture 异步任务异常处理策略
CompletableFuture 简介
在Java 8 引入 CompletableFuture
之前,处理异步任务相对复杂。Future
接口是Java早期用于异步计算的工具,但它存在一些局限性,例如获取结果时可能会阻塞主线程,并且缺乏对异步任务完成后的链式操作和异常处理的便捷机制。
CompletableFuture
实现了 Future
和 CompletionStage
接口,它不仅提供了异步计算的能力,还允许我们以一种更灵活、更强大的方式处理异步任务的结果和异常。它支持异步任务的链式调用、组合多个异步任务以及优雅地处理异常。
CompletableFuture 创建异步任务
CompletableFuture
提供了多种静态方法来创建异步任务,常见的有 supplyAsync
和 runAsync
。
supplyAsync
supplyAsync
方法用于创建一个有返回值的异步任务。它接受一个 Supplier
作为参数,该 Supplier
定义了异步执行的逻辑,并返回计算结果。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class CompletableFutureExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 模拟异步任务执行
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "异步任务执行完成";
});
String result = future.get();
System.out.println(result);
}
}
在上述代码中,supplyAsync
方法启动了一个异步任务,在任务中模拟了两秒的延迟,然后返回一个字符串。get
方法用于获取异步任务的结果,这会阻塞主线程直到任务完成。
runAsync
runAsync
方法用于创建一个没有返回值的异步任务。它接受一个 Runnable
作为参数,该 Runnable
定义了异步执行的逻辑。
import java.util.concurrent.CompletableFuture;
public class CompletableFutureRunAsyncExample {
public static void main(String[] args) {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
// 模拟异步任务执行
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("无返回值的异步任务执行完成");
});
}
}
此代码中,runAsync
启动了一个异步任务,任务执行两秒延迟后打印一条消息。由于 runAsync
返回的 CompletableFuture
是 Void
类型,所以不能通过 get
方法获取具体的返回值。
CompletableFuture 异常处理策略
使用 exceptionally
方法
exceptionally
方法用于在异步任务发生异常时提供一个替代结果。它接受一个 Function
作为参数,该 Function
以异常作为输入,并返回一个替代结果。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class CompletableFutureExceptionallyExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("模拟异常");
}
return "正常结果";
});
String result = future.exceptionally(ex -> {
System.out.println("捕获到异常: " + ex.getMessage());
return "异常时的替代结果";
}).get();
System.out.println(result);
}
}
在这段代码中,supplyAsync
定义的异步任务有50%的概率抛出异常。exceptionally
方法捕获到异常后,打印异常信息并返回一个替代结果。
使用 whenComplete
方法
whenComplete
方法用于在异步任务完成(无论成功还是失败)时执行一个回调函数。它接受一个 BiConsumer
作为参数,该 BiConsumer
接收任务的结果(如果有)和异常(如果有)作为输入。
import java.util.concurrent.CompletableFuture;
public class CompletableFutureWhenCompleteExample {
public static void main(String[] args) {
CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("模拟异常");
}
return "正常结果";
}).whenComplete((result, ex) -> {
if (ex != null) {
System.out.println("捕获到异常: " + ex.getMessage());
} else {
System.out.println("任务成功完成,结果: " + result);
}
});
// 防止主线程退出
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
这里,whenComplete
回调函数根据 ex
是否为 null
判断任务是否成功。如果 ex
不为 null
,则说明任务发生异常,打印异常信息;否则,打印任务成功的结果。
使用 handle
方法
handle
方法结合了 whenComplete
和 exceptionally
的功能。它接受一个 BiFunction
作为参数,该 BiFunction
接收任务的结果(如果有)和异常(如果有)作为输入,并返回一个新的结果。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class CompletableFutureHandleExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("模拟异常");
}
return "正常结果";
});
String result = future.handle((res, ex) -> {
if (ex != null) {
System.out.println("捕获到异常: " + ex.getMessage());
return "异常时的替代结果";
} else {
return res;
}
}).get();
System.out.println(result);
}
}
在这个例子中,handle
方法根据任务执行情况返回不同的结果。如果任务成功,返回正常结果;如果任务失败,打印异常信息并返回替代结果。
链式调用中的异常处理
简单链式调用异常处理
CompletableFuture
支持链式调用,在链式调用中,异常也能得到合理处理。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class CompletableFutureChainingExceptionExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("模拟异常");
}
return "第一步结果";
})
.thenApply(result -> {
System.out.println("第一步结果: " + result);
return result + " 经过第二步处理";
})
.exceptionally(ex -> {
System.out.println("捕获到异常: " + ex.getMessage());
return "异常时的替代结果";
})
.thenAccept(finalResult -> System.out.println("最终结果: " + finalResult));
}
}
在这个链式调用中,supplyAsync
定义的第一步任务有50%的概率抛出异常。如果第一步任务成功,thenApply
方法会处理第一步的结果;如果第一步任务失败,exceptionally
方法捕获异常并返回替代结果,最后 thenAccept
打印最终结果。
多步链式调用异常处理
当链式调用涉及多个步骤时,异常处理同样重要。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class CompletableFutureMultiStepChainingExceptionExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("第一步异常");
}
return "第一步结果";
})
.thenApply(result -> {
if (Math.random() > 0.5) {
throw new RuntimeException("第二步异常");
}
return result + " 经过第二步处理";
})
.thenApply(result -> {
if (Math.random() > 0.5) {
throw new RuntimeException("第三步异常");
}
return result + " 经过第三步处理";
})
.exceptionally(ex -> {
System.out.println("捕获到异常: " + ex.getMessage());
return "异常时的替代结果";
})
.thenAccept(finalResult -> System.out.println("最终结果: " + finalResult));
}
}
在这个多步链式调用中,每一步都有50%的概率抛出异常。只要任何一步抛出异常,exceptionally
方法就会捕获并处理异常,返回替代结果。
组合多个 CompletableFuture 的异常处理
allOf 方法的异常处理
CompletableFuture.allOf
方法用于等待所有给定的 CompletableFuture
都完成。如果其中任何一个 CompletableFuture
抛出异常,allOf
返回的 CompletableFuture
也会异常完成。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class CompletableFutureAllOfExceptionExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("future1 异常");
}
return "future1 结果";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("future2 异常");
}
return "future2 结果";
});
CompletableFuture<Void> allOfFuture = CompletableFuture.allOf(future1, future2);
allOfFuture.exceptionally(ex -> {
System.out.println("捕获到异常: " + ex.getMessage());
return null;
}).thenRun(() -> {
try {
if (!future1.isCompletedExceptionally()) {
System.out.println("future1 结果: " + future1.get());
}
if (!future2.isCompletedExceptionally()) {
System.out.println("future2 结果: " + future2.get());
}
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}).get();
}
}
在这个例子中,allOf
方法等待 future1
和 future2
都完成。如果其中任何一个抛出异常,exceptionally
方法会捕获异常并处理。然后通过检查每个 CompletableFuture
是否异常完成来决定是否获取并打印它们的结果。
anyOf 方法的异常处理
CompletableFuture.anyOf
方法用于等待任何一个给定的 CompletableFuture
完成。如果所有 CompletableFuture
都抛出异常,anyOf
返回的 CompletableFuture
也会异常完成。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class CompletableFutureAnyOfExceptionExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("future1 异常");
}
return "future1 结果";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("future2 异常");
}
return "future2 结果";
});
CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(future1, future2);
anyOfFuture.exceptionally(ex -> {
System.out.println("捕获到异常: " + ex.getMessage());
return null;
}).thenAccept(result -> {
if (result != null) {
System.out.println("最先完成的结果: " + result);
}
}).get();
}
}
这里,anyOf
方法等待 future1
或 future2
其中一个完成。如果所有任务都异常,exceptionally
方法捕获异常。如果有任务成功完成,thenAccept
打印最先完成的任务结果。
自定义异常类型处理
在实际应用中,我们可能会定义自己的异常类型。CompletableFuture
同样可以很好地处理自定义异常。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
class CustomException extends RuntimeException {
public CustomException(String message) {
super(message);
}
}
public class CompletableFutureCustomExceptionExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new CustomException("自定义异常");
}
return "正常结果";
})
.exceptionally(ex -> {
if (ex instanceof CustomException) {
System.out.println("捕获到自定义异常: " + ex.getMessage());
} else {
System.out.println("捕获到其他异常: " + ex.getMessage());
}
return "异常时的替代结果";
})
.thenAccept(result -> System.out.println("最终结果: " + result));
}
}
在这个例子中,我们定义了 CustomException
自定义异常。在 exceptionally
方法中,通过 instanceof
判断异常类型,并进行相应的处理。
异常处理与线程池的结合
当使用 CompletableFuture
时,我们可以指定自定义的线程池来执行异步任务。在这种情况下,异常处理需要注意线程池的特性。
import java.util.concurrent.*;
public class CompletableFutureThreadPoolExceptionExample {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(2);
CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("模拟异常");
}
return "正常结果";
}, executor)
.exceptionally(ex -> {
System.out.println("捕获到异常: " + ex.getMessage());
return "异常时的替代结果";
})
.thenAccept(result -> System.out.println("最终结果: " + result));
executor.shutdown();
try {
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
executor.shutdownNow();
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
System.err.println("Pool did not terminate");
}
}
} catch (InterruptedException ie) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
在这个代码中,我们创建了一个固定大小的线程池 executor
,并将其作为参数传递给 supplyAsync
方法。这样异步任务就在指定的线程池中执行。异常处理部分与之前类似,但要注意在程序结束时正确关闭线程池。
生产环境中的异常处理考量
在生产环境中,CompletableFuture
的异常处理需要更加谨慎。
日志记录
在异常处理逻辑中,应该详细记录异常信息,包括异常类型、异常消息以及异常发生的上下文。这有助于快速定位和解决问题。
import java.util.concurrent.CompletableFuture;
import java.util.logging.Level;
import java.util.logging.Logger;
public class CompletableFutureProductionExceptionExample {
private static final Logger LOGGER = Logger.getLogger(CompletableFutureProductionExceptionExample.class.getName());
public static void main(String[] args) {
CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("模拟生产环境异常");
}
return "正常结果";
})
.exceptionally(ex -> {
LOGGER.log(Level.SEVERE, "发生异常", ex);
return "异常时的替代结果";
})
.thenAccept(result -> System.out.println("最终结果: " + result));
}
}
这里使用Java自带的日志框架 Logger
记录异常信息,在实际生产中,可能会使用更强大的日志框架如Log4j或SLF4J。
重试机制
对于一些由于临时性故障(如网络波动、资源短暂不可用等)导致的异常,可以考虑添加重试机制。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class CompletableFutureRetryExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
int maxRetries = 3;
int retryCount = 0;
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5 && retryCount < maxRetries) {
retryCount++;
throw new RuntimeException("模拟可重试异常");
}
return "正常结果";
});
String result = future.exceptionally(ex -> {
if (retryCount < maxRetries) {
System.out.println("重试第 " + retryCount + " 次");
return CompletableFutureRetryExample.retry(maxRetries, retryCount).join();
} else {
System.out.println("达到最大重试次数,无法继续重试");
return "异常时的替代结果";
}
}).get();
System.out.println("最终结果: " + result);
}
private static CompletableFuture<String> retry(int maxRetries, int retryCount) {
return CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5 && retryCount < maxRetries) {
retryCount++;
throw new RuntimeException("模拟可重试异常");
}
return "重试成功结果";
});
}
}
在这个例子中,我们定义了一个简单的重试机制。如果异步任务抛出特定类型的异常且重试次数未达到最大重试次数,就进行重试。
回滚操作
在涉及到事务性操作的异步任务中,如果发生异常,需要进行回滚操作以保证数据的一致性。
import java.util.concurrent.CompletableFuture;
class Database {
public void executeTransaction() {
System.out.println("开始数据库事务");
// 模拟数据库操作
}
public void rollbackTransaction() {
System.out.println("回滚数据库事务");
}
}
public class CompletableFutureRollbackExample {
public static void main(String[] args) {
Database database = new Database();
CompletableFuture.supplyAsync(() -> {
database.executeTransaction();
if (Math.random() > 0.5) {
throw new RuntimeException("模拟事务异常");
}
return "事务成功";
})
.exceptionally(ex -> {
database.rollbackTransaction();
System.out.println("捕获到异常,回滚事务");
return "异常时的替代结果";
})
.thenAccept(result -> System.out.println("最终结果: " + result));
}
}
在这个例子中,Database
类模拟了数据库操作。如果异步任务执行过程中抛出异常,在 exceptionally
方法中调用 rollbackTransaction
方法进行回滚。
通过以上多种异常处理策略的介绍和示例代码,我们可以在使用 CompletableFuture
进行异步编程时,更有效地处理各种异常情况,确保程序的稳定性和可靠性。无论是简单的异步任务,还是复杂的链式调用和组合任务,都能通过合适的异常处理策略来应对可能出现的问题。同时,在生产环境中,结合日志记录、重试机制和回滚操作等考量,能进一步提升系统的健壮性。