MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java编程中的异步编程基础

2022-04-294.8k 阅读

一、异步编程概述

在传统的同步编程模型中,程序按照顺序依次执行各个任务。一个任务执行完毕后,才会接着执行下一个任务。例如,假设有一个方法 methodA 调用另一个方法 methodB,只有当 methodB 执行完返回结果后,methodA 才会继续执行后续代码。这种方式在任务之间存在依赖关系,且任务执行时间较短时表现良好。

然而,在现代软件开发中,尤其是涉及到网络请求、I/O 操作(如文件读写)等场景,这些操作往往需要较长时间等待外部资源响应。如果采用同步编程,主线程会被阻塞,导致整个应用程序在等待过程中无法响应用户操作,极大影响用户体验。

异步编程应运而生,它允许程序在执行一个耗时操作时,不阻塞主线程,而是继续执行后续代码。当耗时操作完成后,通过特定的机制(如回调函数、Future、CompletableFuture 等)通知程序并处理结果。这样可以显著提高程序的并发性能和响应性。

二、Java 中的异步编程方式

(一)线程(Thread)

在 Java 中,最基础的实现异步的方式就是使用线程。线程是操作系统能够进行运算调度的最小单位,每个线程都可以独立执行一段代码。

  1. 创建线程的方式
    • 继承 Thread 类
class MyThread extends Thread {
    @Override
    public void run() {
        System.out.println("This is a thread running asynchronously.");
    }
}
public class Main {
    public static void main(String[] args) {
        MyThread myThread = new MyThread();
        myThread.start();
        System.out.println("Main thread continues execution.");
    }
}

在上述代码中,MyThread 类继承自 Thread 类,并重写了 run 方法,run 方法中的代码就是线程要执行的异步任务。在 main 方法中,创建 MyThread 实例并调用 start 方法启动线程。启动后,主线程和新创建的线程并行执行,System.out.println("Main thread continues execution."); 语句不会等待 MyThread 执行完毕就会继续执行。

- **实现 Runnable 接口**
class MyRunnable implements Runnable {
    @Override
    public void run() {
        System.out.println("This is a runnable running asynchronously.");
    }
}
public class Main {
    public static void main(String[] args) {
        Thread thread = new Thread(new MyRunnable());
        thread.start();
        System.out.println("Main thread continues execution.");
    }
}

这里 MyRunnable 类实现了 Runnable 接口,同样在 run 方法中定义异步任务。通过将 MyRunnable 实例作为参数传递给 Thread 构造函数来创建线程并启动。

  1. 线程的生命周期
    • 新建(New):当创建一个 Thread 实例或实现 Runnable 接口并传入 Thread 构造函数时,线程处于新建状态。此时线程尚未启动。
    • 就绪(Runnable):调用 start 方法后,线程进入就绪状态。处于就绪状态的线程等待 CPU 调度,一旦获得 CPU 时间片,就会进入运行状态。
    • 运行(Running):线程获取 CPU 时间片,开始执行 run 方法中的代码。
    • 阻塞(Blocked):在运行过程中,线程可能会因为某些原因(如等待 I/O 操作完成、调用 sleep 方法、获取锁失败等)进入阻塞状态。处于阻塞状态的线程不会获得 CPU 时间片,直到阻塞原因解除,重新进入就绪状态。
    • 死亡(Dead):当 run 方法执行完毕或者因为异常导致 run 方法提前结束,线程进入死亡状态。此时线程生命周期结束,不能再次启动。

(二)Callable 和 Future

虽然线程提供了异步执行的能力,但它存在一些局限性。例如,Thread 类的 run 方法没有返回值,如果需要获取异步任务的执行结果,使用 Thread 就不太方便。CallableFuture 接口解决了这个问题。

  1. Callable 接口 Callable 接口类似于 Runnable 接口,但 Callablecall 方法可以返回一个值并且可以抛出异常。
import java.util.concurrent.Callable;
class MyCallable implements Callable<String> {
    @Override
    public String call() throws Exception {
        // 模拟一些耗时操作
        Thread.sleep(2000);
        return "Task completed asynchronously.";
    }
}

