Java ConcurrentHashMap 实现

引言

从本篇文章开始,我们将介绍 Java 并发容器的实现方式,本文会着重介绍其中的 ConcurrentHashMap 部分。所有关于 Java 并发的文章均收录于<Java并发系列文章>

ConcurrentHashMap

用过 HashMap 的同学应该都知道,它不是线程安全的,在极端情况下可能会发生死循环,要想在多线程环境下使用 HashMap 一个简单的思路是加一个锁,在调用 HashMap 的函数前首先要获得该锁。但是这样做有一个问题就是效率会比较差。

ConcurrentHashMap 的数据组织和 HashMap 基本相同。通过一个数组来实现 Hash 桶,当没发生 Hash 冲突时,每个 Hash 桶内都保存一个 Key-Value Entry(Node 对象)。对桶内数据的修改都是通过 CAS 操作进行的,因为数组中的元素没法声明为 volatile, 所以从哈希表中读取数据时,使用到了 UNSAFE 的 getObjectVolatile 函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* The bin count threshold for using a tree rather than list for a
* bin. Bins are converted to trees when adding an element to a
* bin with at least this many nodes. The value must be greater
* than 2, and should be at least 8 to mesh with assumptions in
* tree removal about conversion back to plain bins upon
* shrinkage.
*/
static final int TREEIFY_THRESHOLD = 8;

/**
* The array of bins. Lazily initialized upon first insertion.
* Size is always a power of two. Accessed directly by iterators.
*/
transient volatile Node<K,V>[] table;

SuppressWarnings("unchecked")
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}

static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}

ConcurrentHashMap 的 Entry 只是用到了对象 hash 码的正数部分,因为它把一些负数的 Hash 码用来描述状态了。比如用 -1 表达当前节点正在迁移,-2 表示当前节点时一个红黑树的根。-3 表示当前节点是一个保留节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/*
* Encodings for Node hash fields. See above for explanation.
*/
static final int MOVED = -1; // hash for forwarding nodes
static final int TREEBIN = -2; // hash for roots of trees
static final int RESERVED = -3; // hash for transient reservations
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash

/**
* Spreads (XORs) higher bits of hash to lower and also forces top
* bit to 0. Because the table uses power-of-two masking, sets of
* hashes that vary only in bits above the current mask will
* always collide. (Among known examples are sets of Float keys
* holding consecutive whole numbers in small tables.) So we
* apply a transform that spreads the impact of higher bits
* downward. There is a tradeoff between speed, utility, and
* quality of bit-spreading. Because many common sets of hashes
* are already reasonably distributed (so don't benefit from
* spreading), and because we use trees to handle large sets of
* collisions in bins, we just XOR some shifted bits in the
* cheapest possible way to reduce systematic lossage, as well as
* to incorporate impact of the highest bits that would otherwise
* never be used in index calculations because of table bounds.
*/
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}

当发生 Hash 冲突时,先通过链表来保存 Hash 相同的所有 Key-Value Entry(Node 对象)。从下面 Node 的实现中,我们可以看到它实际上就是一个链表的实现(包含next指针)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
//...
/**
* Virtualized support for map.get(); overridden in subclasses.
*/
Node<K,V> find(int h, Object k) {
Node<K,V> e = this;
if (k != null) {
do {
K ek;
if (e.hash == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
} while ((e = e.next) != null);
}
return null;
}
}

当链表的数量大于 TREEIFY_THRESHOLD(8)时,会用红黑树的 Node 代替链表来保存 Key-Value Entry。红黑树是一个自平衡的二叉树,能以 LogN 的时间复杂度修改和查找数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
/**
* Nodes for use in TreeBins
*/
static final class TreeNode<K,V> extends Node<K,V> {
TreeNode<K,V> parent; // red-black tree links
TreeNode<K,V> left;
TreeNode<K,V> right;
TreeNode<K,V> prev; // needed to unlink next upon deletion
boolean red;

Node<K,V> find(int h, Object k) {
return findTreeNode(h, k, null);
}

/**
* Returns the TreeNode (or null if not found) for the given key
* starting at given root.
*/
final TreeNode<K,V> findTreeNode(int h, Object k, Class<?> kc) {
if (k != null) {
TreeNode<K,V> p = this;
do {
int ph, dir; K pk; TreeNode<K,V> q;
TreeNode<K,V> pl = p.left, pr = p.right;
if ((ph = p.hash) > h)
p = pl;
else if (ph < h)
p = pr;
else if ((pk = p.key) == k || (pk != null && k.equals(pk)))
return p;
else if (pl == null)
p = pr;
else if (pr == null)
p = pl;
else if ((kc != null ||
(kc = comparableClassFor(k)) != null) &&
(dir = compareComparables(kc, k, pk)) != 0)
p = (dir < 0) ? pl : pr;
else if ((q = pr.findTreeNode(h, k, kc)) != null)
return q;
else
p = pl;
} while (p != null);
}
return null;
}
}

