MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java CompletableFuture thenAccept结果消费的应用场景

2021-02-251.7k 阅读

Java CompletableFuture thenAccept 结果消费的应用场景

一、概述

在Java的异步编程中,CompletableFuture 是一个强大的工具,它提供了一种异步获取结果以及对结果进行处理的方式。thenAccept 方法是 CompletableFuture 众多方法中的一个,用于在 CompletableFuture 完成时,消费其计算结果。thenAccept 方法接受一个 Consumer 作为参数,该 Consumer 会在 CompletableFuture 成功完成时,接收并处理计算结果。其方法签名如下:

CompletableFuture<Void> thenAccept(Consumer<? super U> action);

这里的 UCompletableFuture 计算结果的类型,action 是一个 Consumer 实例,它会处理这个结果。thenAccept 方法返回一个新的 CompletableFuture,当 action 执行完毕后,这个新的 CompletableFuture 会以 null 为结果完成。

二、在数据处理流水线中的应用

  1. 数据获取与处理流水线 在许多应用场景中,我们需要从不同的数据源获取数据,然后对这些数据进行一系列的处理。例如,从数据库中查询用户信息,然后对用户信息进行格式转换、加密等操作。CompletableFuturethenAccept 方法可以很好地应用在这样的数据处理流水线中。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class DataPipelineExample {
    public static CompletableFuture<String> fetchUserData() {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟从数据库查询用户数据
            return "userData";
        });
    }

    public static CompletableFuture<Void> processUserData(String userData) {
        return CompletableFuture.runAsync(() -> {
            // 模拟数据格式转换和加密操作
            System.out.println("Processing user data: " + userData);
        });
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Void> future = fetchUserData()
               .thenAccept(DataPipelineExample::processUserData);
        future.get();
    }
}

在上述代码中,fetchUserData 方法异步获取用户数据,返回一个 CompletableFuture<String>。然后通过 thenAccept 方法,将获取到的用户数据传递给 processUserData 方法进行处理。processUserData 方法返回一个 CompletableFuture<Void>,因为它只进行数据处理,不需要返回新的结果。

  1. 链式数据处理 thenAccept 方法可以链式调用,实现更复杂的数据处理流程。例如,我们在获取用户数据后,先进行格式转换,再进行加密,最后存储到另一个地方。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class ChainedDataProcessingExample {
    public static CompletableFuture<String> fetchUserData() {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟从数据库查询用户数据
            return "userData";
        });
    }

    public static String formatUserData(String userData) {
        // 模拟数据格式转换
        return "Formatted: " + userData;
    }

    public static String encryptUserData(String formattedUserData) {
        // 模拟数据加密
        return "Encrypted: " + formattedUserData;
    }

    public static CompletableFuture<Void> storeUserData(String encryptedUserData) {
        return CompletableFuture.runAsync(() -> {
            // 模拟数据存储
            System.out.println("Storing encrypted user data: " + encryptedUserData);
        });
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Void> future = fetchUserData()
               .thenApply(ChainedDataProcessingExample::formatUserData)
               .thenApply(ChainedDataProcessingExample::encryptUserData)
               .thenAccept(ChainedDataProcessingExample::storeUserData);
        future.get();
    }
}

在这段代码中,fetchUserData 方法获取用户数据后,通过 thenApply 方法依次进行格式转换和加密,最后通过 thenAccept 方法将加密后的数据传递给 storeUserData 方法进行存储。

三、在异步任务依赖中的应用

  1. 任务依赖关系 在一些复杂的业务场景中,存在多个异步任务,并且这些任务之间存在依赖关系。例如,任务B需要等待任务A完成并获取其结果后才能开始执行。CompletableFuturethenAccept 方法可以方便地处理这种任务依赖关系。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class TaskDependencyExample {
    public static CompletableFuture<Integer> taskA() {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟任务A的执行
            System.out.println("Task A is running");
            return 10;
        });
    }

    public static CompletableFuture<Void> taskB(int resultOfA) {
        return CompletableFuture.runAsync(() -> {
            // 模拟任务B的执行,依赖任务A的结果
            System.out.println("Task B is running with result of A: " + resultOfA);
        });
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Void> future = taskA()
               .thenAccept(TaskDependencyExample::taskB);
        future.get();
    }
}

