MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java 线程池与串行执行任务

2022-03-251.7k 阅读

Java 线程池基础

在Java中,线程池是一种管理和复用线程的机制。线程的创建和销毁是有一定开销的,如果频繁地创建和销毁线程,会严重影响系统性能。线程池通过预先创建一定数量的线程,并将它们放入一个池中进行管理,当有任务需要执行时,从池中取出一个线程来执行任务,任务执行完毕后,线程并不会被销毁,而是返回池中等待下一个任务,这样就大大减少了线程创建和销毁的开销。

线程池的创建

在Java中,可以通过ThreadPoolExecutor类来创建线程池,也可以使用Executors类提供的一些静态方法来创建不同类型的线程池。

  1. 使用ThreadPoolExecutor类创建线程池 ThreadPoolExecutor类有多个构造函数,其最常用的构造函数如下:
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    // 构造函数实现
}
  • corePoolSize:核心线程数,线程池中会一直保持的线程数量,即使这些线程处于空闲状态,也不会被销毁。
  • maximumPoolSize:最大线程数,线程池中允许存在的最大线程数量。当任务队列已满,且线程数量小于最大线程数时,会创建新的线程来处理任务。
  • keepAliveTime:线程的存活时间,当线程数量超过核心线程数时,多余的空闲线程在存活时间内没有任务可执行,就会被销毁。
  • unit:存活时间的单位,例如TimeUnit.SECONDS表示秒。
  • workQueue:任务队列,用于存放等待执行的任务。常用的任务队列有ArrayBlockingQueue(有界队列)、LinkedBlockingQueue(无界队列)等。
  • threadFactory:线程工厂,用于创建新的线程。可以通过线程工厂来设置线程的名称、优先级等属性。
  • handler:拒绝策略,当任务队列已满且线程数量达到最大线程数时,新提交的任务会被拒绝,此时就会使用到拒绝策略。常见的拒绝策略有AbortPolicy(默认策略,直接抛出异常)、CallerRunsPolicy(由调用者线程来执行任务)、DiscardPolicy(直接丢弃任务)、DiscardOldestPolicy(丢弃队列中最老的任务,然后尝试提交新任务)。