介绍完主要的内部数据结构,我们来看一看 hash 表的初始化部分。这里面用到了一个 sizeCtl,它初始保存的是 HashMap 初始大小,在完成hash表的初始化之后,它保存的是下次进行扩容时的表内数据的数量。在进行初始化时,sizeCtl 还充当了锁的角色,我们需要通过它来控制进行初始化工作的线程数量,只让一个线程进行初始化,其他线程等待。初始化完成后,sizeCtl 保存了下次进行扩容时,需要的数据数量,计算规则是 0.75 * 当前容量 。而当进行扩容时,sizeCtl 又起到了记录并发扩容线程数的作用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/**
* Table initialization and resizing control. When negative, the
* table is being initialized or resized: -1 for initialization,
* else -(1 + the number of active resizing threads). Otherwise,
* when table is null, holds the initial table size to use upon
* creation, or 0 for default. After initialization, holds the
* next element count value upon which to resize the table.
*/
private transient volatile int sizeCtl;
/**
* Initializes table, using the size recorded in sizeCtl.
*/
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0) // 有其他线程在初始化,直接 yield
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { // 加锁
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
// 初始化
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
// 初始化完成后,sizeCtl 保存了下次进行扩容时,需要的数据数量,计算规则是 0.75 * 当前容量
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}

