MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java 中 CompletableFuture 异步任务执行上下文传递

2022-11-301.8k 阅读

Java 中 CompletableFuture 异步任务执行上下文传递

理解 CompletableFuture

在现代 Java 编程中,CompletableFuture 是一个强大的工具,用于异步编程。它允许我们以一种非阻塞的方式执行任务,并在任务完成时获取结果。CompletableFuture 提供了丰富的方法来处理异步操作,包括链式调用、组合多个异步任务以及处理异常等。

例如,我们可以创建一个简单的 CompletableFuture 来模拟一个异步任务:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            // 模拟一个耗时操作
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Hello, CompletableFuture!";
        });

        String result = future.get();
        System.out.println(result);
    }
}

在上述代码中,supplyAsync 方法接受一个 Supplier,该 Supplier 在一个新的线程中执行异步任务,并返回一个 CompletableFuture,通过 get 方法获取异步任务的结果。

上下文传递的需求

在实际应用中,我们常常需要在异步任务之间传递一些上下文信息。例如,在一个 Web 应用中,用户的身份信息(如用户 ID、角色等)可能需要在不同的异步操作中使用,以便进行权限检查、日志记录等。传统的线程局部变量(ThreadLocal)在异步任务场景下存在局限性,因为异步任务可能在不同的线程中执行,而 ThreadLocal 是基于线程的。

以一个简单的业务场景为例,假设我们有一个订单处理系统,在处理订单时需要记录用户的操作日志。我们可能有一个获取订单详情的异步任务,以及一个根据订单详情更新库存的异步任务,这两个任务都需要知道当前操作的用户是谁,以便在日志中记录用户信息。

线程上下文传递机制

  1. InheritableThreadLocal InheritableThreadLocalThreadLocal 的一个子类,它允许子线程继承父线程的 ThreadLocal 值。这在一定程度上可以解决异步任务中的上下文传递问题。

    示例代码如下:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class InheritableThreadLocalExample {
    private static final InheritableThreadLocal<String> userContext = new InheritableThreadLocal<>();

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        userContext.set("user123");

        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            System.out.println("Async task running with user context: " + userContext.get());
        });

        future.get();
    }
}

在上述代码中,我们在主线程中设置了 InheritableThreadLocal 的值,然后在异步任务中可以获取到该值。但是,这种方式在复杂的异步任务链或者并行异步任务场景下可能会出现问题,因为 InheritableThreadLocal 的继承关系是基于线程创建时的,而 CompletableFuture 可能会复用线程池中的线程,导致上下文传递不准确。

  1. 使用自定义的上下文传递机制 我们可以创建一个自定义的上下文对象,并在异步任务之间手动传递。

    示例代码如下:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

class UserContext {
    private final String userId;

    public UserContext(String userId) {
        this.userId = userId;
    }

    public String getUserId() {
        return userId;
    }
}

public class CustomContextExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        UserContext context = new UserContext("user456");

        CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> context)
              .thenApplyAsync(ctx -> {
                    System.out.println("Async task running with user context: " + ctx.getUserId());
                    return null;
                });

        future.get();
    }
}

在这个示例中,我们创建了一个 UserContext 类来封装上下文信息,并通过 CompletableFuture 的链式调用将上下文传递给异步任务。这种方式更加灵活,但在复杂的应用中,手动传递上下文可能会变得繁琐。

CompletableFuture 中的上下文传递优化

  1. 使用 ForkJoinPool 的特性 CompletableFuture 通常使用 ForkJoinPool 来执行异步任务。ForkJoinPool 提供了一些机制来处理上下文传递。

    示例代码如下:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicReference;

public class ForkJoinPoolContextExample {
    private static final AtomicReference<String> userContext = new AtomicReference<>();

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        userContext.set("user789");

        ForkJoinPool.commonPool().submit(() -> {
            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                System.out.println("Async task running with user context: " + userContext.get());
            });
            return future;
        }).get();
    }
}

