Java AQS 实现——Condition

引言

本文着重介绍 AQS 的 Condition 实现方式。所有关于 Java 并发的文章均收录于<Java并发系列文章>

Condition

介绍完 AQS 的共享模式和互斥模式后,我们来看一看 AQS 是如何实现条件等待的,即 Condition。在 AQS 中通过 ConditionObject 实现 Condition。从 ConditionObject 的核心数据中,我们会发现它内部也会维护一个 Node 的队列。

1
2
3
4
5
6
public class ConditionObject implements Condition, java.io.Serializable {
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
}

那么这个队列和同步队列有什么联系吗?带着这样的疑问我们走读一下在 Condition 上 await 入队的流程。这里我们以 ReentrantLock 的 Condition 实现作为例子。 ReentrantLock 的 newCondition 函数能够创建一个 Condition 对象。内部实际上就是创建了一个 AQS 的 ConditionObject 类。

1
2
3
4
5
6
7
8
// ReentrantLock 的 newCondition 函数能够创建一个 Condition 对象
public Condition newCondition() {
return sync.newCondition();
}
// 内部实际上就是创建了一个 AQS 的 ConditionObject 类
final ConditionObject newCondition() {
return new ConditionObject();
}

await

我们知道,在使用 Condition 对象时,要先持有对应的锁,然后再执行 Condition 的 await 方法。因为 Condition 内部不提供同步保证,我们需要通过 lock 来保护 Condition 的正确性。

  1. 在 Condition 的 await 函数中,先会将当前线程添加到 Condition 的条件队列中,因为此时持有锁,所以 addConditionWaiter 就是拿到尾指针然后新建一个节点并加入(使用Node的nextWaiter字段保存下一个Node),注意新建节点的状态是 CONDITION
  2. 添加到条件队列后,它才将锁释放,并保存了之前的锁状态,因为在唤醒时,必须要恢复锁状态
  3. 紧接着是一个循环,只要当前节点不处于同步队列中,就通过 park 等待,进入到同步队列中,说明当前节点已经从条件队列移除,并开始了尝试获取锁的过程
  4. 如果从等待状态中恢复,首先要检查一下是不是被中断了,如果是要根据被打断的时间点做出不同的处理
    1. 如果是被 Signal 唤醒前被打断了(当前节点状态是 CONDITION),就将当前节点状态改为 0(表示取消等待),并添加到同步队列中,然后返回 THROW_IE,这意味着最终它会抛出一个异常
    2. 如果是被 Signal 唤醒之后被打断了(当前节点状态不是 CONDITION,说明已经进入了或即将进入同步队列),这里就通过自旋等待进入同步队列即可,CONDITION 代表当前处于条件队列,!CONDITION 代表当前处于同步队列,最后返回了 REINTERRUPT,意味着只会设置线程的中断标志位,不会抛出异常
  5. 当前节点处于等待队列中时(有可能被Signal唤醒也有可能被打断),开始调用 acquireQueued 等待获取锁
  6. 成功获取锁之后,进行简单地清理工作,然后如果当前线程被打断,要根据情况做出不动处理
    1. Signal 唤醒前被打断了:抛出 InterruptedException
    2. Signal 唤醒之后被打断: 记录中断标志位
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 先会将当前线程添加到 Condition 的**条件队列**中
Node node = addConditionWaiter();
// 添加到**条件队列**后,它才将锁释放,并保存了之前的锁状态,因为在唤醒时,必须要恢复锁状态
int savedState = fullyRelease(node);
int interruptMode = 0;
// 只要不处于同步队列中,就通过 park 等待
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
// 如果被打断了,就将当前线程加入到同步队列中,因为从 await 返回时,无论如何我们也要获取到锁,被打断和被 Signal 都一样
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 走到这说明当前节点处于等待队列中时(无论是被Signal唤醒还是被打断),开始调用 acquireQueued 等待获取锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}

/**
* Adds a new waiter to wait queue.
* @return its new wait node
*/
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out. 清除无用的节点
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
// 因为此时持有锁,所以 addConditionWaiter 就是拿到尾指针然后新建一个节点并加入(使用Node的nextWaiter字段保存下一个Node)
// 注意新建节点的状态是 CONDITION
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
/**
* Unlinks cancelled waiter nodes from condition queue.
* Called only while holding lock. This is called when
* cancellation occurred during condition wait, and upon
* insertion of a new waiter when lastWaiter is seen to have
* been cancelled. This method is needed to avoid garbage
* retention in the absence of signals. So even though it may
* require a full traversal, it comes into play only when
* timeouts or cancellations occur in the absence of
* signals. It traverses all nodes rather than stopping at a
* particular target to unlink all pointers to garbage nodes
* without requiring many re-traversals during cancellation
* storms.
*/
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
// 状态不等于 CONDITION 代表了已经被取消或者已经被Signal唤醒,将其从条件队列中清楚
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
/**
* Returns true if a node, always one that was initially placed on
* a condition queue, is now waiting to reacquire on sync queue.
* @param node the node
* @return true if is reacquiring
*/
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
/*
* node.prev can be non-null, but not yet on queue because
* the CAS to place it on queue can fail. So we have to
* traverse from tail to make sure it actually made it. It
* will always be near the tail in calls to this method, and
* unless the CAS failed (which is unlikely), it will be
* there, so we hardly ever traverse much.
*/
return findNodeFromTail(node);
}
/**
* Checks for interrupt, returning THROW_IE if interrupted
* before signalled, REINTERRUPT if after signalled, or
* 0 if not interrupted.
*/
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ? // 检查是否被打断
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
/**
* Transfers node, if necessary, to sync queue after a cancelled wait.
* Returns true if thread was cancelled before being signalled.
*
* @param node the node
* @return true if cancelled before the node was signalled
*/
final boolean transferAfterCancelledWait(Node node) {
// 如果是被 Signal 唤醒前被打断了(当前节点状态是 CONDITION),就将当前节点状态改为 0(表示取消等待),并添加到**同步队列**中,然后返回 true,这意味着最终 await 会抛出一个 InterruptedException
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node);
return true;
}
/*
* If we lost out to a signal(), then we can't proceed
* until it finishes its enq(). Cancelling during an
* incomplete transfer is both rare and transient, so just
* spin.
*/
// 如果是被 Signal 唤醒之后被打断了(当前节点状态不是 CONDITION,说明已经进入了或即将进入同步队列),这里就通过自旋等待进入同步队列即可
// 最后返回了 false,意味着 await 只会设置线程的中断标志位,不会抛出 InterruptedException
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}

