关于 AQS,网上已经有无数的文章阐述 AQS 的使用及其源码,所以多这么一篇文章也没啥所谓,还能总结一下研究过的源码。源码解析和某某的使用,大概是互联网上 Java 文章中写得最多的主题了。

AQS

AQS 是 AbstractQueuedSynchronizer 的缩写,中文翻译过来就是抽象队列同步器。ReentrantLockReentrantReadWriteLockSemaphoreCountDownLatch 都是基于 AQS。AQS 的核心思想是,当线程请求获取资源时,如果资源空闲,则会将当前线程设置为资源的独占线程,成功获得锁;否则将获取锁失败的线程加入到排队队列中(CLH),并提供线程阻塞和线程唤醒机制。CLH 是一个虚拟的双向队列。

首先看一下 AQS 的关键属性。

// java.util.concurrent.locks.AbstractQueuedSynchronizer
private transient volatile Node head; //队列的头节点
private transient volatile Node tail; //队列的尾节点
private volatile int state; //同步状态

state 用于实现锁的可重入性。

  • 0 为表示没有线程持有该锁
  • 当存在线程获取锁成功,则 state 变为 1。如果是同一线程重复获得,则 state++
  • 如果存在线程释放锁,则 state–

上面的三个属性都存在对应的以 CAS 方式进行修改的方法,state 对应的是 compareAndSetState() 方法, head 对应的是 compareAndSetHead() 方法,tail 对应的是 compareAndSetTail()。以 CAS 的方式修改值能避免锁的竞争。

因为请求获取锁的线程会以 Node 节点的方式在 CLH 队列中排队,在分析 AQS 机制时也会大量涉及到 Node 节点,所以很有必要对 Node 节点进行分析。

CLH 队列中 Node 节点。

// java.util.concurrent.locks.AbstractQueuedSynchronizer.Node
volatile int waitStatus;  //当前节点在队列中的状态
volatile Node prev;  //前驱节点
volatile Node next;  //后继节点
volatile Thread thread;  //在该节点中排队的线程
Node nextWaiter;  //下一个处于condition或共享状态的节点

//等待锁的两种状态
static final Node SHARED = new Node();  //共享
static final Node EXCLUSIVE = null; //独占


//waitStatus的几个值 
static final int CANCELLED =  1;  //已取消
static final int SIGNAL    = -1;  //后继节点的线程需要唤醒
static final int CONDITION = -2;  //节点处于等待队列中
static final int PROPAGATE = -3; //线程处在SHARED情况下使用该字段

ReentrantLock

说是研究 AQS 源码,但 AQS 毕竟是一个抽象类,只实现了部分方法,另外一些方法会在子类中实现。所以我们也同样会涉及到 ReentrantLock 源码的研究。

ReentrantLock 的通常使用方式是:

class X {    
	private final ReentrantLock lock = new ReentrantLock();       
	public void m() {      
		lock.lock();       
		try {        
			// ... method body      
		} finally {        
			lock.unlock()      
		}    
	}  
}

ReentrantLock 存在两种锁:公平锁(FairSync)和非公平锁(NonfairSync),默认为非公平锁。使用公平锁时,线程会直接进入队列中排队,只有队列中第一个线程才能获取锁;使用非公平锁时,线程会先尝试获取锁,成功则占用锁,失败则在队列排队。对于 AQS 来说,公平锁和非公平锁的绝大部分方法都是共用的。

ReentrantLock 的主要方法有两个,一是使用 lock() 方法加锁,二是使用 unlock() 方法解锁。

加锁

非公平锁

我们首先来分析非公平锁的加锁过程。

// java.util.concurrent.locks.ReentrantLock.NonfairSync
final void lock() {  
    if (compareAndSetState(0, 1))  //通过cas方式设置同步状态
        setExclusiveOwnerThread(Thread.currentThread());  //成功,设置为独占线程
    else        
	    acquire(1);  //失败,获取锁
}

lock() 方法中,会先尝试通过 CAS 的方式去获取锁,成功则设置为独占线程,否则执行 acquire() 方法。

// java.util.concurrent.locks.AbstractQueuedSynchronizer
public final void acquire(int arg) {  
    if (!tryAcquire(arg) &&  //尝试获取锁
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))  // 失败,则添加到等待队列
        selfInterrupt();  
}

acquire() 方法中,会再次尝试去获取锁,如果失败则加入到排队队列。