接下来我们介绍一下添加数据时的处理逻辑。

  1. 必要时先进行初始化
  2. 如果当前 key 所在槽位为空,通过 CAS 创建初始 Node,其中直接保存了 key value,如果成功则直接返回
  3. 否则,检查是否正在进行扩容,多线程一起扩容
  4. 走到这,说明当前hash表槽位已经被占,这时候我们需要对该槽位保存的 Node 加锁,该 Node 可能是链表的头也可能是红黑树的”树根”
  5. 加锁成功后,要确保锁没加错对象,因为在此之前可能别的线程已经把这个槽位的Node由链表改成了红黑树
  6. 接下来根据 Node 节点的hash进行分情况处理,hash码大于0说明当前是链表
    1. 检查对应的 key 是不是已经在链表中,则直接修改
    2. 检查到尾结点仍然没找到对应的key,则在尾部添加节点
  7. 否则如果 Node 节点是红黑树的树根节点类型,则在红黑树中添加或修改节点,这里面需要对数进行平衡,这里就不展开介绍了,在红黑树算法那篇文章中有红黑树的介绍
  8. 添加完数据之后,如果该槽位的 Node 是链表,则检查链表长度,如果链表长度大于等于 8 则适时地将其转换为红黑树
  9. 如果对应的 key 是第一次 put 进map中,则修改当前数据数量,并适时地扩容
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 如果没有初始化,进行初始化
if (tab == null || (n = tab.length) == 0)
tab = initTable();
// 如果当前 key 所在槽位为空,通过 CAS 创建初始 Node,其中直接保存了 key value
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
// 扩容中,帮助一起扩容,多线程扩容
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
// 对 Node 加锁
synchronized (f) {
if (tabAt(tab, i) == f) { // 确保加锁后,锁没加错对象,因为在此之前可能别的线程已经把这个槽位的Node由链表改成了红黑树
if (fh >= 0) { // hash码大于0说明当前是链表
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
// 检查对应的 key 是不是已经在链表中
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
// 检查到尾结点仍然没找到对应的key,则在尾部添加节点
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
// 如果 Node 节点是红黑树的树根节点类型,则在红黑树中添加节点
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
// 如果该槽位的 Node 是链表,则检查链表长度,如果链表长度大于等于 8 则适时地将其转换为红黑树
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
// 如果发现覆盖了之前的值,则不进行后续扩容,直接返回结果
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}

先来看一下链表转红黑树的逻辑:

  1. 这里先会检查哈希桶的数量,如果桶的容量过小(小于64),会进行提前扩容,而不会转为红黑树
  2. 否则,对该桶节点重新加锁(因为执行此函数时,putVal 已经释放了锁),然后构建 TreeNode 节点,这里只是构建 TreeNode 链表结构,实际的红黑树构建过程在 TreeBin 的构造函数中,红黑树的构建这里就不展开了,有兴趣的同学可以看一下红黑树算法那篇文章
  3. 最后,将构建好的红黑树 TreeBin 设置到对应槽位
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
* Replaces all linked nodes in bin at given index unless table is
* too small, in which case resizes instead.
*/
private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; int n, sc;
if (tab != null) {
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
// 如果桶的容量过小(小于64),会进行提前扩容,而不会转为红黑树
tryPresize(n << 1);
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
synchronized (b) {
if (tabAt(tab, index) == b) {
// 对该桶节点重新加锁(因为执行此函数时,putVal 已经释放了锁)
TreeNode<K,V> hd = null, tl = null;
for (Node<K,V> e = b; e != null; e = e.next) {
TreeNode<K,V> p =
new TreeNode<K,V>(e.hash, e.key, e.val,
null, null);
// 这里只是构建 TreeNode 链表结构
if ((p.prev = tl) == null)
hd = p;
else
tl.next = p;
tl = p;
}
// TreeBin 的构造函数中构建红黑树,红黑树的构建这里就不展开了,最后,将构建好的红黑树 TreeBin 设置到对应槽位
setTabAt(tab, index, new TreeBin<K,V>(hd));
}
}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* Adds to count, and if table is too small and not already
* resizing, initiates transfer. If already resizing, helps
* perform transfer if work is available. Rechecks occupancy
* after a transfer to see if another resize is already needed
* because resizings are lagging additions.
*
* @param x the count to add
* @param check if <0, don't check resize, if <= 1 only check if uncontended
*/
private final void addCount(long x, int check) {
//...
}

介绍完红黑树的转换过程后,我们再来介绍一下扩容逻辑,这里我们以 addCount 函数为例,它会在每次 putVal 添加了新元素之后调用,其中 x 是增加的元素数量,check 表示是否要进行扩容检查,规则是 check < 0 不进行检查(常用与移除元素时),check <=1 在没有竞争的时候检查:

  • putVal 因槽位为 null 而新添加元素时(check=0)
  • putVal 时已经存在元素,而且该元素是链表结构,如果目标key是链表的头结点(check=1), 或者链表只有一个元素(check=1),而当头结点不是目标key或者链表长度大于 1 时(check>1)
  • putVal 时, 如果对应槽位保存的是红黑树节点,则 check= 2
  • remove 函数移除元素时,check=-1

在ConcurrentHashMap 中,为了拉满性能,对数据size的维护也进行了优化,它的优化策略很像 linux 中多cpu联合计数器的思路。ConcurrentHashMap 有一个基计数器 baseCount,所有线程在增加size时,先通过 CAS 对 baseCount 进行修改,如果修改失败,它会为当前线程开辟一个服务于当前线程的计数器(以类似于哈希表的形式存储),不过这个计数器也会发生冲突,当发生冲突时,一般采用扩容和重新hash的方式处理,通过种种操作,降低互斥时长。光说的话有点抽象,我们看一下相关代码吧。

  1. 如果线程独享的计数器 hash 表 counterCells 不为空或者通过 CAS 修改 baseCount 失败的话,说明 baseCount 上出现了竞争,对 size 的计算需要通过线程独享的计数器来实现
  2. 紧接着,如果 counterCells 为空,或者counterCells大小为0, 或者当前线程还没有分配 counterCells 槽位,或者从属于当前线程的 counterCell 计数器也发生冲突时,会通过 fullAddCount 进行 counterCells hash 表的创建,或为当前线程分配 counterCells 槽位,或counterCells 哈希表扩容,或者rehash等操作来规避竞争
  3. 如果存在baseCount竞争,并且check <= 1 则不进行扩容检查
  4. 通过baseCount加所有counterCells的值统计合计size
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
// 统计容量 size, 执行加一操作
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
// 如果线程独享的计数器 hash 表 counterCells 不为空或者通过 CAS 修改 baseCount 失败的话,说明 baseCount 上出现了竞争,对 size 的计算需要通过线程独享的计数器来实现
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
// counterCells 为空,或者counterCells大小为0, 或者当前线程还没有分配 counterCells 槽位,或者从属于当前线程的 counterCell 计数器也发生冲突时
// 会通过 fullAddCount 进行 counterCells hash 表的创建,或为当前线程分配 counterCells 槽位,或counterCells 哈希表扩容,或者rehash等操作来规避竞争
fullAddCount(x, uncontended);
return;
}
// 如果存在baseCount竞争,并且check <= 1 则不进行扩容检查
if (check <= 1)
return;
// 通过baseCount加所有counterCells的值统计合计size
s = sumCount();
}
// ...
}

