MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java 中 CompletableFuture 异步任务线程资源管理

2023-12-244.9k 阅读

Java 中 CompletableFuture 异步任务线程资源管理

CompletableFuture 简介

在 Java 8 引入 CompletableFuture 之前,处理异步任务相对繁琐。Future 接口虽然提供了一种异步执行任务并获取结果的方式,但它有一些局限性。例如,它缺乏对异步任务完成后的链式操作支持,并且难以处理多个异步任务之间的依赖关系。CompletableFuture 弥补了这些不足,它不仅实现了 Future 接口,还提供了丰富的方法用于异步任务的组合、转换和结果处理。

CompletableFuture 允许我们以更简洁、更灵活的方式编写异步代码,使得异步编程变得更加直观。它支持同步和异步的方式获取任务结果,并且可以在任务完成时执行回调函数。这对于处理 I/O 密集型任务,如网络请求、数据库查询等,能够显著提高应用程序的性能和响应性。

基本使用

创建 CompletableFuture

  1. 使用 CompletableFuture.supplyAsync 创建有返回值的异步任务

    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        // 模拟耗时操作
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "Task completed";
    });
    

    在上述代码中,supplyAsync 方法接收一个 Supplier 函数式接口,它会在一个新的线程中执行该接口的 get 方法,并返回一个 CompletableFuture 对象,该对象最终会包含任务的执行结果。

  2. 使用 CompletableFuture.runAsync 创建无返回值的异步任务

    CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
        // 模拟耗时操作
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Task without return value completed");
    });
    

    runAsync 方法接收一个 Runnable 接口,同样会在新线程中执行 run 方法,但返回的 CompletableFuture 对象的泛型为 Void,因为任务没有返回值。

获取任务结果

  1. 使用 get 方法同步获取结果

    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "Task completed";
    });
    try {
        String result = future.get();
        System.out.println("Result: " + result);
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
    

    get 方法会阻塞当前线程,直到异步任务完成并返回结果。如果任务执行过程中抛出异常,get 方法会将异常包装成 ExecutionException 抛出。

  2. 使用 get(long timeout, TimeUnit unit) 方法设置超时获取结果

    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "Task completed";
    });
    try {
        String result = future.get(2, TimeUnit.SECONDS);
        System.out.println("Result: " + result);
    } catch (InterruptedException | ExecutionException | TimeoutException e) {
        e.printStackTrace();
    }
    

    上述代码中,如果在 2 秒内任务没有完成,get 方法会抛出 TimeoutException

  3. 使用 join 方法同步获取结果

    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "Task completed";
    });
    String result = future.join();
    System.out.println("Result: " + result);
    

    join 方法与 get 方法类似,也是阻塞当前线程获取结果。但不同的是,如果任务执行过程中抛出异常,join 方法会直接抛出 CompletionException,而不是 ExecutionException

  4. 使用 whenComplete 方法异步获取结果

    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "Task completed";
    });
    future.whenComplete((result, exception) -> {
        if (exception == null) {
            System.out.println("Result: " + result);
        } else {
            exception.printStackTrace();
        }
    });
    

    whenComplete 方法接收一个 BiConsumer,当异步任务完成时(无论成功还是失败),会异步执行该 BiConsumer。它不会阻塞当前线程,适用于在任务完成后执行一些后续操作,如日志记录等。

  5. 使用 thenApply 方法处理任务结果

    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "Task completed";
    }).thenApply(result -> result + " and processed");
    try {
        String finalResult = future.get();
        System.out.println("Final Result: " + finalResult);
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
    

    thenApply 方法接收一个 Function,它会在异步任务成功完成后,将任务的结果作为参数传递给 Function 进行处理,并返回一个新的 CompletableFuture,其结果为 Function 的返回值。

异步任务的组合

多个异步任务串行执行

  1. 使用 thenCompose 方法

    CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "Step 1 completed";
    });
    CompletableFuture<String> future2 = future1.thenCompose(result1 -> CompletableFuture.supplyAsync(() -> {
        System.out.println(result1);
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "Step 2 completed";
    }));
    try {
        String finalResult = future2.get();
        System.out.println("Final Result: " + finalResult);
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
    

    thenCompose 方法接收一个 Function,该 Function 的返回值必须是一个 CompletableFuture。它会将前一个任务的结果作为参数传递给 Function,并将返回的 CompletableFuture 与当前 CompletableFuture 进行组合,形成一个新的串行异步任务链。

  2. 使用 thenApplythenAccept 组合实现串行执行

    CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "Step 1 completed";
    }).thenApply(result1 -> {
        System.out.println(result1);
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "Step 2 completed";
    }).thenAccept(finalResult -> System.out.println("Final Result: " + finalResult));
    

    这种方式通过 thenApply 依次处理任务结果,并通过 thenAccept 在最后一步进行结果的消费。但与 thenCompose 不同的是,thenApply 返回的是一个包含处理结果的 CompletableFuture,而 thenCompose 会将返回的 CompletableFuture 进行扁平化处理,使得代码更简洁,更适合处理复杂的异步任务链。

