一、前言

AQS 是抽象的队列同步器,是用来构建锁或其他同步组件的重量级基础框架及整个 JUC 体系的基石。

二、相关组件

下边的组件都是基于 AQS 框架扩展实现的:

  • ReentrantLock:可重入锁,避免多线程竞争资源的安全问题
  • Semaphore:信号量,限制多线程的访问数量
  • CountDownLatch:计数器,用于线程之间的等待场景(如线程A等待其他多个线程完成任务后,线程A才能执行自己的任务)
  • CyclicBarrier:回环栅栏,用于线程之间的等待场景(如在一组线程中,如果线程A执行到代码段S点就会停下等待,等到组内其他线程都执行到S点时它们才会立刻一起执行剩余的任务)

虽然这些组件在多线程场景下有不同的作用,但代码中也有相似之处,如都需要管理锁状态,维护阻塞线程,维护唤醒线程。而 AQS 的作用就是将这些相似的、公共的代码封装在一起。

三、运行原理

AQS 使用一个 volatile 的 int 类型的 state 变量来表示锁竞争状态,将每条要去抢占资源的线程封装成一个 Node 节点放入到内置的 CLH 同步队列(FIFO 双向队列)来维护排队工作,通过 CASstate 值进行修改。

我们常说的 AQS 指的是 java.util.concurrent.locks.AbstractQueuedSynchronizer 类,其源码的核心如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {

static final class Node {
// 等待状态
volatile int waitStatus;
// 前驱节点
volatile Node prev;
// 后继节点
volatile Node next;
// 工作线程
volatile Thread thread;

...
}

// 头结点
private transient volatile Node head;

// 尾节点
private transient volatile Node tail;

// 同步状态,默认值 0,说明 0:资源未抢占 1:资源已抢占
private volatile int state;

...
}

注意:源码中有个状态:一个是 state,针对资源抢占的状态;另一个是 waitStatus,针对 node 节点的状态。

将上文的源码转成图形,可便于我们理解,加深记忆,其运行模型图如下:

四、源码分析

文中涉及到 CAS 和 LockSupport 相关内容,不清楚的读者可以先跳至末尾,浏览相关的参考资料。

由于 AQS 并非单独使用,为了完整的讲解 AQS 的源码,本篇章以 ReentrantLock 组件为例,一步一步揭开 AQS 如何作为基石使用的。

  • 演示案例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class LockTest {

public static void main(String[] args) {

Lock lock = new ReentrantLock();

Thread t1 = new Thread(() -> {
lock.lock();
// 业务代码
System.out.println(Thread.currentThread().getName() + " 开始工作");
lock.unlock();
}, "t1");

Thread t2 = new Thread(() -> {
lock.lock();
// 业务代码
System.out.println(Thread.currentThread().getName() + " 开始工作");
lock.unlock();
}, "t2");

Thread t3 = new Thread(() -> {
lock.lock();
// 业务代码
System.out.println(Thread.currentThread().getName() + " 开始工作");
lock.unlock();
}, "t3");

t1.start();
t2.start();
t3.start();

}
}

创建一把非公平锁,3 个线程通过抢占锁来执行任务。

此时,AQS 的模型如下:

其中,state 表示锁的抢占状态,ownerThread 表示抢占锁的线程。

进入 lock() 方法,来到 ReentrantLock 源码中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class ReentrantLock implements Lock, java.io.Serializable {

...

public void lock() {
sync.lock();
}

abstract static class Sync extends AbstractQueuedSynchronizer {
// 锁定
abstract void lock();

// 尝试获取非公平锁
final boolean nonfairTryAcquire(int acquires) {
...
}

// 尝试释放锁
protected final boolean tryRelease(int releases) {
...
}

...
}

...
}

可以看出 lock() 方法是通过 Sync 类来实现的。而 Sync 是一个抽象的静态内部类,它继承了 AbstractQueuedSynchronizer 类,因此它具备了 AQS 的“能力”。

Q1: Sync 定义了抽象的 lock() 方法,需要通过其子类来实现具体的锁方式(模板方法模式)。为何要这要设计的? 当然是为了方便扩展。

