MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java CompletableFuture优化多步骤异步操作的技巧

2024-09-046.2k 阅读

Java CompletableFuture 基础概述

CompletableFuture 是什么

在 Java 8 引入 CompletableFuture 之前,处理异步操作相对繁琐。CompletableFuture 实现了 Future 接口和 CompletionStage 接口,它不仅能获取异步操作的结果,还能在异步操作完成时触发回调,以一种更灵活、高效的方式管理异步任务。

异步操作的常见场景

在实际开发中,很多场景都需要异步处理。比如,在一个电商系统中,查询商品信息、获取库存、计算促销价格等操作可能来自不同的数据源或服务,将这些操作异步化可以显著提高系统的响应速度。又比如,在一个数据分析系统中,从多个数据库表中读取数据进行分析,如果这些读取操作串行执行会耗费大量时间,异步处理则能充分利用多核 CPU 的优势,加快整体处理速度。

创建 CompletableFuture

简单异步任务创建

使用 CompletableFuture.supplyAsync 方法可以创建一个异步任务并返回结果。例如:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            // 模拟耗时操作
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Hello, CompletableFuture!";
        });
        String result = future.get();
        System.out.println(result);
    }
}

在上述代码中,supplyAsync 接受一个 Supplier 作为参数,该 Supplier 在一个新的线程中执行,执行完毕后返回结果。future.get() 方法会阻塞当前线程,直到异步任务完成并返回结果。

无返回值的异步任务

如果异步任务不需要返回值,可以使用 CompletableFuture.runAsync 方法。示例如下:

import java.util.concurrent.CompletableFuture;

public class CompletableFutureRunAsyncExample {
    public static void main(String[] args) {
        CompletableFuture.runAsync(() -> {
            // 模拟耗时操作
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("异步任务执行完毕,无返回值");
        });
        System.out.println("主线程继续执行");
    }
}

这里 runAsync 接受一个 Runnable,在新线程中执行该 Runnable,但不会返回值。主线程不会等待异步任务完成,而是继续执行后续代码。

链式调用优化多步骤异步操作

顺序执行多个异步任务

假设我们有一系列的异步任务,需要顺序执行。例如,先从数据库读取用户信息,然后根据用户信息查询订单,最后计算订单总价。可以使用 thenApply 方法来实现:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

class User {
    private String name;

    public User(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }
}

class Order {
    private double price;

    public Order(double price) {
        this.price = price;
    }

    public double getPrice() {
        return price;
    }
}

public class SequentialAsyncTasks {
    public static CompletableFuture<Double> calculateTotalPrice() {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟从数据库读取用户信息
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return new User("John");
        }).thenApply(user -> {
            // 模拟根据用户查询订单
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return new Order(100.0);
        }).thenApply(order -> {
            // 模拟计算订单总价
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return order.getPrice();
        });
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        double totalPrice = calculateTotalPrice().get();
        System.out.println("订单总价: " + totalPrice);
    }
}

在上述代码中,thenApply 方法接收一个 Function,它会在前面的异步任务完成后执行,并且将前一个任务的结果作为参数传递给 Function。这样就实现了多个异步任务的顺序执行。

处理异步任务的异常

在异步任务执行过程中,可能会出现异常。CompletableFuture 提供了 exceptionally 方法来处理异常。例如:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class HandleAsyncException {
    public static CompletableFuture<String> asyncTaskWithException() {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟可能抛出异常的操作
            if (Math.random() < 0.5) {
                throw new RuntimeException("模拟异常");
            }
            return "任务成功完成";
        }).exceptionally(ex -> {
            System.out.println("捕获到异常: " + ex.getMessage());
            return "默认结果";
        });
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        String result = asyncTaskWithException().get();
        System.out.println("结果: " + result);
    }
}

在这个例子中,如果 supplyAsync 中的任务抛出异常,exceptionally 中的 Function 会被执行,返回一个默认结果,避免整个异步任务链因为异常而中断。

异步任务完成时的回调

whenComplete 方法可以在异步任务完成(无论成功还是失败)时执行回调。示例如下:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletionCallback {
    public static CompletableFuture<String> asyncTaskWithCallback() {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟耗时操作
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "任务完成";
        }).whenComplete((result, ex) -> {
            if (ex == null) {
                System.out.println("任务成功,结果: " + result);
            } else {
                System.out.println("任务失败,异常: " + ex.getMessage());
            }
        });
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        String result = asyncTaskWithCallback().get();
        System.out.println("最终结果: " + result);
    }
}

这里 whenComplete 接收一个 BiConsumer,第一个参数是任务的结果,第二个参数是可能出现的异常。通过这种方式,可以在任务完成时进行一些额外的处理,比如记录日志等。

并行执行多个异步任务

并行任务的创建与合并

在一些场景下,我们希望多个异步任务并行执行,然后合并它们的结果。例如,在一个搜索引擎系统中,可能需要同时从多个数据源获取搜索结果,然后合并这些结果返回给用户。可以使用 CompletableFuture.allOfCompletableFuture.join 方法来实现:

import java.util.concurrent.CompletableFuture;

public class ParallelAsyncTasks {
    public static void main(String[] args) {
        CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
            // 模拟从数据源1获取数据
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "数据源1的结果";
        });
        CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {
            // 模拟从数据源2获取数据
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "数据源2的结果";
        });

        CompletableFuture<Void> allTasks = CompletableFuture.allOf(task1, task2);
        allTasks.join();

        String combinedResult = task1.join() + " " + task2.join();
        System.out.println("合并结果: " + combinedResult);
    }
}