在上述代码中,taskA 方法异步执行并返回一个结果。thenAccept 方法将 taskA 的结果传递给 taskB 方法,使得 taskB 能够在 taskA 完成后,基于其结果开始执行。

  1. 多个任务依赖一个任务结果 有时候,可能会有多个任务都依赖于同一个异步任务的结果。例如,一个订单处理系统中,订单生成后,需要同时通知仓库发货、财务部门记账等。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class MultipleTasksDependencyExample {
    public static CompletableFuture<String> generateOrder() {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟订单生成
            System.out.println("Generating order");
            return "Order123";
        });
    }

    public static CompletableFuture<Void> notifyWarehouse(String order) {
        return CompletableFuture.runAsync(() -> {
            // 模拟通知仓库发货
            System.out.println("Notifying warehouse about order: " + order);
        });
    }

    public static CompletableFuture<Void> notifyFinance(String order) {
        return CompletableFuture.runAsync(() -> {
            // 模拟通知财务部门记账
            System.out.println("Notifying finance about order: " + order);
        });
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> orderFuture = generateOrder();
        CompletableFuture<Void> warehouseFuture = orderFuture.thenAccept(MultipleTasksDependencyExample::notifyWarehouse);
        CompletableFuture<Void> financeFuture = orderFuture.thenAccept(MultipleTasksDependencyExample::notifyFinance);
        CompletableFuture.allOf(warehouseFuture, financeFuture).get();
    }
}

在这段代码中,generateOrder 方法生成订单后,通过 thenAccept 方法分别将订单信息传递给 notifyWarehousenotifyFinance 方法,实现多个任务依赖于同一个订单生成任务的结果。最后通过 CompletableFuture.allOf 方法等待所有依赖任务完成。

四、在日志记录与监控中的应用

  1. 异步操作的日志记录 在异步编程中,记录异步操作的结果对于调试和系统监控非常重要。thenAccept 方法可以方便地在异步操作完成后,记录相关的日志信息。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class LoggingExample {
    public static CompletableFuture<String> asyncOperation() {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟异步操作
            System.out.println("Async operation is running");
            return "Operation result";
        });
    }

    public static void logResult(String result) {
        System.out.println("Logging result: " + result);
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Void> future = asyncOperation()
               .thenAccept(LoggingExample::logResult);
        future.get();
    }
}

在上述代码中,asyncOperation 方法异步执行一个操作并返回结果。通过 thenAccept 方法,将操作结果传递给 logResult 方法,记录操作结果的日志。

  1. 监控异步任务执行时间 我们还可以利用 thenAccept 方法监控异步任务的执行时间。通过记录任务开始时间和结束时间,计算任务的执行时长,并记录到日志或者发送到监控系统中。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class MonitoringExample {
    public static CompletableFuture<String> asyncOperation() {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟异步操作
            System.out.println("Async operation is running");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Operation result";
        });
    }

    public static void monitorExecutionTime(long startTime, String result) {
        long endTime = System.currentTimeMillis();
        long executionTime = endTime - startTime;
        System.out.println("Operation executed in " + executionTime + " ms. Result: " + result);
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        long startTime = System.currentTimeMillis();
        CompletableFuture<Void> future = asyncOperation()
               .thenAccept(result -> monitorExecutionTime(startTime, result));
        future.get();
    }
}

在这段代码中,asyncOperation 方法模拟一个异步操作,在 main 方法中记录开始时间,通过 thenAccept 方法在操作完成后,计算并记录执行时间。

五、在消息队列与事件驱动架构中的应用

  1. 消息处理与消费 在消息队列系统中,当生产者发送消息到队列后,消费者需要异步地从队列中获取消息并进行处理。CompletableFuturethenAccept 方法可以很好地模拟这种消息消费过程。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class MessageQueueExample {
    public static CompletableFuture<String> receiveMessage() {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟从消息队列接收消息
            System.out.println("Receiving message from queue");
            return "New message";
        });
    }

    public static CompletableFuture<Void> processMessage(String message) {
        return CompletableFuture.runAsync(() -> {
            // 模拟消息处理
            System.out.println("Processing message: " + message);
        });
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Void> future = receiveMessage()
               .thenAccept(MessageQueueExample::processMessage);
        future.get();
    }
}

在上述代码中,receiveMessage 方法模拟从消息队列接收消息,返回一个 CompletableFuture<String>thenAccept 方法将接收到的消息传递给 processMessage 方法进行处理。

  1. 事件驱动的系统 在事件驱动架构中,当某个事件发生时,需要触发一系列的异步操作。例如,用户注册成功后,需要发送欢迎邮件、创建用户相关的默认配置等。thenAccept 方法可以方便地处理这种事件驱动的异步操作。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class EventDrivenExample {
    public static CompletableFuture<String> userRegistered() {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟用户注册事件
            System.out.println("User registered");
            return "user123";
        });
    }

    public static CompletableFuture<Void> sendWelcomeEmail(String userId) {
        return CompletableFuture.runAsync(() -> {
            // 模拟发送欢迎邮件
            System.out.println("Sending welcome email to user: " + userId);
        });
    }

    public static CompletableFuture<Void> createDefaultConfig(String userId) {
        return CompletableFuture.runAsync(() -> {
            // 模拟创建用户默认配置
            System.out.println("Creating default config for user: " + userId);
        });
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> userFuture = userRegistered();
        CompletableFuture<Void> emailFuture = userFuture.thenAccept(EventDrivenExample::sendWelcomeEmail);
        CompletableFuture<Void> configFuture = userFuture.thenAccept(EventDrivenExample::createDefaultConfig);
        CompletableFuture.allOf(emailFuture, configFuture).get();
    }
}

