MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java 中 CompletableFuture 优化项目代码

2024-05-073.4k 阅读

Java 中 CompletableFuture 优化项目代码

CompletableFuture 基础介绍

在现代 Java 编程中,随着项目规模和复杂度的增长,异步编程变得愈发重要。CompletableFuture 作为 Java 8 引入的一个强大工具,极大地简化了异步编程模型,使得代码能够更加高效、简洁地处理异步任务。

CompletableFuture 实现了 Future 接口和 CompletionStage 接口。Future 接口是 Java 早期用于异步计算的机制,它允许我们发起一个异步任务,并在稍后获取任务的结果。然而,Future 存在一些局限性,比如在获取结果时可能会导致阻塞,并且缺乏对异步任务链、组合以及异常处理的便捷支持。而 CompletionStage 接口则定义了一系列方法,用于对异步计算进行编排,CompletableFuture 正是基于这些方法,提供了丰富的功能来处理异步任务。

创建 CompletableFuture 对象

  1. 使用 CompletableFuture.supplyAsync 创建有返回值的异步任务

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    
    public class CompletableFutureExample {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
                // 模拟一个耗时操作
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "异步任务执行完成";
            });
    
            String result = future.get();
            System.out.println(result);
        }
    }
    

    在上述代码中,CompletableFuture.supplyAsync 方法接受一个 Supplier 作为参数,这个 Supplier 会在一个新的线程中执行。方法返回一个 CompletableFuture 对象,通过调用 get 方法可以获取异步任务的执行结果。需要注意的是,get 方法会阻塞当前线程,直到异步任务完成。

  2. 使用 CompletableFuture.runAsync 创建无返回值的异步任务

    import java.util.concurrent.CompletableFuture;
    
    public class CompletableFutureExample2 {
        public static void main(String[] args) {
            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                // 模拟一个耗时操作
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("无返回值的异步任务执行完成");
            });
        }
    }
    

    CompletableFuture.runAsync 方法接受一个 Runnable 作为参数,同样在新线程中执行任务,但不返回任何结果。这里返回的 CompletableFuture<Void> 只是用于表示任务的完成状态。

异步任务的链式调用

  1. thenApply 方法 thenApply 方法用于对异步任务的结果进行转换。它接受一个 Function 作为参数,该 Function 的输入是上一个异步任务的结果,输出是新的结果。

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    
    public class CompletableFutureChaining {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "初始结果";
            }).thenApply(result -> {
                return "转换后的结果: " + result;
            });
    
            String result = future.get();
            System.out.println(result);
        }
    }
    

    在上述代码中,supplyAsync 方法返回的 CompletableFuture 对象通过 thenApply 方法进行了结果转换。thenApply 方法中的 Function 接收 supplyAsync 方法返回的字符串,并在其前面添加了一段文本。

  2. thenAccept 方法 thenAccept 方法用于在异步任务完成后执行一个消费操作,它接受一个 Consumer 作为参数,这个 Consumer 会处理异步任务的结果,但不返回新的结果。

    import java.util.concurrent.CompletableFuture;
    
    public class CompletableFutureChaining2 {
        public static void main(String[] args) {
            CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "任务结果";
            }).thenAccept(result -> {
                System.out.println("处理结果: " + result);
            });
        }
    }
    

    这里 supplyAsync 方法返回的 CompletableFuture 对象通过 thenAccept 方法,使用 Consumer 打印出了异步任务的结果。

  3. thenRun 方法 thenRun 方法在异步任务完成后执行一个 Runnable,它不关心异步任务的结果。

    import java.util.concurrent.CompletableFuture;
    
    public class CompletableFutureChaining3 {
        public static void main(String[] args) {
            CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "任务完成";
            }).thenRun(() -> {
                System.out.println("不关心结果,只执行此操作");
            });
        }
    }
    

    在这个例子中,supplyAsync 完成后,thenRun 中的 Runnable 会被执行,而不会处理 supplyAsync 的返回结果。

