MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java CompletableFuture runAsync无返回值异步任务实践

2023-01-173.6k 阅读

Java CompletableFuture runAsync 基础介绍

在 Java 编程领域,处理异步任务是提高应用程序性能和响应性的关键手段之一。CompletableFuture 是 Java 8 引入的一个强大的类,它为异步编程提供了更为便捷和灵活的方式。其中,runAsync 方法用于创建一个没有返回值的异步任务。

CompletableFuture 类实现了 FutureCompletionStage 接口。Future 接口是 Java 早期用于异步计算的标准方式,但它在处理复杂异步操作时存在局限性,比如难以处理多个异步任务的组合以及获取异步任务的结果时可能会导致阻塞。而 CompletionStage 接口则提供了更丰富的异步操作组合方法,CompletableFuture 类将两者的功能融合在一起,使得异步编程更加流畅。

runAsync 方法有两个重载版本:

  • public static CompletableFuture<Void> runAsync(Runnable runnable):此方法接受一个 Runnable 对象作为参数,以异步方式执行该 Runnable 任务,并返回一个 CompletableFuture<Void> 对象。这里返回类型为 Void,表明该异步任务没有返回值。任务会在默认的 ForkJoinPool.commonPool() 线程池中执行。
  • public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor):这个版本除了接受 Runnable 对象外,还接受一个 Executor 对象。通过传入自定义的 Executor,我们可以指定任务执行的线程池,而不是使用默认的 ForkJoinPool.commonPool()

使用默认线程池的 runAsync 示例

下面通过一个简单的代码示例来展示如何使用 runAsync 方法并使用默认线程池执行异步任务:

import java.util.concurrent.CompletableFuture;

public class RunAsyncDefaultThreadPoolExample {
    public static void main(String[] args) {
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            // 模拟一个耗时操作
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("异步任务在默认线程池中执行完毕");
        });

        // 主线程继续执行其他任务
        System.out.println("主线程继续执行");

        try {
            // 等待异步任务完成
            future.get();
            System.out.println("异步任务已完成,主线程继续后续操作");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

在上述代码中,CompletableFuture.runAsync 方法接受一个 Runnable 接口的实现,在这个实现中,我们模拟了一个耗时 2 秒的操作(通过 Thread.sleep(2000))。主线程在调用 runAsync 后立即继续执行,并输出“主线程继续执行”。然后,通过 future.get() 方法,主线程会阻塞,直到异步任务完成。当异步任务完成后,会输出“异步任务在默认线程池中执行完毕”,接着主线程继续执行后续操作,输出“异步任务已完成,主线程继续后续操作”。

使用自定义线程池的 runAsync 示例

在实际应用中,我们可能需要根据业务需求使用自定义的线程池来执行异步任务。这可以通过 runAsync 的第二个重载方法来实现。以下是一个使用自定义线程池的示例:

import java.util.concurrent.*;

public class RunAsyncCustomThreadPoolExample {
    public static void main(String[] args) {
        // 创建自定义线程池
        ExecutorService executor = Executors.newFixedThreadPool(3);

        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            // 模拟一个耗时操作
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("异步任务在自定义线程池中执行完毕");
        }, executor);

        // 主线程继续执行其他任务
        System.out.println("主线程继续执行");

        try {
            // 等待异步任务完成
            future.get();
            System.out.println("异步任务已完成,主线程继续后续操作");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 关闭线程池
            executor.shutdown();
        }
    }
}

在这个示例中,我们首先通过 Executors.newFixedThreadPool(3) 创建了一个固定大小为 3 的线程池。然后,使用 CompletableFuture.runAsync 的第二个重载方法,将 Runnable 任务和自定义的 Executor(即我们创建的线程池)作为参数传入。主线程的执行流程与前面使用默认线程池的示例类似,只是异步任务现在是在自定义的线程池中执行。最后,在程序结束前,我们调用 executor.shutdown() 方法关闭线程池,以释放资源。

runAsync 与其他 CompletableFuture 方法的组合使用

CompletableFuture 类提供了丰富的方法来组合多个异步任务,runAsync 方法可以与这些方法协同工作,以实现更复杂的异步操作。

thenApply 方法与 runAsync 的组合

thenApply 方法用于在异步任务完成后,对其结果进行转换。虽然 runAsync 本身返回的是 CompletableFuture<Void>,没有实际结果,但我们可以在其完成后启动另一个有返回值的异步任务。例如:

import java.util.concurrent.CompletableFuture;