在这段代码中,userRegistered 方法模拟用户注册事件并返回用户ID。通过 thenAccept 方法,分别将用户ID传递给 sendWelcomeEmailcreateDefaultConfig 方法,实现事件驱动的异步操作。

六、在Web应用开发中的应用

  1. 异步请求处理 在Web应用开发中,处理HTTP请求时,有时需要进行一些耗时的异步操作,例如查询数据库、调用外部API等。CompletableFuturethenAccept 方法可以在异步操作完成后,将结果返回给客户端。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

@WebServlet("/asyncRequest")
public class AsyncRequestExample extends HttpServlet {
    private static final long serialVersionUID = 1L;

    public static CompletableFuture<String> asyncTask() {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟耗时异步操作
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Async task result";
        });
    }

    @Override
    protected void doGet(HttpServletRequest request, HttpServletResponse response) {
        asyncTask()
               .thenAccept(result -> {
                    try {
                        response.getWriter().println(result);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                });
    }
}

在上述代码中,asyncTask 方法模拟一个耗时的异步操作。在 doGet 方法中,通过 thenAccept 方法在异步操作完成后,将结果输出到HTTP响应中返回给客户端。

  1. 异步数据渲染 在Web应用中,有时需要从多个数据源异步获取数据,然后将这些数据合并并渲染到页面上。thenAccept 方法可以在获取到所有数据后,进行数据的合并和渲染操作。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

@WebServlet("/asyncDataRender")
public class AsyncDataRenderExample extends HttpServlet {
    private static final long serialVersionUID = 1L;

    public static CompletableFuture<String> getDataFromSource1() {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟从数据源1获取数据
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Data from source 1";
        });
    }

    public static CompletableFuture<String> getDataFromSource2() {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟从数据源2获取数据
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Data from source 2";
        });
    }

    public static void renderData(String data1, String data2, HttpServletResponse response) {
        try {
            String combinedData = data1 + " " + data2;
            response.getWriter().println("Rendered data: " + combinedData);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    protected void doGet(HttpServletRequest request, HttpServletResponse response) {
        CompletableFuture<String> future1 = getDataFromSource1();
        CompletableFuture<String> future2 = getDataFromSource2();

        CompletableFuture.allOf(future1, future2)
               .thenRun(() -> {
                    try {
                        String data1 = future1.get();
                        String data2 = future2.get();
                        renderData(data1, data2, response);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                });
    }
}

在这段代码中,getDataFromSource1getDataFromSource2 方法分别从不同数据源异步获取数据。通过 CompletableFuture.allOf 方法等待两个异步操作都完成,然后通过 thenRun(这里也可以用 thenAccept 结合一个接受 Void 参数的 Consumer 来实现类似功能)方法在获取到所有数据后,调用 renderData 方法进行数据的合并和渲染,并输出到HTTP响应中。

七、异常处理与 thenAccept 结合

  1. 简单异常处理 在使用 thenAccept 方法时,可能会遇到异步操作抛出异常的情况。CompletableFuture 提供了 exceptionally 方法来处理异常。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class ExceptionHandlingExample {
    public static CompletableFuture<String> asyncOperation() {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟可能抛出异常的异步操作
            if (Math.random() > 0.5) {
                throw new RuntimeException("Async operation failed");
            }
            return "Async operation result";
        });
    }

    public static void handleResult(String result) {
        System.out.println("Handling result: " + result);
    }

    public static void handleException(Throwable e) {
        System.out.println("Handling exception: " + e.getMessage());
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Void> future = asyncOperation()
               .exceptionally(ExceptionHandlingExample::handleException)
               .thenAccept(ExceptionHandlingExample::handleResult);
        future.get();
    }
}

