MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java 中 CompletableFuture 异步执行功能

2021-01-195.3k 阅读

Java 中 CompletableFuture 异步执行功能概述

在Java编程中,随着应用程序复杂度的增加以及对性能和响应性要求的提高,异步编程变得愈发重要。CompletableFuture 作为Java 8引入的一个强大工具,为异步编程提供了一种简洁且高效的方式。它不仅允许我们异步执行任务,还能方便地处理异步操作的结果、错误以及组合多个异步操作。

CompletableFuture 的基础使用

创建 CompletableFuture

  1. 使用 CompletableFuture.runAsync 创建无返回值的异步任务 CompletableFuture.runAsync 方法用于异步执行一个没有返回值的任务。它接收一个 Runnable 对象作为参数,并返回一个 CompletableFuture<Void>。示例代码如下:

    import java.util.concurrent.CompletableFuture;
    
    public class CompletableFutureExample {
        public static void main(String[] args) {
            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                // 模拟耗时操作
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("异步任务执行完毕");
            });
    
            // 主线程可以继续执行其他任务
            System.out.println("主线程继续执行");
    
            // 等待异步任务完成
            try {
                future.get();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    

    在上述代码中,CompletableFuture.runAsync 启动了一个异步任务,该任务会睡眠2秒后输出“异步任务执行完毕”。主线程会继续执行并输出“主线程继续执行”,然后通过 future.get() 等待异步任务完成。

  2. 使用 CompletableFuture.supplyAsync 创建有返回值的异步任务 CompletableFuture.supplyAsync 方法用于异步执行一个有返回值的任务。它接收一个 Supplier 对象作为参数,并返回一个 CompletableFuture<T>,其中 T 是返回值的类型。示例代码如下:

    import java.util.concurrent.CompletableFuture;
    
    public class CompletableFutureSupplyExample {
        public static void main(String[] args) {
            CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
                // 模拟耗时操作
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return 42;
            });
    
            // 主线程可以继续执行其他任务
            System.out.println("主线程继续执行");
    
            // 获取异步任务的返回值
            try {
                Integer result = future.get();
                System.out.println("异步任务的返回值: " + result);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    

    这里,CompletableFuture.supplyAsync 启动的异步任务睡眠2秒后返回42。主线程获取到异步任务的返回值并输出。

获取异步任务的结果

  1. 使用 get() 方法 get() 方法用于获取异步任务的结果。如果异步任务尚未完成,调用 get() 方法的线程会被阻塞,直到任务完成。例如:

    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello, CompletableFuture!");
    try {
        String result = future.get();
        System.out.println(result);
    } catch (Exception e) {
        e.printStackTrace();
    }
    

    上述代码中,future.get() 会阻塞主线程,直到异步任务返回“Hello, CompletableFuture!”并赋值给 result

  2. 使用 get(long timeout, TimeUnit unit) 方法 get(long timeout, TimeUnit unit) 方法允许设置等待异步任务完成的最长时间。如果在指定时间内任务未完成,会抛出 TimeoutException。示例如下:

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.TimeUnit;
    
    public class CompletableFutureTimeoutExample {
        public static void main(String[] args) {
            CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "任务完成";
            });
    
            try {
                String result = future.get(2, TimeUnit.SECONDS);
                System.out.println(result);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    

    在这个例子中,异步任务需要3秒完成,但只设置了2秒的等待时间,因此会抛出 TimeoutException

处理异步任务的完成和错误

处理任务完成

  1. 使用 thenAccept 方法 thenAccept 方法用于在异步任务完成后执行一个 Consumer 操作。它接收一个 Consumer 对象作为参数,该 Consumer 会处理异步任务的结果,但不返回新的结果。示例代码如下:

    CompletableFuture.supplyAsync(() -> "异步结果")
                     .thenAccept(result -> System.out.println("处理异步结果: " + result));
    

    这里,异步任务返回“异步结果”,thenAccept 中的 Consumer 会输出“处理异步结果: 异步结果”。

  2. 使用 thenApply 方法 thenApply 方法用于在异步任务完成后执行一个 Function 操作,并返回新的 CompletableFutureFunction 会处理异步任务的结果并返回一个新的值。示例如下:

    CompletableFuture.supplyAsync(() -> 10)
                     .thenApply(result -> result * 2)
                     .thenAccept(finalResult -> System.out.println("最终结果: " + finalResult));
    

    异步任务先返回10,thenApply 将其乘以2,最终输出“最终结果: 20”。

处理任务错误

  1. 使用 exceptionally 方法 exceptionally 方法用于在异步任务发生异常时提供一个替代结果。它接收一个 Function 对象作为参数,该 FunctionThrowable 为输入并返回一个替代值。示例代码如下:

    CompletableFuture.supplyAsync(() -> {
        if (Math.random() < 0.5) {
            throw new RuntimeException("模拟异常");
        }
        return "正常结果";
    })
                     .exceptionally(ex -> {
                         System.out.println("捕获到异常: " + ex.getMessage());
                         return "替代结果";
                     })
                     .thenAccept(result -> System.out.println("结果: " + result));
    

    上述代码中,异步任务有50%的概率抛出异常,exceptionally 捕获到异常并返回“替代结果”。

  2. 使用 handle 方法 handle 方法可以同时处理异步任务的正常结果和异常情况。它接收一个 BiFunction 对象作为参数,该 BiFunction 的第一个参数是异步任务的结果(如果没有异常),第二个参数是异常(如果有异常),并返回一个新的值。示例如下:

    CompletableFuture.supplyAsync(() -> {
        if (Math.random() < 0.5) {
            throw new RuntimeException("模拟异常");
        }
        return "正常结果";
    })
                     .handle((result, ex) -> {
                         if (ex != null) {
                             System.out.println("捕获到异常: " + ex.getMessage());
                             return "替代结果";
                         }
                         return result;
                     })
                     .thenAccept(finalResult -> System.out.println("最终结果: " + finalResult));
    

    此例中,handle 同样处理了异常并返回相应结果。

组合多个 CompletableFuture

串行组合

  1. 使用 thenCompose 方法 thenCompose 方法用于将两个 CompletableFuture 串行组合。它接收一个 Function 对象作为参数,该 Function 以第一个 CompletableFuture 的结果为输入,并返回另一个 CompletableFuture。示例代码如下:

    CompletableFuture.supplyAsync(() -> "Hello")
                     .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + ", World!"))
                     .thenAccept(System.out::println);
    

    首先,第一个异步任务返回“Hello”,thenCompose 接收这个结果并启动第二个异步任务,将其拼接为“Hello, World!”并输出。

  2. 使用 thenApplyflatMap 的对比 thenApply 方法返回的是一个包装了新结果的 CompletableFuture,而 thenCompose(类似于 flatMap)会将内部的 CompletableFuture 展开。例如:

    CompletableFuture.supplyAsync(() -> "Hello")
                     .thenApply(s -> CompletableFuture.supplyAsync(() -> s + ", World!"))
                     .thenAccept(future -> {
                         try {
                             System.out.println(future.get());
                         } catch (Exception e) {
                             e.printStackTrace();
                         }
                     });
    

    这里 thenApply 返回的是 CompletableFuture<CompletableFuture<String>>,需要额外的 get() 操作来获取最终结果。而 thenCompose 直接返回 CompletableFuture<String>

并行组合

  1. 使用 CompletableFuture.allOf 方法 CompletableFuture.allOf 方法用于等待所有给定的 CompletableFuture 都完成。它接收多个 CompletableFuture 作为参数,并返回一个 CompletableFuture<Void>。示例如下:

    CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "任务1完成";
    });
    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "任务2完成";
    });
    
    CompletableFuture<Void> allFuture = CompletableFuture.allOf(future1, future2);
    allFuture.join();
    
    try {
        System.out.println(future1.get());
        System.out.println(future2.get());
    } catch (Exception e) {
        e.printStackTrace();
    }
    

    这里 allOf 等待 future1future2 都完成,然后可以获取它们各自的结果。

  2. 使用 CompletableFuture.anyOf 方法 CompletableFuture.anyOf 方法用于等待任何一个给定的 CompletableFuture 完成。它接收多个 CompletableFuture 作为参数,并返回一个 CompletableFuture<Object>,其结果是第一个完成的 CompletableFuture 的结果。示例代码如下:

    CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "任务3完成";
    });
    CompletableFuture<String> future4 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "任务4完成";
    });
    
    CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(future3, future4);
    try {
        System.out.println(anyFuture.get());
    } catch (Exception e) {
        e.printStackTrace();
    }
    

    由于 future4 耗时更短,anyOf 会返回 future4 的结果“任务4完成”。