// java.util.concurrent.locks.ReentrantLock.NonfairSync#tryAcquire
protected final boolean tryAcquire(int acquires) {  
    return nonfairTryAcquire(acquires);  
}

// java.util.concurrent.locks.ReentrantLock.Sync#nonfairTryAcquire
final boolean nonfairTryAcquire(int acquires) {  
    final Thread current = Thread.currentThread();  
    int c = getState();  
    if (c == 0) {  //如果当前没有线程持有锁
        if (compareAndSetState(0, acquires)) {  //state++
            setExclusiveOwnerThread(current);  //将当前线程设置为独占线程
            return true;        }  
    }  
    else if (current == getExclusiveOwnerThread()) {  //如果已有线程持有锁,且当前线程为独占线程
        int nextc = c + acquires;  
        if (nextc < 0) // overflow  
            throw new Error("Maximum lock count exceeded");  
        setState(nextc);   //state++,实现锁的可重入性
        return true;   //获得锁
     }  
    return false;  //如果已有线程持有锁,且当前线程不为独占线程,则获取锁失败
}

nonfairTryAcquire() 方法中,如果资源空闲,则再次尝试通过 CAS 的方式去获取锁。如果当前资源已被当前线程占用,则将 state++,以实现锁的可重入性。

// java.util.concurrent.locks.AbstractQueuedSynchronizer#addWaiter
// 设置队列尾节点
private Node addWaiter(Node mode) {  
    Node node = new Node(Thread.currentThread(), mode);   //新建排队节点 
    Node pred = tail;  //pred指向尾节点
    if (pred != null) {  //如果Pred指针是Null(说明等待队列中没有元素),或者当前Pred指针和Tail指向的位置不同(说明被别的线程已经修改)
        node.prev = pred;  //将新建节点的prev指向pred
        if (compareAndSetTail(pred, node)) {  // 设置新建节点为尾节点
            pred.next = node;  
            return node;  
        }  
    }  
    enq(node);  
    return node;  
}

//java.util.concurrent.locks.AbstractQueuedSynchronizer#enq
private Node enq(final Node node) {  
    for (;;) {  
        Node t = tail;  
        if (t == null) {   //如果尾节点为空,则说明队列还未初始化,
            if (compareAndSetHead(new Node()))  //初始化一个头节点
                tail = head;  
        } else {  //如果已经初始化,则设置为尾节点
            node.prev = t;  
            if (compareAndSetTail(t, node)) {  
                t.next = node;  
                return t;  
            }  
        }  
    }  
}

如果尝试多次获取锁都失败,则在 addWaiter() 方法中会将线程放到节点中,并设置为排队队列的尾节点。

//java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireQueued
final boolean acquireQueued(final Node node, int arg) {  
	//是否成功获取到资源
    boolean failed = true;  
    try {  
	    //循环等待过程中是否中断过
        boolean interrupted = false;  
        for (;;) {  
            final Node p = node.predecessor();  //获取当前节点的前驱节点
            if (p == head && tryAcquire(arg)) {  //如果前驱节点为头节点,则尝试获取锁
                setHead(node);  //获取锁成功,设置当前节点为头节点
                p.next = null; // help GC  
                failed = false;  
                return interrupted;  
            }  
            //来到这里,说明前驱节点不是头节点或者获取锁失败。判断当前节点是否要被阻塞
            if (shouldParkAfterFailedAcquire(p, node) &&  
                parkAndCheckInterrupt())  
                interrupted = true;  //线程中断成功
        }  
    } finally {  
        if (failed)  
            cancelAcquire(node);  
    }  
}

加入到排队队列中后,会在 acquireQueued() 方法中循环等待资源的获取,并判断线程是否需要被阻塞,直到线程获取成功或者抛出异常。

//java.util.concurrent.locks.AbstractQueuedSynchronizer#shouldParkAfterFailedAcquire
//靠前驱节点判断当前线程是否应该被阻塞
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {  
	//获取前驱节点的状态
    int ws = pred.waitStatus;  
    if (ws == Node.SIGNAL)  //如果前驱节点处于唤醒状态  
         return true;  
    if (ws > 0) {  //前驱节点处于取消状态      
         do {  //向前查找取消的节点,并将其从队列中删除
            node.prev = pred = pred.prev;  
        } while (pred.waitStatus > 0);  
        pred.next = node;  
    } else {  
         compareAndSetWaitStatus(pred, ws, Node.SIGNAL);  //设置前驱节点为唤醒状态
    }  
    return false;  
}
//java.util.concurrent.locks.AbstractQueuedSynchronizer#parkAndCheckInterrupt
//挂起当前线程,阻塞调用栈,返回当前线程的中断状态
private final boolean parkAndCheckInterrupt() {  
    LockSupport.park(this);  
    return Thread.interrupted();  
}

