MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java 中 CompletableFuture 创建异步任务 supplyAsync 方法

2022-02-152.9k 阅读

CompletableFuture 概述

在Java的并发编程领域中,CompletableFuture 是一个强大的工具,它为异步编程提供了更为便捷和灵活的方式。CompletableFuture 类实现了 Future 接口和 CompletionStage 接口,不仅能够获取异步操作的结果,还可以对异步操作进行组合、链式调用等操作。这使得在处理复杂的异步任务时,代码的可读性和可维护性都得到了极大的提升。

supplyAsync 方法的基本介绍

CompletableFuture 类提供了 supplyAsync 方法,该方法用于异步执行有返回值的任务。它的定义如下:

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)

其中,Supplier 是一个函数式接口,它只有一个无参数的 get 方法,该方法会返回一个结果。这个方法会使用 ForkJoinPool.commonPool() 作为默认的线程池来异步执行 Supplier 提供的任务。

还有一个重载版本:

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

这个版本允许我们指定一个自定义的 Executor 来执行异步任务,这样我们就可以根据实际需求来配置线程池的参数,例如线程数量、线程优先级等。

简单代码示例

下面通过一个简单的示例来展示 supplyAsync 方法的基本使用:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureSupplyAsyncExample {
    public static void main(String[] args) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            // 模拟一个耗时操作
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Hello, CompletableFuture!";
        });

        try {
            // 获取异步任务的结果
            String result = future.get();
            System.out.println(result);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}

在上述代码中,我们使用 CompletableFuture.supplyAsync 方法创建了一个异步任务,该任务在后台线程中执行一个模拟的耗时操作(这里通过 Thread.sleep 模拟),并返回一个字符串结果。主线程通过 future.get() 方法获取异步任务的执行结果,这是一个阻塞操作,直到异步任务完成并返回结果。

异步任务执行原理

当我们调用 supplyAsync 方法时,实际上是将 Supplier 任务提交到了指定的线程池(如果未指定则是 ForkJoinPool.commonPool())。ForkJoinPool 是Java 7引入的一种特殊线程池,它采用工作窃取算法来提高线程的利用率和性能。

工作窃取算法的基本原理是,每个工作线程都有自己的双端队列来存放任务。当一个线程完成了自己队列中的任务后,它可以从其他线程的队列尾部窃取任务来执行。这样可以避免线程之间的竞争,提高整体的执行效率。

CompletableFuture 中,当 supplyAsync 方法提交任务后,ForkJoinPool 会安排一个线程来执行 Supplierget 方法。当任务执行完成后,CompletableFuture 会将结果保存起来,并通知所有等待结果的线程。

使用自定义线程池

如前文所述,supplyAsync 方法有一个重载版本允许我们指定自定义的 Executor。下面的示例展示了如何使用自定义线程池:

import java.util.concurrent.*;

public class CompletableFutureSupplyAsyncWithCustomExecutor {
    public static void main(String[] args) {
        // 创建自定义线程池
        ExecutorService executor = Executors.newFixedThreadPool(5);

        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            // 模拟一个耗时操作
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Hello from custom executor!";
        }, executor);

        try {
            // 获取异步任务的结果
            String result = future.get();
            System.out.println(result);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        } finally {
            // 关闭线程池
            executor.shutdown();
        }
    }
}

在这个示例中,我们通过 Executors.newFixedThreadPool(5) 创建了一个固定大小为5的线程池,并将其作为参数传递给 supplyAsync 方法。这样,异步任务就会在这个自定义的线程池中执行。在程序结束时,我们调用 executor.shutdown() 来关闭线程池,确保所有任务执行完毕后程序正常退出。

异常处理

在异步任务执行过程中,可能会发生各种异常。CompletableFuture 提供了多种方式来处理这些异常。

一种常见的方式是使用 exceptionally 方法,该方法会在异步任务抛出异常时被调用,它接收一个 Function 作为参数,该 Function 会处理异常并返回一个默认值。例如:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureExceptionHandling {
    public static void main(String[] args) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            // 模拟一个会抛出异常的操作
            if (Math.random() < 0.5) {
                throw new RuntimeException("Simulated exception");
            }
            return "Task completed successfully";
        }).exceptionally(ex -> {
            System.out.println("Caught exception: " + ex.getMessage());
            return "Default value";
        });

        try {
            String result = future.get();
            System.out.println(result);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}

