MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java 中 CompletableFuture 任务完成回调 whenComplete 方法

2021-04-141.9k 阅读

CompletableFuture 概述

在Java 8引入的CompletableFuture类为异步编程提供了强大的支持。它不仅允许我们以异步方式执行任务,还提供了丰富的方法来处理任务的完成、错误处理以及结果组合等。CompletableFuture代表一个异步计算的结果,这意味着我们可以在任务完成后获取其结果,而无需阻塞当前线程等待任务执行完毕。

CompletableFuture实现了Future接口,这是Java早期用于异步计算的接口。与传统的Future相比,CompletableFuture更加灵活和强大,它允许我们以链式调用的方式编写异步代码,使得异步编程更加直观和可读。例如,我们可以轻松地将多个异步任务串联起来,一个任务的结果作为另一个任务的输入,而不必像传统方式那样手动管理线程和同步。

whenComplete 方法的基本概念

whenComplete方法是CompletableFuture类提供的用于任务完成回调的方法之一。当CompletableFuture代表的异步任务完成(无论是正常完成还是因异常而完成)时,whenComplete方法中指定的回调函数将会被执行。它的基本签名如下:

CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)

这里的action是一个BiConsumer,它接收两个参数:第一个参数是异步任务正常完成时的结果(如果任务正常完成,result不为nullexceptionnull),第二个参数是任务执行过程中抛出的异常(如果任务因异常而完成,exception不为nullresultnull)。无论任务是成功还是失败,whenComplete的回调都会被执行。

whenComplete 方法的返回值

whenComplete方法返回一个新的CompletableFuture,这个新的CompletableFuture在原始CompletableFuture完成时也会完成,并且其结果与原始CompletableFuture相同。也就是说,我们可以在链式调用中继续对这个返回的CompletableFuture进行操作。例如:

CompletableFuture.supplyAsync(() -> "Hello")
                 .whenComplete((result, exception) -> {
                      if (exception == null) {
                          System.out.println("Task completed successfully: " + result);
                      } else {
                          System.out.println("Task failed: " + exception.getMessage());
                      }
                  })
                 .thenApply(String::toUpperCase)
                 .thenAccept(System.out::println);

在上述代码中,首先通过supplyAsync创建一个异步任务,返回字符串“Hello”。whenComplete回调在任务完成时打印任务的完成状态。然后通过thenApply将结果转换为大写,最后通过thenAccept打印最终结果。

whenComplete 方法的使用场景

结果处理与日志记录

whenComplete方法非常适合用于记录任务的执行结果或错误信息。例如,在一个Web应用中,我们可能有一个异步任务来查询数据库获取用户信息。当任务完成时,我们可以使用whenComplete记录任务是否成功以及获取到的用户信息。

CompletableFuture<User> getUserFuture = CompletableFuture.supplyAsync(() -> {
    // 模拟数据库查询
    if (Math.random() > 0.5) {
        return new User("John", 30);
    } else {
        throw new RuntimeException("Database query failed");
    }
});

getUserFuture.whenComplete((user, exception) -> {
    if (exception == null) {
        System.out.println("User retrieved successfully: " + user);
    } else {
        System.out.println("Error retrieving user: " + exception.getMessage());
    }
});

在这个例子中,supplyAsync模拟了一个可能成功或失败的数据库查询任务。whenComplete回调根据任务的完成状态记录相应的信息。

资源清理

在异步任务执行过程中,可能会涉及到一些资源的获取,比如数据库连接、文件句柄等。当任务完成后,无论成功与否,都需要清理这些资源。whenComplete方法可以方便地实现这一点。

