MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java 中 CompletableFuture 异步任务异常处理策略

2024-04-043.3k 阅读

CompletableFuture 简介

在Java 8 引入 CompletableFuture 之前,处理异步任务相对复杂。Future 接口是Java早期用于异步计算的工具,但它存在一些局限性,例如获取结果时可能会阻塞主线程,并且缺乏对异步任务完成后的链式操作和异常处理的便捷机制。

CompletableFuture 实现了 FutureCompletionStage 接口,它不仅提供了异步计算的能力,还允许我们以一种更灵活、更强大的方式处理异步任务的结果和异常。它支持异步任务的链式调用、组合多个异步任务以及优雅地处理异常。

CompletableFuture 创建异步任务

CompletableFuture 提供了多种静态方法来创建异步任务,常见的有 supplyAsyncrunAsync

supplyAsync

supplyAsync 方法用于创建一个有返回值的异步任务。它接受一个 Supplier 作为参数,该 Supplier 定义了异步执行的逻辑,并返回计算结果。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            // 模拟异步任务执行
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "异步任务执行完成";
        });

        String result = future.get();
        System.out.println(result);
    }
}

在上述代码中,supplyAsync 方法启动了一个异步任务,在任务中模拟了两秒的延迟,然后返回一个字符串。get 方法用于获取异步任务的结果,这会阻塞主线程直到任务完成。

runAsync

runAsync 方法用于创建一个没有返回值的异步任务。它接受一个 Runnable 作为参数,该 Runnable 定义了异步执行的逻辑。

import java.util.concurrent.CompletableFuture;

public class CompletableFutureRunAsyncExample {
    public static void main(String[] args) {
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            // 模拟异步任务执行
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("无返回值的异步任务执行完成");
        });
    }
}

此代码中,runAsync 启动了一个异步任务,任务执行两秒延迟后打印一条消息。由于 runAsync 返回的 CompletableFutureVoid 类型,所以不能通过 get 方法获取具体的返回值。

CompletableFuture 异常处理策略

使用 exceptionally 方法

exceptionally 方法用于在异步任务发生异常时提供一个替代结果。它接受一个 Function 作为参数,该 Function 以异常作为输入,并返回一个替代结果。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureExceptionallyExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            if (Math.random() > 0.5) {
                throw new RuntimeException("模拟异常");
            }
            return "正常结果";
        });

        String result = future.exceptionally(ex -> {
            System.out.println("捕获到异常: " + ex.getMessage());
            return "异常时的替代结果";
        }).get();

        System.out.println(result);
    }
}

在这段代码中,supplyAsync 定义的异步任务有50%的概率抛出异常。exceptionally 方法捕获到异常后,打印异常信息并返回一个替代结果。

使用 whenComplete 方法

whenComplete 方法用于在异步任务完成(无论成功还是失败)时执行一个回调函数。它接受一个 BiConsumer 作为参数,该 BiConsumer 接收任务的结果(如果有)和异常(如果有)作为输入。

import java.util.concurrent.CompletableFuture;

public class CompletableFutureWhenCompleteExample {
    public static void main(String[] args) {
        CompletableFuture.supplyAsync(() -> {
            if (Math.random() > 0.5) {
                throw new RuntimeException("模拟异常");
            }
            return "正常结果";
        }).whenComplete((result, ex) -> {
            if (ex != null) {
                System.out.println("捕获到异常: " + ex.getMessage());
            } else {
                System.out.println("任务成功完成,结果: " + result);
            }
        });

        // 防止主线程退出
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

这里,whenComplete 回调函数根据 ex 是否为 null 判断任务是否成功。如果 ex 不为 null,则说明任务发生异常,打印异常信息;否则,打印任务成功的结果。

使用 handle 方法

handle 方法结合了 whenCompleteexceptionally 的功能。它接受一个 BiFunction 作为参数,该 BiFunction 接收任务的结果(如果有)和异常(如果有)作为输入,并返回一个新的结果。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureHandleExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            if (Math.random() > 0.5) {
                throw new RuntimeException("模拟异常");
            }
            return "正常结果";
        });

        String result = future.handle((res, ex) -> {
            if (ex != null) {
                System.out.println("捕获到异常: " + ex.getMessage());
                return "异常时的替代结果";
            } else {
                return res;
            }
        }).get();

        System.out.println(result);
    }
}

在这个例子中,handle 方法根据任务执行情况返回不同的结果。如果任务成功,返回正常结果;如果任务失败,打印异常信息并返回替代结果。

链式调用中的异常处理

简单链式调用异常处理

CompletableFuture 支持链式调用,在链式调用中,异常也能得到合理处理。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureChainingExceptionExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture.supplyAsync(() -> {
            if (Math.random() > 0.5) {
                throw new RuntimeException("模拟异常");
            }
            return "第一步结果";
        })
       .thenApply(result -> {
            System.out.println("第一步结果: " + result);
            return result + " 经过第二步处理";
        })
       .exceptionally(ex -> {
            System.out.println("捕获到异常: " + ex.getMessage());
            return "异常时的替代结果";
        })
       .thenAccept(finalResult -> System.out.println("最终结果: " + finalResult));
    }
}

