MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java CompletableFuture thenApply实现数据转换的细节

2023-01-133.4k 阅读

Java CompletableFuture thenApply 实现数据转换的细节

一、CompletableFuture 概述

在Java的并发编程领域,CompletableFuture 是Java 8引入的一个强大工具,用于异步计算。它提供了一种灵活的方式来处理异步操作的结果,支持链式调用、组合多个异步操作等特性。CompletableFuture 实现了 Future 接口和 CompletionStage 接口,这使得它既可以像传统的 Future 那样获取异步操作的结果,又能基于 CompletionStage 进行更复杂的异步操作编排。

CompletableFuture 的设计理念是将异步操作的结果作为一种可传递和处理的值。通过这种方式,我们可以在异步操作完成后,以一种类似于流水线的方式对结果进行一系列的处理,而不需要手动管理线程和锁。

二、thenApply 方法的基本概念

thenApplyCompletableFuture 类中定义在 CompletionStage 接口上的方法之一。它的作用是在当前 CompletableFuture 完成后,对其结果应用一个函数,并返回一个新的 CompletableFuture,新的 CompletableFuture 的结果就是应用函数后的返回值。

其方法签名如下:

<U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn);

这里,T 是当前 CompletableFuture 的结果类型,U 是应用函数 fn 后返回的结果类型。Function 是Java 8函数式接口,它接受一个参数并返回一个结果。

三、thenApply 的简单示例

为了更好地理解 thenApply 的工作原理,我们来看一个简单的代码示例。假设我们有一个异步任务,它返回一个整数,然后我们使用 thenApply 将这个整数乘以2。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class ThenApplyExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 5);
        CompletableFuture<Integer> resultFuture = future.thenApply(num -> num * 2);
        Integer result = resultFuture.get();
        System.out.println("Result: " + result);
    }
}

在上述代码中:

  1. CompletableFuture.supplyAsync(() -> 5) 创建了一个异步任务,该任务返回整数 5。这里 supplyAsync 方法接受一个 Supplier 函数式接口,Supplier 不接受参数并返回一个结果。
  2. future.thenApply(num -> num * 2)future 的结果应用函数 num -> num * 2。这个函数将输入的整数乘以2,并返回一个新的 CompletableFuture<Integer>,其结果就是乘法运算后的结果。
  3. resultFuture.get() 获取最终的结果,并打印出来。

四、thenApply 的执行机制

  1. 异步任务的完成触发
    • CompletableFuture 表示的异步任务完成时(无论是正常完成还是异常完成),thenApply 注册的函数才会被执行。如果异步任务尚未完成,thenApply 注册的函数会被暂存,直到任务完成。
    • 在内部,CompletableFuture 使用一种基于事件驱动的机制。当异步任务的计算结果准备好后,会触发一系列的回调操作,其中就包括执行 thenApply 注册的函数。
  2. 函数的执行线程
    • 如果 CompletableFuture 是由 supplyAsync 等异步方法创建的,thenApply 注册的函数默认会在同一个 ForkJoinPool.commonPool() 线程池中执行。
    • 例如在前面的例子中,supplyAsync 创建的异步任务和 thenApply 应用的函数都在 ForkJoinPool.commonPool() 的线程中执行。这是因为 CompletableFuture 的设计理念是尽可能高效地利用线程资源,避免线程上下文切换带来的开销。
    • 不过,如果 CompletableFuture 是通过 CompletableFuture.completedFuture 等方法创建的已完成的 CompletableFuturethenApply 注册的函数会在调用 thenApply 的线程中立即执行。

五、thenApply 与异常处理

  1. 异步任务异常时的行为
    • 如果 CompletableFuture 表示的异步任务在执行过程中发生异常,thenApply 注册的函数将不会被执行。
    • 例如,修改前面的示例,让异步任务抛出异常:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class ThenApplyExceptionExample {
    public static void main(String[] args) {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            throw new RuntimeException("Async task error");
        });
        CompletableFuture<Integer> resultFuture = future.thenApply(num -> num * 2);
        try {
            Integer result = resultFuture.get();
            System.out.println("Result: " + result);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}

