Java DelayQueue的延迟元素处理机制
2021-03-236.7k 阅读
Java DelayQueue简介
在Java并发包java.util.concurrent
中,DelayQueue
是一个无界阻塞队列,用于存放实现了Delayed
接口的对象。队列中的元素只有当其延迟时间到期后才能够出队。这一特性使得DelayQueue
在实现诸如定时任务调度、缓存数据过期清理等场景中发挥着重要作用。
Delayed接口剖析
Delayed
接口继承自Comparable<Delayed>
接口,定义如下:
public interface Delayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit);
}
getDelay(TimeUnit unit)
方法:该方法用于返回距离该元素到期还剩余的时间,时间单位由unit
参数指定。例如,如果希望获取距离到期剩余的秒数,可以调用getDelay(TimeUnit.SECONDS)
。Comparable<Delayed>
接口的实现:Delayed
接口继承了Comparable
接口,这意味着实现Delayed
接口的类需要实现compareTo
方法。在DelayQueue
中,这个方法用于比较元素的到期时间,以确定元素在队列中的顺序。通常情况下,到期时间早的元素会排在队列前面。
DelayQueue的内部实现
- 存储结构:
DelayQueue
内部使用PriorityQueue
来存储元素。PriorityQueue
是一个基于堆数据结构的优先队列,它会根据元素的自然顺序或者自定义的比较器对元素进行排序。在DelayQueue
中,元素按照到期时间的先后顺序进行排序,最早到期的元素位于队列头部。 - 锁机制:
DelayQueue
使用一把ReentrantLock
来保证线程安全。在对队列进行插入、删除等操作时,需要获取这把锁,以避免多线程并发访问导致的数据不一致问题。 - 等待机制:
DelayQueue
中有一个Condition
对象,用于实现线程的等待和唤醒。当从队列中获取元素时,如果队列为空或者队首元素的延迟时间还未到期,获取元素的线程会在Condition
上等待,直到有新元素加入队列且该元素到期,或者等待被中断。
代码示例1:简单的延迟任务调度
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
class DelayedTask implements Delayed {
private final long delayTime;
private final long startTime;
private final String taskName;
public DelayedTask(String taskName, long delayTime) {
this.taskName = taskName;
this.delayTime = delayTime;
this.startTime = System.nanoTime();
}
@Override
public long getDelay(TimeUnit unit) {
long elapsedTime = System.nanoTime() - startTime;
long remainingDelay = delayTime - elapsedTime / 1000000;
return unit.convert(remainingDelay, TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed other) {
long diff = this.getDelay(TimeUnit.MILLISECONDS) - other.getDelay(TimeUnit.MILLISECONDS);
return (int) diff;
}
@Override
public String toString() {
return "DelayedTask{" +
"taskName='" + taskName + '\'' +
", delayTime=" + delayTime +
'}';
}
}
public class DelayQueueExample {
public static void main(String[] args) {
DelayQueue<DelayedTask> delayQueue = new DelayQueue<>();
delayQueue.add(new DelayedTask("Task1", 3000));
delayQueue.add(new DelayedTask("Task2", 1000));
new Thread(() -> {
try {
while (true) {
DelayedTask task = delayQueue.take();
System.out.println("Executing task: " + task);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
}
}
在上述代码中:
- 定义了
DelayedTask
类实现Delayed
接口,在getDelay
方法中计算任务距离到期的剩余时间。 compareTo
方法用于比较任务的到期时间。- 在
main
方法中,创建了DelayQueue
并添加了两个延迟任务,一个延迟3秒,一个延迟1秒。启动一个线程从队列中获取任务并执行,会先执行延迟1秒的任务,再执行延迟3秒的任务。
代码示例2:缓存数据过期清理
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
class CacheEntry<T> implements Delayed {
private final long expirationTime;
private final T value;
private final String key;
public CacheEntry(String key, T value, long duration, TimeUnit unit) {
this.key = key;
this.value = value;
this.expirationTime = System.nanoTime() + unit.toNanos(duration);
}
public T getValue() {
return value;
}
@Override
public long getDelay(TimeUnit unit) {
long currentTime = System.nanoTime();
long remainingTime = expirationTime - currentTime;
return unit.convert(remainingTime, TimeUnit.NANOSECONDS);
}
@Override
public int compareTo(Delayed other) {
long diff = this.getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS);
return (int) diff;
}
}
class Cache<T> {
private final Map<String, T> cacheMap = new HashMap<>();
private final DelayQueue<CacheEntry<T>> delayQueue = new DelayQueue<>();
public void put(String key, T value, long duration, TimeUnit unit) {
CacheEntry<T> entry = new CacheEntry<>(key, value, duration, unit);
cacheMap.put(key, value);
delayQueue.add(entry);
}
public T get(String key) {
return cacheMap.get(key);
}
public void startExpirationThread() {
new Thread(() -> {
try {
while (true) {
CacheEntry<T> expiredEntry = delayQueue.take();
cacheMap.remove(expiredEntry.key);
System.out.println("Removed expired entry: " + expiredEntry.key);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
}
}
public class CacheExample {
public static void main(String[] args) {
Cache<String> cache = new Cache<>();
cache.put("key1", "value1", 3, TimeUnit.SECONDS);
cache.put("key2", "value2", 1, TimeUnit.SECONDS);
cache.startExpirationThread();
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
在这个示例中:
- 定义了
CacheEntry
类实现Delayed
接口,用于表示缓存中的数据项及其过期时间。 Cache
类包含一个Map
用于存储缓存数据,一个DelayQueue
用于管理过期的缓存项。put
方法将数据放入缓存并添加到延迟队列,get
方法从缓存中获取数据。startExpirationThread
方法启动一个线程,不断从延迟队列中取出过期的缓存项并从Map
中移除。在main
方法中,向缓存中添加两个数据项,分别设置不同的过期时间,并启动过期清理线程,模拟缓存数据的过期清理过程。
注意事项
- 元素的顺序性:由于
DelayQueue
基于PriorityQueue
,元素的顺序依赖于Delayed
接口中compareTo
方法的实现。确保compareTo
方法正确实现,以保证元素按照预期的顺序排列,否则可能导致任务执行顺序错误或队列操作异常。 - 内存管理:虽然
DelayQueue
是无界队列,但如果不断向队列中添加元素而不及时取出,可能会导致内存占用不断增加。在实际应用中,需要根据业务场景合理控制元素的添加和移除,避免内存溢出问题。 - 线程安全:
DelayQueue
本身是线程安全的,但在与其他数据结构(如上述缓存示例中的Map
)结合使用时,需要注意整体的线程安全。如果多个线程同时对相关数据结构进行读写操作,可能需要额外的同步机制来保证数据一致性。 - 任务取消:在使用
DelayQueue
实现任务调度时,如果需要支持任务取消功能,需要在任务类中添加相应的标识,并在获取任务和执行任务的过程中检查该标识,以实现任务的及时取消。
总结
DelayQueue
是Java并发包中一个强大且实用的工具,通过实现Delayed
接口和合理利用队列的特性,可以轻松实现延迟任务调度、缓存数据过期清理等功能。在实际应用中,需要深入理解其内部机制和注意事项,以确保系统的高效、稳定运行。无论是小型应用的定时任务,还是大型分布式系统中的缓存管理,DelayQueue
都能发挥重要作用。通过不断实践和优化,能够更好地利用DelayQueue
的优势,提升系统的性能和可维护性。同时,结合其他并发工具和设计模式,可以进一步拓展其应用场景,满足复杂多变的业务需求。