MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java CompletableFuture并行处理应对高并发场景的方案

2024-01-121.1k 阅读

Java CompletableFuture并行处理应对高并发场景的方案

高并发场景下的挑战

在当今的互联网应用开发中,高并发场景随处可见。例如电商平台的抢购活动、在线支付系统、大型网站的流量高峰时段等。在这些场景下,传统的顺序处理方式会遇到严重的性能瓶颈。

假设一个简单的业务场景,我们需要从多个不同的数据源获取数据,然后进行整合处理。如果按照顺序依次从每个数据源获取数据,每个数据源的获取操作可能都需要一定的时间,在高并发环境下,大量用户同时请求,这种顺序处理方式会导致响应时间过长,用户体验变差,甚至可能使服务器因长时间处理请求而不堪重负。

CompletableFuture简介

CompletableFuture 是Java 8引入的一个强大的类,它实现了 FutureCompletionStage 接口。Future 接口是Java早期用于异步计算的工具,它允许我们异步执行任务并获取任务的结果,但存在一些局限性,比如无法直接处理异步任务的完成事件,获取结果时需要阻塞等待。

CompletableFuture 弥补了这些不足,它支持链式调用、组合多个异步任务、处理异步任务的完成事件等。通过 CompletableFuture,我们可以更灵活、高效地处理异步计算,特别适合应对高并发场景。

CompletableFuture基础使用

  1. 创建CompletableFuture
    • 使用 supplyAsync 创建有返回值的CompletableFuture
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            // 模拟一个耗时操作
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Hello, CompletableFuture!";
        });

        String result = future.get();
        System.out.println(result);
    }
}

在上述代码中,CompletableFuture.supplyAsync 方法接受一个 Supplier 作为参数,在一个新的线程中异步执行该 Supplierget 方法,并返回一个 CompletableFuturefuture.get() 方法会阻塞当前线程,直到异步任务完成并返回结果。

- **使用 `runAsync` 创建无返回值的CompletableFuture**
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureExample2 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            // 模拟一个耗时操作
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("Task completed without return value.");
        });

        future.get();
    }
}

CompletableFuture.runAsync 方法接受一个 Runnable 作为参数,在新线程中异步执行该 Runnable,返回的 CompletableFuture 没有返回值(Void)。

  1. 获取CompletableFuture的结果
    • get 方法:如上述例子所示,get 方法会阻塞当前线程,直到 CompletableFuture 完成并返回结果。如果异步任务抛出异常,get 方法会将异常包装成 ExecutionExceptionInterruptedException 抛出。
    • getNow(T valueIfAbsent) 方法:该方法不会阻塞,如果 CompletableFuture 已经完成,返回其结果;否则返回传入的 valueIfAbsent 参数。
import java.util.concurrent.CompletableFuture;

public class CompletableFutureGetNowExample {
    public static void main(String[] args) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Result";
        });

        String result1 = future.getNow("Default");
        System.out.println("Result1: " + result1);

        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        String result2 = future.getNow("Default");
        System.out.println("Result2: " + result2);
    }
}

在这个例子中,第一次调用 getNow 时,异步任务还未完成,所以返回 Default;第二次调用时,异步任务已完成,返回实际的结果 Result

- **`join` 方法**:与 `get` 方法类似,但如果异步任务抛出异常,`join` 方法会直接抛出原始异常,而不是包装异常。

CompletableFuture链式调用

  1. thenApply 方法:用于对 CompletableFuture 的结果进行转换。它接受一个 Function 作为参数,该 Function 会在异步任务完成后,对任务的结果进行处理,并返回一个新的 CompletableFuture
import java.util.concurrent.CompletableFuture;

public class CompletableFutureThenApplyExample {
    public static void main(String[] args) {
        CompletableFuture.supplyAsync(() -> "Hello")
                .thenApply(s -> s + ", World")
                .thenApply(String::toUpperCase)
                .thenAccept(System.out::println);
    }
}

在这个例子中,supplyAsync 方法创建了一个异步任务,返回字符串 Hello。然后通过 thenApply 方法依次对结果进行追加字符串 , World 和转换为大写的操作,最后通过 thenAccept 方法打印最终结果。

  1. thenAccept 方法:用于在 CompletableFuture 完成后执行一个 Consumer,但不返回新的 CompletableFuture
import java.util.concurrent.CompletableFuture;

public class CompletableFutureThenAcceptExample {
    public static void main(String[] args) {
        CompletableFuture.supplyAsync(() -> "Hello")
                .thenAccept(s -> System.out.println(s + ", World"));
    }
}

这里 thenAccept 方法接受一个 Consumer,在异步任务返回 Hello 后,将其与 , World 拼接并打印。

  1. thenRun 方法:与 thenAccept 类似,但 thenRun 接受的是一个 Runnable,不处理 CompletableFuture 的结果,只是在任务完成后执行 Runnable
import java.util.concurrent.CompletableFuture;

public class CompletableFutureThenRunExample {
    public static void main(String[] args) {
        CompletableFuture.supplyAsync(() -> "Hello")
                .thenRun(() -> System.out.println("Task completed"));
    }
}