多个异步任务并行执行

  1. 使用 CompletableFuture.allOf 方法

    CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "Task 1 completed";
    });
    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "Task 2 completed";
    });
    CompletableFuture<Void> allOfFuture = CompletableFuture.allOf(future1, future2);
    allOfFuture.join();
    try {
        String result1 = future1.get();
        String result2 = future2.get();
        System.out.println(result1);
        System.out.println(result2);
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
    

    allOf 方法接收多个 CompletableFuture 作为参数,返回一个新的 CompletableFuture。这个新的 CompletableFuture 会在所有传入的 CompletableFuture 都完成时才完成。通过调用 join 方法等待所有任务完成,然后可以分别获取每个任务的结果。

  2. 使用 CompletableFuture.anyOf 方法

    CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "Task 1 completed";
    });
    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "Task 2 completed";
    });
    CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(future1, future2);
    try {
        Object result = anyOfFuture.get();
        System.out.println("First completed task result: " + result);
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
    

    anyOf 方法同样接收多个 CompletableFuture 作为参数,返回的新 CompletableFuture 会在任何一个传入的 CompletableFuture 完成时就完成,其结果为第一个完成的任务的结果。

线程资源管理

默认线程池

当我们使用 CompletableFuture.supplyAsyncCompletableFuture.runAsync 方法时,如果不指定线程池,它们会使用 ForkJoinPool.commonPool() 作为默认线程池。ForkJoinPool 是 Java 7 引入的一种线程池,它采用工作窃取算法,能够有效提高多核 CPU 环境下的任务并行执行效率。

CompletableFuture.supplyAsync(() -> {
    // 任务代码
    return "Result";
});

上述代码使用默认线程池执行异步任务。然而,ForkJoinPool.commonPool() 是一个共享的线程池,可能会被其他异步任务共享使用。如果应用程序中有大量不同类型的异步任务,可能会导致线程资源竞争,影响性能。

自定义线程池

为了更好地管理线程资源,我们可以创建自定义线程池并传递给 CompletableFuture 的异步方法。

  1. 使用 Executors 创建线程池

    ExecutorService executor = Executors.newFixedThreadPool(5);
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        // 模拟耗时操作
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "Task completed";
    }, executor);
    try {
        String result = future.get();
        System.out.println("Result: " + result);
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    } finally {
        executor.shutdown();
    }
    

    在上述代码中,我们使用 Executors.newFixedThreadPool(5) 创建了一个固定大小为 5 的线程池,并将其传递给 supplyAsync 方法。这样,异步任务就会在这个自定义线程池中执行。注意,在使用完线程池后,需要调用 executor.shutdown() 方法关闭线程池,以避免资源泄漏。

  2. 使用 ThreadPoolExecutor 创建线程池

    ThreadPoolExecutor executor = new ThreadPoolExecutor(
            3,
            5,
            10,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>());
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        // 模拟耗时操作
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "Task completed";
    }, executor);
    try {
        String result = future.get();
        System.out.println("Result: " + result);
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    } finally {
        executor.shutdown();
    }
    

    ThreadPoolExecutor 提供了更灵活的线程池配置参数。我们可以指定核心线程数(corePoolSize)、最大线程数(maximumPoolSize)、线程存活时间(keepAliveTime)和任务队列(workQueue)等。通过合理配置这些参数,可以根据应用程序的负载情况优化线程资源的使用。