在这个例子中,supplyAsync 中的任务抛出了一个 RuntimeException。由于异常发生,thenApply 注册的函数 num -> num * 2 不会被执行。当调用 resultFuture.get() 时,会抛出 ExecutionException,其原因是内部的异步任务抛出了异常。 2. 异常处理与恢复

  • 可以使用 exceptionally 方法来处理 CompletableFuture 中的异常,并提供恢复逻辑。结合 thenApply,可以实现更健壮的异步操作流程。
  • 例如:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class ThenApplyExceptionRecoveryExample {
    public static void main(String[] args) {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            throw new RuntimeException("Async task error");
        });
        CompletableFuture<Integer> resultFuture = future
              .exceptionally(ex -> {
                    System.out.println("Caught exception: " + ex.getMessage());
                    return 0;
                })
              .thenApply(num -> num * 2);
        try {
            Integer result = resultFuture.get();
            System.out.println("Result: " + result);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}

在这个例子中,exceptionally 方法捕获了异步任务抛出的异常,并返回一个默认值 0。然后 thenApply 对这个默认值应用乘法函数,最终返回结果 0。这样就实现了在异步任务异常时的恢复和后续处理。

六、thenApply 的链式调用

  1. 多次数据转换
    • thenApply 支持链式调用,这使得我们可以对异步操作的结果进行多次数据转换。例如,假设我们有一个异步任务返回一个字符串,我们想先将其转换为整数,然后再将整数平方。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class ThenApplyChainingExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "5");
        CompletableFuture<Integer> intFuture = future.thenApply(Integer::parseInt);
        CompletableFuture<Integer> squaredFuture = intFuture.thenApply(num -> num * num);
        Integer result = squaredFuture.get();
        System.out.println("Result: " + result);
    }
}

在上述代码中:

  • future.thenApply(Integer::parseInt) 将字符串转换为整数。
  • intFuture.thenApply(num -> num * num) 对转换后的整数进行平方操作。
  1. 链式调用的执行顺序
    • 链式调用中的 thenApply 方法是按照调用顺序依次执行的。每个 thenApply 依赖于前一个 CompletableFuture 的结果。只有前一个 CompletableFuture 完成,下一个 thenApply 注册的函数才会被执行。
    • 这种链式调用的方式非常类似于函数式编程中的流水线操作,使得异步数据处理变得简洁和可读。

七、thenApply 与其他 CompletionStage 方法的组合使用

  1. 与 thenCompose 组合
    • thenCompose 方法也是 CompletionStage 接口的一部分,它与 thenApply 有一些相似之处,但也有重要区别。thenApply 返回的是一个新的 CompletableFuture,其结果是应用函数后的返回值,而 thenCompose 返回的 CompletableFuture 是由应用函数返回的 CompletableFuture 本身。
    • 例如,假设我们有一个方法返回一个 CompletableFuture<String>,我们想对这个字符串进行处理并返回一个新的 CompletableFuture<Integer>。如果使用 thenApply,会得到一个 CompletableFuture<CompletableFuture<Integer>>,而使用 thenCompose 可以直接得到 CompletableFuture<Integer>
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class ThenApplyThenComposeExample {
    private static CompletableFuture<String> getStringFuture() {
        return CompletableFuture.supplyAsync(() -> "5");
    }

    private static CompletableFuture<Integer> processString(String str) {
        return CompletableFuture.supplyAsync(() -> Integer.parseInt(str) * 2);
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> resultFuture1 = getStringFuture()
              .thenApply(ThenApplyThenComposeExample::processString);
        CompletableFuture<Integer> resultFuture2 = getStringFuture()
              .thenCompose(ThenApplyThenComposeExample::processString);
        try {
            // 这里会抛出异常,因为 resultFuture1 是 CompletableFuture<CompletableFuture<Integer>>
            Integer result1 = resultFuture1.get();
        } catch (ExecutionException e) {
            System.out.println("Exception with thenApply: " + e.getCause());
        }
        Integer result2 = resultFuture2.get();
        System.out.println("Result with thenCompose: " + result2);
    }
}

在这个例子中,thenApply 会导致返回类型嵌套,而 thenCompose 能正确地将两个 CompletableFuture 操作组合起来,得到期望的 CompletableFuture<Integer> 结果。 2. 与 thenAccept 组合

  • thenAccept 方法用于在 CompletableFuture 完成后执行一个消费操作,但不返回结果。可以与 thenApply 组合使用,先对结果进行转换,然后进行消费。
  • 例如:
import java.util.concurrent.CompletableFuture;

public class ThenApplyThenAcceptExample {
    public static void main(String[] args) {
        CompletableFuture.supplyAsync(() -> 5)
              .thenApply(num -> num * 2)
              .thenAccept(System.out::println);
    }
}

