Java 中 CompletableFuture 创建异步任务 supplyAsync 方法
CompletableFuture 概述
在Java的并发编程领域中,CompletableFuture
是一个强大的工具,它为异步编程提供了更为便捷和灵活的方式。CompletableFuture
类实现了 Future
接口和 CompletionStage
接口,不仅能够获取异步操作的结果,还可以对异步操作进行组合、链式调用等操作。这使得在处理复杂的异步任务时,代码的可读性和可维护性都得到了极大的提升。
supplyAsync 方法的基本介绍
CompletableFuture
类提供了 supplyAsync
方法,该方法用于异步执行有返回值的任务。它的定义如下:
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
其中,Supplier
是一个函数式接口,它只有一个无参数的 get
方法,该方法会返回一个结果。这个方法会使用 ForkJoinPool.commonPool()
作为默认的线程池来异步执行 Supplier
提供的任务。
还有一个重载版本:
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
这个版本允许我们指定一个自定义的 Executor
来执行异步任务,这样我们就可以根据实际需求来配置线程池的参数,例如线程数量、线程优先级等。
简单代码示例
下面通过一个简单的示例来展示 supplyAsync
方法的基本使用:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class CompletableFutureSupplyAsyncExample {
public static void main(String[] args) {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 模拟一个耗时操作
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Hello, CompletableFuture!";
});
try {
// 获取异步任务的结果
String result = future.get();
System.out.println(result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
在上述代码中,我们使用 CompletableFuture.supplyAsync
方法创建了一个异步任务,该任务在后台线程中执行一个模拟的耗时操作(这里通过 Thread.sleep
模拟),并返回一个字符串结果。主线程通过 future.get()
方法获取异步任务的执行结果,这是一个阻塞操作,直到异步任务完成并返回结果。
异步任务执行原理
当我们调用 supplyAsync
方法时,实际上是将 Supplier
任务提交到了指定的线程池(如果未指定则是 ForkJoinPool.commonPool()
)。ForkJoinPool
是Java 7引入的一种特殊线程池,它采用工作窃取算法来提高线程的利用率和性能。
工作窃取算法的基本原理是,每个工作线程都有自己的双端队列来存放任务。当一个线程完成了自己队列中的任务后,它可以从其他线程的队列尾部窃取任务来执行。这样可以避免线程之间的竞争,提高整体的执行效率。
在 CompletableFuture
中,当 supplyAsync
方法提交任务后,ForkJoinPool
会安排一个线程来执行 Supplier
的 get
方法。当任务执行完成后,CompletableFuture
会将结果保存起来,并通知所有等待结果的线程。
使用自定义线程池
如前文所述,supplyAsync
方法有一个重载版本允许我们指定自定义的 Executor
。下面的示例展示了如何使用自定义线程池:
import java.util.concurrent.*;
public class CompletableFutureSupplyAsyncWithCustomExecutor {
public static void main(String[] args) {
// 创建自定义线程池
ExecutorService executor = Executors.newFixedThreadPool(5);
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 模拟一个耗时操作
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Hello from custom executor!";
}, executor);
try {
// 获取异步任务的结果
String result = future.get();
System.out.println(result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
} finally {
// 关闭线程池
executor.shutdown();
}
}
}
在这个示例中,我们通过 Executors.newFixedThreadPool(5)
创建了一个固定大小为5的线程池,并将其作为参数传递给 supplyAsync
方法。这样,异步任务就会在这个自定义的线程池中执行。在程序结束时,我们调用 executor.shutdown()
来关闭线程池,确保所有任务执行完毕后程序正常退出。
异常处理
在异步任务执行过程中,可能会发生各种异常。CompletableFuture
提供了多种方式来处理这些异常。
一种常见的方式是使用 exceptionally
方法,该方法会在异步任务抛出异常时被调用,它接收一个 Function
作为参数,该 Function
会处理异常并返回一个默认值。例如:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class CompletableFutureExceptionHandling {
public static void main(String[] args) {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 模拟一个会抛出异常的操作
if (Math.random() < 0.5) {
throw new RuntimeException("Simulated exception");
}
return "Task completed successfully";
}).exceptionally(ex -> {
System.out.println("Caught exception: " + ex.getMessage());
return "Default value";
});
try {
String result = future.get();
System.out.println(result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
在上述代码中,supplyAsync
方法中的任务有50% 的概率会抛出一个运行时异常。通过 exceptionally
方法,我们捕获到异常并打印异常信息,同时返回一个默认值。这样,无论异步任务是否抛出异常,future.get()
方法都能获取到一个结果。
另一种处理异常的方式是使用 handle
方法,handle
方法会在异步任务完成(无论是正常完成还是异常完成)时被调用,它接收一个 BiFunction
作为参数,该 BiFunction
的第一个参数是异步任务的结果(如果任务正常完成),第二个参数是异常(如果任务抛出异常)。例如:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.BiFunction;
public class CompletableFutureHandleExample {
public static void main(String[] args) {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 模拟一个会抛出异常的操作
if (Math.random() < 0.5) {
throw new RuntimeException("Simulated exception");
}
return "Task completed successfully";
}).handle((result, ex) -> {
if (ex != null) {
System.out.println("Caught exception: " + ex.getMessage());
return "Default value";
}
return result;
});
try {
String finalResult = future.get();
System.out.println(finalResult);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
在这个示例中,handle
方法根据异步任务的执行情况,要么返回正常的结果,要么在发生异常时返回默认值。
与其他异步操作的组合
CompletableFuture
的强大之处不仅在于创建异步任务,还在于能够与其他异步操作进行组合。
例如,我们可以使用 thenApply
方法对异步任务的结果进行转换。thenApply
方法接收一个 Function
作为参数,该 Function
会对异步任务的结果进行处理并返回一个新的结果。示例如下:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class CompletableFutureThenApplyExample {
public static void main(String[] args) {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello")
.thenApply(s -> s + ", World!");
try {
String result = future.get();
System.out.println(result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
在上述代码中,supplyAsync
方法返回一个字符串 "Hello",然后通过 thenApply
方法将其转换为 "Hello, World!"。
我们还可以使用 thenCombine
方法将两个异步任务的结果进行合并。thenCombine
方法接收另一个 CompletableFuture
和一个 BiFunction
作为参数,BiFunction
会将两个异步任务的结果进行合并并返回一个新的结果。示例如下:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class CompletableFutureThenCombineExample {
public static void main(String[] args) {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World");
CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (s1, s2) -> s1 + ", " + s2 + "!");
try {
String result = combinedFuture.get();
System.out.println(result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
在这个示例中,future1
和 future2
是两个独立的异步任务,thenCombine
方法将它们的结果合并为 "Hello, World!"。
并发控制
在使用 CompletableFuture
进行异步编程时,有时需要对并发任务进行控制。例如,我们可能希望所有的异步任务都完成后再进行下一步操作,或者只要有一个异步任务完成就进行相应处理。
CompletableFuture
提供了 allOf
方法来等待所有的异步任务完成。allOf
方法接收多个 CompletableFuture
作为参数,并返回一个新的 CompletableFuture
,只有当所有传入的 CompletableFuture
都完成时,这个新的 CompletableFuture
才会完成。示例如下:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class CompletableFutureAllOfExample {
public static void main(String[] args) {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Task 1 completed";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Task 2 completed";
});
CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2);
allFutures.join();
try {
System.out.println(future1.get());
System.out.println(future2.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
在上述代码中,future1
和 future2
是两个异步任务,CompletableFuture.allOf
方法返回一个新的 CompletableFuture
,我们调用 join
方法等待所有任务完成。然后再获取每个任务的结果。
CompletableFuture
还提供了 anyOf
方法,只要有一个异步任务完成,这个方法返回的 CompletableFuture
就会完成。示例如下:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class CompletableFutureAnyOfExample {
public static void main(String[] args) {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Task 1 completed";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Task 2 completed";
});
CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(future1, future2);
try {
Object result = anyFuture.get();
System.out.println(result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
在这个示例中,future2
会先完成,anyOf
方法返回的 CompletableFuture
会在 future2
完成时就完成,我们通过 get
方法获取到 future2
的结果。
性能考量
在使用 CompletableFuture.supplyAsync
方法时,性能是一个需要考虑的重要因素。
首先,线程池的选择和配置对性能有很大影响。如果使用默认的 ForkJoinPool.commonPool()
,它是一个共享的线程池,可能会受到其他任务的影响。在高并发场景下,自定义线程池并合理配置线程数量、队列大小等参数可以提高性能。例如,对于 CPU 密集型任务,线程数量可以设置为 CPU 核心数;对于 I/O 密集型任务,线程数量可以适当增加以充分利用 I/O 等待时间。
其次,异步任务的粒度也会影响性能。如果异步任务过于细碎,线程的创建、调度和销毁开销可能会占据较大比例,导致性能下降。因此,需要根据实际业务需求合理划分异步任务的粒度。
另外,在处理大量异步任务时,需要注意内存的使用。CompletableFuture
会保存任务的结果和状态等信息,如果任务数量过多,可能会导致内存占用过高。可以通过及时处理完成的任务、释放不再需要的资源等方式来优化内存使用。
应用场景
CompletableFuture.supplyAsync
方法在实际开发中有广泛的应用场景。
在 Web 开发中,当需要调用多个外部服务获取数据并进行合并处理时,可以使用 CompletableFuture
来并发调用这些服务,提高响应速度。例如,一个电商应用可能需要从库存服务、价格服务和评论服务获取数据,然后合并展示给用户,通过 CompletableFuture
可以同时发起这三个服务的调用,而不是顺序调用,大大缩短了整体的响应时间。
在数据处理领域,当需要对大量数据进行并行计算时,CompletableFuture
可以将数据分块并异步处理,最后合并结果。比如,对一个大数据集进行统计分析,每个数据块的计算可以作为一个异步任务,通过 CompletableFuture
来管理这些任务,提高计算效率。
在分布式系统中,CompletableFuture
可以用于处理分布式任务的结果。例如,一个分布式爬虫系统,各个节点爬取数据后,主节点可以使用 CompletableFuture
来收集和处理这些节点返回的数据。
与其他异步框架的比较
与其他异步框架相比,CompletableFuture
有其独特的优势。
与传统的 Future
相比,CompletableFuture
提供了更丰富的功能。Future
只能获取异步任务的结果,并且获取结果的操作是阻塞的。而 CompletableFuture
不仅可以获取结果,还支持异步任务的组合、链式调用、异常处理等功能,使得异步编程更加灵活和便捷。
与 Guava 的 ListenableFuture
相比,CompletableFuture
是 Java 标准库的一部分,不需要额外引入依赖。ListenableFuture
提供了异步回调的功能,但在功能的丰富性和易用性上,CompletableFuture
更胜一筹。例如,CompletableFuture
可以更方便地进行异步任务的合并、转换等操作。
与 RxJava 相比,CompletableFuture
相对来说更轻量级,学习成本较低。RxJava 提供了强大的响应式编程模型,适用于处理复杂的异步数据流。但对于一些简单的异步任务场景,CompletableFuture
已经能够满足需求,并且其与 Java 标准库的集成度更高。
注意事项
在使用 CompletableFuture.supplyAsync
方法时,有一些注意事项需要关注。
首先,要注意线程安全问题。虽然 CompletableFuture
本身是线程安全的,但异步任务中访问的共享资源需要进行适当的同步处理,以避免数据竞争和不一致的问题。
其次,避免在异步任务中进行长时间的阻塞操作。如果异步任务中包含长时间的 I/O 操作或其他阻塞操作,可能会导致线程池中的线程被长时间占用,影响整体的并发性能。可以考虑使用异步 I/O 等技术来优化。
另外,在使用自定义线程池时,要注意合理配置线程池的参数。如果线程池大小设置过小,可能会导致任务堆积,响应时间变长;如果设置过大,可能会消耗过多的系统资源,导致系统性能下降。
最后,要注意异常处理的完整性。在异步任务中抛出的异常如果没有被正确处理,可能会导致程序出现难以调试的问题。确保在合适的位置使用 exceptionally
、handle
等方法来处理异常。
总结
CompletableFuture.supplyAsync
方法是 Java 异步编程中的一个强大工具,它为我们提供了简洁、灵活的方式来创建和管理有返回值的异步任务。通过合理使用 supplyAsync
方法,结合异常处理、任务组合、并发控制等功能,我们可以编写出高效、健壮的异步程序。在实际开发中,根据不同的应用场景,合理选择线程池、优化任务粒度、处理好异常等,可以充分发挥 CompletableFuture
的优势,提升系统的性能和响应能力。同时,与其他异步框架相比,CompletableFuture
作为 Java 标准库的一部分,具有更好的集成性和较低的学习成本,是处理异步任务的首选工具之一。