MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java CompletableFuture handle灵活处理异常与结果的策略

2022-01-071.9k 阅读

Java CompletableFuture handle 方法概述

在Java的异步编程中,CompletableFuture 是一个强大的工具,它允许我们以一种更简洁、更灵活的方式处理异步操作。CompletableFuturehandle 方法在处理异步任务的结果和异常方面提供了独特的功能。

handle 方法接收一个 BiFunction 作为参数,这个 BiFunction 会在 CompletableFuture 完成(无论是正常完成还是因异常完成)时被调用。它的签名如下:

<U> CompletableFuture<U> handle(BiFunction<? super T, Throwable,? extends U> fn);

其中,T 是当前 CompletableFuture 的结果类型,U 是新生成的 CompletableFuture 的结果类型。BiFunction 的第一个参数是当前 CompletableFuture 的结果(如果任务正常完成),第二个参数是异常(如果任务因异常完成)。根据任务的执行情况,BiFunction 可以返回一个新的值,这个值会成为新的 CompletableFuture 的结果。

正常完成时的处理

简单示例

假设我们有一个异步任务,它模拟从数据库中获取用户信息。以下是使用 CompletableFuturehandle 方法处理正常结果的示例代码:

import java.util.concurrent.CompletableFuture;

public class CompletableFutureHandleExample {
    public static void main(String[] args) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            // 模拟数据库查询操作
            return "User Information";
        });

        CompletableFuture<String> handledFuture = future.handle((result, ex) -> {
            if (ex == null) {
                return "Processed: " + result;
            } else {
                ex.printStackTrace();
                return "Error occurred";
            }
        });

        handledFuture.thenAccept(System.out::println);
    }
}

在上述代码中,CompletableFuture.supplyAsync 创建了一个异步任务,该任务返回一个字符串,表示从数据库获取的用户信息。然后,handle 方法对这个异步任务的结果进行处理。如果任务正常完成(ex == null),则在结果前加上 "Processed: " 并返回。如果发生异常,则打印异常堆栈跟踪信息并返回 "Error occurred"。最后,thenAccept 方法用于消费最终的结果并打印到控制台。

复杂业务处理

在实际应用中,我们可能需要对正常结果进行更复杂的业务处理。例如,我们从数据库获取用户信息后,需要根据用户信息调用另一个服务进行权限验证。以下是示例代码:

import java.util.concurrent.CompletableFuture;

public class CompletableFutureHandleComplexExample {
    public static void main(String[] args) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            // 模拟数据库查询操作
            return "User Information";
        });

        CompletableFuture<String> handledFuture = future.handle((result, ex) -> {
            if (ex == null) {
                // 假设这里有一个权限验证服务
                boolean hasPermission = checkPermission(result);
                if (hasPermission) {
                    return "Authorized: " + result;
                } else {
                    return "Not Authorized";
                }
            } else {
                ex.printStackTrace();
                return "Error occurred";
            }
        });

        handledFuture.thenAccept(System.out::println);
    }

    private static boolean checkPermission(String userInfo) {
        // 模拟权限验证逻辑
        return userInfo.contains("admin");
    }
}

在这个示例中,handle 方法内部调用了 checkPermission 方法来验证用户权限。如果用户有权限,则在结果前加上 "Authorized: ";如果没有权限,则返回 "Not Authorized"。这样,我们可以在 handle 方法中根据正常的结果进行复杂的业务逻辑处理。

异常处理策略

捕获并返回默认值

当异步任务发生异常时,我们可以通过 handle 方法捕获异常并返回一个默认值。这在很多情况下非常有用,例如在缓存失效时,我们可以返回一个默认的缓存值,而不是让整个系统因异常而崩溃。以下是示例代码:

import java.util.concurrent.CompletableFuture;