CompletableFuture<Void> task = CompletableFuture.runAsync(() -> {
    Connection connection = null;
    try {
        connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/mydb", "user", "password");
        // 执行数据库操作
        Statement statement = connection.createStatement();
        statement.executeUpdate("INSERT INTO users (name, age) VALUES ('Jane', 25)");
    } catch (SQLException e) {
        throw new RuntimeException(e);
    } finally {
        if (connection != null) {
            try {
                connection.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
});

task.whenComplete((result, exception) -> {
    if (exception != null) {
        System.out.println("Database operation failed: " + exception.getMessage());
    } else {
        System.out.println("Database operation completed successfully");
    }
});

在上述代码中,runAsync方法执行一个数据库插入操作。在任务执行过程中获取数据库连接,任务完成后,无论是否成功,whenComplete回调都会打印相应的状态信息。同时,在任务内部通过finally块确保数据库连接被关闭。

异步任务链的中间处理

在构建复杂的异步任务链时,whenComplete可以用于在任务链的中间阶段对结果进行处理或记录,而不影响任务链的后续执行。

CompletableFuture.supplyAsync(() -> "Hello")
                 .thenApply(String::toUpperCase)
                 .whenComplete((result, exception) -> {
                      System.out.println("Intermediate result: " + result);
                  })
                 .thenApply(result -> result + " World")
                 .thenAccept(System.out::println);

在这个例子中,whenComplete回调在thenApply将字符串转换为大写后执行,打印中间结果。然后任务链继续执行,将结果与“ World”拼接并最终打印。

whenComplete 方法与异常处理

捕获任务中的异常

whenComplete方法的回调函数中可以捕获异步任务执行过程中抛出的异常。这使得我们可以在任务完成后统一处理异常,而不必在任务执行的代码块中分散处理异常。

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
    if (Math.random() > 0.5) {
        return 10;
    } else {
        throw new RuntimeException("Task failed");
    }
});

future.whenComplete((result, exception) -> {
    if (exception != null) {
        System.out.println("Caught exception: " + exception.getMessage());
    } else {
        System.out.println("Result: " + result);
    }
});

在上述代码中,supplyAsync创建的异步任务有一定概率抛出异常。whenComplete回调能够捕获并处理这个异常,打印相应的错误信息。

异常处理与任务链的延续

whenComplete捕获到异常时,我们仍然可以通过返回的CompletableFuture继续构建任务链。例如,我们可以在异常发生时执行一些备用操作。

CompletableFuture.supplyAsync(() -> {
    if (Math.random() > 0.5) {
        return 10;
    } else {
        throw new RuntimeException("Task failed");
    }
})
.whenComplete((result, exception) -> {
    if (exception != null) {
        System.out.println("Task failed, performing fallback");
    }
})
.exceptionally(ex -> {
    // 备用操作,返回一个默认值
    return -1;
})
.thenAccept(System.out::println);

在这个例子中,whenComplete回调在任务失败时打印提示信息。exceptionally方法在捕获到异常时执行备用操作,返回一个默认值-1,并继续任务链的执行,最终将结果打印出来。

whenComplete 方法的线程模型

回调执行的线程

whenComplete方法的回调函数默认在执行完成CompletableFuture任务的线程中执行。例如,如果CompletableFuture是通过supplyAsync创建的,并且任务在ForkJoinPool.commonPool()线程池中执行,那么whenComplete的回调也会在这个线程池中执行。

CompletableFuture.supplyAsync(() -> {
    System.out.println("Task is running in thread: " + Thread.currentThread().getName());
    return "Result";
})
.whenComplete((result, exception) -> {
    System.out.println("Callback is running in thread: " + Thread.currentThread().getName());
});

在上述代码中,任务和回调都打印当前执行的线程名。运行代码可以发现,任务和回调通常在同一个线程中执行(除非线程池中的线程被复用或其他特殊情况)。

使用自定义线程池

如果我们希望whenComplete的回调在特定的线程池中执行,可以使用whenCompleteAsync方法。whenCompleteAsync方法有两个重载版本,一个接受Executor参数,另一个不接受参数(不接受参数时默认使用ForkJoinPool.commonPool())。

ExecutorService executor = Executors.newFixedThreadPool(2);

CompletableFuture.supplyAsync(() -> {
    System.out.println("Task is running in thread: " + Thread.currentThread().getName());
    return "Result";
})
.whenCompleteAsync((result, exception) -> {
    System.out.println("Callback is running in thread: " + Thread.currentThread().getName());
}, executor);

executor.shutdown();

在这个例子中,whenCompleteAsync方法的回调在我们自定义的Executor线程池中执行。通过打印线程名可以看到任务和回调在不同的线程中执行。

whenComplete 方法与其他 CompletableFuture 方法的组合使用

与 thenApply 组合

whenComplete可以与thenApply方法组合使用,先通过whenComplete对任务结果进行记录或简单处理,然后通过thenApply对结果进行进一步的转换。

CompletableFuture.supplyAsync(() -> "hello")
                 .whenComplete((result, exception) -> {
                      System.out.println("Original result: " + result);
                  })
                 .thenApply(String::toUpperCase)
                 .thenAccept(System.out::println);

在这个例子中,whenComplete先打印原始结果,然后thenApply将结果转换为大写并打印。

与 thenAccept 组合

whenCompletethenAccept也可以组合使用。whenComplete用于处理任务完成的通用逻辑,thenAccept用于对最终结果进行消费。

CompletableFuture.supplyAsync(() -> 10)
                 .whenComplete((result, exception) -> {
                      if (exception == null) {
                          System.out.println("Task completed with result: " + result);
                      } else {
                          System.out.println("Task failed: " + exception.getMessage());
                      }
                  })
                 .thenAccept(result -> System.out.println("Final result: " + result));

在这个例子中,whenComplete打印任务完成状态,thenAccept打印最终结果。

与 thenCompose 组合

whenCompletethenCompose组合可以用于构建复杂的异步任务链,其中一个任务的结果作为另一个异步任务的输入,并且在中间过程中可以使用whenComplete进行结果记录或处理。

CompletableFuture.supplyAsync(() -> "input")
                 .whenComplete((result, exception) -> {
                      System.out.println("First task result: " + result);
                  })
                 .thenCompose(input -> CompletableFuture.supplyAsync(() -> {
                      System.out.println("Processing input: " + input);
                      return input + " processed";
                  }))
                 .thenAccept(System.out::println);

在这个例子中,第一个任务返回“input”,whenComplete打印第一个任务的结果。然后thenCompose将这个结果作为输入启动另一个异步任务,对输入进行处理并返回新的结果,最后打印最终结果。

实际应用案例分析

电商系统中的库存查询与订单处理

假设我们正在开发一个电商系统,当用户下单时,需要先查询库存是否足够。库存查询是一个异步操作,因为它可能涉及到与外部库存系统的交互。如果库存足够,则创建订单。

CompletableFuture<Boolean> checkStockFuture = CompletableFuture.supplyAsync(() -> {
    // 模拟库存查询
    return Math.random() > 0.5;
});

CompletableFuture<Void> orderFuture = checkStockFuture
       .whenComplete((stockAvailable, exception) -> {
            if (exception != null) {
                System.out.println("Stock check failed: " + exception.getMessage());
            } else {
                System.out.println("Stock check result: " + stockAvailable);
            }
        })
       .thenCompose(stockAvailable -> {
            if (stockAvailable) {
                return CompletableFuture.runAsync(() -> {
                    System.out.println("Creating order...");
                    // 实际的订单创建逻辑
                });
            } else {
                return CompletableFuture.runAsync(() -> {
                    System.out.println("Insufficient stock, cannot create order");
                });
            }
        });

orderFuture.join();

在这个例子中,checkStockFuture模拟库存查询。whenComplete在库存查询完成时打印查询结果或错误信息。然后thenCompose根据库存查询结果决定是否创建订单,并通过runAsync异步执行相应的操作。

数据分析系统中的数据聚合与报告生成

在一个数据分析系统中,我们可能需要从多个数据源获取数据,然后对这些数据进行聚合,最后生成报告。每个步骤都可以是异步操作。

CompletableFuture<List<Integer>> dataSource1Future = CompletableFuture.supplyAsync(() -> {
    // 模拟从数据源1获取数据
    return Arrays.asList(1, 2, 3);
});

CompletableFuture<List<Integer>> dataSource2Future = CompletableFuture.supplyAsync(() -> {
    // 模拟从数据源2获取数据
    return Arrays.asList(4, 5, 6);
});

CompletableFuture<Integer> aggregateFuture = CompletableFuture.allOf(dataSource1Future, dataSource2Future)
       .thenApply(v -> {
            List<Integer> combinedData = new ArrayList<>();
            combinedData.addAll(dataSource1Future.join());
            combinedData.addAll(dataSource2Future.join());
            return combinedData.stream().mapToInt(Integer::intValue).sum();
        })
       .whenComplete((sum, exception) -> {
            if (exception == null) {
                System.out.println("Aggregated sum: " + sum);
            } else {
                System.out.println("Aggregation failed: " + exception.getMessage());
            }
        });

CompletableFuture<Void> reportFuture = aggregateFuture
       .thenAccept(sum -> {
            System.out.println("Generating report with sum: " + sum);
            // 实际的报告生成逻辑
        });

reportFuture.join();

在这个例子中,dataSource1FuturedataSource2Future分别模拟从两个数据源获取数据。allOf方法等待两个数据源的数据都获取完成。然后thenApply对数据进行聚合,whenComplete在聚合完成时打印结果或错误信息。最后thenAccept根据聚合结果生成报告。

注意事项

避免阻塞回调线程

whenComplete回调中,应避免执行长时间阻塞的操作,因为这可能会影响线程池的性能。如果需要执行阻塞操作,建议将其放在另一个异步任务中。

CompletableFuture.supplyAsync(() -> "result")
                 .whenComplete((result, exception) -> {
                      // 错误做法,阻塞线程
                      try {
                          Thread.sleep(5000);
                      } catch (InterruptedException e) {
                          e.printStackTrace();
                      }
                      System.out.println("Callback with result: " + result);
                  });

上述代码中在whenComplete回调中使用Thread.sleep阻塞线程5秒,这是不合适的。可以将阻塞操作放在另一个异步任务中:

CompletableFuture.supplyAsync(() -> "result")
                 .whenComplete((result, exception) -> {
                      CompletableFuture.runAsync(() -> {
                          try {
                              Thread.sleep(5000);
                          } catch (InterruptedException e) {
                              e.printStackTrace();
                          }
                          System.out.println("Callback with result: " + result);
                      });
                  });

内存泄漏风险

如果在whenComplete回调中持有对外部资源(如数据库连接、文件句柄等)的强引用,并且没有正确释放这些资源,可能会导致内存泄漏。确保在任务完成后及时清理这些资源。

class ResourceHolder {
    private Connection connection;

    public ResourceHolder() {
        try {
            connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/mydb", "user", "password");
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    public void doTask() {
        CompletableFuture.runAsync(() -> {
            // 执行任务
        })
       .whenComplete((result, exception) -> {
            // 没有释放connection,可能导致内存泄漏
            if (exception != null) {
                System.out.println("Task failed: " + exception.getMessage());
            } else {
                System.out.println("Task completed");
            }
        });
    }

    public void close() {
        if (connection != null) {
            try {
                connection.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
}

在上述代码中,ResourceHolder类在whenComplete回调中没有释放数据库连接,可能导致内存泄漏。应该在whenComplete回调中或者在合适的地方调用close方法释放连接。

异常处理的完备性

虽然whenComplete可以捕获任务执行过程中的异常,但在复杂的任务链中,确保所有可能的异常都被妥善处理是很重要的。特别是在与其他CompletableFuture方法组合使用时,要注意异常处理的连续性。

CompletableFuture.supplyAsync(() -> {
    if (Math.random() > 0.5) {
        return "result";
    } else {
        throw new RuntimeException("Task failed");
    }
})
.whenComplete((result, exception) -> {
    if (exception != null) {
        System.out.println("Caught in whenComplete: " + exception.getMessage());
    }
})
.thenApply(String::toUpperCase)
.thenAccept(System.out::println);

在上述代码中,如果任务在supplyAsync中抛出异常,whenComplete可以捕获并打印异常信息。但是,由于thenApply没有处理异常,任务链可能会在thenApply处中断,而没有进一步的异常处理。可以通过exceptionally方法来确保异常在整个任务链中得到妥善处理:

CompletableFuture.supplyAsync(() -> {
    if (Math.random() > 0.5) {
        return "result";
    } else {
        throw new RuntimeException("Task failed");
    }
})
.whenComplete((result, exception) -> {
    if (exception != null) {
        System.out.println("Caught in whenComplete: " + exception.getMessage());
    }
})
.exceptionally(ex -> {
    System.out.println("Caught in exceptionally: " + ex.getMessage());
    return "default";
})
.thenApply(String::toUpperCase)
.thenAccept(System.out::println);

在这个改进后的代码中,exceptionally方法在whenComplete之后捕获异常,并返回一个默认值,确保任务链可以继续执行。

通过深入理解CompletableFuturewhenComplete方法及其在不同场景下的应用,我们可以更加高效地编写异步代码,充分利用多核处理器的性能,提升应用程序的响应速度和并发处理能力。同时,注意避免常见的问题,如阻塞线程、内存泄漏和异常处理不当等,以确保异步代码的稳定性和可靠性。