MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java 定时及周期执行线程池的任务调度

2023-05-091.4k 阅读

Java 定时及周期执行线程池的任务调度

在 Java 编程中,任务调度是一个常见的需求。我们经常需要在特定的时间点执行任务,或者按照一定的周期重复执行任务。Java 提供了多种方式来实现任务调度,其中使用线程池进行定时及周期任务调度是一种高效且灵活的方法。

1. 任务调度基础概念

在深入探讨线程池的任务调度之前,我们先来了解一些基本概念。

  • 定时任务:指在某个特定的时间点执行一次的任务。例如,每天凌晨 2 点进行数据备份,这就是一个定时任务。

  • 周期任务:按照一定的时间间隔重复执行的任务。比如,每隔 5 分钟检查一次系统状态,这是一个周期任务。

  • 线程池:是一种管理和复用线程的机制。它维护着一组线程,当有任务提交时,线程池会分配一个线程来执行任务,任务执行完毕后,线程不会被销毁,而是返回线程池等待下一个任务,这样可以避免频繁创建和销毁线程带来的开销。

2. Java 中的任务调度相关类

在 Java 中,有几个关键的类和接口用于任务调度,尤其是结合线程池来实现定时和周期任务调度。

  • ScheduledExecutorService:这是一个接口,继承自 ExecutorService。它提供了用于调度任务的方法,允许我们在给定的延迟后执行任务,或者定期执行任务。

  • ScheduledThreadPoolExecutor:是 ScheduledExecutorService 接口的实现类。它创建了一个线程池,可以安排在给定的延迟后运行命令,或者定期执行命令。

  • Runnable:这是一个定义任务的接口,任何实现了 Runnable 接口的类都可以作为一个任务提交给线程池执行。

  • Callable:与 Runnable 类似,也是用于定义任务的接口,但 Callable 可以返回一个结果,并且可以抛出异常。

3. 定时任务调度实现

下面我们通过代码示例来展示如何使用 ScheduledThreadPoolExecutor 实现定时任务调度。

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class OneTimeTaskScheduler {
    public static void main(String[] args) {
        // 创建一个 ScheduledThreadPoolExecutor,线程池大小为 1
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

        // 定义要执行的任务
        Runnable task = () -> System.out.println("One-time task executed at " + System.currentTimeMillis());

        // 在延迟 5 秒后执行任务
        scheduler.schedule(task, 5, TimeUnit.SECONDS);

        // 关闭线程池
        scheduler.shutdown();
    }
}

在上述代码中:

  • 首先,通过 Executors.newScheduledThreadPool(1) 创建了一个 ScheduledThreadPoolExecutor,线程池大小为 1,意味着同一时间最多只有一个任务在执行。

  • 然后,定义了一个 Runnable 任务,该任务在执行时会打印当前时间。

  • 接着,使用 scheduler.schedule(task, 5, TimeUnit.SECONDS) 方法安排任务在延迟 5 秒后执行。schedule 方法的第一个参数是要执行的任务,第二个参数是延迟时间,第三个参数是延迟时间的单位。

  • 最后,调用 scheduler.shutdown() 关闭线程池。如果不调用 shutdown(),线程池会一直运行,程序不会结束。

4. 周期任务调度实现

实现周期任务调度同样使用 ScheduledThreadPoolExecutor,不过使用的方法略有不同。

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class PeriodicTaskScheduler {
    public static void main(String[] args) {
        // 创建一个 ScheduledThreadPoolExecutor,线程池大小为 1
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

        // 定义要执行的周期任务
        Runnable task = () -> System.out.println("Periodic task executed at " + System.currentTimeMillis());

        // 初始延迟 0 秒后开始执行任务,之后每隔 3 秒执行一次
        scheduler.scheduleAtFixedRate(task, 0, 3, TimeUnit.SECONDS);

        // 这里不调用 shutdown(),因为任务是周期执行的,程序会一直运行
    }
}