在上述代码中,supplyAsync 方法中的任务有50% 的概率会抛出一个运行时异常。通过 exceptionally 方法,我们捕获到异常并打印异常信息,同时返回一个默认值。这样,无论异步任务是否抛出异常,future.get() 方法都能获取到一个结果。

另一种处理异常的方式是使用 handle 方法,handle 方法会在异步任务完成(无论是正常完成还是异常完成)时被调用,它接收一个 BiFunction 作为参数,该 BiFunction 的第一个参数是异步任务的结果(如果任务正常完成),第二个参数是异常(如果任务抛出异常)。例如:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.BiFunction;

public class CompletableFutureHandleExample {
    public static void main(String[] args) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            // 模拟一个会抛出异常的操作
            if (Math.random() < 0.5) {
                throw new RuntimeException("Simulated exception");
            }
            return "Task completed successfully";
        }).handle((result, ex) -> {
            if (ex != null) {
                System.out.println("Caught exception: " + ex.getMessage());
                return "Default value";
            }
            return result;
        });

        try {
            String finalResult = future.get();
            System.out.println(finalResult);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}

在这个示例中,handle 方法根据异步任务的执行情况,要么返回正常的结果,要么在发生异常时返回默认值。

与其他异步操作的组合

CompletableFuture 的强大之处不仅在于创建异步任务,还在于能够与其他异步操作进行组合。

例如,我们可以使用 thenApply 方法对异步任务的结果进行转换。thenApply 方法接收一个 Function 作为参数,该 Function 会对异步任务的结果进行处理并返回一个新的结果。示例如下:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureThenApplyExample {
    public static void main(String[] args) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello")
               .thenApply(s -> s + ", World!");

        try {
            String result = future.get();
            System.out.println(result);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}

在上述代码中,supplyAsync 方法返回一个字符串 "Hello",然后通过 thenApply 方法将其转换为 "Hello, World!"。

我们还可以使用 thenCombine 方法将两个异步任务的结果进行合并。thenCombine 方法接收另一个 CompletableFuture 和一个 BiFunction 作为参数,BiFunction 会将两个异步任务的结果进行合并并返回一个新的结果。示例如下:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureThenCombineExample {
    public static void main(String[] args) {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World");

        CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (s1, s2) -> s1 + ", " + s2 + "!");

        try {
            String result = combinedFuture.get();
            System.out.println(result);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}

在这个示例中,future1future2 是两个独立的异步任务,thenCombine 方法将它们的结果合并为 "Hello, World!"。

并发控制

在使用 CompletableFuture 进行异步编程时,有时需要对并发任务进行控制。例如,我们可能希望所有的异步任务都完成后再进行下一步操作,或者只要有一个异步任务完成就进行相应处理。

CompletableFuture 提供了 allOf 方法来等待所有的异步任务完成。allOf 方法接收多个 CompletableFuture 作为参数,并返回一个新的 CompletableFuture,只有当所有传入的 CompletableFuture 都完成时,这个新的 CompletableFuture 才会完成。示例如下:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureAllOfExample {
    public static void main(String[] args) {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Task 1 completed";
        });

        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Task 2 completed";
        });

        CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2);

        allFutures.join();

        try {
            System.out.println(future1.get());
            System.out.println(future2.get());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}

在上述代码中,future1future2 是两个异步任务,CompletableFuture.allOf 方法返回一个新的 CompletableFuture,我们调用 join 方法等待所有任务完成。然后再获取每个任务的结果。

CompletableFuture 还提供了 anyOf 方法,只要有一个异步任务完成,这个方法返回的 CompletableFuture 就会完成。示例如下:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureAnyOfExample {
    public static void main(String[] args) {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Task 1 completed";
        });

        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Task 2 completed";
        });

        CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(future1, future2);

        try {
            Object result = anyFuture.get();
            System.out.println(result);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}

在这个示例中,future2 会先完成,anyOf 方法返回的 CompletableFuture 会在 future2 完成时就完成,我们通过 get 方法获取到 future2 的结果。

性能考量

在使用 CompletableFuture.supplyAsync 方法时,性能是一个需要考虑的重要因素。

首先,线程池的选择和配置对性能有很大影响。如果使用默认的 ForkJoinPool.commonPool(),它是一个共享的线程池,可能会受到其他任务的影响。在高并发场景下,自定义线程池并合理配置线程数量、队列大小等参数可以提高性能。例如,对于 CPU 密集型任务,线程数量可以设置为 CPU 核心数;对于 I/O 密集型任务,线程数量可以适当增加以充分利用 I/O 等待时间。