在这个例子中,thenApply 将整数乘以2,thenAccept 打印出最终的结果。这种组合在需要对异步结果进行处理并执行一些副作用操作(如日志记录)时非常有用。

八、性能考虑

  1. 线程池使用
    • 如前所述,thenApply 注册的函数默认在 ForkJoinPool.commonPool() 线程池中执行。如果应用程序中有大量的异步任务和 thenApply 操作,可能会导致 ForkJoinPool.commonPool() 线程池的资源紧张。
    • 在这种情况下,可以考虑使用自定义的 Executor。例如:
import java.util.concurrent.*;

public class CustomExecutorThenApplyExample {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(10);
        CompletableFuture.supplyAsync(() -> 5, executor)
              .thenApply(num -> num * 2, executor)
              .thenAccept(System.out::println, executor);
        executor.shutdown();
    }
}

在这个例子中,我们创建了一个固定大小为10的线程池,并将其作为参数传递给 supplyAsyncthenApplythenAccept 方法。这样可以更好地控制异步任务的执行线程资源。 2. 函数执行时间

  • 如果 thenApply 注册的函数执行时间较长,可能会影响整个异步操作链的性能。特别是在高并发环境下,长时间执行的函数可能会阻塞线程池中的线程,导致其他任务无法及时执行。
  • 对于这种情况,可以考虑将复杂的计算任务进一步拆分成更小的异步任务,或者使用异步I/O等技术来减少函数的执行时间。

九、实际应用场景

  1. 微服务间的数据转换
    • 在微服务架构中,一个微服务可能会调用另一个微服务获取数据,然后对获取的数据进行转换。例如,一个用户信息微服务返回用户的ID,另一个微服务根据用户ID获取用户详细信息并进行格式转换。
    • 可以使用 CompletableFuturethenApply 来实现这种异步的数据获取和转换。假设我们有两个模拟的微服务调用:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class MicroserviceDataTransformationExample {
    private static CompletableFuture<Integer> getUserID() {
        return CompletableFuture.supplyAsync(() -> 1);
    }

    private static CompletableFuture<String> getUserDetails(int id) {
        return CompletableFuture.supplyAsync(() -> "User details for ID " + id);
    }

    private static String formatUserDetails(String details) {
        return "Formatted: " + details;
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> resultFuture = getUserID()
              .thenCompose(MicroserviceDataTransformationExample::getUserDetails)
              .thenApply(MicroserviceDataTransformationExample::formatUserDetails);
        String result = resultFuture.get();
        System.out.println(result);
    }
}

在这个例子中,getUserID 模拟获取用户ID的微服务调用,getUserDetails 模拟根据用户ID获取用户详细信息的微服务调用,formatUserDetails 对获取的用户详细信息进行格式转换。通过 thenComposethenApply 的组合,实现了微服务间异步数据的获取和转换。 2. 数据预处理和后处理

  • 在数据处理管道中,常常需要对数据进行预处理和后处理。例如,在读取文件数据后,先进行格式转换(预处理),然后进行计算,最后对计算结果进行格式化输出(后处理)。
  • 以下是一个简单的示例:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class DataPipelineExample {
    private static CompletableFuture<String> readFile() {
        return CompletableFuture.supplyAsync(() -> "5");
    }

    private static Integer parseData(String data) {
        return Integer.parseInt(data);
    }

    private static Integer processData(Integer num) {
        return num * 2;
    }

    private static String formatResult(Integer result) {
        return "Processed result: " + result;
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> resultFuture = readFile()
              .thenApply(DataPipelineExample::parseData)
              .thenApply(DataPipelineExample::processData)
              .thenApply(DataPipelineExample::formatResult);
        String result = resultFuture.get();
        System.out.println(result);
    }
}

在这个例子中,readFile 模拟读取文件数据,parseData 进行数据预处理(将字符串转换为整数),processData 进行核心计算,formatResult 进行后处理(格式化结果)。通过 thenApply 的链式调用,构建了一个简单的数据处理管道。

通过深入理解 CompletableFuturethenApply 方法,我们可以在Java的异步编程中更灵活、高效地处理数据转换,构建复杂而健壮的异步应用程序。无论是在微服务架构、数据处理还是其他并发编程场景中,thenApply 都能发挥重要作用。同时,合理地使用线程池、处理异常以及与其他 CompletionStage 方法组合使用,能进一步提升应用程序的性能和可靠性。