MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java CompletableFuture whenComplete全面监控任务状态的应用

2022-11-185.8k 阅读

Java CompletableFuture whenComplete 全面监控任务状态的应用

1. CompletableFuture 简介

在 Java 中,CompletableFuture 是 Java 8 引入的一个强大的异步编程工具。它实现了 Future 接口和 CompletionStage 接口,不仅能够获取异步任务的执行结果,还提供了丰富的方法来处理异步任务的完成情况,比如链式调用、组合多个异步任务等。这使得异步编程在 Java 中变得更加简洁和高效。

Future 接口是 Java 早期用于异步计算的方式,它允许我们启动一个异步任务并在将来某个时间获取任务的结果。然而,Future 存在一些局限性,比如我们无法得知任务是否完成,只能通过 get() 方法阻塞等待结果,或者使用 isDone() 方法轮询检查任务状态。而且,Future 对于处理异步任务的完成情况缺乏灵活性,难以进行链式调用和组合多个异步任务。

CompletableFuture 则弥补了这些不足。它提供了一系列方法来处理异步任务的完成情况,比如 thenApply()thenAccept()thenRun() 等,这些方法允许我们在任务完成后执行相应的操作,并且可以链式调用。同时,CompletableFuture 还支持多个异步任务的组合,比如 allOf()anyOf() 方法,使得异步编程更加灵活和强大。

2. whenComplete 方法详解

whenComplete 方法是 CompletableFuture 提供的一个用于处理任务完成情况的方法。它的定义如下:

public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)

这个方法接收一个 BiConsumer 类型的参数 actionBiConsumer 是一个函数式接口,它有两个参数:第一个参数是异步任务的结果(如果任务成功完成),第二个参数是任务执行过程中抛出的异常(如果任务异常完成)。当异步任务完成(无论是正常完成还是异常完成)时,action 将会被执行。

需要注意的是,whenComplete 方法返回的是一个新的 CompletableFuture,这个新的 CompletableFuture 的结果和原始的 CompletableFuture 的结果是一样的。也就是说,whenComplete 方法不会改变原始 CompletableFuture 的结果,它只是在原始 CompletableFuture 完成时执行指定的操作。

下面是一个简单的示例代码,展示了 whenComplete 方法的基本用法:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class WhenCompleteExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            // 模拟异步任务
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "任务完成";
        });

        future.whenComplete((result, ex) -> {
            if (ex == null) {
                System.out.println("任务正常完成,结果是: " + result);
            } else {
                System.out.println("任务异常完成,异常信息是: " + ex.getMessage());
            }
        });

        // 获取异步任务的结果
        String result = future.get();
        System.out.println("通过 get 方法获取的结果: " + result);
    }
}

在这个示例中,我们使用 CompletableFuture.supplyAsync 方法创建了一个异步任务,这个任务会休眠 2 秒后返回一个字符串 "任务完成"。然后,我们调用 whenComplete 方法,在任务完成时打印任务的结果或者异常信息。最后,我们通过 get 方法获取异步任务的结果并打印。

3. whenComplete 方法的应用场景

3.1 日志记录

在实际开发中,我们经常需要记录异步任务的执行情况,以便在出现问题时能够快速定位。whenComplete 方法可以很方便地实现这一功能。

import java.util.concurrent.CompletableFuture;
import java.util.logging.Level;
import java.util.logging.Logger;

public class WhenCompleteLoggingExample {
    private static final Logger LOGGER = Logger.getLogger(WhenCompleteLoggingExample.class.getName());

    public static void main(String[] args) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            // 模拟异步任务
            if (Math.random() < 0.5) {
                throw new RuntimeException("任务失败");
            }
            return "任务成功";
        });

        future.whenComplete((result, ex) -> {
            if (ex == null) {
                LOGGER.log(Level.INFO, "任务正常完成,结果是: " + result);
            } else {
                LOGGER.log(Level.SEVERE, "任务异常完成", ex);
            }
        });
    }
}

在这个示例中,我们在异步任务中通过 Math.random() 方法模拟了任务失败的情况。然后,在 whenComplete 方法中,根据任务的完成情况记录不同级别的日志。如果任务正常完成,记录 INFO 级别的日志;如果任务异常完成,记录 SEVERE 级别的日志,并打印异常堆栈信息。

3.2 资源清理

在一些情况下,异步任务可能会占用一些资源,比如数据库连接、文件句柄等。当任务完成后,我们需要及时清理这些资源,以避免资源泄漏。whenComplete 方法可以帮助我们在任务完成时进行资源清理。

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.concurrent.CompletableFuture;

