MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

使用Java CompletableFuture实现并行计算

2021-03-273.7k 阅读

Java CompletableFuture简介

在Java编程领域,随着硬件性能的不断提升,充分利用多核处理器进行并行计算成为提高程序执行效率的关键手段。CompletableFuture作为Java 8引入的重要类,为实现并行计算提供了强大且灵活的工具。它不仅扩展了Future接口,还增加了异步计算、回调处理、组合操作等丰富功能,使得在Java中处理异步任务和并行计算变得更加便捷和高效。

CompletableFuture类实现了FutureCompletionStage接口。Future接口主要用于异步任务的结果获取,但它的局限性在于需要阻塞等待任务完成才能获取结果。而CompletionStage接口则提供了一种更灵活的方式来处理异步计算的结果,支持链式调用、组合操作等。CompletableFuture将两者的优点结合起来,允许我们以更优雅的方式编写异步和并行代码。

创建CompletableFuture实例

直接创建已完成的CompletableFuture

通过CompletableFuture.completedFuture方法可以直接创建一个已完成的CompletableFuture实例,该实例已经包含了指定的计算结果。

CompletableFuture<String> completedFuture = CompletableFuture.completedFuture("任务已完成");
String result = completedFuture.join();
System.out.println(result);

在上述代码中,completedFuture在创建时就已经完成,join方法用于获取其结果,这里会直接输出“任务已完成”。

使用supplyAsync方法异步执行有返回值的任务

CompletableFuture.supplyAsync方法用于异步执行一个有返回值的任务。它接受一个Supplier作为参数,该Supplier定义了具体的计算逻辑。任务会在一个默认的ForkJoinPool.commonPool()线程池中执行。

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
    // 模拟一些复杂计算
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return 42;
});
future.thenAccept(result -> System.out.println("计算结果: " + result));

在这段代码中,supplyAsync方法中的Supplier会在后台线程中执行模拟的复杂计算(这里通过Thread.sleep模拟耗时操作)。thenAccept方法用于在任务完成后处理结果,当任务执行完毕,会输出“计算结果: 42”。

使用runAsync方法异步执行无返回值的任务

CompletableFuture.runAsync方法用于异步执行一个无返回值的任务。它接受一个Runnable作为参数,同样任务会在默认的线程池中执行。

CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
    // 模拟一些无返回值的操作
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("无返回值任务执行完毕");
});
future.join();

上述代码中,runAsync方法中的Runnable在后台线程执行模拟操作,执行完毕后输出“无返回值任务执行完毕”。join方法在这里用于等待任务完成。

处理CompletableFuture的结果

使用thenApply方法处理结果并返回新的结果

thenApply方法用于在CompletableFuture完成后,对其结果进行处理并返回一个新的结果。它接受一个Function作为参数,该Function定义了对结果的处理逻辑。

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 10);
CompletableFuture<Integer> processedFuture = future.thenApply(result -> result * 2);
processedFuture.thenAccept(finalResult -> System.out.println("最终结果: " + finalResult));

在这段代码中,首先supplyAsync创建了一个异步任务返回值为10。然后thenApply对这个结果进行处理,将其乘以2,最后thenAccept输出最终结果“最终结果: 20”。

使用thenAccept方法处理结果但不返回新结果

thenAccept方法用于在CompletableFuture完成后,对其结果进行处理但不返回新的结果。它接受一个Consumer作为参数,该Consumer定义了对结果的处理逻辑。

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello, CompletableFuture");
future.thenAccept(result -> System.out.println("处理结果: " + result));

这里supplyAsync返回一个字符串,thenAccept方法只是简单地输出这个结果“处理结果: Hello, CompletableFuture”。

使用thenRun方法在任务完成后执行无结果依赖的操作

thenRun方法用于在CompletableFuture完成后,执行一个无结果依赖的操作。它接受一个Runnable作为参数,该Runnable定义的操作不依赖于任务的结果。

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 5);
future.thenRun(() -> System.out.println("任务已完成,执行后续操作"));

在上述代码中,supplyAsync返回一个整数5,但thenRun中的Runnable并不关心这个结果,只是在任务完成后输出“任务已完成,执行后续操作”。

