MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

深入理解Java中的Future和CompletableFuture

2021-09-195.2k 阅读

Java中的Future接口

在Java的并发编程领域,Future接口是一个重要的存在。它为异步任务的执行提供了一种机制,允许我们在任务执行完毕后获取其结果。

Future接口的定义与功能

Future接口位于java.util.concurrent包中,其定义如下:

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}
  1. cancel(boolean mayInterruptIfRunning):尝试取消任务的执行。如果任务已经完成、已经被取消,或者由于某些原因无法取消,则此方法会失败。如果任务尚未开始,它将永远不会运行。如果任务正在运行,并且mayInterruptIfRunningtrue,则会尝试中断执行该任务的线程。
  2. isCancelled():判断任务是否在正常完成之前被取消。
  3. isDone():判断任务是否已经完成。任务完成可能是正常结束、异常结束或者被取消。
  4. get():等待任务完成,然后获取其结果。如果任务尚未完成,调用此方法的线程将被阻塞,直到任务完成。如果任务因异常而结束,此方法将抛出ExecutionException,其中包含任务执行时抛出的异常。如果调用线程在等待过程中被中断,此方法将抛出InterruptedException
  5. get(long timeout, TimeUnit unit):这是一个带超时的get方法。它会等待任务完成,但最多等待指定的时间。如果在超时时间内任务未完成,此方法将抛出TimeoutException

使用Future的简单示例

假设我们有一个耗时的任务,比如从网络下载一个大文件。我们可以使用Future来异步执行这个任务,同时主线程可以继续执行其他操作,然后在需要的时候获取下载的结果。

import java.util.concurrent.*;

public class FutureExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        Future<String> future = executorService.submit(() -> {
            // 模拟一个耗时操作,比如下载文件
            Thread.sleep(3000);
            return "File downloaded successfully";
        });

        // 主线程可以继续执行其他任务
        System.out.println("Main thread is doing other things...");

        try {
            // 获取异步任务的结果,最多等待2秒
            String result = future.get(2, TimeUnit.SECONDS);
            System.out.println(result);
        } catch (TimeoutException e) {
            System.out.println("Task timed out");
            future.cancel(true);
        } finally {
            executorService.shutdown();
        }
    }
}

在这个例子中,我们创建了一个ExecutorService并提交了一个异步任务。主线程继续执行其他操作,然后尝试获取任务的结果。如果任务在2秒内未完成,将抛出TimeoutException,并且我们会取消任务。

Future的局限性

尽管Future提供了异步执行任务并获取结果的能力,但它存在一些局限性:

  1. 缺乏对结果的进一步处理:一旦调用get()方法获取到结果,Future本身并没有提供方便的机制来对结果进行链式处理。例如,如果获取的结果需要进行进一步的计算或者转换,我们需要手动编写代码来处理。
  2. 阻塞问题get()方法会阻塞调用线程,直到任务完成。这在某些场景下可能会导致性能问题,特别是当有多个异步任务需要等待结果时,可能会出现线程的不必要等待。
  3. 错误处理不够灵活Future中的异常处理相对简单,通过ExecutionException来包装任务执行时抛出的异常。但对于复杂的异步任务链,这种方式不够灵活,难以进行统一的错误处理。

CompletableFuture的出现与优势

为了克服Future的局限性,Java 8引入了CompletableFuture类。CompletableFuture实现了Future接口和CompletionStage接口,提供了更强大的异步编程能力。

CompletionStage接口

CompletionStage接口定义了一系列方法,用于支持异步任务的链式调用、组合和错误处理。它是CompletableFuture强大功能的基础。

  1. 链式调用CompletionStage允许我们将多个异步任务链接在一起,前一个任务的结果可以作为后一个任务的输入。
  2. 组合任务:可以将多个异步任务组合成一个新的任务,例如并行执行多个任务并等待所有任务完成后再进行下一步操作。
  3. 错误处理:提供了更灵活的错误处理机制,能够在任务链的任何位置捕获和处理异常。

CompletableFuture的创建

CompletableFuture提供了多种创建实例的方式:

  1. CompletableFuture.supplyAsync(Supplier<U> supplier):创建一个异步任务,任务的返回值由Supplier提供。任务会在默认的ForkJoinPool.commonPool()中执行。
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello, CompletableFuture!");
  1. CompletableFuture.supplyAsync(Supplier<U> supplier, Executor executor):与上一个方法类似,但可以指定执行任务的Executor
