MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java CompletableFuture supplyAsync任务调度的优化策略

2021-03-215.9k 阅读

Java CompletableFuture supplyAsync 任务调度的优化策略

理解 CompletableFuture 和 supplyAsync

在 Java 中,CompletableFuture 是 Java 8 引入的一个强大的异步编程工具,它提供了一种更灵活、更易于使用的方式来处理异步操作及其结果。supplyAsyncCompletableFuture 的一个静态方法,用于异步执行有返回值的任务。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class SupplyAsyncExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            // 模拟一个耗时操作
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Hello, CompletableFuture!";
        });

        String result = future.get();
        System.out.println(result);
    }
}

在上述代码中,supplyAsync 接受一个 Supplier 作为参数,该 Supplier 定义了异步执行的任务。任务执行完成后,可以通过 get 方法获取任务的返回值。如果任务还未完成,get 方法会阻塞当前线程,直到任务完成。

优化策略一:合理选择线程池

supplyAsync 方法有两个重载版本,一个无参数版本和一个接受 Executor 的版本。无参数版本默认使用 ForkJoinPool.commonPool() 作为线程池来执行任务。

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
    // 任务代码
    return "Result from common pool";
});

然而,ForkJoinPool.commonPool() 是一个共享的线程池,它的线程数量是根据 CPU 核心数动态调整的。在高并发场景下,可能会出现线程饥饿等问题。为了避免这些问题,可以创建自定义的线程池。

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.CompletableFuture;

public class CustomThreadPoolExample {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(10);

        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            // 模拟耗时操作
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Result from custom pool";
        }, executor);

        future.thenAccept(System.out::println).thenRun(executor::shutdown);
    }
}

在上述代码中,我们创建了一个固定大小为 10 的线程池,并将其传递给 supplyAsync 方法。这样可以根据业务需求合理分配线程资源,提高任务执行效率。同时,在任务完成后,通过 thenRun 方法关闭线程池。

优化策略二:减少任务阻塞

在异步任务中,尽量减少任务内部的阻塞操作,因为阻塞会降低线程的利用率,影响整体性能。例如,避免在 supplyAsync 任务中使用 Thread.sleep 等阻塞方法,如果必须使用,可以考虑使用更高效的异步等待方式。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class AvoidBlockingExample {
    public static void main(String[] args) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            // 这里避免使用Thread.sleep
            // 假设这里是一个异步I/O操作
            return "Async I/O result";
        });

        try {
            String result = future.get(5, TimeUnit.SECONDS);
            System.out.println(result);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}

在上述代码中,我们避免了在 supplyAsync 任务中使用 Thread.sleep,而是假设进行了一个异步 I/O 操作。同时,通过 get 方法的超时设置,防止线程无限期阻塞。

优化策略三:任务依赖管理

CompletableFuture 支持任务之间的依赖关系管理,合理利用这一特性可以优化任务调度。例如,当一个任务的结果依赖于另一个任务的结果时,可以使用 thenApplythenCompose 等方法。

import java.util.concurrent.CompletableFuture;

public class TaskDependencyExample {
    public static void main(String[] args) {
        CompletableFuture.supplyAsync(() -> "Hello")
               .thenApply(s -> s + ", World")
               .thenApply(String::toUpperCase)
               .thenAccept(System.out::println);
    }
}

在上述代码中,每个 thenApply 操作都依赖于前一个操作的结果,形成了一个任务链。这种方式可以让任务按照逻辑顺序依次执行,并且在执行过程中不会阻塞主线程。

优化策略四:异常处理优化

在异步任务执行过程中,异常处理是非常重要的。CompletableFuture 提供了多种异常处理方法,如 exceptionallyhandle 等。

import java.util.concurrent.CompletableFuture;

public class ExceptionHandlingExample {
    public static void main(String[] args) {
        CompletableFuture.supplyAsync(() -> {
            if (Math.random() < 0.5) {
                throw new RuntimeException("Simulated error");
            }
            return "Success result";
        })
               .exceptionally(ex -> {
                    System.out.println("Caught exception: " + ex.getMessage());
                    return "Default value";
                })
               .thenAccept(System.out::println);
    }
}

在上述代码中,exceptionally 方法捕获了异步任务中抛出的异常,并返回一个默认值。这样可以避免因为一个任务的异常导致整个异步流程中断,提高系统的稳定性。

优化策略五:批量任务处理

当需要处理大量异步任务时,可以使用 CompletableFuture.allOfCompletableFuture.anyOf 方法。

import java.util.concurrent.CompletableFuture;

public class BatchTaskExample {
    public static void main(String[] args) {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Task 1 result");
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Task 2 result");
        CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "Task 3 result");

        CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2, future3);

        allFutures.thenRun(() -> {
            try {
                System.out.println(future1.get());
                System.out.println(future2.get());
                System.out.println(future3.get());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).join();
    }
}

在上述代码中,CompletableFuture.allOf 方法等待所有任务完成,然后通过 thenRun 方法处理所有任务的结果。如果只需要获取第一个完成的任务结果,可以使用 CompletableFuture.anyOf 方法。

优化策略六:资源管理与释放

在异步任务中,涉及到资源的获取与释放,如数据库连接、文件句柄等。确保在任务完成后及时释放资源,避免资源泄漏。

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.concurrent.CompletableFuture;

public class ResourceManagementExample {
    public static void main(String[] args) {
        CompletableFuture.supplyAsync(() -> {
            Connection conn = null;
            PreparedStatement pstmt = null;
            ResultSet rs = null;
            try {
                conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/mydb", "user", "password");
                pstmt = conn.prepareStatement("SELECT * FROM users");
                rs = pstmt.executeQuery();
                if (rs.next()) {
                    return rs.getString("username");
                }
            } catch (SQLException e) {
                e.printStackTrace();
            } finally {
                try {
                    if (rs != null) rs.close();
                    if (pstmt != null) pstmt.close();
                    if (conn != null) conn.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
            return null;
        }).thenAccept(System.out::println);
    }
}

在上述代码中,在异步任务内部获取数据库连接、执行查询,并在 finally 块中确保资源被正确释放。

优化策略七:性能监控与调优

使用 Java 自带的工具如 jconsolejvisualvm 等对异步任务进行性能监控,了解线程池的使用情况、任务执行时间等指标,根据监控结果进行调优。

例如,通过 jvisualvm 可以查看线程池的活动线程数、队列长度等信息,从而判断是否需要调整线程池大小。

优化策略八:使用 CompletionStage 接口特性

CompletableFuture 实现了 CompletionStage 接口,该接口提供了更多灵活的异步操作组合方式。例如,可以使用 thenCombine 方法将两个异步任务的结果进行合并。

import java.util.concurrent.CompletableFuture;

public class CompletionStageExample {
    public static void main(String[] args) {
        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 10);
        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 20);

        CompletableFuture<Integer> combinedFuture = future1.thenCombine(future2, (a, b) -> a + b);

        combinedFuture.thenAccept(System.out::println);
    }
}

在上述代码中,thenCombine 方法将两个异步任务的结果相加,形成一个新的异步任务。

优化策略九:考虑响应式编程

对于一些对实时性要求较高的场景,可以考虑结合响应式编程框架,如 Reactor 或 RxJava。这些框架可以与 CompletableFuture 结合使用,提供更强大的异步流处理能力。

import reactor.core.publisher.Mono;
import java.util.concurrent.CompletableFuture;

public class ReactiveIntegrationExample {
    public static void main(String[] args) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello from CompletableFuture");

        Mono.fromFuture(future)
               .map(String::toUpperCase)
               .subscribe(System.out::println);
    }
}

在上述代码中,我们将 CompletableFuture 转换为 Mono,利用 Reactor 的操作符对异步结果进行处理。

优化策略十:代码结构优化

保持异步任务代码的简洁和可读性,避免在异步任务中编写复杂的业务逻辑。可以将复杂的业务逻辑拆分成多个小的异步任务,通过 CompletableFuture 的组合方法进行调度。

import java.util.concurrent.CompletableFuture;

public class CodeStructureOptimizationExample {
    private static CompletableFuture<String> step1() {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟第一步操作
            return "Step 1 result";
        });
    }

    private static CompletableFuture<String> step2(String input) {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟第二步操作,依赖第一步结果
            return input + " -> Step 2 result";
        });
    }

    public static void main(String[] args) {
        step1()
               .thenCompose(CodeStructureOptimizationExample::step2)
               .thenAccept(System.out::println);
    }
}

在上述代码中,将复杂的业务逻辑拆分成 step1step2 两个方法,通过 thenCompose 方法进行顺序调用,使代码结构更加清晰。

优化策略十一:缓存异步结果

如果某些异步任务的结果是经常需要使用且不经常变化的,可以考虑对异步结果进行缓存。例如,使用 ConcurrentHashMap 来缓存 CompletableFuture 的结果。

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CompletableFuture;

public class ResultCachingExample {
    private static final ConcurrentHashMap<String, CompletableFuture<String>> cache = new ConcurrentHashMap<>();

    public static CompletableFuture<String> getCachedResult(String key) {
        return cache.computeIfAbsent(key, k -> CompletableFuture.supplyAsync(() -> {
            // 模拟耗时操作
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Cached result for " + key;
        }));
    }

    public static void main(String[] args) {
        getCachedResult("testKey").thenAccept(System.out::println);
        getCachedResult("testKey").thenAccept(System.out::println);
    }
}

