MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java异步编程的最佳实践与技巧

2023-04-188.0k 阅读

Java异步编程基础

在Java中,异步编程允许程序在执行某些任务时不阻塞主线程,从而提高应用程序的响应性和性能。传统的同步编程模型下,代码按照顺序逐行执行,一个任务完成后才会执行下一个任务。如果某个任务执行时间较长,比如进行网络请求或磁盘I/O操作,主线程就会被阻塞,导致整个应用程序看起来像是“冻结”了一样。而异步编程则可以让这些耗时操作在后台线程中执行,主线程可以继续处理其他任务。

线程基础

Java的异步编程离不开线程的概念。线程是程序执行流的最小单元,一个Java程序至少有一个主线程。可以通过继承Thread类或实现Runnable接口来创建线程。

  1. 继承Thread类
class MyThread extends Thread {
    @Override
    public void run() {
        System.out.println("This is a new thread: " + Thread.currentThread().getName());
    }
}
public class ThreadExample {
    public static void main(String[] args) {
        MyThread myThread = new MyThread();
        myThread.start();
        System.out.println("This is the main thread: " + Thread.currentThread().getName());
    }
}

在上述代码中,MyThread类继承自Thread类,并重写了run方法。在main方法中,创建了MyThread的实例并调用start方法启动线程。start方法会在一个新的线程中执行run方法的代码,而主线程会继续执行System.out.println("This is the main thread: " + Thread.currentThread().getName());

  1. 实现Runnable接口
class MyRunnable implements Runnable {
    @Override
    public void run() {
        System.out.println("This is a new thread: " + Thread.currentThread().getName());
    }
}
public class RunnableExample {
    public static void main(String[] args) {
        Thread thread = new Thread(new MyRunnable());
        thread.start();
        System.out.println("This is the main thread: " + Thread.currentThread().getName());
    }
}

这里MyRunnable类实现了Runnable接口,同样在main方法中创建Thread实例并传入MyRunnable的实例来启动新线程。

Callable和Future

Callable接口和Future接口为异步编程提供了更强大的功能。Callable接口类似于Runnable,但Callablecall方法可以返回一个值并且可以抛出异常。Future接口用于获取Callable任务的执行结果。

import java.util.concurrent.*;
class MyCallable implements Callable<Integer> {
    @Override
    public Integer call() throws Exception {
        // 模拟一些耗时操作
        Thread.sleep(2000);
        return 42;
    }
}
public class CallableFutureExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        Future<Integer> future = executorService.submit(new MyCallable());
        System.out.println("Main thread is doing other things...");
        Integer result = future.get();
        System.out.println("The result is: " + result);
        executorService.shutdown();
    }
}

在上述代码中,MyCallable实现了Callable<Integer>接口,call方法模拟了一个耗时2秒的操作并返回42。在main方法中,通过ExecutorService提交MyCallable任务,返回一个Future对象。主线程可以继续执行其他任务,然后通过future.get()获取任务的执行结果。这里需要注意,future.get()方法会阻塞主线程,直到任务完成并返回结果。

线程池与异步任务执行

手动创建和管理大量线程会带来性能开销和资源管理问题。线程池是一种有效的解决方案,它可以复用已有的线程,减少线程创建和销毁的开销。

Executor框架

Java的Executor框架提供了一种灵活的线程池管理方式。主要涉及的接口和类有ExecutorExecutorServiceScheduledExecutorServiceThreadPoolExecutor等。

  1. Executor接口 Executor接口是一个简单的执行任务的接口,它只有一个execute(Runnable task)方法,用于提交一个Runnable任务。
class MyTask implements Runnable {
    @Override
    public void run() {
        System.out.println("Task is running: " + Thread.currentThread().getName());
    }
}
public class ExecutorExample {
    public static void main(String[] args) {
        Executor executor = Executors.newSingleThreadExecutor();
        executor.execute(new MyTask());
    }
}

