MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java CompletableFuture链式调用的错误处理与恢复

2024-03-133.5k 阅读

Java CompletableFuture链式调用的错误处理与恢复

在Java的异步编程中,CompletableFuture提供了强大的异步操作能力,通过链式调用可以方便地组合多个异步任务。然而,在链式调用过程中,错误处理与恢复机制至关重要,它能确保程序在面对异常时仍能保持稳定和正确的执行流程。

CompletableFuture链式调用基础

CompletableFuture允许将多个异步操作链接在一起,形成一个处理流程。例如,假设我们有一个简单的异步任务,获取用户信息后根据用户信息获取订单列表。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureExample {
    public static CompletableFuture<String> getUserInfo() {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟获取用户信息的异步操作
            return "User Information";
        });
    }

    public static CompletableFuture<String> getOrderList(String userInfo) {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟根据用户信息获取订单列表的异步操作
            return "Order List for " + userInfo;
        });
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = getUserInfo()
               .thenApplyAsync(CompletableFutureExample::getOrderList)
               .thenApplyAsync(result -> {
                    try {
                        return result.get();
                    } catch (InterruptedException | ExecutionException e) {
                        throw new RuntimeException(e);
                    }
                });
        System.out.println(future.get());
    }
}

在上述代码中,getUserInfo方法返回一个CompletableFuture<String>,代表获取用户信息的异步操作。thenApplyAsync方法接收一个函数,该函数以getUserInfo的结果作为参数,并返回一个新的CompletableFuture。这里我们将获取订单列表的操作作为thenApplyAsync的参数,从而实现了两个异步操作的链式调用。

链式调用中的错误处理

在实际应用中,异步操作可能会抛出异常。例如,在获取用户信息或订单列表时,可能会因为网络问题、数据库故障等原因出现异常。CompletableFuture提供了多种方式来处理这些异常。

1. 使用exceptionally方法

exceptionally方法用于在异步操作抛出异常时提供一个替代结果。当链式调用中的某个CompletableFuture抛出异常时,exceptionally方法会被调用,它接收一个Function,该Function的参数为异常对象,返回值为替代结果。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureErrorHandling {
    public static CompletableFuture<String> getUserInfo() {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟可能抛出异常的获取用户信息操作
            if (Math.random() > 0.5) {
                throw new RuntimeException("Failed to get user info");
            }
            return "User Information";
        });
    }

    public static CompletableFuture<String> getOrderList(String userInfo) {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟根据用户信息获取订单列表的异步操作
            return "Order List for " + userInfo;
        });
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = getUserInfo()
               .thenApplyAsync(CompletableFutureErrorHandling::getOrderList)
               .exceptionally(ex -> {
                    System.out.println("Caught exception: " + ex.getMessage());
                    return "Default Order List";
                });
        System.out.println(future.get());
    }
}

在上述代码中,getUserInfo方法有50%的概率抛出异常。当异常发生时,exceptionally方法中的Function会被调用,打印异常信息并返回一个默认的订单列表。

2. 使用handle方法

handle方法与exceptionally方法类似,但它无论异步操作是否成功都会被调用。它接收一个BiFunction,第一个参数为异步操作的结果(如果成功),第二个参数为异常对象(如果失败)。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureHandleExample {
    public static CompletableFuture<String> getUserInfo() {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟可能抛出异常的获取用户信息操作
            if (Math.random() > 0.5) {
                throw new RuntimeException("Failed to get user info");
            }
            return "User Information";
        });
    }

    public static CompletableFuture<String> getOrderList(String userInfo) {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟根据用户信息获取订单列表的异步操作
            return "Order List for " + userInfo;
        });
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = getUserInfo()
               .thenApplyAsync(CompletableFutureHandleExample::getOrderList)
               .handle((result, ex) -> {
                    if (ex != null) {
                        System.out.println("Caught exception: " + ex.getMessage());
                        return "Default Order List";
                    }
                    return result;
                });
        System.out.println(future.get());
    }
}

在这个例子中,handle方法中的BiFunction会根据resultex的值来决定返回结果。如果ex不为null,说明异步操作失败,返回默认订单列表;否则返回正常的订单列表。

错误恢复机制

除了简单的错误处理,CompletableFuture还支持错误恢复机制,即在出现异常后重新尝试异步操作。

1. 手动重试