在上述代码中,computeIfAbsent 方法确保对于相同的 key,只会执行一次异步任务,后续请求直接从缓存中获取结果,提高了性能。

优化策略十二:动态调整线程池大小

在运行时根据系统负载动态调整线程池的大小,可以提高系统的资源利用率和任务处理能力。例如,可以使用 ScheduledExecutorService 定期检查系统负载,根据负载情况调整线程池大小。

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class DynamicThreadPoolSizeExample {
    private static final AtomicInteger load = new AtomicInteger(0);
    private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    private static final int minThreads = 5;
    private static final int maxThreads = 20;

    public static void main(String[] args) {
        // 模拟负载变化
        scheduler.scheduleAtFixedRate(() -> {
            load.set((int) (Math.random() * 100));
            System.out.println("Current load: " + load.get());
        }, 0, 5, TimeUnit.SECONDS);

        // 定期调整线程池大小
        scheduler.scheduleAtFixedRate(() -> {
            int currentLoad = load.get();
            int threadCount = Math.min(maxThreads, Math.max(minThreads, currentLoad / 10));
            // 这里假设存在一个可调整大小的线程池对象 executor
            // executor.setCorePoolSize(threadCount);
            // executor.setMaximumPoolSize(threadCount);
            System.out.println("Adjusted thread pool size to: " + threadCount);
        }, 0, 10, TimeUnit.SECONDS);
    }
}

在上述代码中,我们使用 ScheduledExecutorService 模拟负载变化,并定期根据负载调整线程池大小。实际应用中,需要根据具体的线程池实现来调整核心线程数和最大线程数。

优化策略十三:避免过度异步

虽然异步编程可以提高系统的并发性能,但过度异步也可能导致代码复杂度增加、调试困难等问题。在设计系统时,需要权衡同步和异步的使用场景,只在必要的地方使用异步。

例如,对于一些简单的、执行时间较短的操作,同步执行可能更加简单高效,而对于一些 I/O 密集型或耗时较长的操作,异步执行更能发挥优势。

优化策略十四:利用 CompletableFuture 的 lazy 特性

CompletableFuture 支持延迟计算,即只有在真正需要结果时才会执行任务。例如,可以使用 CompletableFuture.supplyAsync(() -> computeResult(), ForkJoinPool.commonPool()).thenApply(this::processResult),其中 computeResult 方法不会立即执行,而是在后续通过 thenApply 等方法获取结果时才执行。

import java.util.concurrent.CompletableFuture;

public class LazyEvaluationExample {
    private static int computeResult() {
        System.out.println("Computing result...");
        return 42;
    }

    private static String processResult(int result) {
        return "The result is: " + result;
    }

    public static void main(String[] args) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(LazyEvaluationExample::computeResult)
               .thenApply(LazyEvaluationExample::processResult);

        System.out.println("Before getting result");
        try {
            String result = future.get();
            System.out.println(result);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

在上述代码中,computeResult 方法在 get 方法调用前不会执行,体现了 CompletableFuture 的 lazy 特性,有助于提高性能,特别是在任务执行代价较高且不一定需要执行的情况下。

优化策略十五:考虑使用 Fork/Join 框架的特性

CompletableFutureFork/Join 框架有一定的关联,Fork/Join 框架适用于分治算法的并行执行。如果业务场景适合分治算法,可以将任务拆分成更小的子任务并行执行,然后合并结果。

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class ForkJoinExample extends RecursiveTask<Integer> {
    private static final int THRESHOLD = 100;
    private int start;
    private int end;

    public ForkJoinExample(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        if (end - start <= THRESHOLD) {
            int sum = 0;
            for (int i = start; i <= end; i++) {
                sum += i;
            }
            return sum;
        } else {
            int mid = (start + end) / 2;
            ForkJoinExample leftTask = new ForkJoinExample(start, mid);
            ForkJoinExample rightTask = new ForkJoinExample(mid + 1, end);

            leftTask.fork();
            int rightResult = rightTask.compute();
            int leftResult = leftTask.join();

            return leftResult + rightResult;
        }
    }

    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        ForkJoinExample task = new ForkJoinExample(1, 1000);
        int result = forkJoinPool.invoke(task);
        System.out.println("Sum is: " + result);
    }
}

在上述代码中,ForkJoinExample 类继承自 RecursiveTask,实现了分治算法来计算从 startend 的整数之和。通过 forkjoin 方法,将大任务拆分成小任务并行执行,提高计算效率。虽然这不是直接使用 CompletableFuture,但 Fork/Join 框架的思想可以与 CompletableFuture 结合,在合适的场景下优化任务调度。

优化策略十六:使用 CompletableFuture 的并行流操作

CompletableFuture 可以与 Java 8 的并行流结合使用,进一步提高数据处理的并行度。例如,对一个集合中的元素进行异步处理并收集结果。

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class ParallelStreamWithCompletableFutureExample {
    public static void main(String[] args) {
        List<Integer> numbers = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            numbers.add(i);
        }

        List<CompletableFuture<Integer>> futures = numbers.parallelStream()
               .map(n -> CompletableFuture.supplyAsync(() -> n * n))
               .collect(Collectors.toList());

        List<Integer> results = futures.stream()
               .map(CompletableFuture::join)
               .collect(Collectors.toList());

        System.out.println(results);
    }
}