线程池参数调优

  1. 核心线程数的选择 核心线程数的选择需要考虑任务的类型。如果是 CPU 密集型任务,核心线程数一般设置为 CPU 核心数加 1。例如,对于一个 4 核 CPU 的机器,核心线程数可以设置为 5。这是因为 CPU 密集型任务几乎一直在使用 CPU,多一个线程可以在某个线程因偶尔的页缺失等原因暂停时,利用 CPU 资源。
int cores = Runtime.getRuntime().availableProcessors();
ThreadPoolExecutor executor = new ThreadPoolExecutor(
        cores + 1,
        cores + 1,
        0,
        TimeUnit.SECONDS,
        new LinkedBlockingQueue<>());

如果是 I/O 密集型任务,核心线程数可以设置为 CPU 核心数的 2 倍左右。因为 I/O 密集型任务大部分时间在等待 I/O 操作完成,CPU 有空闲时间,所以可以多分配一些线程来充分利用 CPU 资源。

int cores = Runtime.getRuntime().availableProcessors();
ThreadPoolExecutor executor = new ThreadPoolExecutor(
        cores * 2,
        cores * 2,
        0,
        TimeUnit.SECONDS,
        new LinkedBlockingQueue<>());
  1. 最大线程数的设置 最大线程数一般不应设置得过大,否则可能会导致系统资源耗尽。对于 I/O 密集型任务,最大线程数可以适当比核心线程数大一些,但也要根据系统的内存等资源情况来调整。例如,对于一个内存有限的系统,过多的线程可能会导致频繁的内存交换,反而降低性能。

  2. 任务队列的选择 任务队列的选择也很关键。LinkedBlockingQueue 是一个无界队列,它可以无限添加任务,但可能会导致任务堆积,占用大量内存。ArrayBlockingQueue 是一个有界队列,需要指定队列容量。如果队列满了,新的任务会根据线程池的拒绝策略进行处理。常用的拒绝策略有 AbortPolicy(默认,直接抛出异常)、CallerRunsPolicy(由调用者线程执行任务)、DiscardPolicy(丢弃任务)和 DiscardOldestPolicy(丢弃队列中最老的任务)。

ThreadPoolExecutor executor = new ThreadPoolExecutor(
        3,
        5,
        10,
        TimeUnit.SECONDS,
        new ArrayBlockingQueue<>(10),
        new CallerRunsPolicy());

在上述代码中,我们使用 ArrayBlockingQueue 作为任务队列,并设置容量为 10,同时使用 CallerRunsPolicy 拒绝策略,当队列满时,新任务会由调用者线程执行。

异常处理

传统的异常处理方式

在使用 CompletableFuture 时,如果任务执行过程中抛出异常,我们可以使用传统的 try - catch 块来捕获异常。

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    throw new RuntimeException("Task failed");
});
try {
    String result = future.get();
    System.out.println("Result: " + result);
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}

在上述代码中,异步任务抛出了 RuntimeExceptionget 方法会将其包装成 ExecutionException 抛出,我们可以在 catch 块中捕获并处理。

使用 exceptionally 方法处理异常

exceptionally 方法提供了一种更优雅的方式来处理异步任务中的异常。

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    throw new RuntimeException("Task failed");
}).exceptionally(exception -> {
    System.out.println("Caught exception: " + exception.getMessage());
    return "Default result";
});
try {
    String result = future.get();
    System.out.println("Result: " + result);
} catch (InterruptedException e) {
    e.printStackTrace();
}

exceptionally 方法接收一个 Function,当异步任务抛出异常时,会执行这个 Function,并返回一个默认值或进行异常处理。在上述代码中,当任务抛出异常时,exceptionally 方法捕获异常并返回了一个默认结果。

使用 handle 方法同时处理结果和异常