异步任务的组合

  1. thenCombine 方法 thenCombine 方法用于将两个异步任务的结果进行合并。它接受另一个 CompletableFuture 和一个 BiFunction 作为参数,BiFunction 用于处理两个异步任务的结果并返回一个新的结果。

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    
    public class CompletableFutureCombination {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "结果1";
            });
    
            CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "结果2";
            });
    
            CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (result1, result2) -> {
                return result1 + " 和 " + result2;
            });
    
            String result = combinedFuture.get();
            System.out.println(result);
        }
    }
    

    在上述代码中,future1future2 是两个异步任务,thenCombine 方法将它们的结果合并成一个新的字符串。

  2. allOf 方法 allOf 方法用于等待所有给定的 CompletableFuture 都完成。它接受多个 CompletableFuture 作为参数,并返回一个新的 CompletableFuture<Void>。当所有传入的 CompletableFuture 都完成时,返回的 CompletableFuture 也完成。

    import java.util.concurrent.CompletableFuture;
    
    public class CompletableFutureAllOf {
        public static void main(String[] args) {
            CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "任务1完成";
            });
    
            CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "任务2完成";
            });
    
            CompletableFuture<Void> allOfFuture = CompletableFuture.allOf(future1, future2);
            allOfFuture.join();
            System.out.println("所有任务都完成了");
        }
    }
    

    这里 allOf 方法等待 future1future2 都完成后,才继续执行后续的打印操作。

  3. anyOf 方法 anyOf 方法用于等待给定的 CompletableFuture 中任意一个完成。它接受多个 CompletableFuture 作为参数,并返回一个新的 CompletableFuture,这个新的 CompletableFuture 在任意一个传入的 CompletableFuture 完成时就完成,其结果是第一个完成的 CompletableFuture 的结果。

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    
    public class CompletableFutureAnyOf {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "任务1完成";
            });
    
            CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "任务2完成";
            });
    
            CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(future1, future2);
            Object result = anyOfFuture.get();
            System.out.println("第一个完成的任务结果: " + result);
        }
    }
    

    在这个例子中,future2 会先完成,所以 anyOfFuture 获取到的结果就是 future2 的结果。

异常处理

  1. exceptionally 方法 exceptionally 方法用于处理异步任务中的异常。它接受一个 Function 作为参数,当异步任务抛出异常时,这个 Function 会被执行,参数为抛出的异常,返回值作为 CompletableFuture 的结果。

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    
    public class CompletableFutureExceptionHandling {
        public static void main(String[] args) {
            CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
                if (Math.random() < 0.5) {
                    throw new RuntimeException("模拟异常");
                }
                return "正常结果";
            }).exceptionally(ex -> {
                System.out.println("捕获到异常: " + ex.getMessage());
                return "默认结果";
            });
    
            try {
                String result = future.get();
                System.out.println("最终结果: " + result);
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }
    }
    

    在上述代码中,如果 supplyAsync 中的任务抛出异常,exceptionally 方法中的 Function 会处理异常,并返回一个默认结果。

  2. handle 方法 handle 方法既可以处理正常的结果,也可以处理异常。它接受一个 BiFunction 作为参数,第一个参数是异步任务的结果(如果任务正常完成),第二个参数是抛出的异常(如果任务异常)。

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    
    public class CompletableFutureHandle {
        public static void main(String[] args) {
            CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
                if (Math.random() < 0.5) {
                    throw new RuntimeException("模拟异常");
                }
                return "正常结果";
            }).handle((result, ex) -> {
                if (ex != null) {
                    System.out.println("捕获到异常: " + ex.getMessage());
                    return "默认结果";
                }
                return "处理后的结果: " + result;
            });
    
            try {
                String finalResult = future.get();
                System.out.println(finalResult);
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }
    }
    

    这里 handle 方法根据任务是否异常,分别处理结果或异常,并返回相应的值。

