Java CompletableFuture supplyAsync异步任务创建技巧
Java CompletableFuture supplyAsync 异步任务创建技巧
理解 CompletableFuture
在 Java 并发编程领域,CompletableFuture
是 Java 8 引入的一个强大工具,它极大地简化了异步编程。CompletableFuture
代表一个异步计算的结果,它不仅可以手动完成(设置计算结果),还支持一系列函数式编程风格的组合操作,比如 thenApply
、thenAccept
、thenRun
等,这些操作可以方便地将多个异步任务串联起来,形成复杂的异步计算流程。
supplyAsync 方法基础
CompletableFuture
的 supplyAsync
方法用于异步执行有返回值的任务。它有两个重载形式:
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
第一个重载形式使用默认的 ForkJoinPool.commonPool()
作为执行任务的线程池。第二个重载形式允许我们指定一个自定义的 Executor
来执行任务。Supplier
是一个函数式接口,它只有一个 get
方法,该方法不接受参数并返回一个结果,这正好适合用来定义异步执行的有返回值的任务。
简单示例
下面是一个简单的使用 supplyAsync
创建异步任务的示例:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class CompletableFutureExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 模拟一个耗时操作
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "任务执行完成";
});
// 获取异步任务的结果,如果任务还未完成,get 方法会阻塞
String result = future.get();
System.out.println(result);
}
}
在这个示例中,supplyAsync
方法接受一个 Supplier
,在这个 Supplier
的 get
方法中模拟了一个耗时 2 秒的操作,然后返回一个字符串。future.get()
方法用于获取异步任务的结果,如果任务尚未完成,它会阻塞当前线程直到任务完成。
自定义线程池
使用默认的 ForkJoinPool.commonPool()
可能在某些情况下不能满足需求,比如需要控制线程池的大小、线程的优先级等。这时我们可以使用第二个重载形式,传入一个自定义的 Executor
。
import java.util.concurrent.*;
public class CustomExecutorExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 创建一个固定大小的线程池
ExecutorService executor = Executors.newFixedThreadPool(5);
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 模拟一个耗时操作
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "任务执行完成";
}, executor);
String result = future.get();
System.out.println(result);
// 关闭线程池
executor.shutdown();
}
}
在这个示例中,我们创建了一个固定大小为 5 的线程池 executor
,并将其作为参数传递给 supplyAsync
方法。这样异步任务就会在我们自定义的线程池中执行。任务完成后,我们调用 executor.shutdown()
方法关闭线程池。
链式调用与组合操作
CompletableFuture
的强大之处在于它支持链式调用和各种组合操作。当 supplyAsync
创建的异步任务完成后,我们可以使用 thenApply
、thenAccept
、thenRun
等方法进行后续处理。
thenApply
:接受一个Function
,将异步任务的结果作为输入,返回一个新的CompletableFuture
,新的CompletableFuture
的结果是Function
应用后的返回值。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class ThenApplyExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "初始结果")
.thenApply(result -> result + " 经过处理");
String finalResult = future.get();
System.out.println(finalResult);
}
}
在这个示例中,supplyAsync
创建的异步任务返回字符串 “初始结果”,然后 thenApply
方法接受这个结果,并在其基础上进行处理,返回新的字符串 “初始结果 经过处理”。
thenAccept
:接受一个Consumer
,将异步任务的结果作为输入,但是不返回新的CompletableFuture
,它主要用于执行一些不需要返回值的操作,比如打印结果。
import java.util.concurrent.CompletableFuture;
public class ThenAcceptExample {
public static void main(String[] args) {
CompletableFuture.supplyAsync(() -> "任务结果")
.thenAccept(result -> System.out.println("处理结果: " + result));
}
}
在这个示例中,supplyAsync
创建的异步任务返回字符串 “任务结果”,thenAccept
方法接受这个结果并打印出来。
thenRun
:接受一个Runnable
,不接受异步任务的结果作为输入,也不返回新的CompletableFuture
,它主要用于在异步任务完成后执行一些与结果无关的操作。
import java.util.concurrent.CompletableFuture;
public class ThenRunExample {
public static void main(String[] args) {
CompletableFuture.supplyAsync(() -> "任务结果")
.thenRun(() -> System.out.println("任务已完成,执行一些收尾操作"));
}
}
在这个示例中,supplyAsync
创建的异步任务返回字符串 “任务结果”,thenRun
方法在任务完成后执行打印操作,它不关心任务的具体结果。
异常处理
在异步任务执行过程中,可能会出现异常。CompletableFuture
提供了多种方法来处理异常。
exceptionally
:当异步任务抛出异常时,exceptionally
方法会捕获这个异常,并返回一个替代结果。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class ExceptionallyExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("任务执行出错");
}
return "任务执行成功";
})
.exceptionally(ex -> {
System.out.println("捕获到异常: " + ex.getMessage());
return "默认结果";
});
String result = future.get();
System.out.println(result);
}
}
在这个示例中,supplyAsync
中的任务有 50% 的概率抛出异常。如果抛出异常,exceptionally
方法会捕获异常并返回 “默认结果”,同时打印异常信息。
handle
:handle
方法既可以处理正常的结果,也可以处理异常情况。它接受一个BiFunction
,第一个参数是任务的结果(如果任务正常完成),第二个参数是异常(如果任务抛出异常)。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class HandleExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("任务执行出错");
}
return "任务执行成功";
})
.handle((result, ex) -> {
if (ex != null) {
System.out.println("捕获到异常: " + ex.getMessage());
return "默认结果";
}
return result + " 处理后";
});
String finalResult = future.get();
System.out.println(finalResult);
}
}
在这个示例中,如果任务正常完成,handle
方法会在结果后加上 “ 处理后”;如果任务抛出异常,handle
方法会捕获异常并返回 “默认结果”,同时打印异常信息。
多个异步任务的组合
在实际应用中,经常需要组合多个异步任务。CompletableFuture
提供了一些方法来实现这一点。
thenCombine
:将两个CompletableFuture
的结果合并,返回一个新的CompletableFuture
。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class ThenCombineExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "结果1");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "结果2");
CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (r1, r2) -> r1 + " 和 " + r2);
String result = combinedFuture.get();
System.out.println(result);
}
}
在这个示例中,future1
和 future2
是两个异步任务,thenCombine
方法将它们的结果合并,返回一个新的 CompletableFuture
,其结果是 “结果1 和 结果2”。
allOf
:等待所有的CompletableFuture
都完成。allOf
方法返回一个CompletableFuture<Void>
,当所有输入的CompletableFuture
都完成时,这个CompletableFuture<Void>
才会完成。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class AllOfExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "任务1完成";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "任务2完成";
});
CompletableFuture<Void> allFuture = CompletableFuture.allOf(future1, future2);
allFuture.get(); // 等待所有任务完成
System.out.println(future1.get() + ", " + future2.get());
}
}
在这个示例中,future1
和 future2
是两个异步任务,allOf
方法返回一个 CompletableFuture<Void>
,我们调用 allFuture.get()
方法等待所有任务完成,然后再获取 future1
和 future2
的结果并打印。
anyOf
:只要有一个CompletableFuture
完成,anyOf
方法返回的CompletableFuture
就会完成,其结果是第一个完成的CompletableFuture
的结果。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class AnyOfExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "任务1完成";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "任务2完成";
});
CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(future1, future2);
Object result = anyFuture.get();
System.out.println(result);
}
}
在这个示例中,future2
会先完成,所以 anyFuture.get()
获取到的结果是 “任务2完成”。
与传统异步编程方式的对比
在 Java 8 引入 CompletableFuture
之前,实现异步编程主要依赖 Future
和 Callable
接口。Future
接口提供了一种获取异步任务结果的方式,但是它有一些局限性。例如,Future
只能通过 get
方法阻塞获取结果,无法方便地进行链式调用和组合操作。而 CompletableFuture
不仅继承了 Future
接口,还提供了丰富的函数式编程风格的方法,使得异步编程更加简洁和灵活。
下面是一个使用 Future
和 Callable
的示例:
import java.util.concurrent.*;
public class FutureExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(1);
Callable<String> task = () -> {
Thread.sleep(2000);
return "任务执行完成";
};
Future<String> future = executor.submit(task);
String result = future.get();
System.out.println(result);
executor.shutdown();
}
}
对比这个示例和前面使用 CompletableFuture
的示例,可以明显看出 CompletableFuture
在异步任务的创建、组合和结果处理上更加简洁和直观。
实际应用场景
- 网络请求:在进行多个网络请求时,可以使用
CompletableFuture
异步发起请求,并在所有请求完成后进行统一处理。例如,一个电商应用可能需要同时从多个供应商获取商品价格,然后比较价格并选择最优的供应商。 - 数据处理流水线:在大数据处理场景中,数据可能需要经过多个处理步骤,每个步骤可以作为一个异步任务,使用
CompletableFuture
可以方便地构建数据处理流水线,提高处理效率。 - 并行计算:对于一些可以并行执行的计算任务,
CompletableFuture
可以充分利用多核 CPU 的优势,通过创建多个异步任务并行执行,然后合并结果,加快计算速度。
性能考虑
虽然 CompletableFuture
为异步编程带来了很多便利,但在使用时也需要考虑性能问题。
- 线程池的选择:选择合适的线程池非常重要。如果使用默认的
ForkJoinPool.commonPool()
,在高并发场景下可能会出现线程饥饿等问题。对于 I/O 密集型任务,可以适当增加线程池的大小;对于 CPU 密集型任务,线程池大小应该根据 CPU 核心数进行合理设置。 - 任务粒度:任务的粒度不宜过大或过小。如果任务粒度太大,可能无法充分利用多核优势;如果任务粒度太小,线程的创建和切换开销可能会影响性能。需要根据具体业务场景进行权衡。
- 异常处理开销:过多的异常处理可能会带来一定的性能开销,尤其是在高并发场景下。因此,在编写异步任务时,应该尽量避免不必要的异常抛出,提高代码的健壮性。
总结常用技巧
- 合理选择线程池:根据任务类型(I/O 密集型或 CPU 密集型)和并发量,选择合适的线程池。对于简单应用,默认的
ForkJoinPool.commonPool()
可能够用,但对于复杂的高并发场景,自定义线程池可以更好地控制资源。 - 善用链式调用和组合操作:通过
thenApply
、thenAccept
、thenRun
等方法进行链式调用,以及使用thenCombine
、allOf
、anyOf
等方法组合多个异步任务,可以构建出复杂而高效的异步计算流程。 - 正确处理异常:使用
exceptionally
或handle
方法来处理异步任务中的异常,确保程序的稳定性和健壮性。 - 优化任务粒度:根据实际业务场景,合理划分任务粒度,避免过大或过小的任务粒度对性能产生不利影响。
通过深入理解和掌握 CompletableFuture
的 supplyAsync
方法以及相关的异步任务创建技巧,可以大大提高 Java 程序的异步编程能力,使程序在处理并发任务时更加高效和灵活。在实际应用中,需要根据具体的业务需求和性能要求,合理运用这些技巧,以达到最佳的编程效果。