MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java 中 CompletableFuture 任务异步回调 thenApplyAsync 方法

2024-02-071.4k 阅读

什么是 CompletableFuture

在Java中,CompletableFuture 是Java 8引入的一个强大的类,用于支持异步编程。它提供了一种更灵活和简洁的方式来处理异步任务,尤其是在需要处理多个异步任务之间的依赖关系以及处理异步任务的结果时。CompletableFuture 实现了 Future 接口,这意味着它可以用于表示一个异步操作的结果,并且可以通过 get() 方法阻塞等待结果。同时,它还提供了丰富的方法来处理异步任务的完成、组合和转换,使得异步编程变得更加直观和高效。

thenApplyAsync 方法概述

thenApplyAsyncCompletableFuture 类中的一个方法,用于对 CompletableFuture 的结果进行异步转换。具体来说,当一个 CompletableFuture 完成时,thenApplyAsync 方法会异步地应用一个函数到这个完成的 CompletableFuture 的结果上,并返回一个新的 CompletableFuture,这个新的 CompletableFuture 的结果是应用函数后的返回值。

thenApplyAsync 方法有两个重载版本:

  1. public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn):使用默认的 ForkJoinPool.commonPool() 线程池来异步执行转换函数。
  2. public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor):使用指定的 Executor 来异步执行转换函数。

thenApplyAsync 方法的工作原理

  1. 异步执行thenApplyAsync 方法会将传入的函数提交到指定的线程池(如果未指定则使用 ForkJoinPool.commonPool())中异步执行。这意味着主线程不会阻塞等待转换函数的执行,而是继续执行后续的代码。
  2. 结果转换:当原始的 CompletableFuture 完成(无论是正常完成还是异常完成)时,thenApplyAsync 方法会获取其结果,并将这个结果作为参数传递给传入的转换函数 fn。转换函数 fn 会对这个结果进行处理,并返回一个新的结果。
  3. 返回新的 CompletableFuturethenApplyAsync 方法会返回一个新的 CompletableFuture,这个新的 CompletableFuture 的结果就是转换函数 fn 的返回值。如果转换函数执行过程中抛出异常,新的 CompletableFuture 会以这个异常为原因完成。

代码示例

基本示例

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureThenApplyAsyncExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture.supplyAsync(() -> {
            System.out.println("任务1在线程: " + Thread.currentThread().getName() + " 执行");
            return "Hello";
        }).thenApplyAsync(result -> {
            System.out.println("转换任务在线程: " + Thread.currentThread().getName() + " 执行");
            return result + ", World!";
        }).thenAcceptAsync(finalResult -> {
            System.out.println("最终结果: " + finalResult + " 在线程: " + Thread.currentThread().getName() + " 处理");
        });

        // 主线程休眠,防止程序提前退出
        Thread.sleep(2000);
    }
}

在这个示例中:

  1. CompletableFuture.supplyAsync(() -> {... }) 创建了一个异步任务,在这个任务中,我们打印出当前执行线程的名称,并返回字符串 "Hello"
  2. thenApplyAsync(result -> {... }) 方法异步地对前一个 CompletableFuture 的结果进行转换。它在另一个线程中执行,将前一个任务返回的 "Hello" 字符串转换为 "Hello, World!"
  3. thenAcceptAsync(finalResult -> {... }) 方法异步地处理最终的结果,打印出最终结果和当前执行线程的名称。

通过运行这个示例,你可以看到不同任务在不同线程中异步执行的效果。

使用自定义 Executor

import java.util.concurrent.*;