handle 方法可以同时处理异步任务的正常结果和异常情况。

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    throw new RuntimeException("Task failed");
}).handle((result, exception) -> {
    if (exception != null) {
        System.out.println("Caught exception: " + exception.getMessage());
        return "Default result";
    } else {
        return result;
    }
});
try {
    String finalResult = future.get();
    System.out.println("Final Result: " + finalResult);
} catch (InterruptedException e) {
    e.printStackTrace();
}

handle 方法接收一个 BiFunction,它会在任务完成时(无论成功还是失败)被调用。BiFunction 的第一个参数是任务的结果(如果任务成功),第二个参数是异常(如果任务失败)。通过这种方式,我们可以在一个方法中统一处理结果和异常。

CompletableFuture 在实际项目中的应用场景

微服务调用

在微服务架构中,经常需要调用多个不同的微服务接口来获取数据并进行处理。使用 CompletableFuture 可以并行调用这些微服务接口,提高整体响应速度。

// 假设这是两个微服务调用方法
CompletableFuture<String> service1Future = CompletableFuture.supplyAsync(() -> {
    // 模拟调用微服务1
    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "Service 1 result";
});
CompletableFuture<String> service2Future = CompletableFuture.supplyAsync(() -> {
    // 模拟调用微服务2
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "Service 2 result";
});
CompletableFuture<Void> allOfFuture = CompletableFuture.allOf(service1Future, service2Future);
allOfFuture.join();
try {
    String result1 = service1Future.get();
    String result2 = service2Future.get();
    // 对两个微服务的结果进行处理
    System.out.println("Combined result: " + result1 + " and " + result2);
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}

在上述代码中,我们并行调用了两个微服务接口,并在所有调用完成后处理结果。这样可以避免顺序调用微服务接口带来的时间浪费,提高系统的性能。

数据批处理

当需要对大量数据进行处理时,可以将数据分成多个批次,使用 CompletableFuture 并行处理每个批次,最后合并结果。

List<Integer> dataList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
List<CompletableFuture<Integer>> futureList = new ArrayList<>();
int batchSize = 2;
for (int i = 0; i < dataList.size(); i += batchSize) {
    List<Integer> batch = dataList.subList(i, Math.min(i + batchSize, dataList.size()));
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
        int sum = 0;
        for (int num : batch) {
            sum += num;
        }
        return sum;
    });
    futureList.add(future);
}
CompletableFuture<Void> allOfFuture = CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0]));
allOfFuture.join();
int totalSum = 0;
for (CompletableFuture<Integer> future : futureList) {
    try {
        totalSum += future.get();
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
}
System.out.println("Total sum: " + totalSum);

在上述代码中,我们将数据分成批次,每个批次在一个异步任务中进行求和计算,最后将所有批次的结果累加得到最终结果。这种方式可以充分利用多核 CPU 的性能,提高数据处理效率。

性能优化与注意事项

减少线程创建和销毁开销

频繁地创建和销毁线程会带来较大的开销,因此使用线程池可以复用线程,减少这种开销。通过合理配置线程池的参数,如核心线程数、最大线程数和任务队列,可以使线程池在不同的负载情况下都能高效运行。

避免任务堆积

如果任务队列设置不合理,可能会导致任务堆积,占用大量内存。对于有界队列,要根据系统的负载情况设置合适的队列容量,并选择合适的拒绝策略。对于无界队列,要密切关注任务的处理速度,避免任务无限堆积。

注意线程安全

在异步任务中,如果多个任务需要共享数据,要注意线程安全问题。可以使用线程安全的数据结构,如 ConcurrentHashMapCopyOnWriteArrayList 等,或者使用同步机制,如 synchronized 关键字、Lock 接口等。

监控和调优

在实际应用中,要对异步任务的执行情况进行监控,例如监控线程池的状态(活跃线程数、任务队列大小等)、任务的执行时间和成功率等。根据监控数据,对线程池参数和异步任务的逻辑进行调优,以提高系统的性能和稳定性。

综上所述,CompletableFuture 为 Java 开发者提供了强大的异步编程能力,通过合理管理线程资源和正确处理异常,能够显著提高应用程序的性能和响应性。在实际项目中,要根据具体的业务场景和需求,灵活运用 CompletableFuture 的各种特性,以实现高效、稳定的异步任务处理。