其中 sumCount 比较简单,就是把 baseCount 和所有counterCells的值加起来。

1
2
3
4
5
6
7
8
9
10
11
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}

fullAddCount 的实现很复杂,我们这里制作简单的介绍,不往深挖。

  1. 在每个线程中,会分配一个探针值,这个探针值通过 localInit 进行初始化,我觉得这里大家就把他简单地理解为线程中保存的随机数,它保存在 java.lang.Thread#threadLocalRandomProbe 字段,通过 Contended 注解来解决伪共享问题。

    标注了 Contented 注解的字段 JVM 会自动为其增加内存填充,使对象的大小大于缓存行的大小,防止伪共享问题,伪共享问题我们前面介绍 FastThreadLocal 时提过,这里就不再赘述。

  2. 如果 counterCells hash 表等于空(代码在fullAddCount的后半段),就初始化 counterCells hash 表,初始大小是2,创建好之后,对当前线程对应的槽位进行赋值。所有对 counterCells 的修改都是通过一个 CELLSBUSY 自旋锁进行保护的
  3. 如果创建 counterCells hash 表的过程也发生了冲突就重新通过 baseCount 进行 size 的更新, 代码在fullAddCount的最后几行
  4. 如果 counterCells hash 表不为空,通过前面得到的线程探针值与 counterCells hash 表的容量-1 相与,得到所属的槽位
    1. 如果所属槽位为空,先加 CELLSBUSY 自旋锁,然后创建 CounterCell 对象并存在对应槽位,如果这一步操作成功了的话,就返回
    2. 如果上述操作没有成功,说明出现了很严重的冲突,这里先试着对当前线程对应的槽位 CounterCell 进行更新,如果成功就返回
    3. 如果上述操作都失败,就对 counterCells hash 表进行扩容,扩为原来的2倍,然后重新执行上述操作
    4. 如果上述操作全失败,而且扩容的时候还发生冲突,就重置当前线程的探针值,相当于再换一个随机数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
