MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java CompletableFuture在复杂业务逻辑中的异步编程应用

2022-03-014.7k 阅读

Java CompletableFuture 基础概念

在Java 8引入CompletableFuture之前,处理异步操作和并发编程是一项相对复杂的任务。传统的Future接口存在一些局限性,例如无法手动完成Future、获取结果时需要阻塞线程等。CompletableFuture的出现极大地改善了这种情况,它不仅实现了Future接口,还提供了更强大的异步编程能力。

CompletableFuture代表一个异步计算的结果。它可以在计算完成后被获取,并且可以通过回调函数来处理计算结果,而不需要阻塞调用线程。这使得编写异步代码变得更加简洁和高效。

创建CompletableFuture

  1. 使用 CompletableFuture.supplyAsync 创建有返回值的异步任务
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            // 模拟耗时操作
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Hello, CompletableFuture!";
        });

        String result = future.get();
        System.out.println(result);
    }
}

在上述代码中,CompletableFuture.supplyAsync 方法接受一个 Supplier 作为参数,该 Supplier 中的代码会在一个新的线程中异步执行。get 方法会阻塞当前线程,直到异步任务完成并返回结果。

  1. 使用 CompletableFuture.runAsync 创建无返回值的异步任务
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureExample2 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            // 模拟耗时操作
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("Async task completed without return value.");
        });

        future.get();
    }
}

CompletableFuture.runAsync 方法接受一个 Runnable 作为参数,执行的任务没有返回值。同样,get 方法用于等待任务完成。

处理异步任务的结果

1. thenApply 方法

thenApply 方法用于在异步任务完成后,对其结果进行转换。它接受一个 Function 作为参数,该 Function 会在异步任务完成后被调用,输入参数为异步任务的返回值,返回值为转换后的结果。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class ThenApplyExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello")
               .thenApply(s -> s + ", CompletableFuture!");

        String result = future.get();
        System.out.println(result);
    }
}

在上述代码中,supplyAsync 方法返回一个包含 “Hello” 的 CompletableFuture,接着 thenApply 方法对这个结果进行转换,添加了 “, CompletableFuture!”,最终输出 “Hello, CompletableFuture!”。

2. thenAccept 方法

thenAccept 方法用于在异步任务完成后,消费其结果,但不返回新的结果。它接受一个 Consumer 作为参数,该 Consumer 会在异步任务完成后被调用,输入参数为异步任务的返回值。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class ThenAcceptExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture.supplyAsync(() -> "Hello")
               .thenAccept(s -> System.out.println(s + ", CompletableFuture!"));

        // 主线程不能立即退出,否则异步任务可能还未执行
        Thread.sleep(2000);
    }
}

在这个例子中,supplyAsync 方法返回一个包含 “Hello” 的 CompletableFuturethenAccept 方法对这个结果进行消费,将其打印出来。由于主线程需要等待异步任务执行完毕,所以使用 Thread.sleep 方法进行了短暂等待。

3. thenRun 方法

thenRun 方法用于在异步任务完成后,执行一个无参数的 Runnable。它不关心异步任务的返回值,只是在任务完成后执行给定的 Runnable

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class ThenRunExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture.supplyAsync(() -> "Hello")
               .thenRun(() -> System.out.println("Task completed, but I don't care about the result."));

        // 主线程不能立即退出,否则异步任务可能还未执行
        Thread.sleep(2000);
    }
}

在上述代码中,supplyAsync 方法返回结果 “Hello”,但 thenRun 方法并不关心这个结果,只是在任务完成后打印一条消息。同样,主线程需要等待异步任务执行完毕。

组合多个 CompletableFuture

在复杂业务逻辑中,经常需要组合多个异步任务。CompletableFuture提供了丰富的方法来实现这一点。

1. thenCompose 方法

thenCompose 方法用于将两个异步任务进行组合,第一个任务的结果作为第二个任务的输入。它接受一个 Function 作为参数,该 Function 返回一个新的 CompletableFuture

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class ThenComposeExample {
    public static CompletableFuture<String> fetchUserInfo(String userId) {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟从数据库或网络获取用户信息
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "User " + userId + " info";
        });
    }

    public static CompletableFuture<String> processUserInfo(String userInfo) {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟对用户信息进行处理
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Processed: " + userInfo;
        });
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = fetchUserInfo("123")
               .thenCompose(ThenComposeExample::processUserInfo);

        String result = future.get();
        System.out.println(result);
    }
}