ExecutorService executor = Executors.newFixedThreadPool(5);
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Custom executor", executor);
  1. CompletableFuture.runAsync(Runnable runnable):创建一个异步任务,任务没有返回值,由Runnable定义。同样在ForkJoinPool.commonPool()中执行。
CompletableFuture<Void> future3 = CompletableFuture.runAsync(() -> System.out.println("Running async task without return value"));
  1. CompletableFuture.runAsync(Runnable runnable, Executor executor):可以指定执行Runnable任务的Executor
CompletableFuture<Void> future4 = CompletableFuture.runAsync(() -> System.out.println("Running with custom executor"), executor);

CompletableFuture的结果处理

  1. thenApply(Function<? super U,? extends V> fn):当CompletableFuture完成时,将其结果作为参数传递给Function,并返回一个新的CompletableFuture,其结果是Function的返回值。
CompletableFuture.supplyAsync(() -> "Hello")
                .thenApply(s -> s + ", World")
                .thenAccept(System.out::println);

在这个例子中,首先异步返回"Hello",然后通过thenApply将其转换为"Hello, World",最后通过thenAccept打印结果。

  1. thenApplyAsync(Function<? super U,? extends V> fn):与thenApply类似,但Function会在一个新的线程中执行(默认使用ForkJoinPool.commonPool())。
CompletableFuture.supplyAsync(() -> "Hello")
                .thenApplyAsync(s -> s + ", World")
                .thenAccept(System.out::println);
  1. thenAccept(Consumer<? super U> action):当CompletableFuture完成时,将其结果作为参数传递给Consumer,但不返回新的结果。
CompletableFuture.supplyAsync(() -> "Hello")
                .thenAccept(System.out::println);
  1. thenRun(Runnable action):当CompletableFuture完成时,执行给定的Runnable,不关心CompletableFuture的结果。
CompletableFuture.supplyAsync(() -> "Hello")
                .thenRun(() -> System.out.println("Task completed"));

CompletableFuture的异常处理

  1. exceptionally(Function<Throwable,? extends U> fn):当CompletableFuture出现异常时,将异常作为参数传递给Function,并返回一个新的CompletableFuture,其结果是Function的返回值。
CompletableFuture.supplyAsync(() -> {
    if (Math.random() < 0.5) {
        throw new RuntimeException("Simulated error");
    }
    return "Success";
})
.exceptionally(ex -> {
    System.out.println("Caught exception: " + ex.getMessage());
    return "Default value";
})
.thenAccept(System.out::println);

在这个例子中,如果异步任务抛出异常,exceptionally中的Function会捕获异常并返回一个默认值。

  1. handle(BiFunction<? super U, Throwable,? extends V> fn):无论CompletableFuture是正常完成还是出现异常,都会将结果(如果正常完成)或异常(如果出现异常)作为参数传递给BiFunction,并返回一个新的CompletableFuture,其结果是BiFunction的返回值。
CompletableFuture.supplyAsync(() -> {
    if (Math.random() < 0.5) {
        throw new RuntimeException("Simulated error");
    }
    return "Success";
})
.handle((result, ex) -> {
    if (ex != null) {
        System.out.println("Caught exception: " + ex.getMessage());
        return "Default value";
    }
    return result;
})
.thenAccept(System.out::println);

CompletableFuture的任务组合

多个任务并行执行并等待所有任务完成

CompletableFuture提供了allOf方法来实现多个任务并行执行并等待所有任务完成。

CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "Result A";
});

CompletableFuture<String> futureB = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "Result B";
});

CompletableFuture<Void> allFutures = CompletableFuture.allOf(futureA, futureB);
allFutures.thenRun(() -> {
    try {
        System.out.println(futureA.get());
        System.out.println(futureB.get());
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
}).join();

在这个例子中,futureAfutureB并行执行,allOf方法返回一个新的CompletableFuture,当futureAfutureB都完成时,这个新的CompletableFuture才完成。

多个任务并行执行,只要有一个任务完成就返回

CompletableFutureanyOf方法可以实现多个任务并行执行,只要有一个任务完成就返回其结果。

CompletableFuture<String> futureC = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "Result C";
});

CompletableFuture<String> futureD = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "Result D";
});

CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(futureC, futureD);
anyFuture.thenAccept(result -> System.out.println("First completed task result: " + result)).join();

