MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java 中 CompletableFuture 任务异步回调 thenRunAsync 方法

2024-02-256.2k 阅读

CompletableFuture 的 thenRunAsync 方法概述

在 Java 并发编程中,CompletableFuture 是一个强大的类,它提供了异步编程的能力,让我们能够处理异步任务的执行结果、组合多个异步任务等。thenRunAsync 方法是 CompletableFuture 提供的众多方法之一,用于在一个 CompletableFuture 完成时,异步执行另一个任务。

方法签名

CompletableFuture<Void> thenRunAsync(Runnable action)

该方法接收一个 Runnable 类型的参数 action,当调用该方法的 CompletableFuture 完成时,会异步执行 action 这个任务。返回值是一个新的 CompletableFuture,其结果在 action 执行完成后完成,且结果为 null

还有一个重载方法:

CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)

这个重载方法允许我们指定一个 Executor 来执行 action 任务。如果不指定 Executor,则会使用默认的 ForkJoinPool.commonPool() 来执行任务。

示例代码 - 基本使用

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureThenRunAsyncExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture.supplyAsync(() -> {
            System.out.println("任务1开始执行,线程:" + Thread.currentThread().getName());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("任务1执行完毕,线程:" + Thread.currentThread().getName());
            return "任务1的结果";
        }).thenRunAsync(() -> {
            System.out.println("任务2开始执行,线程:" + Thread.currentThread().getName());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("任务2执行完毕,线程:" + Thread.currentThread().getName());
        }).get();
    }
}

在上述代码中,首先通过 CompletableFuture.supplyAsync 创建了一个异步任务,该任务模拟了一个耗时 2 秒的操作,并返回 "任务1的结果"。然后通过 thenRunAsync 方法,在第一个任务完成后,异步执行另一个任务,该任务模拟了一个耗时 1 秒的操作。

示例代码 - 使用自定义 Executor

import java.util.concurrent.*;

public class CompletableFutureThenRunAsyncWithExecutorExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        CompletableFuture.supplyAsync(() -> {
            System.out.println("任务1开始执行,线程:" + Thread.currentThread().getName());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("任务1执行完毕,线程:" + Thread.currentThread().getName());
            return "任务1的结果";
        }).thenRunAsync(() -> {
            System.out.println("任务2开始执行,线程:" + Thread.currentThread().getName());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("任务2执行完毕,线程:" + Thread.currentThread().getName());
        }, executorService).get();
        executorService.shutdown();
    }
}

在这段代码中,我们创建了一个固定大小为 2 的线程池 executorService,并将其作为参数传递给 thenRunAsync 方法。这样,第二个任务就会在我们自定义的线程池中执行。最后记得关闭线程池 executorService.shutdown(),以确保程序正常结束。

本质原理剖析

  1. 任务的异步执行CompletableFuture 基于 ForkJoinPool 实现了异步任务的执行。当调用 thenRunAsync 方法时,会将传入的 Runnable 任务提交到默认的 ForkJoinPool.commonPool() 或者指定的 Executor 中执行。这意味着新的任务会在一个独立的线程中执行,不会阻塞主线程或者调用任务的线程。
  2. 任务的依赖关系thenRunAsync 方法建立了任务之间的依赖关系。只有当调用该方法的 CompletableFuture 完成时,才会触发新任务的执行。这种依赖关系的管理是通过 CompletableFuture 内部的状态机和回调机制实现的。
  3. 状态机管理CompletableFuture 内部维护了一个状态机,用于跟踪任务的执行状态,如未开始、正在运行、已完成、已取消等。当任务完成时,会改变状态,并触发相关的回调操作,包括执行 thenRunAsync 中传入的 Runnable 任务。
  4. 线程调度:在 ForkJoinPool 中,线程的调度采用了工作窃取算法。当一个线程完成自己的任务后,会尝试从其他忙碌的线程队列中窃取任务来执行,以提高线程的利用率和整体的执行效率。这对于 thenRunAsync 方法中异步任务的执行也起到了优化作用,使得任务能够尽快得到执行。

异常处理

在使用 thenRunAsync 方法时,如果前面的 CompletableFuture 任务抛出异常,thenRunAsync 中的任务默认不会执行。但是我们可以通过 exceptionally 方法来处理异常。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureThenRunAsyncExceptionExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture.supplyAsync(() -> {
            System.out.println("任务1开始执行,线程:" + Thread.currentThread().getName());
            if (Math.random() < 0.5) {
                throw new RuntimeException("任务1发生异常");
            }
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("任务1执行完毕,线程:" + Thread.currentThread().getName());
            return "任务1的结果";
        }).exceptionally(ex -> {
            System.out.println("捕获到任务1的异常:" + ex.getMessage());
            return null;
        }).thenRunAsync(() -> {
            System.out.println("任务2开始执行,线程:" + Thread.currentThread().getName());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("任务2执行完毕,线程:" + Thread.currentThread().getName());
        }).get();
    }
}

在上述代码中,任务 1 有 50% 的概率抛出异常。通过 exceptionally 方法捕获异常并进行处理后,任务 2 依然可以正常执行。

