MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java CompletableFuture的链式调用与组合

2024-03-231.3k 阅读

Java CompletableFuture的链式调用

在Java中,CompletableFuture 提供了强大的异步编程能力,链式调用是其核心特性之一。它允许我们以一种流畅的方式编写异步操作序列,使得代码更加简洁和易读。

基本链式调用

假设我们有一个简单的异步任务,例如从网络获取数据。通常,我们可能会这样写:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureChainExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            // 模拟一个耗时操作,比如从网络获取数据
            return "Data from network";
        });

        CompletableFuture<String> processedFuture = future.thenApply(data -> {
            // 对获取到的数据进行处理
            return "Processed: " + data;
        });

        String result = processedFuture.get();
        System.out.println(result);
    }
}

在这个例子中,CompletableFuture.supplyAsync 方法创建了一个异步任务,该任务返回一个 CompletableFuture 对象。thenApply 方法则在异步任务完成后,对其结果进行处理。thenApply 方法接受一个 Function 作为参数,该 Function 会将前一个 CompletableFuture 的结果作为输入,并返回一个新的结果。

链式调用的优势

链式调用的主要优势在于代码的可读性和维护性。传统的异步编程,尤其是使用回调函数时,代码可能会变得非常复杂,出现“回调地狱”的情况。例如,考虑以下传统回调风格的代码:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CallbackExample {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.submit(() -> {
            // 第一个异步任务
            String data = "Data from network";
            executor.submit(() -> {
                // 基于第一个任务结果的第二个异步任务
                String processedData = "Processed: " + data;
                System.out.println(processedData);
            });
        });
        executor.shutdown();
    }
}

可以看到,随着异步任务的增加,代码变得越来越难以阅读和维护。而使用 CompletableFuture 的链式调用,代码结构更加清晰,每个异步步骤一目了然。

多步链式调用

CompletableFuture 支持多个步骤的链式调用。假设我们有一个更复杂的场景,需要对获取的数据进行多次处理:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class MultiStepChainExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            return "Data from network";
        })
       .thenApply(data -> {
            return "Step 1: " + data;
        })
       .thenApply(data -> {
            return "Step 2: " + data;
        })
       .thenApply(data -> {
            return "Step 3: " + data;
        });

        String result = future.get();
        System.out.println(result);
    }
}

在这个例子中,我们依次对数据进行了三次处理,每个 thenApply 方法都基于前一个方法的结果进行操作。这种链式调用使得整个异步处理流程非常清晰。

处理异常

在链式调用中,异常处理是非常重要的。CompletableFuture 提供了 exceptionally 方法来处理异步任务中的异常。例如:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class ExceptionHandlingInChainExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            if (Math.random() < 0.5) {
                throw new RuntimeException("Simulated exception");
            }
            return "Data from network";
        })
       .thenApply(data -> {
            return "Processed: " + data;
        })
       .exceptionally(ex -> {
            System.out.println("Caught exception: " + ex.getMessage());
            return "Default value";
        });

        String result = future.get();
        System.out.println(result);
    }
}

在这个例子中,如果 supplyAsync 中的异步任务抛出异常,exceptionally 方法会捕获该异常,并返回一个默认值。这样可以确保即使在异步任务出现异常的情况下,程序也能继续运行,而不会崩溃。

Java CompletableFuture的组合

除了链式调用,CompletableFuture 还支持任务的组合。这意味着我们可以将多个异步任务组合成一个更复杂的任务,以满足不同的业务需求。

组合多个独立任务

假设我们有两个独立的异步任务,例如从不同的数据源获取数据,然后将这两个数据进行合并。我们可以使用 CompletableFuturethenCombine 方法来实现:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CombineIndependentTasksExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            // 模拟从第一个数据源获取数据
            return "Data from source 1";
        });

        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            // 模拟从第二个数据源获取数据
            return "Data from source 2";
        });

        CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (data1, data2) -> {
            return data1 + " and " + data2;
        });

        String result = combinedFuture.get();
        System.out.println(result);
    }
}

在这个例子中,thenCombine 方法接受两个 CompletableFuture 对象和一个 BiFunction。当这两个 CompletableFuture 都完成时,BiFunction 会将它们的结果作为参数,并返回一个合并后的结果。

按顺序组合任务