在这个示例中:

  • 同样先创建了一个 ScheduledThreadPoolExecutor

  • 定义了一个 Runnable 任务,每次执行时打印当前时间。

  • 使用 scheduler.scheduleAtFixedRate(task, 0, 3, TimeUnit.SECONDS) 方法来安排周期任务。scheduleAtFixedRate 方法的第一个参数是要执行的任务,第二个参数是初始延迟时间,这里设置为 0 秒,意味着任务立即开始执行,第三个参数是任务执行的周期,这里是 3 秒,即每隔 3 秒执行一次任务,第四个参数是时间单位。

如果我们希望任务在固定延迟后执行,而不是固定速率执行,可以使用 scheduleWithFixedDelay 方法,示例如下:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class PeriodicTaskSchedulerWithFixedDelay {
    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

        Runnable task = () -> {
            System.out.println("Task started at " + System.currentTimeMillis());
            try {
                // 模拟任务执行时间
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("Task ended at " + System.currentTimeMillis());
        };

        // 初始延迟 0 秒后开始执行任务,每次执行完后延迟 3 秒再执行下一次
        scheduler.scheduleWithFixedDelay(task, 0, 3, TimeUnit.SECONDS);
    }
}

在这个例子中,scheduleWithFixedDelay 方法的作用是在任务完成后,等待指定的延迟时间(这里是 3 秒)再启动下一次任务。任务内部模拟了 2 秒的执行时间,通过打印开始和结束时间可以观察到任务执行的间隔。

5. 任务调度中的线程池管理

在使用 ScheduledThreadPoolExecutor 进行任务调度时,合理管理线程池非常重要。

  • 线程池大小调整:线程池大小的设置要根据任务的类型和数量来决定。如果任务是 I/O 密集型的,线程池大小可以适当设置得大一些,因为 I/O 操作通常会使线程处于等待状态,不会占用太多 CPU 资源。如果是 CPU 密集型任务,线程池大小一般设置为 CPU 核心数加 1,以充分利用 CPU 资源,同时避免过多线程导致的上下文切换开销。

  • 任务队列ScheduledThreadPoolExecutor 内部使用了一个任务队列来存储等待执行的任务。如果任务提交速度过快,超过了线程池的处理能力,任务会在队列中等待。需要注意队列的容量,如果队列容量过小,可能会导致任务被拒绝。可以通过构造函数来设置任务队列的容量。

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolWithQueue {
    public static void main(String[] args) {
        // 创建一个容量为 10 的任务队列
        BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>(10);
        // 创建 ScheduledThreadPoolExecutor,指定线程池大小和任务队列
        ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(3, taskQueue);

        // 提交任务
        for (int i = 0; i < 20; i++) {
            int taskNumber = i;
            scheduler.schedule(() -> System.out.println("Task " + taskNumber + " executed"), 1, TimeUnit.SECONDS);
        }

        scheduler.shutdown();
    }
}

在上述代码中,我们创建了一个容量为 10 的 LinkedBlockingQueue 作为任务队列,并将其传递给 ScheduledThreadPoolExecutor 的构造函数。这样,当线程池中的线程都在忙碌时,新提交的任务会进入队列等待,最多可以容纳 10 个任务。如果任务提交数量超过队列容量,会根据线程池的拒绝策略进行处理。

  • 拒绝策略:当任务队列已满且线程池中的线程都在忙碌时,新提交的任务会被拒绝。ScheduledThreadPoolExecutor 提供了几种默认的拒绝策略:

    • AbortPolicy:这是默认的拒绝策略,当任务被拒绝时,会抛出 RejectedExecutionException 异常。

    • CallerRunsPolicy:当任务被拒绝时,会在调用 execute 方法的线程中直接执行被拒绝的任务。

    • DiscardPolicy:当任务被拒绝时,直接丢弃任务,不做任何处理。

    • DiscardOldestPolicy:当任务被拒绝时,丢弃队列中最老的任务(即将被执行的任务),然后尝试提交新任务。

    可以通过构造函数来设置拒绝策略:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolWithRejectionPolicy {
    public static void main(String[] args) {
        BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>(10);

        // 设置拒绝策略为 CallerRunsPolicy
        RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy();
        ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(3, taskQueue, handler);

        for (int i = 0; i < 20; i++) {
            int taskNumber = i;
            scheduler.schedule(() -> System.out.println("Task " + taskNumber + " executed"), 1, TimeUnit.SECONDS);
        }

        scheduler.shutdown();
    }
}