在上述代码中,asyncOperation 方法模拟一个可能抛出异常的异步操作。通过 exceptionally 方法捕获异常并进行处理,然后通过 thenAccept 方法处理正常的结果或者异常处理后的结果。

  1. 复杂异常处理与重试机制 对于一些复杂的业务场景,可能需要在遇到异常时进行重试操作。可以结合 CompletableFuture 的方法和循环来实现重试机制。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class RetryExceptionHandlingExample {
    public static CompletableFuture<String> asyncOperation(int attempt) {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟可能抛出异常的异步操作
            if (Math.random() > 0.5 && attempt < 3) {
                throw new RuntimeException("Async operation failed, attempt " + attempt);
            }
            return "Async operation result";
        });
    }

    public static void handleResult(String result) {
        System.out.println("Handling result: " + result);
    }

    public static void handleException(Throwable e) {
        System.out.println("Handling exception: " + e.getMessage());
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        int maxAttempts = 3;
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> null);
        for (int i = 1; i <= maxAttempts; i++) {
            int attempt = i;
            future = future.thenComposeAsync(v -> asyncOperation(attempt))
                   .exceptionally(e -> {
                        if (attempt < maxAttempts) {
                            return null;
                        } else {
                            handleException(e);
                            return null;
                        }
                    });
        }
        future.thenAccept(RetryExceptionHandlingExample::handleResult).get();
    }
}

在这段代码中,asyncOperation 方法模拟一个可能抛出异常的异步操作,并且根据尝试次数来决定是否抛出异常。通过 thenComposeAsync 方法和循环实现重试机制,在达到最大重试次数后,如果仍然失败,则通过 exceptionally 方法处理异常,最后通过 thenAccept 方法处理最终的结果。

八、性能优化与注意事项

  1. 线程池的使用 在使用 CompletableFuture 进行异步操作时,如果不指定线程池,默认会使用 ForkJoinPool.commonPool()。对于一些需要大量异步操作或者对线程资源有特殊要求的场景,建议创建自己的线程池。
import java.util.concurrent.*;

public class ThreadPoolExample {
    public static ExecutorService executorService = Executors.newFixedThreadPool(10);

    public static CompletableFuture<String> asyncOperation() {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟异步操作
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Async operation result";
        }, executorService);
    }

    public static void handleResult(String result) {
        System.out.println("Handling result: " + result);
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Void> future = asyncOperation()
               .thenAccept(ThreadPoolExample::handleResult);
        future.get();
        executorService.shutdown();
    }
}

在上述代码中,创建了一个固定大小为10的线程池 executorService,并在 asyncOperation 方法中使用该线程池执行异步操作。

  1. 避免阻塞与死锁 在使用 CompletableFuture 时,要注意避免阻塞和死锁。例如,不要在 thenAccept 等回调方法中调用 get 方法获取 CompletableFuture 的结果,因为这可能会导致线程阻塞,甚至死锁。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class AvoidBlockingExample {
    public static CompletableFuture<String> asyncOperation1() {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟异步操作1
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Async operation 1 result";
        });
    }

    public static CompletableFuture<String> asyncOperation2() {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟异步操作2
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Async operation 2 result";
        });
    }

    public static void handleResults(String result1, String result2) {
        System.out.println("Handling results: " + result1 + ", " + result2);
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future1 = asyncOperation1();
        CompletableFuture<String> future2 = asyncOperation2();

        CompletableFuture.allOf(future1, future2)
               .thenRun(() -> {
                    try {
                        String result1 = future1.get();
                        String result2 = future2.get();
                        handleResults(result1, result2);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                });
    }
}

在上述代码中,通过 CompletableFuture.allOf 方法等待两个异步操作完成,然后在 thenRun 方法中获取结果并处理,避免了在 thenAccept 等回调方法中直接调用 get 方法导致的阻塞问题。

  1. 内存管理与资源释放 在异步操作中,如果涉及到一些资源的使用,例如数据库连接、文件句柄等,要注意及时释放资源。可以在 thenAccept 方法或者相关的回调方法中进行资源的关闭操作。
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class ResourceManagementExample {
    public static CompletableFuture<String> readFileAsync(String filePath) {
        return CompletableFuture.supplyAsync(() -> {
            StringBuilder content = new StringBuilder();
            try (BufferedReader reader = new BufferedReader(new FileReader(filePath))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    content.append(line).append("\n");
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
            return content.toString();
        });
    }

    public static void handleFileContent(String content) {
        System.out.println("Handling file content: " + content);
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Void> future = readFileAsync("example.txt")
               .thenAccept(ResourceManagementExample::handleFileContent);
        future.get();
    }
}

在上述代码中,readFileAsync 方法使用 try - with - resources 语句来自动关闭文件句柄,确保在异步操作完成后资源得到正确释放。

通过对 Java CompletableFuture thenAccept 结果消费的各种应用场景的介绍,我们可以看到它在异步编程中的灵活性和强大功能。合理使用 thenAccept 方法,可以提高程序的性能、可扩展性和响应性,使得我们能够更好地应对各种复杂的业务需求。同时,在使用过程中,要注意性能优化、异常处理以及资源管理等方面的问题,以确保程序的稳定性和可靠性。