MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java 中 CompletableFuture 异步任务执行顺序控制

2021-08-135.3k 阅读

Java 中 CompletableFuture 异步任务执行顺序控制

CompletableFuture 简介

在 Java 编程中,随着应用程序复杂性的增加,处理异步任务变得越来越重要。CompletableFuture 是 Java 8 引入的一个强大工具,用于处理异步计算。它实现了 Future 接口和 CompletionStage 接口,不仅能够表示一个异步操作的结果,还提供了丰富的方法来控制异步任务的执行顺序、组合多个异步任务等。

CompletableFuture 的核心优势在于它的灵活性和易用性。与传统的 Future 相比,CompletableFuture 可以在任务完成时自动触发后续操作,而不需要像 Future 那样通过轮询或阻塞的方式获取结果。这使得编写异步代码变得更加简洁和高效。

创建 CompletableFuture

  1. 使用 supplyAsync 创建有返回值的异步任务
    CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
        // 模拟一个耗时操作
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "Task 1 completed";
    });
    
    在上述代码中,supplyAsync 方法接受一个 Supplier 接口的实现,这个 Supplier 接口的 get 方法中定义了异步任务的具体逻辑。supplyAsync 方法会在一个默认的 ForkJoinPool.commonPool() 线程池中执行这个任务,并返回一个 CompletableFuture 对象,该对象最终会包含任务的返回结果。
  2. 使用 runAsync 创建无返回值的异步任务
    CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
        // 模拟一个耗时操作
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Task 2 completed");
    });
    
    runAsync 方法接受一个 Runnable 接口的实现,它适用于不需要返回值的异步任务。同样,这个任务会在默认的线程池中执行,返回的 CompletableFuture 对象在任务完成时会包含 null 值。

异步任务执行顺序控制基础

  1. thenApply 方法 thenApply 方法用于在当前 CompletableFuture 完成后,对其结果进行处理并返回一个新的 CompletableFuture
    CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "Hello")
           .thenApply(s -> s + ", World");
    future3.join(); // "Hello, World"
    
    在这里,首先通过 supplyAsync 创建了一个异步任务,该任务返回字符串 "Hello"。然后使用 thenApply 方法,对这个结果进行处理,将其与 ", World" 拼接,返回一个新的 CompletableFuture,其结果为 "Hello, World"thenApply 方法接受一个 Function 接口的实现,该 Function 接口的 apply 方法定义了对结果的处理逻辑。
  2. thenAccept 方法 thenAccept 方法用于在当前 CompletableFuture 完成后,消费其结果,但不返回新的结果。
    CompletableFuture.supplyAsync(() -> "Message")
           .thenAccept(System.out::println);
    
    上述代码中,异步任务返回 "Message"thenAccept 方法接受这个结果并通过 System.out::println 进行打印。thenAccept 方法接受一个 Consumer 接口的实现,Consumer 接口的 accept 方法定义了对结果的消费逻辑。
  3. thenRun 方法 thenRun 方法用于在当前 CompletableFuture 完成后,执行一个无参数的 Runnable
    CompletableFuture.supplyAsync(() -> "Result")
           .thenRun(() -> System.out.println("Task is completed"));
    
    异步任务先返回 "Result",然后 thenRun 方法执行定义的 Runnable,打印出 "Task is completed"thenRun 方法接受一个 Runnable 接口的实现,它不关心之前任务的结果。

顺序执行多个异步任务

  1. 串行任务链 可以通过多次调用 thenApplythenAcceptthenRun 方法来创建一个串行的任务链。
    CompletableFuture.supplyAsync(() -> "Step 1")
           .thenApply(s -> s + " -> Step 2")
           .thenApply(s -> s + " -> Step 3")
           .thenAccept(System.out::println);
    
    上述代码中,第一个异步任务返回 "Step 1",然后依次通过 thenApply 方法对结果进行处理,添加 " -> Step 2"" -> Step 3",最后通过 thenAccept 方法打印最终结果 "Step 1 -> Step 2 -> Step 3"。这样就实现了多个异步任务的顺序执行,每个任务依赖前一个任务的结果。
  2. 复杂任务链中的异常处理 在任务链执行过程中,如果某个任务出现异常,整个任务链会中断。可以使用 exceptionally 方法来处理异常。
    CompletableFuture.supplyAsync(() -> {
        if (Math.random() < 0.5) {
            throw new RuntimeException("Simulated error");
        }
        return "Success";
    })
           .thenApply(s -> s + " -> Processed")
           .exceptionally(ex -> {
                System.err.println("Caught exception: " + ex.getMessage());
                return "Default value";
            })
           .thenAccept(System.out::println);
    
    在这个例子中,supplyAsync 中的任务有 50% 的概率抛出异常。如果抛出异常,exceptionally 方法会捕获异常,打印异常信息,并返回一个默认值 "Default value"。如果没有异常,任务链会正常执行,返回 "Success -> Processed"

