MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java中CompletableFuture的异步编程技巧

2022-03-136.8k 阅读

Java 异步编程简介

在现代软件开发中,异步编程已经成为提高应用程序性能和响应性的关键技术之一。传统的同步编程模型中,代码按照顺序依次执行,一个任务执行完后才会执行下一个任务。这在处理 I/O 密集型操作(如网络请求、文件读取等)时会导致线程长时间阻塞,降低了系统的整体效率。而异步编程允许程序在执行耗时操作时不阻塞主线程,从而可以同时处理其他任务,提高了系统的并发处理能力。

Java 提供了多种异步编程的方式,早期主要通过 Thread 类和 Runnable 接口来创建和管理线程,但这种方式较为繁琐,需要手动处理线程的创建、启动、同步等问题。后来,Java 引入了线程池(如 ExecutorService)来管理线程资源,简化了线程的使用。随着 Java 8 的发布,CompletableFuture 类的出现进一步简化和增强了异步编程模型,它结合了 FutureCompletionStage 的功能,提供了更强大、灵活且易于使用的异步编程方式。

CompletableFuture 基础

1. CompletableFuture 概述

CompletableFuture 类实现了 FutureCompletionStage 接口,它代表一个异步计算的结果。与传统的 Future 不同,CompletableFuture 可以在计算完成时自动触发后续的操作,并且支持链式调用、组合多个异步操作等功能。这使得异步编程更加流畅和易于理解。

2. 创建 CompletableFuture

  • 使用 CompletableFuture.supplyAsync 创建有返回值的异步任务

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    
    public class CompletableFutureExample {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
                // 模拟一个耗时操作
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "Hello, CompletableFuture!";
            });
            String result = future.get();
            System.out.println(result);
        }
    }
    

    在上述代码中,CompletableFuture.supplyAsync 方法接受一个 Supplier 接口的实现,它会在一个新的线程中执行 Supplierget 方法,并返回一个 CompletableFuture 对象,该对象的结果就是 Supplier 的返回值。通过 future.get() 方法可以获取异步操作的结果,但这个方法会阻塞当前线程,直到异步操作完成。

  • 使用 CompletableFuture.runAsync 创建无返回值的异步任务

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    
    public class CompletableFutureExample2 {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                // 模拟一个耗时操作
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Async task completed without return value.");
            });
            future.get();
        }
    }
    

    CompletableFuture.runAsync 方法接受一个 Runnable 接口的实现,同样会在新线程中执行 Runnablerun 方法,但返回的 CompletableFuture 对象的结果类型为 Void,因为 Runnable 没有返回值。

3. 获取 CompletableFuture 的结果

  • 使用 get 方法获取结果(阻塞方式): 如前面示例中使用的 future.get() 方法,它会阻塞当前线程,直到 CompletableFuture 完成计算并返回结果。如果异步操作抛出异常,get 方法会将异常包装成 ExecutionExceptionInterruptedException 重新抛出。

    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        throw new RuntimeException("Simulated exception");
    });
    try {
        String result = future.get();
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
    

    在这个例子中,异步任务抛出了一个运行时异常,get 方法捕获到这个异常并重新包装抛出。

  • 使用 get(long timeout, TimeUnit unit) 方法设置获取结果的超时时间

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;
    
    public class CompletableFutureTimeoutExample {
        public static void main(String[] args) {
            CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "Result after 3 seconds";
            });
            try {
                String result = future.get(2, TimeUnit.SECONDS);
                System.out.println(result);
            } catch (InterruptedException | ExecutionException | java.util.concurrent.TimeoutException e) {
                e.printStackTrace();
            }
        }
    }
    

    在上述代码中,future.get(2, TimeUnit.SECONDS) 方法设置了获取结果的超时时间为 2 秒。如果在 2 秒内异步任务没有完成,就会抛出 TimeoutException

  • 使用 join 方法获取结果join 方法与 get 方法类似,也是用于获取 CompletableFuture 的结果,但它不会抛出受检异常(InterruptedExceptionExecutionException)。如果异步操作抛出异常,join 方法会直接抛出未经检查的 CompletionException,该异常包装了原始的异常。

    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        throw new RuntimeException("Simulated exception");
    });
    try {
        String result = future.join();
    } catch (CompletionException e) {
        e.printStackTrace();
    }
    