有时候,我们需要按照特定的顺序执行多个异步任务,即使这些任务在逻辑上是独立的。例如,我们可能需要先获取用户信息,然后根据用户信息获取用户的订单列表。可以使用 thenCompose 方法来实现这种按顺序的组合:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

class User {
    private String name;

    public User(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }
}

class Order {
    private String orderInfo;

    public Order(String orderInfo) {
        this.orderInfo = orderInfo;
    }

    public String getOrderInfo() {
        return orderInfo;
    }
}

public class SequentialCompositionExample {
    public static CompletableFuture<User> getUser() {
        return CompletableFuture.supplyAsync(() -> {
            return new User("John");
        });
    }

    public static CompletableFuture<Order> getOrder(User user) {
        return CompletableFuture.supplyAsync(() -> {
            return new Order("Order for " + user.getName());
        });
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Order> future = getUser()
       .thenCompose(user -> getOrder(user));

        Order result = future.get();
        System.out.println(result.getOrderInfo());
    }
}

在这个例子中,thenCompose 方法会等待第一个 CompletableFuturegetUser)完成,然后将其结果作为参数传递给第二个 CompletableFuturegetOrder)。这样就确保了任务按照顺序执行。

并行执行任务并等待所有完成

如果我们有多个独立的异步任务,并且希望它们并行执行,然后等待所有任务都完成后再进行下一步操作,可以使用 CompletableFutureallOf 方法。例如,我们需要从多个数据库表中获取数据,然后进行汇总:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class AllOfExample {
    public static CompletableFuture<Integer> getDataFromTable1() {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟从表1获取数据
            return 10;
        });
    }

    public static CompletableFuture<Integer> getDataFromTable2() {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟从表2获取数据
            return 20;
        });
    }

    public static CompletableFuture<Integer> getDataFromTable3() {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟从表3获取数据
            return 30;
        });
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture[] futures = {
            getDataFromTable1(),
            getDataFromTable2(),
            getDataFromTable3()
        };

        CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures);

        allFutures.join();

        int total = 0;
        for (CompletableFuture<Integer> future : futures) {
            total += future.get();
        }

        System.out.println("Total: " + total);
    }
}

在这个例子中,allOf 方法接受一个 CompletableFuture 数组,并返回一个新的 CompletableFuture。当所有传入的 CompletableFuture 都完成时,这个新的 CompletableFuture 才会完成。我们可以通过调用 join 方法等待所有任务完成,然后再处理它们的结果。

任意一个任务完成即返回

有时候,我们只关心多个异步任务中任意一个任务的完成情况。例如,我们向多个服务器发送请求,只要有一个服务器响应,我们就可以停止等待。可以使用 CompletableFutureanyOf 方法来实现:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class AnyOfExample {
    public static CompletableFuture<String> sendRequestToServer1() {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Response from server 1";
        });
    }

    public static CompletableFuture<String> sendRequestToServer2() {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Response from server 2";
        });
    }

    public static CompletableFuture<String> sendRequestToServer3() {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Response from server 3";
        });
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture[] futures = {
            sendRequestToServer1(),
            sendRequestToServer2(),
            sendRequestToServer3()
        };

        CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(futures);

        String result = (String) anyFuture.get();
        System.out.println(result);
    }
}

在这个例子中,anyOf 方法接受一个 CompletableFuture 数组,并返回一个新的 CompletableFuture。当任意一个传入的 CompletableFuture 完成时,这个新的 CompletableFuture 就会完成,并返回已完成任务的结果。

CompletableFuture链式调用与组合的高级应用

在实际应用中,CompletableFuture 的链式调用与组合可以用于更复杂的场景,例如分布式系统中的数据处理、微服务之间的交互等。

分布式数据处理

