MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java CompletableFuture链式调用优化异步编程流程

2022-07-185.6k 阅读

Java CompletableFuture 链式调用基础

在 Java 的异步编程领域,CompletableFuture 是一个强大的工具。它允许我们以一种更简洁、更可控的方式编写异步代码。CompletableFuture 实现了 Future 接口和 CompletionStage 接口,这使得它既能够获取异步操作的结果(类似于传统的 Future),又能够在异步操作完成时执行后续的操作(通过 CompletionStage 提供的丰富方法)。

CompletableFuture 的创建

在开始链式调用之前,我们首先要创建 CompletableFuture 实例。CompletableFuture 提供了多种静态方法来创建实例。

  1. 使用 CompletableFuture.supplyAsync 创建有返回值的异步任务

    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        // 模拟耗时操作
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "Hello, CompletableFuture!";
    });
    

    在上述代码中,supplyAsync 方法接收一个 Supplier 接口的实现,它会在一个新的线程中执行 Supplierget 方法,并返回一个 CompletableFuture,其结果就是 Supplier 的返回值。

  2. 使用 CompletableFuture.runAsync 创建无返回值的异步任务

    CompletableFuture<Void> futureVoid = CompletableFuture.runAsync(() -> {
        // 模拟耗时操作
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("This is a void CompletableFuture task.");
    });
    

    runAsync 方法接收一个 Runnable 接口的实现,它同样会在新线程中执行 Runnablerun 方法,但返回的 CompletableFuture 的结果类型是 Void,因为 Runnable 本身没有返回值。

基本的链式调用方法

  1. thenApply thenApply 方法用于在 CompletableFuture 完成时,对其结果进行转换。它接收一个 Function 接口的实现,该 Function 的输入是前一个 CompletableFuture 的结果,输出是新的 CompletableFuture 的结果。

    CompletableFuture.supplyAsync(() -> "Hello")
     .thenApply(s -> s + ", World")
     .thenApply(String::toUpperCase)
     .thenAccept(System.out::println);
    

    在上述代码中,首先通过 supplyAsync 创建一个返回 "Hello" 的 CompletableFuture。然后使用 thenApply 将结果转换为 "Hello, World",接着再次使用 thenApply 将其转换为大写的 "HELLO, WORLD",最后使用 thenAccept 打印结果。thenAccept 方法接收一个 Consumer 接口的实现,用于在 CompletableFuture 完成时消费其结果,但不返回新的 CompletableFuture

  2. thenCompose thenCompose 方法与 thenApply 类似,但它接收的 Function 返回的是另一个 CompletableFuture。这在需要进行异步操作的链式调用时非常有用。

    CompletableFuture.supplyAsync(() -> 10)
     .thenCompose(i -> CompletableFuture.supplyAsync(() -> i * 2))
     .thenAccept(System.out::println);
    

    这里首先创建一个返回 10 的 CompletableFuture,然后通过 thenCompose,将 10 作为输入,创建并返回一个新的 CompletableFuture,其结果是 10 乘以 2,即 20。最后使用 thenAccept 打印结果。如果使用 thenApply 代替 thenCompose,返回的将是一个 CompletableFuture<CompletableFuture<Integer>>,而不是直接的 CompletableFuture<Integer>,这会使后续处理变得复杂。

  3. thenCombine thenCombine 方法用于将两个 CompletableFuture 的结果进行合并。它接收另一个 CompletableFuture 和一个 BiFunctionBiFunction 的两个输入分别是两个 CompletableFuture 的结果,输出是新的 CompletableFuture 的结果。

    CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 5);
    CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 3);
    future1.thenCombine(future2, (a, b) -> a + b)
     .thenAccept(System.out::println);
    

    在上述代码中,future1 返回 5,future2 返回 3。通过 thenCombine,将两个结果相加得到 8,并打印出来。

CompletableFuture 链式调用的高级应用

