MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java Future与Callable接口的使用

2023-04-286.0k 阅读

Java Future与Callable接口的使用

在Java并发编程中,FutureCallable接口是非常重要的工具,它们为我们提供了一种异步执行任务并获取结果的方式。传统的Runnable接口可以用于创建线程来执行任务,但它无法返回执行结果。而Callable接口弥补了这一不足,它允许任务返回一个结果,并且Future接口则用于获取这个结果。

1. Callable接口

Callable接口定义在java.util.concurrent包中,它只有一个方法call(),这个方法可以抛出异常并且返回一个泛型类型的结果。与Runnable接口的run()方法不同,call()方法允许返回值和抛出异常。

import java.util.concurrent.Callable;

public class FactorialCalculator implements Callable<Integer> {
    private int number;

    public FactorialCalculator(int number) {
        this.number = number;
    }

    @Override
    public Integer call() throws Exception {
        int factorial = 1;
        if (number == 0 || number == 1) {
            factorial = 1;
        } else {
            for (int i = 2; i <= number; i++) {
                factorial *= i;
            }
        }
        return factorial;
    }
}

在上述代码中,FactorialCalculator类实现了Callable接口,call()方法计算给定数字的阶乘并返回结果。如果在计算过程中出现问题(例如输入负数),可以抛出相应的异常。

2. Future接口

Future接口同样位于java.util.concurrent包中,它提供了方法来检查任务是否完成,等待任务完成并获取任务执行的结果,甚至可以取消任务。

import java.util.concurrent.*;

public class FutureExample {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        FactorialCalculator calculator = new FactorialCalculator(5);
        Future<Integer> future = executorService.submit(calculator);

        executorService.shutdown();

        try {
            while (!future.isDone()) {
                System.out.println("Task is still running...");
                Thread.sleep(100);
            }
            System.out.println("Task completed. Result: " + future.get());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}

在上述代码中:

  • 首先创建了一个ExecutorService,这里使用Executors.newSingleThreadExecutor()创建了一个单线程的线程池。
  • 然后创建了FactorialCalculator实例,并将其提交给ExecutorServicesubmit()方法,该方法返回一个Future对象。
  • 通过future.isDone()方法检查任务是否完成,如果未完成则打印提示信息并等待。
  • 最后通过future.get()方法获取任务的执行结果。如果任务在执行过程中抛出异常,get()方法会重新抛出这些异常,由调用者捕获处理。

3. Future的方法详解

  • boolean cancel(boolean mayInterruptIfRunning):尝试取消任务的执行。如果任务已经完成、已经被取消或者因为某些原因无法取消,则此方法会返回false。如果任务还没有开始执行,那么任务将被取消且此方法返回true。如果任务已经开始执行,并且mayInterruptIfRunning参数为true,那么正在执行任务的线程可能会被中断,以尝试停止任务,此时此方法也返回true
import java.util.concurrent.*;

public class CancelFutureExample {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        Callable<String> callable = () -> {
            try {
                Thread.sleep(5000);
                return "Task completed";
            } catch (InterruptedException e) {
                return "Task interrupted";
            }
        };

        Future<String> future = executorService.submit(callable);

        try {
            Thread.sleep(1000);
            boolean cancelled = future.cancel(true);
            System.out.println("Task cancelled: " + cancelled);
            try {
                System.out.println("Result: " + future.get());
            } catch (CancellationException e) {
                System.out.println("Task was cancelled, cannot get result.");
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            executorService.shutdown();
        }
    }
}

在上述代码中,创建了一个会睡眠5秒的Callable任务,提交后主线程睡眠1秒,然后尝试取消任务。如果取消成功,再尝试获取任务结果时会抛出CancellationException

  • boolean isCancelled():判断任务是否在完成之前被取消。如果任务在正常完成之前被取消,则返回true
import java.util.concurrent.*;

public class IsCancelledExample {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        Callable<String> callable = () -> {
            try {
                Thread.sleep(5000);
                return "Task completed";
            } catch (InterruptedException e) {
                return "Task interrupted";
            }
        };

        Future<String> future = executorService.submit(callable);

        try {
            Thread.sleep(1000);
            boolean cancelled = future.cancel(true);
            System.out.println("Task cancelled: " + cancelled);
            boolean isCancelled = future.isCancelled();
            System.out.println("Is task cancelled? " + isCancelled);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            executorService.shutdown();
        }
    }
}

在这段代码中,在尝试取消任务后,通过isCancelled()方法判断任务是否已被取消。

  • boolean isDone():判断任务是否已完成。任务完成的情况包括正常结束、因异常结束或者被取消。
import java.util.concurrent.*;

public class IsDoneExample {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        Callable<String> callable = () -> {
            try {
                Thread.sleep(3000);
                return "Task completed";
            } catch (InterruptedException e) {
                return "Task interrupted";
            }
        };

        Future<String> future = executorService.submit(callable);