public class CompletableFutureHandleDefaultValueExample {
    public static void main(String[] args) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            // 模拟可能会抛出异常的操作
            if (Math.random() < 0.5) {
                throw new RuntimeException("Simulated Exception");
            }
            return "Normal Result";
        });

        CompletableFuture<String> handledFuture = future.handle((result, ex) -> {
            if (ex == null) {
                return result;
            } else {
                System.out.println("Exception caught: " + ex.getMessage());
                return "Default Value";
            }
        });

        handledFuture.thenAccept(System.out::println);
    }
}

在上述代码中,CompletableFuture.supplyAsync 中的异步任务有 50% 的概率抛出异常。handle 方法捕获到异常后,打印异常信息并返回 "Default Value"。这样,即使异步任务出现异常,我们也能得到一个有意义的返回值,而不会导致程序异常终止。

异常转换与处理

有时候,我们可能希望将捕获到的异常转换为另一种类型的异常,或者对异常进行更复杂的处理。例如,我们可能希望将一个特定的业务异常转换为更通用的系统异常,以便上层调用者能够统一处理。以下是示例代码:

import java.util.concurrent.CompletableFuture;

public class CompletableFutureHandleExceptionTransformationExample {
    public static void main(String[] args) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            // 模拟可能会抛出业务异常的操作
            if (Math.random() < 0.5) {
                throw new BusinessException("Business Logic Error");
            }
            return "Normal Result";
        });

        CompletableFuture<String> handledFuture = future.handle((result, ex) -> {
            if (ex == null) {
                return result;
            } else if (ex instanceof BusinessException) {
                // 将业务异常转换为系统异常
                throw new SystemException("System Error due to: " + ex.getMessage());
            } else {
                ex.printStackTrace();
                return "Error occurred";
            }
        });

        handledFuture.exceptionally(ex -> {
            System.out.println("Final Exception caught: " + ex.getMessage());
            return "Default Value";
        }).thenAccept(System.out::println);
    }
}

class BusinessException extends RuntimeException {
    public BusinessException(String message) {
        super(message);
    }
}

class SystemException extends RuntimeException {
    public SystemException(String message) {
        super(message);
    }
}

在这个示例中,CompletableFuture.supplyAsync 中的异步任务有 50% 的概率抛出 BusinessExceptionhandle 方法捕获到 BusinessException 后,将其转换为 SystemException 并重新抛出。最后,通过 exceptionally 方法捕获最终的异常,并返回一个默认值。这样,我们可以在 handle 方法中灵活地处理和转换异常,以满足不同层次的异常处理需求。

链式调用与组合处理

多个 CompletableFuture 链式处理

CompletableFuturehandle 方法支持链式调用,这使得我们可以对多个异步任务进行顺序处理。例如,我们有一个任务是从数据库获取用户ID,另一个任务是根据用户ID获取用户详细信息。以下是示例代码:

import java.util.concurrent.CompletableFuture;

public class CompletableFutureChainedHandleExample {
    public static void main(String[] args) {
        CompletableFuture<Integer> userIdFuture = CompletableFuture.supplyAsync(() -> {
            // 模拟从数据库获取用户ID
            return 123;
        });

        CompletableFuture<String> userInfoFuture = userIdFuture.handle((userId, ex) -> {
            if (ex == null) {
                return "User ID: " + userId;
            } else {
                ex.printStackTrace();
                return "Error getting user ID";
            }
        }).thenApplyAsync(result -> {
            if (result.startsWith("User ID")) {
                int userId = Integer.parseInt(result.split(": ")[1]);
                // 模拟根据用户ID获取用户详细信息
                return "User Details for ID " + userId;
            } else {
                return result;
            }
        });

        userInfoFuture.thenAccept(System.out::println);
    }
}

在上述代码中,userIdFuture 首先获取用户ID。然后,handle 方法对获取用户ID的结果进行处理,如果成功则返回包含用户ID的字符串。接着,thenApplyAsync 方法根据这个字符串提取用户ID,并再次发起异步操作获取用户详细信息。通过这种链式调用,我们可以将多个异步任务按顺序组合起来,并在每个步骤中灵活处理结果和异常。