在这个例子中,我们通过构造函数将拒绝策略设置为 CallerRunsPolicy

6. 任务调度中的异常处理

在任务调度过程中,任务可能会抛出异常。如果不进行适当的处理,这些异常可能会导致线程池中的线程终止,影响任务的正常调度。

对于 Runnable 任务,可以在任务内部捕获异常:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class TaskExceptionHandling {
    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

        Runnable task = () -> {
            try {
                // 模拟可能抛出异常的操作
                int result = 10 / 0;
                System.out.println("Task executed successfully: " + result);
            } catch (ArithmeticException e) {
                System.out.println("Task caught an exception: " + e.getMessage());
            }
        };

        scheduler.schedule(task, 1, TimeUnit.SECONDS);

        scheduler.shutdown();
    }
}

在上述代码中,Runnable 任务内部捕获了可能抛出的 ArithmeticException 异常,避免了异常导致线程终止。

对于 Callable 任务,可以通过 Future 获取任务执行结果并处理异常:

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class CallableTaskExceptionHandling {
    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

        Callable<Integer> task = () -> {
            // 模拟可能抛出异常的操作
            int result = 10 / 0;
            return result;
        };

        Future<Integer> future = scheduler.schedule(task, 1, TimeUnit.SECONDS);

        try {
            Integer result = future.get();
            System.out.println("Task executed successfully: " + result);
        } catch (InterruptedException | ExecutionException e) {
            System.out.println("Task caught an exception: " + e.getMessage());
        }

        scheduler.shutdown();
    }
}

在这个例子中,通过 future.get() 获取 Callable 任务的执行结果,如果任务执行过程中抛出异常,get() 方法会抛出 ExecutionException,我们可以在 catch 块中进行处理。

7. 应用场景

  • 数据备份与恢复:在数据库管理中,定时任务可以用于每天凌晨进行数据备份,确保数据的安全性。周期任务可以每隔一定时间检查备份状态,确保备份过程正常。
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class DatabaseBackupScheduler {
    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

        Runnable backupTask = () -> {
            // 执行数据库备份的代码逻辑
            System.out.println("Database backup executed at " + System.currentTimeMillis());
        };

        // 每天凌晨 2 点执行备份任务
        long initialDelay = calculateInitialDelayTo2AM();
        scheduler.scheduleAtFixedRate(backupTask, initialDelay, 24, TimeUnit.HOURS);
    }

    private static long calculateInitialDelayTo2AM() {
        // 计算距离当天凌晨 2 点的时间差
        // 这里省略具体实现,可根据当前时间计算
        return 0;
    }
}
  • 系统监控:周期任务可以每隔几分钟检查系统的 CPU、内存使用率等指标,及时发现系统性能问题。
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class SystemMonitorScheduler {
    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

        Runnable monitorTask = () -> {
            // 获取并打印系统 CPU、内存使用率等指标的代码逻辑
            System.out.println("System monitoring executed at " + System.currentTimeMillis());
        };

        scheduler.scheduleAtFixedRate(monitorTask, 0, 5, TimeUnit.MINUTES);
    }
}
  • 消息推送:在一些应用中,需要定时向用户推送消息,如每日新闻推送。可以使用定时任务在特定时间点触发消息推送逻辑。
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class MessagePushScheduler {
    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

        Runnable pushTask = () -> {
            // 执行消息推送的代码逻辑
            System.out.println("Message push executed at " + System.currentTimeMillis());
        };

        // 每天早上 8 点推送消息
        long initialDelay = calculateInitialDelayTo8AM();
        scheduler.scheduleAtFixedRate(pushTask, initialDelay, 24, TimeUnit.HOURS);
    }

    private static long calculateInitialDelayTo8AM() {
        // 计算距离当天早上 8 点的时间差
        // 这里省略具体实现,可根据当前时间计算
        return 0;
    }
}

通过以上内容,我们详细介绍了 Java 中使用线程池进行定时及周期任务调度的相关知识,包括基础概念、相关类、具体实现、线程池管理、异常处理以及应用场景等方面。希望这些内容能帮助你在实际项目中灵活运用任务调度技术,提高程序的效率和可靠性。