Java CompletableFuture并行处理应对高并发场景的方案
Java CompletableFuture并行处理应对高并发场景的方案
高并发场景下的挑战
在当今的互联网应用开发中,高并发场景随处可见。例如电商平台的抢购活动、在线支付系统、大型网站的流量高峰时段等。在这些场景下,传统的顺序处理方式会遇到严重的性能瓶颈。
假设一个简单的业务场景,我们需要从多个不同的数据源获取数据,然后进行整合处理。如果按照顺序依次从每个数据源获取数据,每个数据源的获取操作可能都需要一定的时间,在高并发环境下,大量用户同时请求,这种顺序处理方式会导致响应时间过长,用户体验变差,甚至可能使服务器因长时间处理请求而不堪重负。
CompletableFuture简介
CompletableFuture
是Java 8引入的一个强大的类,它实现了 Future
和 CompletionStage
接口。Future
接口是Java早期用于异步计算的工具,它允许我们异步执行任务并获取任务的结果,但存在一些局限性,比如无法直接处理异步任务的完成事件,获取结果时需要阻塞等待。
而 CompletableFuture
弥补了这些不足,它支持链式调用、组合多个异步任务、处理异步任务的完成事件等。通过 CompletableFuture
,我们可以更灵活、高效地处理异步计算,特别适合应对高并发场景。
CompletableFuture基础使用
- 创建CompletableFuture
- 使用
supplyAsync
创建有返回值的CompletableFuture
- 使用
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class CompletableFutureExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 模拟一个耗时操作
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Hello, CompletableFuture!";
});
String result = future.get();
System.out.println(result);
}
}
在上述代码中,CompletableFuture.supplyAsync
方法接受一个 Supplier
作为参数,在一个新的线程中异步执行该 Supplier
的 get
方法,并返回一个 CompletableFuture
。future.get()
方法会阻塞当前线程,直到异步任务完成并返回结果。
- **使用 `runAsync` 创建无返回值的CompletableFuture**
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class CompletableFutureExample2 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
// 模拟一个耗时操作
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Task completed without return value.");
});
future.get();
}
}
CompletableFuture.runAsync
方法接受一个 Runnable
作为参数,在新线程中异步执行该 Runnable
,返回的 CompletableFuture
没有返回值(Void
)。
- 获取CompletableFuture的结果
get
方法:如上述例子所示,get
方法会阻塞当前线程,直到CompletableFuture
完成并返回结果。如果异步任务抛出异常,get
方法会将异常包装成ExecutionException
或InterruptedException
抛出。getNow(T valueIfAbsent)
方法:该方法不会阻塞,如果CompletableFuture
已经完成,返回其结果;否则返回传入的valueIfAbsent
参数。
import java.util.concurrent.CompletableFuture;
public class CompletableFutureGetNowExample {
public static void main(String[] args) {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Result";
});
String result1 = future.getNow("Default");
System.out.println("Result1: " + result1);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
String result2 = future.getNow("Default");
System.out.println("Result2: " + result2);
}
}
在这个例子中,第一次调用 getNow
时,异步任务还未完成,所以返回 Default
;第二次调用时,异步任务已完成,返回实际的结果 Result
。
- **`join` 方法**:与 `get` 方法类似,但如果异步任务抛出异常,`join` 方法会直接抛出原始异常,而不是包装异常。
CompletableFuture链式调用
thenApply
方法:用于对CompletableFuture
的结果进行转换。它接受一个Function
作为参数,该Function
会在异步任务完成后,对任务的结果进行处理,并返回一个新的CompletableFuture
。
import java.util.concurrent.CompletableFuture;
public class CompletableFutureThenApplyExample {
public static void main(String[] args) {
CompletableFuture.supplyAsync(() -> "Hello")
.thenApply(s -> s + ", World")
.thenApply(String::toUpperCase)
.thenAccept(System.out::println);
}
}
在这个例子中,supplyAsync
方法创建了一个异步任务,返回字符串 Hello
。然后通过 thenApply
方法依次对结果进行追加字符串 , World
和转换为大写的操作,最后通过 thenAccept
方法打印最终结果。
thenAccept
方法:用于在CompletableFuture
完成后执行一个Consumer
,但不返回新的CompletableFuture
。
import java.util.concurrent.CompletableFuture;
public class CompletableFutureThenAcceptExample {
public static void main(String[] args) {
CompletableFuture.supplyAsync(() -> "Hello")
.thenAccept(s -> System.out.println(s + ", World"));
}
}
这里 thenAccept
方法接受一个 Consumer
,在异步任务返回 Hello
后,将其与 , World
拼接并打印。
thenRun
方法:与thenAccept
类似,但thenRun
接受的是一个Runnable
,不处理CompletableFuture
的结果,只是在任务完成后执行Runnable
。
import java.util.concurrent.CompletableFuture;
public class CompletableFutureThenRunExample {
public static void main(String[] args) {
CompletableFuture.supplyAsync(() -> "Hello")
.thenRun(() -> System.out.println("Task completed"));
}
}
在这个例子中,异步任务返回 Hello
后,thenRun
执行打印 Task completed
的操作。
CompletableFuture处理异常
exceptionally
方法:用于处理CompletableFuture
执行过程中抛出的异常。它接受一个Function
作为参数,当异步任务抛出异常时,该Function
会被调用,参数为异常对象,返回值作为新的CompletableFuture
的结果。
import java.util.concurrent.CompletableFuture;
public class CompletableFutureExceptionallyExample {
public static void main(String[] args) {
CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("Simulated exception");
}
return "Success";
})
.exceptionally(ex -> {
System.out.println("Caught exception: " + ex.getMessage());
return "Default value";
})
.thenAccept(System.out::println);
}
}
在上述代码中,异步任务有一定概率抛出异常,exceptionally
方法捕获到异常后,打印异常信息并返回默认值 Default value
。
handle
方法:既能处理正常的结果,也能处理异常。它接受一个BiFunction
作为参数,第一个参数是正常的结果(如果有异常则为null
),第二个参数是异常对象(如果任务正常完成则为null
),返回值作为新的CompletableFuture
的结果。
import java.util.concurrent.CompletableFuture;
public class CompletableFutureHandleExample {
public static void main(String[] args) {
CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("Simulated exception");
}
return "Success";
})
.handle((result, ex) -> {
if (ex != null) {
System.out.println("Caught exception: " + ex.getMessage());
return "Default value";
}
return result + " processed";
})
.thenAccept(System.out::println);
}
}
这里 handle
方法根据任务是否成功,对结果进行不同的处理,成功时对结果进行追加字符串处理,失败时返回默认值并打印异常信息。
CompletableFuture组合多个异步任务
thenCombine
方法:用于组合两个CompletableFuture
的结果。它接受另一个CompletableFuture
和一个BiFunction
作为参数,当两个CompletableFuture
都完成时,BiFunction
会被调用,参数为两个CompletableFuture
的结果,返回值作为新的CompletableFuture
的结果。
import java.util.concurrent.CompletableFuture;
public class CompletableFutureThenCombineExample {
public static void main(String[] args) {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World");
future1.thenCombine(future2, (s1, s2) -> s1 + ", " + s2)
.thenAccept(System.out::println);
}
}
在这个例子中,future1
和 future2
异步执行,当它们都完成后,thenCombine
方法将两个结果拼接并打印。
allOf
方法:用于等待所有CompletableFuture
都完成。它接受多个CompletableFuture
作为参数,返回一个新的CompletableFuture<Void>
,当所有传入的CompletableFuture
都完成时,这个新的CompletableFuture
才完成。
import java.util.concurrent.CompletableFuture;
public class CompletableFutureAllOfExample {
public static void main(String[] args) {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Result1";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Result2";
});
CompletableFuture<Void> allFuture = CompletableFuture.allOf(future1, future2);
allFuture.join();
try {
System.out.println("Future1 result: " + future1.get());
System.out.println("Future2 result: " + future2.get());
} catch (Exception e) {
e.printStackTrace();
}
}
}
在这个例子中,allOf
方法等待 future1
和 future2
都完成,然后通过 join
方法等待 allFuture
完成,最后获取并打印 future1
和 future2
的结果。
anyOf
方法:只要有一个CompletableFuture
完成,就返回这个完成的CompletableFuture
的结果。它接受多个CompletableFuture
作为参数,返回一个CompletableFuture<Object>
,其结果是第一个完成的CompletableFuture
的结果。
import java.util.concurrent.CompletableFuture;
public class CompletableFutureAnyOfExample {
public static void main(String[] args) {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Result1";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Result2";
});
CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(future1, future2);
try {
System.out.println("First completed result: " + anyFuture.get());
} catch (Exception e) {
e.printStackTrace();
}
}
}
在这个例子中,future2
会先完成,anyOf
方法返回的 anyFuture
的结果就是 future2
的结果 Result2
。
CompletableFuture在高并发场景中的应用方案
- 并行获取多个数据源的数据 假设我们有三个数据源,分别获取用户信息、订单信息和商品信息,然后整合这些信息返回给用户。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
class DataSource {
public static String getUserInfo() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "User information";
}
public static String getOrderInfo() {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Order information";
}
public static String getProductInfo() {
try {
Thread.sleep(2500);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Product information";
}
}
public class HighConcurrencyDataFetching {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> userFuture = CompletableFuture.supplyAsync(DataSource::getUserInfo);
CompletableFuture<String> orderFuture = CompletableFuture.supplyAsync(DataSource::getOrderInfo);
CompletableFuture<String> productFuture = CompletableFuture.supplyAsync(DataSource::getProductInfo);
CompletableFuture<Void> allFuture = CompletableFuture.allOf(userFuture, orderFuture, productFuture);
allFuture.join();
String userInfo = userFuture.get();
String orderInfo = orderFuture.get();
String productInfo = productFuture.get();
String combinedInfo = userInfo + ", " + orderInfo + ", " + productInfo;
System.out.println(combinedInfo);
}
}
在这个例子中,通过 CompletableFuture.supplyAsync
方法并行地从三个数据源获取数据,allOf
方法等待所有数据获取完成,然后将结果整合并打印。这样可以大大减少总响应时间,提高系统在高并发场景下的性能。
- 异步处理任务队列
在高并发场景下,可能会有大量的任务需要处理,我们可以将这些任务放入队列,然后使用
CompletableFuture
异步处理。
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
public class TaskQueueProcessing {
public static void main(String[] args) {
BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
// 模拟添加任务到队列
for (int i = 0; i < 10; i++) {
int taskNumber = i;
taskQueue.add(() -> {
System.out.println("Processing task " + taskNumber);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
// 异步处理任务队列
for (int i = 0; i < 5; i++) {
CompletableFuture.runAsync(() -> {
while (true) {
Runnable task = null;
try {
task = taskQueue.take();
task.run();
} catch (InterruptedException e) {
e.printStackTrace();
break;
}
}
});
}
}
}
在这个例子中,我们创建了一个任务队列,并向队列中添加了10个任务。然后通过 CompletableFuture.runAsync
启动5个异步线程从队列中取出任务并执行,提高任务处理的效率。
- 处理依赖关系复杂的任务
有时候任务之间存在复杂的依赖关系,比如任务B依赖任务A的结果,任务C依赖任务B和另一个任务D的结果等。
CompletableFuture
可以很好地处理这种情况。
import java.util.concurrent.CompletableFuture;
public class ComplexDependencyTasks {
public static void main(String[] args) {
CompletableFuture<String> taskA = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Result of task A";
});
CompletableFuture<String> taskB = taskA.thenApply(resultA -> {
System.out.println("Using result of task A: " + resultA);
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Result of task B based on A";
});
CompletableFuture<String> taskD = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2500);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Result of task D";
});
CompletableFuture<String> taskC = taskB.thenCombine(taskD, (resultB, resultD) -> {
System.out.println("Using result of task B: " + resultB);
System.out.println("Using result of task D: " + resultD);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Result of task C based on B and D";
});
taskC.thenAccept(System.out::println);
}
}
在这个例子中,taskB
依赖 taskA
的结果,taskC
依赖 taskB
和 taskD
的结果。通过 CompletableFuture
的链式调用和组合方法,清晰地表达了任务之间的依赖关系,并异步执行这些任务。
性能优化与注意事项
- 线程池的使用
在使用
CompletableFuture
时,默认情况下,supplyAsync
和runAsync
方法使用的是ForkJoinPool.commonPool()
。对于高并发场景,如果任务量较大且任务执行时间较长,可能会导致commonPool
线程池饱和,影响性能。因此,建议根据实际情况创建自定义的线程池。
import java.util.concurrent.*;
public class CustomThreadPoolExample {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Result";
}, executor)
.thenAccept(System.out::println);
executor.shutdown();
}
}
在这个例子中,我们创建了一个固定大小为10的线程池,并将其传递给 supplyAsync
方法。这样可以更好地控制线程资源的使用。
-
资源管理 在高并发场景下,要注意资源的合理使用和释放。例如,在异步任务中可能会使用数据库连接、文件句柄等资源,确保在任务完成后及时释放这些资源,避免资源泄漏。
-
异常处理策略 在处理多个
CompletableFuture
组合的场景中,要确保异常能够被正确捕获和处理。如果一个CompletableFuture
抛出异常,可能会影响整个任务链的执行,因此需要根据业务需求制定合适的异常处理策略,如重试、回滚等。 -
避免过度并行 虽然并行处理可以提高性能,但过度并行可能会导致线程上下文切换开销增大、资源竞争加剧等问题。需要根据系统的硬件资源(如CPU核心数、内存等)和任务的特性(如I/O密集型还是CPU密集型)来合理设置并行度。
总结
CompletableFuture
为Java开发者提供了强大的异步编程能力,在高并发场景下,通过合理使用 CompletableFuture
的各种特性,如链式调用、异常处理、任务组合等,可以显著提高系统的性能和响应速度。同时,要注意线程池的使用、资源管理、异常处理和并行度的控制等方面,以确保系统在高并发环境下的稳定运行。通过不断实践和优化,我们能够更好地利用 CompletableFuture
来构建高效、可靠的高并发应用程序。
希望以上内容对你深入理解和应用 CompletableFuture
应对高并发场景有所帮助。在实际开发中,根据具体业务需求灵活运用这些知识,将能有效提升系统的性能和用户体验。