MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java中的CompletableFuture异步编程

2023-12-253.9k 阅读

Java 异步编程的演进

在早期的 Java 编程中,实现异步操作主要依赖于 Thread 类和 Runnable 接口。开发者需要手动创建线程实例,然后调用 start 方法启动线程,如下代码示例:

public class TraditionalAsyncExample {
    public static void main(String[] args) {
        Thread thread = new Thread(() -> {
            System.out.println("异步任务开始执行");
            // 模拟一些耗时操作
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("异步任务执行完毕");
        });
        thread.start();
        System.out.println("主线程继续执行");
    }
}

这种方式虽然简单直接,但存在一些问题。例如,线程的创建和销毁开销较大,如果频繁创建和销毁线程,会影响系统性能。而且,管理多个线程的并发控制,如线程同步、死锁避免等,变得非常复杂。

后来,Java 引入了线程池的概念,通过 ExecutorService 接口及其实现类来管理线程。使用线程池可以复用线程,减少线程创建和销毁的开销。下面是使用 ExecutorService 的示例:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExecutorServiceExample {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        executorService.submit(() -> {
            System.out.println("异步任务开始执行");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("异步任务执行完毕");
        });
        executorService.shutdown();
        System.out.println("主线程继续执行");
    }
}

虽然 ExecutorService 解决了线程创建和销毁的性能问题,但在处理异步任务的结果时,仍然不够方便。Future 接口在一定程度上解决了获取异步任务结果的问题,但它有局限性,比如 get 方法会阻塞主线程,直到异步任务完成。

CompletableFuture 概述

CompletableFuture 是 Java 8 引入的一个强大的类,它实现了 FutureCompletionStage 接口。CompletableFuture 不仅可以异步执行任务,还能方便地处理异步任务的结果,支持链式调用、并行处理等高级特性,极大地简化了异步编程。

CompletableFuture 的核心特性包括:

  1. 异步执行任务:可以在不阻塞主线程的情况下执行耗时操作。
  2. 处理异步任务结果:提供了多种方法来处理异步任务完成后的结果,如 thenApplythenAccept 等。
  3. 链式调用:允许将多个异步操作串联起来,形成一个处理链。
  4. 错误处理:提供了专门的方法来处理异步任务执行过程中发生的异常,如 exceptionally
  5. 并行处理:支持多个 CompletableFuture 并行执行,并合并结果。

创建 CompletableFuture

  1. 使用 supplyAsync 创建有返回值的异步任务 CompletableFuture 提供了 supplyAsync 静态方法来创建一个异步任务,该任务有返回值。示例如下:
import java.util.concurrent.CompletableFuture;

public class SupplyAsyncExample {
    public static void main(String[] args) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            // 模拟耗时操作
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "异步任务执行结果";
        });
        future.thenAccept(System.out::println).join();
        System.out.println("主线程继续执行");
    }
}

在上述代码中,supplyAsync 方法接收一个 Supplier 作为参数,该 Supplier 定义了异步任务的具体逻辑。任务执行完成后,通过 thenAccept 方法处理返回结果,join 方法用于等待异步任务完成并获取结果。

  1. 使用 runAsync 创建无返回值的异步任务 runAsync 方法用于创建一个无返回值的异步任务。示例如下:
import java.util.concurrent.CompletableFuture;

public class RunAsyncExample {
    public static void main(String[] args) {
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            // 模拟耗时操作
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("无返回值的异步任务执行完毕");
        });
        future.join();
        System.out.println("主线程继续执行");
    }
}

这里 runAsync 方法接收一个 Runnable 作为参数,任务执行完毕后没有返回值,通过 join 方法等待任务完成。

处理 CompletableFuture 的结果

  1. 使用 thenApply 转换结果 thenApply 方法用于在异步任务完成后,对其返回结果进行转换。示例如下:
import java.util.concurrent.CompletableFuture;

public class ThenApplyExample {
    public static void main(String[] args) {
        CompletableFuture.supplyAsync(() -> 10)
               .thenApply(result -> result * 2)
               .thenAccept(System.out::println)
               .join();
        System.out.println("主线程继续执行");
    }
}

