引言
本文着重介绍 Java 并发容器中 ConcurrentLinkedQueue 的实现方式。所有关于 Java 并发的文章均收录于<Java并发系列文章>。
ConcurrentLinkedQueue
Java提供的线程安全的 Queue 可以分为阻塞队列和非阻塞队列,其中阻塞队列的典型例子是 BlockingQueue,非阻塞队列的典型例子是ConcurrentLinkedQueue,在实际应用中要根据实际需要选用阻塞队列或者非阻塞队列。 阻塞队列可以通过加锁来实现,非阻塞队列可以通过 CAS 操作实现。
ConcurrentLinkedQueue 使用了链表作为其数据结构.内部使用 CAS 来进行链表的维护。ConcurrentLinkedQueue 适合在对性能要求相对较高,同时对队列的读写存在多个线程同时进行的场景,即如果对队列加锁的成本较高则适合使用无锁的ConcurrentLinkedQueue来替代。
接下来我们简单地看一下 ConcurrentLinkedQueue 的实现,在 ConcurrentLinkedQueue 中所有数据通过单向链表存储,同时我们还会保存该链表的头指针和尾指针。
1 | // 链表中的节点 |
在对象实例化时,会创建一个虚节点。看到后面你会发现,如果想通过 CAS 维护一个链表,一般都会使用到虚节点。
1 | public ConcurrentLinkedQueue() { |
介绍完内部数据结构,我们看一下增删节点的实现方式。先来看一下增加数据的逻辑:
- 入队操作是在一个循环中尝试 CAS 操作,首先判断,尾结点p.next 是不是null,是的话就通过 CAS 将 null-> newNode,如果 CAS 成功,说明该节点就已经算是加入到队列中了
- 但是这里并没有直接修改尾结点,因为ConcurrentLinkedQueue 中 tail 并不一定是实际上的尾结点,在并发很大时,如果所有线程都要去竞争修改尾结点的话,对性能会有影响,所以,当实际的尾结点(代码中的变量 p)不等于 tail 时,才会进行更新。
- 在ConcurrentLinkedQueue中会出现 Node1(head)->Node2(tail)->null 以及 Node1(head)->Node2(tail)->Node3->null 这样的情况甚至 Node1(head)->Node2(tail)->Node3->Node4 这样的情况,虽然 tail 指针没有直接指向尾结点会导致将新节点加入链表时,需要从tail 向后查找实际的尾结点,但是这个过程相较于对tail节点的竞争来说,影响较小,最终效率也更高
- 如果发现当前p节点不是实际上的尾结点,会先检查它的next 指针是否指向自己,在出队函数poll中,将一个元素出队后会把它的next指针指向自己,所以这一步实际上是判断当前的 p 节点是否已经出队
- 如果满足上述情况,我们需要重新获取 tail 指针,如果发现在上述过程中 tail 指针发生了变化,这说明期间已经好有个并发插入过程完成了,我们直接从最新的tail对象开始上述流程即可,所以这里就将 p 赋为最新的 tail 指向的对象,
- 如果整个过程中 tail 指针都没变,说明当前的情况类似于 Node1(head,tail)-> Node2->null, 但是在判断
p == q
之前,发生了出队操作,状态变成了 Node1(tail, 已经出队的对象) Node2(head)->null,这个时候我们要将 p 设置为 head 然后从head开始向后遍历
- 最后就是单纯的没有遍历到尾结点的情况了, Node1(head)->Node2(tail,当前 p 变量)->Node3(当前q变量)->null
- 如果发现已经进行过一次向后遍历的过程,即
p != t
,并且 tail 指针发生了变化,我们就直接使用 tail 指针,不再向后遍历了 p = t(最新的tail指针) - 如果不满足上述情况,比如还从来没遍历过,或者虽然遍历过但是 tail 指针没变,我们就继续遍历 p = q(p.next)
- 如果发现已经进行过一次向后遍历的过程,即
1 | public boolean offer(E e) { |
最后,我们介绍一下出队的操作,整个出队过程也是在一个 CAS 循环中进行:
- 首先我们检查头指针的 p(head).item 是不是null,不是的话才说明该节点是一个有效节点,因为初始化是创建的虚节点item才等于null,这里通过 item 是不是 null 来判断是不是虚节点也就是说 ConcurrentLinkedQueue 中不能添加值为 null 的节点
- 找到有效节点后,通过 cas 将item改为null,后续的操作和添加元素时类似,因为 head 指针也是一个竞争点,所以这里并没有直接修改 head 指针,而是发现从 head 至少向后遍历过一次时,才会修改 head 指针,这和 offer 中的方式类似
- 如果当前线程要负责修改 head 指针,会判断 刚删掉的节点 p 的 next 是不是null,是的话就让 p 作为 head(此时p充当新的虚节点),如果不是的话,就让 p.next 作为 next(此时head就是实际上的头结点)
- 如果 p 的item == null 或者cas 失败(别的线程已经把p.item置为 null),我们要检查一下 p.next 是不是null,如果是的话说明 p已经是最后一个节点了,我们需要返回 null,但是在此之前,我们不妨把p设为新的head来减少其他线程的遍历开销
- 检查当前 p 节点的 next 指针是不是指向自己,是的话说明当前检查的这个节点已经被别的线程从队列中移除了,那我们就重新开始执行 poll
- 否则,让 p = q(p.next),也就是说这是从 head 开始向后遍历的过程
1 | public E poll() { |
updateHead 的过程中先会检查是不是真的有必要重置 head 指针,有必要的话在通过 CAS 修改 head 指针,如果CAS 失败了也无妨,毕竟我们不要求 head 一定指向实际的头结点,poll 中的遍历过程能 cover 这种情况。如果 CAS 成功,会将删掉的 head 指针指向自己。
1 | /** |
这里大家可能会有疑问,为什么要 lazySet
next 指针呢?要想理解这个问题,我们需要先理解 putOrderedObject
和 putObjectVolatile
的区别。因为 Node 中的 next 属性是用 volatile 修饰的,而 volatile 有什么特点呢?一个是防止指令重拍,一个是将其他 CPU cache 中的相关数据无效化,迫使这些 CPU 重新从主存中拉取最新数据。这是通过 Fence(内存屏障) 实现的,在 linux x86 架构中一般是 lock; addl $0,0(%%esp).
, 这里的 lock
是一个指令前缀, 它蕴含了 storeload 内存屏障的语义, 后面的 addl $0,0(%%esp)
是一个空指令, 因为 lock
前缀不能独立存在(它不是一条完整的指令), 所以在使用它的时候一般会在后面跟一条什么都不做的指令。
而 putObjectVolatile
函数等效于声明一个 volatile 变量,然后直接对该变量进行修改。也就是说,无论是 putObjectVolatile
还是对 volatile 变量的直接修改,都依赖与 StoreLoad barriers
,这里 StoreLoad barriers
就是说如果指令的顺序是 Store1; StoreLoad; Load2
,就需要确保 Store1 保存的数据在 Load2 访问数据之前,一定要能够对所有线程可见。关于内存屏障的解释,可以参考这篇手册, 其中介绍了各个内存屏障的要求,以及在不同架构上的实现方式。
而 putOrderedObject
函数呢,只需要保证当前 cpu 内指令是有序的,不会出现非法的内存访问即可,这也就是说,putOrderedObject 没有多处理期间的可见性保证,也就不会有多余的开销。在我们 ConcurrentLinkedQueue 的场景中,最终将 next 指针指向自己并不需要这么高的可见性需求,而且 next 是修饰为 volatile 的,所以,我们需要显式地调用 putOrderedObject
才能达到 “去 volatile 特性”的效果,从而提升效率。
关于它们的实现,可以参考如下代码,可以看到 ordered_store 最后插入了一个 Op_MemBarCPUOrder 内存屏障,而 putObjectVolatile
对应了 inline_unsafe_access
中的 is_volatile=true && is_store == true 的逻辑,也就是插入了 Op_MemBarVolatile 内存屏障。
1 | bool LibraryCallKit::inline_unsafe_ordered_store(BasicType type) { |
再来看看 memnode.hpp 中对这两种内存屏障的解释。MemBarVolatileNode 需要保证多 CPU 的可见性,MemBarCPUOrderNode 只需要保证单 CPU 顺序即可,而且 CPU 已经做了所有的排序工作,我们无须多做。
1 | // Ordering between a volatile store and a following volatile load. |
最后在 x86_64.ad 文件中,记录了 MemBarVolatile 内存屏障在 x86 架构下的实现也就是 lock addl...
。
1 | instruct membar_volatile(rFlagsReg cr) %{ |
参考内容
[1] linux 2.6 互斥锁的实现-源码分析
[2] 深入解析条件变量(condition variables)
[3] Linux下Condition Vairable和Mutext合用的小细节
[4] 从ReentrantLock的实现看AQS的原理及应用
[5] 不可不说的Java“锁”事
[6] 从源码层面解析yield、sleep、wait、park
[7] LockSupport中的park与unpark原理
[8] Thread.sleep、Object.wait、LockSupport.park 区别
[9] 从AQS到futex-二-JVM的Thread和Parker
[10] Java的LockSupport.park()实现分析
[11] JVM源码分析之Object.wait/notify实现
[12] Java线程源码解析之interrupt
[13] Thread.interrupt()相关源码分析
[14] Java CAS 原理剖析
[15] 源码解析 Java 的 compareAndSwapObject 到底比较的是什么
[16] 《Java并发编程的艺术》
[17] 《实战 Java 高并发程序设计》
[18] volatile关键字深入学习
[19] 为什么Netty的FastThreadLocal速度快
[20] 线程池ThreadPoolExecutor实现原理
[21] 深入理解Java线程池:ThreadPoolExecutor
[22] ConcurrentHashMap 详解一
[23] ConcurrentHashMap 详解二
[24] JUC中Atomic class之lazySet的一点疑惑
[25] The JSR-133 Cookbook for Compiler Writers
[26] 就是要你懂Java中volatile关键字实现原理