我们都知道通过 ReentrantLock 类能创建公平锁和非公平锁,其原因是 Sync 有两个实现类: FairSyncNonfairSync,它们都实现了 lock() 的具体细节。案例中,我们创建出的 lock 实例底层使用到的就是 NonfairSync 的实例(多态特性),即非公平锁。假设哪天 ReentrantLock 需要新增第三种锁,只需新增个子类继承 Sync ,实现 lock() 方法即可。

演示案例中我们创建的是非公平锁,我们来看看 Sync 对应非公平锁的子类 NonfairSync ,它同样是定义在 ReentrantLock 的静态内部类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class ReentrantLock implements Lock, java.io.Serializable {

...

// 非公平锁实现
static final class NonfairSync extends Sync {
final void lock() {
// (1)
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
// (2)
acquire(1);
}

// (3)
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}

...
}

我们开始沿着演示案例讲解:

假设 t1 线程率先启动获取到 CPU 资源调用了 lock() 方法,执行到 (1) 处,即 compareAndSetState(0, 1),该方法来自 AQS。通过 CAS 方式将 AQS 中的 state 值变成 1,执行成功返回 true。

由于 t1 是第一条执行的线程,结果肯定返回 true,然后将 ownerThread 设置为当前线程 t1。最终整个 lock() 方法也成功返回,获取锁成功,执行自己的业务代码。

此时,AQS 的模型如下:

这时 t2 和 t3 线程也启动,“轮流” 执行到了 (1) 处,判断肯定失败,然后执行 (2),即 acquire(1) 方法,该方法来自 AQS

我们进到 acquire(1) 方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {

...

public final void acquire(int arg) {
// (4)
if (!tryAcquire(arg) &&
// (5):addWaiter
// (6): acquireQueued
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// (7)
selfInterrupt();
}

protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}

...
}

假设当前线程为 t2,执行到 (4) 处,即 tryAcquire(arg) 方法,用于尝试获取锁。它是个抽象方法,由子类 NonfairSync 实现,NonfairSynctryAcquire 方法最终调用其父类 SyncnonfairTryAcquire() 方法来实现,可以返回至 (3) 处查看。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
abstract static class Sync extends AbstractQueuedSynchronizer {

final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// (8)
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}

t2 线程进入到 nonfairTryAcquire() 方法,先获取当前线程(t2 线程),然后执行到 (8) 处,即 getState(),此方法来自 AQS,用于返回 AQSstate 状态。

由于 t1 线程没有释放锁,因此 state = 1OwnerThread = t1 ,下边的 if 判断都不成立,最终方法返回 false。

回到 acquire() 方法中,由于返回 false,!tryAcquire(arg) 判断成立,t2 线程会继续执行到 (5) 处,即 addWaiter(Node.EXCLUSIVE),该方法用于将线程封装到 Node 节点中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
...

private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
// (9)
if (pred != null) {
node.prev = pred;
// (10)
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// (11)
enq(node);
return node;
}

private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
// (12)
if (compareAndSetHead(new Node()))
tail = head;
} else {
// (13)
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

...
}

t2 线程进入到 addWaiter 方法中,先被封装到 node1 节点中,由于是首个线程被封装成 Node,因此 tail 和 pred 必定为 null,(9) 处的判断不成立,执行到 (11) 处,即 enq() 方法。该方法中开启无限循环,通过 CAS 方式设置 CLH 队列的头结点/尾节点。

第一次循环,由于 tail 为空,因此线程执行到 (12) 处,创建一个傀儡节点(无数据,用于占位)设置为头结点和尾节点。

此时,AQS 的模型图如下:

由于没有遇到循环终止的指令,将执行下一次循环。

在第二次循环中,尾节点不为空,因此进入(13) 处,将 node1 节点设置成尾节点,同时前后两个 node 节点建立关系,最终返回 node1 节点。

此时,AQS 的模型图如下:

返回的 node1 节点后,t2 线程开始执行 (6) ,node1 节点被当作参数传入到 acquireQueued() 方法中,该方法用于将节点放入到 CLH 队列中,将其线程挂起等待。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
...