在上述代码中,通过Executors.newSingleThreadExecutor()创建了一个单线程的Executor,并提交了一个MyTask任务。

  1. ExecutorService接口 ExecutorService继承自Executor接口,提供了更多管理线程池和任务执行的方法,比如submit方法可以提交CallableRunnable任务并返回Future对象,shutdown方法用于关闭线程池等。
import java.util.concurrent.*;
class MyCallableTask implements Callable<String> {
    @Override
    public String call() throws Exception {
        Thread.sleep(1000);
        return "Task completed";
    }
}
public class ExecutorServiceExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        Future<String> future = executorService.submit(new MyCallableTask());
        System.out.println("Main thread is doing other things...");
        String result = future.get();
        System.out.println("The result is: " + result);
        executorService.shutdown();
    }
}

这里通过Executors.newFixedThreadPool(3)创建了一个固定大小为3的线程池,提交了一个MyCallableTask任务,并通过Future获取任务执行结果。

  1. ScheduledExecutorService接口 ScheduledExecutorService继承自ExecutorService接口,用于执行定时任务和周期性任务。
import java.util.concurrent.*;
public class ScheduledExecutorServiceExample {
    public static void main(String[] args) {
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
        scheduledExecutorService.schedule(() -> System.out.println("Delayed task executed"), 3, TimeUnit.SECONDS);
        scheduledExecutorService.scheduleAtFixedRate(() -> System.out.println("Periodic task executed"), 0, 2, TimeUnit.SECONDS);
        scheduledExecutorService.shutdown();
    }
}

在上述代码中,通过schedule方法提交了一个延迟3秒执行的任务,通过scheduleAtFixedRate方法提交了一个从0秒开始,每2秒执行一次的周期性任务。

  1. ThreadPoolExecutor类 ThreadPoolExecutorExecutorService的具体实现类,提供了更细粒度的线程池参数控制。构造函数如下:
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}
  • corePoolSize:核心线程数,线程池会一直保持的线程数量,即使这些线程处于空闲状态。
  • maximumPoolSize:线程池允许的最大线程数。
  • keepAliveTime:当线程数大于核心线程数时,多余的空闲线程的存活时间。
  • unitkeepAliveTime的时间单位。
  • workQueue:用于存储等待执行任务的队列。
import java.util.concurrent.*;
class MyThreadPoolTask implements Runnable {
    @Override
    public void run() {
        System.out.println("Task is running: " + Thread.currentThread().getName());
    }
}
public class ThreadPoolExecutorExample {
    public static void main(String[] args) {
        BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(5);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 4, 10, TimeUnit.SECONDS, workQueue);
        for (int i = 0; i < 10; i++) {
            executor.submit(new MyThreadPoolTask());
        }
        executor.shutdown();
    }
}

在上述代码中,创建了一个核心线程数为2,最大线程数为4,空闲线程存活时间为10秒,任务队列容量为5的线程池,并提交了10个任务。

CompletableFuture异步编程

CompletableFuture是Java 8引入的一个强大的异步编程工具,它结合了FutureCompletionStage接口的功能,提供了更灵活和链式调用的异步编程模型。

创建CompletableFuture

  1. 使用supplyAsync方法创建有返回值的CompletableFuture
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class CompletableFutureCreation {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            // 模拟耗时操作
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Hello, CompletableFuture!";
        });
        String result = future.get();
        System.out.println(result);
    }
}

在上述代码中,CompletableFuture.supplyAsync方法接受一个Supplier,在一个异步线程中执行Supplierget方法,并返回一个CompletableFuture对象。通过future.get()获取异步任务的执行结果。

  1. 使用runAsync方法创建无返回值的CompletableFuture
import java.util.concurrent.CompletableFuture;
public class CompletableFutureRunAsync {
    public static void main(String[] args) {
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            // 模拟耗时操作
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("Task completed without return value.");
        });
        // 这里不需要获取结果,因为没有返回值
    }
}

