MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java DelayQueue的延迟元素处理机制

2021-03-236.7k 阅读

Java DelayQueue简介

在Java并发包java.util.concurrent中,DelayQueue是一个无界阻塞队列,用于存放实现了Delayed接口的对象。队列中的元素只有当其延迟时间到期后才能够出队。这一特性使得DelayQueue在实现诸如定时任务调度、缓存数据过期清理等场景中发挥着重要作用。

Delayed接口剖析

Delayed接口继承自Comparable<Delayed>接口,定义如下:

public interface Delayed extends Comparable<Delayed> {
    long getDelay(TimeUnit unit);
}
  • getDelay(TimeUnit unit)方法:该方法用于返回距离该元素到期还剩余的时间,时间单位由unit参数指定。例如,如果希望获取距离到期剩余的秒数,可以调用getDelay(TimeUnit.SECONDS)
  • Comparable<Delayed>接口的实现Delayed接口继承了Comparable接口,这意味着实现Delayed接口的类需要实现compareTo方法。在DelayQueue中,这个方法用于比较元素的到期时间,以确定元素在队列中的顺序。通常情况下,到期时间早的元素会排在队列前面。

DelayQueue的内部实现

  1. 存储结构DelayQueue内部使用PriorityQueue来存储元素。PriorityQueue是一个基于堆数据结构的优先队列,它会根据元素的自然顺序或者自定义的比较器对元素进行排序。在DelayQueue中,元素按照到期时间的先后顺序进行排序,最早到期的元素位于队列头部。
  2. 锁机制DelayQueue使用一把ReentrantLock来保证线程安全。在对队列进行插入、删除等操作时,需要获取这把锁,以避免多线程并发访问导致的数据不一致问题。
  3. 等待机制DelayQueue中有一个Condition对象,用于实现线程的等待和唤醒。当从队列中获取元素时,如果队列为空或者队首元素的延迟时间还未到期,获取元素的线程会在Condition上等待,直到有新元素加入队列且该元素到期,或者等待被中断。

代码示例1:简单的延迟任务调度

import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class DelayedTask implements Delayed {
    private final long delayTime;
    private final long startTime;
    private final String taskName;

    public DelayedTask(String taskName, long delayTime) {
        this.taskName = taskName;
        this.delayTime = delayTime;
        this.startTime = System.nanoTime();
    }

    @Override
    public long getDelay(TimeUnit unit) {
        long elapsedTime = System.nanoTime() - startTime;
        long remainingDelay = delayTime - elapsedTime / 1000000;
        return unit.convert(remainingDelay, TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        long diff = this.getDelay(TimeUnit.MILLISECONDS) - other.getDelay(TimeUnit.MILLISECONDS);
        return (int) diff;
    }

    @Override
    public String toString() {
        return "DelayedTask{" +
                "taskName='" + taskName + '\'' +
                ", delayTime=" + delayTime +
                '}';
    }
}

public class DelayQueueExample {
    public static void main(String[] args) {
        DelayQueue<DelayedTask> delayQueue = new DelayQueue<>();
        delayQueue.add(new DelayedTask("Task1", 3000));
        delayQueue.add(new DelayedTask("Task2", 1000));

        new Thread(() -> {
            try {
                while (true) {
                    DelayedTask task = delayQueue.take();
                    System.out.println("Executing task: " + task);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }).start();
    }
}

在上述代码中:

  • 定义了DelayedTask类实现Delayed接口,在getDelay方法中计算任务距离到期的剩余时间。
  • compareTo方法用于比较任务的到期时间。
  • main方法中,创建了DelayQueue并添加了两个延迟任务,一个延迟3秒,一个延迟1秒。启动一个线程从队列中获取任务并执行,会先执行延迟1秒的任务,再执行延迟3秒的任务。

代码示例2:缓存数据过期清理

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class CacheEntry<T> implements Delayed {
    private final long expirationTime;
    private final T value;
    private final String key;

    public CacheEntry(String key, T value, long duration, TimeUnit unit) {
        this.key = key;
        this.value = value;
        this.expirationTime = System.nanoTime() + unit.toNanos(duration);
    }

    public T getValue() {
        return value;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        long currentTime = System.nanoTime();
        long remainingTime = expirationTime - currentTime;
        return unit.convert(remainingTime, TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        long diff = this.getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS);
        return (int) diff;
    }
}

class Cache<T> {
    private final Map<String, T> cacheMap = new HashMap<>();
    private final DelayQueue<CacheEntry<T>> delayQueue = new DelayQueue<>();

    public void put(String key, T value, long duration, TimeUnit unit) {
        CacheEntry<T> entry = new CacheEntry<>(key, value, duration, unit);
        cacheMap.put(key, value);
        delayQueue.add(entry);
    }

    public T get(String key) {
        return cacheMap.get(key);
    }

    public void startExpirationThread() {
        new Thread(() -> {
            try {
                while (true) {
                    CacheEntry<T> expiredEntry = delayQueue.take();
                    cacheMap.remove(expiredEntry.key);
                    System.out.println("Removed expired entry: " + expiredEntry.key);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }).start();
    }
}

public class CacheExample {
    public static void main(String[] args) {
        Cache<String> cache = new Cache<>();
        cache.put("key1", "value1", 3, TimeUnit.SECONDS);
        cache.put("key2", "value2", 1, TimeUnit.SECONDS);
        cache.startExpirationThread();

        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

在这个示例中:

  • 定义了CacheEntry类实现Delayed接口,用于表示缓存中的数据项及其过期时间。
  • Cache类包含一个Map用于存储缓存数据,一个DelayQueue用于管理过期的缓存项。put方法将数据放入缓存并添加到延迟队列,get方法从缓存中获取数据。
  • startExpirationThread方法启动一个线程,不断从延迟队列中取出过期的缓存项并从Map中移除。在main方法中,向缓存中添加两个数据项,分别设置不同的过期时间,并启动过期清理线程,模拟缓存数据的过期清理过程。

注意事项

  1. 元素的顺序性:由于DelayQueue基于PriorityQueue,元素的顺序依赖于Delayed接口中compareTo方法的实现。确保compareTo方法正确实现,以保证元素按照预期的顺序排列,否则可能导致任务执行顺序错误或队列操作异常。
  2. 内存管理:虽然DelayQueue是无界队列,但如果不断向队列中添加元素而不及时取出,可能会导致内存占用不断增加。在实际应用中,需要根据业务场景合理控制元素的添加和移除,避免内存溢出问题。
  3. 线程安全DelayQueue本身是线程安全的,但在与其他数据结构(如上述缓存示例中的Map)结合使用时,需要注意整体的线程安全。如果多个线程同时对相关数据结构进行读写操作,可能需要额外的同步机制来保证数据一致性。
  4. 任务取消:在使用DelayQueue实现任务调度时,如果需要支持任务取消功能,需要在任务类中添加相应的标识,并在获取任务和执行任务的过程中检查该标识,以实现任务的及时取消。

总结

DelayQueue是Java并发包中一个强大且实用的工具,通过实现Delayed接口和合理利用队列的特性,可以轻松实现延迟任务调度、缓存数据过期清理等功能。在实际应用中,需要深入理解其内部机制和注意事项,以确保系统的高效、稳定运行。无论是小型应用的定时任务,还是大型分布式系统中的缓存管理,DelayQueue都能发挥重要作用。通过不断实践和优化,能够更好地利用DelayQueue的优势,提升系统的性能和可维护性。同时,结合其他并发工具和设计模式,可以进一步拓展其应用场景,满足复杂多变的业务需求。