public class RunAsyncThenApplyExample {
    public static void main(String[] args) {
        CompletableFuture<String> future = CompletableFuture.runAsync(() -> {
            // 模拟一个耗时操作
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("第一个异步任务执行完毕");
        }).thenApply(v -> {
            System.out.println("基于第一个任务的结果进行转换操作");
            return "转换后的结果";
        });

        try {
            String result = future.get();
            System.out.println("最终结果: " + result);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

在上述代码中,runAsync 执行完第一个异步任务后,通过 thenApply 方法启动了另一个异步任务。thenApply 方法接受一个 Function 接口的实现,对 runAsync 任务完成后的 Void 值(这里虽然是 Void,但 thenApply 依然会在 runAsync 完成后执行)进行转换,并返回一个 CompletableFuture<String>。最终,我们通过 future.get() 获取转换后的结果并输出。

thenAccept 方法与 runAsync 的组合

thenAccept 方法用于在异步任务完成后,执行一个消费操作,同样不返回结果。它与 runAsync 结合可以方便地在异步任务完成后进行一些后续处理。例如:

import java.util.concurrent.CompletableFuture;

public class RunAsyncThenAcceptExample {
    public static void main(String[] args) {
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            // 模拟一个耗时操作
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("异步任务执行完毕");
        }).thenAccept(v -> {
            System.out.println("对异步任务的结果进行消费操作");
        });

        try {
            future.get();
            System.out.println("所有操作完成");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

在这个示例中,runAsync 执行完异步任务后,thenAccept 方法会被调用,执行传入的消费操作。thenAccept 接受一个 Consumer 接口的实现,这里我们只是简单地输出一条消息。最后,通过 future.get() 等待所有操作完成并输出“所有操作完成”。

thenRun 方法与 runAsync 的组合

thenRun 方法与 thenAccept 类似,也是在异步任务完成后执行一个后续操作,但它不关心前一个任务的结果。例如:

import java.util.concurrent.CompletableFuture;

public class RunAsyncThenRunExample {
    public static void main(String[] args) {
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            // 模拟一个耗时操作
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("异步任务执行完毕");
        }).thenRun(() -> {
            System.out.println("不关心前一个任务结果,执行后续操作");
        });

        try {
            future.get();
            System.out.println("所有操作完成");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

在上述代码中,runAsync 完成异步任务后,thenRun 方法会执行其传入的 Runnable 任务,不关心 runAsync 的执行结果。最后,通过 future.get() 等待所有操作完成并输出“所有操作完成”。

runAsync 中的异常处理

在异步任务执行过程中,可能会出现各种异常情况。CompletableFuture 提供了相应的机制来处理这些异常。

使用 exceptionally 方法处理异常

exceptionally 方法用于在异步任务抛出异常时,返回一个默认值或执行一些异常处理逻辑。例如:

import java.util.concurrent.CompletableFuture;

public class RunAsyncExceptionallyExample {
    public static void main(String[] args) {
        CompletableFuture<String> future = CompletableFuture.runAsync(() -> {
            // 模拟一个可能抛出异常的操作
            if (Math.random() < 0.5) {
                throw new RuntimeException("异步任务发生异常");
            }
            System.out.println("异步任务执行完毕");
            return "正常结果";
        }).thenApply(String::toUpperCase).exceptionally(ex -> {
            System.out.println("捕获到异常: " + ex.getMessage());
            return "默认值";
        });

        try {
            String result = future.get();
            System.out.println("最终结果: " + result);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

在上述代码中,runAsync 内部的异步任务有 50% 的概率抛出 RuntimeExceptionthenApply 方法尝试对任务结果进行转换,如果任务执行过程中抛出异常,exceptionally 方法会捕获到异常,并返回一个默认值“默认值”。同时,会输出异常信息“捕获到异常: 异步任务发生异常”。最后,通过 future.get() 获取最终结果并输出。

使用 whenComplete 方法处理异常

whenComplete 方法用于在异步任务完成(无论成功还是失败)时执行一个回调。它可以用于处理异常,也可以用于在任务完成后执行一些通用的清理操作。例如:

import java.util.concurrent.CompletableFuture;

public class RunAsyncWhenCompleteExample {
    public static void main(String[] args) {
        CompletableFuture<String> future = CompletableFuture.runAsync(() -> {
            // 模拟一个可能抛出异常的操作
            if (Math.random() < 0.5) {
                throw new RuntimeException("异步任务发生异常");
            }
            System.out.println("异步任务执行完毕");
            return "正常结果";
        }).thenApply(String::toUpperCase).whenComplete((result, ex) -> {
            if (ex != null) {
                System.out.println("捕获到异常: " + ex.getMessage());
            } else {
                System.out.println("任务成功,结果为: " + result);
            }
        });

        try {
            future.get();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

在这个示例中,whenComplete 方法接受一个 BiConsumer 接口的实现,该实现会在异步任务完成时被调用。如果 ex 不为 null,则表示任务执行过程中发生了异常,我们输出异常信息;否则,输出任务成功的结果。

runAsync 在实际项目中的应用场景

后台任务处理

在 Web 应用中,有些任务不需要立即返回结果给用户,比如发送邮件、生成报表等。这些任务可以使用 runAsync 方法在后台异步执行,以提高应用程序的响应速度。例如,一个电商系统在用户下单后,可能需要发送订单确认邮件给用户,同时生成订单报表。这两个任务可以使用 runAsync 方法在后台线程池中异步执行,而用户可以很快看到下单成功的页面。

import java.util.concurrent.CompletableFuture;

public class EcommerceExample {
    public static void main(String[] args) {
        // 模拟用户下单操作
        System.out.println("用户下单成功");

        CompletableFuture.runAsync(() -> {
            sendOrderConfirmationEmail();
        });

        CompletableFuture.runAsync(() -> {
            generateOrderReport();
        });

        System.out.println("系统继续处理其他任务");
    }

    private static void sendOrderConfirmationEmail() {
        // 模拟发送邮件操作
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("订单确认邮件已发送");
    }

    private static void generateOrderReport() {
        // 模拟生成报表操作
        try {
            Thread.sleep(4000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("订单报表已生成");
    }
}

在上述代码中,用户下单成功后,发送邮件和生成报表的任务通过 runAsync 方法异步执行,主线程继续处理其他任务,提高了系统的响应性。

数据预处理

在大数据处理场景中,常常需要对数据进行预处理,比如数据清洗、格式转换等。这些预处理任务可以使用 runAsync 方法并行执行,以加快整个数据处理流程。例如,一个数据分析系统需要从多个数据源读取数据,并对数据进行预处理。每个数据源的数据预处理任务可以使用 runAsync 方法在不同的线程中执行。

import java.util.concurrent.CompletableFuture;

public class DataPreprocessingExample {
    public static void main(String[] args) {
        CompletableFuture.runAsync(() -> {
            preprocessDataFromSource1();
        });

        CompletableFuture.runAsync(() -> {
            preprocessDataFromSource2();
        });

        CompletableFuture.runAsync(() -> {
            preprocessDataFromSource3();
        });

        System.out.println("数据预处理任务已启动,主线程继续执行其他操作");

        try {
            // 等待所有异步任务完成
            CompletableFuture.allOf(
                CompletableFuture.runAsync(() -> preprocessDataFromSource1()),
                CompletableFuture.runAsync(() -> preprocessDataFromSource2()),
                CompletableFuture.runAsync(() -> preprocessDataFromSource3())
            ).get();

            System.out.println("所有数据预处理任务已完成");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static void preprocessDataFromSource1() {
        // 模拟从数据源 1 进行数据预处理
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("数据源 1 数据预处理完成");
    }

    private static void preprocessDataFromSource2() {
        // 模拟从数据源 2 进行数据预处理
        try {
            Thread.sleep(4000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("数据源 2 数据预处理完成");
    }

    private static void preprocessDataFromSource3() {
        // 模拟从数据源 3 进行数据预处理
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("数据源 3 数据预处理完成");
    }
}

在上述代码中,我们启动了三个异步任务分别对三个数据源的数据进行预处理。通过 CompletableFuture.allOf 方法,我们可以等待所有异步任务完成后再继续执行后续操作,确保数据预处理工作全部完成。

缓存更新

在一些应用中,缓存数据需要定期更新以保证数据的准确性。使用 runAsync 方法可以在后台异步更新缓存,而不会影响应用程序的正常运行。例如,一个新闻网站需要定期更新首页的热门新闻缓存。

import java.util.concurrent.CompletableFuture;

public class CacheUpdateExample {
    public static void main(String[] args) {
        // 启动缓存更新任务
        CompletableFuture.runAsync(() -> {
            updateHotNewsCache();
        });

        System.out.println("应用程序继续提供服务");

        try {
            // 等待缓存更新任务完成
            CompletableFuture.runAsync(() -> updateHotNewsCache()).get();
            System.out.println("热门新闻缓存已更新");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static void updateHotNewsCache() {
        // 模拟更新热门新闻缓存操作
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("热门新闻缓存更新完成");
    }
}

在这个示例中,runAsync 方法启动了一个异步任务来更新热门新闻缓存,应用程序主线程继续提供服务。通过 CompletableFuture.runAsync(() -> updateHotNewsCache()).get() 等待缓存更新任务完成,确保缓存更新操作完成后再进行相关的后续处理。

runAsync 性能优化与注意事项

线程池的合理配置

在使用 runAsync 方法时,合理配置线程池是提高性能的关键。如果使用默认的 ForkJoinPool.commonPool(),需要注意其线程数量是根据 CPU 核心数动态调整的,可能不适合某些特定的应用场景。对于 I/O 密集型任务,适当增加线程池大小可以提高并发性能;而对于 CPU 密集型任务,过多的线程可能会导致上下文切换开销增大,反而降低性能。在使用自定义线程池时,需要根据任务的特性(I/O 密集型还是 CPU 密集型)来设置线程池的核心线程数、最大线程数等参数。例如,对于 I/O 密集型任务,可以将核心线程数设置为 CPU 核心数的 2 到 3 倍;对于 CPU 密集型任务,核心线程数一般设置为 CPU 核心数。

避免不必要的阻塞

虽然 CompletableFuture 提供了强大的异步编程能力,但如果不正确使用,可能会导致主线程或其他关键线程阻塞,从而影响应用程序的性能。例如,在获取异步任务结果时,应尽量避免在主线程中直接调用 get() 方法,除非确实需要等待任务完成。可以使用 whenCompletethenApply 等方法来处理异步任务的结果,以实现非阻塞的异步编程。另外,在异步任务内部,如果需要调用其他可能阻塞的方法(如数据库查询、文件读取等),应确保这些操作本身也是异步的,或者使用合适的线程池来执行这些操作,以避免阻塞异步任务所在的线程。

异常处理的完整性

在异步任务执行过程中,确保异常能够被正确捕获和处理非常重要。如果不处理异步任务抛出的异常,可能会导致应用程序出现未处理的异常,影响程序的稳定性。使用 exceptionallywhenComplete 等方法来处理异常时,要确保异常处理逻辑能够覆盖所有可能出现异常的情况。同时,在日志记录方面,要详细记录异常信息,以便于调试和排查问题。例如,在 exceptionally 方法中,可以将异常堆栈信息记录到日志文件中,方便开发人员定位问题。

资源管理

在使用自定义线程池时,要注意资源的管理。在应用程序结束时,应及时关闭线程池,以释放资源。可以使用 ExecutorServiceshutdown()awaitTermination() 方法来优雅地关闭线程池。shutdown() 方法会启动一个有序关闭过程,不再接受新任务,但会继续执行已提交的任务。awaitTermination() 方法则用于等待线程池中的所有任务执行完毕。例如:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ThreadPoolResourceManagementExample {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(3);

        // 执行异步任务
        CompletableFuture.runAsync(() -> {
            // 任务逻辑
        }, executor);

        // 关闭线程池
        executor.shutdown();
        try {
            if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                executor.shutdownNow();
                if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                    System.err.println("Pool did not terminate");
                }
            }
        } catch (InterruptedException ie) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}

在上述代码中,我们先调用 executor.shutdown() 启动关闭过程,然后通过 executor.awaitTermination(60, TimeUnit.SECONDS) 等待线程池中的任务在 60 秒内完成。如果 60 秒后任务仍未完成,调用 executor.shutdownNow() 尝试停止正在执行的任务,并再次等待 60 秒。如果最终线程池仍未终止,输出错误信息。这样可以确保在应用程序结束时,线程池资源能够被正确释放。

通过以上对 Java CompletableFuture runAsync 无返回值异步任务的详细介绍、代码示例以及在实际项目中的应用场景、性能优化与注意事项的阐述,相信开发者们能够更加深入地理解和掌握这一强大的异步编程工具,在实际项目中灵活运用,提升应用程序的性能和响应性。在实际应用中,需要根据具体的业务需求和系统架构,合理选择线程池、处理异常以及管理资源,以充分发挥 CompletableFuture 的优势。同时,不断地实践和优化,能够让我们在异步编程领域更加得心应手,开发出更加高效、稳定的 Java 应用程序。