在上述代码中,首先 supplyAsync 创建一个异步任务返回 10,然后通过 thenApply 将结果乘以 2,最后通过 thenAccept 打印转换后的结果。

  1. 使用 thenAccept 消费结果 thenAccept 方法用于在异步任务完成后,消费其返回结果,但不返回新的结果。示例如下:
import java.util.concurrent.CompletableFuture;

public class ThenAcceptExample {
    public static void main(String[] args) {
        CompletableFuture.supplyAsync(() -> "Hello, CompletableFuture!")
               .thenAccept(System.out::println)
               .join();
        System.out.println("主线程继续执行");
    }
}

这里 supplyAsync 返回一个字符串,thenAccept 直接将该字符串打印出来。

  1. 使用 thenRun 执行后续任务 thenRun 方法用于在异步任务完成后,执行一个无参数、无返回值的后续任务。示例如下:
import java.util.concurrent.CompletableFuture;

public class ThenRunExample {
    public static void main(String[] args) {
        CompletableFuture.supplyAsync(() -> "任务完成")
               .thenRun(() -> System.out.println("后续任务执行"))
               .join();
        System.out.println("主线程继续执行");
    }
}

supplyAsync 完成后,thenRun 定义的后续任务会被执行。

链式调用

CompletableFuture 支持链式调用,使得多个异步操作可以串联起来,形成一个清晰的处理流程。例如,假设有一个需求,先获取用户信息,然后根据用户信息获取用户订单,最后统计订单数量。示例代码如下:

import java.util.concurrent.CompletableFuture;

class User {
    private String name;

    public User(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }
}

class Order {
    private int orderId;

    public Order(int orderId) {
        this.orderId = orderId;
    }

    public int getOrderId() {
        return orderId;
    }
}

public class ChainingExample {
    public static CompletableFuture<User> getUser() {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return new User("John");
        });
    }

    public static CompletableFuture<Order[]> getOrders(User user) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return new Order[]{new Order(1), new Order(2)};
        });
    }

    public static CompletableFuture<Integer> countOrders(Order[] orders) {
        return CompletableFuture.supplyAsync(() -> orders.length);
    }

    public static void main(String[] args) {
        getUser()
               .thenCompose(ChainingExample::getOrders)
               .thenCompose(ChainingExample::countOrders)
               .thenAccept(System.out::println)
               .join();
        System.out.println("主线程继续执行");
    }
}

在上述代码中,getUser 方法返回一个 CompletableFuture<User>thenCompose 方法用于将前一个 CompletableFuture 的结果作为参数传递给下一个 CompletableFuture 的生成方法。通过链式调用,实现了复杂的异步业务逻辑。

错误处理

  1. 使用 exceptionally 处理异常 exceptionally 方法用于在异步任务执行过程中发生异常时,提供一个默认的处理逻辑。示例如下:
import java.util.concurrent.CompletableFuture;

public class ExceptionallyExample {
    public static void main(String[] args) {
        CompletableFuture.supplyAsync(() -> {
            if (Math.random() > 0.5) {
                throw new RuntimeException("模拟异常");
            }
            return "正常结果";
        })
               .exceptionally(ex -> {
                    System.out.println("捕获到异常: " + ex.getMessage());
                    return "默认结果";
                })
               .thenAccept(System.out::println)
               .join();
        System.out.println("主线程继续执行");
    }
}

在上述代码中,supplyAsync 内部可能会抛出异常,exceptionally 捕获到异常后返回一个默认结果。

  1. 使用 handle 同时处理结果和异常 handle 方法可以同时处理异步任务的正常结果和异常情况。示例如下:
import java.util.concurrent.CompletableFuture;

public class HandleExample {
    public static void main(String[] args) {
        CompletableFuture.supplyAsync(() -> {
            if (Math.random() > 0.5) {
                throw new RuntimeException("模拟异常");
            }
            return "正常结果";
        })
               .handle((result, ex) -> {
                    if (ex != null) {
                        System.out.println("捕获到异常: " + ex.getMessage());
                        return "默认结果";
                    }
                    return result;
                })
               .thenAccept(System.out::println)
               .join();
        System.out.println("主线程继续执行");
    }
}