异常处理

使用exceptionally方法处理异常

exceptionally方法用于在CompletableFuture执行过程中出现异常时,提供一个备用的处理逻辑。它接受一个Function作为参数,该Function在捕获到异常时会被调用,返回一个备用结果。

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
    if (Math.random() < 0.5) {
        throw new RuntimeException("模拟异常");
    }
    return 10;
});
CompletableFuture<Integer> resultFuture = future.exceptionally(ex -> {
    System.out.println("捕获到异常: " + ex.getMessage());
    return -1;
});
resultFuture.thenAccept(finalResult -> System.out.println("最终结果: " + finalResult));

在这段代码中,supplyAsync中有一定概率抛出异常。如果异常发生,exceptionally中的Function会捕获异常并返回-1作为备用结果,最终输出“最终结果: -1”;如果没有异常,会输出“最终结果: 10”。

使用handle方法同时处理正常结果和异常

handle方法用于同时处理CompletableFuture的正常结果和异常情况。它接受一个BiFunction作为参数,第一个参数是正常结果(如果有异常则为null),第二个参数是异常(如果没有异常则为null),返回一个新的结果。

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
    if (Math.random() < 0.5) {
        throw new RuntimeException("模拟异常");
    }
    return 10;
});
CompletableFuture<String> resultFuture = future.handle((result, ex) -> {
    if (ex != null) {
        System.out.println("捕获到异常: " + ex.getMessage());
        return "处理异常";
    } else {
        return "正常结果: " + result;
    }
});
resultFuture.thenAccept(finalResult -> System.out.println(finalResult));

这里handle方法根据是否有异常返回不同的处理结果。如果有异常,输出“捕获到异常: 模拟异常”以及“处理异常”;如果没有异常,输出“正常结果: 10”。

组合CompletableFuture

使用thenCompose方法组合两个CompletableFuture

thenCompose方法用于组合两个CompletableFuture,第一个CompletableFuture的结果作为第二个CompletableFuture的输入。它接受一个Function作为参数,该Function返回一个新的CompletableFuture

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2 = future1.thenCompose(result -> CompletableFuture.supplyAsync(() -> result + ", World"));
future2.thenAccept(finalResult -> System.out.println(finalResult));

在这段代码中,future1返回“Hello”,thenCompose使用这个结果作为输入创建并返回future2future2返回“Hello, World”,最后输出“Hello, World”。

使用thenCombine方法合并两个CompletableFuture的结果

thenCombine方法用于合并两个CompletableFuture的结果。它接受另一个CompletableFuture和一个BiFunction作为参数,BiFunction用于处理两个CompletableFuture的结果并返回一个新的结果。

CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 2);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 3);
CompletableFuture<Integer> combinedFuture = future1.thenCombine(future2, (result1, result2) -> result1 * result2);
combinedFuture.thenAccept(finalResult -> System.out.println("相乘结果: " + finalResult));

这里future1返回2,future2返回3,thenCombine使用BiFunction将两个结果相乘,最终输出“相乘结果: 6”。

使用allOf方法等待所有CompletableFuture完成

allOf方法用于等待所有给定的CompletableFuture都完成。它接受多个CompletableFuture作为参数,返回一个新的CompletableFuture,当所有输入的CompletableFuture都完成时,这个新的CompletableFuture才完成。

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "结果1";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "结果2";
});
CompletableFuture<Void> allFuture = CompletableFuture.allOf(future1, future2);
allFuture.join();
String result1 = future1.join();
String result2 = future2.join();
System.out.println("所有任务完成,结果1: " + result1 + ",结果2: " + result2);

在这段代码中,future1future2分别模拟了不同耗时的任务。allOf方法等待它们都完成,然后通过join方法获取各自的结果并输出。

使用anyOf方法等待任意一个CompletableFuture完成

anyOf方法用于等待任意一个给定的CompletableFuture完成。它接受多个CompletableFuture作为参数,返回一个新的CompletableFuture,当任意一个输入的CompletableFuture完成时,这个新的CompletableFuture就完成,并且其结果就是第一个完成的CompletableFuture的结果。

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "结果1";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "结果2";
});
CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(future1, future2);
String result = (String) anyFuture.join();
System.out.println("任意一个任务完成,结果: " + result);