在这个示例中,我们使用 AtomicReference 来存储上下文信息,并通过 ForkJoinPoolsubmit 方法提交异步任务。由于 CompletableFuture 默认使用 ForkJoinPool.commonPool(),这种方式在一定程度上可以保证上下文在异步任务中的传递。

  1. 自定义 Executor 实现上下文传递 我们可以创建一个自定义的 Executor,在执行任务时传递上下文。

    示例代码如下:

import java.util.concurrent.*;

class ContextAwareExecutor implements Executor {
    private final Executor delegate;
    private final String userContext;

    public ContextAwareExecutor(Executor delegate, String userContext) {
        this.delegate = delegate;
        this.userContext = userContext;
    }

    @Override
    public void execute(Runnable task) {
        delegate.execute(() -> {
            System.out.println("Async task running with user context: " + userContext);
            task.run();
        });
    }
}

public class CustomExecutorContextExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(1);
        String userContext = "user1011";

        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            System.out.println("Inner async task");
        }, new ContextAwareExecutor(executorService, userContext));

        future.get();
        executorService.shutdown();
    }
}

在上述代码中,ContextAwareExecutor 包装了一个 Executor,并在执行任务时传递上下文信息。通过将这个自定义的 Executor 传递给 CompletableFuturerunAsync 方法,我们可以在异步任务中获取上下文。

结合 ThreadLocal 和 CompletableFuture 的最佳实践

  1. 使用 ThreadLocal 进行上下文隔离 虽然传统的 ThreadLocal 在异步任务中存在局限性,但我们可以结合 CompletableFuture 的特性,在异步任务内部使用 ThreadLocal 进行上下文隔离。

    示例代码如下:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class ThreadLocalIsolationExample {
    private static final ThreadLocal<String> localContext = ThreadLocal.withInitial(() -> "default context");

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
            localContext.set("context1");
            System.out.println("Async task 1 with context: " + localContext.get());
        });

        CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
            localContext.set("context2");
            System.out.println("Async task 2 with context: " + localContext.get());
        });

        CompletableFuture.allOf(future1, future2).get();
    }
}

在这个示例中,每个异步任务都可以设置自己的 ThreadLocal 值,从而实现上下文隔离。

  1. 使用 TransmittableThreadLocal TransmittableThreadLocal 是阿里开源的一个库,它可以解决 ThreadLocal 在异步任务中的上下文传递问题。

    首先,添加依赖:

<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>transmittable-thread-local</artifactId>
    <version>2.11.2</version>
</dependency>

然后,使用示例代码如下:

import com.alibaba.ttl.TransmittableThreadLocal;
import com.alibaba.ttl.TtlRunnable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class TransmittableThreadLocalExample {
    private static final TransmittableThreadLocal<String> userContext = new TransmittableThreadLocal<>();

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        userContext.set("user1213");

        CompletableFuture<Void> future = CompletableFuture.runAsync(TtlRunnable.get(() -> {
            System.out.println("Async task running with user context: " + userContext.get());
        }));

        future.get();
    }
}

在上述代码中,TransmittableThreadLocal 可以确保上下文在异步任务中正确传递,它通过对 RunnableCallable 进行包装,使得上下文能够在不同线程间传递。

异常处理与上下文传递

在异步任务执行过程中,异常处理是非常重要的。同时,我们也需要确保在异常处理过程中上下文信息能够正确传递。

  1. 使用 CompletableFuture 的异常处理方法 CompletableFuture 提供了 exceptionallyhandle 等方法来处理异常。

    示例代码如下:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class ExceptionHandlingContextExample {
    private static final InheritableThreadLocal<String> userContext = new InheritableThreadLocal<>();

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        userContext.set("user1415");

        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            if (Math.random() > 0.5) {
                throw new RuntimeException("Simulated exception");
            }
            return "Task completed successfully";
        }).exceptionally(ex -> {
            System.out.println("Exception in async task with user context: " + userContext.get());
            ex.printStackTrace();
            return "Exception handled";
        });

        String result = future.get();
        System.out.println(result);
    }
}