整一个非公平锁的加锁流程可以用如下流程图表示:

flowchart TB

lock-->compareAndSetState{cas获取锁}
compareAndSetState--成功-->设置为独占线程
compareAndSetState--失败-->tryAcquire{尝试获取锁}
tryAcquire--失败 --> addWaiter(将当前节点添加到队列)
addWaiter-->acquireQueued(队列中的线程去获取锁)

	subgraph addWaiter [将当前节点添加到队列]
	tail{尾节点是否为空}--不为空-->setTail(设置当前节点为尾节点)
	setTail-->return(返回当前节点)
	tail--为空--> enq
		subgraph enq
		tail2{尾节点是否为空}
		tail2--为空-->head(初始化一个空的头节点)
		head--> tail2
		tail2--不为空-->setTail2(设置当前节点为尾节点)
		setTail2-->返回当前节点
		end
	end

	subgraph acquireQueued [队列中的线程去获取锁]
	predHead{如果当前节点的前驱节点为头节点且获取锁成功}--true-->setHead(设置当前节点为头节点)
	setHead-->interrupted(返回是否被中断过)
	predHead--false-->shouldParkAfterFailedAcquire{当前节点是否需要被阻塞}
	shouldParkAfterFailedAcquire--是-->parkAndCheckInterrupt(阻塞当前线程)
	shouldParkAfterFailedAcquire --否--> predHead
	parkAndCheckInterrupt--> predHead
	end

公平锁

公平锁和非公平锁的流程中只有 lock() 方法和 tryAcquire() 方法存在差别。

//java.util.concurrent.locks.ReentrantLock.FairSync#lock
final void lock() {  
    acquire(1);  //直接获取锁
}

公平锁中会直接调用 acquire() 方法。

// java.util.concurrent.locks.ReentrantLock.FairSync#tryAcquire
protected final boolean tryAcquire(int acquires) {  
	final Thread current = Thread.currentThread();  
	int c = getState();  
	if (c == 0) {  
		if (!hasQueuedPredecessors() &&  
			compareAndSetState(0, acquires)) {  
			setExclusiveOwnerThread(current);  
			return true;            
		}  
	}  
	else if (current == getExclusiveOwnerThread()) {  
		int nextc = c + acquires;  
		if (nextc < 0)  
			throw new Error("Maximum lock count exceeded");  
		setState(nextc);  
		return true;        
	}  
	return false;  
}  

相对于 nonfairnonfairTryAcquire 方法,在没有线程持有锁时,增加了 hasQueuedPredecessors() 方法的判断,该方法用于判断队列中是否存在线程比当前线程等待时间更长。

// java.util.concurrent.locks.AbstractQueuedSynchronizer#hasQueuedPredecessors
public final boolean hasQueuedPredecessors() {     
    Node t = tail; 
    Node h = head;  
    Node s;  
    return h != t &&  
        ((s = h.next) == null || s.thread != Thread.currentThread());  
}

分析一下这段代码:

  • 如果 h != t 为 true,说明队列正在初始化,或者已经初始化完成。
    • h != t 前提下,如果 (s = h.next) == null 为 true,说明队列正在初始化,因为在队列初始化过程中,是有可能存在 head 已经被初始化(不再等于 tail 了),但 head.next 还没有被设值为 node,这种情况下,因为队列中已经存在 Node,当前线程需要加到等待队列中,故返回 true。初始化过程在 AbstractQueuedSynchronizer#enq()
    • 但如果 (s = h.next) == null 为 false,说明队列中已经存在 Node,则判断该 Node 的线程是否与当前线程相同。如果 s.thread != Thread.currentThread() 为 true,说明不相同,需要进入等待队列。如果相同,说明当前线程可以获取锁。
  • 如果 h != t 为 false,说明队列为空,返回 false,说明可以去获得锁。

另外,s = h.next 这段代码获取的是 head 的下一个节点,因为 head 是虚节点,不存储数据,真正的数据存储在 head.next