        try {
            while (!future.isDone()) {
                System.out.println("Task is still running...");
                Thread.sleep(500);
            }
            System.out.println("Task completed. Result: " + future.get());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        } finally {
            executorService.shutdown();
        }
    }
}

在此代码中,通过isDone()方法在循环中检查任务是否完成,直到任务完成才获取结果。

  • V get() throws InterruptedException, ExecutionException:等待任务完成,并返回任务的结果。如果任务在等待过程中被中断,会抛出InterruptedException;如果任务执行过程中抛出异常,会抛出ExecutionException,并且可以通过ExecutionException.getCause()获取原始异常。
import java.util.concurrent.*;

public class GetResultExample {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        Callable<Integer> callable = () -> {
            if (Math.random() > 0.5) {
                throw new RuntimeException("Task failed");
            }
            return 42;
        };

        Future<Integer> future = executorService.submit(callable);

        try {
            Integer result = future.get();
            System.out.println("Result: " + result);
        } catch (InterruptedException | ExecutionException e) {
            System.out.println("Exception occurred: " + e.getMessage());
            if (e instanceof ExecutionException) {
                Throwable cause = e.getCause();
                System.out.println("Original cause: " + cause.getMessage());
            }
        } finally {
            executorService.shutdown();
        }
    }
}

在这段代码中,Callable任务以一定概率抛出异常。通过get()方法获取结果时,如果任务抛出异常,ExecutionException会被捕获,并且可以获取原始异常信息。

  • V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException:在指定的时间内等待任务完成并返回结果。如果在指定时间内任务未完成,会抛出TimeoutException
import java.util.concurrent.*;

public class GetWithTimeoutExample {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        Callable<String> callable = () -> {
            Thread.sleep(5000);
            return "Task completed";
        };

        Future<String> future = executorService.submit(callable);

        try {
            String result = future.get(2, TimeUnit.SECONDS);
            System.out.println("Result: " + result);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            if (e instanceof TimeoutException) {
                System.out.println("Task timed out");
            } else {
                e.printStackTrace();
            }
        } finally {
            executorService.shutdown();
        }
    }
}

在上述代码中,get(2, TimeUnit.SECONDS)方法等待任务在2秒内完成。由于任务需要5秒完成,因此会抛出TimeoutException

4. FutureTask类

FutureTask类实现了FutureRunnable接口,它既可以作为Runnable被线程执行,也可以作为Future来获取任务执行的结果。

import java.util.concurrent.*;