CompletableFuture.runAsync方法接受一个Runnable,在异步线程中执行Runnablerun方法,返回的CompletableFuture的泛型类型为Void

CompletableFuture的链式调用

CompletableFuture支持链式调用,使得异步任务的编排更加简洁和直观。

  1. thenApply方法 thenApply方法用于对CompletableFuture的结果进行转换。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class CompletableFutureThenApply {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello")
               .thenApply(s -> s + ", World!");
        String result = future.get();
        System.out.println(result);
    }
}

在上述代码中,CompletableFuture.supplyAsync返回一个包含“Hello”的CompletableFuture,然后通过thenApply方法将结果转换为“Hello, World!”。

  1. thenAccept方法 thenAccept方法用于在CompletableFuture完成时执行一个消费操作,不返回新的结果。
import java.util.concurrent.CompletableFuture;
public class CompletableFutureThenAccept {
    public static void main(String[] args) {
        CompletableFuture.supplyAsync(() -> "Hello")
               .thenAccept(s -> System.out.println(s + ", World!"));
        // 这里不需要获取结果,因为thenAccept不返回值
    }
}

CompletableFuture.supplyAsync返回一个包含“Hello”的CompletableFuturethenAccept方法消费这个结果并打印“Hello, World!”。

  1. thenRun方法 thenRun方法用于在CompletableFuture完成时执行一个无参数的Runnable,同样不返回新的结果。
import java.util.concurrent.CompletableFuture;
public class CompletableFutureThenRun {
    public static void main(String[] args) {
        CompletableFuture.supplyAsync(() -> "Hello")
               .thenRun(() -> System.out.println("Task completed"));
        // 这里不需要获取结果,因为thenRun不返回值
    }
}

CompletableFuture.supplyAsync返回一个包含“Hello”的CompletableFuturethenRun方法在任务完成时打印“Task completed”。

处理多个CompletableFuture

  1. allOf方法 CompletableFuture.allOf方法用于等待所有的CompletableFuture都完成。
import java.util.concurrent.CompletableFuture;
public class CompletableFutureAllOf {
    public static void main(String[] args) throws InterruptedException {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Future 1 completed";
        });
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Future 2 completed";
        });
        CompletableFuture<Void> allFuture = CompletableFuture.allOf(future1, future2);
        allFuture.join();
        System.out.println(future1.join());
        System.out.println(future2.join());
    }
}

在上述代码中,创建了两个异步任务future1future2,通过CompletableFuture.allOf方法等待这两个任务都完成,然后获取并打印它们的结果。

  1. anyOf方法 CompletableFuture.anyOf方法用于等待任意一个CompletableFuture完成。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class CompletableFutureAnyOf {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Future 1 completed";
        });
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Future 2 completed";
        });
        CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(future1, future2);
        Object result = anyFuture.get();
        System.out.println(result);
    }
}

这里创建了两个异步任务future1future2CompletableFuture.anyOf方法等待其中任意一个任务完成,并获取完成任务的结果。由于future2执行时间较短,所以通常会先完成并返回结果。

异步编程中的异常处理

在异步编程中,异常处理是非常重要的。如果处理不当,异常可能会导致程序崩溃或难以调试。

Future的异常处理

当使用Future获取异步任务结果时,可以通过Future.get()方法捕获异常。

import java.util.concurrent.*;
class MyCallableWithException implements Callable<Integer> {
    @Override
    public Integer call() throws Exception {
        throw new RuntimeException("Task failed");
    }
}
public class FutureExceptionHandling {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        Future<Integer> future = executorService.submit(new MyCallableWithException());
        try {
            Integer result = future.get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        executorService.shutdown();
    }
}

在上述代码中,MyCallableWithExceptioncall方法抛出了一个RuntimeException。在main方法中,通过future.get()获取结果时,捕获到InterruptedExceptionExecutionException,其中ExecutionExceptiongetCause方法可以获取到实际抛出的RuntimeException