public class CompletableFutureThenApplyAsyncWithExecutorExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(2);

        CompletableFuture.supplyAsync(() -> {
            System.out.println("任务1在自定义线程池线程: " + Thread.currentThread().getName() + " 执行");
            return 10;
        }).thenApplyAsync(result -> {
            System.out.println("转换任务在自定义线程池线程: " + Thread.currentThread().getName() + " 执行");
            return result * 2;
        }, executorService).thenAcceptAsync(finalResult -> {
            System.out.println("最终结果: " + finalResult + " 在自定义线程池线程: " + Thread.currentThread().getName() + " 处理");
        }, executorService);

        executorService.shutdown();
        try {
            if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
                executorService.shutdownNow();
                if (!executorService.awaitTermination(60, TimeUnit.SECONDS))
                    System.err.println("Pool did not terminate");
            }
        } catch (InterruptedException ie) {
            executorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}

在这个示例中:

  1. 我们创建了一个固定大小为2的线程池 executorService
  2. CompletableFuture.supplyAsync(() -> {... }) 创建了一个异步任务,返回整数 10,并在自定义线程池中的线程中执行。
  3. thenApplyAsync(result -> {... }, executorService) 方法使用我们自定义的线程池来异步执行转换函数,将前一个任务的结果乘以2。
  4. thenAcceptAsync(finalResult -> {... }, executorService) 方法同样使用自定义线程池来异步处理最终的结果。

最后,我们正确地关闭了线程池,确保所有任务都能执行完毕。

异常处理

thenApplyAsync 方法在执行过程中如果出现异常,会导致返回的新 CompletableFuture 以异常的形式完成。我们可以通过 exceptionally 方法来处理这些异常。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureThenApplyAsyncExceptionExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture.supplyAsync(() -> {
            System.out.println("任务1在线程: " + Thread.currentThread().getName() + " 执行");
            if (Math.random() > 0.5) {
                throw new RuntimeException("任务1出现异常");
            }
            return "正常结果";
        }).thenApplyAsync(result -> {
            System.out.println("转换任务在线程: " + Thread.currentThread().getName() + " 执行");
            return result + " 转换后";
        }).exceptionally(ex -> {
            System.out.println("捕获到异常: " + ex.getMessage() + " 在线程: " + Thread.currentThread().getName());
            return "默认结果";
        }).thenAcceptAsync(finalResult -> {
            System.out.println("最终结果: " + finalResult + " 在线程: " + Thread.currentThread().getName() + " 处理");
        });

        // 主线程休眠,防止程序提前退出
        Thread.sleep(2000);
    }
}

在这个示例中:

  1. CompletableFuture.supplyAsync(() -> {... }) 创建的异步任务有一定概率抛出异常。
  2. thenApplyAsync(result -> {... }) 方法对结果进行转换,如果前一个任务抛出异常,这个方法不会执行。
  3. exceptionally(ex -> {... }) 方法捕获前一个 CompletableFuture 中抛出的异常,并返回一个默认结果。
  4. thenAcceptAsync(finalResult -> {... }) 方法处理最终结果,无论这个结果是正常的还是由异常处理返回的默认结果。

与 thenApply 的区别

thenApply 方法和 thenApplyAsync 方法都用于对 CompletableFuture 的结果进行转换,但它们之间有一个重要的区别:执行方式。

  1. thenApplythenApply 方法会在调用它的 CompletableFuture 完成的线程中同步执行转换函数。这意味着如果前一个 CompletableFuture 是在主线程中完成的,那么 thenApply 中的转换函数也会在主线程中执行。如果前一个 CompletableFuture 是在一个异步线程中完成的,那么 thenApply 中的转换函数也会在这个异步线程中执行。
  2. thenApplyAsyncthenApplyAsync 方法会将转换函数提交到指定的线程池(如果未指定则使用 ForkJoinPool.commonPool())中异步执行。这意味着无论前一个 CompletableFuture 是在哪个线程中完成的,thenApplyAsync 中的转换函数都会在另一个线程中执行。

