MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java 中 CompletableFuture 多个任务 AND 组合关系

2022-05-206.5k 阅读

Java 中 CompletableFuture 多个任务 AND 组合关系

在现代的 Java 编程中,异步编程变得越来越重要。CompletableFuture 是 Java 8 引入的一个强大工具,它使得异步编程更加简洁和高效。当处理多个异步任务时,常常会遇到需要多个任务都成功完成后再进行下一步操作的场景,这就是所谓的多个任务的 AND 组合关系。

CompletableFuture 基础回顾

在深入探讨多个任务的 AND 组合关系之前,先简单回顾一下 CompletableFuture 的基础知识。CompletableFuture 实现了 FutureCompletionStage 接口,它代表一个异步计算的结果。

创建 CompletableFuture

  1. 使用 CompletableFuture.supplyAsync 创建有返回值的异步任务

    CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
        // 模拟一个耗时操作
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "Task 1 completed";
    });
    

    在上述代码中,supplyAsync 方法接受一个 Supplier 作为参数,在新的线程中执行这个 Supplierget 方法,并返回一个 CompletableFuture,其结果就是 Supplier 的返回值。

  2. 使用 CompletableFuture.runAsync 创建无返回值的异步任务

    CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
        // 模拟一个耗时操作
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Task 2 completed");
    });
    

    runAsync 方法接受一个 Runnable 作为参数,同样在新线程中执行 Runnablerun 方法,返回的 CompletableFuture 的结果类型为 Void

