引言
本文着重介绍 AQS 的排他模式的实现方式。所有关于 Java 并发的文章均收录于<Java并发系列文章>。
排他模式
获取资源
入队操作介绍完之后,我们来看一下什么情况下需要执行入队操作,我们先从排他模式说起。下面的 acquire 是 AQS 提供的一个以排他模式获取资源的函数,我们可以看到它的执行流程是:
- 先尝试获取资源 tryAcquire,tryAcquire 是一个抽象函数,看完前面的锁的分类部分大家应该对它比较熟悉,因为通过 AQS 实现的各类锁实际上就是通过对 tryAcquire 这类抽象函数的覆写来达到各种锁的效果的。
- 如果尝试加锁失败,也就是说当前该资源已经被加锁了,就通过 addWaiter 将当前线程添加到同步队列中,注意参数是 Node.EXCLUSIVE 意味着排它锁。
- 加入到同步队列后,开始执行 acquireQueued 函数,猜一下应该能猜到这里面一定是进行睡眠等待的逻辑
1 | /** |
下面的就是 acquireQueued 的代码。
- 这里面有一个循环,它会不断地获取当前节点的前序节点,如果前序节点是 head 节点(也就是那个虚拟节点,还记得吗,虚拟节点不保存有效数据,只用作指针),这里 head 节点充当一个标志位的效果,如果一个节点的前序节点是 head,那么该节点就排在队列的第一位。
- 如果 p == head ,我们就需要再调用一次 tryAcquire 尝试获取锁
- 如果第二步成功的话,当前线程已经获得到锁了,这时候要将 head 指针进行修改,可以看到 setHead 并没有使用到 CAS 指令,因为能执行到 setHead 函数的线程相当于已经获得到同步资源了,不存在竞争
- 在 setHead 函数中,对当前 Node 的 thread 和 prev 进行了清除,因为这时候该 Node 已经扮演了虚拟节点的角色,有必要把虚拟节点中用不到的属性进行清除
- 如果 CAS 执行失败(这里有很多种可能,比如被中断了,或者是刚入队,又或者是虚假唤醒(后面介绍)),则检查是否需要继续进入睡眠,一般来说如果前序节点的状态成功改为 SIGNAL 之后(但是改成 SIGNAL 之后还会再尝试获取一次锁,失败之后才会睡眠,这是防止死等的重中之重),就可以进入等待了,SIGNAL 表明当前节点肩负着唤醒下一个节点的责任,除此之外,在检查是否需要睡眠时,如果发现前序节点的请求已经被取消,则删除该节点。
- 如果发现自己确实需要睡眠,则会通过 park 函数进入等待状态,因为使用的是 park 所以不需要担心,别的线程先执行唤醒之后,当前线程再进入等待的情况,因为在这种情况下 park 函数会直接返回不会进行等待
- 最后从等待状态中恢复过来之后,检查是否是因为中断而唤醒的,是的话,就记录一下,在返回的时候以返回值的形式告诉调用者。可见 acquire 在遇到中断时,不会抛出 InterruptedException 异常,而是循环重试。如果想要达到被中断时立即抛出异常的效果,可以使用 acquireInterruptibly, 它的实现逻辑和 acquire 基本相同,主要的区别就是 park 被中断时,会抛出 InterruptedException
1 | /** |
作为延伸,我们这里带大家回顾一下 ReentrantLock 中公平锁部分对 tryAcquire 的实现。它对大家理解 AQS 如何在 CAS 修改同步队列的情况下(先修改前序指针->CAS 修改尾结点->修复后续指针),以哪种方式访问队列中的数据能够避开数据不同步的风险。简单地说,通过前序指针(prev)访问队列中的数据肯定是安全的。
1 | /** |
从上述代码中可以看到,在尝试获取公平锁时,先会判断队列中是否存在前序节点。之所以这么做是因为,只有发生互斥等待时,才会出现入队等待的情况,如果全是共享模式使用资源的话,队列会一直是空的,大家别急我们接下来就介绍共享模式的实现。这里我们先着重看一下 hasQueuedPredecessors 的实现,它是怎么判断有前序节点的呢:
- 首先如果 h != t 是说,队列的不为空,因为 head == tail 时,队列中只存在一个虚拟节点不存在实际的等待线程
- 在此基础上,我们还要判断一下当前持有锁的线程是不是自己,如果 head.next == null 说明有其他线程刚执行完入队的setTail工作(因为 h != t),但是前序指针还没修复,这种情况下 head.next == null,说明有别的线程已经持有锁了
- 另外一种可能就是 head.next != Thread.currentThread() 这时候队首持有锁的线程不是当前线程,所以存在前序节点
既然前面提到了 acquireInterruptibly 这里我们就简单地说一下,就像前面所说它和 acquire 基本相同,确实如此,从下面的代码中可以看到,它就是遇到中断时将 InterruptedException 外抛。
1 | /** |
当抛出异常时,就会执行到最后面的 cancelAcquire 函数,该函数中负责将前序节点中状态为 CANCELLED 的节点清除,然后再把自己的状态变为 CANCELLED。紧接着是一些清除工作:
- 如果当前节点是尾结点,则通过 CAS 修改尾结点即可。如果 CAS 失败了也不要紧,因为当前线程状态已经是 CANCELLED 了所以其他线程会把自己清除
- 如果当前节点不是第一个节点,即 pred != head ,我们要保证前序节点的状态是 SIGNAL,并且前序节点的线程不是当前线程,因为自己不是尾结点,所以自己当前的状态很可能就是 SIGNAL,所以这里我们无论如何要确保前序节点的状态能够修改为 SIGNAL,如果做到了,就可以大胆地通过 CAS 将自己从队列中移除
- 否则,说明自己可能是头结点,或者前序节点都取消了,也有可能前序节点的线程就是当前线程,那么就只能由自己来唤醒后续的线程了
1 | /** |
唤醒的过程也很简单:
- 如果当前节点状态不是 CANCELLED 就清除状态
- 然后先看一下 next 指针指向的节点是否需要被唤醒,next == null 代表了可能发生的前序指针和后续指针不同步,s.waitStatus > 0 表示后继节点已被取消,这时候我们就需要找到下一个需要被唤醒的节点。
- 我们需要从尾结点出发,逐个向前找,因为前序指针肯定是安全的
- 如果找到了需要被唤醒的线程,就执行 unpark 唤醒它
1 | /** |
释放资源
介绍完资源的获取,我们再来看看资源的释放流程。
- 首先,调用 tryRelease 函数,它也是一个抽象函数,在上层实现中一般会进行必要的检查,比如检查持有锁的线程是否是当前线程等,就比如如下 ReentrantLock 中对 tryRelease 的实现。
- 释放成功后,判断当前同步队列是否为空,不为空并且当前线程承担唤醒职责时(waitStatus < 0),因为当前线程能够成功执行 tryRelease,所以当前线程的 waitStatus 不会是 CANCELLED,剩下的状态 SIGNAL 是需要承担唤醒职责的,CONDITION 和 PROPAGATE 我们后面介绍。
- 具体的唤醒函数 unparkSuccessor 我们前面刚介绍过,这里就不再赘述了
1 | /** |
AQS 框架使用
看到这里,不知道大家对 AQS 的使用流程有没有体会,我的理解是在使用它时,一般我们需要覆写 tryAcquire 和 tryRelease 这类函数,它们是直接修改互斥资源 state 的函数,AQS 通过将具体的状态修改职责移交到子类中,能让子类实现各种类型的锁,就比如前一章我们介绍的那些。
而子类向外暴露 lock 和 unlock 函数时,又直接使用 AQS 中的 acquire 和 releas 函数,因为这些函数中封装了尝试加锁过程和加锁失败入队等待过程。
1 | public void lock() { acquire(1); } |
总结一下就是 AQS 对变化报开放的态度,你可以通过它完成各种同步策略,同时对与那些样板代码,都已经被它封装在了自己内部,并使用 final 关键字修饰,例如 acquire 和 release。
参考内容
[1] linux 2.6 互斥锁的实现-源码分析
[2] 深入解析条件变量(condition variables)
[3] Linux下Condition Vairable和Mutext合用的小细节
[4] 从ReentrantLock的实现看AQS的原理及应用
[5] 不可不说的Java“锁”事
[6] 从源码层面解析yield、sleep、wait、park
[7] LockSupport中的park与unpark原理
[8] Thread.sleep、Object.wait、LockSupport.park 区别
[9] 从AQS到futex-二-JVM的Thread和Parker
[10] Java的LockSupport.park()实现分析
[11] JVM源码分析之Object.wait/notify实现
[12] Java线程源码解析之interrupt
[13] Thread.interrupt()相关源码分析
[14] Java CAS 原理剖析
[15] 源码解析 Java 的 compareAndSwapObject 到底比较的是什么
[16] 《Java并发编程的艺术》
[17] 《实战 Java 高并发程序设计》
[18] volatile关键字深入学习
[19] 为什么Netty的FastThreadLocal速度快
[20] 线程池ThreadPoolExecutor实现原理
[21] 深入理解Java线程池:ThreadPoolExecutor
[22] ConcurrentHashMap 详解一
[23] ConcurrentHashMap 详解二
[24] JUC中Atomic class之lazySet的一点疑惑
[25] The JSR-133 Cookbook for Compiler Writers
[26] 就是要你懂Java中volatile关键字实现原理