Java 中 CompletableFuture 任务异步回调 thenRunAsync 方法
CompletableFuture 的 thenRunAsync 方法概述
在 Java 并发编程中,CompletableFuture
是一个强大的类,它提供了异步编程的能力,让我们能够处理异步任务的执行结果、组合多个异步任务等。thenRunAsync
方法是 CompletableFuture
提供的众多方法之一,用于在一个 CompletableFuture
完成时,异步执行另一个任务。
方法签名
CompletableFuture<Void> thenRunAsync(Runnable action)
该方法接收一个 Runnable
类型的参数 action
,当调用该方法的 CompletableFuture
完成时,会异步执行 action
这个任务。返回值是一个新的 CompletableFuture
,其结果在 action
执行完成后完成,且结果为 null
。
还有一个重载方法:
CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)
这个重载方法允许我们指定一个 Executor
来执行 action
任务。如果不指定 Executor
,则会使用默认的 ForkJoinPool.commonPool()
来执行任务。
示例代码 - 基本使用
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class CompletableFutureThenRunAsyncExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture.supplyAsync(() -> {
System.out.println("任务1开始执行,线程:" + Thread.currentThread().getName());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务1执行完毕,线程:" + Thread.currentThread().getName());
return "任务1的结果";
}).thenRunAsync(() -> {
System.out.println("任务2开始执行,线程:" + Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务2执行完毕,线程:" + Thread.currentThread().getName());
}).get();
}
}
在上述代码中,首先通过 CompletableFuture.supplyAsync
创建了一个异步任务,该任务模拟了一个耗时 2 秒的操作,并返回 "任务1的结果"。然后通过 thenRunAsync
方法,在第一个任务完成后,异步执行另一个任务,该任务模拟了一个耗时 1 秒的操作。
示例代码 - 使用自定义 Executor
import java.util.concurrent.*;
public class CompletableFutureThenRunAsyncWithExecutorExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(2);
CompletableFuture.supplyAsync(() -> {
System.out.println("任务1开始执行,线程:" + Thread.currentThread().getName());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务1执行完毕,线程:" + Thread.currentThread().getName());
return "任务1的结果";
}).thenRunAsync(() -> {
System.out.println("任务2开始执行,线程:" + Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务2执行完毕,线程:" + Thread.currentThread().getName());
}, executorService).get();
executorService.shutdown();
}
}
在这段代码中,我们创建了一个固定大小为 2 的线程池 executorService
,并将其作为参数传递给 thenRunAsync
方法。这样,第二个任务就会在我们自定义的线程池中执行。最后记得关闭线程池 executorService.shutdown()
,以确保程序正常结束。
本质原理剖析
- 任务的异步执行:
CompletableFuture
基于ForkJoinPool
实现了异步任务的执行。当调用thenRunAsync
方法时,会将传入的Runnable
任务提交到默认的ForkJoinPool.commonPool()
或者指定的Executor
中执行。这意味着新的任务会在一个独立的线程中执行,不会阻塞主线程或者调用任务的线程。 - 任务的依赖关系:
thenRunAsync
方法建立了任务之间的依赖关系。只有当调用该方法的CompletableFuture
完成时,才会触发新任务的执行。这种依赖关系的管理是通过CompletableFuture
内部的状态机和回调机制实现的。 - 状态机管理:
CompletableFuture
内部维护了一个状态机,用于跟踪任务的执行状态,如未开始、正在运行、已完成、已取消等。当任务完成时,会改变状态,并触发相关的回调操作,包括执行thenRunAsync
中传入的Runnable
任务。 - 线程调度:在
ForkJoinPool
中,线程的调度采用了工作窃取算法。当一个线程完成自己的任务后,会尝试从其他忙碌的线程队列中窃取任务来执行,以提高线程的利用率和整体的执行效率。这对于thenRunAsync
方法中异步任务的执行也起到了优化作用,使得任务能够尽快得到执行。
异常处理
在使用 thenRunAsync
方法时,如果前面的 CompletableFuture
任务抛出异常,thenRunAsync
中的任务默认不会执行。但是我们可以通过 exceptionally
方法来处理异常。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class CompletableFutureThenRunAsyncExceptionExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture.supplyAsync(() -> {
System.out.println("任务1开始执行,线程:" + Thread.currentThread().getName());
if (Math.random() < 0.5) {
throw new RuntimeException("任务1发生异常");
}
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务1执行完毕,线程:" + Thread.currentThread().getName());
return "任务1的结果";
}).exceptionally(ex -> {
System.out.println("捕获到任务1的异常:" + ex.getMessage());
return null;
}).thenRunAsync(() -> {
System.out.println("任务2开始执行,线程:" + Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务2执行完毕,线程:" + Thread.currentThread().getName());
}).get();
}
}
在上述代码中,任务 1 有 50% 的概率抛出异常。通过 exceptionally
方法捕获异常并进行处理后,任务 2 依然可以正常执行。
与其他类似方法的比较
- thenRun 与 thenRunAsync:
thenRun
方法也是在CompletableFuture
完成时执行一个Runnable
任务,但它是在调用任务的线程中执行,而thenRunAsync
是异步执行,会使用线程池来执行任务。例如:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class CompletableFutureThenRunVsThenRunAsyncExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture.supplyAsync(() -> {
System.out.println("任务1开始执行,线程:" + Thread.currentThread().getName());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务1执行完毕,线程:" + Thread.currentThread().getName());
return "任务1的结果";
}).thenRun(() -> {
System.out.println("任务2(thenRun)开始执行,线程:" + Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务2(thenRun)执行完毕,线程:" + Thread.currentThread().getName());
}).get();
CompletableFuture.supplyAsync(() -> {
System.out.println("任务3开始执行,线程:" + Thread.currentThread().getName());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务3执行完毕,线程:" + Thread.currentThread().getName());
return "任务3的结果";
}).thenRunAsync(() -> {
System.out.println("任务4(thenRunAsync)开始执行,线程:" + Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务4(thenRunAsync)执行完毕,线程:" + Thread.currentThread().getName());
}).get();
}
}
在上述代码中,通过观察线程名称可以发现,thenRun
中的任务和前面的任务在同一个线程中执行,而 thenRunAsync
中的任务在不同的线程(线程池中的线程)中执行。
2. thenApplyAsync 与 thenRunAsync:thenApplyAsync
方法用于在 CompletableFuture
完成时,异步执行一个函数,并将前一个任务的结果作为参数传递给这个函数,返回一个新的 CompletableFuture
,其结果是函数的返回值。而 thenRunAsync
不关心前一个任务的结果,只在任务完成时执行一个 Runnable
任务,返回的 CompletableFuture
结果为 null
。例如:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class CompletableFutureThenApplyAsyncVsThenRunAsyncExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture.supplyAsync(() -> {
System.out.println("任务1开始执行,线程:" + Thread.currentThread().getName());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务1执行完毕,线程:" + Thread.currentThread().getName());
return "任务1的结果";
}).thenApplyAsync(result -> {
System.out.println("任务2(thenApplyAsync)开始执行,线程:" + Thread.currentThread().getName());
System.out.println("接收到任务1的结果:" + result);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务2(thenApplyAsync)执行完毕,线程:" + Thread.currentThread().getName());
return "处理后的结果";
}).get();
CompletableFuture.supplyAsync(() -> {
System.out.println("任务3开始执行,线程:" + Thread.currentThread().getName());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务3执行完毕,线程:" + Thread.currentThread().getName());
return "任务3的结果";
}).thenRunAsync(() -> {
System.out.println("任务4(thenRunAsync)开始执行,线程:" + Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务4(thenRunAsync)执行完毕,线程:" + Thread.currentThread().getName());
}).get();
}
}
从上述代码可以看出,thenApplyAsync
能够获取并处理前一个任务的结果,而 thenRunAsync
对前一个任务的结果不感兴趣。
在实际项目中的应用场景
- 异步日志记录:在一个 Web 应用中,当用户完成某个操作后,需要记录操作日志。我们可以将日志记录操作放在
thenRunAsync
中异步执行,这样不会影响用户操作的响应速度。例如,用户下单后,订单处理任务完成后,通过thenRunAsync
执行日志记录任务。
import java.util.concurrent.CompletableFuture;
public class OrderProcessingWithLoggingExample {
public static void main(String[] args) {
CompletableFuture.supplyAsync(() -> {
System.out.println("开始处理订单,线程:" + Thread.currentThread().getName());
// 模拟订单处理逻辑
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("订单处理完毕,线程:" + Thread.currentThread().getName());
return "订单处理结果";
}).thenRunAsync(() -> {
System.out.println("开始记录订单日志,线程:" + Thread.currentThread().getName());
// 模拟日志记录逻辑
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("订单日志记录完毕,线程:" + Thread.currentThread().getName());
});
}
}
- 缓存更新:在一个基于缓存的应用中,当数据更新操作完成后,需要更新缓存。可以使用
thenRunAsync
在数据更新任务完成后异步执行缓存更新任务,避免影响数据更新操作的性能。
import java.util.concurrent.CompletableFuture;
public class DataUpdateWithCacheRefreshExample {
public static void main(String[] args) {
CompletableFuture.supplyAsync(() -> {
System.out.println("开始更新数据,线程:" + Thread.currentThread().getName());
// 模拟数据更新逻辑
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("数据更新完毕,线程:" + Thread.currentThread().getName());
return "数据更新结果";
}).thenRunAsync(() -> {
System.out.println("开始更新缓存,线程:" + Thread.currentThread().getName());
// 模拟缓存更新逻辑
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("缓存更新完毕,线程:" + Thread.currentThread().getName());
});
}
}
- 异步通知:在分布式系统中,当某个核心业务流程完成后,需要通知其他系统。可以通过
thenRunAsync
将通知任务异步执行,减少核心业务流程的执行时间。例如,在电商系统中,订单支付完成后,异步通知物流系统准备发货。
import java.util.concurrent.CompletableFuture;
public class PaymentWithNotificationExample {
public static void main(String[] args) {
CompletableFuture.supplyAsync(() -> {
System.out.println("开始处理支付,线程:" + Thread.currentThread().getName());
// 模拟支付处理逻辑
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("支付处理完毕,线程:" + Thread.currentThread().getName());
return "支付结果";
}).thenRunAsync(() -> {
System.out.println("开始通知物流系统,线程:" + Thread.currentThread().getName());
// 模拟通知物流系统逻辑
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("物流系统通知完毕,线程:" + Thread.currentThread().getName());
});
}
}
通过深入理解 CompletableFuture
的 thenRunAsync
方法,我们能够更好地利用 Java 的异步编程能力,提高程序的性能和响应性,在实际项目中灵活应用于各种场景。同时,要注意合理使用线程池和异常处理,确保异步任务的正确执行和系统的稳定性。