谈谈ConcurrentHashMap的扩容机制

概述

​ HashMap是我们日常开发中最常用的一个类,但是该类在多线程情况下无法宝成集合操作的安全性,JDK提供了HashTable和ConcurrentHashMap作为多线程下并发安全的集合,而ConcurrentHashMap以其高并发性成功使用最广泛的类。我们都知道HashMap通过哈希值将元素映射到特定的桶中,无论多么优秀的hash算法都无法避免而冲突,以上所述的几种hashmap都是通过拉链法解决冲突。而每个桶中链表较长有会影响查询效率,极端情况下所有的元素都在一个桶中,此时查找效率从O(1)退化为O(n),为了尽可能的避免这种现象hashmap通常会在装载因子大于特定值时开始扩容,也就是增加桶的数量。这篇博客就讲讲ConcurrentHashMap的扩容机制。

Jdk8中ConcurrentHashMap扩容机制

​ Jdk8摒弃了Jdk7中ConcurrentHashMap的段锁的实现方式,使用CAS+synchronized保证线程安全性,而扩容操作的线程安全也是通过这种方式保证。在map中元素个数较多的情况下,扩容操作是一个费时的操作,ConcurrentHashMap为了充分利用多线程,会将map中的桶按一定数量分配给各个线程处理,每个线程不会处理到相同的桶。我们先看下示意图,map中有64个槽,每个线程每次处理16个槽,有3个线程参与扩容并且线程1最先完成了分配的第一段槽,具体分配示意图如下:

​ 下面我们看下扩容的核心代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
/**
* @param tab 扩容前槽数组
* @param nextTab 扩容后槽数组
*/
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
//1.计算步长(每个线程每次分配的超节点数目),如果机器核数大于1并且原槽节点数目除以8*NCPU大于16,那么步长就是原槽节点数目除以8*NCPU,否者默认是16
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
//2.如果是第一个线程进入扩容操作,要首先初始化扩容后槽数组,size会变为原来的2倍
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
//该变量很关键记录槽节点已经分配的位置,volatile类型,保证修改可见性
transferIndex = n;
}
int nextn = nextTab.length;
//标记节点已经完成迁移,查询需要在新的槽数组中查找
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
//标记是否进行下一轮槽节点分配
boolean advance = true;
//标记扩容是否完成
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
//处理下一个槽节点
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
//CAS分配槽节点,开始槽为nextIndex - stride(如果剩余槽数量小于stride,开始为0)
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
//进入扩容时,扩容已经结束,进行相应处理
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);//更新阈值
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
//如果第i处的槽位null,则设置为fwd,个人认为是处理那些原本为null的槽
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
//如果节点已经是fwd,那么继续循环,分配槽节点
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
//进行节点重分配,真正的核心,对头节点加锁,防止向槽的链表(或红黑树)中插入数据
synchronized (f) {
if (tabAt(tab, i) == f) {
//扩容过程中会根据节点key的hash值的logn + 1 (logn表示以2为底n的对数)位是0还是1分为两个链表,是0的节点新老槽数组中位置不变,是1的链表会放在心数据的 n+i个槽上(是不是很巧妙,省去了hash值的计算,这也是map中长度一直是2的指数次带来的便利)
Node<K,V> ln, hn;
//如果是普通的链表节点
if (fh >= 0) {
//首先取头结点的是0还是1
int runBit = fh & n;
Node<K,V> lastRun = f;
//第一次遍历链表,找到到尾节点的高位都是0或者都是1的第一个节点,因为这个节点之后的所有节点会在新槽数组的相同位置,下面会统一处理不用循环遍历
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
//如果上面循环找到的是链表是地位那么赋值为ln
if (runBit == 0) {
ln = lastRun;
hn = null;
}
//否则赋值为hn
else {
hn = lastRun;
ln = null;
}
//重新遍历链表,构建ln和hn
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
//ln放在原来的位置第i个槽
setTabAt(nextTab, i, ln);
//hn放在第i+n个槽
setTabAt(nextTab, i + n, hn);
//将老的槽数组置为fwd
setTabAt(tab, i, fwd);
advance = true;
}
//如果节点是红黑树
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
//同样按高位是0和1分配为两个链表
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
//如果链表数目小于等6,那么将红黑树节点链表转为普通节点链表,否则生成一棵红黑树
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}

​ 以上就是JDK8中ConcurrentHashMap的扩容过程,充分利用多线程提高扩容效率,同时通过为每个线程分配不同范围的槽节点,避免竞争,通过synchronized关键字解决put操作带来的可能造成的并发问题。相比JDK7中将hashMap的桶分为16个互不影响的段,每个段中的扩容只有一个线程会进行扩容操作,完全通过端上的锁保证安全性。JDK8中的扩容机制更加的高效,多线程同时参与。

袁琼琼 wechat
欢迎您扫一扫上面的微信公众号,订阅我的博客!
多谢支持,共同成长!
0%