可以通过在exceptionallyhandle方法中手动重新发起异步操作来实现重试。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class CompletableFutureRetryManual {
    public static CompletableFuture<String> getUserInfo() {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟可能抛出异常的获取用户信息操作
            if (Math.random() > 0.5) {
                throw new RuntimeException("Failed to get user info");
            }
            return "User Information";
        });
    }

    public static CompletableFuture<String> getOrderList(String userInfo) {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟根据用户信息获取订单列表的异步操作
            return "Order List for " + userInfo;
        });
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = getUserInfo()
               .thenApplyAsync(CompletableFutureRetryManual::getOrderList)
               .exceptionally(ex -> {
                    System.out.println("Caught exception: " + ex.getMessage());
                    // 手动重试获取用户信息
                    return getUserInfo()
                           .thenApplyAsync(CompletableFutureRetryManual::getOrderList)
                           .join();
                });
        System.out.println(future.get());
    }
}

在上述代码中,当getUserInfo抛出异常时,exceptionally方法会重新调用getUserInfo并再次执行获取订单列表的操作。这里使用join方法来获取最终结果,因为exceptionally方法需要返回一个String类型的结果。

2. 使用自定义重试策略

为了更灵活地控制重试逻辑,可以定义一个通用的重试策略。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class CompletableFutureRetryPolicy {
    public static <U> CompletableFuture<U> retry(Supplier<CompletableFuture<U>> supplier, int maxRetries, long delay, TimeUnit timeUnit) {
        return supplier.get()
               .exceptionally(ex -> {
                    if (maxRetries > 1) {
                        try {
                            timeUnit.sleep(delay);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                        return retry(supplier, maxRetries - 1, delay, timeUnit).join();
                    } else {
                        throw new RuntimeException(ex);
                    }
                });
    }

    public static CompletableFuture<String> getUserInfo() {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟可能抛出异常的获取用户信息操作
            if (Math.random() > 0.5) {
                throw new RuntimeException("Failed to get user info");
            }
            return "User Information";
        });
    }

    public static CompletableFuture<String> getOrderList(String userInfo) {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟根据用户信息获取订单列表的异步操作
            return "Order List for " + userInfo;
        });
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = retry(() -> getUserInfo()
                       .thenApplyAsync(CompletableFutureRetryPolicy::getOrderList),
                3, 1, TimeUnit.SECONDS);
        System.out.println(future.get());
    }
}

在上述代码中,retry方法接收一个Supplier<CompletableFuture<U>>、最大重试次数maxRetries、重试间隔delay和时间单位timeUnit。当异步操作抛出异常时,retry方法会根据重试策略进行重试,每次重试前会等待指定的时间间隔。如果达到最大重试次数仍失败,则抛出异常。

异常传播

