MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java并发包中的Executor框架

2024-05-268.0k 阅读

Java并发包中的Executor框架概述

在Java的并发编程领域中,Executor框架是一个极为重要的工具集,它为我们管理和执行任务提供了一种优雅且高效的方式。在传统的Java线程使用中,我们通常会通过创建Thread对象,然后调用start()方法来启动线程。然而,随着并发任务数量的增加以及对线程管理复杂度的提升,这种方式变得难以维护和扩展。Executor框架应运而生,它将任务的提交与执行分离,使得我们能够更方便地管理线程资源,提高程序的性能和可维护性。

Executor框架主要由三个核心部分组成:Executor接口、ExecutorService接口及其实现类,以及ThreadPoolExecutor类等相关工具类。Executor接口是整个框架的基础,它定义了一个简单的方法execute(Runnable task),用于提交一个Runnable任务。ExecutorService接口继承自Executor接口,在其基础上增加了生命周期管理和任务提交的更多方法,比如submit(Callable<T> task),它可以返回一个Future对象,用于获取任务的执行结果。ThreadPoolExecutor类则是ExecutorService接口的一个重要实现类,它提供了灵活的线程池管理功能,我们可以通过配置参数来控制线程池的大小、任务队列的容量等。

Executor接口

Executor接口是Executor框架的最基础部分,其定义非常简单:

public interface Executor {
    void execute(Runnable task);
}

这个接口只有一个方法execute,该方法接收一个Runnable任务,并异步执行这个任务。这里所谓的“异步执行”意味着调用execute方法后,调用线程不会等待任务执行完成,而是继续执行后续的代码。

下面是一个简单的使用Executor接口的示例:

import java.util.concurrent.Executor;

class MyExecutor implements Executor {
    @Override
    public void execute(Runnable task) {
        new Thread(task).start();
    }
}

public class ExecutorExample {
    public static void main(String[] args) {
        Executor executor = new MyExecutor();
        executor.execute(() -> System.out.println("Task executed by custom executor"));
    }
}

在上述代码中,我们定义了一个MyExecutor类实现了Executor接口,在execute方法中,我们简单地创建了一个新线程并启动它来执行传入的Runnable任务。在main方法中,我们创建了MyExecutor的实例,并提交了一个任务。虽然这是一个简单的实现,但它展示了Executor接口的基本用法。

ExecutorService接口

ExecutorService接口继承自Executor接口,为我们提供了更多的功能,主要包括生命周期管理和任务提交的扩展方法。

生命周期管理方法

  • shutdown():启动一个有序关闭,不再接受新任务,但会继续执行已提交的任务。
  • shutdownNow():尝试停止所有正在执行的任务,停止等待任务的处理,并返回等待执行的任务列表。
  • isShutdown():如果已经调用了shutdown()shutdownNow(),则返回true
  • isTerminated():如果所有任务都已完成关闭,则返回true

任务提交扩展方法

  • submit(Callable<T> task):提交一个返回值的任务,并返回一个表示任务执行结果的Future对象。
  • submit(Runnable task):提交一个Runnable任务,并返回一个表示任务执行结果的Future对象,Future.get()方法将返回null
  • submit(Runnable task, T result):提交一个Runnable任务,并返回一个表示任务执行结果的Future对象,Future.get()方法将返回传入的result对象。

下面是一个使用ExecutorService接口的示例:

import java.util.concurrent.*;

public class ExecutorServiceExample {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(2);

        Future<String> future1 = executorService.submit(() -> {
            TimeUnit.SECONDS.sleep(2);
            return "Task 1 completed";
        });

        Future<String> future2 = executorService.submit(() -> {
            TimeUnit.SECONDS.sleep(1);
            return "Task 2 completed";
        });

        try {
            System.out.println(future1.get());
            System.out.println(future2.get());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }

        executorService.shutdown();
    }
}

在上述代码中,我们通过Executors.newFixedThreadPool(2)创建了一个固定大小为2的线程池。然后提交了两个Callable任务,这两个任务分别模拟了不同的执行时间。通过Future.get()方法,我们可以获取任务的执行结果。最后,调用shutdown()方法关闭线程池。

ThreadPoolExecutor类

ThreadPoolExecutor类是ExecutorService接口的核心实现类,它提供了高度可定制的线程池管理功能。通过构造函数,我们可以设置线程池的各种参数,以满足不同的应用场景需求。