CompletableFuture 的链式调用

1. thenApply 方法

thenApply 方法用于在 CompletableFuture 完成后,对其结果进行转换。它接受一个 Function 接口的实现,该 Function 的输入是前一个 CompletableFuture 的结果,输出是新的 CompletableFuture 的结果。

CompletableFuture.supplyAsync(() -> "Hello")
      .thenApply(s -> s + ", World")
      .thenApply(String::toUpperCase)
      .thenAccept(System.out::println);

在这个例子中,首先通过 CompletableFuture.supplyAsync 创建一个异步任务,返回字符串 "Hello"。然后通过 thenApply 方法将字符串转换为 "Hello, World",接着再将其转换为大写 "HELLO, WORLD",最后通过 thenAccept 方法将结果打印出来。thenAccept 方法与 thenApply 类似,但它接受的是一个 Consumer 接口,不返回新的结果,仅用于消费前一个 CompletableFuture 的结果。

2. thenCompose 方法

thenCompose 方法用于将两个 CompletableFuture 连接起来,前一个 CompletableFuture 的结果作为后一个 CompletableFuture 的输入。与 thenApply 不同的是,thenApply 中传入的 Function 返回的是一个普通值,而 thenCompose 中传入的 Function 返回的是另一个 CompletableFuture

CompletableFuture.supplyAsync(() -> 10)
      .thenCompose(num -> CompletableFuture.supplyAsync(() -> num * 2))
      .thenAccept(System.out::println);

在上述代码中,第一个 CompletableFuture 返回值 10,thenCompose 方法中的 Function 使用这个 10 创建并返回一个新的 CompletableFuture,该 CompletableFuture 的结果是 10 * 2 = 20,最后将结果 20 打印出来。

3. thenCombine 方法

thenCombine 方法用于将两个 CompletableFuture 的结果合并成一个新的结果。它接受另一个 CompletableFuture 和一个 BiFunctionBiFunction 的两个输入分别是这两个 CompletableFuture 的结果,输出是新的 CompletableFuture 的结果。

CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 5);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 3);
CompletableFuture<Integer> combinedFuture = future1.thenCombine(future2, (a, b) -> a + b);
combinedFuture.thenAccept(System.out::println);

在这个例子中,future1 返回 5,future2 返回 3,thenCombine 方法通过 BiFunction 将这两个结果相加,得到 8 并作为新的 CompletableFuture 的结果,最后打印出来。

CompletableFuture 的错误处理

1. exceptionally 方法

exceptionally 方法用于处理 CompletableFuture 执行过程中抛出的异常。它接受一个 Function,当 CompletableFuture 抛出异常时,这个 Function 会被调用,其输入是异常对象,输出是一个替代结果。

CompletableFuture.supplyAsync(() -> {
      if (Math.random() < 0.5) {
          throw new RuntimeException("Simulated exception");
      }
      return "Success";
  })
  .exceptionally(ex -> {
      System.out.println("Caught exception: " + ex.getMessage());
      return "Default value";
  })
  .thenAccept(System.out::println);

在上述代码中,异步任务有 50% 的概率抛出异常。如果抛出异常,exceptionally 方法中的 Function 会被调用,打印异常信息并返回 "Default value"。如果没有抛出异常,则返回正常的结果 "Success"。

2. handle 方法

handle 方法既可以处理正常的结果,也可以处理异常情况。它接受一个 BiFunction,第一个参数是正常的结果(如果有异常则为 null),第二个参数是异常对象(如果没有异常则为 null),返回值是新的 CompletableFuture 的结果。

CompletableFuture.supplyAsync(() -> {
      if (Math.random() < 0.5) {
          throw new RuntimeException("Simulated exception");
      }
      return "Success";
  })
  .handle((result, ex) -> {
      if (ex != null) {
          System.out.println("Caught exception: " + ex.getMessage());
          return "Default value";
      }
      return result;
  })
  .thenAccept(System.out::println);

这里 handle 方法的功能与 exceptionally 方法类似,但更加灵活,因为它可以同时处理正常结果和异常情况。

CompletableFuture 的并行处理