假设我们有一个分布式系统,需要从多个节点获取数据,然后对这些数据进行聚合和分析。可以使用 CompletableFuture 的组合功能来实现:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class DistributedDataProcessingExample {
    public static CompletableFuture<List<Integer>> getDataFromNode1() {
        return CompletableFuture.supplyAsync(() -> {
            List<Integer> data = new ArrayList<>();
            data.add(1);
            data.add(2);
            return data;
        });
    }

    public static CompletableFuture<List<Integer>> getDataFromNode2() {
        return CompletableFuture.supplyAsync(() -> {
            List<Integer> data = new ArrayList<>();
            data.add(3);
            data.add(4);
            return data;
        });
    }

    public static CompletableFuture<List<Integer>> getDataFromNode3() {
        return CompletableFuture.supplyAsync(() -> {
            List<Integer> data = new ArrayList<>();
            data.add(5);
            data.add(6);
            return data;
        });
    }

    public static CompletableFuture<Integer> aggregateData(List<List<Integer>> allData) {
        return CompletableFuture.supplyAsync(() -> {
            int sum = 0;
            for (List<Integer> dataList : allData) {
                for (int num : dataList) {
                    sum += num;
                }
            }
            return sum;
        });
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<List<Integer>> future1 = getDataFromNode1();
        CompletableFuture<List<Integer>> future2 = getDataFromNode2();
        CompletableFuture<List<Integer>> future3 = getDataFromNode3();

        CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2, future3);

        CompletableFuture<List<List<Integer>>> combinedDataFuture = allFutures.thenApply(v -> {
            List<List<Integer>> combinedData = new ArrayList<>();
            try {
                combinedData.add(future1.get());
                combinedData.add(future2.get());
                combinedData.add(future3.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
            return combinedData;
        });

        CompletableFuture<Integer> resultFuture = combinedDataFuture.thenCompose(combinedData -> aggregateData(combinedData));

        int result = resultFuture.get();
        System.out.println("Aggregated result: " + result);
    }
}

在这个例子中,我们从三个不同的节点获取数据,使用 allOf 方法等待所有数据获取完成,然后将这些数据组合起来进行聚合计算。

微服务交互

在微服务架构中,一个服务可能需要调用多个其他微服务来完成一个业务逻辑。例如,一个订单服务可能需要调用用户服务获取用户信息,调用库存服务检查库存,然后根据这些信息处理订单。可以使用 CompletableFuture 来管理这些微服务之间的异步调用:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

class UserService {
    public static CompletableFuture<String> getUserInfo() {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟调用用户服务获取用户信息
            return "User information";
        });
    }
}

class InventoryService {
    public static CompletableFuture<String> checkInventory() {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟调用库存服务检查库存
            return "Inventory is available";
        });
    }
}

class OrderService {
    public static CompletableFuture<String> processOrder(String userInfo, String inventoryInfo) {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟根据用户信息和库存信息处理订单
            return "Order processed with " + userInfo + " and " + inventoryInfo;
        });
    }
}

public class MicroserviceInteractionExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> userFuture = UserService.getUserInfo();
        CompletableFuture<String> inventoryFuture = InventoryService.checkInventory();

        CompletableFuture<String> combinedFuture = userFuture.thenCombine(inventoryFuture, (userInfo, inventoryInfo) -> {
            return OrderService.processOrder(userInfo, inventoryInfo).join();
        });

        String result = combinedFuture.get();
        System.out.println(result);
    }
}

在这个例子中,订单服务通过 thenCombine 方法组合了用户服务和库存服务的结果,然后进行订单处理。

链式调用与组合中的线程管理

在使用 CompletableFuture 的链式调用和组合时,线程管理是一个重要的方面。默认情况下,CompletableFuture 使用 ForkJoinPool.commonPool() 来执行异步任务。然而,在某些情况下,我们可能需要自定义线程池。

使用自定义线程池

假设我们有一个高负载的应用,需要为 CompletableFuture 任务分配专门的线程池,以避免与其他任务竞争资源。可以通过 supplyAsync 等方法的重载版本来指定线程池:

import java.util.concurrent.*;

public class CustomThreadPoolExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(5);

        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            // 模拟一个耗时操作
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Data from async task";
        }, executor);

        CompletableFuture<String> processedFuture = future.thenApply(data -> {
            return "Processed: " + data;
        });

        String result = processedFuture.get();
        System.out.println(result);

        executor.shutdown();
    }
}

在这个例子中,我们创建了一个固定大小为5的线程池,并将其传递给 supplyAsync 方法。这样,异步任务将在我们自定义的线程池中执行。

线程池对链式调用和组合的影响

使用自定义线程池可以更好地控制任务的执行,特别是在高并发场景下。例如,在链式调用中,如果某个任务非常耗时,使用默认线程池可能会导致其他任务等待。而通过自定义线程池,可以为不同类型的任务分配不同的线程资源。

