MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java 中 CompletableFuture 异步任务结果传递

2023-04-246.6k 阅读

Java 中 CompletableFuture 异步任务结果传递

CompletableFuture 简介

在 Java 并发编程领域,CompletableFuture 是 Java 8 引入的一个强大工具,它提供了一种异步处理任务并获取结果的机制。CompletableFuture 实现了 Future 接口和 CompletionStage 接口,不仅能像传统 Future 那样获取异步任务的结果,还支持更丰富的异步操作组合与结果传递方式,使得异步编程更加灵活和高效。

CompletableFuture 的设计理念是让开发者能够以一种链式调用的方式处理异步任务,这种方式与传统的基于回调或者轮询获取 Future 结果的方式相比,代码结构更加清晰,可读性更强。

创建 CompletableFuture

  1. 使用 CompletableFuture.supplyAsync 创建有返回值的异步任务

    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        // 模拟一个耗时操作
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "任务执行完成";
    });
    

    在上述代码中,supplyAsync 方法接受一个 Supplier 类型的参数,该参数定义了异步任务的具体逻辑。supplyAsync 方法会在一个默认的 ForkJoinPool.commonPool() 线程池中异步执行任务,并返回一个 CompletableFuture 对象,通过这个对象可以获取任务的执行结果。

  2. 使用 CompletableFuture.runAsync 创建无返回值的异步任务

    CompletableFuture<Void> futureVoid = CompletableFuture.runAsync(() -> {
        // 模拟一个耗时操作
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("无返回值任务执行完成");
    });
    

    runAsync 方法接受一个 Runnable 类型的参数,用于定义异步执行的任务。由于 Runnable 没有返回值,所以 runAsync 返回的 CompletableFuture 的泛型类型为 Void

获取 CompletableFuture 的结果

  1. 使用 get 方法阻塞获取结果

    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "任务执行完成";
    });
    try {
        String result = future.get();
        System.out.println("获取到的结果: " + result);
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
    

    get 方法会阻塞当前线程,直到 CompletableFuture 完成任务并返回结果。如果任务执行过程中抛出异常,get 方法会将异常包装成 ExecutionException 重新抛出,同时还可能抛出 InterruptedException 表示当前线程在等待过程中被中断。

  2. 使用 get(long timeout, TimeUnit unit) 方法设置超时获取结果

    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "任务执行完成";
    });
    try {
        String result = future.get(2, TimeUnit.SECONDS);
        System.out.println("获取到的结果: " + result);
    } catch (InterruptedException | ExecutionException | TimeoutException e) {
        if (e instanceof TimeoutException) {
            System.out.println("获取结果超时");
        } else {
            e.printStackTrace();
        }
    }
    

    此方法在指定的时间内等待 CompletableFuture 完成任务并返回结果。如果超过指定时间任务仍未完成,会抛出 TimeoutException

  3. 使用 join 方法阻塞获取结果(不抛出受检异常)

    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "任务执行完成";
    });
    String result = future.join();
    System.out.println("获取到的结果: " + result);
    

    join 方法与 get 方法类似,都会阻塞当前线程直到获取到结果。但不同的是,join 方法不会抛出 InterruptedExceptionExecutionException 这两个受检异常,而是将 ExecutionException 中的原始异常直接抛出,将 InterruptedException 包装成 CompletionException 抛出。

