Java中的AQS(AbstractQueuedSynchronizer)源码解析
Java中的AQS(AbstractQueuedSynchronizer)源码解析
在Java并发编程领域,AQS(AbstractQueuedSynchronizer)是一个核心框架,许多同步工具类如ReentrantLock、Semaphore、CountDownLatch等都是基于AQS构建的。深入理解AQS的原理和源码,对于编写高效、安全的并发程序至关重要。
AQS的基本概念
AQS是一个用于构建锁和同步器的框架,它提供了一种基于FIFO队列来管理线程等待和唤醒的机制。AQS内部维护了一个先进先出(FIFO)的队列,这个队列用于存储等待获取锁的线程。当一个线程尝试获取锁失败时,它会被封装成一个节点加入到这个队列中,并进入等待状态。当锁被释放时,队列中的头节点线程会被唤醒,尝试获取锁。
AQS的核心数据结构
- Node类 AQS的队列由一个个Node节点组成,Node类定义了线程等待队列的基本单元。每个Node包含了线程的引用、等待状态、前驱节点和后继节点的引用等信息。
static final class Node {
// 共享模式
static final Node SHARED = new Node();
// 独占模式
static final Node EXCLUSIVE = null;
// 等待状态:取消
static final int CANCELLED = 1;
// 等待状态:唤醒后继节点
static final int SIGNAL = -1;
// 等待状态:线程在Condition上等待
static final int CONDITION = -2;
// 等待状态:传播共享锁释放信号
static final int PROPAGATE = -3;
// 当前节点的等待状态
volatile int waitStatus;
// 前驱节点
volatile Node prev;
// 后继节点
volatile Node next;
// 关联的线程
volatile Thread thread;
// 下一个等待在Condition上的节点
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
- head和tail AQS类中有两个重要的成员变量head和tail,分别指向等待队列的头节点和尾节点。
private transient volatile Node head;
private transient volatile Node tail;
AQS的核心方法
- acquire(int arg)
该方法用于获取锁,以独占模式获取锁。首先尝试通过
tryAcquire
方法获取锁,如果获取成功则直接返回。如果获取失败,则将当前线程加入等待队列,并进入等待状态,直到获取到锁或者被中断。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
- tryAcquire(int arg)
这是一个需要子类实现的方法,用于尝试获取锁。不同的同步器根据自身的逻辑实现这个方法。例如,在
ReentrantLock
中,tryAcquire
方法会检查当前线程是否已经持有锁,如果是则增加持有次数,否则尝试获取锁。
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
- addWaiter(Node mode) 该方法用于将当前线程封装成一个Node节点,并添加到等待队列的尾部。如果队列不为空,则通过CAS操作快速将节点添加到队列尾部。如果队列为空,则先初始化队列,再添加节点。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
- enq(final Node node) 这是一个自旋操作,用于确保节点能够成功添加到队列尾部。在添加过程中,如果队列还未初始化,则先初始化队列。
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
- acquireQueued(final Node node, int arg) 该方法用于在等待队列中自旋等待获取锁。当前线程进入队列后,会不断尝试获取锁。如果前驱节点是头节点且当前线程能够获取到锁,则将自己设置为头节点并返回。如果获取锁失败,则根据前驱节点的等待状态决定是否需要挂起当前线程。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
- shouldParkAfterFailedAcquire(Node pred, Node node) 该方法用于判断在获取锁失败后,当前线程是否应该被挂起。它会检查前驱节点的等待状态,如果前驱节点的等待状态为SIGNAL,则当前线程可以被挂起。如果前驱节点的等待状态为CANCELLED,则需要向前遍历队列,找到第一个非CANCELLED的节点,并将其设置为当前节点的前驱节点。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
- parkAndCheckInterrupt() 该方法用于挂起当前线程,并返回线程是否被中断。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
- release(int arg)
该方法用于释放锁,以独占模式释放锁。首先尝试通过
tryRelease
方法释放锁,如果释放成功,则唤醒等待队列中的头节点线程。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
- tryRelease(int arg)
这是一个需要子类实现的方法,用于尝试释放锁。例如,在
ReentrantLock
中,tryRelease
方法会减少锁的持有次数,如果持有次数为0,则释放锁。
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
- unparkSuccessor(Node node)
该方法用于唤醒后继节点线程。它会找到等待队列中第一个非CANCELLED的后继节点,并使用
LockSupport.unpark
方法唤醒该节点关联的线程。
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
以ReentrantLock为例分析AQS的应用
ReentrantLock
是基于AQS实现的可重入锁。它内部定义了一个继承自AQS的Sync
类,并重写了tryAcquire
和tryRelease
方法。
public class ReentrantLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = 7373984872572414699L;
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
abstract void lock();
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
protected final boolean isHeldExclusively() {
return getExclusiveOwnerThread() == Thread.currentThread();
}
final ConditionObject newCondition() {
return new ConditionObject();
}
final Thread getOwner() {
return getState() == 0? null : getExclusiveOwnerThread();
}
final int getHoldCount() {
return isHeldExclusively()? getState() : 0;
}
final boolean isLocked() {
return getState() != 0;
}
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state
}
}
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair? new FairSync() : new NonfairSync();
}
public void lock() {
sync.lock();
}
public void unlock() {
sync.release(1);
}
// 其他方法省略
}
在ReentrantLock
的NonfairSync
实现中,lock
方法首先尝试通过compareAndSetState
方法将状态值从0设置为1,如果设置成功,则表示获取锁成功,并将当前线程设置为独占线程。如果设置失败,则调用acquire(1)
方法,进入AQS的获取锁流程。
tryAcquire
方法中,首先检查状态值是否为0,如果为0且当前队列中没有前驱节点(对于公平锁),则尝试通过compareAndSetState
方法获取锁。如果状态值不为0且当前线程是独占线程,则增加持有次数。
tryRelease
方法中,减少状态值,如果状态值减为0,则释放锁,并将独占线程设置为null。
总结AQS的优势和应用场景
AQS提供了一种通用的同步框架,使得开发者可以方便地构建各种同步工具。它的优势在于:
- 高效性:通过基于FIFO队列的等待和唤醒机制,减少了线程上下文切换的开销。
- 灵活性:可以通过实现
tryAcquire
和tryRelease
等方法,灵活地构建不同类型的同步器,如独占锁、共享锁、信号量等。 - 可扩展性:AQS的设计具有良好的扩展性,可以在其基础上进一步开发更复杂的同步工具。
AQS的应用场景非常广泛,包括但不限于:
- 实现锁:如
ReentrantLock
、ReentrantReadWriteLock
等。 - 实现同步器:如
Semaphore
、CountDownLatch
、CyclicBarrier
等。 - 构建自定义同步工具:开发者可以根据自身需求,基于AQS构建自定义的同步工具。
通过深入理解AQS的原理和源码,开发者能够更好地掌握Java并发编程的核心技术,编写出高效、可靠的并发程序。在实际应用中,根据不同的需求选择合适的同步工具,并合理地使用AQS框架,能够有效地提升程序的性能和并发处理能力。同时,对于一些复杂的并发场景,基于AQS进行自定义同步工具的开发,可以满足特定的业务需求,实现更细粒度的并发控制。