handle 方法接收一个 BiFunction,它的两个参数分别是异步任务的结果和可能发生的异常。通过判断 ex 是否为 null,可以分别处理正常情况和异常情况。

并行处理

  1. 使用 allOf 等待所有任务完成 allOf 方法用于等待所有给定的 CompletableFuture 都完成。示例如下:
import java.util.concurrent.CompletableFuture;

public class AllOfExample {
    public static void main(String[] args) {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "任务 1 完成";
        });
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "任务 2 完成";
        });
        CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2);
        allFutures.join();
        try {
            System.out.println(future1.get());
            System.out.println(future2.get());
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("所有任务完成,主线程继续执行");
    }
}

在上述代码中,allOf 方法接收多个 CompletableFuture,返回一个新的 CompletableFuture<Void>,当所有传入的 CompletableFuture 都完成时,这个新的 CompletableFuture 才完成。

  1. 使用 anyOf 等待任一任务完成 anyOf 方法用于等待给定的 CompletableFuture 中任一任务完成。示例如下:
import java.util.concurrent.CompletableFuture;

public class AnyOfExample {
    public static void main(String[] args) {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "任务 1 完成";
        });
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "任务 2 完成";
        });
        CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(future1, future2);
        anyFuture.join();
        try {
            System.out.println(anyFuture.get());
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("任一任务完成,主线程继续执行");
    }
}

anyOf 方法返回的 CompletableFuture 在任一传入的 CompletableFuture 完成时就完成,其结果是第一个完成的 CompletableFuture 的结果。

CompletableFuture 的实现原理

CompletableFuture 的实现基于 Fork/Join 框架和 CAS(Compare - and - Swap)操作。它内部维护了一个状态变量来表示任务的执行状态,如未开始、进行中、已完成、已异常等。

  1. 任务执行CompletableFuture 使用线程池(默认是 ForkJoinPool.commonPool())来执行异步任务。当调用 supplyAsyncrunAsync 时,任务会被提交到线程池执行。

  2. 结果处理:任务完成后,会通过 postComplete 方法来触发后续的结果处理逻辑。postComplete 方法会检查是否有依赖于该任务结果的其他 CompletableFuture,如果有,则会将这些 CompletableFuture 加入到队列中等待执行。

  3. 链式调用实现:链式调用是通过 CompletionStage 接口的方法实现的。每个 thenXxx 方法都会创建一个新的 CompletableFuture,并将其与前一个 CompletableFuture 关联起来。当前一个 CompletableFuture 完成时,会自动触发后续 CompletableFuture 的执行。

  4. 错误处理实现:异常处理是通过在任务执行过程中捕获异常,并将异常信息存储在 CompletableFuture 的内部状态中。exceptionallyhandle 等方法会检查这个异常状态,并根据相应的逻辑进行处理。

性能优化与注意事项

  1. 线程池使用:虽然 CompletableFuture 默认使用 ForkJoinPool.commonPool(),但在一些场景下,可能需要自定义线程池。例如,当任务类型不同,对线程资源需求差异较大时,自定义线程池可以更好地控制资源分配,提高性能。
  2. 避免阻塞:尽量避免在 CompletableFuture 的处理链中使用阻塞方法,如 get 方法。如果必须使用,也要确保在合适的时机调用,以免影响异步编程的优势。
  3. 异常处理:在异步任务中,要妥善处理异常,避免异常在处理链中被忽略,导致程序出现难以调试的问题。可以通过全局的异常处理器来捕获和处理所有未处理的异常。
  4. 内存管理:注意 CompletableFuture 可能产生的内存开销。如果创建了大量的 CompletableFuture,并且长时间持有它们,可能会导致内存泄漏。及时释放不再使用的 CompletableFuture 实例。

在实际应用中,根据具体业务场景合理使用 CompletableFuture 的各种特性,可以显著提高程序的性能和响应性,让异步编程变得更加简洁和高效。通过深入理解其原理和注意事项,可以更好地发挥 CompletableFuture 的优势,编写出高质量的异步代码。