MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java CompletableFuture异常处理保障异步任务稳定性

2021-07-046.8k 阅读

Java CompletableFuture 异常处理基础

在Java中,CompletableFuture 是一个强大的类,用于异步计算。它提供了一种方便的方式来处理异步任务的结果,包括处理任务执行过程中可能出现的异常。理解 CompletableFuture 的异常处理机制是确保异步任务稳定性的关键。

异常传播机制

CompletableFuture 所代表的异步任务发生异常时,异常会被传播。例如,我们创建一个简单的 CompletableFuture 任务,在任务执行过程中抛出异常:

import java.util.concurrent.CompletableFuture;

public class CompletableFutureExceptionExample {
    public static void main(String[] args) {
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            throw new RuntimeException("模拟异常");
        });

        // 这里没有处理异常,异常会被传播
    }
}

在上述代码中,runAsync 方法启动一个异步任务,任务中抛出了 RuntimeException。如果没有对这个异常进行处理,它会在 CompletableFuture 内部传播,并且在获取任务结果时(例如通过 get 方法),异常会被重新抛出。

处理异常的基本方法

为了避免异常传播导致程序崩溃,CompletableFuture 提供了几种处理异常的方法。最常用的是 exceptionally 方法。我们对上述代码进行修改,使用 exceptionally 方法来处理异常:

import java.util.concurrent.CompletableFuture;

public class CompletableFutureExceptionExample {
    public static void main(String[] args) {
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            throw new RuntimeException("模拟异常");
        }).exceptionally(ex -> {
            System.out.println("捕获到异常: " + ex.getMessage());
            return null;
        });

        // 这里不需要再担心异常传播
    }
}

在这个例子中,exceptionally 方法接收一个 Function,当异步任务发生异常时,这个 Function 会被调用。Function 的参数是异常对象,我们可以在其中进行异常处理,例如打印异常信息,并且返回一个默认值(在这个例子中返回 null,因为任务返回类型是 Void)。这样,即使异步任务发生异常,程序也不会崩溃,而是会执行异常处理逻辑。

链式调用中的异常处理

CompletableFuture 的强大之处在于它支持链式调用,使得异步任务的编排更加方便。在链式调用中,异常处理也需要特别注意。

链式调用基础

假设我们有一个简单的异步任务链,第一个任务返回一个字符串,第二个任务基于第一个任务的结果进行处理:

import java.util.concurrent.CompletableFuture;

public class CompletableFutureChainingExample {
    public static void main(String[] args) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "初始结果")
               .thenApply(result -> result + " 处理后");

        future.thenAccept(System.out::println);
    }
}

在这个例子中,supplyAsync 方法启动一个异步任务,返回一个 CompletableFuture<String>thenApply 方法基于前一个任务的结果进行处理,返回一个新的 CompletableFuture。最后,thenAccept 方法消费最终的结果并打印。

链式调用中的异常处理

当链式调用中的某个任务发生异常时,异常同样会传播。我们修改上述代码,让 thenApply 方法抛出异常:

import java.util.concurrent.CompletableFuture;

public class CompletableFutureChainingExceptionExample {
    public static void main(String[] args) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "初始结果")
               .thenApply(result -> {
                    throw new RuntimeException("模拟异常");
                });

        // 这里没有处理异常,异常会传播
    }
}

为了处理链式调用中的异常,我们可以在链式调用的末尾使用 exceptionally 方法。修改如下:

import java.util.concurrent.CompletableFuture;

public class CompletableFutureChainingExceptionExample {
    public static void main(String[] args) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "初始结果")
               .thenApply(result -> {
                    throw new RuntimeException("模拟异常");
                })
               .exceptionally(ex -> {
                    System.out.println("捕获到异常: " + ex.getMessage());
                    return "默认结果";
                });

        future.thenAccept(System.out::println);
    }
}

在这个例子中,exceptionally 方法捕获了 thenApply 方法中抛出的异常,并返回一个默认结果。这样,即使链式调用中的某个任务出现异常,整个任务链也不会中断,而是会执行异常处理逻辑并返回默认结果。

多个异常处理方法的配合使用

除了 exceptionally 方法,CompletableFuture 还提供了 whenCompletehandle 方法,它们在异常处理方面有不同的特点,可以与 exceptionally 方法配合使用。