1. allOf 方法

allOf 方法用于等待多个 CompletableFuture 都完成。它接受多个 CompletableFuture 作为参数,并返回一个新的 CompletableFuture。只有当所有传入的 CompletableFuture 都完成时,这个新的 CompletableFuture 才会完成,其结果为 Void

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
      try {
          Thread.sleep(2000);
      } catch (InterruptedException e) {
          e.printStackTrace();
      }
      return "Result1";
  });
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
      try {
          Thread.sleep(1000);
      } catch (InterruptedException e) {
          e.printStackTrace();
      }
      return "Result2";
  });
CompletableFuture<Void> allFuture = CompletableFuture.allOf(future1, future2);
allFuture.join();
try {
      String result1 = future1.get();
      String result2 = future2.get();
      System.out.println(result1 + ", " + result2);
  } catch (InterruptedException | ExecutionException e) {
      e.printStackTrace();
  }

在这个例子中,future1future2 是两个异步任务,CompletableFuture.allOf 方法创建了一个新的 CompletableFuture allFuture,当 future1future2 都完成后,allFuture 才完成。通过 allFuture.join() 等待所有任务完成,然后分别获取 future1future2 的结果并打印。

2. anyOf 方法

anyOf 方法用于等待多个 CompletableFuture 中的任意一个完成。它接受多个 CompletableFuture 作为参数,并返回一个新的 CompletableFuture。只要有一个传入的 CompletableFuture 完成,这个新的 CompletableFuture 就会完成,其结果就是第一个完成的 CompletableFuture 的结果。

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
      try {
          Thread.sleep(2000);
      } catch (InterruptedException e) {
          e.printStackTrace();
      }
      return "Result1";
  });
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
      try {
          Thread.sleep(1000);
      } catch (InterruptedException e) {
          e.printStackTrace();
      }
      return "Result2";
  });
CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(future1, future2);
try {
      Object result = anyFuture.get();
      System.out.println("First completed result: " + result);
  } catch (InterruptedException | ExecutionException e) {
      e.printStackTrace();
  }

在上述代码中,future2 会先于 future1 完成,所以 anyFuture 的结果就是 future2 的结果 "Result2",并将其打印出来。

CompletableFuture 与线程池

在前面的示例中,CompletableFuture 的异步任务默认使用 ForkJoinPool.commonPool() 线程池来执行。然而,在实际应用中,根据不同的需求,我们可能需要使用自定义的线程池。

1. 使用自定义线程池

可以通过 CompletableFuture.supplyAsync(Supplier<T> supplier, Executor executor)CompletableFuture.runAsync(Runnable runnable, Executor executor) 方法来指定使用的线程池。

import java.util.concurrent.*;

public class CompletableFutureWithCustomExecutor {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(5);
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Result from custom executor";
        }, executor);
        try {
            String result = future.get();
            System.out.println(result);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        } finally {
            executor.shutdown();
        }
    }
}

在这个例子中,我们创建了一个固定大小为 5 的线程池 executor,并通过 CompletableFuture.supplyAsync 的第二个参数指定使用这个线程池来执行异步任务。最后在程序结束时,调用 executor.shutdown() 关闭线程池。

2. 线程池的选择与调优

选择合适的线程池类型和参数对于应用程序的性能至关重要。例如,对于 I/O 密集型任务,通常可以选择较大的线程池大小,因为线程在等待 I/O 操作时不会占用 CPU 资源,更多的线程可以提高并发处理能力。而对于 CPU 密集型任务,线程池大小应接近 CPU 核心数,以避免过多的线程上下文切换开销。此外,还可以考虑使用 ThreadPoolExecutor 来更精细地控制线程池的行为,如设置核心线程数、最大线程数、任务队列容量等参数。

CompletableFuture 的应用场景

1. 并行数据处理