// See LongAdder version for explanation
private final void fullAddCount(long x, boolean wasUncontended) {
int h;
if ((h = ThreadLocalRandom.getProbe()) == 0) { // 0 表示未分配探针值
ThreadLocalRandom.localInit(); // force initialization
// 对探针进行初始化
h = ThreadLocalRandom.getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
CounterCell[] as; CounterCell a; int n; long v;
if ((as = counterCells) != null && (n = as.length) > 0) {
// 如果 counterCells hash 表不为空,前面得到的线程探针值与 counterCells hash 表的容量-1 相与,得到所属的槽位,
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // Try to attach new Cell
CounterCell r = new CounterCell(x); // Optimistic create
if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean created = false;
try { // Recheck under lock
CounterCell[] rs; int m, j;
if ((rs = counterCells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
// 如果上述操作没有成功,说明出现了很严重的冲突,
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
break;
else if (counterCells != as || n >= NCPU)
collide = false; // At max size or stale
else if (!collide)
collide = true;
else if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
// 如果上述操作都失败,就对 counterCells hash 表进行扩容,扩为原来的2倍,然后重新执行上述操作
try {
if (counterCells == as) {// Expand table unless stale
CounterCell[] rs = new CounterCell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
counterCells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
// 如果上述操作全失败,而且扩容的时候还发生冲突,就重置当前线程的探针值,相当于再换一个随机数
h = ThreadLocalRandom.advanceProbe(h);
}
else if (cellsBusy == 0 && counterCells == as &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
// 如果 counterCells hash 表等于空,就初始化 counterCells hash 表,所有对 counterCells 的修改都是通过一个 CELLSBUSY 自旋锁进行保护的
boolean init = false;
try { // Initialize table
if (counterCells == as) {
// hash 表的初始大小为 2,将当前线程对应的槽位进行设置
CounterCell[] rs = new CounterCell[2];
rs[h & 1] = new CounterCell(x);
counterCells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
// 创建 counterCells hash 表的过程也发生了冲突就重新通过 baseCount 进行 size 的更新
else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
break; // Fall back on using base
}
}

看完 ConcurrentHashMap 的 size 计算方案,我就在想要是 JDK 能提供一个类似于 per-CPU 属性的功能,并提供原子的修改操作(只需要在修改前后禁用中断即可)以及其他线程访问的能力,基本上就能实现上述这么一大堆代码所达到的效果了,而且效率应该还能更高。

讲了这么一大串 addCount 的计数功能总算讲完了,接下来我们讲扩容。

  1. 通过前面计算出来的当前 size 和 sizeCtl(保存了扩容阈值) 对比,如果 size>sizeCtl 并且小于 MAXIMUM_CAPACITY,则开始扩容逻辑
  2. 首先,我们需要计算一个标志位,来描述扩容的 epoch
    • 熟悉 hashmap 的同学应该知道 hashmap 的最大容量永远是 2 的幂,无论是初始化大小还是扩容之后的大小,每次扩容都是 size * 2,而扩容的阈值是 size 的四分之三,我们观察一下它们在二进制数上的特征,假设当前size=2^16, 0b1 0000 0000 0000 0000 。有没有发现只要最大容量不发生变化,那么该数中前面0的个数就不会发生变化。
    • 就是因为如此,resizeStamp 计算方式就是统计最大容量tab.length中前面 0 的数量然后 | (1 << 15) ,这里为什么要或上(1 << 15)呢?因为我们需要用负数SIZECTL来表示正在扩容的过程,现在计算的标志位之后会左移 16 位,后 16 位用来保存参与扩容的线程数。所以这里的 1 << 15 最后会变成 1<< 31, 变成符号位,这样整个 SIZECTL 就是负数了,我们也就可以通过 SIZECTL 是正数还是负数来表示当前是扩容中,还是正常使用中。
  3. 计算好标志位后,我们需要检查一下当前的 sizeCtl 是不是小于0
    • 如果是说明正在进行扩容,我们加入进去,但是加入之前,我们得看看当前的标志位和自己的相不相同(在不在同一个 epoch),怎么判断呢?
      • sizeCtl 右移 16 位看看和之前计算的标志位一样不,如果不一样说明容量已经变了,所以直接跳出
      • 如果标志位一样,检查一下是不是sizeCtl刚好比计算的标志位多1,这里我一直感觉它写的有问题,我猜测这里的本意是判断 sizeCtl 的后16位中描述线程数的部分是不是1,因为描述线程数的部分保存的是(threadNumber+1),所以如果这个地方为1,则说明当前参与扩容的线程数为 0,那么我们这时候应该跳出扩容过程,如果是按照这个思路来看的话,这一段代码应该改为 (sc == ( rs<<<RESIZE_STAMP_SHIFT ) +1 才对,我后来从网上搜了一下,发现这确实是一个 JDK 的 BUG,这个BUG在JDK 11之后才修复,虽然 Bug 描述中提供的解决方案也是错的,但是下面的留言中有一个人说对了。
      • 如果标志位一样,但是达到最大并发resize线程数时,也需要跳出,也就是代码中的 sc == rs + MAX_RESIZERS ,这段代码也有 bug,和上面的bug都在 JDK 的 BUG中提到了,它应该改为 sc == ( rs<<<RESIZE_STAMP_SHIFT ) + MAX_RESIZERS
      • 如果上面的都不满足,检查 nextTable 是不是空,nextTable 是用来保存扩容之后的新 map 的,如果它为空,说明扩容已经结束,可以直接跳出
      • 最后检查一下 transferIndex 是不是小于等于 0,transferIndex 表述的是当前扩容过程处理到的的 index 是多少,因为是逆序处理的,所以小于等于 0 意味着处理结束,直接跳出
      • 如果上述检查都没通过,说明确实应该让当前线程加入到扩容任务中,所以最后通过 CAS 修改 SIZECTL,将最后面的线程数+1,如果成功则正式开始扩容任务,否则说明 SIZECTL 发生了变化,重新执行上述步骤
    • 如果不是,说明我们是第一个发现要扩容的线程,这时候将标志位左移 16 位,然后 + 2,为什么 +2 呢,因为在进行扩容时,后 16 保存的是扩容中的 thread size +1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
// 统计容量 size, 执行加一操作
//...
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
// 如果 size>sizeCtl 并且小于 MAXIMUM_CAPACITY,则开始扩容逻辑
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
// 计算了一个标志位
int rs = resizeStamp(n);
// 其他线程正在扩容
if (sc < 0) {
// 扩容完成, 本线程就不需要继续扩容了
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
// CAS 把 sizeCtl 成功加一, 本线程开始协助扩容
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}

// 本线程是第一个扩容的
// 此时就把 sizeCtl 设置成一个非常大的负数
// 因为是第一个扩容, 所以新数组是 null
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}

/**
* 返回一个标识, 这个标识经过 RESIZE_STAMP_SHIFT 左移必定为负数
* Integer.numberOfLeadingZeros 返回 n 对应 32 位二进制数左侧 0 的个数
* 如 9(0000 0000 0000 0000 0000 0000 0000 1001)返回 28
* 1 << (RESIZE_STAMP_BITS - 1) = 2^15,其中 RESIZE_STAMP_BITS = 16
* RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS = 16
*/
static final int resizeStamp(int n) {
return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}

同样,在 helpTransfer 中(putVal 时发现对应节点正在移动时,会执行它),也有和上述逻辑相同的代码,而且同样存在 BUG。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* 协助扩容方法
*/
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
// 移动中的槽位会保存一个 ForwardingNode 对象,描述了该节点正准备往哪里移动,((ForwardingNode<K,V>)f).nextTable)
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
int rs = resizeStamp(tab.length);
// 循环判断是否扩容完成
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
// 再次判断是否扩容完成,这里的bug和前面一样
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
// sizeCtl 加一,也就是resize线程数+1, 然后协助扩容
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}

实际的扩容过程就是新建了一个哈希表,然后将当前哈希表的内容移到新hash表中。

  1. 首先会计算单个线程每个批次处理的节点数 stide
  2. 然后如果 nextTab 为null则新建一个size*2 的hash表,设置到 nextTable 属性上,并将 transferIndex 改为之前 hash 表的槽位数量,这里不用担心两个线程重复覆盖 nextTable,因为只有成功将 sizeCtl 的后16位设置为 2 的线程才会出现 nextTab 为null的情况
  3. 紧接着就是实际的复制过程,首先要根据计算的单轮处理节点数,从 transferIndex 上”预约任务”,实际上就是通过 CAS 修改 transferIndex 如果改成功了就说明这一段数据归自己负责
  4. 然后对于归自己负责的每一个槽位进行处理,如果槽位为空,就通过 CAS 改为ForwardingNode节点
  5. 如果发现一个待处理节点,则先对节点加锁,然后根据该节点是链表还是红黑树,进行对应的 rehash 过程,处理完之后,将之前的节点槽位改为ForwardingNode节点
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
// 单个线程每个批次处理的节点数
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range

// nextTab 作为临时数组先扩容一倍
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;

// 这是一个特殊的节点, hash 值设置为 -1, 也就是常量 MOVED
// 扩容过程中遇到索引位置为空就设置成该节点
// 或者索引位置不为空, 但是已经处理复制后也把索引位置设置为该节点
// 目的是为了告诉其他线程不需要再处理该索引位置
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);

// 表示索引 i 节点是否被复制成功
boolean advance = true;
// 表示所有节点复制完成
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;

// 这个循环目的很简单
// 首先我们要知道扩容是一批一批的复制到新数组的
// 比如把索引范围 [10, 16) 的节点复制到新数组
// 而这里是逆序扩容, 比如原来数组范围是 [0, 16), 首先是对 [10, 16) 进行复制
// 还有变量 stride 就是区间大小, 比如这里就是 6
// 所以这个循环目的就是为了找出允许线程扩容的索引范围 [bound, i]
// 这里只有更新共享变量 transferIndex 才用到 CAS 算法, 其他操作就不需要了
while (advance) {
int nextIndex, nextBound;
// 满足 [bound, i] 这个区间或者已经完成扩容, 跳出这个循环
if (--i >= bound || finishing)
advance = false;
// nextIndex 是边界 i 的临时保存, 如果小于 0, 说明没有要复制的节点了
// transferIndex 是共享变量, 保存区间范围的上限, 初始值是旧数组长度
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}

// 尝试更新 transferIndex
// 如果成功, 当前线程就负责复制 [nextBound, nextIndex) 范围的节点
// transferIndex 变成 nextBound
// 注意这里 i=nextIndex-1, 所以 [nextBound, nextIndex) 也是 [bound, i]
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}

// 下面开始复制 [bound, i] 范围的节点, 逆序复制, 从 i 开始

// 对于扩容完成处理
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
nextTable = null;
table = nextTab;
// sizeCtl 设置为总大小的 0.75
sizeCtl = (n << 1) - (n >>> 1);
return;
}
// 扩容完成, sizeCtl 减一
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
// 扩容前 sizeCtl 会设置成 resizeStamp(n) << RESIZE_STAMP_SHIFT + 2
// 如果不相等说明有其他线程执行扩容完成的操作了, 本线程不需要重复操作了
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}

// 对于 i 的节点为空, 那么设置指向特殊节点 ForwardingNode
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
// 当前线程判断到这个节点的 hash 值是 MOVED
// 说明是特殊节点, 已经有其他线程操作了, 可以跳过这个节点
else if ((fh = f.hash) == MOVED)
advance = true; // already processed

// 如果 i 既不是空值, 也不是特殊节点, 说明这是个普通节点
// 那么就开始对这个链表或者树进行复制, 首先是把它锁上, 防止其他线程同时操作它
else {
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
// hash 值大于0说明是链表节点
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
// 这里的实现和 HashMap 相同,因为每次扩容都相当于比之前的hash值多考虑了一位,所以这里可以进行简化处理, 如果 hash & n == 0 说明保持原来的位置,
// 否则挪到当前 index+n 的位置
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
// 复制完成后用特殊节点代替原来节点
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof TreeBin) {
// 红黑树节点
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
// 这里创建 TreeBin 来构造红黑树, 这里的逻辑也和链表相同,如果 hash & n == 0 说明保持原来的位置,否则挪到当前 index+n 的位置
// 如果发现树的size小于 UNTREEIFY_THRESHOLD=6,则转成链表
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
// 复制完成后用特殊节点代替原来节点
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}

这里我们简单地说一下 hashMap 如何进行 rehash,我们假设一个节点的 hash 是 0b0011 , 当我们的最大容量n是 2^2 = 4 时,我们计算可得该节点对应的槽位是 0b0011 & 0b0011(n-1) , 也就是 3 号槽位,而当发生扩容后,最大容量newN变成了 2^3 = 8,新计算的hash值是 0b0011 & 0b0111(newN-1) ,仍然是3,也就是说rehash之后保持原位。看到这你可能会发现 hashMap rehash 过程的秘密,我们没必要每次计算完整的 hash 值,只需要多计算一位就够了,如果这一位是 0(右数第三位), 0b0011 & 0b0111 index 仍然是 index 3,而如果这一位是 1(右数第三位), 0b0111 & 0b0111 对应的 index 就变成了 7,而 7 正好就是 n + previousIndex 的值。

最后我们简单看一下 remove 的过程,它和 putVal 差不多。

  1. 找到对应的槽位,如果已经是 null,就返回,如果是 MOVED 就帮着进行 resize
  2. 找到对应槽位的节点后,先加锁,然后根据节点类型做出不同处理
    • 如果节点是链表,则将其从链表中移除
    • 如果节点是红黑树, 则将其从树中移除,移除后如果发现树的节点过少(通过检查root,root.left,root.right, root.left.left 是不是等于 null)如果是,说明树节点小于 6,那么就把它转成链表
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
/**
* Implementation for the four public remove/replace methods:
* Replaces node value with v, conditional upon match of cv if
* non-null. If resulting value is null, delete.
*/
final V replaceNode(Object key, V value, Object cv) {
// 找到对应的槽位
int hash = spread(key.hashCode());
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0 ||
(f = tabAt(tab, i = (n - 1) & hash)) == null)
// 如果已经是 null,就返回
break;
else if ((fh = f.hash) == MOVED)
// 如果是 MOVED 就帮着进行 resize
tab = helpTransfer(tab, f);
else {
V oldVal = null;
boolean validated = false;
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
// 如果节点是链表,则将其从链表中移除
validated = true;
for (Node<K,V> e = f, pred = null;;) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
V ev = e.val;
if (cv == null || cv == ev ||
(ev != null && cv.equals(ev))) {
oldVal = ev;
if (value != null)
e.val = value;
else if (pred != null)
pred.next = e.next;
else
setTabAt(tab, i, e.next);
}
break;
}
pred = e;
if ((e = e.next) == null)
break;
}
}
else if (f instanceof TreeBin) {
validated = true;
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> r, p;
if ((r = t.root) != null &&
(p = r.findTreeNode(hash, key, null)) != null) {
V pv = p.val;
if (cv == null || cv == pv ||
(pv != null && cv.equals(pv))) {
oldVal = pv;
if (value != null)
p.val = value;
else if (t.removeTreeNode(p))
setTabAt(tab, i, untreeify(t.first));
}
}
}
}
}
if (validated) {
if (oldVal != null) {
if (value == null)
addCount(-1L, -1);
return oldVal;
}
break;
}
}
}
return null;
}