在上述代码中,CompletableFuture.allOf 方法接收多个 CompletableFuture 作为参数,它返回一个新的 CompletableFuture,当所有传入的 CompletableFuture 都完成时,这个新的 CompletableFuture 才完成。join 方法用于等待 CompletableFuture 完成并获取结果。

并行任务中的异常处理

在并行任务执行过程中,如果某个任务抛出异常,allOf 返回的 CompletableFuture 也会异常完成。可以通过 exceptionally 方法来处理异常。例如:

import java.util.concurrent.CompletableFuture;

public class ParallelAsyncTasksWithException {
    public static void main(String[] args) {
        CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
            // 模拟从数据源1获取数据
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "数据源1的结果";
        });
        CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {
            // 模拟从数据源2获取数据,可能抛出异常
            if (Math.random() < 0.5) {
                throw new RuntimeException("模拟异常");
            }
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "数据源2的结果";
        });

        CompletableFuture<Void> allTasks = CompletableFuture.allOf(task1, task2);
        allTasks.exceptionally(ex -> {
            System.out.println("捕获到异常: " + ex.getMessage());
            return null;
        }).join();

        try {
            String combinedResult = task1.join() + " " + task2.join();
            System.out.println("合并结果: " + combinedResult);
        } catch (Exception e) {
            System.out.println("获取结果时发生异常: " + e.getMessage());
        }
    }
}

在这个例子中,allTasks.exceptionally 用于捕获并行任务中的异常。如果有任务抛出异常,exceptionally 中的 Function 会被执行,打印异常信息。在获取任务结果时,也需要进行异常处理,以避免程序因为异常而崩溃。

使用自定义线程池优化异步操作

为什么要使用自定义线程池

默认情况下,CompletableFuture 使用 ForkJoinPool.commonPool() 来执行异步任务。然而,在一些高并发场景下,commonPool 可能会因为任务过多而导致线程饥饿等问题。使用自定义线程池可以根据业务需求灵活调整线程数量、线程优先级等参数,提高系统的性能和稳定性。

创建并使用自定义线程池

下面是一个使用自定义线程池的示例:

import java.util.concurrent.*;

public class CustomThreadPoolExample {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(5);

        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            // 模拟耗时操作
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "任务完成";
        }, executor);

        future.thenAccept(result -> {
            System.out.println("结果: " + result);
            executor.shutdown();
        });
    }
}

在上述代码中,首先创建了一个固定大小为 5 的线程池 executor。然后,CompletableFuture.supplyAsync 方法除了接收 Supplier 外,还接收一个 Executor,这里传入了自定义的线程池 executor。任务执行完成后,通过 thenAccept 方法处理结果,并关闭线程池。这样就确保了异步任务在自定义的线程池中执行,提高了资源管理的灵活性。

CompletableFuture 的高级特性

异步任务的超时处理

在实际应用中,有些异步任务可能因为网络问题或其他原因长时间无法完成,这时候需要设置一个超时时间,避免程序一直等待。CompletableFuture 可以通过 get 方法的重载版本来实现超时处理。例如:

import java.util.concurrent.*;

public class TimeoutExample {
    public static void main(String[] args) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            // 模拟耗时操作
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "任务完成";
        });

        try {
            String result = future.get(3, TimeUnit.SECONDS);
            System.out.println("结果: " + result);
        } catch (TimeoutException e) {
            System.out.println("任务超时");
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}

在这个例子中,future.get(3, TimeUnit.SECONDS) 表示等待异步任务完成的最长时间为 3 秒。如果 3 秒内任务没有完成,会抛出 TimeoutException,程序可以捕获该异常并进行相应处理。

异步任务的依赖管理

有时候,一个异步任务的执行依赖于其他多个异步任务的部分结果。例如,在一个机器学习模型训练系统中,模型训练任务可能依赖于数据预处理任务的部分结果。CompletableFuture 可以通过 thenCompose 等方法来实现这种复杂的依赖关系。

假设我们有两个异步任务,一个是获取用户数据,另一个是根据用户数据生成报告,但生成报告只需要用户数据中的部分信息。代码示例如下:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

class UserData {
    private String name;
    private int age;

    public UserData(String name, int age) {
        this.name = name;
        this.age = age;
    }

    public String getName() {
        return name;
    }

    public int getAge() {
        return age;
    }
}

class Report {
    private String content;

    public Report(String content) {
        this.content = content;
    }

    public String getContent() {
        return content;
    }
}

public class DependencyManagement {
    public static CompletableFuture<Report> generateReport() {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟获取用户数据
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return new UserData("John", 30);
        }).thenCompose(userData -> CompletableFuture.supplyAsync(() -> {
            // 模拟根据用户数据中的部分信息生成报告
            String reportContent = "用户 " + userData.getName() + " 的报告";
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return new Report(reportContent);
        }));
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Report report = generateReport().get();
        System.out.println("报告内容: " + report.getContent());
    }
}

在上述代码中,thenCompose 方法接收一个 Function,该 Function 的参数是前一个异步任务的结果。通过这种方式,可以在第二个异步任务中根据第一个异步任务的部分结果进行操作,实现了复杂的异步任务依赖管理。

通过以上对 CompletableFuture 在优化多步骤异步操作方面的详细介绍,包括基础概念、创建方式、链式调用、并行执行、自定义线程池以及高级特性等内容,相信开发者能够更好地利用 CompletableFuture 来提升 Java 应用程序的性能和响应速度,在处理复杂异步场景时更加得心应手。在实际应用中,需要根据具体的业务需求和系统架构,合理选择和组合这些技巧,以达到最优的效果。