在上述代码中,fetchUserInfo 方法异步获取用户信息,processUserInfo 方法异步处理用户信息。thenCompose 方法将这两个异步任务组合起来,使得第一个任务的结果能够作为第二个任务的输入。

2. thenCombine 方法

thenCombine 方法用于将两个异步任务的结果进行合并。它接受另一个 CompletableFuture 和一个 BiFunction 作为参数,BiFunction 的两个输入参数分别为两个异步任务的结果,返回值为合并后的结果。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class ThenCombineExample {
    public static CompletableFuture<String> task1() {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Result of task1";
        });
    }

    public static CompletableFuture<String> task2() {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Result of task2";
        });
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = task1()
               .thenCombine(task2(), (r1, r2) -> r1 + " and " + r2);

        String result = future.get();
        System.out.println(result);
    }
}

在这个例子中,task1task2 是两个独立的异步任务,thenCombine 方法将它们的结果合并在一起,最终输出 “Result of task1 and Result of task2”。

3. allOf 方法

allOf 方法用于等待所有给定的 CompletableFuture 都完成。它接受多个 CompletableFuture 作为参数,返回一个新的 CompletableFuture<Void>。当所有传入的 CompletableFuture 都完成时,这个新的 CompletableFuture 才会完成。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class AllOfExample {
    public static CompletableFuture<String> task1() {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Task 1 completed";
        });
    }

    public static CompletableFuture<String> task2() {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Task 2 completed";
        });
    }

    public static CompletableFuture<String> task3() {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Task 3 completed";
        });
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Void> allTasksFuture = CompletableFuture.allOf(task1(), task2(), task3());

        allTasksFuture.get();
        System.out.println("All tasks completed.");
    }
}

在上述代码中,task1task2task3 是三个异步任务,allOf 方法会等待这三个任务都完成后,才使得 allTasksFuture 完成,然后打印 “All tasks completed.”。

4. anyOf 方法

anyOf 方法用于等待给定的 CompletableFuture 中任意一个完成。它接受多个 CompletableFuture 作为参数,返回一个新的 CompletableFuture,这个新的 CompletableFuture 会在任意一个传入的 CompletableFuture 完成时完成,其结果为第一个完成的 CompletableFuture 的结果。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class AnyOfExample {
    public static CompletableFuture<String> task1() {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Task 1 completed";
        });
    }

    public static CompletableFuture<String> task2() {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Task 2 completed";
        });
    }

    public static CompletableFuture<String> task3() {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Task 3 completed";
        });
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Object> anyTaskFuture = CompletableFuture.anyOf(task1(), task2(), task3());

        String result = (String) anyTaskFuture.get();
        System.out.println("First completed task result: " + result);
    }
}

在这个例子中,task1task2task3 是三个异步任务,anyOf 方法会等待其中任意一个任务完成,然后 anyTaskFuture 就会完成,其结果为第一个完成的任务的结果。在这个例子中,task2 最快完成,所以最终输出 “First completed task result: Task 2 completed”。

处理异步任务的异常

在异步编程中,异常处理是非常重要的一部分。CompletableFuture提供了多种处理异常的方式。

1. exceptionally 方法

exceptionally 方法用于在异步任务发生异常时,提供一个备用的结果。它接受一个 Function 作为参数,该 Function 的输入参数为异常对象,返回值为备用结果。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class ExceptionallyExample {
    public static CompletableFuture<String> potentiallyFailingTask() {
        return CompletableFuture.supplyAsync(() -> {
            if (Math.random() < 0.5) {
                throw new RuntimeException("Simulated exception");
            }
            return "Task completed successfully";
        });
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = potentiallyFailingTask()
               .exceptionally(ex -> {
                    System.out.println("Caught exception: " + ex.getMessage());
                    return "Default result";
                });

        String result = future.get();
        System.out.println("Result: " + result);
    }
}

在上述代码中,potentiallyFailingTask 方法模拟了一个可能会抛出异常的异步任务。exceptionally 方法捕获到异常后,打印异常信息,并返回一个默认结果 “Default result”。

2. handle 方法