whenComplete 方法会在任务完成(无论是正常完成还是异常完成)时被调用,它接收两个参数,第一个参数是任务的结果(如果任务正常完成),第二个参数是异常对象(如果任务异常完成):

import java.util.concurrent.CompletableFuture;

public class CompletableFutureWhenCompleteExample {
    public static void main(String[] args) {
        CompletableFuture.supplyAsync(() -> {
            throw new RuntimeException("模拟异常");
        }).whenComplete((result, ex) -> {
            if (ex != null) {
                System.out.println("捕获到异常: " + ex.getMessage());
            } else {
                System.out.println("任务正常完成,结果: " + result);
            }
        });
    }
}

handle 方法与 whenComplete 方法类似,但它可以返回一个新的结果,用于替换原任务的结果(无论是正常结果还是异常情况下的结果):

import java.util.concurrent.CompletableFuture;

public class CompletableFutureHandleExample {
    public static void main(String[] args) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            throw new RuntimeException("模拟异常");
        }).handle((result, ex) -> {
            if (ex != null) {
                System.out.println("捕获到异常: " + ex.getMessage());
                return "默认结果";
            } else {
                return result;
            }
        });

        future.thenAccept(System.out::println);
    }
}

在实际应用中,我们可以根据具体需求选择合适的异常处理方法。exceptionally 方法适用于只关注异常情况并返回默认结果的场景;whenComplete 方法适用于需要在任务完成时进行通用的处理,无论是否发生异常;handle 方法则结合了两者的特点,既可以处理异常,又可以返回新的结果。

并行异步任务中的异常处理

在实际开发中,我们经常会遇到需要并行执行多个异步任务的场景。CompletableFuture 提供了一些方法来支持并行任务,同时也需要妥善处理并行任务中的异常。

并行任务基础

CompletableFutureallOf 方法可以用于等待所有的 CompletableFuture 任务完成。例如,我们有两个并行的异步任务:

import java.util.concurrent.CompletableFuture;

public class CompletableFutureParallelExample {
    public static void main(String[] args) {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "任务1结果");
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "任务2结果");

        CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2);

        allFutures.thenRun(() -> {
            try {
                System.out.println("任务1结果: " + future1.get());
                System.out.println("任务2结果: " + future2.get());
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
    }
}

在这个例子中,allOf 方法返回一个新的 CompletableFuture<Void>,当 future1future2 都完成时,这个新的 CompletableFuture 才会完成。thenRun 方法在所有任务完成后执行,通过 get 方法获取每个任务的结果。

并行任务中的异常处理

当并行任务中的某个任务发生异常时,情况会变得稍微复杂一些。allOf 方法不会直接抛出异常,但是通过 get 方法获取异常任务的结果时,异常会被重新抛出。我们修改上述代码,让 future1 抛出异常:

import java.util.concurrent.CompletableFuture;

public class CompletableFutureParallelExceptionExample {
    public static void main(String[] args) {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            throw new RuntimeException("任务1异常");
        });
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "任务2结果");

        CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2);

        allFutures.thenRun(() -> {
            try {
                System.out.println("任务1结果: " + future1.get());
                System.out.println("任务2结果: " + future2.get());
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
    }
}

在这个例子中,当 future1 抛出异常时,allOf 方法返回的 CompletableFuture<Void> 仍然会完成,但是在 thenRun 方法中通过 get 方法获取 future1 的结果时,异常会被重新抛出并打印堆栈信息。

为了更好地处理并行任务中的异常,我们可以在每个任务中单独处理异常,或者在 allOf 方法返回的 CompletableFuture 上使用异常处理方法。例如,我们在每个任务中使用 exceptionally 方法处理异常:

import java.util.concurrent.CompletableFuture;

public class CompletableFutureParallelExceptionHandlingExample {
    public static void main(String[] args) {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            throw new RuntimeException("任务1异常");
        }).exceptionally(ex -> {
            System.out.println("捕获到任务1异常: " + ex.getMessage());
            return "任务1默认结果";
        });
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "任务2结果");

        CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2);

        allFutures.thenRun(() -> {
            try {
                System.out.println("任务1结果: " + future1.get());
                System.out.println("任务2结果: " + future2.get());
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
    }
}

在这个例子中,future1 中的异常被 exceptionally 方法捕获并处理,返回了默认结果。这样,即使 future1 发生异常,整个并行任务的处理流程也不会被中断。

自定义异常处理策略

在实际应用中,默认的异常处理方法可能无法满足复杂的业务需求。这时,我们可以自定义异常处理策略。

自定义异常处理类

我们可以创建一个自定义的异常处理类,实现特定的异常处理逻辑。例如,我们创建一个 CustomExceptionHandler 类,它接收一个异常并返回一个默认结果:

import java.util.concurrent.CompletableFuture;

class CustomExceptionHandler {
    public static String handleException(Throwable ex) {
        System.out.println("自定义异常处理: " + ex.getMessage());
        return "自定义默认结果";
    }
}

然后,我们在 CompletableFuture 中使用这个自定义的异常处理类:

import java.util.concurrent.CompletableFuture;

public class CustomExceptionHandlingExample {
    public static void main(String[] args) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            throw new RuntimeException("模拟异常");
        }).exceptionally(CustomExceptionHandler::handleException);

        future.thenAccept(System.out::println);
    }
}