CompletableFuture 的原理与实现细节

异步执行的线程模型

CompletableFuture 的异步执行依赖于Java的线程池。runAsyncsupplyAsync 方法如果不传入自定义的 Executor,会使用 ForkJoinPool.commonPool() 作为默认的线程池。ForkJoinPool 是Java 7引入的一种特殊线程池,它采用工作窃取算法,能够有效地利用多线程环境下的计算资源。 例如,在多核CPU环境中,ForkJoinPool 可以将任务细分并分配到不同的线程中执行,提高整体的执行效率。当一个线程完成自己的任务后,它可以从其他忙碌线程的任务队列中窃取任务来执行,从而减少线程的空闲时间。

结果的存储与传播

CompletableFuture 通过内部的状态变量来存储任务的执行状态(如未完成、已完成、异常完成等)以及结果。当异步任务完成时,会更新这些状态变量,并通知等待的线程。 例如,thenAcceptthenApply 等方法注册的回调函数会被存储在一个内部队列中。当任务完成时,会按照顺序依次执行这些回调函数。如果任务异常完成,exceptionallyhandle 等方法注册的异常处理逻辑也会被执行。

与其他异步框架的比较

  1. 与 Future 的比较 Future 是Java 5引入的用于异步操作的接口,它提供了基本的异步任务管理功能,如获取任务结果、检查任务是否完成等。然而,Future 存在一些局限性。例如,它缺乏对异步任务完成后的处理能力,不能方便地组合多个异步任务,并且获取结果时只能通过阻塞方式(get() 方法)。 相比之下,CompletableFuture 继承自 Future 并扩展了其功能。它提供了丰富的方法来处理异步任务的完成、错误以及组合多个异步任务,使得异步编程更加简洁和灵活。

  2. 与 RxJava 的比较 RxJava 是一个基于观察者模式的异步编程框架,它提供了强大的异步流处理功能。与 CompletableFuture 相比,RxJava 更加侧重于处理异步数据流,支持复杂的操作符(如 map、filter、flatMap 等)来操作数据流。 CompletableFuture 则更专注于单个异步任务的处理以及任务之间的简单组合。在简单的异步任务场景下,CompletableFuture 的使用更加简洁,而在处理复杂的异步数据流场景下,RxJava 则更具优势。

