Java 中 CompletableFuture 优化项目代码
Java 中 CompletableFuture 优化项目代码
CompletableFuture 基础介绍
在现代 Java 编程中,随着项目规模和复杂度的增长,异步编程变得愈发重要。CompletableFuture
作为 Java 8 引入的一个强大工具,极大地简化了异步编程模型,使得代码能够更加高效、简洁地处理异步任务。
CompletableFuture
实现了 Future
接口和 CompletionStage
接口。Future
接口是 Java 早期用于异步计算的机制,它允许我们发起一个异步任务,并在稍后获取任务的结果。然而,Future
存在一些局限性,比如在获取结果时可能会导致阻塞,并且缺乏对异步任务链、组合以及异常处理的便捷支持。而 CompletionStage
接口则定义了一系列方法,用于对异步计算进行编排,CompletableFuture
正是基于这些方法,提供了丰富的功能来处理异步任务。
创建 CompletableFuture 对象
-
使用
CompletableFuture.supplyAsync
创建有返回值的异步任务import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; public class CompletableFutureExample { public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { // 模拟一个耗时操作 try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "异步任务执行完成"; }); String result = future.get(); System.out.println(result); } }
在上述代码中,
CompletableFuture.supplyAsync
方法接受一个Supplier
作为参数,这个Supplier
会在一个新的线程中执行。方法返回一个CompletableFuture
对象,通过调用get
方法可以获取异步任务的执行结果。需要注意的是,get
方法会阻塞当前线程,直到异步任务完成。 -
使用
CompletableFuture.runAsync
创建无返回值的异步任务import java.util.concurrent.CompletableFuture; public class CompletableFutureExample2 { public static void main(String[] args) { CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { // 模拟一个耗时操作 try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("无返回值的异步任务执行完成"); }); } }
CompletableFuture.runAsync
方法接受一个Runnable
作为参数,同样在新线程中执行任务,但不返回任何结果。这里返回的CompletableFuture<Void>
只是用于表示任务的完成状态。
异步任务的链式调用
-
thenApply 方法
thenApply
方法用于对异步任务的结果进行转换。它接受一个Function
作为参数,该Function
的输入是上一个异步任务的结果,输出是新的结果。import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; public class CompletableFutureChaining { public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "初始结果"; }).thenApply(result -> { return "转换后的结果: " + result; }); String result = future.get(); System.out.println(result); } }
在上述代码中,
supplyAsync
方法返回的CompletableFuture
对象通过thenApply
方法进行了结果转换。thenApply
方法中的Function
接收supplyAsync
方法返回的字符串,并在其前面添加了一段文本。 -
thenAccept 方法
thenAccept
方法用于在异步任务完成后执行一个消费操作,它接受一个Consumer
作为参数,这个Consumer
会处理异步任务的结果,但不返回新的结果。import java.util.concurrent.CompletableFuture; public class CompletableFutureChaining2 { public static void main(String[] args) { CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "任务结果"; }).thenAccept(result -> { System.out.println("处理结果: " + result); }); } }
这里
supplyAsync
方法返回的CompletableFuture
对象通过thenAccept
方法,使用Consumer
打印出了异步任务的结果。 -
thenRun 方法
thenRun
方法在异步任务完成后执行一个Runnable
,它不关心异步任务的结果。import java.util.concurrent.CompletableFuture; public class CompletableFutureChaining3 { public static void main(String[] args) { CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "任务完成"; }).thenRun(() -> { System.out.println("不关心结果,只执行此操作"); }); } }
在这个例子中,
supplyAsync
完成后,thenRun
中的Runnable
会被执行,而不会处理supplyAsync
的返回结果。
异步任务的组合
-
thenCombine 方法
thenCombine
方法用于将两个异步任务的结果进行合并。它接受另一个CompletableFuture
和一个BiFunction
作为参数,BiFunction
用于处理两个异步任务的结果并返回一个新的结果。import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; public class CompletableFutureCombination { public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "结果1"; }); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } return "结果2"; }); CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (result1, result2) -> { return result1 + " 和 " + result2; }); String result = combinedFuture.get(); System.out.println(result); } }
在上述代码中,
future1
和future2
是两个异步任务,thenCombine
方法将它们的结果合并成一个新的字符串。 -
allOf 方法
allOf
方法用于等待所有给定的CompletableFuture
都完成。它接受多个CompletableFuture
作为参数,并返回一个新的CompletableFuture<Void>
。当所有传入的CompletableFuture
都完成时,返回的CompletableFuture
也完成。import java.util.concurrent.CompletableFuture; public class CompletableFutureAllOf { public static void main(String[] args) { CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "任务1完成"; }); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } return "任务2完成"; }); CompletableFuture<Void> allOfFuture = CompletableFuture.allOf(future1, future2); allOfFuture.join(); System.out.println("所有任务都完成了"); } }
这里
allOf
方法等待future1
和future2
都完成后,才继续执行后续的打印操作。 -
anyOf 方法
anyOf
方法用于等待给定的CompletableFuture
中任意一个完成。它接受多个CompletableFuture
作为参数,并返回一个新的CompletableFuture
,这个新的CompletableFuture
在任意一个传入的CompletableFuture
完成时就完成,其结果是第一个完成的CompletableFuture
的结果。import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; public class CompletableFutureAnyOf { public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } return "任务1完成"; }); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "任务2完成"; }); CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(future1, future2); Object result = anyOfFuture.get(); System.out.println("第一个完成的任务结果: " + result); } }
在这个例子中,
future2
会先完成,所以anyOfFuture
获取到的结果就是future2
的结果。
异常处理
-
exceptionally 方法
exceptionally
方法用于处理异步任务中的异常。它接受一个Function
作为参数,当异步任务抛出异常时,这个Function
会被执行,参数为抛出的异常,返回值作为CompletableFuture
的结果。import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; public class CompletableFutureExceptionHandling { public static void main(String[] args) { CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { if (Math.random() < 0.5) { throw new RuntimeException("模拟异常"); } return "正常结果"; }).exceptionally(ex -> { System.out.println("捕获到异常: " + ex.getMessage()); return "默认结果"; }); try { String result = future.get(); System.out.println("最终结果: " + result); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } }
在上述代码中,如果
supplyAsync
中的任务抛出异常,exceptionally
方法中的Function
会处理异常,并返回一个默认结果。 -
handle 方法
handle
方法既可以处理正常的结果,也可以处理异常。它接受一个BiFunction
作为参数,第一个参数是异步任务的结果(如果任务正常完成),第二个参数是抛出的异常(如果任务异常)。import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; public class CompletableFutureHandle { public static void main(String[] args) { CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { if (Math.random() < 0.5) { throw new RuntimeException("模拟异常"); } return "正常结果"; }).handle((result, ex) -> { if (ex != null) { System.out.println("捕获到异常: " + ex.getMessage()); return "默认结果"; } return "处理后的结果: " + result; }); try { String finalResult = future.get(); System.out.println(finalResult); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } }
这里
handle
方法根据任务是否异常,分别处理结果或异常,并返回相应的值。
在项目中应用 CompletableFuture 进行优化
-
提高并发性能 在一个电商项目中,可能需要从多个不同的数据源获取商品信息,比如商品基本信息、库存信息、价格信息等。如果使用传统的同步方式,每个数据源的获取都需要依次等待,这会导致整个获取商品信息的过程非常耗时。
import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; class ProductInfo { private String basicInfo; private int stock; private double price; // 省略getter和setter方法 } class DataSource { public String getBasicInfo() { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "商品基本信息"; } public int getStock() { try { Thread.sleep(1500); } catch (InterruptedException e) { e.printStackTrace(); } return 100; } public double getPrice() { try { Thread.sleep(2500); } catch (InterruptedException e) { e.printStackTrace(); } return 99.9; } } public class EcommerceApp { public static void main(String[] args) throws ExecutionException, InterruptedException { DataSource dataSource = new DataSource(); CompletableFuture<String> basicInfoFuture = CompletableFuture.supplyAsync(dataSource::getBasicInfo); CompletableFuture<Integer> stockFuture = CompletableFuture.supplyAsync(dataSource::getStock); CompletableFuture<Double> priceFuture = CompletableFuture.supplyAsync(dataSource::getPrice); CompletableFuture<ProductInfo> productInfoFuture = CompletableFuture.allOf(basicInfoFuture, stockFuture, priceFuture) .thenApplyAsync(v -> { ProductInfo productInfo = new ProductInfo(); try { productInfo.setBasicInfo(basicInfoFuture.get()); productInfo.setStock(stockFuture.get()); productInfo.setPrice(priceFuture.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } return productInfo; }); ProductInfo productInfo = productInfoFuture.get(); System.out.println("商品基本信息: " + productInfo.getBasicInfo()); System.out.println("库存: " + productInfo.getStock()); System.out.println("价格: " + productInfo.getPrice()); } }
在上述代码中,通过
CompletableFuture
分别异步获取商品的基本信息、库存和价格,然后使用allOf
方法等待所有任务完成后,再组合成完整的商品信息。这样大大提高了获取商品信息的效率,相比同步方式,总耗时约为最长的单个任务耗时(这里是获取价格的 2500 毫秒),而不是所有任务耗时之和。 -
优化服务调用链 在一个微服务架构的项目中,可能存在一系列的服务调用,比如用户请求先经过认证服务,再到业务处理服务,最后到数据存储服务。每个服务的调用都可能是异步的,并且需要将前一个服务的结果作为下一个服务的输入。
import java.util.concurrent.CompletableFuture; class AuthService { public CompletableFuture<String> authenticate(String request) { return CompletableFuture.supplyAsync(() -> { // 模拟认证过程 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return "认证成功"; }); } } class BusinessService { public CompletableFuture<String> process(String authResult) { return CompletableFuture.supplyAsync(() -> { // 模拟业务处理过程 try { Thread.sleep(1500); } catch (InterruptedException e) { e.printStackTrace(); } return "业务处理结果: " + authResult; }); } } class DataService { public CompletableFuture<String> store(String businessResult) { return CompletableFuture.supplyAsync(() -> { // 模拟数据存储过程 try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "数据存储成功: " + businessResult; }); } } public class MicroserviceApp { public static void main(String[] args) { AuthService authService = new AuthService(); BusinessService businessService = new BusinessService(); DataService dataService = new DataService(); CompletableFuture<String> finalResultFuture = authService.authenticate("用户请求") .thenCompose(authResult -> businessService.process(authResult)) .thenCompose(businessResult -> dataService.store(businessResult)); finalResultFuture.thenAccept(System.out::println); } }
在这个例子中,
thenCompose
方法用于将多个异步服务调用连接成一个链条,使得每个服务的输出可以作为下一个服务的输入,并且不会出现CompletableFuture
嵌套的问题,使代码更加清晰和易于维护。 -
处理高并发请求 在一个 Web 应用中,可能会同时收到大量的用户请求,例如查询订单列表。可以使用
CompletableFuture
来并行处理这些请求,提高系统的吞吐量。import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; class OrderService { public CompletableFuture<String> getOrderList(int userId) { return CompletableFuture.supplyAsync(() -> { // 模拟查询订单列表的操作 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return "用户 " + userId + " 的订单列表"; }); } } public class WebApp { public static void main(String[] args) throws ExecutionException, InterruptedException { OrderService orderService = new OrderService(); List<Integer> userIds = new ArrayList<>(); userIds.add(1); userIds.add(2); userIds.add(3); List<CompletableFuture<String>> futureList = new ArrayList<>(); for (int userId : userIds) { CompletableFuture<String> future = orderService.getOrderList(userId); futureList.add(future); } CompletableFuture<Void> allFutures = CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])); allFutures.join(); List<String> orderLists = new ArrayList<>(); for (CompletableFuture<String> future : futureList) { orderLists.add(future.get()); } for (String orderList : orderLists) { System.out.println(orderList); } } }
上述代码中,针对多个用户的订单查询请求,通过创建多个
CompletableFuture
来并行处理,最后使用allOf
方法等待所有请求完成,并获取每个请求的结果进行处理,从而提高了系统处理高并发请求的能力。
总结 CompletableFuture 的优势与注意事项
-
优势
- 简洁的异步编程模型:相比传统的
Future
以及手动创建线程等方式,CompletableFuture
提供了更加简洁、流畅的异步编程接口,使得异步任务的编排和处理更加直观。 - 强大的组合能力:通过
thenCombine
、allOf
、anyOf
等方法,可以方便地对多个异步任务进行组合,实现复杂的异步逻辑,提高代码的并发性能和效率。 - 灵活的异常处理:
exceptionally
和handle
等方法为异步任务的异常处理提供了灵活的方式,能够更好地控制和处理异步任务中可能出现的异常情况。
- 简洁的异步编程模型:相比传统的
-
注意事项
- 线程管理:虽然
CompletableFuture
简化了异步编程,但在使用过程中仍然需要注意线程的管理。例如,过多的异步任务可能导致线程池资源耗尽,需要合理配置线程池的大小。 - 阻塞问题:
get
方法等会阻塞当前线程,在使用时需要谨慎,避免在不合适的地方导致线程阻塞,影响程序的并发性能。 - 异常处理的全面性:在复杂的异步任务链中,要确保异常能够被正确捕获和处理,避免出现未处理的异常导致程序崩溃。
- 线程管理:虽然
通过合理使用 CompletableFuture
,Java 开发者可以有效地优化项目代码,提高系统的性能、并发处理能力以及代码的可读性和可维护性。在实际项目中,需要根据具体的业务场景和需求,灵活运用 CompletableFuture
的各种功能,以达到最佳的优化效果。