在这个例子中,futureCfutureD并行执行,anyOf方法返回一个新的CompletableFuture,当futureCfutureD其中一个完成时,这个新的CompletableFuture就完成,并返回第一个完成的任务的结果。

CompletableFuture的实际应用场景

电商系统中的商品信息获取

在电商系统中,获取商品信息可能涉及多个异步操作,比如从数据库获取商品基本信息、从图片服务器获取商品图片URL、从评价系统获取商品评价等。

CompletableFuture<String> productInfoFuture = CompletableFuture.supplyAsync(() -> {
    // 从数据库获取商品基本信息
    return "Product basic info";
});

CompletableFuture<String> productImageFuture = CompletableFuture.supplyAsync(() -> {
    // 从图片服务器获取商品图片URL
    return "http://example.com/image.jpg";
});

CompletableFuture<String> productReviewFuture = CompletableFuture.supplyAsync(() -> {
    // 从评价系统获取商品评价
    return "4.5 stars";
});

CompletableFuture<Void> allProductFutures = CompletableFuture.allOf(productInfoFuture, productImageFuture, productReviewFuture);
allProductFutures.thenRun(() -> {
    try {
        System.out.println("Product info: " + productInfoFuture.get());
        System.out.println("Product image: " + productImageFuture.get());
        System.out.println("Product review: " + productReviewFuture.get());
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
}).join();

通过CompletableFuture,可以并行执行这些异步操作,提高系统的响应速度。

搜索引擎中的多数据源查询

搜索引擎在查询结果时,可能需要从多个数据源获取数据,比如网页数据库、新闻数据库、图片数据库等。

CompletableFuture<List<String>> webSearchFuture = CompletableFuture.supplyAsync(() -> {
    // 从网页数据库查询结果
    return Arrays.asList("Web result 1", "Web result 2");
});

CompletableFuture<List<String>> newsSearchFuture = CompletableFuture.supplyAsync(() -> {
    // 从新闻数据库查询结果
    return Arrays.asList("News result 1", "News result 2");
});

CompletableFuture<List<String>> imageSearchFuture = CompletableFuture.supplyAsync(() -> {
    // 从图片数据库查询结果
    return Arrays.asList("Image result 1", "Image result 2");
});

CompletableFuture<Void> allSearchFutures = CompletableFuture.allOf(webSearchFuture, newsSearchFuture, imageSearchFuture);
allSearchFutures.thenRun(() -> {
    try {
        List<String> allResults = new ArrayList<>();
        allResults.addAll(webSearchFuture.get());
        allResults.addAll(newsSearchFuture.get());
        allResults.addAll(imageSearchFuture.get());
        System.out.println("All search results: " + allResults);
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
}).join();

通过CompletableFuture将多个数据源的查询并行化,快速整合查询结果返回给用户。

总结CompletableFuture的性能与注意事项

性能方面

  1. 并行执行优势CompletableFuture通过并行执行多个异步任务,充分利用多核CPU的优势,显著提高系统的性能和响应速度。例如在电商系统的商品信息获取场景中,并行获取商品基本信息、图片URL和评价,比顺序执行这些操作要快得多。
  2. 减少线程阻塞:与Future相比,CompletableFuture的非阻塞式结果处理方式,避免了线程长时间等待任务完成,减少了线程资源的浪费,提高了系统的并发处理能力。

注意事项

  1. 线程池使用:在使用CompletableFuture时,要注意合理选择线程池。如果使用默认的ForkJoinPool.commonPool(),可能会在高并发场景下出现线程饥饿问题。特别是在执行大量计算密集型任务时,建议创建自定义的线程池,根据系统资源和任务特点进行配置。
  2. 异常处理:虽然CompletableFuture提供了灵活的异常处理机制,但在复杂的任务链中,要确保异常能够被正确捕获和处理。如果在某个环节遗漏了异常处理,可能会导致程序出现难以调试的错误。
  3. 内存管理:由于CompletableFuture涉及异步任务的执行和结果处理,要注意避免内存泄漏问题。特别是在处理大量异步任务时,要及时释放不再使用的资源。

通过深入理解FutureCompletableFuture,开发者能够在Java并发编程中更加灵活、高效地处理异步任务,提升应用程序的性能和响应能力。无论是开发大型分布式系统,还是小型的桌面应用,这些知识都将是非常有价值的。