public class WhenCompleteResourceCleanupExample {
    public static void main(String[] args) {
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            Connection connection = null;
            try {
                connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "password");
                // 执行数据库操作
                System.out.println("执行数据库操作");
            } catch (SQLException e) {
                throw new RuntimeException(e);
            } finally {
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (SQLException e) {
                        e.printStackTrace();
                    }
                }
            }
        });

        future.whenComplete((v, ex) -> {
            if (ex != null) {
                System.out.println("任务异常,清理资源失败: " + ex.getMessage());
            } else {
                System.out.println("任务正常完成,资源已清理");
            }
        });
    }
}

在这个示例中,我们在异步任务中获取了一个数据库连接,并执行了一些数据库操作。在 finally 块中,我们关闭了数据库连接。然后,在 whenComplete 方法中,根据任务的完成情况打印相应的信息。如果任务异常,打印清理资源失败的信息;如果任务正常完成,打印资源已清理的信息。

3.3 错误处理和重试

在异步任务执行过程中,可能会出现各种错误。我们可以使用 whenComplete 方法来捕获这些错误,并根据错误情况进行相应的处理,比如重试。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class WhenCompleteErrorHandlingAndRetryExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = performTaskWithRetry(3);
        String result = future.get();
        System.out.println("最终结果: " + result);
    }

    private static CompletableFuture<String> performTaskWithRetry(int maxRetries) {
        CompletableFuture<String> future = new CompletableFuture<>();
        performTask(future, maxRetries);
        return future;
    }

    private static void performTask(CompletableFuture<String> future, int retries) {
        CompletableFuture.supplyAsync(() -> {
            if (Math.random() < 0.5) {
                throw new RuntimeException("任务失败");
            }
            return "任务成功";
        }).whenComplete((result, ex) -> {
            if (ex == null) {
                future.complete(result);
            } else if (retries > 0) {
                System.out.println("任务失败,重试次数: " + retries);
                performTask(future, retries - 1);
            } else {
                future.completeExceptionally(ex);
            }
        });
    }
}

在这个示例中,我们定义了一个 performTaskWithRetry 方法,该方法接受一个最大重试次数的参数。在 performTask 方法中,我们模拟了一个可能失败的异步任务。如果任务失败且重试次数大于 0,则进行重试;如果重试次数用完仍然失败,则将异常传递给 CompletableFuture。通过 whenComplete 方法,我们可以根据任务的完成情况进行相应的处理,实现了错误处理和重试的功能。

4. whenComplete 与其他相关方法的对比

4.1 whenComplete 与 thenApply

thenApply 方法也是 CompletableFuture 提供的一个用于处理任务完成情况的方法。它的定义如下:

public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)

thenApply 方法接收一个 Function 类型的参数 fnFunction 是一个函数式接口,它只有一个参数,即异步任务的结果(如果任务成功完成),并返回一个新的结果。当异步任务成功完成时,fn 将会被执行,并且 thenApply 方法返回的新的 CompletableFuture 的结果就是 fn 的返回值。

whenComplete 方法不同的是,thenApply 方法只能处理任务正常完成的情况,它无法处理任务异常完成的情况。而且,thenApply 方法会改变原始 CompletableFuture 的结果,返回一个新的 CompletableFuture,其结果是 fn 的返回值。

下面是一个示例代码,展示了 thenApply 方法的用法:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class ThenApplyExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "任务完成");

        CompletableFuture<Integer> newFuture = future.thenApply(result -> result.length());

        Integer length = newFuture.get();
        System.out.println("字符串长度: " + length);
    }
}

在这个示例中,我们使用 CompletableFuture.supplyAsync 方法创建了一个异步任务,返回一个字符串 "任务完成"。然后,我们调用 thenApply 方法,将字符串的长度作为新的结果返回,并通过 get 方法获取新的 CompletableFuture 的结果。

4.2 whenComplete 与 thenAccept

thenAccept 方法也是 CompletableFuture 提供的一个用于处理任务完成情况的方法。它的定义如下:

public CompletableFuture<Void> thenAccept(Consumer<? super T> action)

thenAccept 方法接收一个 Consumer 类型的参数 actionConsumer 是一个函数式接口,它只有一个参数,即异步任务的结果(如果任务成功完成),并且没有返回值。当异步任务成功完成时,action 将会被执行,并且 thenAccept 方法返回的新的 CompletableFuture 的结果是 null

whenComplete 方法不同的是,thenAccept 方法只能处理任务正常完成的情况,它无法处理任务异常完成的情况。而且,thenAccept 方法会改变原始 CompletableFuture 的结果,返回一个新的 CompletableFuture,其结果是 null

下面是一个示例代码,展示了 thenAccept 方法的用法:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class ThenAcceptExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "任务完成");

        CompletableFuture<Void> newFuture = future.thenAccept(result -> System.out.println("任务结果: " + result));

        newFuture.get();
        System.out.println("新的 CompletableFuture 的结果: " + newFuture.join());
    }
}

在这个示例中,我们使用 CompletableFuture.supplyAsync 方法创建了一个异步任务,返回一个字符串 "任务完成"。然后,我们调用 thenAccept 方法,打印任务的结果,并通过 get 方法获取新的 CompletableFuture 的结果(这里是 null)。