组合多个 CompletableFuture 的结果

有时候,我们需要组合多个 CompletableFuture 的结果。例如,我们有一个任务获取用户的基本信息,另一个任务获取用户的权限信息,我们希望将这两个结果组合起来。以下是示例代码:

import java.util.concurrent.CompletableFuture;

public class CompletableFutureCombinedHandleExample {
    public static void main(String[] args) {
        CompletableFuture<String> basicInfoFuture = CompletableFuture.supplyAsync(() -> {
            // 模拟获取用户基本信息
            return "Basic Information";
        });

        CompletableFuture<String> permissionInfoFuture = CompletableFuture.supplyAsync(() -> {
            // 模拟获取用户权限信息
            return "Permission Information";
        });

        CompletableFuture<String> combinedFuture = CompletableFuture.allOf(basicInfoFuture, permissionInfoFuture)
              .thenApplyAsync(v -> {
                    String basicInfo = basicInfoFuture.join();
                    String permissionInfo = permissionInfoFuture.join();
                    return basicInfo + " - " + permissionInfo;
                })
              .handle((result, ex) -> {
                    if (ex == null) {
                        return result;
                    } else {
                        ex.printStackTrace();
                        return "Error combining information";
                    }
                });

        combinedFuture.thenAccept(System.out::println);
    }
}

在这个示例中,basicInfoFuturepermissionInfoFuture 分别异步获取用户的基本信息和权限信息。CompletableFuture.allOf 方法等待这两个任务都完成。然后,thenApplyAsync 方法将两个任务的结果组合起来。最后,handle 方法处理组合结果或可能出现的异常。通过这种方式,我们可以灵活地组合多个异步任务的结果,并在最后一步统一处理异常。

性能与资源管理

线程池的合理使用

在使用 CompletableFuture 进行异步编程时,合理使用线程池对于性能和资源管理至关重要。默认情况下,CompletableFuture 使用 ForkJoinPool.commonPool() 来执行异步任务。然而,在某些情况下,这可能不是最优的选择。例如,如果我们有大量的I/O密集型任务,使用一个专门的线程池可能会提高性能。以下是如何使用自定义线程池的示例代码:

import java.util.concurrent.*;

public class CompletableFutureThreadPoolExample {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(10);

        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            // 模拟I/O密集型操作
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Result";
        }, executor);

        CompletableFuture<String> handledFuture = future.handle((result, ex) -> {
            if (ex == null) {
                return "Processed: " + result;
            } else {
                ex.printStackTrace();
                return "Error occurred";
            }
        });

        handledFuture.thenAccept(System.out::println);

        executor.shutdown();
        try {
            if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                executor.shutdownNow();
                if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                    System.err.println("Pool did not terminate");
                }
            }
        } catch (InterruptedException ie) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}

在上述代码中,我们创建了一个固定大小为10的线程池 executor,并将其作为参数传递给 supplyAsync 方法。这样,异步任务将在这个自定义线程池中执行。在程序结束时,我们正确地关闭线程池,以确保所有任务完成并释放资源。

避免不必要的阻塞

在处理 CompletableFuture 时,应尽量避免不必要的阻塞操作。例如,joinget 方法会阻塞当前线程直到 CompletableFuture 完成。如果在异步任务的处理链中频繁使用这些方法,可能会导致性能问题和死锁。相反,我们应该使用 thenApplythenAccepthandle 等非阻塞方法来处理结果和异常。以下是一个错误使用阻塞方法的示例:

import java.util.concurrent.CompletableFuture;

public class CompletableFutureBlockingErrorExample {
    public static void main(String[] args) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            // 模拟异步操作
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Result";
        });

        try {
            String result = future.get(); // 阻塞当前线程
            System.out.println("Processed: " + result);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}

在这个示例中,future.get() 会阻塞主线程,直到异步任务完成。这违背了异步编程的初衷,可能会导致性能问题。正确的做法是使用非阻塞方法,如下所示:

import java.util.concurrent.CompletableFuture;

public class CompletableFutureNonBlockingExample {
    public static void main(String[] args) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            // 模拟异步操作
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Result";
        });

        CompletableFuture<String> handledFuture = future.handle((result, ex) -> {
            if (ex == null) {
                return "Processed: " + result;
            } else {
                ex.printStackTrace();
                return "Error occurred";
            }
        });

        handledFuture.thenAccept(System.out::println);
    }
}

在这个修正后的示例中,我们使用 handlethenAccept 方法来处理异步任务的结果,避免了阻塞主线程,从而提高了程序的性能和响应性。

与其他异步框架的对比

与 Future 的对比

在Java 8 引入 CompletableFuture 之前,Future 是处理异步任务的主要方式。然而,Future 存在一些局限性。例如,Future 只能通过 get 方法获取结果,这会阻塞当前线程,并且它没有提供直接处理异常的便捷方式。相比之下,CompletableFuture 提供了更丰富的方法来处理结果和异常,并且支持非阻塞的链式调用。以下是一个简单的对比示例:

import java.util.concurrent.*;

public class FutureVsCompletableFutureExample {
    public static void main(String[] args) {
        // 使用 Future
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<String> future = executor.submit(() -> {
            // 模拟异步操作
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Result from Future";
        });

        try {
            String result = future.get(); // 阻塞当前线程
            System.out.println("Future Result: " + result);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        } finally {
            executor.shutdown();
        }

        // 使用 CompletableFuture
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            // 模拟异步操作
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Result from CompletableFuture";
        });

        CompletableFuture<String> handledFuture = completableFuture.handle((result, ex) -> {
            if (ex == null) {
                return "Processed: " + result;
            } else {
                ex.printStackTrace();
                return "Error occurred";
            }
        });

        handledFuture.thenAccept(System.out::println);
    }
}

在上述代码中,使用 Future 时需要通过 get 方法阻塞获取结果,并且异常处理相对繁琐。而使用 CompletableFuture,我们可以通过 handle 方法灵活处理结果和异常,并且不需要阻塞主线程。

与 RxJava 的对比

RxJava 是一个流行的响应式编程框架,它也提供了强大的异步处理能力。与 CompletableFuture 相比,RxJava 更侧重于数据流的处理和事件驱动的编程模型。CompletableFuture 则更简洁,适合处理简单的异步任务和结果/异常处理。例如,在处理单个异步任务并处理结果和异常时,CompletableFuture 可能更直观:

import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.schedulers.Schedulers;

public class CompletableFutureVsRxJavaExample {
    public static void main(String[] args) {
        // 使用 CompletableFuture
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            // 模拟异步操作
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Result from CompletableFuture";
        });

        CompletableFuture<String> handledFuture = completableFuture.handle((result, ex) -> {
            if (ex == null) {
                return "Processed: " + result;
            } else {
                ex.printStackTrace();
                return "Error occurred";
            }
        });

        handledFuture.thenAccept(System.out::println);

        // 使用 RxJava
        Single<String> single = Single.fromCallable(() -> {
            // 模拟异步操作
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Result from RxJava";
        }).subscribeOn(Schedulers.io());

        single.subscribe(result -> System.out.println("Processed: " + result),
                ex -> ex.printStackTrace());
    }
}

在上述代码中,CompletableFuture 通过 handle 方法简洁地处理了异步任务的结果和异常。而在 RxJava 中,我们需要使用 Single 类,并通过 subscribe 方法分别处理结果和异常。虽然 RxJava 提供了更强大的数据流操作能力,但对于简单的异步任务处理,CompletableFuture 可能更容易理解和使用。

实际应用场景

微服务架构中的异步调用

在微服务架构中,服务之间的调用通常是异步的。CompletableFuturehandle 方法可以用于处理这些异步调用的结果和异常。例如,一个订单服务可能需要调用库存服务和支付服务。以下是一个简化的示例代码:

import java.util.concurrent.CompletableFuture;

public class MicroserviceAsyncCallExample {
    public static void main(String[] args) {
        CompletableFuture<String> inventoryFuture = CompletableFuture.supplyAsync(() -> {
            // 模拟调用库存服务
            if (Math.random() < 0.5) {
                throw new RuntimeException("Inventory service error");
            }
            return "Inventory check passed";
        });

        CompletableFuture<String> paymentFuture = CompletableFuture.supplyAsync(() -> {
            // 模拟调用支付服务
            if (Math.random() < 0.5) {
                throw new RuntimeException("Payment service error");
            }
            return "Payment processed";
        });

        CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(inventoryFuture, paymentFuture)
              .thenApplyAsync(v -> {
                    String inventoryResult = inventoryFuture.join();
                    String paymentResult = paymentFuture.join();
                    return inventoryResult + " - " + paymentResult;
                })
              .handle((result, ex) -> {
                    if (ex == null) {
                        System.out.println("Order processed successfully: " + result);
                    } else {
                        ex.printStackTrace();
                        System.out.println("Order processing failed");
                    }
                    return null;
                });

        combinedFuture.join();
    }
}

在这个示例中,inventoryFuturepaymentFuture 分别模拟调用库存服务和支付服务。CompletableFuture.allOf 等待两个服务调用完成,然后 handle 方法处理最终的结果或异常。通过这种方式,我们可以在微服务架构中有效地处理异步调用,并统一处理异常。

批处理任务中的异常处理

在批处理任务中,我们可能需要处理多个任务,并且希望在某个任务出现异常时能够灵活处理。CompletableFuturehandle 方法可以满足这一需求。例如,我们有一个任务是批量上传文件到云存储。以下是示例代码:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class BatchTaskExceptionHandlingExample {
    public static void main(String[] args) {
        List<String> fileNames = List.of("file1.txt", "file2.txt", "file3.txt");
        List<CompletableFuture<String>> futures = new ArrayList<>();

        for (String fileName : fileNames) {
            CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
                // 模拟文件上传操作
                if (Math.random() < 0.5) {
                    throw new RuntimeException("Upload error for " + fileName);
                }
                return "Uploaded " + fileName;
            });
            futures.add(future);
        }

        CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));

        allFutures.thenApplyAsync(v -> {
            StringBuilder result = new StringBuilder();
            for (CompletableFuture<String> future : futures) {
                try {
                    result.append(future.get()).append("\n");
                } catch (Exception e) {
                    result.append("Error: ").append(e.getMessage()).append("\n");
                }
            }
            return result.toString();
        }).handle((result, ex) -> {
            if (ex == null) {
                System.out.println("Batch upload results:\n" + result);
            } else {
                ex.printStackTrace();
                System.out.println("Batch upload failed");
            }
            return null;
        }).join();
    }
}

在上述代码中,我们为每个文件创建一个 CompletableFuture 来模拟文件上传。CompletableFuture.allOf 等待所有上传任务完成。然后,通过 handle 方法处理所有任务的结果或异常。这样,我们可以在批处理任务中灵活地处理每个任务的异常,并汇总结果。

总结

CompletableFuturehandle 方法在Java异步编程中是一个非常强大的工具。它提供了灵活的方式来处理异步任务的结果和异常,无论是在简单的单个任务处理,还是复杂的链式调用和组合处理中都表现出色。通过合理使用 handle 方法,结合线程池管理和避免阻塞操作,我们可以编写高效、可靠的异步程序。与其他异步框架相比,CompletableFuture 具有简洁、直观的特点,特别适合处理简单到中等复杂度的异步任务。在实际应用中,如微服务架构和批处理任务,CompletableFuturehandle 方法能够有效地处理异步调用和异常,提高系统的稳定性和性能。掌握 CompletableFuturehandle 方法对于Java开发者来说是提升异步编程能力的重要一步。