/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
/**
* Throws InterruptedException, reinterrupts current thread, or
* does nothing, depending on mode.
*/
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}

signal

接下来,我们看一下通知signal的实现逻辑。

  1. 我们知道调用 signal 时,也是要求持有锁 lock 的,所以在 signal 中首先会判断是不是当前线程持有锁
  2. 然后检查等待队列是不是空,如果不为空,则唤醒第一个线程,唤醒线程的逻辑也很简单
    1. 首先通过修改 firstWaiter 指针,将条件队列中的队首结点排除
    2. 然后将刚才移除的节点移动到同步队列中,因为在这个过程中可能该节点对应的线程被打断,所以当发生这种情况时,我们需要唤醒队列中的下一个节点,直到成功将一个节点移动到同步队列中或者条件队列为空
      1. 怎么判断一个线程的 await 被打断了呢?就像前面所说的只要节点的状态不是 CONDITION 那它就已经被取消
      2. 将节点添加到同步队列中还不够,我们需要修改前序节点的状态,使其状态为 SIGNAL 这样才能放心,所以在 transferForSignal 的最后检查了前序节点的状态如果发现该节点已经被取消,或者修改 SIGNAL 失败,就主动唤醒该节点,因为唤醒之后该节点的线程会进入到 acquireQueued 函数中,那里有更完备的删除取消节点,加锁以及等待的处理流程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
/**
* Moves the longest-waiting thread, if one exists, from the
* wait queue for this condition to the wait queue for the
* owning lock.
*
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
public final void signal() {
// 我们知道调用 signal 时,也是要求持有锁 lock 的,所以在 signal 中首先会判断是不是当前线程持有锁
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}

/**
* Removes and transfers nodes until hit non-cancelled one or
* null. Split out from signal in part to encourage compilers
* to inline the case of no waiters.
* @param first (non-null) the first node on condition queue
*/
private void doSignal(Node first) {
do {
// 首先通过修改 firstWaiter 指针,将条件队列中的队首结点排除
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
// 然后将刚才移除的节点移动到同步队列中,因为在这个过程中可能该节点对应的线程被打断,所以当发生这种情况时,我们需要唤醒队列中的下一个节点
// 直到成功将一个节点移动到同步队列中或者条件队列为空为止
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}

/**
* Transfers a node from a condition queue onto sync queue.
* Returns true if successful.
* @param node the node
* @return true if successfully transferred (else the node was
* cancelled before signal)
*/
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
* 只要节点的状态不是 CONDITION 那它就已经被取消
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;

/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}

/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

最后,我们简单地说一下 signalAll 的实现,它和 signal 的实现基本相同,只不过它会对条件队列中的每一个节点执行 transferForSignal 函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
* Moves all threads from the wait queue for this condition to
* the wait queue for the owning lock.
*
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
/**
* Removes and transfers all nodes.
* @param first (non-null) the first node on condition queue
*/
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}

参考内容

[1] linux 2.6 互斥锁的实现-源码分析
[2] 深入解析条件变量(condition variables)
[3] Linux下Condition Vairable和Mutext合用的小细节
[4] 从ReentrantLock的实现看AQS的原理及应用
[5] 不可不说的Java“锁”事
[6] 从源码层面解析yield、sleep、wait、park
[7] LockSupport中的park与unpark原理
[8] Thread.sleep、Object.wait、LockSupport.park 区别
[9] 从AQS到futex-二-JVM的Thread和Parker
[10] Java的LockSupport.park()实现分析
[11] JVM源码分析之Object.wait/notify实现
[12] Java线程源码解析之interrupt
[13] Thread.interrupt()相关源码分析
[14] Java CAS 原理剖析
[15] 源码解析 Java 的 compareAndSwapObject 到底比较的是什么
[16] 《Java并发编程的艺术》
[17] 《实战 Java 高并发程序设计》
[18] volatile关键字深入学习
[19] 为什么Netty的FastThreadLocal速度快
[20] 线程池ThreadPoolExecutor实现原理
[21] 深入理解Java线程池:ThreadPoolExecutor
[22] ConcurrentHashMap 详解一
[23] ConcurrentHashMap 详解二
[24] JUC中Atomic class之lazySet的一点疑惑
[25] The JSR-133 Cookbook for Compiler Writers
[26] 就是要你懂Java中volatile关键字实现原理

贝克街的流浪猫 wechat
您的打赏将鼓励我继续分享!
  • 本文作者: 贝克街的流浪猫
  • 本文链接: https://www.beikejiedeliulangmao.top/java/concurrent/aqs-condition/
  • 版权声明: 本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议,转载请注明出处!
  • 创作声明: 本文基于上述所有参考内容进行创作,其中可能涉及复制、修改或者转换,图片均来自网络,如有侵权请联系我,我会第一时间进行删除。