Java LinkedBlockingQueue 实现

引言

本文着重介绍 Java 并发容器中 LinkedBlockingQueue 的实现方式。所有关于 Java 并发的文章均收录于<Java并发系列文章>

LinkedBlockingQueue

LinkedBlockingQueue 底层基于单向链表实现的阻塞队列,可以当做无界队列也可以当做有界队列来使用,满足FIFO的特性,为了防止 LinkedBlockingQueue 容量迅速增,损耗大量内存。通常在创建LinkedBlockingQueue 对象时,会指定其大小,如果未指定,容量等于Integer. MAX_VALUE。那么什么是阻塞队列呢?我们知道队列有入队出队两个操作,所谓阻塞队列,就是说如果队列已满时,可以阻塞入队操作,而如果队列为空时,可以阻塞出队操作。

为了实现阻塞效果并保证线程安全,它的内部用到了两个锁和两个Condition。

1
2
3
4
5
6
7
8
9
10
11
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();

/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();

在进行数据出队时,先要获得 takeLock,然后检查当前队列容量是否为 0,如果队列容量为 0,则在 notEmpty 上等待,否则直接执行出队操作。最后判断一下,是不是执行出队操作之前,队列已经达到最大容量,如果是的话,就唤醒等待中的入队操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
// 获取锁
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
// 如果队列为空,就等待
while (count.get() == 0) {
notEmpty.await();
}
// 否则,执行出队操作,并修改 size
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
// 如果队列不为空,则唤醒下一个等待中的出队操作
notEmpty.signal();
} finally {
takeLock.unlock();
}
// 如果之前队列满了,则唤醒等待中的入队操作
if (c == capacity)
signalNotFull();
return x;
}

/**
* Signals a waiting put. Called only from take/poll.
*/
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}

入队操作和出队操作正好相反,同样先获取锁,不过这里用的是 putLock,检查当前队列是否已满,是的话就在 notFull 上等待,否则执行入队操作并修改size,如果之前的队列长度为 0,那么就有可能有一些出队操作被阻塞了,所以我们这里需要唤醒所有在 notEmpty 上等待的线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
/**
* Inserts the specified element at the tail of this queue, waiting if
* necessary for space to become available.
*
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
// 先获取锁
putLock.lockInterruptibly();
try {
/*
* Note that count is used in wait guard even though it is
* not protected by lock. This works because count can
* only decrease at this point (all other puts are shut
* out by lock), and we (or some other waiting put) are
* signalled if it ever changes from capacity. Similarly
* for all other uses of count in other wait guards.
*/
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}

/**
* Signals a waiting take. Called only from put/offer (which do not
* otherwise ordinarily lock takeLock.)
*/
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}

LinkedBlockingQueue 除了阻塞版的入队出队操作外,当然也有不阻塞的接口,不过这些接口比较简单,基本上就是在上述基础取消 await 和 signal 逻辑,这里就不再赘述了。

参考内容

[1] linux 2.6 互斥锁的实现-源码分析
[2] 深入解析条件变量(condition variables)
[3] Linux下Condition Vairable和Mutext合用的小细节
[4] 从ReentrantLock的实现看AQS的原理及应用
[5] 不可不说的Java“锁”事
[6] 从源码层面解析yield、sleep、wait、park
[7] LockSupport中的park与unpark原理
[8] Thread.sleep、Object.wait、LockSupport.park 区别
[9] 从AQS到futex-二-JVM的Thread和Parker
[10] Java的LockSupport.park()实现分析
[11] JVM源码分析之Object.wait/notify实现
[12] Java线程源码解析之interrupt
[13] Thread.interrupt()相关源码分析
[14] Java CAS 原理剖析
[15] 源码解析 Java 的 compareAndSwapObject 到底比较的是什么
[16] 《Java并发编程的艺术》
[17] 《实战 Java 高并发程序设计》
[18] volatile关键字深入学习
[19] 为什么Netty的FastThreadLocal速度快
[20] 线程池ThreadPoolExecutor实现原理
[21] 深入理解Java线程池:ThreadPoolExecutor
[22] ConcurrentHashMap 详解一
[23] ConcurrentHashMap 详解二
[24] JUC中Atomic class之lazySet的一点疑惑
[25] The JSR-133 Cookbook for Compiler Writers
[26] 就是要你懂Java中volatile关键字实现原理

贝克街的流浪猫 wechat
您的打赏将鼓励我继续分享!
  • 本文作者: 贝克街的流浪猫
  • 本文链接: https://www.beikejiedeliulangmao.top/java/concurrent/linked-blocking-queue/
  • 版权声明: 本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!
  • 创作声明: 本文基于上述所有参考内容进行创作,其中可能涉及复制、修改或者转换,图片均来自网络,如有侵权请联系我,我会第一时间进行删除。