在这个链式调用中,supplyAsync 定义的第一步任务有50%的概率抛出异常。如果第一步任务成功,thenApply 方法会处理第一步的结果;如果第一步任务失败,exceptionally 方法捕获异常并返回替代结果,最后 thenAccept 打印最终结果。

多步链式调用异常处理

当链式调用涉及多个步骤时,异常处理同样重要。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureMultiStepChainingExceptionExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture.supplyAsync(() -> {
            if (Math.random() > 0.5) {
                throw new RuntimeException("第一步异常");
            }
            return "第一步结果";
        })
       .thenApply(result -> {
            if (Math.random() > 0.5) {
                throw new RuntimeException("第二步异常");
            }
            return result + " 经过第二步处理";
        })
       .thenApply(result -> {
            if (Math.random() > 0.5) {
                throw new RuntimeException("第三步异常");
            }
            return result + " 经过第三步处理";
        })
       .exceptionally(ex -> {
            System.out.println("捕获到异常: " + ex.getMessage());
            return "异常时的替代结果";
        })
       .thenAccept(finalResult -> System.out.println("最终结果: " + finalResult));
    }
}

在这个多步链式调用中,每一步都有50%的概率抛出异常。只要任何一步抛出异常,exceptionally 方法就会捕获并处理异常,返回替代结果。

组合多个 CompletableFuture 的异常处理

allOf 方法的异常处理

CompletableFuture.allOf 方法用于等待所有给定的 CompletableFuture 都完成。如果其中任何一个 CompletableFuture 抛出异常,allOf 返回的 CompletableFuture 也会异常完成。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureAllOfExceptionExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            if (Math.random() > 0.5) {
                throw new RuntimeException("future1 异常");
            }
            return "future1 结果";
        });

        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            if (Math.random() > 0.5) {
                throw new RuntimeException("future2 异常");
            }
            return "future2 结果";
        });

        CompletableFuture<Void> allOfFuture = CompletableFuture.allOf(future1, future2);

        allOfFuture.exceptionally(ex -> {
            System.out.println("捕获到异常: " + ex.getMessage());
            return null;
        }).thenRun(() -> {
            try {
                if (!future1.isCompletedExceptionally()) {
                    System.out.println("future1 结果: " + future1.get());
                }
                if (!future2.isCompletedExceptionally()) {
                    System.out.println("future2 结果: " + future2.get());
                }
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }).get();
    }
}

在这个例子中,allOf 方法等待 future1future2 都完成。如果其中任何一个抛出异常,exceptionally 方法会捕获异常并处理。然后通过检查每个 CompletableFuture 是否异常完成来决定是否获取并打印它们的结果。

anyOf 方法的异常处理

CompletableFuture.anyOf 方法用于等待任何一个给定的 CompletableFuture 完成。如果所有 CompletableFuture 都抛出异常,anyOf 返回的 CompletableFuture 也会异常完成。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureAnyOfExceptionExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            if (Math.random() > 0.5) {
                throw new RuntimeException("future1 异常");
            }
            return "future1 结果";
        });

        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            if (Math.random() > 0.5) {
                throw new RuntimeException("future2 异常");
            }
            return "future2 结果";
        });

        CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(future1, future2);

        anyOfFuture.exceptionally(ex -> {
            System.out.println("捕获到异常: " + ex.getMessage());
            return null;
        }).thenAccept(result -> {
            if (result != null) {
                System.out.println("最先完成的结果: " + result);
            }
        }).get();
    }
}

这里,anyOf 方法等待 future1future2 其中一个完成。如果所有任务都异常,exceptionally 方法捕获异常。如果有任务成功完成,thenAccept 打印最先完成的任务结果。

自定义异常类型处理

在实际应用中,我们可能会定义自己的异常类型。CompletableFuture 同样可以很好地处理自定义异常。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

class CustomException extends RuntimeException {
    public CustomException(String message) {
        super(message);
    }
}

public class CompletableFutureCustomExceptionExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture.supplyAsync(() -> {
            if (Math.random() > 0.5) {
                throw new CustomException("自定义异常");
            }
            return "正常结果";
        })
       .exceptionally(ex -> {
            if (ex instanceof CustomException) {
                System.out.println("捕获到自定义异常: " + ex.getMessage());
            } else {
                System.out.println("捕获到其他异常: " + ex.getMessage());
            }
            return "异常时的替代结果";
        })
       .thenAccept(result -> System.out.println("最终结果: " + result));
    }
}

在这个例子中,我们定义了 CustomException 自定义异常。在 exceptionally 方法中,通过 instanceof 判断异常类型,并进行相应的处理。

异常处理与线程池的结合

