publicfinalvoidawait()throws InterruptedException { if (Thread.interrupted()) thrownew InterruptedException(); // 先会将当前线程添加到 Condition 的**条件队列**中 Node node = addConditionWaiter(); // 添加到**条件队列**后,它才将锁释放,并保存了之前的锁状态,因为在唤醒时,必须要恢复锁状态 int savedState = fullyRelease(node); int interruptMode = 0; // 只要不处于同步队列中,就通过 park 等待 while (!isOnSyncQueue(node)) { LockSupport.park(this); // 如果被打断了,就将当前线程加入到同步队列中,因为从 await 返回时,无论如何我们也要获取到锁,被打断和被 Signal 都一样 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // 走到这说明当前节点处于等待队列中时(无论是被Signal唤醒还是被打断),开始调用 acquireQueued 等待获取锁 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
/** * Adds a new waiter to wait queue. * @return its new wait node */ private Node addConditionWaiter(){ Node t = lastWaiter; // If lastWaiter is cancelled, clean out. 清除无用的节点 if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } // 因为此时持有锁,所以 addConditionWaiter 就是拿到尾指针然后新建一个节点并加入(使用Node的nextWaiter字段保存下一个Node) // 注意新建节点的状态是 CONDITION Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; } /** * Unlinks cancelled waiter nodes from condition queue. * Called only while holding lock. This is called when * cancellation occurred during condition wait, and upon * insertion of a new waiter when lastWaiter is seen to have * been cancelled. This method is needed to avoid garbage * retention in the absence of signals. So even though it may * require a full traversal, it comes into play only when * timeouts or cancellations occur in the absence of * signals. It traverses all nodes rather than stopping at a * particular target to unlink all pointers to garbage nodes * without requiring many re-traversals during cancellation * storms. */ privatevoidunlinkCancelledWaiters(){ Node t = firstWaiter; Node trail = null; while (t != null) { Node next = t.nextWaiter; // 状态不等于 CONDITION 代表了已经被取消或者已经被Signal唤醒,将其从条件队列中清楚 if (t.waitStatus != Node.CONDITION) { t.nextWaiter = null; if (trail == null) firstWaiter = next; else trail.nextWaiter = next; if (next == null) lastWaiter = trail; } else trail = t; t = next; } } /** * Returns true if a node, always one that was initially placed on * a condition queue, is now waiting to reacquire on sync queue. * @param node the node * @return true if is reacquiring */ finalbooleanisOnSyncQueue(Node node){ if (node.waitStatus == Node.CONDITION || node.prev == null) returnfalse; if (node.next != null) // If has successor, it must be on queue returntrue; /* * node.prev can be non-null, but not yet on queue because * the CAS to place it on queue can fail. So we have to * traverse from tail to make sure it actually made it. It * will always be near the tail in calls to this method, and * unless the CAS failed (which is unlikely), it will be * there, so we hardly ever traverse much. */ return findNodeFromTail(node); } /** * Checks for interrupt, returning THROW_IE if interrupted * before signalled, REINTERRUPT if after signalled, or * 0 if not interrupted. */ privateintcheckInterruptWhileWaiting(Node node){ return Thread.interrupted() ? // 检查是否被打断 (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; } /** * Transfers node, if necessary, to sync queue after a cancelled wait. * Returns true if thread was cancelled before being signalled. * * @param node the node * @return true if cancelled before the node was signalled */ finalbooleantransferAfterCancelledWait(Node node){ // 如果是被 Signal 唤醒前被打断了(当前节点状态是 CONDITION),就将当前节点状态改为 0(表示取消等待),并添加到**同步队列**中,然后返回 true,这意味着最终 await 会抛出一个 InterruptedException if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { enq(node); returntrue; } /* * If we lost out to a signal(), then we can't proceed * until it finishes its enq(). Cancelling during an * incomplete transfer is both rare and transient, so just * spin. */ // 如果是被 Signal 唤醒之后被打断了(当前节点状态不是 CONDITION,说明已经进入了或即将进入同步队列),这里就通过自旋等待进入同步队列即可 // 最后返回了 false,意味着 await 只会设置线程的中断标志位,不会抛出 InterruptedException while (!isOnSyncQueue(node)) Thread.yield(); returnfalse; }
/** * Inserts node into queue, initializing if necessary. See picture above. * @param node the node to insert * @return node's predecessor */ private Node enq(final Node node){ for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } /** * Throws InterruptedException, reinterrupts current thread, or * does nothing, depending on mode. */ privatevoidreportInterruptAfterWait(int interruptMode) throws InterruptedException { if (interruptMode == THROW_IE) thrownew InterruptedException(); elseif (interruptMode == REINTERRUPT) selfInterrupt(); }
signal
接下来,我们看一下通知signal的实现逻辑。
我们知道调用 signal 时,也是要求持有锁 lock 的,所以在 signal 中首先会判断是不是当前线程持有锁
将节点添加到同步队列中还不够,我们需要修改前序节点的状态,使其状态为 SIGNAL 这样才能放心,所以在 transferForSignal 的最后检查了前序节点的状态如果发现该节点已经被取消,或者修改 SIGNAL 失败,就主动唤醒该节点,因为唤醒之后该节点的线程会进入到 acquireQueued 函数中,那里有更完备的删除取消节点,加锁以及等待的处理流程
/** * Moves the longest-waiting thread, if one exists, from the * wait queue for this condition to the wait queue for the * owning lock. * * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ publicfinalvoidsignal(){ // 我们知道调用 signal 时,也是要求持有锁 lock 的,所以在 signal 中首先会判断是不是当前线程持有锁 if (!isHeldExclusively()) thrownew IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); }
/** * Removes and transfers nodes until hit non-cancelled one or * null. Split out from signal in part to encourage compilers * to inline the case of no waiters. * @param first (non-null) the first node on condition queue */ privatevoiddoSignal(Node first){ do { // 首先通过修改 firstWaiter 指针,将条件队列中的队首结点排除 if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; // 然后将刚才移除的节点移动到同步队列中,因为在这个过程中可能该节点对应的线程被打断,所以当发生这种情况时,我们需要唤醒队列中的下一个节点 // 直到成功将一个节点移动到同步队列中或者条件队列为空为止 } while (!transferForSignal(first) && (first = firstWaiter) != null); }
/** * Transfers a node from a condition queue onto sync queue. * Returns true if successful. * @param node the node * @return true if successfully transferred (else the node was * cancelled before signal) */ finalbooleantransferForSignal(Node node){ /* * If cannot change waitStatus, the node has been cancelled. * 只要节点的状态不是 CONDITION 那它就已经被取消 */ if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) returnfalse;
/* * Splice onto queue and try to set waitStatus of predecessor to * indicate that thread is (probably) waiting. If cancelled or * attempt to set waitStatus fails, wake up to resync (in which * case the waitStatus can be transiently and harmlessly wrong). */ Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); returntrue; }
/** * Inserts node into queue, initializing if necessary. See picture above. * @param node the node to insert * @return node's predecessor */ private Node enq(final Node node){ for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
最后,我们简单地说一下 signalAll 的实现,它和 signal 的实现基本相同,只不过它会对条件队列中的每一个节点执行 transferForSignal 函数。
/** * Moves all threads from the wait queue for this condition to * the wait queue for the owning lock. * * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ publicfinalvoidsignalAll(){ if (!isHeldExclusively()) thrownew IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignalAll(first); } /** * Removes and transfers all nodes. * @param first (non-null) the first node on condition queue */ privatevoiddoSignalAll(Node first){ lastWaiter = firstWaiter = null; do { Node next = first.nextWaiter; first.nextWaiter = null; transferForSignal(first); first = next; } while (first != null); }