参考内容

[1] linux 2.6 互斥锁的实现-源码分析
[2] 深入解析条件变量(condition variables)
[3] Linux下Condition Vairable和Mutext合用的小细节
[4] 从ReentrantLock的实现看AQS的原理及应用
[5] 不可不说的Java“锁”事
[6] 从源码层面解析yield、sleep、wait、park
[7] LockSupport中的park与unpark原理
[8] Thread.sleep、Object.wait、LockSupport.park 区别
[9] 从AQS到futex-二-JVM的Thread和Parker
[10] Java的LockSupport.park()实现分析
[11] JVM源码分析之Object.wait/notify实现
[12] Java线程源码解析之interrupt
[13] Thread.interrupt()相关源码分析
[14] Java CAS 原理剖析
[15] 源码解析 Java 的 compareAndSwapObject 到底比较的是什么
[16] 《Java并发编程的艺术》
[17] 《实战 Java 高并发程序设计》
[18] volatile关键字深入学习
[19] 为什么Netty的FastThreadLocal速度快
[20] 线程池ThreadPoolExecutor实现原理
[21] 深入理解Java线程池:ThreadPoolExecutor
[22] ConcurrentHashMap 详解一
[23] ConcurrentHashMap 详解二
[24] JUC中Atomic class之lazySet的一点疑惑
[25] The JSR-133 Cookbook for Compiler Writers
[26] 就是要你懂Java中volatile关键字实现原理

贝克街的流浪猫 wechat
您的打赏将鼓励我继续分享!
  • 本文作者: 贝克街的流浪猫
  • 本文链接: https://www.beikejiedeliulangmao.top/java/concurrent/concurrent-hashmap/
  • 版权声明: 本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议,转载请注明出处!
  • 创作声明: 本文基于上述所有参考内容进行创作,其中可能涉及复制、修改或者转换,图片均来自网络,如有侵权请联系我,我会第一时间进行删除。