以下是一个使用ThreadPoolExecutor创建线程池的示例:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolExample {
    public static void main(String[] args) {
        // 创建任务队列
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(10);
        // 创建线程池
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2, // 核心线程数
                4, // 最大线程数
                10, // 存活时间
                TimeUnit.SECONDS,
                workQueue,
                new ThreadPoolExecutor.CallerRunsPolicy()
        );

        // 提交任务
        for (int i = 0; i < 15; i++) {
            int taskNumber = i;
            executor.submit(() -> {
                System.out.println("Task " + taskNumber + " is running on thread " + Thread.currentThread().getName());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

        // 关闭线程池
        executor.shutdown();
    }
}

在这个示例中,我们创建了一个核心线程数为2,最大线程数为4,任务队列容量为10的线程池,并使用CallerRunsPolicy作为拒绝策略。然后提交了15个任务,由于任务队列容量为10,当提交第13个任务时,任务队列已满,且线程数量达到最大线程数4,此时第13、14、15个任务会由调用者线程(即主线程)来执行。

  1. 使用Executors类创建线程池 Executors类提供了一些静态方法来创建不同类型的线程池,方便我们快速创建线程池。
  • newFixedThreadPool(int nThreads):创建一个固定大小的线程池,核心线程数和最大线程数都为nThreads,任务队列是LinkedBlockingQueue(无界队列)。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FixedThreadPoolExample {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        for (int i = 0; i < 5; i++) {
            int taskNumber = i;
            executor.submit(() -> {
                System.out.println("Task " + taskNumber + " is running on thread " + Thread.currentThread().getName());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        executor.shutdown();
    }
}
  • newCachedThreadPool():创建一个可缓存的线程池,核心线程数为0,最大线程数为Integer.MAX_VALUE,任务队列是SynchronousQueue。如果线程池中有空闲线程,则复用空闲线程;如果没有空闲线程,则创建新的线程来执行任务。当线程空闲60秒后,会被销毁。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CachedThreadPoolExample {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();
        for (int i = 0; i < 5; i++) {
            int taskNumber = i;
            executor.submit(() -> {
                System.out.println("Task " + taskNumber + " is running on thread " + Thread.currentThread().getName());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        executor.shutdown();
    }
}
  • newSingleThreadExecutor():创建一个单线程的线程池,核心线程数和最大线程数都为1,任务队列是LinkedBlockingQueue。它保证所有任务按照提交的顺序依次执行。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SingleThreadExecutorExample {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 5; i++) {
            int taskNumber = i;
            executor.submit(() -> {
                System.out.println("Task " + taskNumber + " is running on thread " + Thread.currentThread().getName());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        executor.shutdown();
    }
}
  • newScheduledThreadPool(int corePoolSize):创建一个支持定时及周期性执行任务的线程池,核心线程数为corePoolSize,最大线程数为Integer.MAX_VALUE,任务队列是DelayedWorkQueue
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduledThreadPoolExample {
    public static void main(String[] args) {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
        executor.schedule(() -> {
            System.out.println("Delayed task is running on thread " + Thread.currentThread().getName());
        }, 3, TimeUnit.SECONDS);

        executor.scheduleAtFixedRate(() -> {
            System.out.println("Periodic task is running on thread " + Thread.currentThread().getName());
        }, 0, 2, TimeUnit.SECONDS);
    }
}

串行执行任务

在Java中,串行执行任务意味着一个任务执行完毕后,再执行下一个任务,任务之间是顺序执行的关系。

普通方式串行执行任务

最直接的方式就是在主线程中依次调用任务的执行方法。例如,假设有两个任务类Task1Task2

class Task1 implements Runnable {
    @Override
    public void run() {
        System.out.println("Task1 is running on thread " + Thread.currentThread().getName());
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

class Task2 implements Runnable {
    @Override
    public void run() {
        System.out.println("Task2 is running on thread " + Thread.currentThread().getName());
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

可以在主线程中依次执行这两个任务:

public class SerialExecutionExample {
    public static void main(String[] args) {
        Task1 task1 = new Task1();
        Task2 task2 = new Task2();

        task1.run();
        task2.run();
    }
}

在这个示例中,task1.run()执行完毕后,才会执行task2.run(),实现了任务的串行执行。

使用单线程线程池串行执行任务

正如前面提到的newSingleThreadExecutor()创建的线程池,它只有一个线程,所有提交的任务会在这个单一线程中依次执行。例如:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SingleThreadExecutorSerialExample {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Task1 task1 = new Task1();
        Task2 task2 = new Task2();

        executor.submit(task1);
        executor.submit(task2);

        executor.shutdown();
    }
}

在这个示例中,task1task2会被提交到单线程线程池中,由于线程池只有一个线程,task1会先执行,执行完毕后task2才会执行,从而实现了任务的串行执行。

线程池与串行执行任务的比较

  1. 性能方面
  • 线程池:在处理大量短时间任务时,线程池由于复用线程,减少了线程创建和销毁的开销,性能会明显优于串行执行任务。例如,在一个Web服务器中,处理大量的HTTP请求,如果每次请求都创建一个新线程,会导致系统资源的大量消耗,而使用线程池可以高效地处理这些请求。
  • 串行执行任务:在任务数量较少,且任务执行时间较短的情况下,串行执行任务的性能与线程池相差不大,因为串行执行避免了线程调度和上下文切换的开销。但当任务数量增多或任务执行时间变长时,串行执行任务的性能会明显下降,因为所有任务都在一个线程中依次执行,无法利用多核CPU的优势。
  1. 资源占用方面
  • 线程池:线程池需要维护一定数量的线程,这些线程会占用系统资源,如内存、CPU时间片等。如果线程池设置不当,例如线程数量过多,会导致系统资源耗尽,影响系统的稳定性。
  • 串行执行任务:串行执行任务只需要一个线程,资源占用相对较少。但在处理大量任务时,由于任务执行时间长,可能会导致主线程长时间被占用,影响其他操作的执行。
  1. 应用场景方面
  • 线程池:适用于需要处理大量并发任务的场景,如Web服务器、分布式计算等。通过合理设置线程池参数,可以充分利用系统资源,提高系统的并发处理能力。
  • 串行执行任务:适用于任务之间存在依赖关系,必须按顺序执行的场景,或者任务数量较少且对资源占用要求较低的场景。例如,在一些简单的批处理任务中,任务之间需要顺序执行,此时串行执行任务更为合适。

线程池中的任务调度与串行执行的结合

在实际应用中,有时需要在线程池的基础上实现部分任务的串行执行。例如,有一组任务,其中某些任务之间存在依赖关系,需要顺序执行,而其他任务可以并发执行。

可以通过CompletableFuture来实现这种需求。CompletableFuture提供了丰富的方法来处理异步任务的组合和依赖关系。

假设我们有三个任务TaskATaskBTaskC,其中TaskB依赖于TaskA的执行结果,TaskC可以与TaskA并发执行。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class TaskA implements Runnable {
    @Override
    public void run() {
        System.out.println("TaskA is running on thread " + Thread.currentThread().getName());
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

class TaskB implements Runnable {
    private String resultFromA;

    public TaskB(String resultFromA) {
        this.resultFromA = resultFromA;
    }

    @Override
    public void run() {
        System.out.println("TaskB is running on thread " + Thread.currentThread().getName() + " with result from A: " + resultFromA);
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

class TaskC implements Runnable {
    @Override
    public void run() {
        System.out.println("TaskC is running on thread " + Thread.currentThread().getName());
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

使用CompletableFuture和线程池来实现任务调度:

public class CombinedExecutionExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(3);

        CompletableFuture<Void> futureA = CompletableFuture.runAsync(new TaskA(), executor);
        CompletableFuture<Void> futureC = CompletableFuture.runAsync(new TaskC(), executor);

        CompletableFuture<Void> futureB = futureA.thenApplyAsync(result -> {
            return "Result from A";
        }, executor)
               .thenAcceptAsync(result -> {
                    new TaskB(result).run();
                }, executor);

        CompletableFuture.allOf(futureA, futureB, futureC).get();

        executor.shutdown();
    }
}

在这个示例中,TaskATaskC通过CompletableFuture.runAsync()方法提交到线程池中并发执行。TaskB通过futureA.thenApplyAsync()thenAcceptAsync()方法依赖于TaskA的执行结果,并在TaskA执行完毕后顺序执行。最后通过CompletableFuture.allOf()方法等待所有任务执行完毕。

总结线程池与串行执行任务的要点

  1. 线程池
  • 线程池是一种高效管理和复用线程的机制,通过合理设置核心线程数、最大线程数、任务队列和拒绝策略,可以提高系统的并发处理能力。
  • 不同类型的线程池适用于不同的场景,newFixedThreadPool适用于需要固定线程数量的场景,newCachedThreadPool适用于任务数量不确定且执行时间短的场景,newSingleThreadExecutor适用于需要顺序执行任务的场景,newScheduledThreadPool适用于需要定时或周期性执行任务的场景。
  • 在使用线程池时,要注意避免线程泄漏和资源耗尽的问题,合理设置线程池参数,并根据实际情况进行调优。
  1. 串行执行任务
  • 串行执行任务是任务顺序执行的方式,适用于任务之间存在依赖关系或对资源占用要求较低的场景。
  • 可以通过在主线程中依次调用任务方法或使用单线程线程池来实现串行执行任务。
  • 在处理大量任务时,串行执行任务可能会导致性能瓶颈,需要根据实际情况考虑是否使用并发执行方式。
  1. 结合使用
  • 在实际应用中,往往需要结合线程池和串行执行任务的方式来满足复杂的业务需求。
  • CompletableFuture等工具可以帮助我们在线程池的基础上实现任务的依赖和顺序执行,提高系统的灵活性和可扩展性。

通过深入理解Java线程池和串行执行任务的原理及应用场景,并结合实际业务需求进行合理选择和优化,可以开发出高效、稳定的多线程应用程序。在实际开发中,还需要注意线程安全、资源管理等问题,确保程序的正确性和可靠性。