ThreadPoolExecutor的构造函数

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)
  • corePoolSize:核心线程数,线程池中会一直存活的线程数量,即使它们处于空闲状态,除非设置了allowCoreThreadTimeOut
  • maximumPoolSize:线程池允许创建的最大线程数。当任务队列已满且活动线程数小于最大线程数时,线程池会创建新的线程来执行任务。
  • keepAliveTime:当线程数大于核心线程数时,多余的空闲线程的存活时间。即如果一个线程空闲时间超过了keepAliveTime,且当前线程数大于核心线程数,那么这个线程将会被终止。
  • unitkeepAliveTime的时间单位。
  • workQueue:任务队列,用于存放等待执行的任务。常用的任务队列有ArrayBlockingQueueLinkedBlockingQueueSynchronousQueue等。
  • threadFactory:线程工厂,用于创建新的线程。通过线程工厂,我们可以对创建的线程进行一些定制,比如设置线程名称、线程优先级等。
  • handler:拒绝策略,当任务队列已满且线程数达到最大线程数时,新提交的任务将被拒绝,此时会调用拒绝策略来处理被拒绝的任务。常用的拒绝策略有AbortPolicy(默认策略,直接抛出异常)、CallerRunsPolicy(将任务交回给调用者执行)、DiscardPolicy(直接丢弃任务)和DiscardOldestPolicy(丢弃队列中最老的任务,然后尝试提交新任务)。

示例:自定义ThreadPoolExecutor

import java.util.concurrent.*;

public class ThreadPoolExecutorExample {
    public static void main(String[] args) {
        BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2);
        ThreadFactory threadFactory = Executors.defaultThreadFactory();
        RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy();

        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2,
                4,
                10,
                TimeUnit.SECONDS,
                workQueue,
                threadFactory,
                handler
        );

        for (int i = 0; i < 6; i++) {
            int taskNumber = i;
            executor.submit(() -> {
                System.out.println("Task " + taskNumber + " is running on thread " + Thread.currentThread().getName());
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

        executor.shutdown();
    }
}

在上述代码中,我们创建了一个ThreadPoolExecutor实例,核心线程数为2,最大线程数为4,任务队列容量为2,线程存活时间为10秒。我们提交了6个任务,由于任务队列只能容纳2个任务,且核心线程数为2,所以一开始会有2个任务在核心线程中执行,2个任务进入任务队列,另外2个任务会导致线程池创建新的线程(因为最大线程数为4)。当所有任务提交完毕后,我们调用shutdown()方法关闭线程池。

常用的线程池创建方式

除了直接使用ThreadPoolExecutor类来创建线程池外,Executors类还提供了一些静态工厂方法来创建不同类型的线程池,这些线程池适用于不同的场景。

newFixedThreadPool(int nThreads)

创建一个固定大小的线程池,线程池中的线程数量始终保持为nThreads。当有新任务提交时,如果线程池中有空闲线程,则立即执行任务;如果没有空闲线程,则任务会被放入任务队列中等待。

ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);

这种线程池适用于对线程数量有明确限制,且任务执行时间相对较长的场景,比如数据库连接池的管理。

newCachedThreadPool()

创建一个可缓存的线程池,线程池的大小会根据任务的数量自动调整。如果线程池中的线程空闲时间超过60秒,那么这些线程将会被回收。当有新任务提交时,如果线程池中有空闲线程,则立即执行任务;如果没有空闲线程,则会创建新的线程来执行任务。

ExecutorService cachedThreadPool = Executors.newCachedThreadPool();

这种线程池适用于任务执行时间较短,但任务数量较多且提交频率不稳定的场景,比如处理大量的短期异步任务。

newSingleThreadExecutor()

创建一个单线程的线程池,线程池中只有一个线程。所有任务会按照提交的顺序依次在这个线程中执行。

ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();

这种线程池适用于需要顺序执行任务,且不希望多个任务并发执行的场景,比如对文件的顺序读写操作。

newScheduledThreadPool(int corePoolSize)

创建一个固定大小的线程池,支持定时及周期性任务执行。线程池中的核心线程数为corePoolSize

ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(2);
scheduledThreadPool.schedule(() -> System.out.println("Delayed task executed"), 5, TimeUnit.SECONDS);
scheduledThreadPool.scheduleAtFixedRate(() -> System.out.println("Periodic task executed"), 0, 2, TimeUnit.SECONDS);

在上述代码中,schedule方法用于延迟执行任务,scheduleAtFixedRate方法用于周期性执行任务。这种线程池适用于需要定时执行任务的场景,比如定时备份数据、定时清理缓存等。

Future和Callable接口

Executor框架中,FutureCallable接口是非常重要的组成部分,它们为我们提供了获取任务执行结果以及处理异步任务的能力。

Callable接口

Callable接口类似于Runnable接口,不同之处在于Callable接口的call方法可以返回一个值,并且可以抛出异常。

public interface Callable<V> {
    V call() throws Exception;
}

下面是一个简单的Callable示例:

import java.util.concurrent.Callable;

class MyCallable implements Callable<Integer> {
    @Override
    public Integer call() throws Exception {
        Thread.sleep(2000);
        return 42;
    }
}

在上述代码中,MyCallable类实现了Callable接口,call方法模拟了一个耗时2秒的任务,并返回一个整数值42。

Future接口