在这个例子中,异步任务返回 Hello 后,thenRun 执行打印 Task completed 的操作。

CompletableFuture处理异常

  1. exceptionally 方法:用于处理 CompletableFuture 执行过程中抛出的异常。它接受一个 Function 作为参数,当异步任务抛出异常时,该 Function 会被调用,参数为异常对象,返回值作为新的 CompletableFuture 的结果。
import java.util.concurrent.CompletableFuture;

public class CompletableFutureExceptionallyExample {
    public static void main(String[] args) {
        CompletableFuture.supplyAsync(() -> {
            if (Math.random() > 0.5) {
                throw new RuntimeException("Simulated exception");
            }
            return "Success";
        })
                .exceptionally(ex -> {
                    System.out.println("Caught exception: " + ex.getMessage());
                    return "Default value";
                })
                .thenAccept(System.out::println);
    }
}

在上述代码中,异步任务有一定概率抛出异常,exceptionally 方法捕获到异常后,打印异常信息并返回默认值 Default value

  1. handle 方法:既能处理正常的结果,也能处理异常。它接受一个 BiFunction 作为参数,第一个参数是正常的结果(如果有异常则为 null),第二个参数是异常对象(如果任务正常完成则为 null),返回值作为新的 CompletableFuture 的结果。
import java.util.concurrent.CompletableFuture;

public class CompletableFutureHandleExample {
    public static void main(String[] args) {
        CompletableFuture.supplyAsync(() -> {
            if (Math.random() > 0.5) {
                throw new RuntimeException("Simulated exception");
            }
            return "Success";
        })
                .handle((result, ex) -> {
                    if (ex != null) {
                        System.out.println("Caught exception: " + ex.getMessage());
                        return "Default value";
                    }
                    return result + " processed";
                })
                .thenAccept(System.out::println);
    }
}

这里 handle 方法根据任务是否成功,对结果进行不同的处理,成功时对结果进行追加字符串处理,失败时返回默认值并打印异常信息。

CompletableFuture组合多个异步任务

  1. thenCombine 方法:用于组合两个 CompletableFuture 的结果。它接受另一个 CompletableFuture 和一个 BiFunction 作为参数,当两个 CompletableFuture 都完成时,BiFunction 会被调用,参数为两个 CompletableFuture 的结果,返回值作为新的 CompletableFuture 的结果。
import java.util.concurrent.CompletableFuture;

public class CompletableFutureThenCombineExample {
    public static void main(String[] args) {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World");

        future1.thenCombine(future2, (s1, s2) -> s1 + ", " + s2)
                .thenAccept(System.out::println);
    }
}

在这个例子中,future1future2 异步执行,当它们都完成后,thenCombine 方法将两个结果拼接并打印。

  1. allOf 方法:用于等待所有 CompletableFuture 都完成。它接受多个 CompletableFuture 作为参数,返回一个新的 CompletableFuture<Void>,当所有传入的 CompletableFuture 都完成时,这个新的 CompletableFuture 才完成。
import java.util.concurrent.CompletableFuture;

