本篇文章主要对put方法做一个全面的理解,里面牵涉到很多的内容,也有很多理解不到位的地方,put方法我读源码前几遍时,对大致的脉络理解了,但是对里面比较细节的处理逻辑和判断通过一遍一遍的读和理解,才有点明白,然后把它写出来和大家分享,如有错误欢迎指正,使我们共同进步,此篇文章较长,请耐心看完,但是此篇文章对扩容和线程安全的内容一笔带过,这两个重要的内容,我有专门的文章去分析。
本篇文章的主要内容如下:
1:ConcurrentHashMap的put方法(重点讲解) 2:ConcurrentHashMap的计算hash值的方法spread() 3:Concurrent hash Map的初始化方法:initTable() 4:ConcurrentHashMap的帮助扩容方法helpTransfer() 5:ConcurrentHashMap的标识符方法resizeStamp() 6:解密很多人对:( sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transfer Index <= 0的疑惑
一、Concurrent HashMap 的put方法
相对于HashMap的put方法,ConcurrentHashMap的put方法稍微复杂一点,但是如果细心的去研读,还是可以理解的。接下来我们就一起走进put方法的源码,看一下它是怎样设计的。
public V put(K key, V value) { //直接调用putValue方法 return putVal(key, value, false); } //这个方法的参数 1:key:键值对的key 2:value:键值对的value 3:onlyIfAbsent:如果为true,将不能覆盖已存在key的值。 4:evict:在HashMap中存在,但是没有实际意义,在ConcurrentHashMap中就不存在这个参数了。 final V putVal(K key, V value, boolean onlyIfAbsent) { //这一步可以看到:ConcurrentHashMap不允许null值null键,否则将抛出空指针异常。 if (key == null || value == null) throw new NullPointerException(); //$1:这一步:获取key的hash值 int hash = spread(key.hashCode()); //binCount:表示 int binCount = 0; //$2:这一步:一个无限循环, for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0) //$3:这一步:判断底层数组未初始化,开始调用初始化,稍后分析 tab = initTable(); //$4:这一步:说明已经初始化了,计算出key所在的下标是否有值 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //$5:这一步:利用CAS原子性的添加到数组中。 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) //插入成功,则跳出循环,插入失败:继续循环。 break ; // no lock when adding to empty bin } //这一步:MOVED=-1,是在ForwardingNode中出现的,说明正在扩容 else if ((fh = f.hash) == MOVED) //$6:这一步:帮助扩容 tab = helpTransfer(tab, f); else { V oldVal = null; //$7:这一步:说明在此下标已经有值了,此时要么在 链表 末尾加入,要么在 红黑树 上加入,所以需要加锁同步这个下标的值,锁对象就是头结点。所以可以看出:ConcurrentHashMap在插入数据时通过CAS+synchronized来保证线程安全的,这个知识点会有专门一篇文章去介绍ConcurrentHashMap的线程安全机制。 synchronized (f) { if (tabAt(tab, i) == f) { if (fh >= 0) { //这一步:说明此时是链表,查询链表中是否存在key.那为什么fh>=0就说明是链表呢?还记得我们上面分析TreeBin时,构造函数的hash是TREEBIN=-2.所以只要大于等于0说明不是红黑树。 binCount = 1; for (Node<K,V> e = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } else if (f instanceof TreeBin) { //说明是红黑树,在红黑树上操作 Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) //$8:这一步:说明链表的长度到了红黑树的阀门,但是是否转变成红黑树,需要看此时数组的长度是否大于了64,如果小于64:则进行扩容,如果大于64则链表才能转变成红黑树,这一个小知识点需要大家掌握。 treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } //$9:这一步:判断是否需要扩容,这个在扩容时详细讲解 addCount(1L, binCount); return null; }
对于put方法的内容总结如下:
1:首先判断是否初始化,如果没有初始化则进入$3进行初始化工作
2:如果已经初始化了,进入无限循环,$4原子判断key对应的数组下标是否有值了
3:如果key对应的下标没有值,则通过$5:CAS原理插入,插入成功则退出循环,插入失败则继续循环,所以这种无锁的机制都是利用无限循环+CAS操作。
4:如果key对应的下标已经存在值,判断此时hash==MOVED,则进入$6帮助扩容。
5:如果key对应的下标已经存在值,但是hash!=MOVED,则需要$7对数组的这个下标进行加锁了,以保证 线程 的安全。
6:如果数组的这个下标是一个链表,则对操作链表(判断链表用hash>=0)
7:如果数组的这个下标是一个红黑树,则操作红黑树。
8:插入成功后,如果链表的长度已经达到了红黑树的阀门8,则首先判断此时数组的长度是否大于64,如果小于64则进行扩容,如果大于等于64则链表变成红黑树
9:判断容器是否扩容
以上就是ConcurrentHashMap的put操作步骤,接下来我们对一些重要的内容进行讲解。
二、第一个重要的知识点$1:spread()方法
//h:指的是key.hashCode() static final int spread(int h) { //高16位和低16位进行异或后,在与HASH_BITS进行逻辑与操作。 HASH_BITS=0x7fffffff:用二进制表示:1111111111111111111111111111111 return (h ^ (h >>> 16)) & HASH_BITS; } 对于HashMap:只是高16位和低16位进行异或 static final int hash(Object key) { int h; return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16); }
那么为什么ConcurrentHashMap比HashMap多了一个逻辑与操作呢,我上面提到过,hash==MOVED=-1时是正在扩容呢,hash==TREEBIN=-2是红黑树,我猜想ConcurrentHashMap加上逻辑与操作,是为了保证key计算的hash值大于等于0.大家想象一下,如果不进行最后的逻辑与操作,如果高16位和低16位通过异或操作后恰好等于-1了,岂不是和正在扩容的判断冲突了。
三、第二个重要知识点$3:初始化:initTable()
private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0) { if ((sc = sizeCtl) < 0) //这一步:说明已经有一个线程正在初始化,当前线程不需要在进行初始化了,所以可以让出CPU。 Thread.yield(); // lost initialization race; just spin else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { //这一步:说明当前线程通过CAS操作把sizeCtrl变成了-1.然后进行初始化工作 try { if ((tab = table) == null || tab.length == 0) { int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = tab = nt; //n-(n>>>2)和0.75n相同 sc = n - (n >>> 2); } } finally { sizeCtl = sc; } break; } } return tab; }
对于扩容总结如下:
1:通过CAS操作:多个线程竞争把sizeCtrl从0改成-1,因为是原子操作,只能有一个线程竞争成功。 2:竞争成功的线程开始对线程初始化 3:竞争失败的线程,调用Tread.yield()让出当前CPU。
所以初始化工作并没有看到加锁,通过CAS操作保证只有一个线程进行初始化工作。
四、第三个重要知识点$6:t帮助扩容helpTransfer()
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) { Node<K,V>[] nextTab; int sc; //这一步:判断是否正在扩容,ForwardingNode节点只有在扩容是出现 if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) { //获取当前数组长度的标识符。 int rs = resizeStamp(tab.length); while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0) break; if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { //原子操作sizeCtrl,成功则加1,然后进行协助扩容,失败继续,扩容会有专门的文章 transfer(tab, nextTab); break; } } return nextTab; } return table; }
大部分同学刚开始读这个帮助方法helpTransfer时,好多的内容总是搞不清楚是干什么的,接下来我会一一解答,但是扩容机制我在这就详细说了,后面会有一篇文章去详细介绍。
五、第一个比较疑惑的地方:获取当前数组长度的标识符:resizeStamp.
static final int resizeStamp(int n) { return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1)); } rs=resizeStamp(n); 1: Integer.numberOfLeadingZeros:表示一个整数二进制补码表示形式最高位1前的0的个数 2:1 << (RESIZE_STAMP_BITS - 1)二进制表示:1000 0000 0000 0000 两者逻辑或后,会让 Integer.numberOfLeadingZeros(n)的二进制最高位是1,所rs<<16位后,rs一定是一个负数。
如图所示:
当数组的长度不同时,则计算的rs也是不同的,那这个方法计算的值rs到底有什么用处呢?说是数组长度的标识符,但是还是一头雾水。我通过搜索ConcurrentHashMap的代码,在addCount()中发现了一段代码如下:
else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) //RESIZE_STAMP_SHIFT:等于16
上面的代码在第一个线程扩容前会执行,它把rs左移16位后+2,并存入到了sizeCtrl,上面说了rs<<16后一定是一个负数,sizeCtrl此时一定是一个负数:
1:sizeCtrl的高16位就是rs 2:sizeCtrl的低16位就是扩容的线程数+1,所以上面的+2:表示有一个线程在扩容 sizeCtrl表示如下: -1:表示正在初始化 -(N+1)表示正在有N个线程进行扩容,+2:-(1+1):一个线程正在扩容。 >0:表示下一次扩容的阀门
上面的文字用一个简单的图示如下:
通过上面的描述,大家是否很容易读懂sc代表什么意思了,rs代表什么意思了吧
六、第二个比较疑惑的地方这些判断:好多同学完全没有头绪
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0) 条件1:sc>>>RESIZE_STAMP_SHIFT:sc>>>16:就是rs了,如果两者不相等,说明底层数组的长度改变了,说明扩容结束了,所以直接跳出循环,不在需要进行协助扩容了 条件2:sc==rs+1:表示扩容线程已经达到最大值,在ConcurrentHashMap永远不相等 条件3:sc==rs+MAX_RESIZERS:表示扩容线程已经达到最大值,在ConcurrentHashMap永远不相等 条件4: transferIndex <= 0:表示所有的扩容任务都被领完了,不需要在协助扩容了,这个会在扩容时详解。
上面的条件1和条件4好理解,但是条件2和条件3就非常的看不懂了,sc一定是负数,但是rs上面计算的不是正数吗?两者一定不会相等啊。这几个代码我看了几十遍才从注释上有一点理解,为什么大神Doug Lea会这样判断呢?
我们在仔细看一下resizeStamp方法,里面有一个RESIZE_STAMP_BITS,我们看一下怎样定义的:
/** * The number of bits used for generation stamp in sizeCtl. * Must be at least 6 for 32bit arrays. * */private static int RESIZE_STAMP_BITS = 16; 1:表示生成sizeCtrl的标识符 2:最小是6位。那说明这个不是一个常量啊,可以变化啊 -------------------------------------------------------------------------------------------------------- /** * The maximum number of threads that can help resize. * Must fit in 32 - RESIZE_STAMP_BITS bits. */private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1; 1:表示扩容的最大线程 2:因为RESIZE_STAMP_BITS可以变,所以它也不是一个常量 ------------------------------------------------------------------------------------------------------------- /** * The bit shift for recording size stamp in sizeCtl. */private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS; 1:表示移动的位数 2:这个也不是一个常量
如果RESIZE_STAMP_BITS=32的话,那么MAX_RESIZERS=0,RESIZE_STAMP_SHIFT=0,那么计算出的rs也应该是一个负数 ,如图所示:
sc == rs+1和sc==rs+MAX_RESIZERS是不是很好理解了,但是在ConcurrentHashMap中RESIZE_STAMP_BITS=16,并没有地方可以去修改它,所以这两个条件在ConcurrentHashMap中是没有意义的,永远不会为true,这是我的理解,不知道对不对,由于技术有限,不知道Doug Lea为什么会这样处理,可能是为了以后的扩展吧。
上面解释后是不是知道这些判断语句是干甚么的了,其实就是判断需要不需要协助扩容,如果不需要直接跳出循环,如果需要协助扩容,则调用transfer()进行协助扩容,在协助前,需要把扩容的线程数+1.
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { transfer(tab, nextTab); break; }