MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java中的AQS(AbstractQueuedSynchronizer)源码解析

2023-10-225.5k 阅读

Java中的AQS(AbstractQueuedSynchronizer)源码解析

在Java并发编程领域,AQS(AbstractQueuedSynchronizer)是一个核心框架,许多同步工具类如ReentrantLock、Semaphore、CountDownLatch等都是基于AQS构建的。深入理解AQS的原理和源码,对于编写高效、安全的并发程序至关重要。

AQS的基本概念

AQS是一个用于构建锁和同步器的框架,它提供了一种基于FIFO队列来管理线程等待和唤醒的机制。AQS内部维护了一个先进先出(FIFO)的队列,这个队列用于存储等待获取锁的线程。当一个线程尝试获取锁失败时,它会被封装成一个节点加入到这个队列中,并进入等待状态。当锁被释放时,队列中的头节点线程会被唤醒,尝试获取锁。

AQS的核心数据结构

  1. Node类 AQS的队列由一个个Node节点组成,Node类定义了线程等待队列的基本单元。每个Node包含了线程的引用、等待状态、前驱节点和后继节点的引用等信息。
static final class Node {
    // 共享模式
    static final Node SHARED = new Node();
    // 独占模式
    static final Node EXCLUSIVE = null;

    // 等待状态:取消
    static final int CANCELLED =  1;
    // 等待状态:唤醒后继节点
    static final int SIGNAL    = -1;
    // 等待状态:线程在Condition上等待
    static final int CONDITION = -2;
    // 等待状态:传播共享锁释放信号
    static final int PROPAGATE = -3;

    // 当前节点的等待状态
    volatile int waitStatus;
    // 前驱节点
    volatile Node prev;
    // 后继节点
    volatile Node next;
    // 关联的线程
    volatile Thread thread;
    // 下一个等待在Condition上的节点
    Node nextWaiter;

    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {    // Used to establish initial head or SHARED marker
    }

    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}
  1. head和tail AQS类中有两个重要的成员变量head和tail,分别指向等待队列的头节点和尾节点。
private transient volatile Node head;
private transient volatile Node tail;

AQS的核心方法

  1. acquire(int arg) 该方法用于获取锁,以独占模式获取锁。首先尝试通过tryAcquire方法获取锁,如果获取成功则直接返回。如果获取失败,则将当前线程加入等待队列,并进入等待状态,直到获取到锁或者被中断。
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
  1. tryAcquire(int arg) 这是一个需要子类实现的方法,用于尝试获取锁。不同的同步器根据自身的逻辑实现这个方法。例如,在ReentrantLock中,tryAcquire方法会检查当前线程是否已经持有锁,如果是则增加持有次数,否则尝试获取锁。
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}
  1. addWaiter(Node mode) 该方法用于将当前线程封装成一个Node节点,并添加到等待队列的尾部。如果队列不为空,则通过CAS操作快速将节点添加到队列尾部。如果队列为空,则先初始化队列,再添加节点。
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}
  1. enq(final Node node) 这是一个自旋操作,用于确保节点能够成功添加到队列尾部。在添加过程中,如果队列还未初始化,则先初始化队列。
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
  1. acquireQueued(final Node node, int arg) 该方法用于在等待队列中自旋等待获取锁。当前线程进入队列后,会不断尝试获取锁。如果前驱节点是头节点且当前线程能够获取到锁,则将自己设置为头节点并返回。如果获取锁失败,则根据前驱节点的等待状态决定是否需要挂起当前线程。
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
  1. shouldParkAfterFailedAcquire(Node pred, Node node) 该方法用于判断在获取锁失败后,当前线程是否应该被挂起。它会检查前驱节点的等待状态,如果前驱节点的等待状态为SIGNAL,则当前线程可以被挂起。如果前驱节点的等待状态为CANCELLED,则需要向前遍历队列,找到第一个非CANCELLED的节点,并将其设置为当前节点的前驱节点。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        return true;
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
  1. parkAndCheckInterrupt() 该方法用于挂起当前线程,并返回线程是否被中断。
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}
  1. release(int arg) 该方法用于释放锁,以独占模式释放锁。首先尝试通过tryRelease方法释放锁,如果释放成功,则唤醒等待队列中的头节点线程。
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
  1. tryRelease(int arg) 这是一个需要子类实现的方法,用于尝试释放锁。例如,在ReentrantLock中,tryRelease方法会减少锁的持有次数,如果持有次数为0,则释放锁。
protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}
  1. unparkSuccessor(Node node) 该方法用于唤醒后继节点线程。它会找到等待队列中第一个非CANCELLED的后继节点,并使用LockSupport.unpark方法唤醒该节点关联的线程。
private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

以ReentrantLock为例分析AQS的应用

ReentrantLock是基于AQS实现的可重入锁。它内部定义了一个继承自AQS的Sync类,并重写了tryAcquiretryRelease方法。

public class ReentrantLock implements Lock, java.io.Serializable {
    private static final long serialVersionUID = 7373984872572414699L;
    private final Sync sync;

    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;

        abstract void lock();

        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

        protected final boolean isHeldExclusively() {
            return getExclusiveOwnerThread() == Thread.currentThread();
        }

        final ConditionObject newCondition() {
            return new ConditionObject();
        }

        final Thread getOwner() {
            return getState() == 0? null : getExclusiveOwnerThread();
        }

        final int getHoldCount() {
            return isHeldExclusively()? getState() : 0;
        }

        final boolean isLocked() {
            return getState() != 0;
        }

        private void readObject(java.io.ObjectInputStream s)
                throws java.io.IOException, ClassNotFoundException {
            s.defaultReadObject();
            setState(0); // reset to unlocked state
        }
    }

    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

    static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;

        final void lock() {
            acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }

    public ReentrantLock() {
        sync = new NonfairSync();
    }

    public ReentrantLock(boolean fair) {
        sync = fair? new FairSync() : new NonfairSync();
    }

    public void lock() {
        sync.lock();
    }

    public void unlock() {
        sync.release(1);
    }

    // 其他方法省略
}

ReentrantLockNonfairSync实现中,lock方法首先尝试通过compareAndSetState方法将状态值从0设置为1,如果设置成功,则表示获取锁成功,并将当前线程设置为独占线程。如果设置失败,则调用acquire(1)方法,进入AQS的获取锁流程。

tryAcquire方法中,首先检查状态值是否为0,如果为0且当前队列中没有前驱节点(对于公平锁),则尝试通过compareAndSetState方法获取锁。如果状态值不为0且当前线程是独占线程,则增加持有次数。

tryRelease方法中,减少状态值,如果状态值减为0,则释放锁,并将独占线程设置为null。

总结AQS的优势和应用场景

AQS提供了一种通用的同步框架,使得开发者可以方便地构建各种同步工具。它的优势在于:

  1. 高效性:通过基于FIFO队列的等待和唤醒机制,减少了线程上下文切换的开销。
  2. 灵活性:可以通过实现tryAcquiretryRelease等方法,灵活地构建不同类型的同步器,如独占锁、共享锁、信号量等。
  3. 可扩展性:AQS的设计具有良好的扩展性,可以在其基础上进一步开发更复杂的同步工具。

AQS的应用场景非常广泛,包括但不限于:

  1. 实现锁:如ReentrantLockReentrantReadWriteLock等。
  2. 实现同步器:如SemaphoreCountDownLatchCyclicBarrier等。
  3. 构建自定义同步工具:开发者可以根据自身需求,基于AQS构建自定义的同步工具。

通过深入理解AQS的原理和源码,开发者能够更好地掌握Java并发编程的核心技术,编写出高效、可靠的并发程序。在实际应用中,根据不同的需求选择合适的同步工具,并合理地使用AQS框架,能够有效地提升程序的性能和并发处理能力。同时,对于一些复杂的并发场景,基于AQS进行自定义同步工具的开发,可以满足特定的业务需求,实现更细粒度的并发控制。