CompletableFuture的异常处理

CompletableFuture提供了更灵活的异常处理方式。

  1. exceptionally方法 exceptionally方法用于在CompletableFuture发生异常时提供一个默认值或执行一个恢复操作。
import java.util.concurrent.CompletableFuture;
public class CompletableFutureExceptionally {
    public static void main(String[] args) {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            throw new RuntimeException("Task failed");
        }).exceptionally(ex -> {
            System.out.println("Caught exception: " + ex.getMessage());
            return -1;
        });
        Integer result = future.join();
        System.out.println("Result: " + result);
    }
}

在上述代码中,CompletableFuture.supplyAsync抛出了一个RuntimeException,通过exceptionally方法捕获异常并返回默认值-1。

  1. handle方法 handle方法可以同时处理正常结果和异常情况。
import java.util.concurrent.CompletableFuture;
public class CompletableFutureHandle {
    public static void main(String[] args) {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            throw new RuntimeException("Task failed");
        }).handle((result, ex) -> {
            if (ex != null) {
                System.out.println("Caught exception: " + ex.getMessage());
                return -1;
            }
            return result;
        });
        Integer result = future.join();
        System.out.println("Result: " + result);
    }
}

handle方法接受一个BiFunction,第一个参数是正常情况下的结果,第二个参数是异常。如果发生异常,ex不为null,可以进行相应的处理并返回默认值。

异步编程的最佳实践

  1. 合理使用线程池
  • 根据任务的类型和数量来设置线程池的参数。对于I/O密集型任务,可以适当增大核心线程数,因为I/O操作等待时间长,线程空闲时间多;对于CPU密集型任务,核心线程数应接近CPU核心数,避免过多线程导致上下文切换开销过大。
  • 监控线程池的状态,比如活跃线程数、任务队列大小等,根据实际情况调整线程池参数。可以通过ThreadPoolExecutor的相关方法获取这些信息。
  1. 避免过度异步 虽然异步编程可以提高性能,但过度使用异步会增加代码的复杂性和调试难度。对于一些简单的、执行时间短的任务,同步执行可能更加合适。

  2. 正确处理异常 如前面所述,在异步编程中要正确处理异常,避免异常被忽略导致程序出现难以排查的问题。使用CompletableFuture时,充分利用exceptionallyhandle等方法进行异常处理。

  3. 考虑并发安全 在异步编程中,多个线程可能同时访问共享资源,需要注意并发安全问题。可以使用synchronized关键字、Lock接口或并发包中的线程安全集合类来保证数据的一致性。

  4. 优化异步任务的编排 使用CompletableFuture的链式调用和组合方法,如thenApplythenAcceptallOfanyOf等,合理编排异步任务,提高代码的可读性和执行效率。

  5. 异步任务的日志记录 在异步任务中添加适当的日志记录,方便调试和监控。记录任务的开始、结束时间,以及可能出现的异常信息。

总结异步编程的技巧与注意事项

  1. 技巧
  • 利用CompletableFuture的链式调用和组合方法来简化异步任务的编排,使得代码逻辑更加清晰。
  • 对于一些重复执行的异步任务,可以考虑使用ScheduledExecutorService进行定时调度,避免手动管理任务的执行间隔。
  • 在创建线程池时,根据任务的特性选择合适的线程池类型,如固定大小线程池、缓存线程池等。
  1. 注意事项
  • 异步任务可能会导致上下文切换开销,要避免创建过多不必要的线程,合理使用线程池来复用线程。
  • 注意异步任务中的资源泄漏问题,如数据库连接、文件句柄等,确保在任务完成后正确释放资源。
  • 在处理多个异步任务时,要注意任务之间的依赖关系,避免出现死锁或数据竞争等问题。

通过遵循这些最佳实践和技巧,可以在Java中实现高效、可靠的异步编程,提升应用程序的性能和响应性。同时,不断实践和总结经验,能够更好地掌握异步编程这一强大的技术。