这里future2耗时较短,会先完成,anyOf返回的CompletableFuture会在future2完成时完成,最终输出“任意一个任务完成,结果: 结果2”。

并行计算示例

假设我们有一个需求,需要计算一组数字的平方和立方,并将结果汇总。可以使用CompletableFuture实现并行计算。

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class ParallelComputingExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        List<Integer> numbers = List.of(1, 2, 3, 4, 5);
        List<CompletableFuture<Integer>> futures = new ArrayList<>();

        for (int number : numbers) {
            CompletableFuture<Integer> squareFuture = CompletableFuture.supplyAsync(() -> square(number));
            CompletableFuture<Integer> cubeFuture = CompletableFuture.supplyAsync(() -> cube(number));

            CompletableFuture<Integer> combinedFuture = squareFuture.thenCombine(cubeFuture, (squareResult, cubeResult) -> squareResult + cubeResult);
            futures.add(combinedFuture);
        }

        CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        allFutures.join();

        int total = 0;
        for (CompletableFuture<Integer> future : futures) {
            total += future.get();
        }

        System.out.println("汇总结果: " + total);
    }

    private static int square(int number) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return number * number;
    }

    private static int cube(int number) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return number * number * number;
    }
}

在上述代码中,首先对每个数字创建两个异步任务,分别计算平方和立方,然后使用thenCombine合并这两个任务的结果。所有的合并任务添加到futures列表中,通过allOf等待所有任务完成。最后汇总所有任务的结果并输出。这里通过Thread.sleep模拟了耗时操作,充分展示了并行计算在提高效率方面的优势。

自定义线程池

在使用CompletableFuture进行异步计算时,默认情况下会使用ForkJoinPool.commonPool()线程池。然而,在某些场景下,我们可能需要自定义线程池以满足特定的需求,比如控制线程数量、设置线程优先级等。

创建自定义线程池

我们可以使用ThreadPoolExecutor来创建自定义线程池。以下是一个简单的示例:

import java.util.concurrent.*;

public class CustomThreadPoolExample {
    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2,
                4,
                10,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(5)
        );

        CompletableFuture.supplyAsync(() -> {
            // 异步任务逻辑
            System.out.println("任务在自定义线程池中执行");
            return "任务结果";
        }, executor)
       .thenAccept(result -> System.out.println("结果: " + result))
       .whenComplete((r, ex) -> executor.shutdown());
    }
}

在上述代码中,ThreadPoolExecutor创建了一个初始线程数为2,最大线程数为4,线程存活时间为10秒,任务队列容量为5的线程池。CompletableFuture.supplyAsync方法将异步任务提交到这个自定义线程池中执行。当任务完成后,通过whenComplete方法关闭线程池。

线程池参数的选择

  1. 核心线程数(corePoolSize):这是线程池在正常情况下保持的活动线程数。即使这些线程处于空闲状态,它们也不会被销毁,除非设置了allowCoreThreadTimeOuttrue。在上述示例中,核心线程数设置为2,意味着线程池在启动后会立即创建2个线程。
  2. 最大线程数(maximumPoolSize):这是线程池允许创建的最大线程数。当任务队列已满且活动线程数小于最大线程数时,线程池会创建新的线程来处理任务。在示例中,最大线程数设置为4,表明线程池最多可以有4个线程同时执行任务。
  3. 存活时间(keepAliveTime):当线程池中的线程数超过核心线程数时,多余的空闲线程在等待新任务到来的这段时间内,如果超过了存活时间,就会被销毁。这里设置为10秒,即如果一个线程空闲10秒且线程池中的线程数大于核心线程数,该线程将被销毁。
  4. 时间单位(unit):存活时间的时间单位,这里使用TimeUnit.SECONDS表示秒。
  5. 任务队列(workQueue):用于存储等待执行的任务。ArrayBlockingQueue是一个有界队列,容量为5,意味着最多可以有5个任务在队列中等待执行。如果队列已满且活动线程数小于最大线程数,线程池会创建新线程;如果队列已满且活动线程数达到最大线程数,新任务可能会根据RejectedExecutionHandler策略进行处理。