并行执行异步任务并合并结果

  1. allOf 方法 allOf 方法用于等待所有给定的 CompletableFuture 都完成。它返回一个新的 CompletableFuture,当所有输入的 CompletableFuture 都完成时,这个新的 CompletableFuture 也完成。
    CompletableFuture<String> future4 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "Future 4 result";
    });
    CompletableFuture<String> future5 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "Future 5 result";
    });
    CompletableFuture<Void> allFutures = CompletableFuture.allOf(future4, future5);
    allFutures.join();
    try {
        String result4 = future4.get();
        String result5 = future5.get();
        System.out.println(result4 + " and " + result5);
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
    
    这里创建了两个异步任务 future4future5,它们会并行执行。allOf 方法返回的 CompletableFuture allFutures 会在 future4future5 都完成时完成。通过调用 join 方法等待所有任务完成,然后通过 get 方法获取每个任务的结果。
  2. anyOf 方法 anyOf 方法用于等待任何一个给定的 CompletableFuture 完成。它返回一个新的 CompletableFuture,当任何一个输入的 CompletableFuture 完成时,这个新的 CompletableFuture 也完成,其结果为第一个完成的 CompletableFuture 的结果。
    CompletableFuture<String> future6 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "Future 6 result";
    });
    CompletableFuture<String> future7 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(1500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "Future 7 result";
    });
    CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(future6, future7);
    try {
        Object result = anyFuture.get();
        System.out.println("First completed result: " + result);
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
    
    在这个例子中,future6future7 并行执行,anyOf 方法返回的 anyFuture 会在 future6future7 任何一个完成时完成。通过 get 方法获取第一个完成的任务的结果,这里由于 future7 耗时较短,会先完成,所以打印出 "First completed result: Future 7 result"

基于依赖关系的异步任务执行顺序控制

  1. thenCompose 方法 thenCompose 方法用于将一个 CompletableFuture 的结果作为另一个 CompletableFuture 的输入,并返回一个新的 CompletableFuture。它与 thenApply 的区别在于,thenApply 返回的是一个已经包含处理结果的 CompletableFuture,而 thenCompose 返回的是一个由另一个 CompletableFuture 构成的新 CompletableFuture
    CompletableFuture<String> future8 = CompletableFuture.supplyAsync(() -> "Input")
           .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " -> Processed"));
    future8.join(); // "Input -> Processed"
    
    这里第一个异步任务返回 "Input"thenCompose 方法接受这个结果,并将其作为新的 CompletableFuture 的输入,这个新的 CompletableFuture 会返回 "Input -> Processed"
  2. handle 方法 handle 方法用于在 CompletableFuture 完成(无论是正常完成还是异常完成)时,对结果或异常进行处理,并返回一个新的 CompletableFuture
    CompletableFuture.supplyAsync(() -> {
        if (Math.random() < 0.5) {
            throw new RuntimeException("Simulated error");
        }
        return "Success";
    })
           .handle((result, ex) -> {
                if (ex != null) {
                    System.err.println("Caught exception: " + ex.getMessage());
                    return "Default value";
                }
                return result + " -> Processed";
            })
           .thenAccept(System.out::println);
    
    exceptionally 方法类似,handle 方法可以处理异常,但它同时也能处理正常完成的情况。在上述代码中,如果任务正常完成,会在结果后添加 " -> Processed";如果任务抛出异常,会捕获异常并返回默认值。