在项目中应用 CompletableFuture 进行优化

  1. 提高并发性能 在一个电商项目中,可能需要从多个不同的数据源获取商品信息,比如商品基本信息、库存信息、价格信息等。如果使用传统的同步方式,每个数据源的获取都需要依次等待,这会导致整个获取商品信息的过程非常耗时。

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    
    class ProductInfo {
        private String basicInfo;
        private int stock;
        private double price;
    
        // 省略getter和setter方法
    }
    
    class DataSource {
        public String getBasicInfo() {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "商品基本信息";
        }
    
        public int getStock() {
            try {
                Thread.sleep(1500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 100;
        }
    
        public double getPrice() {
            try {
                Thread.sleep(2500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 99.9;
        }
    }
    
    public class EcommerceApp {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            DataSource dataSource = new DataSource();
    
            CompletableFuture<String> basicInfoFuture = CompletableFuture.supplyAsync(dataSource::getBasicInfo);
            CompletableFuture<Integer> stockFuture = CompletableFuture.supplyAsync(dataSource::getStock);
            CompletableFuture<Double> priceFuture = CompletableFuture.supplyAsync(dataSource::getPrice);
    
            CompletableFuture<ProductInfo> productInfoFuture = CompletableFuture.allOf(basicInfoFuture, stockFuture, priceFuture)
                   .thenApplyAsync(v -> {
                        ProductInfo productInfo = new ProductInfo();
                        try {
                            productInfo.setBasicInfo(basicInfoFuture.get());
                            productInfo.setStock(stockFuture.get());
                            productInfo.setPrice(priceFuture.get());
                        } catch (InterruptedException | ExecutionException e) {
                            e.printStackTrace();
                        }
                        return productInfo;
                    });
    
            ProductInfo productInfo = productInfoFuture.get();
            System.out.println("商品基本信息: " + productInfo.getBasicInfo());
            System.out.println("库存: " + productInfo.getStock());
            System.out.println("价格: " + productInfo.getPrice());
        }
    }
    

    在上述代码中,通过 CompletableFuture 分别异步获取商品的基本信息、库存和价格,然后使用 allOf 方法等待所有任务完成后,再组合成完整的商品信息。这样大大提高了获取商品信息的效率,相比同步方式,总耗时约为最长的单个任务耗时(这里是获取价格的 2500 毫秒),而不是所有任务耗时之和。

  2. 优化服务调用链 在一个微服务架构的项目中,可能存在一系列的服务调用,比如用户请求先经过认证服务,再到业务处理服务,最后到数据存储服务。每个服务的调用都可能是异步的,并且需要将前一个服务的结果作为下一个服务的输入。

    import java.util.concurrent.CompletableFuture;
    
    class AuthService {
        public CompletableFuture<String> authenticate(String request) {
            return CompletableFuture.supplyAsync(() -> {
                // 模拟认证过程
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "认证成功";
            });
        }
    }
    
    class BusinessService {
        public CompletableFuture<String> process(String authResult) {
            return CompletableFuture.supplyAsync(() -> {
                // 模拟业务处理过程
                try {
                    Thread.sleep(1500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "业务处理结果: " + authResult;
            });
        }
    }
    
    class DataService {
        public CompletableFuture<String> store(String businessResult) {
            return CompletableFuture.supplyAsync(() -> {
                // 模拟数据存储过程
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "数据存储成功: " + businessResult;
            });
        }
    }
    
    public class MicroserviceApp {
        public static void main(String[] args) {
            AuthService authService = new AuthService();
            BusinessService businessService = new BusinessService();
            DataService dataService = new DataService();
    
            CompletableFuture<String> finalResultFuture = authService.authenticate("用户请求")
                   .thenCompose(authResult -> businessService.process(authResult))
                   .thenCompose(businessResult -> dataService.store(businessResult));
    
            finalResultFuture.thenAccept(System.out::println);
        }
    }
    

    在这个例子中,thenCompose 方法用于将多个异步服务调用连接成一个链条,使得每个服务的输出可以作为下一个服务的输入,并且不会出现 CompletableFuture 嵌套的问题,使代码更加清晰和易于维护。

  3. 处理高并发请求 在一个 Web 应用中,可能会同时收到大量的用户请求,例如查询订单列表。可以使用 CompletableFuture 来并行处理这些请求,提高系统的吞吐量。

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    
    class OrderService {
        public CompletableFuture<String> getOrderList(int userId) {
            return CompletableFuture.supplyAsync(() -> {
                // 模拟查询订单列表的操作
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "用户 " + userId + " 的订单列表";
            });
        }
    }
    
    public class WebApp {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            OrderService orderService = new OrderService();
            List<Integer> userIds = new ArrayList<>();
            userIds.add(1);
            userIds.add(2);
            userIds.add(3);
    
            List<CompletableFuture<String>> futureList = new ArrayList<>();
            for (int userId : userIds) {
                CompletableFuture<String> future = orderService.getOrderList(userId);
                futureList.add(future);
            }
    
            CompletableFuture<Void> allFutures = CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0]));
            allFutures.join();
    
            List<String> orderLists = new ArrayList<>();
            for (CompletableFuture<String> future : futureList) {
                orderLists.add(future.get());
            }
    
            for (String orderList : orderLists) {
                System.out.println(orderList);
            }
        }
    }
    

    上述代码中,针对多个用户的订单查询请求,通过创建多个 CompletableFuture 来并行处理,最后使用 allOf 方法等待所有请求完成,并获取每个请求的结果进行处理,从而提高了系统处理高并发请求的能力。

总结 CompletableFuture 的优势与注意事项

  1. 优势

    • 简洁的异步编程模型:相比传统的 Future 以及手动创建线程等方式,CompletableFuture 提供了更加简洁、流畅的异步编程接口,使得异步任务的编排和处理更加直观。
    • 强大的组合能力:通过 thenCombineallOfanyOf 等方法,可以方便地对多个异步任务进行组合,实现复杂的异步逻辑,提高代码的并发性能和效率。
    • 灵活的异常处理exceptionallyhandle 等方法为异步任务的异常处理提供了灵活的方式,能够更好地控制和处理异步任务中可能出现的异常情况。
  2. 注意事项

    • 线程管理:虽然 CompletableFuture 简化了异步编程,但在使用过程中仍然需要注意线程的管理。例如,过多的异步任务可能导致线程池资源耗尽,需要合理配置线程池的大小。
    • 阻塞问题get 方法等会阻塞当前线程,在使用时需要谨慎,避免在不合适的地方导致线程阻塞,影响程序的并发性能。
    • 异常处理的全面性:在复杂的异步任务链中,要确保异常能够被正确捕获和处理,避免出现未处理的异常导致程序崩溃。

通过合理使用 CompletableFuture,Java 开发者可以有效地优化项目代码,提高系统的性能、并发处理能力以及代码的可读性和可维护性。在实际项目中,需要根据具体的业务场景和需求,灵活运用 CompletableFuture 的各种功能,以达到最佳的优化效果。