final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 获取前驱节点
final Node p = node.predecessor();
// (14)
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// (15): shouldParkAfterFailedAcquire
// (16): parkAndCheckInterrupt
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

...
}

acquireQueued() 方法中也开启无限循环。在循环中,node1 节点先获取它的前驱节点(傀儡节点),然后判断是否为头结点,是则调用 tryAcquire()尝试获取锁,该方法在 (4) 处出现过一次,此处不再赘述。

由于 t1 线程还没有释放锁,(14) 处的最终判断肯定为 false。t2 执行到 (15) 处,即 shouldParkAfterFailedAcquire(),该方法用于修改 node1 节点的前驱节点的 waitStatus 状态。

Node 节点的 waitStatus 有以下 5 种状态:

状态说明
0初始状态
1线程已取消
-1当前节点封装的线程释放锁后,会唤醒后继节点的线程
-2线程处在等待状态,在等待队列中
-3表示下一次的共享状态会被无条件的传播下去
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
...

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 前驱节点的 waitStatus 值
int ws = pred.waitStatus;
// (17) 状态为 -1
if (ws == Node.SIGNAL)

return true;
if (ws > 0) {
// (18) 状态值 > 0,即线程被取消,从后往前遍历节点,删除已取消状态的线程的节点

do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// (19) 将前驱节点的 waitStatus 值改成 -1
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

...
}

t2 线程进入 shouldParkAfterFailedAcquire() 方法中, 获取 node1 前驱节点的 waitStatus 状态,值为 0,直接跳到 (19) 处,通过 CAS 方式将 waitStatus 值改成 -1,最终的方法是返回 false 的。

此时,AQS 的模型图如下:

Q2: 为何在创建 node 节点封装线程时,不直接将 waitStatus 的值设置成 -1,而是专门定义这个方法进行修改? 答案我们留在下文解答。

shouldParkAfterFailedAcquire() 方法返回 false,t2 线程回到 acquireQueued() 方法中,由于之前是在无限循环中进行的,没有遇到终止指定,因此 t2 线程将执行第二次循环的操作。

毫无疑问,t2 线程又会再次执行 shouldParkAfterFailedAcquire() 方法,此时 node1 的前驱节点的 waitStatus = -1,最终方法返回 true。随后 t2 开始执行 (16),即 parkAndCheckInterrupt() 方法,用于线程等待。

1
2
3
4
5
6
7
8
9
10
11
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
...

private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
// (20)
return Thread.interrupted();
}

...
}

方法里边调用了 LockSupport.park(this),t2 线程立即被挂起,变成 WAIT 状态(此处的状态是线程的状态,与 AQS 中的 state 状态,Node 节点的 waitStatus 无关)。

假设 t1 线程仍未释放锁,轮到 t3 线程运行,不出意外其运行的最终结果也是被封装到 node2 节点,放到 CLH 队列中被挂起等待。

此时, AQS 的模型图如下:

现在只有 t1 线程处在运行状态,当它运行完业务代码,随后执行 unlock() 方法:

1
2
3
4
5
6
7
8
9
10
11
public class ReentrantLock implements Lock, java.io.Serializable {

...

public void unlock() {
sync.release(1);
}

...

}

方法通过调用 Syncrelease() 方法来实现,而 release() 方法来自 AQS

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {

...

public final boolean release(int arg) {
// (21)
if (tryRelease(arg)) {
Node h = head;
// (22)
if (h != null && h.waitStatus != 0)
// (23)
unparkSuccessor(h);
return true;
}
return false;
}

...
}

当 t1 线程执行到 (21) 处,即 tryRelease(arg) 方法,该方法都抽象方法,由 Sync 子类实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class ReentrantLock implements Lock, java.io.Serializable {

...

abstract static class Sync extends AbstractQueuedSynchronizer {

...

// 尝试释放锁
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

}

...
}

进入该方法:

  1. 先获取 AQS 状态 state = 1,减去入参(值为 1),结果 c = 0。
  2. 判断当前线程(t1) 是否不等于 OwnerThread 的线程(t1),显然是相等的。
  3. 判断 c 是否为 0,判断成立设置返回值 free 为 true, 将 OwnerThread 线程设置为 null。
  4. 修改 AQS 状态为 c 的值,即 AQSstate = 0
  5. 最终返回 free。