在这个示例中,exceptionally 方法捕获异步任务中的异常,并在处理异常时可以获取到上下文信息。

  1. 自定义异常处理与上下文传递 我们也可以创建自定义的异常处理逻辑,并结合上下文传递。

    示例代码如下:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

class CustomException extends RuntimeException {
    private final String userContext;

    public CustomException(String message, String userContext) {
        super(message);
        this.userContext = userContext;
    }

    public String getUserContext() {
        return userContext;
    }
}

public class CustomExceptionHandlingContextExample {
    private static final String userContext = "user1617";

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            if (Math.random() > 0.5) {
                throw new CustomException("Custom exception occurred", userContext);
            }
            System.out.println("Task completed successfully");
        }).handle((result, ex) -> {
            if (ex != null) {
                if (ex instanceof CustomException) {
                    CustomException customEx = (CustomException) ex;
                    System.out.println("Custom exception handled with user context: " + customEx.getUserContext());
                    customEx.printStackTrace();
                } else {
                    System.out.println("Other exception handled");
                    ex.printStackTrace();
                }
            }
            return null;
        });

        future.get();
    }
}

在这个示例中,我们自定义了一个 CustomException,并在异常处理逻辑中获取上下文信息。

性能考量与上下文传递

  1. 上下文传递对性能的影响 上下文传递在一定程度上会增加系统的开销。例如,每次传递上下文对象可能需要进行对象的复制或者引用传递,这会消耗一定的 CPU 和内存资源。

    在使用 InheritableThreadLocal 或者 TransmittableThreadLocal 时,由于需要对线程进行特殊处理,也会带来额外的性能开销。

    TransmittableThreadLocal 为例,它通过对 RunnableCallable 进行包装来传递上下文,这会增加对象的创建和方法调用的开销。

  2. 优化性能的方法

    • 减少不必要的上下文传递:只在真正需要上下文的异步任务中传递,避免在整个异步任务链中无意义地传递上下文。
    • 使用轻量级上下文对象:尽量减少上下文对象的大小和复杂度,以降低传递时的性能开销。
    • 合理使用线程池:通过合理配置 ForkJoinPool 或者自定义线程池,可以减少线程创建和销毁的开销,从而在一定程度上弥补上下文传递带来的性能损失。

    例如,我们可以复用一些常用的上下文对象,避免每次都创建新的对象:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

class LightweightContext {
    private static final LightweightContext INSTANCE = new LightweightContext();

    private LightweightContext() {}

    public static LightweightContext getInstance() {
        return INSTANCE;
    }
}

public class PerformanceOptimizationExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        LightweightContext context = LightweightContext.getInstance();

        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            System.out.println("Async task running with lightweight context");
        }, r -> {
            // 自定义线程池处理逻辑
            Thread thread = new Thread(r);
            thread.start();
        });

        future.get();
    }
}

在这个示例中,我们通过单例模式创建了一个轻量级的上下文对象,减少了对象创建的开销。

应用场景与案例分析

  1. Web 应用中的上下文传递 在一个 Spring Boot 的 Web 应用中,我们可以使用 CompletableFuture 进行异步任务处理,同时传递用户上下文。

    假设我们有一个用户登录功能,登录成功后需要异步更新用户的登录日志和缓存。

    示例代码如下:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.CompletableFuture;

@RestController
public class UserController {
    @Autowired
    private UserService userService;

    @GetMapping("/login/{userId}")
    public String login(String userId) {
        CompletableFuture.runAsync(() -> userService.updateLoginLog(userId));
        CompletableFuture.runAsync(() -> userService.updateUserCache(userId));
        return "Login successful";
    }
}

class UserService {
    private final UserContextHolder userContextHolder;

    public UserService(UserContextHolder userContextHolder) {
        this.userContextHolder = userContextHolder;
    }