处理异常

在异步编程中,异常处理是至关重要的。CompletableFuture 提供了多种方法来优雅地处理异常。

  1. exceptionally exceptionally 方法用于在 CompletableFuture 发生异常时,提供一个默认值或进行异常处理。它接收一个 Function,该 Function 的输入是异常,输出是一个替代值。

    CompletableFuture.supplyAsync(() -> {
        if (Math.random() < 0.5) {
            throw new RuntimeException("Simulated exception");
        }
        return "Success";
    }).exceptionally(ex -> {
        System.out.println("Caught exception: " + ex.getMessage());
        return "Default value";
    }).thenAccept(System.out::println);
    

    在上述代码中,有 50% 的概率会抛出一个运行时异常。通过 exceptionally,当异常发生时,会捕获异常并打印异常信息,同时返回一个默认值 "Default value" 并打印。

  2. handle handle 方法可以同时处理正常结果和异常情况。它接收一个 BiFunction,第一个参数是正常结果(如果没有异常),第二个参数是异常(如果有异常)。

    CompletableFuture.supplyAsync(() -> {
        if (Math.random() < 0.5) {
            throw new RuntimeException("Simulated exception");
        }
        return "Success";
    }).handle((result, ex) -> {
        if (ex != null) {
            System.out.println("Caught exception: " + ex.getMessage());
            return "Default value";
        }
        return result;
    }).thenAccept(System.out::println);
    

    这里 handle 方法在异常发生时进行处理并返回默认值,在没有异常时直接返回正常结果。与 exceptionally 相比,handle 提供了更全面的处理方式,因为它可以同时处理正常和异常情况。

并行异步操作

  1. allOf allOf 方法用于等待多个 CompletableFuture 都完成。它接收多个 CompletableFuture 作为参数,并返回一个新的 CompletableFuture<Void>,当所有传入的 CompletableFuture 都完成时,这个新的 CompletableFuture 才完成。

    CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "A";
    });
    CompletableFuture<String> futureB = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "B";
    });
    CompletableFuture<Void> allFuture = CompletableFuture.allOf(futureA, futureB);
    allFuture.thenRun(() -> {
        try {
            System.out.println("All futures completed. Result of A: " + futureA.get());
            System.out.println("Result of B: " + futureB.get());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }).join();
    

    在上述代码中,futureAfutureB 是两个异步任务,分别模拟 2 秒和 1 秒的耗时操作。通过 allOf 创建一个新的 CompletableFuture<Void>,当 futureAfutureB 都完成时,allFuture 才完成。然后通过 thenRun 在所有任务完成后打印结果。

  2. anyOf anyOf 方法与 allOf 相反,它等待多个 CompletableFuture 中的任意一个完成。它接收多个 CompletableFuture 作为参数,并返回一个新的 CompletableFuture,这个新的 CompletableFuture 的结果就是第一个完成的 CompletableFuture 的结果。

    CompletableFuture<String> futureC = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "C";
    });
    CompletableFuture<String> futureD = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(1500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "D";
    });
    CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(futureC, futureD);
    anyFuture.thenAccept(result -> System.out.println("First completed result: " + result)).join();
    

    这里 futureCfutureD 是两个异步任务,futureD 会先完成。通过 anyOf 创建的 anyFuture 会在 futureD 完成时就完成,其结果就是 futureD 的结果 "D",并打印出来。

CompletableFuture 链式调用的性能优化

线程池的合理使用