经过上述的操作,此时 AQS 模型图为:

方法执行完回到 (21),条件判断成立,进入 if 方法体中:

  1. 获取头结点(值为 dummy 节点)
  2. 头节点进行非空判断 和 waitStatus 的非零判断(值为 -1),判断成立进入到 (23) 处,即 unparkSuccessor() 方法,用于唤醒下个节点的线程。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {

...

private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
// 还原 waitStatus 状态为 0
compareAndSetWaitStatus(node, ws, 0);
// (24)
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
// 从后往前找 waitStatus <= 0 的节点(剔除取消状态的线程节点)
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}

// (25)
if (s != null)
// 唤醒后继节点封装的线程
LockSupport.unpark(s.thread);
}

...

}

头结点被当作参数传入到 unparkSuccessor() 方法中,该方法执行 3 个操作:

  1. 将头结点的 waitStatus 状态恢复成 0。
  2. 获取头结点的后继节点,如果后继节点为空或 waitStatus 状态为已取消,则从后往前遍历 CLH 队列,获取非取消状态的节点。
  3. 唤醒后继节点中封装的线程。

回到案例中,线程执行到 (24) 处,获取的后继节点为 node1(封装 t2 线程)。来到 (25) 处,唤醒 node1 中的 t2 线程。

讲解到此处,大家是否还记得 Q2:为何在创建 Node 节点封装线程时,不直接将 waitStatus 的值设置成 -1。

我们以创建的 Node 节点时, waitStatus 值设置为 -1 为前提,进行案例推演:

  1. 依然是 3 个线程在运行,t1 线程先拿到锁执行业务代码。
  2. CPU 切换到 t2 线程,执行到 (5) 处,即 addWaiter() ,被封装到 node1 节点中(waitStatus = -1),此时 node1 节点已经和头结点建立关系。
  3. 在 t2 线程执行 (16) 处,即在执行 parkAndCheckInterrupt() 方法,t2 线程要被挂起之前,CPU 又切换至 t1 线程。
  4. t1 线程执行完业务代码,要释放锁时,会执行 (25) 处代码 LockSupport.unpark(s.thread); ,唤醒 t2 。
  5. 但实际上 t2 线程并没有被挂起等待,如果某个线程被提前 unpark(thread),那么当该 thread 线程调用 park() 时是不会被挂起等待的,这样锁的机制就乱套了(线程未获取到锁,但又不挂起等待)。

因此,Node 节点的 waitStatus 值不能一开始被设置的成 -1。

回到正常案例中,t1 线程唤醒 t2 线程结束任务。t2 线程被唤醒并拿到 CPU 资源,执行到 (20) 处,即 Thread.interrupted() 方法,检测当前线程是否被中断,t2 线程并未中断,因此 parkAndCheckInterrupt() 方法返回 false。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
...

final boolean (final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 获取前驱节点
final Node p = node.predecessor();
// (14)
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// (15): shouldParkAfterFailedAcquire
// (16): parkAndCheckInterrupt
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

...
}

t2 线程将在 acquireQueued() 方法中进行第三次循环操作:

  1. 获取前驱节点 oldHead (dummy)
  2. 前驱节点是否为头结点,是则尝试获取锁。因为 AQSstate 已恢复成 0,因此 t2 可以成功获取到锁(修改 state = 1ownerThread = t2)。
  3. 将 node1 节点设置为头结点(将前驱节点设置为 null,封装的线程设置为 null)
  4. 将 oldHead 节点的后继节点设置为 null。

此时,AQS 的模型图如下:

t2 线程获取到锁执行任务,之后释放锁。。。

t3 线程之后的执行流程与 t2 类似,此处也不在赘述。

五、执行流程

最后附加代码执行流程图:

流程图并未充分展示执行的细节,最终还得需要读者自行阅读源码加深理解。

六、参考资料

CAS 原理新讲

LockSupport 工具介绍