当使用 CompletableFuture 时,我们可以指定自定义的线程池来执行异步任务。在这种情况下,异常处理需要注意线程池的特性。

import java.util.concurrent.*;

public class CompletableFutureThreadPoolExceptionExample {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);

        CompletableFuture.supplyAsync(() -> {
            if (Math.random() > 0.5) {
                throw new RuntimeException("模拟异常");
            }
            return "正常结果";
        }, executor)
       .exceptionally(ex -> {
            System.out.println("捕获到异常: " + ex.getMessage());
            return "异常时的替代结果";
        })
       .thenAccept(result -> System.out.println("最终结果: " + result));

        executor.shutdown();
        try {
            if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                executor.shutdownNow();
                if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                    System.err.println("Pool did not terminate");
                }
            }
        } catch (InterruptedException ie) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}

在这个代码中,我们创建了一个固定大小的线程池 executor,并将其作为参数传递给 supplyAsync 方法。这样异步任务就在指定的线程池中执行。异常处理部分与之前类似,但要注意在程序结束时正确关闭线程池。

生产环境中的异常处理考量

在生产环境中,CompletableFuture 的异常处理需要更加谨慎。

日志记录

在异常处理逻辑中,应该详细记录异常信息,包括异常类型、异常消息以及异常发生的上下文。这有助于快速定位和解决问题。

import java.util.concurrent.CompletableFuture;
import java.util.logging.Level;
import java.util.logging.Logger;

public class CompletableFutureProductionExceptionExample {
    private static final Logger LOGGER = Logger.getLogger(CompletableFutureProductionExceptionExample.class.getName());

    public static void main(String[] args) {
        CompletableFuture.supplyAsync(() -> {
            if (Math.random() > 0.5) {
                throw new RuntimeException("模拟生产环境异常");
            }
            return "正常结果";
        })
       .exceptionally(ex -> {
            LOGGER.log(Level.SEVERE, "发生异常", ex);
            return "异常时的替代结果";
        })
       .thenAccept(result -> System.out.println("最终结果: " + result));
    }
}

这里使用Java自带的日志框架 Logger 记录异常信息,在实际生产中,可能会使用更强大的日志框架如Log4j或SLF4J。

重试机制

对于一些由于临时性故障(如网络波动、资源短暂不可用等)导致的异常,可以考虑添加重试机制。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureRetryExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        int maxRetries = 3;
        int retryCount = 0;

        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            if (Math.random() > 0.5 && retryCount < maxRetries) {
                retryCount++;
                throw new RuntimeException("模拟可重试异常");
            }
            return "正常结果";
        });

        String result = future.exceptionally(ex -> {
            if (retryCount < maxRetries) {
                System.out.println("重试第 " + retryCount + " 次");
                return CompletableFutureRetryExample.retry(maxRetries, retryCount).join();
            } else {
                System.out.println("达到最大重试次数,无法继续重试");
                return "异常时的替代结果";
            }
        }).get();

        System.out.println("最终结果: " + result);
    }

    private static CompletableFuture<String> retry(int maxRetries, int retryCount) {
        return CompletableFuture.supplyAsync(() -> {
            if (Math.random() > 0.5 && retryCount < maxRetries) {
                retryCount++;
                throw new RuntimeException("模拟可重试异常");
            }
            return "重试成功结果";
        });
    }
}

在这个例子中,我们定义了一个简单的重试机制。如果异步任务抛出特定类型的异常且重试次数未达到最大重试次数,就进行重试。

回滚操作

在涉及到事务性操作的异步任务中,如果发生异常,需要进行回滚操作以保证数据的一致性。

import java.util.concurrent.CompletableFuture;

class Database {
    public void executeTransaction() {
        System.out.println("开始数据库事务");
        // 模拟数据库操作
    }

    public void rollbackTransaction() {
        System.out.println("回滚数据库事务");
    }
}

public class CompletableFutureRollbackExample {
    public static void main(String[] args) {
        Database database = new Database();

        CompletableFuture.supplyAsync(() -> {
            database.executeTransaction();
            if (Math.random() > 0.5) {
                throw new RuntimeException("模拟事务异常");
            }
            return "事务成功";
        })
       .exceptionally(ex -> {
            database.rollbackTransaction();
            System.out.println("捕获到异常,回滚事务");
            return "异常时的替代结果";
        })
       .thenAccept(result -> System.out.println("最终结果: " + result));
    }
}

在这个例子中,Database 类模拟了数据库操作。如果异步任务执行过程中抛出异常,在 exceptionally 方法中调用 rollbackTransaction 方法进行回滚。

通过以上多种异常处理策略的介绍和示例代码,我们可以在使用 CompletableFuture 进行异步编程时,更有效地处理各种异常情况,确保程序的稳定性和可靠性。无论是简单的异步任务,还是复杂的链式调用和组合任务,都能通过合适的异常处理策略来应对可能出现的问题。同时,在生产环境中,结合日志记录、重试机制和回滚操作等考量,能进一步提升系统的健壮性。