MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java CompletableFuture组合操作提升异步任务处理效率

2022-02-142.0k 阅读

Java CompletableFuture组合操作基础

CompletableFuture概述

在Java的并发编程领域,CompletableFuture是一个强大的类,它在Java 8中被引入。CompletableFuture实现了Future接口和CompletionStage接口,这使得它既能表示一个异步计算的结果,又能支持在计算完成时触发的回调操作。Future接口在Java早期就已存在,它提供了一种获取异步任务执行结果的方式,但存在一些局限性,比如在获取结果时可能会阻塞主线程,并且缺乏对异步任务组合和链式调用的支持。而CompletableFuture弥补了这些不足,允许我们以更灵活、更高效的方式处理异步任务。

创建CompletableFuture实例

  1. 使用CompletableFuture.supplyAsync创建有返回值的异步任务

    CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
        // 模拟耗时操作
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "任务完成";
    });
    

    在上述代码中,supplyAsync方法接受一个Supplier作为参数,它会在一个新的线程中执行这个Supplierget方法,并返回一个CompletableFuture实例,该实例最终会包含Supplier返回的结果。

  2. 使用CompletableFuture.runAsync创建无返回值的异步任务

    CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
        // 模拟耗时操作
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("无返回值任务执行完成");
    });
    

    runAsync方法接受一个Runnable作为参数,它同样会在新线程中执行Runnablerun方法,但返回的CompletableFuture的泛型类型为Void,因为该任务没有返回值。

获取CompletableFuture的结果

  1. 使用get方法获取结果(可能阻塞)

    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "结果";
    });
    try {
        String result = future.get();
        System.out.println("获取到的结果: " + result);
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
    

    get方法会阻塞调用线程,直到CompletableFuture完成并返回结果。如果在等待过程中线程被中断,会抛出InterruptedException;如果任务执行过程中抛出异常,会抛出ExecutionException

  2. 使用get(long timeout, TimeUnit unit)方法设置超时获取结果

    CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "延迟结果";
    });
    try {
        String result = future3.get(3, TimeUnit.SECONDS);
        System.out.println("获取到的结果: " + result);
    } catch (InterruptedException | ExecutionException | TimeoutException e) {
        if (e instanceof TimeoutException) {
            System.out.println("获取结果超时");
        } else {
            e.printStackTrace();
        }
    }
    

    这个版本的get方法允许设置一个超时时间。如果在指定的时间内CompletableFuture没有完成,会抛出TimeoutException

  3. 使用join方法获取结果(不推荐在非异步代码块中使用)

    CompletableFuture<String> future4 = CompletableFuture.supplyAsync(() -> "快速结果");
    String result = future4.join();
    System.out.println("使用join获取到的结果: " + result);
    

    join方法和get方法类似,但它不会抛出受检异常。如果任务执行过程中抛出异常,join会将异常包装成CompletionException并抛出。在非异步代码块中使用join可能会导致难以排查的错误,因为它不会像get那样明确抛出InterruptedExceptionExecutionException

CompletableFuture的组合操作

