MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java CompletableFuture supplyAsync异步任务创建技巧

2024-05-065.6k 阅读

Java CompletableFuture supplyAsync 异步任务创建技巧

理解 CompletableFuture

在 Java 并发编程领域,CompletableFuture 是 Java 8 引入的一个强大工具,它极大地简化了异步编程。CompletableFuture 代表一个异步计算的结果,它不仅可以手动完成(设置计算结果),还支持一系列函数式编程风格的组合操作,比如 thenApplythenAcceptthenRun 等,这些操作可以方便地将多个异步任务串联起来,形成复杂的异步计算流程。

supplyAsync 方法基础

CompletableFuturesupplyAsync 方法用于异步执行有返回值的任务。它有两个重载形式:

  1. static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
  2. static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

第一个重载形式使用默认的 ForkJoinPool.commonPool() 作为执行任务的线程池。第二个重载形式允许我们指定一个自定义的 Executor 来执行任务。Supplier 是一个函数式接口,它只有一个 get 方法,该方法不接受参数并返回一个结果,这正好适合用来定义异步执行的有返回值的任务。

简单示例

下面是一个简单的使用 supplyAsync 创建异步任务的示例:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            // 模拟一个耗时操作
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "任务执行完成";
        });

        // 获取异步任务的结果,如果任务还未完成,get 方法会阻塞
        String result = future.get();
        System.out.println(result);
    }
}

在这个示例中,supplyAsync 方法接受一个 Supplier,在这个 Supplierget 方法中模拟了一个耗时 2 秒的操作,然后返回一个字符串。future.get() 方法用于获取异步任务的结果,如果任务尚未完成,它会阻塞当前线程直到任务完成。

自定义线程池

使用默认的 ForkJoinPool.commonPool() 可能在某些情况下不能满足需求,比如需要控制线程池的大小、线程的优先级等。这时我们可以使用第二个重载形式,传入一个自定义的 Executor

import java.util.concurrent.*;

public class CustomExecutorExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 创建一个固定大小的线程池
        ExecutorService executor = Executors.newFixedThreadPool(5);

        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            // 模拟一个耗时操作
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "任务执行完成";
        }, executor);

        String result = future.get();
        System.out.println(result);

        // 关闭线程池
        executor.shutdown();
    }
}

在这个示例中,我们创建了一个固定大小为 5 的线程池 executor,并将其作为参数传递给 supplyAsync 方法。这样异步任务就会在我们自定义的线程池中执行。任务完成后,我们调用 executor.shutdown() 方法关闭线程池。

链式调用与组合操作

CompletableFuture 的强大之处在于它支持链式调用和各种组合操作。当 supplyAsync 创建的异步任务完成后,我们可以使用 thenApplythenAcceptthenRun 等方法进行后续处理。

  • thenApply:接受一个 Function,将异步任务的结果作为输入,返回一个新的 CompletableFuture,新的 CompletableFuture 的结果是 Function 应用后的返回值。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class ThenApplyExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "初始结果")
               .thenApply(result -> result + " 经过处理");

        String finalResult = future.get();
        System.out.println(finalResult);
    }
}

在这个示例中,supplyAsync 创建的异步任务返回字符串 “初始结果”,然后 thenApply 方法接受这个结果,并在其基础上进行处理,返回新的字符串 “初始结果 经过处理”。

  • thenAccept:接受一个 Consumer,将异步任务的结果作为输入,但是不返回新的 CompletableFuture,它主要用于执行一些不需要返回值的操作,比如打印结果。
import java.util.concurrent.CompletableFuture;

public class ThenAcceptExample {
    public static void main(String[] args) {
        CompletableFuture.supplyAsync(() -> "任务结果")
               .thenAccept(result -> System.out.println("处理结果: " + result));
    }
}

