Java Future与Callable接口的使用
Java Future与Callable接口的使用
在Java并发编程中,Future
与Callable
接口是非常重要的工具,它们为我们提供了一种异步执行任务并获取结果的方式。传统的Runnable
接口可以用于创建线程来执行任务,但它无法返回执行结果。而Callable
接口弥补了这一不足,它允许任务返回一个结果,并且Future
接口则用于获取这个结果。
1. Callable接口
Callable
接口定义在java.util.concurrent
包中,它只有一个方法call()
,这个方法可以抛出异常并且返回一个泛型类型的结果。与Runnable
接口的run()
方法不同,call()
方法允许返回值和抛出异常。
import java.util.concurrent.Callable;
public class FactorialCalculator implements Callable<Integer> {
private int number;
public FactorialCalculator(int number) {
this.number = number;
}
@Override
public Integer call() throws Exception {
int factorial = 1;
if (number == 0 || number == 1) {
factorial = 1;
} else {
for (int i = 2; i <= number; i++) {
factorial *= i;
}
}
return factorial;
}
}
在上述代码中,FactorialCalculator
类实现了Callable
接口,call()
方法计算给定数字的阶乘并返回结果。如果在计算过程中出现问题(例如输入负数),可以抛出相应的异常。
2. Future接口
Future
接口同样位于java.util.concurrent
包中,它提供了方法来检查任务是否完成,等待任务完成并获取任务执行的结果,甚至可以取消任务。
import java.util.concurrent.*;
public class FutureExample {
public static void main(String[] args) {
ExecutorService executorService = Executors.newSingleThreadExecutor();
FactorialCalculator calculator = new FactorialCalculator(5);
Future<Integer> future = executorService.submit(calculator);
executorService.shutdown();
try {
while (!future.isDone()) {
System.out.println("Task is still running...");
Thread.sleep(100);
}
System.out.println("Task completed. Result: " + future.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
在上述代码中:
- 首先创建了一个
ExecutorService
,这里使用Executors.newSingleThreadExecutor()
创建了一个单线程的线程池。 - 然后创建了
FactorialCalculator
实例,并将其提交给ExecutorService
的submit()
方法,该方法返回一个Future
对象。 - 通过
future.isDone()
方法检查任务是否完成,如果未完成则打印提示信息并等待。 - 最后通过
future.get()
方法获取任务的执行结果。如果任务在执行过程中抛出异常,get()
方法会重新抛出这些异常,由调用者捕获处理。
3. Future的方法详解
boolean cancel(boolean mayInterruptIfRunning)
:尝试取消任务的执行。如果任务已经完成、已经被取消或者因为某些原因无法取消,则此方法会返回false
。如果任务还没有开始执行,那么任务将被取消且此方法返回true
。如果任务已经开始执行,并且mayInterruptIfRunning
参数为true
,那么正在执行任务的线程可能会被中断,以尝试停止任务,此时此方法也返回true
。
import java.util.concurrent.*;
public class CancelFutureExample {
public static void main(String[] args) {
ExecutorService executorService = Executors.newSingleThreadExecutor();
Callable<String> callable = () -> {
try {
Thread.sleep(5000);
return "Task completed";
} catch (InterruptedException e) {
return "Task interrupted";
}
};
Future<String> future = executorService.submit(callable);
try {
Thread.sleep(1000);
boolean cancelled = future.cancel(true);
System.out.println("Task cancelled: " + cancelled);
try {
System.out.println("Result: " + future.get());
} catch (CancellationException e) {
System.out.println("Task was cancelled, cannot get result.");
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
executorService.shutdown();
}
}
}
在上述代码中,创建了一个会睡眠5秒的Callable
任务,提交后主线程睡眠1秒,然后尝试取消任务。如果取消成功,再尝试获取任务结果时会抛出CancellationException
。
boolean isCancelled()
:判断任务是否在完成之前被取消。如果任务在正常完成之前被取消,则返回true
。
import java.util.concurrent.*;
public class IsCancelledExample {
public static void main(String[] args) {
ExecutorService executorService = Executors.newSingleThreadExecutor();
Callable<String> callable = () -> {
try {
Thread.sleep(5000);
return "Task completed";
} catch (InterruptedException e) {
return "Task interrupted";
}
};
Future<String> future = executorService.submit(callable);
try {
Thread.sleep(1000);
boolean cancelled = future.cancel(true);
System.out.println("Task cancelled: " + cancelled);
boolean isCancelled = future.isCancelled();
System.out.println("Is task cancelled? " + isCancelled);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
executorService.shutdown();
}
}
}
在这段代码中,在尝试取消任务后,通过isCancelled()
方法判断任务是否已被取消。
boolean isDone()
:判断任务是否已完成。任务完成的情况包括正常结束、因异常结束或者被取消。
import java.util.concurrent.*;
public class IsDoneExample {
public static void main(String[] args) {
ExecutorService executorService = Executors.newSingleThreadExecutor();
Callable<String> callable = () -> {
try {
Thread.sleep(3000);
return "Task completed";
} catch (InterruptedException e) {
return "Task interrupted";
}
};
Future<String> future = executorService.submit(callable);
try {
while (!future.isDone()) {
System.out.println("Task is still running...");
Thread.sleep(500);
}
System.out.println("Task completed. Result: " + future.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
} finally {
executorService.shutdown();
}
}
}
在此代码中,通过isDone()
方法在循环中检查任务是否完成,直到任务完成才获取结果。
V get()
throws InterruptedException, ExecutionException:等待任务完成,并返回任务的结果。如果任务在等待过程中被中断,会抛出InterruptedException
;如果任务执行过程中抛出异常,会抛出ExecutionException
,并且可以通过ExecutionException.getCause()
获取原始异常。
import java.util.concurrent.*;
public class GetResultExample {
public static void main(String[] args) {
ExecutorService executorService = Executors.newSingleThreadExecutor();
Callable<Integer> callable = () -> {
if (Math.random() > 0.5) {
throw new RuntimeException("Task failed");
}
return 42;
};
Future<Integer> future = executorService.submit(callable);
try {
Integer result = future.get();
System.out.println("Result: " + result);
} catch (InterruptedException | ExecutionException e) {
System.out.println("Exception occurred: " + e.getMessage());
if (e instanceof ExecutionException) {
Throwable cause = e.getCause();
System.out.println("Original cause: " + cause.getMessage());
}
} finally {
executorService.shutdown();
}
}
}
在这段代码中,Callable
任务以一定概率抛出异常。通过get()
方法获取结果时,如果任务抛出异常,ExecutionException
会被捕获,并且可以获取原始异常信息。
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
:在指定的时间内等待任务完成并返回结果。如果在指定时间内任务未完成,会抛出TimeoutException
。
import java.util.concurrent.*;
public class GetWithTimeoutExample {
public static void main(String[] args) {
ExecutorService executorService = Executors.newSingleThreadExecutor();
Callable<String> callable = () -> {
Thread.sleep(5000);
return "Task completed";
};
Future<String> future = executorService.submit(callable);
try {
String result = future.get(2, TimeUnit.SECONDS);
System.out.println("Result: " + result);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
if (e instanceof TimeoutException) {
System.out.println("Task timed out");
} else {
e.printStackTrace();
}
} finally {
executorService.shutdown();
}
}
}
在上述代码中,get(2, TimeUnit.SECONDS)
方法等待任务在2秒内完成。由于任务需要5秒完成,因此会抛出TimeoutException
。
4. FutureTask类
FutureTask
类实现了Future
和Runnable
接口,它既可以作为Runnable
被线程执行,也可以作为Future
来获取任务执行的结果。
import java.util.concurrent.*;
public class FutureTaskExample {
public static void main(String[] args) {
Callable<Integer> callable = () -> {
Thread.sleep(3000);
return 42;
};
FutureTask<Integer> futureTask = new FutureTask<>(callable);
Thread thread = new Thread(futureTask);
thread.start();
try {
while (!futureTask.isDone()) {
System.out.println("Task is still running...");
Thread.sleep(500);
}
System.out.println("Task completed. Result: " + futureTask.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
在上述代码中:
- 首先创建了一个
Callable
任务。 - 然后使用这个
Callable
创建了一个FutureTask
实例。 - 接着创建一个线程并将
FutureTask
作为Runnable
传入,启动线程执行任务。 - 主线程通过
futureTask.isDone()
和futureTask.get()
方法来检查任务状态并获取结果,与使用ExecutorService
和Future
的方式类似,但这里直接使用Thread
来执行任务。
5. CompletionService
CompletionService
结合了Executor
和BlockingQueue
的功能。它允许提交一组Callable
任务,并且当任务完成时,其结果会被放置在一个阻塞队列中。这使得我们可以按照任务完成的顺序来获取结果,而不是按照提交任务的顺序。
import java.util.concurrent.*;
public class CompletionServiceExample {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(3);
CompletionService<Integer> completionService = new ExecutorCompletionService<>(executorService);
for (int i = 1; i <= 5; i++) {
int number = i;
completionService.submit(() -> {
Thread.sleep((long) (Math.random() * 3000));
return number * number;
});
}
for (int i = 1; i <= 5; i++) {
try {
Future<Integer> future = completionService.take();
System.out.println("Result of task " + i + ": " + future.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
executorService.shutdown();
}
}
在上述代码中:
- 创建了一个固定大小为3的线程池和一个
ExecutorCompletionService
。 - 提交了5个
Callable
任务,每个任务随机睡眠一段时间后返回一个数的平方。 - 通过
completionService.take()
方法从阻塞队列中获取已完成任务的Future
对象,然后使用future.get()
获取任务结果。这样可以确保按照任务完成的顺序获取结果,而不是按照提交的顺序。
6. 使用场景
- 异步计算:当某个计算任务比较耗时,且不希望主线程等待其完成时,可以使用
Callable
和Future
进行异步计算。例如,在一个Web应用中,可能需要从多个不同的数据源获取数据并进行汇总,这些数据源的获取操作可以异步执行,最后汇总结果返回给用户。
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class AsyncDataFetching {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(3);
Callable<Integer> dataSource1 = () -> {
Thread.sleep(2000);
return 10;
};
Callable<Integer> dataSource2 = () -> {
Thread.sleep(3000);
return 20;
};
Callable<Integer> dataSource3 = () -> {
Thread.sleep(1500);
return 15;
};
Future<Integer> future1 = executorService.submit(dataSource1);
Future<Integer> future2 = executorService.submit(dataSource2);
Future<Integer> future3 = executorService.submit(dataSource3);
AtomicInteger total = new AtomicInteger();
try {
total.addAndGet(future1.get());
total.addAndGet(future2.get());
total.addAndGet(future3.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
System.out.println("Total: " + total.get());
executorService.shutdown();
}
}
在上述代码中,模拟从三个不同数据源获取数据,每个数据源获取数据的操作是异步的。主线程等待所有异步任务完成后汇总结果。
- 并行计算:对于一些可以并行处理的任务,使用
Callable
和Future
可以充分利用多核CPU的优势。例如,对一个大数据集进行排序,可以将数据集分成多个部分,每个部分使用一个Callable
任务并行排序,最后合并结果。
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
public class ParallelSorting {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(4);
List<Integer> data = Arrays.asList(5, 3, 8, 1, 9, 2, 7, 4, 6);
int chunkSize = data.size() / 4;
List<Future<List<Integer>>> futures = new ArrayList<>();
for (int i = 0; i < 4; i++) {
int start = i * chunkSize;
int end = (i == 3)? data.size() : (i + 1) * chunkSize;
List<Integer> subList = data.subList(start, end);
Callable<List<Integer>> sorter = () -> {
List<Integer> sortedSubList = new ArrayList<>(subList);
sortedSubList.sort(null);
return sortedSubList;
};
futures.add(executorService.submit(sorter));
}
List<Integer> finalSortedList = new ArrayList<>();
for (Future<List<Integer>> future : futures) {
try {
finalSortedList.addAll(future.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
finalSortedList.sort(null);
System.out.println("Final sorted list: " + finalSortedList);
executorService.shutdown();
}
}
在这段代码中,将一个整数列表分成4个部分,每个部分由一个Callable
任务并行排序,最后合并并再次排序得到最终结果。
- 任务取消:在一些场景下,可能需要在任务执行过程中取消任务。例如,用户在一个长时间运行的操作中点击了取消按钮,这时可以通过
Future
的cancel()
方法尝试取消任务。
import java.util.concurrent.*;
public class CancelableTaskExample {
public static void main(String[] args) {
ExecutorService executorService = Executors.newSingleThreadExecutor();
Callable<String> callable = () -> {
for (int i = 0; i < 10; i++) {
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException();
}
Thread.sleep(1000);
System.out.println("Task is running: " + i);
}
return "Task completed";
};
Future<String> future = executorService.submit(callable);
try {
Thread.sleep(3500);
boolean cancelled = future.cancel(true);
System.out.println("Task cancelled: " + cancelled);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
executorService.shutdown();
}
}
}
在上述代码中,Callable
任务在执行过程中会检查线程是否被中断。主线程在任务执行3.5秒后尝试取消任务,如果任务正在执行,会因为线程中断而提前结束。
7. 注意事项
-
阻塞问题:
Future.get()
方法是阻塞的,如果在调用此方法时任务还未完成,调用线程会一直等待,直到任务完成。这可能会导致死锁或性能问题,尤其是在多个线程互相等待对方的Future
结果时。为避免死锁,可以使用get(long timeout, TimeUnit unit)
方法设置等待超时时间。 -
异常处理:任务执行过程中抛出的异常会被封装在
ExecutionException
中,通过get()
方法获取结果时会重新抛出。因此在调用get()
方法时需要妥善处理ExecutionException
和InterruptedException
。 -
内存泄漏:如果
Future
对象没有被正确管理,例如在任务完成后没有及时释放相关资源,可能会导致内存泄漏。特别是在使用线程池时,如果Future
对象一直持有对任务的引用,而任务又持有大量资源,可能会影响系统性能。 -
任务取消的不确定性:调用
Future.cancel(true)
方法尝试取消任务时,并不保证任务一定能被取消。如果任务在执行一些无法中断的操作(如I/O操作),可能无法响应中断请求,导致任务无法取消。在编写Callable
任务时,应尽量使任务可中断,例如在循环中定期检查Thread.currentThread().isInterrupted()
。
通过合理使用Future
与Callable
接口,Java开发者可以更加灵活地控制并发任务,提高程序的性能和响应性,同时避免一些常见的并发编程问题。无论是异步计算、并行处理还是任务取消等场景,这两个接口都为我们提供了强大的工具。在实际应用中,需要根据具体需求,结合线程池、FutureTask
、CompletionService
等相关类,选择最合适的方式来实现高效的并发编程。