与其他类似方法的比较

  1. thenRun 与 thenRunAsyncthenRun 方法也是在 CompletableFuture 完成时执行一个 Runnable 任务,但它是在调用任务的线程中执行,而 thenRunAsync 是异步执行,会使用线程池来执行任务。例如:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureThenRunVsThenRunAsyncExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture.supplyAsync(() -> {
            System.out.println("任务1开始执行,线程:" + Thread.currentThread().getName());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("任务1执行完毕,线程:" + Thread.currentThread().getName());
            return "任务1的结果";
        }).thenRun(() -> {
            System.out.println("任务2(thenRun)开始执行,线程:" + Thread.currentThread().getName());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("任务2(thenRun)执行完毕,线程:" + Thread.currentThread().getName());
        }).get();
        CompletableFuture.supplyAsync(() -> {
            System.out.println("任务3开始执行,线程:" + Thread.currentThread().getName());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("任务3执行完毕,线程:" + Thread.currentThread().getName());
            return "任务3的结果";
        }).thenRunAsync(() -> {
            System.out.println("任务4(thenRunAsync)开始执行,线程:" + Thread.currentThread().getName());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("任务4(thenRunAsync)执行完毕,线程:" + Thread.currentThread().getName());
        }).get();
    }
}

在上述代码中,通过观察线程名称可以发现,thenRun 中的任务和前面的任务在同一个线程中执行,而 thenRunAsync 中的任务在不同的线程(线程池中的线程)中执行。 2. thenApplyAsync 与 thenRunAsyncthenApplyAsync 方法用于在 CompletableFuture 完成时,异步执行一个函数,并将前一个任务的结果作为参数传递给这个函数,返回一个新的 CompletableFuture,其结果是函数的返回值。而 thenRunAsync 不关心前一个任务的结果,只在任务完成时执行一个 Runnable 任务,返回的 CompletableFuture 结果为 null。例如:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureThenApplyAsyncVsThenRunAsyncExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture.supplyAsync(() -> {
            System.out.println("任务1开始执行,线程:" + Thread.currentThread().getName());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("任务1执行完毕,线程:" + Thread.currentThread().getName());
            return "任务1的结果";
        }).thenApplyAsync(result -> {
            System.out.println("任务2(thenApplyAsync)开始执行,线程:" + Thread.currentThread().getName());
            System.out.println("接收到任务1的结果:" + result);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("任务2(thenApplyAsync)执行完毕,线程:" + Thread.currentThread().getName());
            return "处理后的结果";
        }).get();
        CompletableFuture.supplyAsync(() -> {
            System.out.println("任务3开始执行,线程:" + Thread.currentThread().getName());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("任务3执行完毕,线程:" + Thread.currentThread().getName());
            return "任务3的结果";
        }).thenRunAsync(() -> {
            System.out.println("任务4(thenRunAsync)开始执行,线程:" + Thread.currentThread().getName());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("任务4(thenRunAsync)执行完毕,线程:" + Thread.currentThread().getName());
        }).get();
    }
}

从上述代码可以看出,thenApplyAsync 能够获取并处理前一个任务的结果,而 thenRunAsync 对前一个任务的结果不感兴趣。

在实际项目中的应用场景

  1. 异步日志记录:在一个 Web 应用中,当用户完成某个操作后,需要记录操作日志。我们可以将日志记录操作放在 thenRunAsync 中异步执行,这样不会影响用户操作的响应速度。例如,用户下单后,订单处理任务完成后,通过 thenRunAsync 执行日志记录任务。
import java.util.concurrent.CompletableFuture;

public class OrderProcessingWithLoggingExample {
    public static void main(String[] args) {
        CompletableFuture.supplyAsync(() -> {
            System.out.println("开始处理订单,线程:" + Thread.currentThread().getName());
            // 模拟订单处理逻辑
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("订单处理完毕,线程:" + Thread.currentThread().getName());
            return "订单处理结果";
        }).thenRunAsync(() -> {
            System.out.println("开始记录订单日志,线程:" + Thread.currentThread().getName());
            // 模拟日志记录逻辑
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("订单日志记录完毕,线程:" + Thread.currentThread().getName());
        });
    }
}
  1. 缓存更新:在一个基于缓存的应用中,当数据更新操作完成后,需要更新缓存。可以使用 thenRunAsync 在数据更新任务完成后异步执行缓存更新任务,避免影响数据更新操作的性能。
import java.util.concurrent.CompletableFuture;

public class DataUpdateWithCacheRefreshExample {
    public static void main(String[] args) {
        CompletableFuture.supplyAsync(() -> {
            System.out.println("开始更新数据,线程:" + Thread.currentThread().getName());
            // 模拟数据更新逻辑
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("数据更新完毕,线程:" + Thread.currentThread().getName());
            return "数据更新结果";
        }).thenRunAsync(() -> {
            System.out.println("开始更新缓存,线程:" + Thread.currentThread().getName());
            // 模拟缓存更新逻辑
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("缓存更新完毕,线程:" + Thread.currentThread().getName());
        });
    }
}
  1. 异步通知:在分布式系统中,当某个核心业务流程完成后,需要通知其他系统。可以通过 thenRunAsync 将通知任务异步执行,减少核心业务流程的执行时间。例如,在电商系统中,订单支付完成后,异步通知物流系统准备发货。
import java.util.concurrent.CompletableFuture;

public class PaymentWithNotificationExample {
    public static void main(String[] args) {
        CompletableFuture.supplyAsync(() -> {
            System.out.println("开始处理支付,线程:" + Thread.currentThread().getName());
            // 模拟支付处理逻辑
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("支付处理完毕,线程:" + Thread.currentThread().getName());
            return "支付结果";
        }).thenRunAsync(() -> {
            System.out.println("开始通知物流系统,线程:" + Thread.currentThread().getName());
            // 模拟通知物流系统逻辑
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("物流系统通知完毕,线程:" + Thread.currentThread().getName());
        });
    }
}

通过深入理解 CompletableFuturethenRunAsync 方法,我们能够更好地利用 Java 的异步编程能力,提高程序的性能和响应性,在实际项目中灵活应用于各种场景。同时,要注意合理使用线程池和异常处理,确保异步任务的正确执行和系统的稳定性。