handle 方法用于在异步任务完成或发生异常时,对结果或异常进行处理。它接受一个 BiFunction 作为参数,第一个输入参数为异步任务的结果(如果任务正常完成),第二个输入参数为异常对象(如果任务发生异常),返回值为处理后的结果。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class HandleExample {
    public static CompletableFuture<String> potentiallyFailingTask() {
        return CompletableFuture.supplyAsync(() -> {
            if (Math.random() < 0.5) {
                throw new RuntimeException("Simulated exception");
            }
            return "Task completed successfully";
        });
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = potentiallyFailingTask()
               .handle((result, ex) -> {
                    if (ex != null) {
                        System.out.println("Caught exception: " + ex.getMessage());
                        return "Default result";
                    }
                    return "Processed result: " + result;
                });

        String finalResult = future.get();
        System.out.println("Final result: " + finalResult);
    }
}

在这个例子中,handle 方法根据任务是否正常完成来进行不同的处理。如果任务发生异常,打印异常信息并返回默认结果;如果任务正常完成,对结果进行处理并返回。

CompletableFuture 在复杂业务逻辑中的应用场景

1. 电商系统中的商品详情页加载

在电商系统中,商品详情页可能需要从多个数据源获取信息,例如商品基本信息、库存信息、用户评价等。这些信息的获取可以通过异步任务并行执行,然后将结果合并展示。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class ProductDetailExample {
    public static CompletableFuture<String> getProductInfo(String productId) {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟从数据库获取商品基本信息
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Product " + productId + " basic info";
        });
    }

    public static CompletableFuture<String> getStockInfo(String productId) {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟从库存系统获取库存信息
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Product " + productId + " stock info";
        });
    }

    public static CompletableFuture<String> getReviews(String productId) {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟从评价系统获取用户评价
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Product " + productId + " reviews";
        });
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        String productId = "123";
        CompletableFuture<String> productInfoFuture = getProductInfo(productId);
        CompletableFuture<String> stockInfoFuture = getStockInfo(productId);
        CompletableFuture<String> reviewsFuture = getReviews(productId);

        CompletableFuture<Void> allFutures = CompletableFuture.allOf(productInfoFuture, stockInfoFuture, reviewsFuture);

        allFutures.thenRun(() -> {
            try {
                String productInfo = productInfoFuture.get();
                String stockInfo = stockInfoFuture.get();
                String reviews = reviewsFuture.get();
                System.out.println("Product Detail: " + productInfo + ", " + stockInfo + ", " + reviews);
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }).get();
    }
}

在上述代码中,getProductInfogetStockInfogetReviews 方法分别异步获取商品基本信息、库存信息和用户评价。allOf 方法等待这三个任务都完成后,将结果合并展示。

2. 分布式系统中的数据聚合

在分布式系统中,可能需要从多个节点获取数据并进行聚合。例如,一个分布式日志系统,需要从多个日志节点获取日志数据,然后进行分析和汇总。

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class DistributedDataAggregationExample {
    public static CompletableFuture<String> fetchDataFromNode(String node) {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟从节点获取数据
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Data from " + node;
        });
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        List<String> nodes = new ArrayList<>();
        nodes.add("Node1");
        nodes.add("Node2");
        nodes.add("Node3");

        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (String node : nodes) {
            futures.add(fetchDataFromNode(node));
        }

        CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));

        allFutures.thenRun(() -> {
            StringBuilder aggregatedData = new StringBuilder();
            for (CompletableFuture<String> future : futures) {
                try {
                    aggregatedData.append(future.get()).append(", ");
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("Aggregated Data: " + aggregatedData.toString());
        }).get();
    }
}

在这个例子中,fetchDataFromNode 方法异步从每个节点获取数据。通过 allOf 方法等待所有数据获取完成后,将数据进行聚合并打印。

3. 搜索引擎中的多数据源搜索

搜索引擎可能需要从多个数据源(如网页数据库、文档库等)中搜索相关信息,然后将结果合并展示给用户。

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class MultiSourceSearchExample {
    public static CompletableFuture<List<String>> searchWeb(String query) {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟网页搜索
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            List<String> results = new ArrayList<>();
            results.add("Web result 1 for " + query);
            results.add("Web result 2 for " + query);
            return results;
        });
    }

    public static CompletableFuture<List<String>> searchDocuments(String query) {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟文档搜索
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            List<String> results = new ArrayList<>();
            results.add("Document result 1 for " + query);
            results.add("Document result 2 for " + query);
            return results;
        });
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        String query = "Java CompletableFuture";
        CompletableFuture<List<String>> webSearchFuture = searchWeb(query);
        CompletableFuture<List<String>> documentSearchFuture = searchDocuments(query);

        CompletableFuture<Void> allFutures = CompletableFuture.allOf(webSearchFuture, documentSearchFuture);

        allFutures.thenRun(() -> {
            List<String> allResults = new ArrayList<>();
            try {
                allResults.addAll(webSearchFuture.get());
                allResults.addAll(documentSearchFuture.get());
                System.out.println("All search results: " + allResults);
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }).get();
    }
}