在链式调用中,异常会自动传播到后续的CompletableFuture。例如,如果getUserInfo抛出异常,那么thenApplyAsync中对getOrderList的调用将不会执行,异常会直接传播到exceptionallyhandle方法。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureExceptionPropagation {
    public static CompletableFuture<String> getUserInfo() {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟抛出异常的获取用户信息操作
            throw new RuntimeException("Failed to get user info");
        });
    }

    public static CompletableFuture<String> getOrderList(String userInfo) {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟根据用户信息获取订单列表的异步操作
            return "Order List for " + userInfo;
        });
    }

    public static void main(String[] args) {
        CompletableFuture<String> future = getUserInfo()
               .thenApplyAsync(CompletableFutureExceptionPropagation::getOrderList)
               .exceptionally(ex -> {
                    System.out.println("Caught exception: " + ex.getMessage());
                    return "Default Order List";
                });
        try {
            System.out.println(future.get());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}

在这个例子中,getUserInfo抛出的异常直接传播到了exceptionally方法,getOrderList不会被执行。

总结错误处理与恢复的要点

  1. 选择合适的错误处理方法:根据需求选择exceptionallyhandle方法。如果只关心异常情况,exceptionally方法更简洁;如果需要在成功和失败时都进行处理,handle方法更合适。
  2. 实现有效的错误恢复:手动重试或使用自定义重试策略可以提高系统的可靠性。在重试时,要合理设置重试次数和间隔,避免过度重试导致系统资源浪费。
  3. 理解异常传播机制:异常会在链式调用中自动传播,这有助于保持代码的简洁性和逻辑性。合理利用异常传播可以更好地处理复杂的异步操作流程。

通过掌握CompletableFuture链式调用的错误处理与恢复机制,开发人员能够编写出更健壮、可靠的异步程序,提高系统在面对异常时的稳定性和可用性。无论是简单的错误处理还是复杂的错误恢复策略,都能为异步编程提供有力的支持。在实际应用中,根据具体业务场景灵活运用这些机制,将有助于打造高效、稳定的Java应用程序。

在处理复杂的业务逻辑时,可能会涉及多个异步操作的嵌套和组合。例如,在获取用户信息后,不仅要获取订单列表,还可能需要根据订单列表获取订单详情,并且每个操作都可能出现异常。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class ComplexCompletableFutureExample {
    public static CompletableFuture<String> getUserInfo() {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟获取用户信息的异步操作
            if (Math.random() > 0.5) {
                throw new RuntimeException("Failed to get user info");
            }
            return "User Information";
        });
    }

    public static CompletableFuture<String> getOrderList(String userInfo) {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟根据用户信息获取订单列表的异步操作
            if (Math.random() > 0.5) {
                throw new RuntimeException("Failed to get order list");
            }
            return "Order List for " + userInfo;
        });
    }

    public static CompletableFuture<String> getOrderDetails(String orderList) {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟根据订单列表获取订单详情的异步操作
            if (Math.random() > 0.5) {
                throw new RuntimeException("Failed to get order details");
            }
            return "Order Details for " + orderList;
        });
    }

    public static void main(String[] args) {
        CompletableFuture<String> future = getUserInfo()
               .thenApplyAsync(ComplexCompletableFutureExample::getOrderList)
               .thenApplyAsync(ComplexCompletableFutureExample::getOrderDetails)
               .handle((result, ex) -> {
                    if (ex != null) {
                        System.out.println("Caught exception: " + ex.getMessage());
                        return "Default Order Details";
                    }
                    return result;
                });
        try {
            System.out.println(future.get());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}

在上述代码中,我们构建了一个包含三个异步操作的链式调用。每个操作都有一定概率抛出异常,通过handle方法统一处理异常,返回默认的订单详情。

另外,在实际开发中,可能会遇到需要在不同线程池中执行异步操作的情况。CompletableFuture提供了相应的方法来指定线程池。

import java.util.concurrent.*;

public class CompletableFutureWithExecutor {
    private static final ExecutorService executorService = Executors.newFixedThreadPool(10);

    public static CompletableFuture<String> getUserInfo() {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟获取用户信息的异步操作
            if (Math.random() > 0.5) {
                throw new RuntimeException("Failed to get user info");
            }
            return "User Information";
        }, executorService);
    }

    public static CompletableFuture<String> getOrderList(String userInfo) {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟根据用户信息获取订单列表的异步操作
            if (Math.random() > 0.5) {
                throw new RuntimeException("Failed to get order list");
            }
            return "Order List for " + userInfo;
        }, executorService);
    }

    public static void main(String[] args) {
        CompletableFuture<String> future = getUserInfo()
               .thenApplyAsync(CompletableFutureWithExecutor::getOrderList, executorService)
               .exceptionally(ex -> {
                    System.out.println("Caught exception: " + ex.getMessage());
                    return "Default Order List";
                });
        try {
            System.out.println(future.get());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        } finally {
            executorService.shutdown();
            try {
                if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
                    executorService.shutdownNow();
                    if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
                        System.err.println("Pool did not terminate");
                    }
                }
            } catch (InterruptedException ie) {
                executorService.shutdownNow();
                Thread.currentThread().interrupt();
            }
        }
    }
}

在这个例子中,我们创建了一个固定大小的线程池executorService,并在getUserInfogetOrderList方法中通过supplyAsync的第二个参数指定使用该线程池执行异步操作。在thenApplyAsync方法中也指定了相同的线程池,以确保整个链式调用在特定的线程池中执行。最后,在程序结束时,我们优雅地关闭线程池,等待所有任务执行完毕或超时。