顺序执行任务

  1. 使用thenApply方法对结果进行转换 thenApply方法接受一个Function作为参数,当CompletableFuture完成时,会将其结果作为参数传递给这个Function,并返回一个新的CompletableFuture,新的CompletableFuture的结果是Function的返回值。

    CompletableFuture.supplyAsync(() -> "Hello")
                     .thenApply(s -> s + ", World")
                     .thenApply(String::toUpperCase)
                     .thenAccept(System.out::println);
    

    在上述代码中,首先异步执行一个返回"Hello"的任务,然后通过thenApply将结果转换为"Hello, World",接着又将其转换为大写形式"HELLO, WORLD",最后通过thenAccept消费这个最终结果并打印出来。

  2. 使用thenCompose方法进行任务链接 thenCompose方法和thenApply类似,但它接受的是一个返回CompletableFutureFunction。这使得我们可以将多个异步任务链接起来,前一个任务的结果作为后一个异步任务的输入。

    CompletableFuture<String> future5 = CompletableFuture.supplyAsync(() -> "1")
                                                         .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + "2"))
                                                         .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + "3"));
    try {
        String result5 = future5.get();
        System.out.println("thenCompose结果: " + result5);
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
    

    这里先异步返回"1",然后将"1"作为输入,异步返回"12",最后再将"12"作为输入,异步返回"123"。

  3. 使用thenRun方法执行无返回值的后续任务 thenRun方法接受一个Runnable作为参数,当CompletableFuture完成时,会执行这个Runnable,但不会处理CompletableFuture的结果。

    CompletableFuture.supplyAsync(() -> "任务完成")
                     .thenRun(() -> System.out.println("后续无返回值任务执行"));
    

    首先异步返回"任务完成",然后执行thenRun中的Runnable,打印出"后续无返回值任务执行"。

并行执行任务

  1. 使用CompletableFuture.allOf方法等待所有任务完成 allOf方法接受多个CompletableFuture作为参数,它返回一个新的CompletableFuture,只有当所有传入的CompletableFuture都完成时,这个新的CompletableFuture才会完成。

    CompletableFuture<String> future6 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "任务1完成";
    });
    CompletableFuture<String> future7 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "任务2完成";
    });
    CompletableFuture<Void> allOfFuture = CompletableFuture.allOf(future6, future7);
    allOfFuture.join();
    try {
        String result6 = future6.get();
        String result7 = future7.get();
        System.out.println(result6);
        System.out.println(result7);
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
    

    在上述代码中,future6future7并行执行,allOfFuture会在future6future7都完成后完成。通过join等待allOfFuture完成,然后可以获取future6future7的结果。

  2. 使用CompletableFuture.anyOf方法只要有一个任务完成就返回 anyOf方法同样接受多个CompletableFuture作为参数,它返回一个新的CompletableFuture,只要传入的CompletableFuture中有一个完成,这个新的CompletableFuture就会完成,并且其结果就是第一个完成的CompletableFuture的结果。

    CompletableFuture<String> future8 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "任务3完成";
    });
    CompletableFuture<String> future9 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(1500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "任务4完成";
    });
    CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(future8, future9);
    try {
        Object result = anyOfFuture.get();
        System.out.println("anyOf结果: " + result);
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
    

    这里future9会先于future8完成,所以anyOfFuture的结果就是future9的结果"任务4完成"。

处理异常

  1. 使用exceptionally方法处理异常 exceptionally方法接受一个Function作为参数,当CompletableFuture在执行过程中抛出异常时,会调用这个Function,并将异常作为参数传递给它。Function的返回值会作为新的CompletableFuture的结果。

    CompletableFuture<String> future10 = CompletableFuture.supplyAsync(() -> {
        if (Math.random() < 0.5) {
            throw new RuntimeException("模拟异常");
        }
        return "正常结果";
    }).exceptionally(e -> {
        System.out.println("捕获到异常: " + e.getMessage());
        return "异常处理结果";
    });
    try {
        String result = future10.get();
        System.out.println("最终结果: " + result);
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
    

    在上述代码中,如果CompletableFuture执行过程中抛出异常,会在exceptionally中捕获并处理,返回"异常处理结果"。如果没有异常,则返回正常的结果。

  2. 使用handle方法同时处理正常结果和异常 handle方法接受一个BiFunction作为参数,第一个参数是CompletableFuture的结果(如果正常完成),第二个参数是异常(如果有异常发生)。无论CompletableFuture是正常完成还是抛出异常,都会调用这个BiFunction,并返回一个新的CompletableFuture,其结果是BiFunction的返回值。

    CompletableFuture<String> future11 = CompletableFuture.supplyAsync(() -> {
        if (Math.random() < 0.5) {
            throw new RuntimeException("模拟异常2");
        }
        return "正常结果2";
    }).handle((result, e) -> {
        if (e != null) {
            System.out.println("捕获到异常: " + e.getMessage());
            return "异常处理结果2";
        } else {
            return result + " 处理后";
        }
    });
    try {
        String finalResult = future11.get();
        System.out.println("handle最终结果: " + finalResult);
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
    

    这里handle方法既可以处理正常结果,对其进行进一步处理,也可以在有异常时捕获并处理异常,返回相应的处理结果。

CompletableFuture组合操作的实际应用场景

Web服务调用优化

在一个Web应用中,可能需要调用多个外部API来获取数据并进行处理。比如,一个电商应用需要从库存服务获取商品库存信息,从价格服务获取商品价格信息,然后根据这两个信息计算商品的总价值。

CompletableFuture<Integer> stockFuture = CompletableFuture.supplyAsync(() -> {
    // 模拟调用库存服务
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return 10;
});
CompletableFuture<Double> priceFuture = CompletableFuture.supplyAsync(() -> {
    // 模拟调用价格服务
    try {
        Thread.sleep(1500);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return 100.0;
});
CompletableFuture<Double> totalValueFuture = CompletableFuture.allOf(stockFuture, priceFuture)
                                                              .thenApply(v -> {
                                                                  try {
                                                                      int stock = stockFuture.get();
                                                                      double price = priceFuture.get();
                                                                      return stock * price;
                                                                  } catch (InterruptedException | ExecutionException e) {
                                                                      e.printStackTrace();
                                                                      return 0.0;
                                                                  }
                                                              });
try {
    double totalValue = totalValueFuture.get();
    System.out.println("商品总价值: " + totalValue);
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}

在这个例子中,通过CompletableFuture并行调用库存服务和价格服务,然后使用allOf等待两个服务都返回结果后,计算商品的总价值。这样可以显著提高性能,因为两个服务调用是并行执行的,而不是顺序执行。

数据处理流水线

假设我们有一个大数据处理任务,需要对一批数据进行过滤、转换和汇总操作。

List<Integer> data = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
CompletableFuture.supplyAsync(() -> data)
                 .thenApply(list -> list.stream().filter(i -> i % 2 == 0).collect(Collectors.toList()))
                 .thenApply(list -> list.stream().map(i -> i * 2).collect(Collectors.toList()))
                 .thenApply(list -> list.stream().mapToInt(Integer::intValue).sum())
                 .thenAccept(sum -> System.out.println("汇总结果: " + sum));

在上述代码中,首先异步获取数据列表,然后通过thenApply依次进行过滤(只保留偶数)、转换(将每个数乘以2)和汇总(计算总和)操作。这种方式通过CompletableFuture的组合操作实现了数据处理的流水线,并且可以在不同的阶段并行执行,提高处理效率。

分布式系统中的任务协调

在一个分布式系统中,可能有多个节点负责不同的任务,比如一个文件上传系统,一个节点负责文件的存储,另一个节点负责生成文件的索引。

CompletableFuture<Void> storageFuture = CompletableFuture.runAsync(() -> {
    // 模拟文件存储操作
    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("文件存储完成");
});
CompletableFuture<Void> indexFuture = CompletableFuture.runAsync(() -> {
    // 模拟文件索引生成操作
    try {
        Thread.sleep(2500);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("文件索引生成完成");
});
CompletableFuture.allOf(storageFuture, indexFuture)
                 .thenRun(() -> System.out.println("文件上传和索引生成全部完成"))
                 .join();

这里通过CompletableFuture并行执行文件存储和索引生成任务,然后使用allOf等待两个任务都完成后,执行最终的提示操作,表示整个文件上传流程完成。在分布式系统中,这种任务协调方式可以有效提高系统的并发处理能力和整体性能。

CompletableFuture组合操作的性能考量

线程池的使用

  1. 默认线程池的局限性 CompletableFuture的异步操作默认使用ForkJoinPool.commonPool()作为线程池。这个线程池是一个共享的线程池,对于一些计算密集型的任务,如果多个CompletableFuture任务同时执行,可能会导致线程竞争,从而影响性能。例如,在一个包含大量复杂计算的CompletableFuture任务场景中,由于commonPool的线程数量有限,可能会出现任务排队等待执行的情况。
  2. 自定义线程池的优势 为了避免默认线程池的局限性,可以创建自定义线程池。
    ExecutorService executor = Executors.newFixedThreadPool(10);
    CompletableFuture<String> future12 = CompletableFuture.supplyAsync(() -> {
        // 复杂计算任务
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "复杂计算结果";
    }, executor);
    
    通过自定义线程池,可以根据任务的特性(如计算密集型、I/O密集型等)来调整线程池的参数,如线程数量、队列容量等,从而提高任务的执行效率。对于计算密集型任务,可以适当增加线程数量;对于I/O密集型任务,可以适当调整队列容量,以减少线程的空闲等待时间。

任务粒度的影响

  1. 细粒度任务的性能特点 如果将一个大任务拆分成许多细粒度的CompletableFuture任务,虽然可以提高并行度,但也会带来一些开销。每个CompletableFuture任务都需要创建、调度和管理,这会消耗一定的系统资源。例如,在处理一个大的数据集时,如果将每个数据项的处理都作为一个独立的CompletableFuture任务,虽然可以并行处理这些数据项,但由于任务创建和管理的开销,可能会导致整体性能下降。
  2. 粗粒度任务的性能特点 相反,粗粒度的CompletableFuture任务减少了任务创建和管理的开销,但可能会降低并行度。如果一个任务过于粗粒度,比如一个长时间运行的数据库查询任务作为一个CompletableFuture,在执行这个任务时,其他任务可能需要等待,无法充分利用系统的多核资源。
  3. 平衡任务粒度 为了获得最佳性能,需要在任务粒度上进行平衡。对于计算密集型的任务,可以将任务拆分成适中粒度的子任务,既能充分利用多核资源,又能控制任务管理的开销。对于I/O密集型任务,由于I/O操作本身会有等待时间,可以适当增加任务粒度,减少任务创建和管理的开销。

组合操作的复杂度

  1. 复杂组合操作的性能损耗 随着CompletableFuture组合操作的复杂度增加,性能也会受到影响。例如,在一个包含多层thenApplythenCompose等操作的链条中,每一步操作都需要一定的时间来处理和调度。此外,如果在组合操作中使用了大量的allOfanyOf,并且涉及到大量的CompletableFuture实例,会增加任务协调和同步的开销。
  2. 优化复杂组合操作 为了优化复杂组合操作的性能,可以尽量减少不必要的中间步骤,避免在组合操作中进行过于复杂的计算。同时,可以合理使用CompletableFuture的特性,比如对于一些可以并行执行的部分,尽量并行化处理,而不是顺序执行。例如,在一个需要调用多个Web服务并对结果进行复杂处理的场景中,可以先并行调用Web服务,然后再对结果进行合并和处理,而不是依次调用Web服务并处理结果。

通过对CompletableFuture组合操作的深入理解和合理应用,结合性能考量,可以在Java的异步任务处理中显著提高效率,满足各种复杂的业务需求。无论是在Web开发、大数据处理还是分布式系统等领域,CompletableFuture的组合操作都为开发者提供了强大而灵活的异步编程工具。