在上述代码中,searchWebsearchDocuments 方法分别异步从网页和文档库中搜索相关信息。通过 allOf 方法等待两个搜索任务完成后,将结果合并展示。

CompletableFuture 的线程池使用

在默认情况下,CompletableFuture 使用 ForkJoinPool.commonPool() 来执行异步任务。然而,在某些情况下,我们可能需要使用自定义的线程池,以满足特定的业务需求,例如控制线程数量、设置线程优先级等。

使用自定义线程池

可以通过 supplyAsyncrunAsync 方法的重载版本来指定使用的线程池。

import java.util.concurrent.*;

public class CustomThreadPoolExample {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(5);

        CompletableFuture.supplyAsync(() -> {
            // 模拟耗时操作
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Task executed with custom thread pool";
        }, executor)
               .thenAccept(System.out::println)
               .thenRun(() -> executor.shutdown());
    }
}

在上述代码中,首先创建了一个固定大小为 5 的线程池 executor。然后使用 supplyAsync 方法的重载版本,将自定义线程池作为参数传入,这样异步任务就会在自定义线程池中执行。任务完成后,通过 thenRun 方法关闭线程池。

线程池大小的选择

选择合适的线程池大小对于性能至关重要。如果线程池太小,可能导致任务等待,无法充分利用系统资源;如果线程池太大,可能会增加线程上下文切换的开销,导致性能下降。

一般来说,对于 CPU 密集型任务,线程池大小可以设置为 CPU 核心数加 1,以确保在某个线程因偶尔的 I/O 操作等原因阻塞时,其他线程仍能继续使用 CPU 资源。对于 I/O 密集型任务,线程池大小可以根据预估的 I/O 等待时间和 CPU 处理时间的比例适当增大,例如设置为 CPU 核心数的 2 到 3 倍。

CompletableFuture 的性能优化

减少不必要的异步任务

虽然异步编程可以提高系统的响应性,但过多的异步任务可能会增加系统的复杂性和性能开销。在设计时,应仔细评估哪些任务真正需要异步执行,避免将一些简单、快速的任务也异步化。

合理设置超时

在异步任务执行过程中,设置合理的超时时间可以避免任务长时间阻塞。CompletableFuture 本身没有直接提供设置超时的方法,但可以通过 ExecutorServicesubmit 方法结合 Futureget(timeout, unit) 方法来实现。

import java.util.concurrent.*;

public class TimeoutExample {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<String> future = executor.submit(() -> {
            // 模拟耗时操作
            Thread.sleep(5000);
            return "Task completed";
        });

        try {
            String result = future.get(2, TimeUnit.SECONDS);
            System.out.println(result);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            System.out.println("Task timed out or interrupted.");
            future.cancel(true);
        } finally {
            executor.shutdown();
        }
    }
}

在上述代码中,submit 方法提交一个异步任务,然后通过 get(2, TimeUnit.SECONDS) 方法设置超时时间为 2 秒。如果任务在 2 秒内未完成,将抛出 TimeoutException,并取消任务。

避免不必要的阻塞

在处理 CompletableFuture 时,应尽量避免在主线程或关键线程中调用 get 方法阻塞等待结果。可以通过使用回调函数(如 thenApplythenAccept 等)来处理结果,以保持程序的异步性。

总结

CompletableFuture 为 Java 开发者提供了强大的异步编程能力,使得在复杂业务逻辑中处理异步任务变得更加简洁和高效。通过合理使用 CompletableFuture 的各种方法,如创建异步任务、处理结果、组合任务、处理异常等,我们可以构建出高性能、高响应性的应用程序。同时,在使用 CompletableFuture 时,要注意线程池的选择、性能优化等方面,以充分发挥其优势。希望本文的内容能帮助你更好地理解和应用 CompletableFuture 进行异步编程。