其次,异步任务的粒度也会影响性能。如果异步任务过于细碎,线程的创建、调度和销毁开销可能会占据较大比例,导致性能下降。因此,需要根据实际业务需求合理划分异步任务的粒度。

另外,在处理大量异步任务时,需要注意内存的使用。CompletableFuture 会保存任务的结果和状态等信息,如果任务数量过多,可能会导致内存占用过高。可以通过及时处理完成的任务、释放不再需要的资源等方式来优化内存使用。

应用场景

CompletableFuture.supplyAsync 方法在实际开发中有广泛的应用场景。

在 Web 开发中,当需要调用多个外部服务获取数据并进行合并处理时,可以使用 CompletableFuture 来并发调用这些服务,提高响应速度。例如,一个电商应用可能需要从库存服务、价格服务和评论服务获取数据,然后合并展示给用户,通过 CompletableFuture 可以同时发起这三个服务的调用,而不是顺序调用,大大缩短了整体的响应时间。

在数据处理领域,当需要对大量数据进行并行计算时,CompletableFuture 可以将数据分块并异步处理,最后合并结果。比如,对一个大数据集进行统计分析,每个数据块的计算可以作为一个异步任务,通过 CompletableFuture 来管理这些任务,提高计算效率。

在分布式系统中,CompletableFuture 可以用于处理分布式任务的结果。例如,一个分布式爬虫系统,各个节点爬取数据后,主节点可以使用 CompletableFuture 来收集和处理这些节点返回的数据。

与其他异步框架的比较

与其他异步框架相比,CompletableFuture 有其独特的优势。

与传统的 Future 相比,CompletableFuture 提供了更丰富的功能。Future 只能获取异步任务的结果,并且获取结果的操作是阻塞的。而 CompletableFuture 不仅可以获取结果,还支持异步任务的组合、链式调用、异常处理等功能,使得异步编程更加灵活和便捷。

与 Guava 的 ListenableFuture 相比,CompletableFuture 是 Java 标准库的一部分,不需要额外引入依赖。ListenableFuture 提供了异步回调的功能,但在功能的丰富性和易用性上,CompletableFuture 更胜一筹。例如,CompletableFuture 可以更方便地进行异步任务的合并、转换等操作。

与 RxJava 相比,CompletableFuture 相对来说更轻量级,学习成本较低。RxJava 提供了强大的响应式编程模型,适用于处理复杂的异步数据流。但对于一些简单的异步任务场景,CompletableFuture 已经能够满足需求,并且其与 Java 标准库的集成度更高。

注意事项

在使用 CompletableFuture.supplyAsync 方法时,有一些注意事项需要关注。

首先,要注意线程安全问题。虽然 CompletableFuture 本身是线程安全的,但异步任务中访问的共享资源需要进行适当的同步处理,以避免数据竞争和不一致的问题。

其次,避免在异步任务中进行长时间的阻塞操作。如果异步任务中包含长时间的 I/O 操作或其他阻塞操作,可能会导致线程池中的线程被长时间占用,影响整体的并发性能。可以考虑使用异步 I/O 等技术来优化。

另外,在使用自定义线程池时,要注意合理配置线程池的参数。如果线程池大小设置过小,可能会导致任务堆积,响应时间变长;如果设置过大,可能会消耗过多的系统资源,导致系统性能下降。

最后,要注意异常处理的完整性。在异步任务中抛出的异常如果没有被正确处理,可能会导致程序出现难以调试的问题。确保在合适的位置使用 exceptionallyhandle 等方法来处理异常。

总结

CompletableFuture.supplyAsync 方法是 Java 异步编程中的一个强大工具,它为我们提供了简洁、灵活的方式来创建和管理有返回值的异步任务。通过合理使用 supplyAsync 方法,结合异常处理、任务组合、并发控制等功能,我们可以编写出高效、健壮的异步程序。在实际开发中,根据不同的应用场景,合理选择线程池、优化任务粒度、处理好异常等,可以充分发挥 CompletableFuture 的优势,提升系统的性能和响应能力。同时,与其他异步框架相比,CompletableFuture 作为 Java 标准库的一部分,具有更好的集成性和较低的学习成本,是处理异步任务的首选工具之一。