AQS源码解析
文章目录
关于 AQS,网上已经有无数的文章阐述 AQS 的使用及其源码,所以多这么一篇文章也没啥所谓,还能总结一下研究过的源码。源码解析和某某的使用,大概是互联网上 Java 文章中写得最多的主题了。
AQS
AQS 是 AbstractQueuedSynchronizer 的缩写,中文翻译过来就是抽象队列同步器。ReentrantLock
、ReentrantReadWriteLock
、Semaphore
、CountDownLatch
都是基于 AQS。AQS 的核心思想是,当线程请求获取资源时,如果资源空闲,则会将当前线程设置为资源的独占线程,成功获得锁;否则将获取锁失败的线程加入到排队队列中(CLH),并提供线程阻塞和线程唤醒机制。CLH 是一个虚拟的双向队列。
首先看一下 AQS 的关键属性。
// java.util.concurrent.locks.AbstractQueuedSynchronizer
private transient volatile Node head; //队列的头节点
private transient volatile Node tail; //队列的尾节点
private volatile int state; //同步状态
state
用于实现锁的可重入性。
- 0 为表示没有线程持有该锁
- 当存在线程获取锁成功,则 state 变为 1。如果是同一线程重复获得,则 state++
- 如果存在线程释放锁,则 state–
上面的三个属性都存在对应的以 CAS 方式进行修改的方法,state
对应的是 compareAndSetState()
方法, head
对应的是 compareAndSetHead()
方法,tail
对应的是 compareAndSetTail()
。以 CAS 的方式修改值能避免锁的竞争。
因为请求获取锁的线程会以 Node 节点的方式在 CLH 队列中排队,在分析 AQS 机制时也会大量涉及到 Node 节点,所以很有必要对 Node 节点进行分析。
CLH 队列中 Node 节点。
// java.util.concurrent.locks.AbstractQueuedSynchronizer.Node
volatile int waitStatus; //当前节点在队列中的状态
volatile Node prev; //前驱节点
volatile Node next; //后继节点
volatile Thread thread; //在该节点中排队的线程
Node nextWaiter; //下一个处于condition或共享状态的节点
//等待锁的两种状态
static final Node SHARED = new Node(); //共享
static final Node EXCLUSIVE = null; //独占
//waitStatus的几个值
static final int CANCELLED = 1; //已取消
static final int SIGNAL = -1; //后继节点的线程需要唤醒
static final int CONDITION = -2; //节点处于等待队列中
static final int PROPAGATE = -3; //线程处在SHARED情况下使用该字段
ReentrantLock
说是研究 AQS 源码,但 AQS 毕竟是一个抽象类,只实现了部分方法,另外一些方法会在子类中实现。所以我们也同样会涉及到 ReentrantLock 源码的研究。
ReentrantLock 的通常使用方式是:
class X {
private final ReentrantLock lock = new ReentrantLock();
public void m() {
lock.lock();
try {
// ... method body
} finally {
lock.unlock()
}
}
}
ReentrantLock 存在两种锁:公平锁(FairSync)和非公平锁(NonfairSync),默认为非公平锁。使用公平锁时,线程会直接进入队列中排队,只有队列中第一个线程才能获取锁;使用非公平锁时,线程会先尝试获取锁,成功则占用锁,失败则在队列排队。对于 AQS 来说,公平锁和非公平锁的绝大部分方法都是共用的。
ReentrantLock 的主要方法有两个,一是使用 lock()
方法加锁,二是使用 unlock()
方法解锁。
加锁
非公平锁
我们首先来分析非公平锁的加锁过程。
// java.util.concurrent.locks.ReentrantLock.NonfairSync
final void lock() {
if (compareAndSetState(0, 1)) //通过cas方式设置同步状态
setExclusiveOwnerThread(Thread.currentThread()); //成功,设置为独占线程
else
acquire(1); //失败,获取锁
}
在 lock()
方法中,会先尝试通过 CAS 的方式去获取锁,成功则设置为独占线程,否则执行 acquire()
方法。
// java.util.concurrent.locks.AbstractQueuedSynchronizer
public final void acquire(int arg) {
if (!tryAcquire(arg) && //尝试获取锁
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 失败,则添加到等待队列
selfInterrupt();
}
在 acquire()
方法中,会再次尝试去获取锁,如果失败则加入到排队队列。
// java.util.concurrent.locks.ReentrantLock.NonfairSync#tryAcquire
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
// java.util.concurrent.locks.ReentrantLock.Sync#nonfairTryAcquire
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) { //如果当前没有线程持有锁
if (compareAndSetState(0, acquires)) { //state++
setExclusiveOwnerThread(current); //将当前线程设置为独占线程
return true; }
}
else if (current == getExclusiveOwnerThread()) { //如果已有线程持有锁,且当前线程为独占线程
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc); //state++,实现锁的可重入性
return true; //获得锁
}
return false; //如果已有线程持有锁,且当前线程不为独占线程,则获取锁失败
}
在 nonfairTryAcquire()
方法中,如果资源空闲,则再次尝试通过 CAS 的方式去获取锁。如果当前资源已被当前线程占用,则将 state++
,以实现锁的可重入性。
// java.util.concurrent.locks.AbstractQueuedSynchronizer#addWaiter
// 设置队列尾节点
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode); //新建排队节点
Node pred = tail; //pred指向尾节点
if (pred != null) { //如果Pred指针是Null(说明等待队列中没有元素),或者当前Pred指针和Tail指向的位置不同(说明被别的线程已经修改)
node.prev = pred; //将新建节点的prev指向pred
if (compareAndSetTail(pred, node)) { // 设置新建节点为尾节点
pred.next = node;
return node;
}
}
enq(node);
return node;
}
//java.util.concurrent.locks.AbstractQueuedSynchronizer#enq
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { //如果尾节点为空,则说明队列还未初始化,
if (compareAndSetHead(new Node())) //初始化一个头节点
tail = head;
} else { //如果已经初始化,则设置为尾节点
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
如果尝试多次获取锁都失败,则在 addWaiter()
方法中会将线程放到节点中,并设置为排队队列的尾节点。
//java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireQueued
final boolean acquireQueued(final Node node, int arg) {
//是否成功获取到资源
boolean failed = true;
try {
//循环等待过程中是否中断过
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor(); //获取当前节点的前驱节点
if (p == head && tryAcquire(arg)) { //如果前驱节点为头节点,则尝试获取锁
setHead(node); //获取锁成功,设置当前节点为头节点
p.next = null; // help GC
failed = false;
return interrupted;
}
//来到这里,说明前驱节点不是头节点或者获取锁失败。判断当前节点是否要被阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true; //线程中断成功
}
} finally {
if (failed)
cancelAcquire(node);
}
}
加入到排队队列中后,会在 acquireQueued()
方法中循环等待资源的获取,并判断线程是否需要被阻塞,直到线程获取成功或者抛出异常。
//java.util.concurrent.locks.AbstractQueuedSynchronizer#shouldParkAfterFailedAcquire
//靠前驱节点判断当前线程是否应该被阻塞
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//获取前驱节点的状态
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) //如果前驱节点处于唤醒状态
return true;
if (ws > 0) { //前驱节点处于取消状态
do { //向前查找取消的节点,并将其从队列中删除
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL); //设置前驱节点为唤醒状态
}
return false;
}
//java.util.concurrent.locks.AbstractQueuedSynchronizer#parkAndCheckInterrupt
//挂起当前线程,阻塞调用栈,返回当前线程的中断状态
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
整一个非公平锁的加锁流程可以用如下流程图表示:
flowchart TB
lock-->compareAndSetState{cas获取锁}
compareAndSetState--成功-->设置为独占线程
compareAndSetState--失败-->tryAcquire{尝试获取锁}
tryAcquire--失败 --> addWaiter(将当前节点添加到队列)
addWaiter-->acquireQueued(队列中的线程去获取锁)
subgraph addWaiter [将当前节点添加到队列]
tail{尾节点是否为空}--不为空-->setTail(设置当前节点为尾节点)
setTail-->return(返回当前节点)
tail--为空--> enq
subgraph enq
tail2{尾节点是否为空}
tail2--为空-->head(初始化一个空的头节点)
head--> tail2
tail2--不为空-->setTail2(设置当前节点为尾节点)
setTail2-->返回当前节点
end
end
subgraph acquireQueued [队列中的线程去获取锁]
predHead{如果当前节点的前驱节点为头节点且获取锁成功}--true-->setHead(设置当前节点为头节点)
setHead-->interrupted(返回是否被中断过)
predHead--false-->shouldParkAfterFailedAcquire{当前节点是否需要被阻塞}
shouldParkAfterFailedAcquire--是-->parkAndCheckInterrupt(阻塞当前线程)
shouldParkAfterFailedAcquire --否--> predHead
parkAndCheckInterrupt--> predHead
end
公平锁
公平锁和非公平锁的流程中只有 lock()
方法和 tryAcquire()
方法存在差别。
//java.util.concurrent.locks.ReentrantLock.FairSync#lock
final void lock() {
acquire(1); //直接获取锁
}
公平锁中会直接调用 acquire()
方法。
// java.util.concurrent.locks.ReentrantLock.FairSync#tryAcquire
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
相对于 nonfair
的 nonfairTryAcquire
方法,在没有线程持有锁时,增加了 hasQueuedPredecessors()
方法的判断,该方法用于判断队列中是否存在线程比当前线程等待时间更长。
// java.util.concurrent.locks.AbstractQueuedSynchronizer#hasQueuedPredecessors
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
分析一下这段代码:
- 如果
h != t
为 true,说明队列正在初始化,或者已经初始化完成。- 在
h != t
前提下,如果(s = h.next) == null
为 true,说明队列正在初始化,因为在队列初始化过程中,是有可能存在 head 已经被初始化(不再等于 tail 了),但 head.next 还没有被设值为 node,这种情况下,因为队列中已经存在 Node,当前线程需要加到等待队列中,故返回 true。初始化过程在AbstractQueuedSynchronizer#enq()
。 - 但如果
(s = h.next) == null
为 false,说明队列中已经存在 Node,则判断该 Node 的线程是否与当前线程相同。如果s.thread != Thread.currentThread()
为 true,说明不相同,需要进入等待队列。如果相同,说明当前线程可以获取锁。
- 在
- 如果
h != t
为 false,说明队列为空,返回 false,说明可以去获得锁。
另外,s = h.next
这段代码获取的是 head 的下一个节点,因为 head 是虚节点,不存储数据,真正的数据存储在 head.next
。
解锁
相对于加锁过程,解锁过程比较简单,且公平锁和非公平锁共用同一个 lock()
方法。
// java.util.concurrent.locks.ReentrantLock#unlock
public void unlock() {
sync.release(1);
}
//java.util.concurrent.locks.AbstractQueuedSynchronizer#release
public final boolean release(int arg) {
if (tryRelease(arg)) { //如果锁没有被任何线程持有
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h); //解除后继节点的线程挂起
return true;
}
return false;
}
关于代码 h != null && h.waitStatus != 0
,分为以下几种情况:
h==null
,说明头节点还未初始化。h!=null && h.waitStatus==0
,说明后继节点还在运行中。因为如果节点已被取消,waitStatus 会被设置为 1;如果后继节点需要唤醒,waitStatus 会被设置为 -1;如果节点正在排队,waitStatus 则会设置为 -2。如果 waitStatus 为 0,说明已经处于运行中。h != null && h.waitStatus != 0
则会去唤醒后继节点。
在 release()
方法中,会先尝试去解锁,如果解锁成功且后继节点需要唤醒,则将后继节点取消挂起。
//java.util.concurrent.locks.ReentrantLock.Sync#tryRelease
//更新state状态,如果重入次数为0,则将锁的独占线程设置为null
protected final boolean tryRelease(int releases) {
//减少可重入次数
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread()) //如果当前线程不是该锁的独占线程,则抛出异常
throw new IllegalMonitorStateException();
boolean free = false; //该锁是否已被释放
if (c == 0) { //如果可重入次数已为0
free = true;
setExclusiveOwnerThread(null); //将锁的独占线程设置为null,表明不再有线程持有该锁
}
setState(c); //修改锁的状态
return free;
}
在 tryRelease()
方法中,会去减少锁的可重入次数,当可重入次数为 0 时,清空锁的独占线程。
// java.util.concurrent.locks.AbstractQueuedSynchronizer#unparkSuccessor
private void unparkSuccessor(Node node) {
int ws = node.waitStatus; //获取头节点的waitStatus
if (ws < 0) //如果节点的waitStatus为负数,说明后继节点需要被唤醒或者正在排队
compareAndSetWaitStatus(node, ws, 0); //清除节点当前的waitStatus,设置为0
Node s = node.next; //获取头节点的后继节点
if (s == null || s.waitStatus > 0) { //如果节点为null,或者已被取消
s = null;
for (Node t = tail; t != null && t != node; t = t.prev) //从尾节点开始向前遍历
if (t.waitStatus <= 0) //找到队列中第一个不被取消的节点
s = t;
}
if (s != null) //如果找到了,则将该节点取消挂起
LockSupport.unpark(s.thread);
}
在 unparkSuccessor()
方法中,会从尾节点开始从后往前遍历,找到队列中第一个没有被取消的节点,将该节点取消挂起。
整个解锁流程可以用如下流程图表示。
flowchart TB
tryRelease-->if(如果释放锁成功且后继节点需要解除挂起)
subgraph tryRelease [尝试释放锁]
direction TB
release(减少可重入次数)-->thread{该线程是否持有该锁}
thread--否-->抛出异常
thread--是-->c{可重入次数是否为0}
c--是-->set(设置该锁的独占线程为null并释放掉锁)
c--否-->setState(修改该锁的状态)
set-->setState
end
if-->unparkSuccessor(解除后继节点的挂起)
subgraph unparkSuccessor [解除后继节点的挂起]
direction TB
ws{如果头节点需要被唤醒} --是 --> compareAndSetWaitStatus(清除该节点的waitStatus状态)
ws--否-->waitStatus{如果后继节点为null或者后继节点已被取消}
compareAndSetWaitStatus -->waitStatus
waitStatus--是-->for(从尾节点开始往前遍历找到第一个不被取消的节点)
waitStatus--否-->unpark(如果找到了不被取消的节点则取消该节点的挂起)
for-->unpark
end
文章作者 梧桐碎梦
上次更新 2023-07-30 22:27:20