Java中的CompletableFuture与协程式异步编程
Java中的CompletableFuture基础
在Java的后端开发网络编程中,异步编程是提升程序性能和响应能力的关键技术。CompletableFuture
作为Java 8引入的重要类,极大地简化了异步编程模型。它实现了Future
和CompletionStage
接口,不仅能获取异步操作的结果,还支持链式调用、组合多个异步操作等强大功能。
创建CompletableFuture
-
使用
CompletableFuture.supplyAsync
创建有返回值的异步任务import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; public class CompletableFutureExample { public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { // 模拟耗时操作 try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "Hello, CompletableFuture!"; }); String result = future.get(); System.out.println(result); } }
在上述代码中,
CompletableFuture.supplyAsync
接受一个Supplier
接口的实现,这里是一个Lambda表达式。它会在一个新的线程中执行这个Supplier
的get
方法,并返回一个CompletableFuture
对象。通过future.get()
方法可以获取异步操作的结果,不过get
方法是阻塞的,直到异步任务完成。 -
使用
CompletableFuture.runAsync
创建无返回值的异步任务import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; public class CompletableFutureNoReturnExample { public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { // 模拟耗时操作 try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("This is a no - return CompletableFuture task."); }); future.get(); } }
CompletableFuture.runAsync
接受一个Runnable
接口的实现,同样在新线程中执行。由于Runnable
没有返回值,所以CompletableFuture
的泛型为Void
。
获取异步任务结果
-
get
方法get
方法会阻塞当前线程,直到CompletableFuture
完成并返回结果。如果异步任务抛出异常,get
方法会将异常包装成ExecutionException
或InterruptedException
抛出。- 如前面示例中
String result = future.get();
,主线程会等待future
任务完成并返回结果。
-
get(long timeout, TimeUnit unit)
方法- 该方法允许设置一个超时时间。如果在指定的时间内
CompletableFuture
没有完成,会抛出TimeoutException
。
import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public class CompletableFutureTimeoutExample { public static void main(String[] args) { CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { // 模拟耗时操作 try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } return "Result"; }); try { String result = future.get(2, TimeUnit.SECONDS); System.out.println(result); } catch (InterruptedException | ExecutionException | TimeoutException e) { e.printStackTrace(); } } }
在这个示例中,设置了2秒的超时时间,而异步任务需要3秒完成,所以会抛出
TimeoutException
。 - 该方法允许设置一个超时时间。如果在指定的时间内
-
join
方法join
方法与get
方法类似,也是阻塞当前线程获取结果。不同的是,如果异步任务抛出异常,join
方法会直接抛出原始异常,而不是包装成ExecutionException
或InterruptedException
。
import java.util.concurrent.CompletableFuture; public class CompletableFutureJoinExample { public static void main(String[] args) { CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { throw new RuntimeException("Task failed"); }); try { String result = future.join(); System.out.println(result); } catch (Exception e) { e.printStackTrace(); } } }
这里异步任务抛出了
RuntimeException
,join
方法会直接抛出这个异常,在catch
块中可以捕获到原始异常。 -
getNow(T valueIfAbsent)
方法- 如果
CompletableFuture
已经完成,getNow
方法返回结果;如果尚未完成,返回传入的valueIfAbsent
参数。
import java.util.concurrent.CompletableFuture; public class CompletableFutureGetNowExample { public static void main(String[] args) { CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "Completed"; }); String result1 = future.getNow("Not completed yet"); System.out.println(result1); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } String result2 = future.getNow("Not completed yet"); System.out.println(result2); } }
第一次调用
getNow
时,异步任务尚未完成,所以返回"Not completed yet"
;第二次调用时,异步任务已完成,返回"Completed"
。 - 如果
-
complete(T value)
方法- 可以手动设置
CompletableFuture
的结果。如果CompletableFuture
已经完成,调用complete
方法不会生效。
import java.util.concurrent.CompletableFuture; public class CompletableFutureCompleteExample { public static void main(String[] args) { CompletableFuture<String> future = new CompletableFuture<>(); new Thread(() -> { try { Thread.sleep(2000); future.complete("Manually completed"); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); try { String result = future.get(); System.out.println(result); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } }
在这个例子中,通过
complete
方法手动设置了CompletableFuture
的结果,主线程通过get
方法获取这个结果。 - 可以手动设置
CompletableFuture的链式调用
CompletableFuture
的强大之处在于其支持链式调用,能够方便地对异步任务进行组合和转换。
thenApply方法
thenApply
方法用于对CompletableFuture
的结果进行转换,它接受一个Function
接口的实现。
import java.util.concurrent.CompletableFuture;
public class CompletableFutureThenApplyExample {
public static void main(String[] args) {
CompletableFuture.supplyAsync(() -> "Hello")
.thenApply(s -> s + ", World")
.thenApply(String::toUpperCase)
.thenAccept(System.out::println);
}
}
在上述代码中,首先通过CompletableFuture.supplyAsync
创建一个异步任务返回"Hello"
。接着使用thenApply
方法,第一个thenApply
将字符串转换为"Hello, World"
,第二个thenApply
将其转换为大写"HELLO, WORLD"
,最后通过thenAccept
方法消费这个结果并打印。
thenAccept方法
thenAccept
方法用于消费CompletableFuture
的结果,它接受一个Consumer
接口的实现,没有返回值。
import java.util.concurrent.CompletableFuture;
public class CompletableFutureThenAcceptExample {
public static void main(String[] args) {
CompletableFuture.supplyAsync(() -> "Result")
.thenAccept(System.out::println);
}
}
这里CompletableFuture
完成后,thenAccept
中的Consumer
会处理返回的结果,在这个例子中就是将结果打印出来。
thenRun方法
thenRun
方法在CompletableFuture
完成后执行一个Runnable
任务,不关心CompletableFuture
的结果。
import java.util.concurrent.CompletableFuture;
public class CompletableFutureThenRunExample {
public static void main(String[] args) {
CompletableFuture.supplyAsync(() -> "Some result")
.thenRun(() -> System.out.println("Task completed, but I don't care about the result"));
}
}
异步任务完成后,thenRun
中的Runnable
任务会被执行,打印出相应的信息。
handle方法
handle
方法会在CompletableFuture
完成时(无论成功还是失败)执行,它接受一个BiFunction
接口的实现,该接口的第一个参数是CompletableFuture
的结果,第二个参数是异常(如果有)。
import java.util.concurrent.CompletableFuture;
public class CompletableFutureHandleExample {
public static void main(String[] args) {
CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5) {
return "Success";
} else {
throw new RuntimeException("Failure");
}
}).handle((result, ex) -> {
if (ex != null) {
System.out.println("Exception occurred: " + ex.getMessage());
return "Default value";
} else {
System.out.println("Result: " + result);
return result;
}
}).thenAccept(System.out::println);
}
}
在这个例子中,根据随机数决定是否抛出异常。handle
方法会处理任务的结果或异常,并返回一个新的值,这个值会传递给后续的thenAccept
方法。
exceptionally方法
exceptionally
方法用于处理CompletableFuture
中的异常,它接受一个Function
接口的实现,该接口的参数是异常对象,返回值是用于替代异常情况的结果。
import java.util.concurrent.CompletableFuture;
public class CompletableFutureExceptionallyExample {
public static void main(String[] args) {
CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("Task failed");
}).exceptionally(ex -> {
System.out.println("Caught exception: " + ex.getMessage());
return "Default result";
}).thenAccept(System.out::println);
}
}
当异步任务抛出异常时,exceptionally
中的Function
会被调用,打印异常信息并返回一个默认结果,这个结果会被后续的thenAccept
方法处理。
CompletableFuture的组合操作
在实际应用中,常常需要组合多个异步任务。CompletableFuture
提供了丰富的方法来实现这一需求。
thenCombine方法
thenCombine
方法用于将两个CompletableFuture
的结果进行合并,它接受另一个CompletableFuture
和一个BiFunction
接口的实现。
import java.util.concurrent.CompletableFuture;
public class CompletableFutureThenCombineExample {
public static void main(String[] args) {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World");
CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (s1, s2) -> s1 + ", " + s2);
combinedFuture.thenAccept(System.out::println);
}
}
在这个例子中,future1
和future2
是两个异步任务,thenCombine
方法将它们的结果合并成一个新的字符串"Hello, World"
,并通过thenAccept
方法打印出来。
applyToEither方法
applyToEither
方法表示两个CompletableFuture
中只要有一个完成,就对其结果应用一个Function
。
import java.util.concurrent.CompletableFuture;
public class CompletableFutureApplyToEitherExample {
public static void main(String[] args) {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Future 1 result";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Future 2 result";
});
CompletableFuture<String> resultFuture = future1.applyToEither(future2, s -> s + " processed");
resultFuture.thenAccept(System.out::println);
}
}
这里future2
会先完成,所以applyToEither
方法会对future2
的结果应用Function
,打印出"Future 2 result processed"
。
runAfterEither方法
runAfterEither
方法在两个CompletableFuture
中任意一个完成后执行一个Runnable
任务。
import java.util.concurrent.CompletableFuture;
public class CompletableFutureRunAfterEitherExample {
public static void main(String[] args) {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Future 1 result";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Future 2 result";
});
CompletableFuture<Void> voidFuture = future1.runAfterEither(future2, () -> System.out.println("One of the futures completed"));
try {
voidFuture.get();
} catch (Exception e) {
e.printStackTrace();
}
}
}
当future2
先完成时,runAfterEither
中的Runnable
任务会被执行,打印出相应信息。
allOf方法
allOf
方法用于等待所有CompletableFuture
都完成。它返回一个新的CompletableFuture<Void>
,当所有传入的CompletableFuture
都完成时,这个新的CompletableFuture
才完成。
import java.util.concurrent.CompletableFuture;
public class CompletableFutureAllOfExample {
public static void main(String[] args) {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Future 1 result";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Future 2 result";
});
CompletableFuture<Void> allFuture = CompletableFuture.allOf(future1, future2);
allFuture.thenRun(() -> {
try {
System.out.println(future1.get());
System.out.println(future2.get());
} catch (Exception e) {
e.printStackTrace();
}
}).join();
}
}
在这个例子中,allOf
方法返回的CompletableFuture<Void>
会在future1
和future2
都完成后才完成。然后通过thenRun
方法获取并打印两个CompletableFuture
的结果。
anyOf方法
anyOf
方法返回一个新的CompletableFuture
,当传入的多个CompletableFuture
中任意一个完成时,这个新的CompletableFuture
就完成,并且其结果就是第一个完成的CompletableFuture
的结果。
import java.util.concurrent.CompletableFuture;
public class CompletableFutureAnyOfExample {
public static void main(String[] args) {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Future 1 result";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Future 2 result";
});
CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(future1, future2);
anyFuture.thenAccept(System.out::println).join();
}
}
由于future2
先完成,所以anyFuture
的结果就是future2
的结果,会打印出"Future 2 result"
。
协程式异步编程概念
协程是一种轻量级的线程,与传统线程不同,协程的调度由用户空间控制,而不是操作系统内核。在Java中,虽然没有原生的协程支持,但通过一些框架如Quasar可以实现协程式异步编程。
协程的优势
- 轻量级:协程的创建和销毁开销比线程小得多。一个应用程序可以创建数以万计的协程,而创建过多的线程会导致资源耗尽。
- 非抢占式调度:协程的调度是协作式的,即一个协程在执行过程中可以主动让出执行权,而不像线程那样由操作系统强制抢占。这使得编程模型更加可控,避免了线程切换带来的开销和资源竞争问题。
协程与线程的关系
协程可以运行在一个或多个线程之上。多个协程可以复用同一个线程,当一个协程让出执行权时,线程可以执行其他协程。这种复用机制提高了线程的利用率,减少了线程上下文切换的开销。
使用Quasar实现协程式异步编程
Quasar是一个基于Java的库,它提供了协程的支持。
引入Quasar依赖
在pom.xml
文件中添加Quasar依赖:
<dependency>
<groupId>co.paralleluniverse</groupId>
<artifactId>quasar-core</artifactId>
<version>0.8.10</version>
</dependency>
创建和运行协程
import co.paralleluniverse.fibers.Fiber;
import co.paralleluniverse.fibers.SuspendExecution;
public class QuasarCoroutineExample {
public static void main(String[] args) {
Fiber<Void> fiber = new Fiber<Void>() {
@Override
protected Void run() throws SuspendExecution, InterruptedException {
System.out.println("Coroutine started");
Fiber.sleep(2000);
System.out.println("Coroutine resumed after 2 seconds");
return null;
}
};
fiber.start();
try {
fiber.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
在上述代码中,通过继承Fiber
类创建了一个协程。在run
方法中,使用Fiber.sleep
方法模拟了一个耗时操作,这里的Fiber.sleep
不会阻塞线程,而是让出协程的执行权。fiber.start()
启动协程,fiber.join()
等待协程执行完毕。
协程的异步调用
import co.paralleluniverse.fibers.Fiber;
import co.paralleluniverse.fibers.SuspendExecution;
import co.paralleluniverse.fibers.async.Async;
public class QuasarAsyncExample {
public static void main(String[] args) {
Fiber<Void> fiber = new Fiber<Void>() {
@Override
protected Void run() throws SuspendExecution, InterruptedException {
Fiber<String> asyncFiber = Async.future(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Async result";
});
String result = asyncFiber.get();
System.out.println("Result from async operation: " + result);
return null;
}
};
fiber.start();
try {
fiber.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
这里使用Async.future
创建了一个异步的协程操作,asyncFiber.get()
会等待异步操作完成并获取结果,整个过程不会阻塞主线程所在的线程,体现了协程式异步编程的优势。
CompletableFuture与协程式异步编程对比
-
编程模型
- CompletableFuture:基于回调和链式调用的方式,通过
Future
接口获取异步结果。它的编程模型相对传统,易于理解和掌握,适合处理简单到中等复杂度的异步任务。 - 协程式异步编程:采用类似同步编程的方式编写异步代码,通过
yield
或await
等方式暂停和恢复协程执行。这种方式使异步代码看起来更像同步代码,对于复杂的异步逻辑,代码的可读性和维护性更好。
- CompletableFuture:基于回调和链式调用的方式,通过
-
性能和资源消耗
- CompletableFuture:依赖于Java的线程池机制,虽然在处理异步任务时能提高效率,但线程的创建和上下文切换仍然有一定开销。当有大量异步任务时,可能会消耗较多系统资源。
- 协程式异步编程:协程是轻量级的,创建和销毁开销小,多个协程可以复用同一个线程,大大减少了资源消耗。在处理高并发、大量异步任务时,协程式异步编程在性能和资源利用上更具优势。
-
应用场景
- CompletableFuture:适合于一般的后端异步任务处理,如数据库查询、网络请求等。对于不需要大量并发异步任务的场景,它能很好地满足需求。
- 协程式异步编程:更适合高并发、I/O密集型的场景,如网络服务器开发、海量数据处理等。在这些场景下,协程的轻量级特性和高效的调度机制能显著提升系统性能。
在实际的后端开发网络编程中,开发者需要根据具体的业务需求和场景来选择合适的异步编程方式。如果是简单的异步任务,CompletableFuture
是一个不错的选择;而对于高并发、资源敏感的场景,协程式异步编程可能会带来更好的效果。通过合理运用这两种异步编程技术,能够开发出更高效、更健壮的后端应用程序。