在上述代码中,MyCallable 类实现了 Callable 接口,call 方法返回一个字符串,并模拟了一个 2 秒的耗时操作。

  1. Future 接口 Future 接口用于获取 Callable 任务的执行结果。Future 提供了方法来检查任务是否完成、获取任务执行结果以及取消任务等。
import java.util.concurrent.*;
public class Main {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        Future<String> future = executorService.submit(new MyCallable());
        try {
            while (!future.isDone()) {
                System.out.println("Task is still running...");
                Thread.sleep(500);
            }
            String result = future.get();
            System.out.println("Task result: " + result);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        } finally {
            executorService.shutdown();
        }
    }
}

main 方法中,首先创建了一个单线程的线程池 ExecutorService,然后通过 submit 方法提交 MyCallable 任务,返回一个 Future 对象。通过 future.isDone() 方法检查任务是否完成,在任务完成前主线程可以做其他事情,这里简单打印提示信息。当任务完成后,通过 future.get() 方法获取任务的执行结果。最后,关闭线程池。

(三)FutureTask

FutureTask 类实现了 FutureRunnable 接口,它可以作为 Thread 的构造参数,也可以通过 ExecutorService 执行。同时,它还可以获取异步任务的执行结果。

import java.util.concurrent.*;
class MyCallable implements Callable<String> {
    @Override
    public String call() throws Exception {
        // 模拟一些耗时操作
        Thread.sleep(2000);
        return "Task completed asynchronously.";
    }
}
public class Main {
    public static void main(String[] args) {
        FutureTask<String> futureTask = new FutureTask<>(new MyCallable());
        Thread thread = new Thread(futureTask);
        thread.start();
        try {
            while (!futureTask.isDone()) {
                System.out.println("Task is still running...");
                Thread.sleep(500);
            }
            String result = futureTask.get();
            System.out.println("Task result: " + result);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}

在这段代码中,首先创建了 FutureTask 实例,并传入 MyCallable 对象。然后创建 Thread 并将 FutureTask 作为参数传递给 Thread 构造函数。启动线程后,通过 FutureTaskisDoneget 方法来检查任务状态和获取结果。

(四)CompletableFuture

CompletableFuture 是 Java 8 引入的用于异步编程的类,它扩展了 Future 接口,提供了更强大、更灵活的异步处理能力。它支持链式调用、异步任务组合、错误处理等功能,使得异步编程更加简洁和易于理解。

  1. 创建 CompletableFuture
    • runAsync:用于执行没有返回值的异步任务。
import java.util.concurrent.CompletableFuture;
public class Main {
    public static void main(String[] args) {
        CompletableFuture.runAsync(() -> {
            System.out.println("Running an asynchronous task without return value.");
        }).thenRun(() -> {
            System.out.println("Task without return value completed.");
        });
    }
}

在上述代码中,CompletableFuture.runAsync 方法接收一个 Runnable 实例,启动一个异步任务。thenRun 方法在异步任务完成后执行,打印任务完成的提示信息。

- **supplyAsync**:用于执行有返回值的异步任务。
import java.util.concurrent.CompletableFuture;
public class Main {
    public static void main(String[] args) {
        CompletableFuture.supplyAsync(() -> {
            // 模拟一些耗时操作
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Task result with return value.";
        }).thenAccept(result -> {
            System.out.println("Task result: " + result);
        });
    }
}

这里 CompletableFuture.supplyAsync 方法接收一个 Supplier 实例,执行异步任务并返回结果。thenAccept 方法在任务完成后接收任务的返回值并进行处理。

  1. 链式调用和组合异步任务 CompletableFuture 支持链式调用,通过一系列方法可以将多个异步任务组合起来,实现复杂的异步逻辑。
import java.util.concurrent.CompletableFuture;
public class Main {
    public static void main(String[] args) {
        CompletableFuture.supplyAsync(() -> {
            System.out.println("Task 1 is running.");
            return "Task 1 result";
        }).thenApply(result1 -> {
            System.out.println("Task 2 is running with result of Task 1: " + result1);
            return result1 + " - Task 2 appended";
        }).thenApply(result2 -> {
            System.out.println("Task 3 is running with result of Task 2: " + result2);
            return result2 + " - Task 3 appended";
        }).thenAccept(finalResult -> {
            System.out.println("Final result: " + finalResult);
        });
    }
}

在这个例子中,首先 supplyAsync 启动任务 1 并返回结果。thenApply 方法接收任务 1 的结果作为参数,启动任务 2 并返回新的结果,以此类推。最后 thenAccept 方法接收最终结果并进行处理。

  1. 错误处理 CompletableFuture 提供了方便的错误处理机制。
import java.util.concurrent.CompletableFuture;
public class Main {
    public static void main(String[] args) {
        CompletableFuture.supplyAsync(() -> {
            if (Math.random() < 0.5) {
                throw new RuntimeException("Task failed randomly.");
            }
            return "Task success.";
        }).thenAccept(result -> {
            System.out.println("Task result: " + result);
        }).exceptionally(ex -> {
            System.out.println("Caught exception: " + ex.getMessage());
            return null;
        });
    }
}

在上述代码中,supplyAsync 中的任务有 50% 的概率抛出异常。exceptionally 方法用于捕获异步任务执行过程中抛出的异常,并进行相应处理。

三、异步编程中的线程池

在实际应用中,频繁创建和销毁线程会带来较大的开销,影响系统性能。线程池通过复用已创建的线程来执行任务,避免了线程创建和销毁的开销,提高了系统的并发处理能力。

(一)线程池的创建

在 Java 中,可以使用 ExecutorService 接口及其实现类来创建线程池。常用的线程池创建方式有以下几种:

  1. FixedThreadPool FixedThreadPool 创建一个固定大小的线程池,线程池中的线程数量在创建时就固定下来,不会随着任务的提交而增加或减少。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Main {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        for (int i = 0; i < 5; i++) {
            int taskNumber = i;
            executorService.submit(() -> {
                System.out.println("Task " + taskNumber + " is running in a thread from the fixed thread pool.");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        executorService.shutdown();
    }
}

在上述代码中,通过 Executors.newFixedThreadPool(3) 创建了一个包含 3 个线程的固定大小线程池。然后提交 5 个任务,由于线程池大小为 3,这 5 个任务会分批次执行,每次最多有 3 个任务同时执行。

  1. CachedThreadPool CachedThreadPool 创建一个可缓存的线程池,线程池的大小会根据任务的提交动态调整。如果线程池中有空闲线程,会复用空闲线程执行任务;如果没有空闲线程,会创建新的线程来执行任务。当线程空闲时间超过 60 秒,线程会被回收。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Main {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 5; i++) {
            int taskNumber = i;
            executorService.submit(() -> {
                System.out.println("Task " + taskNumber + " is running in a thread from the cached thread pool.");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        executorService.shutdown();
    }
}

这里创建了一个可缓存线程池,提交 5 个任务时,如果当前没有足够的空闲线程,会创建新线程来执行任务,任务执行完毕后线程会缓存起来,等待后续任务复用。

  1. SingleThreadExecutor SingleThreadExecutor 创建一个单线程的线程池,它保证所有任务按照提交顺序依次执行,同一时间只有一个任务在执行。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Main {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 5; i++) {
            int taskNumber = i;
            executorService.submit(() -> {
                System.out.println("Task " + taskNumber + " is running in the single - threaded pool.");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        executorService.shutdown();
    }
}

在这个例子中,提交的 5 个任务会依次在单线程中执行,每个任务执行完毕后,下一个任务才会开始执行。

(二)线程池的原理

线程池内部维护了一个任务队列和一组线程。当有任务提交到线程池时,线程池会按照以下规则处理:

  1. 如果线程池中的线程数量小于核心线程数(对于 FixedThreadPoolSingleThreadExecutor,核心线程数就是线程池大小;对于 CachedThreadPool,核心线程数为 0),会创建新的线程来执行任务。
  2. 如果线程池中的线程数量达到核心线程数,新提交的任务会被放入任务队列中等待执行。
  3. 如果任务队列已满,且线程池中的线程数量小于最大线程数(对于 FixedThreadPool,最大线程数等于核心线程数;对于 CachedThreadPool,最大线程数为 Integer.MAX_VALUE;对于 SingleThreadExecutor,最大线程数为 1),会创建新的线程来执行任务。
  4. 如果任务队列已满,且线程池中的线程数量达到最大线程数,根据不同的拒绝策略处理新提交的任务。常见的拒绝策略有:
    • AbortPolicy:默认策略,直接抛出 RejectedExecutionException 异常。
    • CallerRunsPolicy:将任务返回给调用者,由调用者线程来执行任务。
    • DiscardPolicy:直接丢弃任务,不做任何处理。
    • DiscardOldestPolicy:丢弃任务队列中最老的任务,然后尝试提交新任务。

四、异步编程中的并发控制

在异步编程中,多个线程或异步任务可能同时访问和修改共享资源,这可能导致数据不一致等并发问题。因此,需要采取一些并发控制手段来保证程序的正确性。

(一)锁机制

  1. synchronized 关键字 synchronized 关键字可以用来修饰方法或代码块,保证同一时间只有一个线程能够进入被修饰的方法或代码块,从而实现对共享资源的同步访问。
public class Counter {
    private int count = 0;
    public synchronized void increment() {
        count++;
    }
    public synchronized int getCount() {
        return count;
    }
}

在上述代码中,increment 方法和 getCount 方法都被 synchronized 修饰,当一个线程调用这两个方法中的任意一个时,其他线程无法同时调用,从而保证了 count 变量的一致性。

  1. ReentrantLock ReentrantLock 是 Java 提供的一种可重入的互斥锁,它比 synchronized 关键字提供了更灵活的锁控制。
import java.util.concurrent.locks.ReentrantLock;
public class Counter {
    private int count = 0;
    private ReentrantLock lock = new ReentrantLock();
    public void increment() {
        lock.lock();
        try {
            count++;
        } finally {
            lock.unlock();
        }
    }
    public int getCount() {
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }
}

在这个例子中,通过 ReentrantLocklockunlock 方法来实现对共享资源 count 的同步访问。try - finally 块确保无论在 incrementgetCount 方法执行过程中是否发生异常,锁都会被正确释放。

(二)原子类

Java 提供了一系列原子类,如 AtomicIntegerAtomicLong 等,这些原子类通过硬件级别的原子操作来保证对变量的操作是线程安全的,无需额外的锁机制。

import java.util.concurrent.atomic.AtomicInteger;
public class Counter {
    private AtomicInteger count = new AtomicInteger(0);
    public void increment() {
        count.incrementAndGet();
    }
    public int getCount() {
        return count.get();
    }
}

在上述代码中,AtomicIntegerincrementAndGet 方法是原子操作,多个线程同时调用时不会出现数据不一致问题。

(三)线程安全的集合类

在异步编程中,使用线程安全的集合类可以避免并发访问集合时出现的问题。例如,ConcurrentHashMap 是线程安全的哈希表,CopyOnWriteArrayList 是线程安全的列表。

import java.util.concurrent.ConcurrentHashMap;
public class Main {
    public static void main(String[] args) {
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
        map.put("key1", 1);
        int value = map.getOrDefault("key1", 0);
        System.out.println("Value: " + value);
    }
}

在这个例子中,ConcurrentHashMap 可以被多个线程安全地访问和修改,保证了数据的一致性。

五、异步编程的应用场景

  1. 网络请求:在进行网络请求(如 HTTP 请求)时,由于网络延迟等原因,请求可能需要较长时间才能得到响应。使用异步编程可以避免主线程阻塞,让用户界面保持响应性。例如,在一个 Web 应用中,后台线程发起 HTTP 请求获取数据,主线程继续处理其他用户交互操作,当数据获取完成后,通过回调或其他机制更新界面显示数据。
  2. I/O 操作:文件读写、数据库操作等 I/O 操作往往是耗时的。异步执行 I/O 操作可以提高系统的整体性能。比如在一个文件处理应用中,异步读取文件内容,主线程可以同时处理其他任务,如显示进度条或接收用户的其他指令。
  3. 并行计算:当需要进行大量的计算任务,且这些任务之间相互独立时,可以使用异步编程将任务分配到多个线程或线程池中并行执行,加快计算速度。例如,在数据分析领域,对大量数据进行并行的统计计算。

通过合理运用异步编程,Java 开发者可以构建出高效、响应性强的应用程序,满足现代软件对性能和用户体验的要求。在实际应用中,需要根据具体的业务场景和需求选择合适的异步编程方式,并注意并发控制等问题,以确保程序的正确性和稳定性。