默认情况下,CompletableFuture 的异步操作使用 ForkJoinPool.commonPool() 线程池。然而,在某些场景下,这可能不是最优的选择。

  1. 自定义线程池 当我们有大量的异步任务,并且这些任务的性质不同(例如 I/O 密集型和 CPU 密集型任务混合)时,使用自定义线程池可以提高性能。

    ExecutorService executor = Executors.newFixedThreadPool(10);
    CompletableFuture.supplyAsync(() -> {
        // 模拟耗时操作
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "Task result";
    }, executor)
     .thenApply(s -> s + " processed")
     .thenAccept(System.out::println);
    executor.shutdown();
    

    在上述代码中,我们创建了一个固定大小为 10 的线程池 executor。通过 supplyAsync 的第二个参数将异步任务提交到这个自定义线程池。这样可以根据任务的特点合理分配线程资源,避免 ForkJoinPool.commonPool() 可能出现的线程饥饿等问题。

  2. 线程池大小的调整 线程池大小的选择需要根据任务的类型和系统资源来决定。对于 CPU 密集型任务,线程池大小一般设置为 CPU 核心数加 1,以充分利用 CPU 资源并考虑到可能的线程上下文切换开销。对于 I/O 密集型任务,线程池大小可以设置得较大,因为 I/O 操作会使线程处于等待状态,不会占用 CPU 资源,更多的线程可以提高整体的吞吐量。

    int cpuCoreCount = Runtime.getRuntime().availableProcessors();
    ExecutorService cpuIntensiveExecutor = Executors.newFixedThreadPool(cpuCoreCount + 1);
    ExecutorService ioIntensiveExecutor = Executors.newFixedThreadPool(cpuCoreCount * 2);
    

    在实际应用中,可以根据任务的类型选择合适的线程池来执行 CompletableFuture 的异步任务。

减少不必要的链式调用

虽然 CompletableFuture 的链式调用提供了简洁的编程方式,但过多的不必要链式调用可能会导致性能问题。

  1. 合并操作 如果多个 thenApply 操作只是简单的转换,并且可以合并为一个操作,那么应该进行合并。

    // 原始方式
    CompletableFuture.supplyAsync(() -> "hello")
     .thenApply(String::toUpperCase)
     .thenApply(s -> s + " WORLD")
     .thenApply(String::trim)
     .thenAccept(System.out::println);
    // 优化方式
    CompletableFuture.supplyAsync(() -> "hello")
     .thenApply(s -> {
         s = s.toUpperCase();
         s = s + " WORLD";
         return s.trim();
     })
     .thenAccept(System.out::println);
    

    在上述代码中,优化后的方式将多个 thenApply 操作合并为一个,减少了中间 CompletableFuture 的创建和不必要的线程切换,从而提高了性能。

  2. 避免过度嵌套 过度嵌套的 CompletableFuture 链式调用可能会使代码难以理解和维护,同时也可能影响性能。例如,在多层嵌套的 thenApplythenCompose 中,如果可以通过其他方式简化,应该进行优化。

    // 过度嵌套
    CompletableFuture.supplyAsync(() -> 1)
     .thenApply(i -> i * 2)
     .thenCompose(j -> CompletableFuture.supplyAsync(() -> j + 3))
     .thenApply(k -> k * 4)
     .thenCompose(l -> CompletableFuture.supplyAsync(() -> l - 5))
     .thenAccept(System.out::println);
    // 优化方式
    CompletableFuture.supplyAsync(() -> {
        int result = 1;
        result = result * 2;
        result = result + 3;
        result = result * 4;
        result = result - 5;
        return result;
    }).thenAccept(System.out::println);
    

    优化后的方式将复杂的嵌套操作合并为一个异步任务,减少了中间的异步操作和线程切换,提高了性能和代码的可读性。

CompletableFuture 链式调用在实际项目中的应用场景

微服务间的异步调用