在实际项目中的应用场景

高并发请求处理

在Web应用开发中,当需要同时处理多个外部API请求时,使用 CompletableFuture 可以将这些请求异步化,提高系统的响应速度。例如,一个电商应用需要同时获取商品信息、库存信息和用户评价信息,可以通过 CompletableFuture 并行发起这三个请求,然后等待所有请求完成后再进行结果的整合和展示。

CompletableFuture<String> productInfoFuture = CompletableFuture.supplyAsync(() -> {
    // 模拟获取商品信息
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "商品信息";
});
CompletableFuture<String> stockInfoFuture = CompletableFuture.supplyAsync(() -> {
    // 模拟获取库存信息
    try {
        Thread.sleep(1500);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "库存信息";
});
CompletableFuture<String> reviewInfoFuture = CompletableFuture.supplyAsync(() -> {
    // 模拟获取用户评价信息
    try {
        Thread.sleep(2500);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "用户评价信息";
});

CompletableFuture<Void> allFuture = CompletableFuture.allOf(productInfoFuture, stockInfoFuture, reviewInfoFuture);
allFuture.join();

try {
    System.out.println("整合结果: " + productInfoFuture.get() + ", " + stockInfoFuture.get() + ", " + reviewInfoFuture.get());
} catch (Exception e) {
    e.printStackTrace();
}

异步计算与数据处理

在大数据处理场景中,有时需要对大量数据进行异步计算。例如,对一批用户数据进行统计分析,每个用户的数据处理可以作为一个异步任务,使用 CompletableFuture 并行处理,最后汇总结果。

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class DataAnalysisExample {
    public static void main(String[] args) {
        List<Integer> userData = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            userData.add(i);
        }

        List<CompletableFuture<Integer>> futures = new ArrayList<>();
        for (Integer data : userData) {
            CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
                // 模拟数据处理
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return data * 2;
            });
            futures.add(future);
        }

        List<Integer> results = new ArrayList<>();
        for (CompletableFuture<Integer> future : futures) {
            try {
                results.add(future.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }

        System.out.println("汇总结果: " + results);
    }
}

任务编排与流程控制

在复杂的业务流程中,可能需要按照一定的顺序执行多个异步任务,并且在任务之间传递数据。CompletableFuture 的串行和并行组合方法可以很好地满足这种需求。例如,一个订单处理流程,首先需要异步验证用户信息,然后异步检查库存,最后异步生成订单。

CompletableFuture.supplyAsync(() -> {
    // 模拟用户信息验证
    try {
        Thread.sleep(1500);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "用户信息验证通过";
})
.thenCompose(s -> CompletableFuture.supplyAsync(() -> {
    // 模拟库存检查
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return s + ", 库存检查通过";
}))
.thenCompose(s -> CompletableFuture.supplyAsync(() -> {
    // 模拟订单生成
    try {
        Thread.sleep(2500);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return s + ", 订单生成成功";
}))
.thenAccept(System.out::println);

通过以上详细的介绍、丰富的代码示例以及对原理和实际应用场景的探讨,相信你对Java中 CompletableFuture 的异步执行功能有了深入的理解。在实际编程中,可以根据具体的需求灵活运用 CompletableFuture,提升应用程序的性能和响应性。