在这个例子中,exceptionally 方法接收 CustomExceptionHandler::handleException 作为参数,当异步任务发生异常时,会调用 CustomExceptionHandler 类中的 handleException 方法进行异常处理,并返回自定义的默认结果。

结合依赖注入使用自定义异常处理

在大型项目中,通常会使用依赖注入(例如 Spring 框架)来管理对象。我们可以将自定义异常处理类作为一个 bean 注入到需要处理 CompletableFuture 异常的组件中。假设我们使用 Spring 框架,定义一个 AsyncTaskService 类:

import org.springframework.stereotype.Service;
import java.util.concurrent.CompletableFuture;

@Service
public class AsyncTaskService {
    private final CustomExceptionHandler customExceptionHandler;

    public AsyncTaskService(CustomExceptionHandler customExceptionHandler) {
        this.customExceptionHandler = customExceptionHandler;
    }

    public CompletableFuture<String> executeAsyncTask() {
        return CompletableFuture.supplyAsync(() -> {
            throw new RuntimeException("模拟异常");
        }).exceptionally(customExceptionHandler::handleException);
    }
}

在这个例子中,AsyncTaskService 类通过构造函数注入了 CustomExceptionHandler 实例。在 executeAsyncTask 方法中,CompletableFuture 使用了注入的 CustomExceptionHandler 来处理异常。这样,我们可以根据不同的业务场景和配置,灵活地切换异常处理策略。

异常处理与资源管理

在异步任务中,资源管理也是一个重要的方面。当异步任务发生异常时,需要确保相关资源能够正确释放,以避免资源泄漏。

资源管理基础

假设我们的异步任务需要使用一些资源,例如数据库连接。在正常情况下,我们需要在任务完成后关闭资源:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.concurrent.CompletableFuture;

public class ResourceManagementExample {
    public static void main(String[] args) {
        CompletableFuture.supplyAsync(() -> {
            Connection connection = null;
            try {
                connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/mydb", "root", "password");
                // 执行数据库操作
                return "操作成功";
            } catch (SQLException e) {
                throw new RuntimeException(e);
            } finally {
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (SQLException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).thenAccept(System.out::println);
    }
}

在这个例子中,我们在 try - finally 块中确保数据库连接在任务完成后(无论是正常完成还是异常完成)被关闭。

异常处理与资源管理结合

当异常处理与资源管理结合时,需要特别注意逻辑的正确性。例如,我们在异常处理过程中可能需要记录日志,并且仍然要确保资源被正确释放:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.concurrent.CompletableFuture;
import java.util.logging.Level;
import java.util.logging.Logger;

public class ExceptionAndResourceManagementExample {
    private static final Logger LOGGER = Logger.getLogger(ExceptionAndResourceManagementExample.class.getName());

    public static void main(String[] args) {
        CompletableFuture.supplyAsync(() -> {
            Connection connection = null;
            try {
                connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/mydb", "root", "password");
                // 执行数据库操作,这里模拟抛出异常
                throw new SQLException("模拟数据库操作异常");
            } catch (SQLException e) {
                LOGGER.log(Level.SEVERE, "数据库操作异常", e);
                throw new RuntimeException(e);
            } finally {
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (SQLException e) {
                        LOGGER.log(Level.SEVERE, "关闭数据库连接异常", e);
                    }
                }
            }
        }).exceptionally(ex -> {
            LOGGER.log(Level.SEVERE, "处理异步任务异常", ex);
            return "默认结果";
        }).thenAccept(System.out::println);
    }
}

在这个例子中,当数据库操作抛出异常时,首先在 catch 块中记录异常日志,然后重新抛出异常。在 finally 块中,确保数据库连接被关闭,如果关闭过程中出现异常,也记录相应的日志。在 exceptionally 方法中,再次记录异步任务的异常日志,并返回默认结果。这样,我们既正确处理了异常,又确保了资源的正确释放。

异常处理在实际项目中的应用场景

在实际项目中,CompletableFuture 的异常处理机制有着广泛的应用场景。

微服务间的异步调用

在微服务架构中,经常会有服务之间的异步调用。例如,一个订单服务需要调用库存服务来检查库存并扣减库存。如果库存服务调用失败,订单服务需要正确处理异常,而不是导致整个订单处理流程崩溃。

import java.util.concurrent.CompletableFuture;

public class MicroserviceExample {
    public static CompletableFuture<Boolean> checkAndDeductStock(String productId, int quantity) {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟库存服务调用,这里可能抛出异常
            if (Math.random() < 0.5) {
                throw new RuntimeException("库存服务异常");
            }
            // 实际逻辑:检查库存并扣减
            return true;
        }).exceptionally(ex -> {
            System.out.println("库存服务调用异常: " + ex.getMessage());
            return false;
        });
    }