在任务组合时,例如使用 allOf 方法等待多个任务完成,自定义线程池可以确保这些任务在合理的资源下并行执行。如果不使用自定义线程池,可能会因为 ForkJoinPool.commonPool() 的线程数量限制,导致任务执行效率低下。

链式调用与组合中的性能优化

为了提高 CompletableFuture 链式调用和组合的性能,我们可以采取一些优化措施。

减少不必要的中间结果

在链式调用中,尽量避免创建过多不必要的中间结果。例如,如果某个 thenApply 方法只是对数据进行简单的转换,并且这个转换结果不会在其他地方使用,可以考虑将多个转换操作合并到一个 thenApply 方法中。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class OptimizeIntermediateResultsExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            return "Original data";
        });

        // 优化前
        CompletableFuture<String> optimizedFuture1 = future
       .thenApply(data -> {
            return data + " - step1";
        })
       .thenApply(data -> {
            return data + " - step2";
        });

        // 优化后
        CompletableFuture<String> optimizedFuture2 = future
       .thenApply(data -> {
            return data + " - step1 - step2";
        });

        String result1 = optimizedFuture1.get();
        String result2 = optimizedFuture2.get();

        System.out.println(result1);
        System.out.println(result2);
    }
}

在这个例子中,优化后的代码减少了一个中间结果的创建,从而提高了性能。

合理使用并行任务

在任务组合中,合理使用并行任务可以显著提高性能。例如,在使用 allOf 方法时,如果任务之间没有依赖关系,应该确保它们并行执行。同时,要注意线程池的大小设置,避免因为线程过多导致上下文切换开销过大。

import java.util.concurrent.*;

public class ParallelTaskOptimizationExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(3);

        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 1;
        }, executor);

        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 2;
        }, executor);

        CompletableFuture<Integer> future3 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 3;
        }, executor);

        CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2, future3);

        allFutures.join();

        int total = 0;
        try {
            total = future1.get() + future2.get() + future3.get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }

        System.out.println("Total: " + total);

        executor.shutdown();
    }
}

在这个例子中,我们通过自定义线程池,确保三个任务并行执行,提高了整体的执行效率。同时,要根据实际情况调整线程池的大小,以达到最佳的性能。

链式调用与组合在响应式编程中的应用

CompletableFuture 的链式调用和组合在响应式编程中也有重要的应用。响应式编程强调异步数据流和事件驱动,CompletableFuture 可以很好地与响应式编程框架(如 Reactor、RxJava)集成。

与 Reactor 集成

Reactor 是一个基于响应式流规范的 Java 框架。我们可以将 CompletableFuture 转换为 Reactor 的 MonoFlux,以便在 Reactor 中进行更复杂的响应式处理。

import reactor.core.publisher.Mono;
import java.util.concurrent.CompletableFuture;

public class CompletableFutureToReactorExample {
    public static void main(String[] args) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            return "Data from CompletableFuture";
        });

        Mono<String> mono = Mono.fromFuture(future);

        mono.subscribe(data -> {
            System.out.println("Received data from Mono: " + data);
        });
    }
}

在这个例子中,我们使用 Mono.fromFuture 方法将 CompletableFuture 转换为 Mono。然后可以使用 Mono 的各种操作符进行进一步的响应式处理。

与 RxJava 集成

RxJava 也是一个流行的响应式编程框架。同样,我们可以将 CompletableFuture 转换为 RxJava 的 ObservableSingle

import io.reactivex.Single;
import java.util.concurrent.CompletableFuture;

public class CompletableFutureToRxJavaExample {
    public static void main(String[] args) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            return "Data from CompletableFuture";
        });

        Single<String> single = Single.fromFuture(future);

        single.subscribe(data -> {
            System.out.println("Received data from Single: " + data);
        });
    }
}

通过这种集成,我们可以在 CompletableFuture 的基础上,利用响应式编程框架的强大功能,如操作符链、背压处理等,来构建更复杂和高效的异步应用程序。

通过深入理解 CompletableFuture 的链式调用与组合,以及合理应用线程管理、性能优化和与响应式编程框架的集成,我们可以在 Java 中构建出高效、可维护的异步应用程序。无论是处理分布式系统中的数据,还是管理微服务之间的交互,CompletableFuture 都提供了强大的工具来满足我们的需求。