在处理大量数据时,可以将数据分成多个部分,使用 CompletableFuture 并行处理这些部分,然后将结果合并。例如,在数据分析场景中,对一个大的数据集进行分组统计,可以并行处理每个分组,最后汇总结果。

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class ParallelDataProcessing {
    public static void main(String[] args) {
        List<Integer> data = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            data.add(i);
        }
        ExecutorService executor = Executors.newFixedThreadPool(4);
        List<CompletableFuture<Integer>> futures = new ArrayList<>();
        int chunkSize = data.size() / 4;
        for (int i = 0; i < 4; i++) {
            int start = i * chunkSize;
            int end = (i == 3)? data.size() : (i + 1) * chunkSize;
            List<Integer> subData = data.subList(start, end);
            CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
                int sum = 0;
                for (int num : subData) {
                    sum += num;
                }
                return sum;
            }, executor);
            futures.add(future);
        }
        CompletableFuture<Integer> totalFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
              .thenApply(v -> {
                    int total = 0;
                    for (CompletableFuture<Integer> future : futures) {
                        try {
                            total += future.get();
                        } catch (InterruptedException | ExecutionException e) {
                            e.printStackTrace();
                        }
                    }
                    return total;
                });
        try {
            int total = totalFuture.get();
            System.out.println("Total sum: " + total);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        } finally {
            executor.shutdown();
        }
    }
}

在这个例子中,我们将一个包含 1000 个整数的列表分成 4 个部分,使用 4 个线程并行计算每个部分的和,最后汇总得到总和。

2. 异步 I/O 操作

在进行网络请求、文件读取等 I/O 操作时,CompletableFuture 可以避免主线程阻塞,提高系统的响应性。例如,在一个 Web 应用中,同时发起多个 HTTP 请求获取不同的数据,然后将这些数据合并后返回给客户端。

import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class AsyncIOExample {
    public static String sendRequest(String urlStr) {
        try {
            URL url = new URL(urlStr);
            HttpURLConnection connection = (HttpURLConnection) url.openConnection();
            connection.setRequestMethod("GET");
            int responseCode = connection.getResponseCode();
            if (responseCode == HttpURLConnection.HTTP_OK) {
                java.io.BufferedReader in = new java.io.BufferedReader(
                        new java.io.InputStreamReader(connection.getInputStream()));
                String inputLine;
                StringBuilder response = new StringBuilder();
                while ((inputLine = in.readLine()) != null) {
                    response.append(inputLine);
                }
                in.close();
                return response.toString();
            } else {
                return "Request failed with response code: " + responseCode;
            }
        } catch (IOException e) {
            e.printStackTrace();
            return "IOException occurred";
        }
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> sendRequest("http://example.com/api1"));
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> sendRequest("http://example.com/api2"));
        CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (result1, result2) -> {
            return "Combined result: " + result1 + ", " + result2;
        });
        String result = combinedFuture.get();
        System.out.println(result);
    }
}

在这个例子中,通过 CompletableFuture.supplyAsync 发起两个异步的 HTTP 请求,然后使用 thenCombine 方法将两个请求的结果合并。

3. 微服务架构中的异步调用

在微服务架构中,一个业务操作可能需要调用多个不同的微服务。使用 CompletableFuture 可以并行调用这些微服务,减少整体的响应时间。例如,一个订单处理系统,在处理订单时需要同时调用库存微服务检查库存、用户微服务验证用户信息等。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class MicroserviceAsyncCall {
    public static String callInventoryService() {
        // 模拟调用库存微服务
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "Inventory check passed";
    }

    public static String callUserService() {
        // 模拟调用用户微服务
        try {
            Thread.sleep(1500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "User verification passed";
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> inventoryFuture = CompletableFuture.supplyAsync(() -> callInventoryService());
        CompletableFuture<String> userFuture = CompletableFuture.supplyAsync(() -> callUserService());
        CompletableFuture<Void> allFuture = CompletableFuture.allOf(inventoryFuture, userFuture);
        allFuture.join();
        try {
            String inventoryResult = inventoryFuture.get();
            String userResult = userFuture.get();
            System.out.println("Order processing: " + inventoryResult + ", " + userResult);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}

在这个例子中,并行调用库存微服务和用户微服务,等待两个调用都完成后,继续订单处理流程。

通过以上对 CompletableFuture 的详细介绍和各种示例,我们可以看到它在 Java 异步编程中提供了强大而灵活的功能,能够有效地提高应用程序的性能和响应性,适用于多种不同的应用场景。在实际开发中,合理运用 CompletableFuture 可以使代码更加简洁、高效,并且易于维护。