在上述代码中,我们将集合中的每个元素通过 CompletableFuture.supplyAsync 进行异步平方计算,利用并行流提高处理速度。最后,通过 join 方法获取所有异步任务的结果并收集到一个新的列表中。

优化策略十七:考虑使用 CompletableFuture 的异步回调机制

除了使用 get 方法阻塞获取结果外,CompletableFuture 还提供了丰富的异步回调方法,如 thenAcceptthenRunthenApplyAsync 等。合理使用这些回调方法可以避免主线程阻塞,提高系统的响应性。

import java.util.concurrent.CompletableFuture;

public class AsyncCallbackExample {
    public static void main(String[] args) {
        CompletableFuture.supplyAsync(() -> {
            // 模拟耗时操作
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Async result";
        })
               .thenApplyAsync(result -> result + " processed")
               .thenAccept(System.out::println);

        System.out.println("Main thread continues without waiting");
    }
}

在上述代码中,thenApplyAsyncthenAccept 方法在异步任务完成后异步执行回调操作,主线程不会阻塞,继续执行后续代码,提高了系统的响应性。

优化策略十八:利用 CompletableFuture 的组合操作实现复杂业务逻辑

CompletableFuture 提供了多种组合操作方法,如 thenCombinethenAcceptBothapplyToEither 等,可以根据业务需求灵活组合异步任务,实现复杂的业务逻辑。

import java.util.concurrent.CompletableFuture;

public class ComplexBusinessLogicExample {
    public static void main(String[] args) {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Result 1");
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Result 2");

        CompletableFuture<Void> combinedFuture = future1.thenCombine(future2, (r1, r2) -> r1 + " and " + r2)
               .thenAccept(System.out::println);

        combinedFuture.join();
    }
}

在上述代码中,thenCombine 方法将两个异步任务的结果合并成一个新的结果,然后通过 thenAccept 方法处理合并后的结果。这种方式可以方便地处理多个异步任务之间的复杂依赖关系和业务逻辑。

优化策略十九:避免 CompletableFuture 的内存泄漏

在使用 CompletableFuture 时,要注意避免内存泄漏问题。例如,如果在 CompletableFuture 中持有对大对象的引用,并且该 CompletableFuture 没有被正确释放,可能会导致内存泄漏。

import java.util.concurrent.CompletableFuture;

public class MemoryLeakAvoidanceExample {
    public static void main(String[] args) {
        // 假设LargeObject是一个占用大量内存的对象
        class LargeObject {
            private byte[] data = new byte[1024 * 1024 * 10]; // 10MB
        }

        CompletableFuture<LargeObject> future = CompletableFuture.supplyAsync(() -> new LargeObject());

        // 确保在使用完后释放资源
        future.thenAccept(obj -> {
            // 使用obj
        }).thenRun(() -> {
            // 这里可以进行资源清理操作
        });
    }
}

在上述代码中,我们在 thenRun 方法中可以添加资源清理操作,确保在 CompletableFuture 任务完成后,相关的大对象资源能够被正确释放,避免内存泄漏。

优化策略二十:考虑使用 CompletableFuture 的 CompletionStage 链优化

CompletableFuture 实现的 CompletionStage 接口支持链式调用,通过合理构建链式调用,可以使代码更加简洁和高效。

import java.util.concurrent.CompletableFuture;

public class CompletionStageChainOptimizationExample {
    public static void main(String[] args) {
        CompletableFuture.supplyAsync(() -> "Initial result")
               .thenApply(String::toUpperCase)
               .thenApply(result -> result + " appended")
               .thenAccept(System.out::println);
    }
}

在上述代码中,通过链式调用 thenApplythenAccept 方法,将异步任务的处理逻辑串联起来,使代码更加简洁易读,同时也提高了代码的执行效率,因为这种链式调用避免了中间变量的创建和不必要的代码冗余。

通过以上二十种优化策略,可以在使用 CompletableFuture.supplyAsync 进行任务调度时,显著提高系统的性能、稳定性和可维护性。在实际应用中,需要根据具体的业务场景和需求,灵活选择和组合这些优化策略。