Future接口用于表示异步任务的执行结果,我们可以通过Future对象获取任务的执行结果、取消任务、判断任务是否完成等。

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}
  • cancel(boolean mayInterruptIfRunning):尝试取消任务的执行。如果任务已经完成、已经被取消或者由于某些原因无法取消,则返回false;如果任务还未开始执行,则任务将被取消且返回true;如果任务正在执行,且mayInterruptIfRunningtrue,则会尝试中断执行任务的线程来取消任务。
  • isCancelled():判断任务是否在完成之前被取消。
  • isDone():判断任务是否已经完成,任务完成包括正常结束、异常结束或被取消。
  • get():获取任务的执行结果,如果任务还未完成,则调用线程会被阻塞,直到任务完成。
  • get(long timeout, TimeUnit unit):在指定的时间内获取任务的执行结果,如果在指定时间内任务未完成,则抛出TimeoutException

下面是一个使用FutureCallable的示例:

import java.util.concurrent.*;

public class FutureAndCallableExample {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        Future<Integer> future = executorService.submit(new MyCallable());

        try {
            System.out.println("Task result: " + future.get());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }

        executorService.shutdown();
    }
}

class MyCallable implements Callable<Integer> {
    @Override
    public Integer call() throws Exception {
        Thread.sleep(2000);
        return 42;
    }
}

在上述代码中,我们通过ExecutorService提交了一个MyCallable任务,并获取了对应的Future对象。通过future.get()方法,我们获取了任务的执行结果。

CompletionService接口

CompletionService接口结合了ExecutorBlockingQueue的功能,它允许我们在任务完成时获取任务的结果,而不需要按照任务提交的顺序获取。

CompletionService的使用

import java.util.concurrent.*;

public class CompletionServiceExample {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        CompletionService<Integer> completionService = new ExecutorCompletionService<>(executorService);

        for (int i = 0; i < 5; i++) {
            int taskNumber = i;
            completionService.submit(() -> {
                Thread.sleep((long) (Math.random() * 3000));
                return taskNumber * taskNumber;
            });
        }

        for (int i = 0; i < 5; i++) {
            try {
                Future<Integer> future = completionService.take();
                System.out.println("Task result: " + future.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }

        executorService.shutdown();
    }
}

在上述代码中,我们创建了一个ExecutorCompletionService实例,并提交了5个任务。通过completionService.take()方法,我们可以按照任务完成的顺序获取任务的结果。这种方式在处理多个异步任务,且希望尽快获取已完成任务的结果时非常有用。

Fork/Join框架

Fork/Join框架是Executor框架的一个扩展,它专门用于处理可以被分解为多个小任务的大任务,通过递归分解任务并并行执行这些小任务,最后合并小任务的结果来得到大任务的结果。

Fork/Join框架的核心类

  • ForkJoinPoolFork/Join框架的线程池,它负责管理和调度ForkJoinTask任务。
  • ForkJoinTask:表示一个可以被分解和并行执行的任务,它是一个抽象类,有两个主要的实现类:RecursiveAction(用于没有返回值的任务)和RecursiveTask(用于有返回值的任务)。

示例:使用Fork/Join框架计算数组元素之和

import java.util.concurrent.*;

public class ForkJoinSumExample {
    public static void main(String[] args) {
        long[] numbers = new long[1000000];
        for (int i = 0; i < numbers.length; i++) {
            numbers[i] = i + 1;
        }

        ForkJoinPool forkJoinPool = new ForkJoinPool();
        ForkJoinTask<Long> task = new SumTask(numbers, 0, numbers.length);
        long result = forkJoinPool.invoke(task);
        System.out.println("Sum of array elements: " + result);
    }
}

class SumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 10000;
    private long[] numbers;
    private int start;
    private int end;

    public SumTask(long[] numbers, int start, int end) {
        this.numbers = numbers;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        if (end - start <= THRESHOLD) {
            long sum = 0;
            for (int i = start; i < end; i++) {
                sum += numbers[i];
            }
            return sum;
        } else {
            int mid = (start + end) / 2;
            SumTask leftTask = new SumTask(numbers, start, mid);
            SumTask rightTask = new SumTask(numbers, mid, end);

            leftTask.fork();
            long rightResult = rightTask.compute();
            long leftResult = leftTask.join();

            return leftResult + rightResult;
        }
    }
}

在上述代码中,我们定义了一个SumTask类继承自RecursiveTask,用于计算数组指定区间内元素的和。通过ForkJoinPool提交任务,并最终获取计算结果。Fork/Join框架通过递归分解任务,利用多线程并行计算,提高了计算效率。

总结

Executor框架是Java并发编程中的一个强大工具集,它涵盖了从简单的任务执行到复杂的线程池管理、异步任务处理以及任务分解并行计算等多个方面。通过合理使用Executor框架,我们可以有效地提高程序的性能、可维护性和资源利用率。无论是处理大量的短期任务,还是执行长时间运行的任务,Executor框架都提供了丰富的功能和灵活的配置选项来满足不同的需求。同时,结合FutureCallableCompletionService以及Fork/Join框架等相关组件,我们能够更全面地掌控并发任务的执行和结果处理,从而编写出高效、健壮的并发程序。在实际开发中,深入理解并熟练运用Executor框架是每个Java开发者必备的技能之一。

希望通过本文的介绍和示例代码,读者能够对Java并发包中的Executor框架有一个全面且深入的了解,并能够在自己的项目中灵活运用这些知识来优化并发程序的性能。