public class CompletableFutureAllOfExample {
    public static void main(String[] args) {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Result1";
        });

        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Result2";
        });

        CompletableFuture<Void> allFuture = CompletableFuture.allOf(future1, future2);

        allFuture.join();

        try {
            System.out.println("Future1 result: " + future1.get());
            System.out.println("Future2 result: " + future2.get());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

在这个例子中,allOf 方法等待 future1future2 都完成,然后通过 join 方法等待 allFuture 完成,最后获取并打印 future1future2 的结果。

  1. anyOf 方法:只要有一个 CompletableFuture 完成,就返回这个完成的 CompletableFuture 的结果。它接受多个 CompletableFuture 作为参数,返回一个 CompletableFuture<Object>,其结果是第一个完成的 CompletableFuture 的结果。
import java.util.concurrent.CompletableFuture;

public class CompletableFutureAnyOfExample {
    public static void main(String[] args) {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Result1";
        });

        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Result2";
        });

        CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(future1, future2);

        try {
            System.out.println("First completed result: " + anyFuture.get());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

在这个例子中,future2 会先完成,anyOf 方法返回的 anyFuture 的结果就是 future2 的结果 Result2

CompletableFuture在高并发场景中的应用方案

  1. 并行获取多个数据源的数据 假设我们有三个数据源,分别获取用户信息、订单信息和商品信息,然后整合这些信息返回给用户。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

class DataSource {
    public static String getUserInfo() {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "User information";
    }

    public static String getOrderInfo() {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "Order information";
    }

    public static String getProductInfo() {
        try {
            Thread.sleep(2500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "Product information";
    }
}

public class HighConcurrencyDataFetching {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> userFuture = CompletableFuture.supplyAsync(DataSource::getUserInfo);
        CompletableFuture<String> orderFuture = CompletableFuture.supplyAsync(DataSource::getOrderInfo);
        CompletableFuture<String> productFuture = CompletableFuture.supplyAsync(DataSource::getProductInfo);

        CompletableFuture<Void> allFuture = CompletableFuture.allOf(userFuture, orderFuture, productFuture);

        allFuture.join();

        String userInfo = userFuture.get();
        String orderInfo = orderFuture.get();
        String productInfo = productFuture.get();

        String combinedInfo = userInfo + ", " + orderInfo + ", " + productInfo;
        System.out.println(combinedInfo);
    }
}

在这个例子中,通过 CompletableFuture.supplyAsync 方法并行地从三个数据源获取数据,allOf 方法等待所有数据获取完成,然后将结果整合并打印。这样可以大大减少总响应时间,提高系统在高并发场景下的性能。

  1. 异步处理任务队列 在高并发场景下,可能会有大量的任务需要处理,我们可以将这些任务放入队列,然后使用 CompletableFuture 异步处理。
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

public class TaskQueueProcessing {
    public static void main(String[] args) {
        BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();

        // 模拟添加任务到队列
        for (int i = 0; i < 10; i++) {
            int taskNumber = i;
            taskQueue.add(() -> {
                System.out.println("Processing task " + taskNumber);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

        // 异步处理任务队列
        for (int i = 0; i < 5; i++) {
            CompletableFuture.runAsync(() -> {
                while (true) {
                    Runnable task = null;
                    try {
                        task = taskQueue.take();
                        task.run();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        break;
                    }
                }
            });
        }
    }
}

在这个例子中,我们创建了一个任务队列,并向队列中添加了10个任务。然后通过 CompletableFuture.runAsync 启动5个异步线程从队列中取出任务并执行,提高任务处理的效率。

  1. 处理依赖关系复杂的任务 有时候任务之间存在复杂的依赖关系,比如任务B依赖任务A的结果,任务C依赖任务B和另一个任务D的结果等。CompletableFuture 可以很好地处理这种情况。
import java.util.concurrent.CompletableFuture;

public class ComplexDependencyTasks {
    public static void main(String[] args) {
        CompletableFuture<String> taskA = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Result of task A";
        });

        CompletableFuture<String> taskB = taskA.thenApply(resultA -> {
            System.out.println("Using result of task A: " + resultA);
            try {
                Thread.sleep(1500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Result of task B based on A";
        });

        CompletableFuture<String> taskD = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Result of task D";
        });

        CompletableFuture<String> taskC = taskB.thenCombine(taskD, (resultB, resultD) -> {
            System.out.println("Using result of task B: " + resultB);
            System.out.println("Using result of task D: " + resultD);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Result of task C based on B and D";
        });

        taskC.thenAccept(System.out::println);
    }
}

在这个例子中,taskB 依赖 taskA 的结果,taskC 依赖 taskBtaskD 的结果。通过 CompletableFuture 的链式调用和组合方法,清晰地表达了任务之间的依赖关系,并异步执行这些任务。

性能优化与注意事项

  1. 线程池的使用 在使用 CompletableFuture 时,默认情况下,supplyAsyncrunAsync 方法使用的是 ForkJoinPool.commonPool()。对于高并发场景,如果任务量较大且任务执行时间较长,可能会导致 commonPool 线程池饱和,影响性能。因此,建议根据实际情况创建自定义的线程池。
import java.util.concurrent.*;

public class CustomThreadPoolExample {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(10);

        CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Result";
        }, executor)
                .thenAccept(System.out::println);

        executor.shutdown();
    }
}

在这个例子中,我们创建了一个固定大小为10的线程池,并将其传递给 supplyAsync 方法。这样可以更好地控制线程资源的使用。

  1. 资源管理 在高并发场景下,要注意资源的合理使用和释放。例如,在异步任务中可能会使用数据库连接、文件句柄等资源,确保在任务完成后及时释放这些资源,避免资源泄漏。

  2. 异常处理策略 在处理多个 CompletableFuture 组合的场景中,要确保异常能够被正确捕获和处理。如果一个 CompletableFuture 抛出异常,可能会影响整个任务链的执行,因此需要根据业务需求制定合适的异常处理策略,如重试、回滚等。

  3. 避免过度并行 虽然并行处理可以提高性能,但过度并行可能会导致线程上下文切换开销增大、资源竞争加剧等问题。需要根据系统的硬件资源(如CPU核心数、内存等)和任务的特性(如I/O密集型还是CPU密集型)来合理设置并行度。

总结

CompletableFuture 为Java开发者提供了强大的异步编程能力,在高并发场景下,通过合理使用 CompletableFuture 的各种特性,如链式调用、异常处理、任务组合等,可以显著提高系统的性能和响应速度。同时,要注意线程池的使用、资源管理、异常处理和并行度的控制等方面,以确保系统在高并发环境下的稳定运行。通过不断实践和优化,我们能够更好地利用 CompletableFuture 来构建高效、可靠的高并发应用程序。

希望以上内容对你深入理解和应用 CompletableFuture 应对高并发场景有所帮助。在实际开发中,根据具体业务需求灵活运用这些知识,将能有效提升系统的性能和用户体验。