在处理错误恢复时,除了简单的重试机制,还可以结合熔断机制。熔断机制是一种保护措施,当某个服务在短时间内连续失败达到一定次数时,暂时停止对该服务的调用,避免大量无效请求对系统资源的消耗。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class CompletableFutureCircuitBreaker {
    private static final int MAX_FAILURES = 3;
    private static final long RECOVERY_TIME = 5;
    private static final AtomicInteger failureCount = new AtomicInteger(0);
    private static long lastFailureTime = System.currentTimeMillis();

    public static CompletableFuture<String> getUserInfo() {
        if (failureCount.get() >= MAX_FAILURES && System.currentTimeMillis() - lastFailureTime < RECOVERY_TIME * 1000) {
            return CompletableFuture.failedFuture(new RuntimeException("Circuit breaker open"));
        }
        return CompletableFuture.supplyAsync(() -> {
            // 模拟获取用户信息的异步操作
            if (Math.random() > 0.5) {
                lastFailureTime = System.currentTimeMillis();
                failureCount.incrementAndGet();
                throw new RuntimeException("Failed to get user info");
            }
            failureCount.set(0);
            return "User Information";
        });
    }

    public static CompletableFuture<String> getOrderList(String userInfo) {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟根据用户信息获取订单列表的异步操作
            return "Order List for " + userInfo;
        });
    }

    public static void main(String[] args) {
        CompletableFuture<String> future = getUserInfo()
               .thenApplyAsync(CompletableFutureCircuitBreaker::getOrderList)
               .exceptionally(ex -> {
                    System.out.println("Caught exception: " + ex.getMessage());
                    return "Default Order List";
                });
        try {
            System.out.println(future.get());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}

在上述代码中,我们通过AtomicInteger记录失败次数,通过lastFailureTime记录上次失败时间。当失败次数达到MAX_FAILURES且在RECOVERY_TIME内时,getUserInfo方法直接返回一个失败的CompletableFuture,表示熔断已打开。这样可以避免在服务不稳定时持续调用可能失败的操作,提高系统的整体稳定性。

在实际的分布式系统中,异步操作可能涉及到远程服务调用,网络问题更加复杂。此时,除了上述的错误处理和恢复机制,还需要考虑网络超时等问题。CompletableFuture提供了completeOnTimeoutorTimeout方法来处理超时情况。

import java.util.concurrent.*;

public class CompletableFutureTimeoutExample {
    public static CompletableFuture<String> getUserInfo() {
        return CompletableFuture.supplyAsync(() -> {
            try {
                // 模拟可能耗时较长的获取用户信息操作
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "User Information";
        });
    }

    public static CompletableFuture<String> getOrderList(String userInfo) {
        return CompletableFuture.supplyAsync(() -> {
            return "Order List for " + userInfo;
        });
    }

    public static void main(String[] args) {
        CompletableFuture<String> future = getUserInfo()
               .thenApplyAsync(CompletableFutureTimeoutExample::getOrderList)
               .orTimeout(2, TimeUnit.SECONDS);
        try {
            System.out.println(future.get());
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            System.out.println("Caught exception: " + e.getMessage());
        }
    }
}

在这个例子中,getUserInfo方法模拟了一个可能耗时较长的操作,通过orTimeout方法设置了2秒的超时时间。如果在2秒内操作未完成,CompletableFuture将抛出TimeoutException,我们可以在catch块中进行相应的处理。

在链式调用的错误处理与恢复过程中,日志记录也非常重要。通过记录详细的异常信息和操作过程,能够帮助开发人员快速定位和解决问题。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.logging.Level;
import java.util.logging.Logger;

public class CompletableFutureLoggingExample {
    private static final Logger LOGGER = Logger.getLogger(CompletableFutureLoggingExample.class.getName());

    public static CompletableFuture<String> getUserInfo() {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟获取用户信息的异步操作
            if (Math.random() > 0.5) {
                throw new RuntimeException("Failed to get user info");
            }
            return "User Information";
        });
    }

    public static CompletableFuture<String> getOrderList(String userInfo) {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟根据用户信息获取订单列表的异步操作
            return "Order List for " + userInfo;
        });
    }

    public static void main(String[] args) {
        CompletableFuture<String> future = getUserInfo()
               .thenApplyAsync(CompletableFutureLoggingExample::getOrderList)
               .exceptionally(ex -> {
                    LOGGER.log(Level.SEVERE, "Exception occurred", ex);
                    return "Default Order List";
                });
        try {
            System.out.println(future.get());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}

在上述代码中,我们使用Java自带的Logger记录异常信息。当getUserInfo抛出异常时,exceptionally方法中的LOGGER.log(Level.SEVERE, "Exception occurred", ex);会将异常信息记录到日志中,方便后续排查问题。

综上所述,在Java的CompletableFuture链式调用中,错误处理与恢复是一个复杂而重要的主题。从基本的异常处理方法,到重试机制、熔断机制、超时处理以及日志记录,每一个环节都对构建健壮的异步应用程序起着关键作用。开发人员需要根据具体的业务场景和需求,灵活运用这些技术,以确保系统在面对各种异常情况时能够稳定、可靠地运行。同时,合理利用线程池和分布式系统中的相关技术,也能进一步提升系统的性能和可用性。在实际开发中,不断积累经验,优化错误处理和恢复策略,是提高Java异步编程质量的重要途径。