在微服务架构中,经常需要调用多个微服务接口,并对结果进行组合或处理。CompletableFuture 的链式调用可以很好地满足这种需求。

  1. 聚合数据 假设我们有一个电商系统,需要从商品微服务获取商品信息,从库存微服务获取库存信息,并将两者聚合展示给用户。

    CompletableFuture<Product> productFuture = CompletableFuture.supplyAsync(() -> {
        // 调用商品微服务获取商品信息
        return getProductFromMicroservice();
    });
    CompletableFuture<Integer> stockFuture = CompletableFuture.supplyAsync(() -> {
        // 调用库存微服务获取库存信息
        return getStockFromMicroservice();
    });
    productFuture.thenCombine(stockFuture, (product, stock) -> {
        product.setStock(stock);
        return product;
    }).thenAccept(productWithStock -> {
        // 展示给用户
        displayProduct(productWithStock);
    });
    

    在上述代码中,通过 CompletableFuture 分别异步调用商品微服务和库存微服务,然后使用 thenCombine 将两个结果聚合,最后展示给用户。

  2. 级联调用 有时候微服务间的调用是级联的,例如订单微服务调用支付微服务,支付微服务调用银行接口。

    CompletableFuture<PaymentResult> paymentFuture = CompletableFuture.supplyAsync(() -> {
        // 调用支付微服务
        return callPaymentMicroservice();
    });
    paymentFuture.thenCompose(paymentResult -> CompletableFuture.supplyAsync(() -> {
        if (paymentResult.isSuccess()) {
            // 调用银行接口确认支付
            return callBankInterface(paymentResult);
        }
        return new BankResponse(false, "Payment failed");
    })).thenAccept(bankResponse -> {
        // 处理银行响应
        handleBankResponse(bankResponse);
    });
    

    这里通过 thenCompose 实现了级联调用,只有当支付微服务调用成功时才会调用银行接口。

大数据处理

在大数据处理场景中,经常需要对大量数据进行异步处理和结果合并。

  1. 并行计算 假设有一个任务是对一个大数据集进行并行计算,例如计算每个数据的平方并求和。

    List<Integer> dataList = Arrays.asList(1, 2, 3, 4, 5);
    List<CompletableFuture<Integer>> futureList = new ArrayList<>();
    for (int data : dataList) {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> data * data);
        futureList.add(future);
    }
    CompletableFuture<Integer> sumFuture = CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0]))
     .thenApply(v -> futureList.stream()
        .mapToInt(f -> {
             try {
                 return f.get();
             } catch (InterruptedException | ExecutionException e) {
                 e.printStackTrace();
                 return 0;
             }
         })
        .sum());
    sumFuture.thenAccept(sum -> System.out.println("Sum of squares: " + sum)).join();
    

    在上述代码中,首先为每个数据创建一个异步计算平方的 CompletableFuture,并添加到列表中。然后使用 allOf 等待所有计算完成,接着通过 thenApply 计算所有平方值的总和。

  2. 数据转换和合并 对于从不同数据源获取的数据进行转换和合并,CompletableFuture 链式调用也非常有用。

    CompletableFuture<List<String>> dataSource1Future = CompletableFuture.supplyAsync(() -> {
        // 从数据源 1 获取数据
        return getDataSource1();
    });
    CompletableFuture<List<String>> dataSource2Future = CompletableFuture.supplyAsync(() -> {
        // 从数据源 2 获取数据
        return getDataSource2();
    });
    dataSource1Future.thenCombine(dataSource2Future, (list1, list2) -> {
        List<String> combinedList = new ArrayList<>(list1);
        combinedList.addAll(list2);
        return combinedList;
    }).thenApply(CompletableFutureChain::transformData)
     .thenAccept(transformedData -> {
         // 处理转换后的数据
         processTransformedData(transformedData);
     });
    

    这里通过 thenCombine 将两个数据源的数据合并,然后通过 thenApply 对合并后的数据进行转换,最后处理转换后的数据。

通过以上对 CompletableFuture 链式调用的基础、高级应用、性能优化以及实际应用场景的详细介绍,相信你对如何利用 CompletableFuture 优化异步编程流程有了更深入的理解和掌握。在实际项目中,合理运用 CompletableFuture 的链式调用,可以提高代码的可读性、可维护性以及系统的性能和响应速度。