不同类型的任务队列

  1. 有界队列(如ArrayBlockingQueue):使用有界队列可以控制任务积压的数量,避免任务无限增长导致内存耗尽。但如果队列容量设置过小,可能会频繁创建新线程;如果设置过大,可能会导致任务长时间等待。
  2. 无界队列(如LinkedBlockingQueue):无界队列可以容纳无限数量的任务,这在一定程度上可以避免任务被拒绝。然而,如果任务提交速度过快,可能会导致内存占用不断增加,最终引发内存溢出问题。
  3. 同步移交队列(SynchronousQueue)SynchronousQueue没有容量,每个插入操作必须等待另一个线程的移除操作,反之亦然。这种队列适用于任务处理速度非常快的场景,因为它不会缓存任务,而是直接将任务传递给线程执行。

通过合理选择线程池参数和任务队列类型,可以优化CompletableFuture在并行计算中的性能,使其更好地适应不同的应用场景需求。

性能优化与注意事项

减少不必要的异步任务创建

虽然CompletableFuture提供了强大的异步处理能力,但过多的异步任务创建会带来额外的开销,包括线程创建、调度和上下文切换等。在编写代码时,应尽量避免不必要的异步任务创建。例如,如果某些操作本身并不耗时,或者可以在主线程中快速完成,就没有必要将其包装成异步任务。

合理设置线程池大小

线程池大小的设置对性能有重要影响。如果线程池过小,可能无法充分利用多核处理器的性能,导致任务长时间等待;如果线程池过大,过多的线程竞争资源会增加系统开销,反而降低性能。一般来说,可以根据任务的类型(CPU密集型或I/O密集型)来估算合适的线程池大小。对于CPU密集型任务,线程池大小可以设置为CPU核心数加1;对于I/O密集型任务,由于线程在等待I/O操作时会处于空闲状态,可以适当增加线程池大小,例如设置为CPU核心数的2倍或更多。

避免死锁

在使用CompletableFuture进行组合操作时,要特别注意避免死锁。例如,当两个CompletableFuture相互依赖对方的结果时,就可能发生死锁。如下代码示例:

CompletableFuture<Integer> future1 = new CompletableFuture<>();
CompletableFuture<Integer> future2 = new CompletableFuture<>();

future1.thenApplyAsync(result1 -> {
    int result2 = future2.join();
    return result1 + result2;
});

future2.thenApplyAsync(result2 -> {
    int result1 = future1.join();
    return result1 + result2;
});

future1.complete(1);
future2.complete(2);

在上述代码中,future1依赖future2的结果,future2又依赖future1的结果,这就形成了死锁。为了避免这种情况,在设计异步任务的依赖关系时,要确保不会出现循环依赖。

及时处理异常

在异步任务执行过程中,异常可能不会像在同步代码中那样立即抛出并被捕获。如果不及时处理CompletableFuture中的异常,可能会导致程序出现难以排查的问题。因此,在使用CompletableFuture时,要始终使用exceptionallyhandle等方法来处理可能出现的异常,确保程序的健壮性。

内存管理

大量的异步任务可能会消耗大量的内存,尤其是当任务持有大量数据或创建了许多中间对象时。要注意及时释放不再使用的资源,避免内存泄漏。例如,在任务完成后,及时清理相关的缓存数据或关闭文件句柄等资源。

通过遵循这些性能优化建议和注意事项,可以使基于CompletableFuture的并行计算代码更加高效、稳定地运行,充分发挥Java在并行编程方面的优势。

在实际应用中,CompletableFuture的强大功能使得它在各种场景下都能发挥重要作用,无论是Web应用中的异步请求处理,还是大数据处理中的并行计算任务,都能通过合理使用CompletableFuture来提升系统的性能和响应速度。通过深入理解其原理和特性,并结合实际需求进行优化,开发人员可以编写出更加高效、优雅的Java代码。