解锁

相对于加锁过程,解锁过程比较简单,且公平锁和非公平锁共用同一个 lock() 方法。

// java.util.concurrent.locks.ReentrantLock#unlock
public void unlock() {  
    sync.release(1);  
}

//java.util.concurrent.locks.AbstractQueuedSynchronizer#release
public final boolean release(int arg) {  
    if (tryRelease(arg)) {   //如果锁没有被任何线程持有
        Node h = head;  
        if (h != null && h.waitStatus != 0)  
            unparkSuccessor(h);  //解除后继节点的线程挂起
        return true;   
    }  
    return false;  
}

关于代码 h != null && h.waitStatus != 0,分为以下几种情况:

  1. h==null,说明头节点还未初始化。
  2. h!=null && h.waitStatus==0,说明后继节点还在运行中。因为如果节点已被取消,waitStatus 会被设置为 1;如果后继节点需要唤醒,waitStatus 会被设置为 -1;如果节点正在排队,waitStatus 则会设置为 -2。如果 waitStatus 为 0,说明已经处于运行中。
  3. h != null && h.waitStatus != 0 则会去唤醒后继节点。

release() 方法中,会先尝试去解锁,如果解锁成功且后继节点需要唤醒,则将后继节点取消挂起。

//java.util.concurrent.locks.ReentrantLock.Sync#tryRelease
//更新state状态,如果重入次数为0,则将锁的独占线程设置为null
protected final boolean tryRelease(int releases) {  
	//减少可重入次数
    int c = getState() - releases;  
    if (Thread.currentThread() != getExclusiveOwnerThread())  //如果当前线程不是该锁的独占线程,则抛出异常
        throw new IllegalMonitorStateException();  
    boolean free = false;  //该锁是否已被释放
    if (c == 0) {  //如果可重入次数已为0
        free = true;  
        setExclusiveOwnerThread(null);  //将锁的独占线程设置为null,表明不再有线程持有该锁
    }  
    setState(c);  //修改锁的状态
    return free;  
}

tryRelease() 方法中,会去减少锁的可重入次数,当可重入次数为 0 时,清空锁的独占线程。

// java.util.concurrent.locks.AbstractQueuedSynchronizer#unparkSuccessor
private void unparkSuccessor(Node node) {     
    int ws = node.waitStatus;  //获取头节点的waitStatus
    if (ws < 0)  //如果节点的waitStatus为负数,说明后继节点需要被唤醒或者正在排队
        compareAndSetWaitStatus(node, ws, 0);  //清除节点当前的waitStatus,设置为0  
    Node s = node.next;  //获取头节点的后继节点
    if (s == null || s.waitStatus > 0) {  //如果节点为null,或者已被取消
        s = null;  
        for (Node t = tail; t != null && t != node; t = t.prev)  //从尾节点开始向前遍历
            if (t.waitStatus <= 0)  //找到队列中第一个不被取消的节点
                s = t;  
    }  
    if (s != null)  //如果找到了,则将该节点取消挂起
        LockSupport.unpark(s.thread);  
}

unparkSuccessor() 方法中,会从尾节点开始从后往前遍历,找到队列中第一个没有被取消的节点,将该节点取消挂起。

整个解锁流程可以用如下流程图表示。

flowchart TB
tryRelease-->if(如果释放锁成功且后继节点需要解除挂起)
	subgraph tryRelease [尝试释放锁]
	direction TB
	release(减少可重入次数)-->thread{该线程是否持有该锁}
	thread--否-->抛出异常
	thread--是-->c{可重入次数是否为0}
	c--是-->set(设置该锁的独占线程为null并释放掉锁)
	c--否-->setState(修改该锁的状态)
	set-->setState
	end

if-->unparkSuccessor(解除后继节点的挂起)

subgraph unparkSuccessor [解除后继节点的挂起]
	direction TB
	ws{如果头节点需要被唤醒} --是 --> compareAndSetWaitStatus(清除该节点的waitStatus状态)
	ws--否-->waitStatus{如果后继节点为null或者后继节点已被取消}
	compareAndSetWaitStatus -->waitStatus
	waitStatus--是-->for(从尾节点开始往前遍历找到第一个不被取消的节点)
	waitStatus--否-->unpark(如果找到了不被取消的节点则取消该节点的挂起)
	for-->unpark
end