    public static void processOrder(String productId, int quantity) {
        CompletableFuture<Boolean> stockFuture = checkAndDeductStock(productId, quantity);
        stockFuture.thenAccept(stockResult -> {
            if (stockResult) {
                System.out.println("库存充足,订单处理成功");
            } else {
                System.out.println("库存不足或库存服务异常,订单处理失败");
            }
        });
    }

    public static void main(String[] args) {
        processOrder("product1", 10);
    }
}

在这个例子中,checkAndDeductStock 方法模拟了库存服务的异步调用。如果调用过程中出现异常,通过 exceptionally 方法处理异常并返回 false。在 processOrder 方法中,根据库存检查结果处理订单,确保即使库存服务出现异常,订单服务也能正确处理,不会导致整个系统崩溃。

批量数据处理

在大数据处理场景中,经常需要批量处理数据。例如,我们需要读取一批文件并进行处理。每个文件的处理可以作为一个异步任务,如果某个文件处理失败,我们不希望整个批量处理过程中断,而是记录异常并继续处理其他文件。

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.logging.Level;
import java.util.logging.Logger;

public class BatchDataProcessingExample {
    private static final Logger LOGGER = Logger.getLogger(BatchDataProcessingExample.class.getName());

    public static CompletableFuture<Void> processFile(File file) {
        return CompletableFuture.runAsync(() -> {
            try {
                // 模拟文件处理,这里可能抛出异常
                if (Math.random() < 0.5) {
                    throw new RuntimeException("文件处理异常");
                }
                System.out.println("处理文件: " + file.getName());
            } catch (Exception e) {
                LOGGER.log(Level.SEVERE, "处理文件 " + file.getName() + " 异常", e);
            }
        });
    }

    public static void main(String[] args) {
        File[] files = new File("data").listFiles();
        if (files != null) {
            List<CompletableFuture<Void>> futures = new ArrayList<>();
            for (File file : files) {
                futures.add(processFile(file));
            }

            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                   .thenRun(() -> System.out.println("所有文件处理完成"))
                   .exceptionally(ex -> {
                        LOGGER.log(Level.SEVERE, "批量处理文件异常", ex);
                        return null;
                    });
        }
    }
}

在这个例子中,processFile 方法处理单个文件,在处理过程中如果出现异常,通过日志记录异常信息。在 main 方法中,对一批文件进行异步处理,使用 allOf 方法等待所有文件处理完成。如果整个批量处理过程中出现异常,通过 exceptionally 方法记录异常日志。这样,即使某个文件处理失败,其他文件仍然可以继续处理,保证了批量数据处理的稳定性。

通过深入理解 CompletableFuture 的异常处理机制,并结合实际项目中的应用场景,我们能够编写更加健壮、稳定的异步程序,提高系统的可靠性和性能。在实际开发中,需要根据具体业务需求,灵活选择合适的异常处理方法和策略,确保异步任务在各种情况下都能正确执行。同时,要注意资源管理,避免因异常导致的资源泄漏问题。通过合理运用异常处理和资源管理,我们可以充分发挥 CompletableFuture 的强大功能,构建高效、稳定的Java应用程序。