在这个示例中,supplyAsync 创建的异步任务返回字符串 “任务结果”,thenAccept 方法接受这个结果并打印出来。

  • thenRun:接受一个 Runnable,不接受异步任务的结果作为输入,也不返回新的 CompletableFuture,它主要用于在异步任务完成后执行一些与结果无关的操作。
import java.util.concurrent.CompletableFuture;

public class ThenRunExample {
    public static void main(String[] args) {
        CompletableFuture.supplyAsync(() -> "任务结果")
               .thenRun(() -> System.out.println("任务已完成,执行一些收尾操作"));
    }
}

在这个示例中,supplyAsync 创建的异步任务返回字符串 “任务结果”,thenRun 方法在任务完成后执行打印操作,它不关心任务的具体结果。

异常处理

在异步任务执行过程中,可能会出现异常。CompletableFuture 提供了多种方法来处理异常。

  • exceptionally:当异步任务抛出异常时,exceptionally 方法会捕获这个异常,并返回一个替代结果。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class ExceptionallyExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            if (Math.random() > 0.5) {
                throw new RuntimeException("任务执行出错");
            }
            return "任务执行成功";
        })
               .exceptionally(ex -> {
                    System.out.println("捕获到异常: " + ex.getMessage());
                    return "默认结果";
                });

        String result = future.get();
        System.out.println(result);
    }
}

在这个示例中,supplyAsync 中的任务有 50% 的概率抛出异常。如果抛出异常,exceptionally 方法会捕获异常并返回 “默认结果”,同时打印异常信息。

  • handlehandle 方法既可以处理正常的结果,也可以处理异常情况。它接受一个 BiFunction,第一个参数是任务的结果(如果任务正常完成),第二个参数是异常(如果任务抛出异常)。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class HandleExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            if (Math.random() > 0.5) {
                throw new RuntimeException("任务执行出错");
            }
            return "任务执行成功";
        })
               .handle((result, ex) -> {
                    if (ex != null) {
                        System.out.println("捕获到异常: " + ex.getMessage());
                        return "默认结果";
                    }
                    return result + " 处理后";
                });

        String finalResult = future.get();
        System.out.println(finalResult);
    }
}

在这个示例中,如果任务正常完成,handle 方法会在结果后加上 “ 处理后”;如果任务抛出异常,handle 方法会捕获异常并返回 “默认结果”,同时打印异常信息。

多个异步任务的组合

在实际应用中,经常需要组合多个异步任务。CompletableFuture 提供了一些方法来实现这一点。

  • thenCombine:将两个 CompletableFuture 的结果合并,返回一个新的 CompletableFuture
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class ThenCombineExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "结果1");
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "结果2");

        CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (r1, r2) -> r1 + " 和 " + r2);

        String result = combinedFuture.get();
        System.out.println(result);
    }
}

在这个示例中,future1future2 是两个异步任务,thenCombine 方法将它们的结果合并,返回一个新的 CompletableFuture,其结果是 “结果1 和 结果2”。

  • allOf:等待所有的 CompletableFuture 都完成。allOf 方法返回一个 CompletableFuture<Void>,当所有输入的 CompletableFuture 都完成时,这个 CompletableFuture<Void> 才会完成。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class AllOfExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "任务1完成";
        });
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "任务2完成";
        });

        CompletableFuture<Void> allFuture = CompletableFuture.allOf(future1, future2);

        allFuture.get(); // 等待所有任务完成
        System.out.println(future1.get() + ", " + future2.get());
    }
}

在这个示例中,future1future2 是两个异步任务,allOf 方法返回一个 CompletableFuture<Void>,我们调用 allFuture.get() 方法等待所有任务完成,然后再获取 future1future2 的结果并打印。

  • anyOf:只要有一个 CompletableFuture 完成,anyOf 方法返回的 CompletableFuture 就会完成,其结果是第一个完成的 CompletableFuture 的结果。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class AnyOfExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "任务1完成";
        });
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "任务2完成";
        });

        CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(future1, future2);

        Object result = anyFuture.get();
        System.out.println(result);
    }
}