CompletableFuture 执行顺序控制在实际项目中的应用场景

  1. 微服务调用链 在微服务架构中,一个业务操作可能需要调用多个微服务。例如,一个电商应用中,获取商品详情可能需要调用商品服务获取基本信息,再调用库存服务获取库存信息,最后调用评论服务获取评论信息。可以使用 CompletableFuture 来并行调用这些微服务,然后合并结果。
    CompletableFuture<String> productInfoFuture = CompletableFuture.supplyAsync(() -> {
        // 模拟调用商品服务
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "Product details";
    });
    CompletableFuture<String> stockInfoFuture = CompletableFuture.supplyAsync(() -> {
        // 模拟调用库存服务
        try {
            Thread.sleep(1500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "Stock available";
    });
    CompletableFuture<String> reviewInfoFuture = CompletableFuture.supplyAsync(() -> {
        // 模拟调用评论服务
        try {
            Thread.sleep(2500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "Positive reviews";
    });
    CompletableFuture<Void> allServices = CompletableFuture.allOf(productInfoFuture, stockInfoFuture, reviewInfoFuture);
    allServices.join();
    try {
        String productInfo = productInfoFuture.get();
        String stockInfo = stockInfoFuture.get();
        String reviewInfo = reviewInfoFuture.get();
        System.out.println("Combined result: " + productInfo + ", " + stockInfo + ", " + reviewInfo);
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
    
    通过这种方式,可以显著提高系统的响应速度,因为各个微服务调用是并行进行的,而不是串行依次调用。
  2. 数据预处理和后处理流水线 在大数据处理场景中,可能需要对数据进行一系列的预处理操作,如清洗、转换,然后进行计算,最后进行后处理,如结果格式化。可以使用 CompletableFuture 来构建一个异步处理流水线。
    CompletableFuture<String> dataFuture = CompletableFuture.supplyAsync(() -> "Dirty data");
    CompletableFuture<String> cleanedDataFuture = dataFuture.thenApply(s -> {
        // 模拟数据清洗
        return s.replace("Dirty", "Cleaned");
    });
    CompletableFuture<String> processedDataFuture = cleanedDataFuture.thenApply(s -> {
        // 模拟数据处理
        return s + " -> Processed";
    });
    CompletableFuture<String> formattedDataFuture = processedDataFuture.thenApply(s -> {
        // 模拟结果格式化
        return "Formatted: " + s;
    });
    formattedDataFuture.thenAccept(System.out::println);
    
    这样,数据在不同的异步任务中依次进行清洗、处理和格式化,每个任务依赖前一个任务的结果,实现了高效的异步数据处理流水线。

高级技巧:自定义线程池

  1. 使用自定义线程池执行 CompletableFuture 任务 默认情况下,CompletableFuture 使用 ForkJoinPool.commonPool() 线程池来执行任务。但在一些场景下,可能需要使用自定义线程池来满足特定的需求,比如限制并发数、设置线程优先级等。
    ExecutorService executor = Executors.newFixedThreadPool(3);
    CompletableFuture<String> future9 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "Task result";
    }, executor);
    future9.join();
    executor.shutdown();
    
    在上述代码中,通过 Executors.newFixedThreadPool(3) 创建了一个固定大小为 3 的线程池 executor。然后使用 supplyAsync 方法的第二个参数,将这个线程池传递进去,这样异步任务就会在这个自定义线程池中执行。最后,任务完成后关闭线程池。
  2. 线程池对任务执行顺序的影响 自定义线程池的配置会影响 CompletableFuture 任务的执行顺序。例如,如果使用一个单线程的线程池,那么多个 CompletableFuture 任务会依次执行,而不是并行执行。
    ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
    CompletableFuture<String> future10 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "Task 10 result";
    }, singleThreadExecutor);
    CompletableFuture<String> future11 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "Task 11 result";
    }, singleThreadExecutor);
    CompletableFuture<Void> allTasks = CompletableFuture.allOf(future10, future11);
    allTasks.join();
    try {
        String result10 = future10.get();
        String result11 = future11.get();
        System.out.println(result10 + " and " + result11);
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
    singleThreadExecutor.shutdown();
    
    在这个例子中,由于使用了单线程的线程池,future10future11 会依次执行,先执行 future10,再执行 future11,最后打印出两个任务的结果。

注意事项

  1. 内存泄漏风险 如果在 CompletableFuture 中创建了资源(如数据库连接、文件句柄等),并且没有正确释放,可能会导致内存泄漏。确保在任务完成后及时关闭或释放这些资源。
    CompletableFuture<Void> future12 = CompletableFuture.runAsync(() -> {
        // 模拟打开一个数据库连接
        Connection connection = null;
        try {
            connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/mydb", "user", "password");
            // 执行数据库操作
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }
    });
    
  2. 异常处理的完整性 在使用 CompletableFuture 时,要确保异常处理的完整性。如果没有正确处理异常,可能会导致程序出现未捕获的异常,影响系统的稳定性。不仅要在任务执行过程中处理异常,还要在任务链的各个环节进行适当的异常处理。
    CompletableFuture.supplyAsync(() -> {
        if (Math.random() < 0.5) {
            throw new RuntimeException("Simulated error");
        }
        return "Success";
    })
           .thenApply(s -> s + " -> Processed")
           .exceptionally(ex -> {
                System.err.println("Caught exception: " + ex.getMessage());
                return "Default value";
            })
           .thenAccept(System.out::println);
    
    上述代码在任务执行和结果处理过程中都考虑了异常情况,保证了程序的健壮性。

通过深入理解和合理运用 CompletableFuture 的各种方法,开发者可以在 Java 中高效地控制异步任务的执行顺序,提高程序的性能和响应能力,使其在各种复杂的应用场景中发挥重要作用。无论是在微服务架构、大数据处理还是其他异步编程场景中,CompletableFuture 都是一个不可或缺的工具。