    public void updateLoginLog(String userId) {
        String userContext = userContextHolder.getContext();
        // 记录登录日志,使用用户上下文
        System.out.println("Updating login log for user " + userId + " with context " + userContext);
    }

    public void updateUserCache(String userId) {
        String userContext = userContextHolder.getContext();
        // 更新用户缓存,使用用户上下文
        System.out.println("Updating user cache for user " + userId + " with context " + userContext);
    }
}

class UserContextHolder {
    private String context;

    public void setContext(String context) {
        this.context = context;
    }

    public String getContext() {
        return context;
    }
}

在这个示例中,UserContextHolder 用于存储用户上下文信息,UserService 中的异步任务可以获取该上下文进行相应的操作。

  1. 分布式系统中的上下文传递 在分布式系统中,CompletableFuture 也可以用于异步任务处理,并且上下文传递更为重要。

    假设我们有一个微服务架构,其中一个订单服务需要调用库存服务和物流服务进行订单处理。

    示例代码如下:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

class OrderService {
    private final InventoryService inventoryService;
    private final LogisticsService logisticsService;
    private final String traceId;

    public OrderService(InventoryService inventoryService, LogisticsService logisticsService, String traceId) {
        this.inventoryService = inventoryService;
        this.logisticsService = logisticsService;
        this.traceId = traceId;
    }

    public void processOrder(String orderId) {
        CompletableFuture<Void> inventoryFuture = CompletableFuture.runAsync(() -> inventoryService.updateInventory(orderId, traceId));
        CompletableFuture<Void> logisticsFuture = CompletableFuture.runAsync(() -> logisticsService.dispatchOrder(orderId, traceId));

        CompletableFuture.allOf(inventoryFuture, logisticsFuture).thenRun(() -> {
            System.out.println("Order " + orderId + " processed successfully with trace id " + traceId);
        }).exceptionally(ex -> {
            System.out.println("Error processing order " + orderId + " with trace id " + traceId);
            ex.printStackTrace();
            return null;
        });
    }
}

class InventoryService {
    public void updateInventory(String orderId, String traceId) {
        // 更新库存逻辑,使用 traceId 进行日志记录等
        System.out.println("Updating inventory for order " + orderId + " with trace id " + traceId);
    }
}

class LogisticsService {
    public void dispatchOrder(String orderId, String traceId) {
        // 安排物流逻辑,使用 traceId 进行日志记录等
        System.out.println("Dispatching order " + orderId + " with trace id " + traceId);
    }
}

在这个示例中,traceId 作为上下文信息在不同的微服务异步任务中传递,用于跟踪订单处理流程。

总结 CompletableFuture 异步任务执行上下文传递的要点

  1. 多种上下文传递方式:我们介绍了 InheritableThreadLocal、自定义上下文传递机制、结合 ForkJoinPool 特性、自定义 Executor 以及使用 TransmittableThreadLocal 等多种方式来实现 CompletableFuture 异步任务中的上下文传递。每种方式都有其优缺点和适用场景,需要根据具体需求选择合适的方式。
  2. 异常处理与上下文:在异步任务异常处理过程中,要确保上下文信息能够正确传递,以便更好地定位和处理问题。通过 CompletableFuture 的异常处理方法以及自定义异常处理逻辑,可以有效地实现这一点。
  3. 性能与优化:上下文传递会带来一定的性能开销,通过减少不必要的上下文传递、使用轻量级上下文对象以及合理使用线程池等方法,可以优化性能,在保证功能的同时提高系统的运行效率。
  4. 实际应用场景:在 Web 应用和分布式系统等实际场景中,上下文传递对于实现业务逻辑、跟踪和调试系统非常重要。通过具体的案例分析,我们展示了如何在不同场景下应用上下文传递技术。

希望通过本文的介绍,读者能够深入理解 JavaCompletableFuture 异步任务执行上下文传递的相关知识,并在实际项目中灵活运用,提升系统的异步处理能力和可维护性。