Java 中 CompletableFuture 异步任务线程资源管理
Java 中 CompletableFuture 异步任务线程资源管理
CompletableFuture 简介
在 Java 8 引入 CompletableFuture
之前,处理异步任务相对繁琐。Future
接口虽然提供了一种异步执行任务并获取结果的方式,但它有一些局限性。例如,它缺乏对异步任务完成后的链式操作支持,并且难以处理多个异步任务之间的依赖关系。CompletableFuture
弥补了这些不足,它不仅实现了 Future
接口,还提供了丰富的方法用于异步任务的组合、转换和结果处理。
CompletableFuture
允许我们以更简洁、更灵活的方式编写异步代码,使得异步编程变得更加直观。它支持同步和异步的方式获取任务结果,并且可以在任务完成时执行回调函数。这对于处理 I/O 密集型任务,如网络请求、数据库查询等,能够显著提高应用程序的性能和响应性。
基本使用
创建 CompletableFuture
-
使用
CompletableFuture.supplyAsync
创建有返回值的异步任务CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { // 模拟耗时操作 try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "Task completed"; });
在上述代码中,
supplyAsync
方法接收一个Supplier
函数式接口,它会在一个新的线程中执行该接口的get
方法,并返回一个CompletableFuture
对象,该对象最终会包含任务的执行结果。 -
使用
CompletableFuture.runAsync
创建无返回值的异步任务CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { // 模拟耗时操作 try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Task without return value completed"); });
runAsync
方法接收一个Runnable
接口,同样会在新线程中执行run
方法,但返回的CompletableFuture
对象的泛型为Void
,因为任务没有返回值。
获取任务结果
-
使用
get
方法同步获取结果CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "Task completed"; }); try { String result = future.get(); System.out.println("Result: " + result); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); }
get
方法会阻塞当前线程,直到异步任务完成并返回结果。如果任务执行过程中抛出异常,get
方法会将异常包装成ExecutionException
抛出。 -
使用
get(long timeout, TimeUnit unit)
方法设置超时获取结果CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } return "Task completed"; }); try { String result = future.get(2, TimeUnit.SECONDS); System.out.println("Result: " + result); } catch (InterruptedException | ExecutionException | TimeoutException e) { e.printStackTrace(); }
上述代码中,如果在 2 秒内任务没有完成,
get
方法会抛出TimeoutException
。 -
使用
join
方法同步获取结果CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "Task completed"; }); String result = future.join(); System.out.println("Result: " + result);
join
方法与get
方法类似,也是阻塞当前线程获取结果。但不同的是,如果任务执行过程中抛出异常,join
方法会直接抛出CompletionException
,而不是ExecutionException
。 -
使用
whenComplete
方法异步获取结果CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "Task completed"; }); future.whenComplete((result, exception) -> { if (exception == null) { System.out.println("Result: " + result); } else { exception.printStackTrace(); } });
whenComplete
方法接收一个BiConsumer
,当异步任务完成时(无论成功还是失败),会异步执行该BiConsumer
。它不会阻塞当前线程,适用于在任务完成后执行一些后续操作,如日志记录等。 -
使用
thenApply
方法处理任务结果CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "Task completed"; }).thenApply(result -> result + " and processed"); try { String finalResult = future.get(); System.out.println("Final Result: " + finalResult); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); }
thenApply
方法接收一个Function
,它会在异步任务成功完成后,将任务的结果作为参数传递给Function
进行处理,并返回一个新的CompletableFuture
,其结果为Function
的返回值。
异步任务的组合
多个异步任务串行执行
-
使用
thenCompose
方法CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "Step 1 completed"; }); CompletableFuture<String> future2 = future1.thenCompose(result1 -> CompletableFuture.supplyAsync(() -> { System.out.println(result1); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "Step 2 completed"; })); try { String finalResult = future2.get(); System.out.println("Final Result: " + finalResult); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); }
thenCompose
方法接收一个Function
,该Function
的返回值必须是一个CompletableFuture
。它会将前一个任务的结果作为参数传递给Function
,并将返回的CompletableFuture
与当前CompletableFuture
进行组合,形成一个新的串行异步任务链。 -
使用
thenApply
和thenAccept
组合实现串行执行CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "Step 1 completed"; }).thenApply(result1 -> { System.out.println(result1); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "Step 2 completed"; }).thenAccept(finalResult -> System.out.println("Final Result: " + finalResult));
这种方式通过
thenApply
依次处理任务结果,并通过thenAccept
在最后一步进行结果的消费。但与thenCompose
不同的是,thenApply
返回的是一个包含处理结果的CompletableFuture
,而thenCompose
会将返回的CompletableFuture
进行扁平化处理,使得代码更简洁,更适合处理复杂的异步任务链。
多个异步任务并行执行
-
使用
CompletableFuture.allOf
方法CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } return "Task 1 completed"; }); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "Task 2 completed"; }); CompletableFuture<Void> allOfFuture = CompletableFuture.allOf(future1, future2); allOfFuture.join(); try { String result1 = future1.get(); String result2 = future2.get(); System.out.println(result1); System.out.println(result2); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); }
allOf
方法接收多个CompletableFuture
作为参数,返回一个新的CompletableFuture
。这个新的CompletableFuture
会在所有传入的CompletableFuture
都完成时才完成。通过调用join
方法等待所有任务完成,然后可以分别获取每个任务的结果。 -
使用
CompletableFuture.anyOf
方法CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } return "Task 1 completed"; }); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "Task 2 completed"; }); CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(future1, future2); try { Object result = anyOfFuture.get(); System.out.println("First completed task result: " + result); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); }
anyOf
方法同样接收多个CompletableFuture
作为参数,返回的新CompletableFuture
会在任何一个传入的CompletableFuture
完成时就完成,其结果为第一个完成的任务的结果。
线程资源管理
默认线程池
当我们使用 CompletableFuture.supplyAsync
或 CompletableFuture.runAsync
方法时,如果不指定线程池,它们会使用 ForkJoinPool.commonPool()
作为默认线程池。ForkJoinPool
是 Java 7 引入的一种线程池,它采用工作窃取算法,能够有效提高多核 CPU 环境下的任务并行执行效率。
CompletableFuture.supplyAsync(() -> {
// 任务代码
return "Result";
});
上述代码使用默认线程池执行异步任务。然而,ForkJoinPool.commonPool()
是一个共享的线程池,可能会被其他异步任务共享使用。如果应用程序中有大量不同类型的异步任务,可能会导致线程资源竞争,影响性能。
自定义线程池
为了更好地管理线程资源,我们可以创建自定义线程池并传递给 CompletableFuture
的异步方法。
-
使用
Executors
创建线程池ExecutorService executor = Executors.newFixedThreadPool(5); CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { // 模拟耗时操作 try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "Task completed"; }, executor); try { String result = future.get(); System.out.println("Result: " + result); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } finally { executor.shutdown(); }
在上述代码中,我们使用
Executors.newFixedThreadPool(5)
创建了一个固定大小为 5 的线程池,并将其传递给supplyAsync
方法。这样,异步任务就会在这个自定义线程池中执行。注意,在使用完线程池后,需要调用executor.shutdown()
方法关闭线程池,以避免资源泄漏。 -
使用
ThreadPoolExecutor
创建线程池ThreadPoolExecutor executor = new ThreadPoolExecutor( 3, 5, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>()); CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { // 模拟耗时操作 try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "Task completed"; }, executor); try { String result = future.get(); System.out.println("Result: " + result); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } finally { executor.shutdown(); }
ThreadPoolExecutor
提供了更灵活的线程池配置参数。我们可以指定核心线程数(corePoolSize
)、最大线程数(maximumPoolSize
)、线程存活时间(keepAliveTime
)和任务队列(workQueue
)等。通过合理配置这些参数,可以根据应用程序的负载情况优化线程资源的使用。
线程池参数调优
- 核心线程数的选择 核心线程数的选择需要考虑任务的类型。如果是 CPU 密集型任务,核心线程数一般设置为 CPU 核心数加 1。例如,对于一个 4 核 CPU 的机器,核心线程数可以设置为 5。这是因为 CPU 密集型任务几乎一直在使用 CPU,多一个线程可以在某个线程因偶尔的页缺失等原因暂停时,利用 CPU 资源。
int cores = Runtime.getRuntime().availableProcessors();
ThreadPoolExecutor executor = new ThreadPoolExecutor(
cores + 1,
cores + 1,
0,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>());
如果是 I/O 密集型任务,核心线程数可以设置为 CPU 核心数的 2 倍左右。因为 I/O 密集型任务大部分时间在等待 I/O 操作完成,CPU 有空闲时间,所以可以多分配一些线程来充分利用 CPU 资源。
int cores = Runtime.getRuntime().availableProcessors();
ThreadPoolExecutor executor = new ThreadPoolExecutor(
cores * 2,
cores * 2,
0,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>());
-
最大线程数的设置 最大线程数一般不应设置得过大,否则可能会导致系统资源耗尽。对于 I/O 密集型任务,最大线程数可以适当比核心线程数大一些,但也要根据系统的内存等资源情况来调整。例如,对于一个内存有限的系统,过多的线程可能会导致频繁的内存交换,反而降低性能。
-
任务队列的选择 任务队列的选择也很关键。
LinkedBlockingQueue
是一个无界队列,它可以无限添加任务,但可能会导致任务堆积,占用大量内存。ArrayBlockingQueue
是一个有界队列,需要指定队列容量。如果队列满了,新的任务会根据线程池的拒绝策略进行处理。常用的拒绝策略有AbortPolicy
(默认,直接抛出异常)、CallerRunsPolicy
(由调用者线程执行任务)、DiscardPolicy
(丢弃任务)和DiscardOldestPolicy
(丢弃队列中最老的任务)。
ThreadPoolExecutor executor = new ThreadPoolExecutor(
3,
5,
10,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(10),
new CallerRunsPolicy());
在上述代码中,我们使用 ArrayBlockingQueue
作为任务队列,并设置容量为 10,同时使用 CallerRunsPolicy
拒绝策略,当队列满时,新任务会由调用者线程执行。
异常处理
传统的异常处理方式
在使用 CompletableFuture
时,如果任务执行过程中抛出异常,我们可以使用传统的 try - catch
块来捕获异常。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("Task failed");
});
try {
String result = future.get();
System.out.println("Result: " + result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
在上述代码中,异步任务抛出了 RuntimeException
,get
方法会将其包装成 ExecutionException
抛出,我们可以在 catch
块中捕获并处理。
使用 exceptionally
方法处理异常
exceptionally
方法提供了一种更优雅的方式来处理异步任务中的异常。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("Task failed");
}).exceptionally(exception -> {
System.out.println("Caught exception: " + exception.getMessage());
return "Default result";
});
try {
String result = future.get();
System.out.println("Result: " + result);
} catch (InterruptedException e) {
e.printStackTrace();
}
exceptionally
方法接收一个 Function
,当异步任务抛出异常时,会执行这个 Function
,并返回一个默认值或进行异常处理。在上述代码中,当任务抛出异常时,exceptionally
方法捕获异常并返回了一个默认结果。
使用 handle
方法同时处理结果和异常
handle
方法可以同时处理异步任务的正常结果和异常情况。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("Task failed");
}).handle((result, exception) -> {
if (exception != null) {
System.out.println("Caught exception: " + exception.getMessage());
return "Default result";
} else {
return result;
}
});
try {
String finalResult = future.get();
System.out.println("Final Result: " + finalResult);
} catch (InterruptedException e) {
e.printStackTrace();
}
handle
方法接收一个 BiFunction
,它会在任务完成时(无论成功还是失败)被调用。BiFunction
的第一个参数是任务的结果(如果任务成功),第二个参数是异常(如果任务失败)。通过这种方式,我们可以在一个方法中统一处理结果和异常。
CompletableFuture 在实际项目中的应用场景
微服务调用
在微服务架构中,经常需要调用多个不同的微服务接口来获取数据并进行处理。使用 CompletableFuture
可以并行调用这些微服务接口,提高整体响应速度。
// 假设这是两个微服务调用方法
CompletableFuture<String> service1Future = CompletableFuture.supplyAsync(() -> {
// 模拟调用微服务1
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Service 1 result";
});
CompletableFuture<String> service2Future = CompletableFuture.supplyAsync(() -> {
// 模拟调用微服务2
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Service 2 result";
});
CompletableFuture<Void> allOfFuture = CompletableFuture.allOf(service1Future, service2Future);
allOfFuture.join();
try {
String result1 = service1Future.get();
String result2 = service2Future.get();
// 对两个微服务的结果进行处理
System.out.println("Combined result: " + result1 + " and " + result2);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
在上述代码中,我们并行调用了两个微服务接口,并在所有调用完成后处理结果。这样可以避免顺序调用微服务接口带来的时间浪费,提高系统的性能。
数据批处理
当需要对大量数据进行处理时,可以将数据分成多个批次,使用 CompletableFuture
并行处理每个批次,最后合并结果。
List<Integer> dataList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
List<CompletableFuture<Integer>> futureList = new ArrayList<>();
int batchSize = 2;
for (int i = 0; i < dataList.size(); i += batchSize) {
List<Integer> batch = dataList.subList(i, Math.min(i + batchSize, dataList.size()));
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
int sum = 0;
for (int num : batch) {
sum += num;
}
return sum;
});
futureList.add(future);
}
CompletableFuture<Void> allOfFuture = CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0]));
allOfFuture.join();
int totalSum = 0;
for (CompletableFuture<Integer> future : futureList) {
try {
totalSum += future.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
System.out.println("Total sum: " + totalSum);
在上述代码中,我们将数据分成批次,每个批次在一个异步任务中进行求和计算,最后将所有批次的结果累加得到最终结果。这种方式可以充分利用多核 CPU 的性能,提高数据处理效率。
性能优化与注意事项
减少线程创建和销毁开销
频繁地创建和销毁线程会带来较大的开销,因此使用线程池可以复用线程,减少这种开销。通过合理配置线程池的参数,如核心线程数、最大线程数和任务队列,可以使线程池在不同的负载情况下都能高效运行。
避免任务堆积
如果任务队列设置不合理,可能会导致任务堆积,占用大量内存。对于有界队列,要根据系统的负载情况设置合适的队列容量,并选择合适的拒绝策略。对于无界队列,要密切关注任务的处理速度,避免任务无限堆积。
注意线程安全
在异步任务中,如果多个任务需要共享数据,要注意线程安全问题。可以使用线程安全的数据结构,如 ConcurrentHashMap
、CopyOnWriteArrayList
等,或者使用同步机制,如 synchronized
关键字、Lock
接口等。
监控和调优
在实际应用中,要对异步任务的执行情况进行监控,例如监控线程池的状态(活跃线程数、任务队列大小等)、任务的执行时间和成功率等。根据监控数据,对线程池参数和异步任务的逻辑进行调优,以提高系统的性能和稳定性。
综上所述,CompletableFuture
为 Java 开发者提供了强大的异步编程能力,通过合理管理线程资源和正确处理异常,能够显著提高应用程序的性能和响应性。在实际项目中,要根据具体的业务场景和需求,灵活运用 CompletableFuture
的各种特性,以实现高效、稳定的异步任务处理。