在这个示例中,future2 会先完成,所以 anyFuture.get() 获取到的结果是 “任务2完成”。

与传统异步编程方式的对比

在 Java 8 引入 CompletableFuture 之前,实现异步编程主要依赖 FutureCallable 接口。Future 接口提供了一种获取异步任务结果的方式,但是它有一些局限性。例如,Future 只能通过 get 方法阻塞获取结果,无法方便地进行链式调用和组合操作。而 CompletableFuture 不仅继承了 Future 接口,还提供了丰富的函数式编程风格的方法,使得异步编程更加简洁和灵活。

下面是一个使用 FutureCallable 的示例:

import java.util.concurrent.*;

public class FutureExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(1);
        Callable<String> task = () -> {
            Thread.sleep(2000);
            return "任务执行完成";
        };

        Future<String> future = executor.submit(task);
        String result = future.get();
        System.out.println(result);

        executor.shutdown();
    }
}

对比这个示例和前面使用 CompletableFuture 的示例,可以明显看出 CompletableFuture 在异步任务的创建、组合和结果处理上更加简洁和直观。

实际应用场景

  1. 网络请求:在进行多个网络请求时,可以使用 CompletableFuture 异步发起请求,并在所有请求完成后进行统一处理。例如,一个电商应用可能需要同时从多个供应商获取商品价格,然后比较价格并选择最优的供应商。
  2. 数据处理流水线:在大数据处理场景中,数据可能需要经过多个处理步骤,每个步骤可以作为一个异步任务,使用 CompletableFuture 可以方便地构建数据处理流水线,提高处理效率。
  3. 并行计算:对于一些可以并行执行的计算任务,CompletableFuture 可以充分利用多核 CPU 的优势,通过创建多个异步任务并行执行,然后合并结果,加快计算速度。

性能考虑

虽然 CompletableFuture 为异步编程带来了很多便利,但在使用时也需要考虑性能问题。

  1. 线程池的选择:选择合适的线程池非常重要。如果使用默认的 ForkJoinPool.commonPool(),在高并发场景下可能会出现线程饥饿等问题。对于 I/O 密集型任务,可以适当增加线程池的大小;对于 CPU 密集型任务,线程池大小应该根据 CPU 核心数进行合理设置。
  2. 任务粒度:任务的粒度不宜过大或过小。如果任务粒度太大,可能无法充分利用多核优势;如果任务粒度太小,线程的创建和切换开销可能会影响性能。需要根据具体业务场景进行权衡。
  3. 异常处理开销:过多的异常处理可能会带来一定的性能开销,尤其是在高并发场景下。因此,在编写异步任务时,应该尽量避免不必要的异常抛出,提高代码的健壮性。

总结常用技巧

  1. 合理选择线程池:根据任务类型(I/O 密集型或 CPU 密集型)和并发量,选择合适的线程池。对于简单应用,默认的 ForkJoinPool.commonPool() 可能够用,但对于复杂的高并发场景,自定义线程池可以更好地控制资源。
  2. 善用链式调用和组合操作:通过 thenApplythenAcceptthenRun 等方法进行链式调用,以及使用 thenCombineallOfanyOf 等方法组合多个异步任务,可以构建出复杂而高效的异步计算流程。
  3. 正确处理异常:使用 exceptionallyhandle 方法来处理异步任务中的异常,确保程序的稳定性和健壮性。
  4. 优化任务粒度:根据实际业务场景,合理划分任务粒度,避免过大或过小的任务粒度对性能产生不利影响。

通过深入理解和掌握 CompletableFuturesupplyAsync 方法以及相关的异步任务创建技巧,可以大大提高 Java 程序的异步编程能力,使程序在处理并发任务时更加高效和灵活。在实际应用中,需要根据具体的业务需求和性能要求,合理运用这些技巧,以达到最佳的编程效果。