异步任务结果传递

  1. 使用 thenApply 方法对结果进行转换

    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "原始结果")
          .thenApply(result -> result + " 转换后");
    try {
        String finalResult = future.get();
        System.out.println(finalResult);
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
    

    thenApply 方法接受一个 Function 类型的参数,该函数会在 CompletableFuture 完成任务后被调用,传入任务的结果,并返回一个新的结果。thenApply 方法返回一个新的 CompletableFuture,其结果为函数转换后的结果。

  2. 使用 thenAccept 方法消费结果

    CompletableFuture.supplyAsync(() -> "任务结果")
          .thenAccept(result -> System.out.println("消费结果: " + result));
    // 为了确保主线程不退出,添加如下代码
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    

    thenAccept 方法接受一个 Consumer 类型的参数,该消费者会在 CompletableFuture 完成任务后被调用,传入任务的结果,但不返回新的结果。thenAccept 方法返回的 CompletableFuture 的结果为 Void

  3. 使用 thenRun 方法执行后续无结果任务

    CompletableFuture.supplyAsync(() -> "任务完成")
          .thenRun(() -> System.out.println("后续无结果任务执行"));
    // 为了确保主线程不退出,添加如下代码
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    

    thenRun 方法接受一个 Runnable 类型的参数,该任务会在 CompletableFuture 完成任务后被调用,不接受任务的结果,也不返回新的结果。thenRun 方法返回的 CompletableFuture 的结果为 Void

多个 CompletableFuture 组合与结果传递

  1. 使用 thenCompose 方法组合两个 CompletableFuture

    CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "第一个任务结果");
    CompletableFuture<String> future2 = future1.thenCompose(result -> CompletableFuture.supplyAsync(() -> result + " 组合后"));
    try {
        String finalResult = future2.get();
        System.out.println(finalResult);
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
    

    thenCompose 方法接受一个函数,该函数接受前一个 CompletableFuture 的结果,并返回一个新的 CompletableFuture。它会将前一个 CompletableFuture 的结果传递给函数,然后将函数返回的 CompletableFuture 与前一个 CompletableFuture 进行组合,最终返回组合后的 CompletableFuture

  2. 使用 thenCombine 方法合并两个 CompletableFuture 的结果

    CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "结果1");
    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "结果2");
    CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (r1, r2) -> r1 + " 和 " + r2);
    try {
        String result = combinedFuture.get();
        System.out.println(result);
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
    

    thenCombine 方法接受另一个 CompletableFuture 和一个 BiFunction。当两个 CompletableFuture 都完成时,BiFunction 会被调用,传入两个 CompletableFuture 的结果,并返回一个新的结果。thenCombine 方法返回一个新的 CompletableFuture,其结果为 BiFunction 合并后的结果。

  3. 使用 allOf 方法等待所有 CompletableFuture 完成

    CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "任务1完成";
    });
    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "任务2完成";
    });
    CompletableFuture<Void> allFuture = CompletableFuture.allOf(future1, future2);
    allFuture.join();
    try {
        String result1 = future1.get();
        String result2 = future2.get();
        System.out.println(result1);
        System.out.println(result2);
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
    

    allOf 方法接受多个 CompletableFuture 作为参数,返回一个新的 CompletableFuture。这个新的 CompletableFuture 会在所有传入的 CompletableFuture 都完成时完成,其结果为 Void。通过 join 方法等待所有任务完成后,可以分别获取每个 CompletableFuture 的结果。

  4. 使用 anyOf 方法等待任意一个 CompletableFuture 完成

    CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "任务1完成";
    });
    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "任务2完成";
    });
    CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(future1, future2);
    try {
        Object result = anyFuture.get();
        System.out.println(result);
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
    

    anyOf 方法接受多个 CompletableFuture 作为参数,返回一个新的 CompletableFuture。这个新的 CompletableFuture 会在任意一个传入的 CompletableFuture 完成时完成,其结果为第一个完成的 CompletableFuture 的结果。

CompletableFuture 的异常处理

  1. 使用 exceptionally 方法处理异常

    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        if (Math.random() < 0.5) {
            throw new RuntimeException("模拟异常");
        }
        return "正常结果";
    }).exceptionally(ex -> {
        System.out.println("捕获到异常: " + ex.getMessage());
        return "异常处理结果";
    });
    try {
        String result = future.get();
        System.out.println(result);
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
    

    exceptionally 方法接受一个 Function 类型的参数,该函数会在 CompletableFuture 执行过程中抛出异常时被调用,传入异常对象,并返回一个替代结果。这样可以在不影响其他异步操作的情况下,对异常进行处理并提供一个默认结果。

  2. 使用 handle 方法同时处理正常结果和异常

    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        if (Math.random() < 0.5) {
            throw new RuntimeException("模拟异常");
        }
        return "正常结果";
    }).handle((result, ex) -> {
        if (ex != null) {
            System.out.println("捕获到异常: " + ex.getMessage());
            return "异常处理结果";
        }
        return result + " 处理后";
    });
    try {
        String finalResult = future.get();
        System.out.println(finalResult);
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
    

    handle 方法接受一个 BiFunction 类型的参数,无论 CompletableFuture 是正常完成还是抛出异常,该函数都会被调用。函数的第一个参数为正常完成时的结果(如果有异常则为 null),第二个参数为异常对象(如果正常完成则为 null)。通过判断 ex 是否为 null,可以分别处理正常结果和异常情况,并返回最终结果。

CompletableFuture 的执行线程控制

  1. 使用自定义线程池

    ExecutorService executor = Executors.newFixedThreadPool(3);
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "任务执行完成";
    }, executor);
    try {
        String result = future.get();
        System.out.println("获取到的结果: " + result);
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    } finally {
        executor.shutdown();
    }
    

    supplyAsync 方法的第二个参数中传入自定义的 Executor,可以指定 CompletableFuture 异步任务在自定义的线程池中执行。这样可以更好地控制线程资源,例如设置线程池的大小、线程的优先级等。

  2. 使用 thenApplyAsync 等方法指定后续任务执行线程

    ExecutorService executor = Executors.newFixedThreadPool(3);
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "原始结果")
          .thenApplyAsync(result -> result + " 转换后", executor);
    try {
        String finalResult = future.get();
        System.out.println(finalResult);
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    } finally {
        executor.shutdown();
    }
    

    thenApplyAsync 方法与 thenApply 方法类似,但它接受一个 Executor 参数,可以指定后续转换任务在特定的线程池中执行。同样,thenAcceptAsyncthenRunAsync 等方法也支持这种方式来指定后续任务的执行线程。

CompletableFuture 在实际项目中的应用场景

  1. 高并发数据获取与处理 在大型电商系统中,可能需要同时从多个数据源获取商品信息、库存信息、价格信息等,然后对这些数据进行整合和处理。使用 CompletableFuture 可以并发地发起这些异步请求,并在所有请求完成后统一处理结果,大大提高系统的响应速度。

  2. 异步任务流水线处理 在一些数据处理流程中,可能需要按照一定的顺序依次执行多个异步任务,例如数据采集、数据清洗、数据分析等。CompletableFuture 的链式调用和结果传递机制可以很好地满足这种需求,使得代码结构清晰,易于维护。

  3. 分布式系统中的异步通信 在分布式系统中,不同服务之间的调用可能是异步的。CompletableFuture 可以用于处理这些异步调用的结果,例如在微服务架构中,一个服务调用多个其他服务获取数据,然后对这些数据进行合并和返回。通过 CompletableFuture 可以方便地管理这些异步调用,提高系统的整体性能和可靠性。

通过深入理解和灵活运用 CompletableFuture 的异步任务结果传递机制,开发者能够编写出更高效、更灵活的并发程序,提升系统的性能和响应能力。无论是在小型应用还是大型分布式系统中,CompletableFuture 都为异步编程提供了强大而便捷的工具。在实际开发中,需要根据具体的业务需求和场景,合理选择 CompletableFuture 的各种方法,以实现最优的异步处理逻辑。同时,注意线程资源的管理和异常处理,确保程序的稳定性和可靠性。