例如,以下代码展示了两者的区别:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class ThenApplyVsThenApplyAsyncExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 使用 thenApply
        CompletableFuture.supplyAsync(() -> {
            System.out.println("任务1在线程: " + Thread.currentThread().getName() + " 执行");
            return "Hello";
        }).thenApply(result -> {
            System.out.println("thenApply转换任务在线程: " + Thread.currentThread().getName() + " 执行");
            return result + ", World!";
        }).thenAccept(finalResult -> {
            System.out.println("thenApply最终结果: " + finalResult + " 在线程: " + Thread.currentThread().getName() + " 处理");
        });

        // 使用 thenApplyAsync
        CompletableFuture.supplyAsync(() -> {
            System.out.println("任务2在线程: " + Thread.currentThread().getName() + " 执行");
            return "Hello";
        }).thenApplyAsync(result -> {
            System.out.println("thenApplyAsync转换任务在线程: " + Thread.currentThread().getName() + " 执行");
            return result + ", World!";
        }).thenAcceptAsync(finalResult -> {
            System.out.println("thenApplyAsync最终结果: " + finalResult + " 在线程: " + Thread.currentThread().getName() + " 处理");
        });

        // 主线程休眠,防止程序提前退出
        Thread.sleep(2000);
    }
}

在这个示例中,thenApply 中的转换函数会在 supplyAsync 任务完成的线程中执行,而 thenApplyAsync 中的转换函数会在另一个线程(来自 ForkJoinPool.commonPool())中执行。

应用场景

  1. 复杂计算结果的异步转换:在一些需要进行复杂计算的异步任务中,计算完成后可能需要对结果进行进一步的转换。例如,从数据库中异步获取用户数据后,可能需要将用户数据转换为特定的业务对象格式,这个转换过程可以使用 thenApplyAsync 异步执行,避免阻塞主线程。
  2. 链式异步操作:当有多个异步操作需要依次执行,并且每个操作的结果依赖于前一个操作的结果时,可以使用 thenApplyAsync 方法将这些操作链接起来。这样可以确保每个操作都是异步执行的,提高整体的执行效率。
  3. 与其他异步框架集成:在一些大型项目中,可能会使用多种异步框架。CompletableFuturethenApplyAsync 方法可以方便地与其他异步框架集成,例如在响应式编程中,将 CompletableFuture 的结果转换为响应式流中的元素。

注意事项

  1. 线程池的选择:如果使用 thenApplyAsync 方法时不指定 Executor,默认会使用 ForkJoinPool.commonPool()。这个线程池是一个共享的线程池,可能会被其他异步任务共享。在高并发场景下,如果任务执行时间较长,可能会导致线程池饱和,影响其他任务的执行。因此,在需要时,应该考虑使用自定义的线程池。
  2. 异常处理:在使用 thenApplyAsync 方法时,要注意正确处理可能出现的异常。由于 thenApplyAsync 是异步执行的,如果不使用 exceptionally 等方法处理异常,异常可能会被忽略,导致程序出现难以调试的问题。
  3. 内存泄漏风险:如果在 thenApplyAsync 中使用了资源(如文件句柄、数据库连接等),要确保在任务完成后正确释放这些资源,否则可能会导致内存泄漏。

结合其他 CompletableFuture 方法

thenApplyAsync 方法可以与其他 CompletableFuture 方法结合使用,以实现更复杂的异步操作。

  1. 与 thenComposeAsync 结合thenComposeAsync 方法用于将一个返回 CompletableFuture 的函数应用到前一个 CompletableFuture 的结果上,并返回一个新的 CompletableFuture。例如:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class ThenApplyAsyncWithThenComposeAsyncExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture.supplyAsync(() -> {
            System.out.println("任务1在线程: " + Thread.currentThread().getName() + " 执行");
            return "Hello";
        }).thenApplyAsync(result -> {
            System.out.println("转换任务1在线程: " + Thread.currentThread().getName() + " 执行");
            return result + ", World!";
        }).thenComposeAsync(finalResult -> CompletableFuture.supplyAsync(() -> {
            System.out.println("任务2在线程: " + Thread.currentThread().getName() + " 执行");
            return finalResult + " 进一步处理";
        })).thenAcceptAsync(finalResult2 -> {
            System.out.println("最终结果: " + finalResult2 + " 在线程: " + Thread.currentThread().getName() + " 处理");
        });

        // 主线程休眠,防止程序提前退出
        Thread.sleep(2000);
    }
}