public class FutureTaskExample {
    public static void main(String[] args) {
        Callable<Integer> callable = () -> {
            Thread.sleep(3000);
            return 42;
        };

        FutureTask<Integer> futureTask = new FutureTask<>(callable);

        Thread thread = new Thread(futureTask);
        thread.start();

        try {
            while (!futureTask.isDone()) {
                System.out.println("Task is still running...");
                Thread.sleep(500);
            }
            System.out.println("Task completed. Result: " + futureTask.get());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}

在上述代码中:

  • 首先创建了一个Callable任务。
  • 然后使用这个Callable创建了一个FutureTask实例。
  • 接着创建一个线程并将FutureTask作为Runnable传入,启动线程执行任务。
  • 主线程通过futureTask.isDone()futureTask.get()方法来检查任务状态并获取结果,与使用ExecutorServiceFuture的方式类似,但这里直接使用Thread来执行任务。

5. CompletionService

CompletionService结合了ExecutorBlockingQueue的功能。它允许提交一组Callable任务,并且当任务完成时,其结果会被放置在一个阻塞队列中。这使得我们可以按照任务完成的顺序来获取结果,而不是按照提交任务的顺序。

import java.util.concurrent.*;

public class CompletionServiceExample {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        CompletionService<Integer> completionService = new ExecutorCompletionService<>(executorService);

        for (int i = 1; i <= 5; i++) {
            int number = i;
            completionService.submit(() -> {
                Thread.sleep((long) (Math.random() * 3000));
                return number * number;
            });
        }

        for (int i = 1; i <= 5; i++) {
            try {
                Future<Integer> future = completionService.take();
                System.out.println("Result of task " + i + ": " + future.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }

        executorService.shutdown();
    }
}

在上述代码中:

  • 创建了一个固定大小为3的线程池和一个ExecutorCompletionService
  • 提交了5个Callable任务,每个任务随机睡眠一段时间后返回一个数的平方。
  • 通过completionService.take()方法从阻塞队列中获取已完成任务的Future对象,然后使用future.get()获取任务结果。这样可以确保按照任务完成的顺序获取结果,而不是按照提交的顺序。

6. 使用场景

  • 异步计算:当某个计算任务比较耗时,且不希望主线程等待其完成时,可以使用CallableFuture进行异步计算。例如,在一个Web应用中,可能需要从多个不同的数据源获取数据并进行汇总,这些数据源的获取操作可以异步执行,最后汇总结果返回给用户。
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class AsyncDataFetching {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(3);

        Callable<Integer> dataSource1 = () -> {
            Thread.sleep(2000);
            return 10;
        };

        Callable<Integer> dataSource2 = () -> {
            Thread.sleep(3000);
            return 20;
        };

        Callable<Integer> dataSource3 = () -> {
            Thread.sleep(1500);
            return 15;
        };

        Future<Integer> future1 = executorService.submit(dataSource1);
        Future<Integer> future2 = executorService.submit(dataSource2);
        Future<Integer> future3 = executorService.submit(dataSource3);

        AtomicInteger total = new AtomicInteger();

        try {
            total.addAndGet(future1.get());
            total.addAndGet(future2.get());
            total.addAndGet(future3.get());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }

        System.out.println("Total: " + total.get());
        executorService.shutdown();
    }
}

在上述代码中,模拟从三个不同数据源获取数据,每个数据源获取数据的操作是异步的。主线程等待所有异步任务完成后汇总结果。

  • 并行计算:对于一些可以并行处理的任务,使用CallableFuture可以充分利用多核CPU的优势。例如,对一个大数据集进行排序,可以将数据集分成多个部分,每个部分使用一个Callable任务并行排序,最后合并结果。
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;

public class ParallelSorting {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(4);
        List<Integer> data = Arrays.asList(5, 3, 8, 1, 9, 2, 7, 4, 6);
        int chunkSize = data.size() / 4;

        List<Future<List<Integer>>> futures = new ArrayList<>();

        for (int i = 0; i < 4; i++) {
            int start = i * chunkSize;
            int end = (i == 3)? data.size() : (i + 1) * chunkSize;
            List<Integer> subList = data.subList(start, end);

            Callable<List<Integer>> sorter = () -> {
                List<Integer> sortedSubList = new ArrayList<>(subList);
                sortedSubList.sort(null);
                return sortedSubList;
            };

            futures.add(executorService.submit(sorter));
        }

        List<Integer> finalSortedList = new ArrayList<>();

        for (Future<List<Integer>> future : futures) {
            try {
                finalSortedList.addAll(future.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }

        finalSortedList.sort(null);
        System.out.println("Final sorted list: " + finalSortedList);
        executorService.shutdown();
    }
}

在这段代码中,将一个整数列表分成4个部分,每个部分由一个Callable任务并行排序,最后合并并再次排序得到最终结果。

  • 任务取消:在一些场景下,可能需要在任务执行过程中取消任务。例如,用户在一个长时间运行的操作中点击了取消按钮,这时可以通过Futurecancel()方法尝试取消任务。
import java.util.concurrent.*;

public class CancelableTaskExample {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        Callable<String> callable = () -> {
            for (int i = 0; i < 10; i++) {
                if (Thread.currentThread().isInterrupted()) {
                    throw new InterruptedException();
                }
                Thread.sleep(1000);
                System.out.println("Task is running: " + i);
            }
            return "Task completed";
        };

        Future<String> future = executorService.submit(callable);

        try {
            Thread.sleep(3500);
            boolean cancelled = future.cancel(true);
            System.out.println("Task cancelled: " + cancelled);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            executorService.shutdown();
        }
    }
}

在上述代码中,Callable任务在执行过程中会检查线程是否被中断。主线程在任务执行3.5秒后尝试取消任务,如果任务正在执行,会因为线程中断而提前结束。

7. 注意事项

  • 阻塞问题Future.get()方法是阻塞的,如果在调用此方法时任务还未完成,调用线程会一直等待,直到任务完成。这可能会导致死锁或性能问题,尤其是在多个线程互相等待对方的Future结果时。为避免死锁,可以使用get(long timeout, TimeUnit unit)方法设置等待超时时间。

  • 异常处理:任务执行过程中抛出的异常会被封装在ExecutionException中,通过get()方法获取结果时会重新抛出。因此在调用get()方法时需要妥善处理ExecutionExceptionInterruptedException

  • 内存泄漏:如果Future对象没有被正确管理,例如在任务完成后没有及时释放相关资源,可能会导致内存泄漏。特别是在使用线程池时,如果Future对象一直持有对任务的引用,而任务又持有大量资源,可能会影响系统性能。

  • 任务取消的不确定性:调用Future.cancel(true)方法尝试取消任务时,并不保证任务一定能被取消。如果任务在执行一些无法中断的操作(如I/O操作),可能无法响应中断请求,导致任务无法取消。在编写Callable任务时,应尽量使任务可中断,例如在循环中定期检查Thread.currentThread().isInterrupted()

通过合理使用FutureCallable接口,Java开发者可以更加灵活地控制并发任务,提高程序的性能和响应性,同时避免一些常见的并发编程问题。无论是异步计算、并行处理还是任务取消等场景,这两个接口都为我们提供了强大的工具。在实际应用中,需要根据具体需求,结合线程池、FutureTaskCompletionService等相关类,选择最合适的方式来实现高效的并发编程。