Java PriorityBlockingQueue 实现

引言

本文着重介绍 Java 并发容器中 PriorityBlockingQueue 的实现方式。所有关于 Java 并发的文章均收录于<Java并发系列文章>

PriorityBlockingQueue

PriorityBlockingQueue 是一个带排序功能的阻塞队列,因为它是一个队列,没必要保证整个队列的内部顺序,只需要保证出队时按照排序结果出即可,所以其内部使用了二分堆得形式实现,同时,PriorityBlockingQueue 也是线程安全的,内部通过一个锁来控制堆数据的维护。

PriorityBlockingQueue 的堆数据都保存在如下的 queue 数组中,堆的根节点是 queue[0] , 就像如下注释所说的,我们可以根据一个节点的下标 n 快速计算出它的两个子节点的下标,即 queue[2*n+1]queue[2*(n+1)] 。这是用数组来描述二分堆(树)的常见方法。

1
2
3
4
5
6
7
8
9
/**
* Priority queue represented as a balanced binary heap: the two
* children of queue[n] are queue[2*n+1] and queue[2*(n+1)]. The
* priority queue is ordered by comparator, or by the elements'
* natural ordering, if comparator is null: For each node n in the
* heap and each descendant d of n, n <= d. The element with the
* lowest value is in queue[0], assuming the queue is nonempty.
*/
private transient Object[] queue;

因为维护二分堆时,我们不需要保证整个堆内所有元素有序,只需要保证父子节点之间有序即可,所以当我们要插入一个元素时,直接将其插入到堆尾,然后通过其与父节点的关系,进行适当地父子节点换位,就能保证堆的性质。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
/**
* Inserts the specified element into this priority queue.
* As the queue is unbounded, this method will never return {@code false}.
*
* @param e the element to add
* @return {@code true} (as specified by {@link Queue#offer})
* @throws ClassCastException if the specified element cannot be compared
* with elements currently in the priority queue according to the
* priority queue's ordering
* @throws NullPointerException if the specified element is null
*/
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
int n, cap;
Object[] array;
// 最新节点的下标(n) 等于 size,确保容量没问题
while ((n = size) >= (cap = (array = queue).length))
tryGrow(array, cap);
try {
Comparator<? super E> cmp = comparator;
if (cmp == null)
// 通过比较函数,进行父子节点换位
siftUpComparable(n, e, array);
else
// 通过比较函数,进行父子节点换位
siftUpUsingComparator(n, e, array, cmp);
// 修改 size 并唤醒等待中的出队操作
size = n + 1;
notEmpty.signal();
} finally {
lock.unlock();
}
return true;
}

/**
* Inserts item x at position k, maintaining heap invariant by
* promoting x up the tree until it is greater than or equal to
* its parent, or is the root.
*
* To simplify and speed up coercions and comparisons. the
* Comparable and Comparator versions are separated into different
* methods that are otherwise identical. (Similarly for siftDown.)
* These methods are static, with heap state as arguments, to
* simplify use in light of possible comparator exceptions.
*
* @param k the position to fill
* @param x the item to insert
* @param array the heap array
*/
private static <T> void siftUpComparable(int k, T x, Object[] array) {
Comparable<? super T> key = (Comparable<? super T>) x;
while (k > 0) {
// 计算父节点下标
int parent = (k - 1) >>> 1;
Object e = array[parent];
// 和父节点比较,因为默认是升序(最小堆),所以当新添加的节点大于父节点时就停下来了
if (key.compareTo((T) e) >= 0)
break;
// 新添加的节点小于父节点,那么父子节点换位,然后重复上述过程
array[k] = e;
k = parent;
}
array[k] = key;
}

当从 PriorityBlockingQueue 中执行出队操作时,直接提取下标0元素,然后用 queue 中的最后一个元素,接替 0 号元素的位置,自上而下地修正堆中元素的位置关系,使其满足堆的性质。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
// 加锁并循环出队,如果没有数据就在 notEmpty condition上等待
try {
while ( (result = dequeue()) == null)
notEmpty.await();
} finally {
lock.unlock();
}
return result;
}

/**
* Mechanics for poll(). Call only while holding lock.
*/
private E dequeue() {
int n = size - 1;
if (n < 0)
return null;
else {
Object[] array = queue;
// 出队 0 号元素
E result = (E) array[0];
// 接替者是 queue 的末尾元素
E x = (E) array[n];
array[n] = null;
Comparator<? super E> cmp = comparator;
if (cmp == null)
// 接替者下移
siftDownComparable(0, x, array, n);
else
// 接替者下移
siftDownUsingComparator(0, x, array, n, cmp);
size = n;
return result;
}
}

从下移节点的实现中,我们可以看到,它先会比较两个子节点,选取最小的节点最为下移的目标节点。然后通过比较器比较当前节点 x 和目标节点是否满足堆的性质,如果不满足则交换节点位置,并重复上述过程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
* Inserts item x at position k, maintaining heap invariant by
* demoting x down the tree repeatedly until it is less than or
* equal to its children or is a leaf.
*
* @param k the position to fill
* @param x the item to insert
* @param array the heap array
* @param n heap size
*/
private static <T> void siftDownComparable(int k, T x, Object[] array,
int n) {
if (n > 0) {
Comparable<? super T> key = (Comparable<? super T>)x;
int half = n >>> 1; // loop while a non-leaf
while (k < half) {
int child = (k << 1) + 1; // assume left child is least
Object c = array[child];
int right = child + 1;
if (right < n &&
((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
// 比较左右两个节点,以最小节点作为下移的目标节点
c = array[child = right];
if (key.compareTo((T) c) <= 0)
// 父节点最小,停止
break;
// 子节点小,交换子节点和父节点,并重复上述过程
array[k] = c;
k = child;
}
array[k] = key;
}
}
贝克街的流浪猫 wechat
您的打赏将鼓励我继续分享!
  • 本文作者: 贝克街的流浪猫
  • 本文链接: https://www.beikejiedeliulangmao.top/java/concurrent/priority-blocking-queue/
  • 版权声明: 本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议,转载请注明出处!
  • 创作声明: 本文基于上述所有参考内容进行创作,其中可能涉及复制、修改或者转换,图片均来自网络,如有侵权请联系我,我会第一时间进行删除。