获取 CompletableFuture 的结果

  1. 使用 get 方法阻塞获取结果

    try {
        String result = future1.get();
        System.out.println(result);
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
    

    get 方法会阻塞当前线程,直到 CompletableFuture 完成并返回结果。如果 CompletableFuture 执行过程中抛出异常,get 方法会将异常包装成 ExecutionException 抛出。

  2. 使用 join 方法阻塞获取结果

    String result = future1.join();
    System.out.println(result);
    

    join 方法与 get 方法类似,也是阻塞当前线程获取结果。不同之处在于,join 方法不会抛出 InterruptedExceptionExecutionException,如果 CompletableFuture 执行过程中抛出异常,join 方法会将异常直接抛出。

多个 CompletableFuture 的 AND 组合关系

在实际应用中,常常需要多个异步任务都完成后再进行下一步操作。CompletableFuture 提供了几种方法来实现这种 AND 组合关系。

thenCombine 方法

thenCombine 方法用于将两个 CompletableFuture 的结果进行合并。当两个 CompletableFuture 都完成时,会执行 thenCombine 方法中的 BiFunction

  1. 示例代码

    CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "Result from Future 3";
    });
    
    CompletableFuture<String> future4 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "Result from Future 4";
    });
    
    CompletableFuture<String> combinedFuture = future3.thenCombine(future4, (result3, result4) -> {
        return result3 + " and " + result4;
    });
    
    try {
        String finalResult = combinedFuture.get();
        System.out.println(finalResult);
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
    

    在上述代码中,future3future4 是两个异步任务。thenCombine 方法将这两个任务的结果合并,BiFunction 中的参数 result3result4 分别是 future3future4 的结果。最后通过 get 方法获取合并后的结果并打印。

  2. 原理分析 thenCombine 方法返回一个新的 CompletableFuture,这个新的 CompletableFuture 会在 future3future4 都完成时被触发。当 future3future4 中的任何一个发生异常时,新的 CompletableFuture 也会以异常状态结束,并且 get 方法会抛出相应的异常。

thenAcceptBoth 方法

thenAcceptBoth 方法与 thenCombine 方法类似,不同之处在于 thenAcceptBoth 方法不返回合并后的结果,而是直接消费两个 CompletableFuture 的结果。

  1. 示例代码

    CompletableFuture<String> future5 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "Result from Future 5";
    });
    
    CompletableFuture<String> future6 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "Result from Future 6";
    });
    
    CompletableFuture<Void> voidFuture = future5.thenAcceptBoth(future6, (result5, result6) -> {
        System.out.println(result5 + " and " + result6);
    });
    
    try {
        voidFuture.get();
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
    

    在这个例子中,future5future6 是两个异步任务。thenAcceptBoth 方法中的 BiConsumer 直接消费 future5future6 的结果,而不返回新的结果。最后通过 get 方法等待任务完成。

  2. 原理分析 thenAcceptBoth 方法返回一个 CompletableFuture<Void>,当 future5future6 都完成时,BiConsumer 会被执行。如果 future5future6 发生异常,返回的 CompletableFuture<Void> 也会以异常状态结束,get 方法会抛出相应的异常。

allOf 方法

allOf 方法用于等待所有的 CompletableFuture 都完成。它返回一个新的 CompletableFuture<Void>,当所有传入的 CompletableFuture 都正常完成时,这个新的 CompletableFuture 也会正常完成;如果其中任何一个 CompletableFuture 发生异常,新的 CompletableFuture 会以异常状态结束。

  1. 示例代码

    CompletableFuture<String> future7 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "Result from Future 7";
    });
    
    CompletableFuture<String> future8 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "Result from Future 8";
    });
    
    CompletableFuture<Void> allOfFuture = CompletableFuture.allOf(future7, future8);
    
    allOfFuture.join();
    
    try {
        String result7 = future7.get();
        String result8 = future8.get();
        System.out.println(result7 + " and " + result8);
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
    

    在上述代码中,allOf 方法接受多个 CompletableFuture 作为参数,并返回一个新的 CompletableFuture<Void>。通过 join 方法等待所有任务完成,然后分别获取 future7future8 的结果并打印。

  2. 原理分析 allOf 方法返回的 CompletableFuture<Void> 会在所有传入的 CompletableFuture 都完成时完成。它内部使用了一种计数机制,每当一个传入的 CompletableFuture 完成时,计数减一。当计数为 0 时,表示所有任务都完成,此时返回的 CompletableFuture<Void> 也会完成。如果在这个过程中任何一个 CompletableFuture 发生异常,返回的 CompletableFuture<Void> 会立即以异常状态结束。

处理异常情况

在处理多个 CompletableFuture 的 AND 组合关系时,异常处理是非常重要的。

单个 CompletableFuture 异常处理

  1. 使用 exceptionally 方法处理异常
    CompletableFuture<String> future9 = CompletableFuture.supplyAsync(() -> {
        if (Math.random() > 0.5) {
            throw new RuntimeException("Simulated exception");
        }
        return "Normal result";
    }).exceptionally(ex -> {
        System.out.println("Caught exception: " + ex.getMessage());
        return "Default value";
    });
    
    try {
        String result = future9.get();
        System.out.println(result);
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
    
    在上述代码中,exceptionally 方法接受一个 Function 作为参数,当 CompletableFuture 发生异常时,这个 Function 会被执行,返回一个默认值。

多个 CompletableFuture AND 组合时的异常处理

  1. thenCombinethenAcceptBoth 方法的异常处理 如前文所述,当 thenCombinethenAcceptBoth 中的任何一个 CompletableFuture 发生异常时,返回的新 CompletableFuture 也会以异常状态结束。可以通过 exceptionally 方法处理这个异常。

    CompletableFuture<String> future10 = CompletableFuture.supplyAsync(() -> {
        if (Math.random() > 0.5) {
            throw new RuntimeException("Exception in Future 10");
        }
        return "Result from Future 10";
    });
    
    CompletableFuture<String> future11 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "Result from Future 11";
    });
    
    CompletableFuture<String> combinedFuture2 = future10.thenCombine(future11, (result10, result11) -> {
        return result10 + " and " + result11;
    }).exceptionally(ex -> {
        System.out.println("Caught exception: " + ex.getMessage());
        return "Default combined result";
    });
    
    try {
        String finalResult = combinedFuture2.get();
        System.out.println(finalResult);
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
    

    在这个例子中,future10 有可能抛出异常,thenCombine 方法返回的 combinedFuture2 可以通过 exceptionally 方法处理异常并返回默认值。

  2. allOf 方法的异常处理 对于 allOf 方法返回的 CompletableFuture<Void>,同样可以通过 exceptionally 方法处理异常。

    CompletableFuture<String> future12 = CompletableFuture.supplyAsync(() -> {
        if (Math.random() > 0.5) {
            throw new RuntimeException("Exception in Future 12");
        }
        return "Result from Future 12";
    });
    
    CompletableFuture<String> future13 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "Result from Future 13";
    });
    
    CompletableFuture<Void> allOfFuture2 = CompletableFuture.allOf(future12, future13).exceptionally(ex -> {
        System.out.println("Caught exception: " + ex.getMessage());
        return null;
    });
    
    allOfFuture2.join();
    
    try {
        String result12 = future12.get();
        String result13 = future13.get();
        System.out.println(result12 + " and " + result13);
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
    

    在上述代码中,allOfFuture2allOf 方法返回的 CompletableFuture<Void>,通过 exceptionally 方法处理可能发生的异常。

应用场景

多个 CompletableFuture 的 AND 组合关系在实际开发中有很多应用场景。

微服务调用

在微服务架构中,一个业务操作可能需要调用多个微服务。例如,一个订单处理系统可能需要同时调用库存微服务和用户信息微服务。只有当这两个微服务都成功返回结果后,才能继续进行订单的处理。

CompletableFuture<InventoryInfo> inventoryFuture = CompletableFuture.supplyAsync(() -> {
    // 调用库存微服务
    return inventoryService.getInventoryInfo();
});

CompletableFuture<UserInfo> userFuture = CompletableFuture.supplyAsync(() -> {
    // 调用用户信息微服务
    return userService.getUserInfo();
});

CompletableFuture<Void> orderProcessFuture = inventoryFuture.thenAcceptBoth(userFuture, (inventoryInfo, userInfo) -> {
    // 根据库存信息和用户信息处理订单
    orderService.processOrder(inventoryInfo, userInfo);
});

try {
    orderProcessFuture.get();
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}

数据聚合

在数据处理中,可能需要从多个数据源获取数据,然后将这些数据聚合起来。例如,从一个数据库获取用户基本信息,从另一个数据库获取用户的交易记录,只有当这两个数据源的数据都获取成功后,才能生成完整的用户报告。

CompletableFuture<UserBasicInfo> basicInfoFuture = CompletableFuture.supplyAsync(() -> {
    // 从数据库获取用户基本信息
    return userBasicInfoDao.getUserBasicInfo();
});

CompletableFuture<UserTransactionRecord> transactionFuture = CompletableFuture.supplyAsync(() -> {
    // 从数据库获取用户交易记录
    return userTransactionRecordDao.getUserTransactionRecord();
});

CompletableFuture<UserReport> reportFuture = basicInfoFuture.thenCombine(transactionFuture, (basicInfo, transactionRecord) -> {
    // 生成用户报告
    return reportGenerator.generateReport(basicInfo, transactionRecord);
});

try {
    UserReport report = reportFuture.get();
    System.out.println(report);
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}

性能优化与注意事项

  1. 线程池的使用 在使用 CompletableFuture 时,默认情况下,supplyAsyncrunAsync 方法会使用 ForkJoinPool.commonPool()。对于一些需要大量异步任务的场景,commonPool 可能会出现线程饥饿等性能问题。可以通过创建自定义的 Executor 来解决这个问题。

    ExecutorService executor = Executors.newFixedThreadPool(10);
    CompletableFuture<String> future14 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "Result from Future 14";
    }, executor);
    

    在上述代码中,创建了一个固定大小为 10 的线程池,并将其作为参数传递给 supplyAsync 方法。

  2. 避免不必要的阻塞 虽然 CompletableFuture 提供了 getjoin 等阻塞方法,但在实际应用中,应尽量避免在异步任务链中过早地使用这些方法,以免失去异步编程的优势。可以通过链式调用的方式,让 CompletableFuture 自动处理任务的依赖关系。

    CompletableFuture<String> future15 = CompletableFuture.supplyAsync(() -> "Initial result")
           .thenApply(result -> result + " processed")
           .thenApply(finalResult -> {
                System.out.println(finalResult);
                return finalResult;
            });
    

    在这个例子中,通过链式调用 thenApply 方法,避免了中间使用阻塞方法。

  3. 异常处理的一致性 在处理多个 CompletableFuture 的 AND 组合关系时,异常处理应该保持一致性。无论是单个 CompletableFuture 还是组合后的 CompletableFuture,都应该有合理的异常处理机制,以确保程序的健壮性。

  4. 资源管理 在异步任务中,如果涉及到资源的获取和释放,如数据库连接、文件句柄等,要确保在任务完成后正确地释放资源。可以使用 try - finally 块或者 Java 7 引入的 try - with - resources 语法来管理资源。

    CompletableFuture<String> future16 = CompletableFuture.supplyAsync(() -> {
        Connection connection = null;
        try {
            connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/mydb", "user", "password");
            // 使用连接进行数据库操作
            return "Database operation result";
        } catch (SQLException e) {
            e.printStackTrace();
            return "Error result";
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }
    });
    

    在上述代码中,通过 try - finally 块确保数据库连接在任务完成后被正确关闭。

通过合理使用 CompletableFuture 的多个任务 AND 组合关系,可以大大提高 Java 程序的异步处理能力和性能,使代码更加简洁和易于维护。在实际应用中,要根据具体的业务场景和性能需求,选择合适的方法和策略来处理异步任务。同时,要注意异常处理、资源管理等方面的问题,以确保程序的健壮性和稳定性。

希望通过本文的介绍,你对 Java 中 CompletableFuture 的多个任务 AND 组合关系有了更深入的理解,并能够在实际项目中灵活运用。