在这个示例中,thenApplyAsync 先对第一个任务的结果进行转换,然后 thenComposeAsync 将转换后的结果作为参数传递给另一个返回 CompletableFuture 的函数,实现了更复杂的异步操作链。 2. 与 allOf 和 anyOf 结合allOf 方法用于等待所有给定的 CompletableFuture 完成,anyOf 方法用于等待任意一个给定的 CompletableFuture 完成。可以将 thenApplyAsync 与它们结合使用,以处理多个异步任务的结果。例如:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class ThenApplyAsyncWithAllOfExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务1在线程: " + Thread.currentThread().getName() + " 执行");
            return "结果1";
        }).thenApplyAsync(result -> {
            System.out.println("转换任务1在线程: " + Thread.currentThread().getName() + " 执行");
            return result + " 转换后";
        });

        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务2在线程: " + Thread.currentThread().getName() + " 执行");
            return "结果2";
        }).thenApplyAsync(result -> {
            System.out.println("转换任务2在线程: " + Thread.currentThread().getName() + " 执行");
            return result + " 转换后";
        });

        CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2);

        allFutures.thenRunAsync(() -> {
            try {
                System.out.println("任务1最终结果: " + future1.get());
                System.out.println("任务2最终结果: " + future2.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        });

        // 主线程休眠,防止程序提前退出
        Thread.sleep(2000);
    }
}

在这个示例中,thenApplyAsync 分别对两个异步任务的结果进行转换,然后使用 allOf 方法等待两个任务都完成,并在所有任务完成后处理它们的最终结果。

性能优化

  1. 合理使用线程池:如前文所述,选择合适的线程池对于性能至关重要。对于计算密集型任务,可以使用固定大小的线程池,其大小可以根据CPU核心数来设置。对于I/O密集型任务,可以适当增加线程池的大小,以充分利用系统资源。
  2. 减少不必要的异步操作:虽然异步编程可以提高程序的响应性,但过多的异步操作也会带来线程切换的开销。在设计异步任务时,要确保每个异步操作都是必要的,避免将一些简单的、执行时间短的操作也异步化。
  3. 优化转换函数thenApplyAsync 中的转换函数应该尽量简洁高效。避免在转换函数中进行复杂的、耗时的操作,如果确实需要进行复杂操作,可以考虑将其进一步分解为多个异步任务,或者使用更高效的数据结构和算法。

总结

CompletableFuturethenApplyAsync 方法是Java异步编程中的一个重要工具,它允许我们对异步任务的结果进行异步转换,从而实现更灵活和高效的异步操作。通过合理使用 thenApplyAsync 方法,结合其他 CompletableFuture 方法,以及注意线程池的选择和异常处理等方面,我们可以编写出性能优异、健壮的异步程序。无论是在开发大型分布式系统还是小型的高性能应用程序中,掌握 thenApplyAsync 方法的使用都是非常有价值的。在实际应用中,要根据具体的业务需求和场景,灵活运用 thenApplyAsync 方法,以达到最佳的编程效果。同时,不断优化异步任务的性能和资源管理,确保程序在高并发环境下的稳定性和高效性。希望通过本文的介绍和示例,读者能够对 CompletableFuturethenApplyAsync 方法有更深入的理解和掌握,从而在Java异步编程中能够更加得心应手。

以上内容包含了对 CompletableFuturethenApplyAsync 方法的详细介绍、工作原理、代码示例、异常处理、与其他方法的区别、应用场景、注意事项、与其他方法结合以及性能优化等方面,相信能够满足你对该技术点深入学习的需求。如果还有其他相关问题,欢迎随时提问。