4.3 whenComplete 与 handle

handle 方法也是 CompletableFuture 提供的一个用于处理任务完成情况的方法。它的定义如下:

public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable,? extends U> fn)

handle 方法接收一个 BiFunction 类型的参数 fnBiFunction 是一个函数式接口,它有两个参数:第一个参数是异步任务的结果(如果任务成功完成),第二个参数是任务执行过程中抛出的异常(如果任务异常完成),并返回一个新的结果。当异步任务完成(无论是正常完成还是异常完成)时,fn 将会被执行,并且 handle 方法返回的新的 CompletableFuture 的结果就是 fn 的返回值。

whenComplete 方法不同的是,handle 方法会改变原始 CompletableFuture 的结果,返回一个新的 CompletableFuture,其结果是 fn 的返回值。而 whenComplete 方法不会改变原始 CompletableFuture 的结果,它只是在原始 CompletableFuture 完成时执行指定的操作。

下面是一个示例代码,展示了 handle 方法的用法:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class HandleExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            if (Math.random() < 0.5) {
                throw new RuntimeException("任务失败");
            }
            return "任务成功";
        });

        CompletableFuture<String> newFuture = future.handle((result, ex) -> {
            if (ex == null) {
                return "正常结果: " + result;
            } else {
                return "异常结果: " + ex.getMessage();
            }
        });

        String newResult = newFuture.get();
        System.out.println("新的结果: " + newResult);
    }
}

在这个示例中,我们使用 CompletableFuture.supplyAsync 方法创建了一个异步任务,该任务可能会失败。然后,我们调用 handle 方法,根据任务的完成情况返回不同的结果,并通过 get 方法获取新的 CompletableFuture 的结果。

5. whenComplete 方法的注意事项

5.1 异常处理

whenComplete 方法中,我们可以通过 BiConsumer 的第二个参数来捕获异步任务执行过程中抛出的异常。但是需要注意的是,whenComplete 方法本身并不会抛出异常,即使 BiConsumer 中抛出了异常,也不会影响原始 CompletableFuture 的结果。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class WhenCompleteExceptionHandlingExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            throw new RuntimeException("任务失败");
        });

        future.whenComplete((result, ex) -> {
            if (ex != null) {
                System.out.println("捕获到异常: " + ex.getMessage());
                // 这里抛出的异常不会影响原始 CompletableFuture 的结果
                throw new RuntimeException("处理异常时抛出的异常");
            }
        });

        try {
            String result = future.get();
        } catch (ExecutionException e) {
            System.out.println("通过 get 方法捕获到的异常: " + e.getCause().getMessage());
        }
    }
}

在这个示例中,异步任务抛出了一个异常,我们在 whenComplete 方法中捕获到了这个异常,并在处理异常时又抛出了一个新的异常。但是,通过 get 方法获取异步任务的结果时,捕获到的仍然是原始异步任务抛出的异常,而不是 whenComplete 方法中处理异常时抛出的异常。

5.2 线程模型

whenComplete 方法默认是在执行异步任务的线程中执行 BiConsumer。这意味着如果异步任务执行时间较长,BiConsumer 的执行也会被延迟。如果我们希望 BiConsumer 在另一个线程中执行,可以使用 whenCompleteAsync 方法。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class WhenCompleteThreadModelExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "任务完成";
        });

        future.whenCompleteAsync((result, ex) -> {
            if (ex == null) {
                System.out.println("任务正常完成,结果是: " + result + ",当前线程: " + Thread.currentThread().getName());
            } else {
                System.out.println("任务异常完成,异常信息是: " + ex.getMessage());
            }
        }, executor);

        String result = future.get();
        System.out.println("通过 get 方法获取的结果: " + result);
        executor.shutdown();
    }
}

在这个示例中,我们使用 whenCompleteAsync 方法,并传入一个自定义的线程池 executor。这样,BiConsumer 将会在自定义线程池中执行,而不会阻塞执行异步任务的线程。

6. 总结

whenComplete 方法是 Java CompletableFuture 中一个非常实用的方法,它可以帮助我们全面监控异步任务的状态,无论是任务正常完成还是异常完成,都能进行相应的处理。通过 whenComplete 方法,我们可以实现日志记录、资源清理、错误处理和重试等功能,提高异步编程的可靠性和灵活性。

在使用 whenComplete 方法时,需要注意异常处理和线程模型等问题,避免出现意想不到的结果。同时,与 thenApplythenAccepthandle 等相关方法进行对比,可以更好地理解 whenComplete 方法的特点和适用场景。

总之,熟练掌握 whenComplete 方法的用法,对于编写高效、可靠的异步应用程序非常重要。希望通过本文的介绍和示例代码,读者能够对 whenComplete 方法有更深入的理解和应用。