您的位置 首页 java

Java集合系列-ConcurrentHashMap-put方法的全面解析

本篇文章主要对put方法做一个全面的理解,里面牵涉到很多的内容,也有很多理解不到位的地方,put方法我读源码前几遍时,对大致的脉络理解了,但是对里面比较细节的处理逻辑和判断通过一遍一遍的读和理解,才有点明白,然后把它写出来和大家分享,如有错误欢迎指正,使我们共同进步,此篇文章较长,请耐心看完,但是此篇文章对扩容和线程安全的内容一笔带过,这两个重要的内容,我有专门的文章去分析。

本篇文章的主要内容如下:

1:ConcurrentHashMap的put方法(重点讲解)
2:ConcurrentHashMap的计算hash值的方法spread()
3:Concurrent hash Map的初始化方法:initTable()
4:ConcurrentHashMap的帮助扩容方法helpTransfer()
5:ConcurrentHashMap的标识符方法resizeStamp()
6:解密很多人对:( sc  >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
 sc == rs + MAX_RESIZERS ||  transfer Index <= 0的疑惑
 

一、Concurrent HashMap 的put方法

相对于HashMap的put方法,ConcurrentHashMap的put方法稍微复杂一点,但是如果细心的去研读,还是可以理解的。接下来我们就一起走进put方法的源码,看一下它是怎样设计的。

public V put(K key, V value) {
 //直接调用putValue方法
 return putVal(key, value, false);
}
//这个方法的参数
1:key:键值对的key
2:value:键值对的value
3:onlyIfAbsent:如果为true,将不能覆盖已存在key的值。
4:evict:在HashMap中存在,但是没有实际意义,在ConcurrentHashMap中就不存在这个参数了。
final V putVal(K key, V value, boolean onlyIfAbsent) {
//这一步可以看到:ConcurrentHashMap不允许null值null键,否则将抛出空指针异常。
 if (key == null || value == null) throw new NullPointerException();
//$1:这一步:获取key的hash值
 int hash = spread(key.hashCode());
//binCount:表示
 int binCount = 0;
//$2:这一步:一个无限循环,
 for (Node<K,V>[] tab = table;;) {
 Node<K,V> f; int n, i, fh;
 if (tab == null || (n = tab.length) == 0)
//$3:这一步:判断底层数组未初始化,开始调用初始化,稍后分析
 tab = initTable();
//$4:这一步:说明已经初始化了,计算出key所在的下标是否有值
 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
//$5:这一步:利用CAS原子性的添加到数组中。
 if (casTabAt(tab, i, null,
 new Node<K,V>(hash, key, value, null)))
//插入成功,则跳出循环,插入失败:继续循环。
  break ; // no lock when adding to empty bin
 }
//这一步:MOVED=-1,是在ForwardingNode中出现的,说明正在扩容
 else if ((fh = f.hash) == MOVED)
//$6:这一步:帮助扩容
 tab = helpTransfer(tab, f);
 else {
 V oldVal = null;
//$7:这一步:说明在此下标已经有值了,此时要么在 链表 末尾加入,要么在 红黑树 上加入,所以需要加锁同步这个下标的值,锁对象就是头结点。所以可以看出:ConcurrentHashMap在插入数据时通过CAS+synchronized来保证线程安全的,这个知识点会有专门一篇文章去介绍ConcurrentHashMap的线程安全机制。
 synchronized (f) {
 if (tabAt(tab, i) == f) {
 if (fh >= 0) {
//这一步:说明此时是链表,查询链表中是否存在key.那为什么fh>=0就说明是链表呢?还记得我们上面分析TreeBin时,构造函数的hash是TREEBIN=-2.所以只要大于等于0说明不是红黑树。
 binCount = 1;
 for (Node<K,V> e = f;; ++binCount) {
 K ek;
 if (e.hash == hash &&
 ((ek = e.key) == key ||
 (ek != null && key.equals(ek)))) {
 oldVal = e.val;
 if (!onlyIfAbsent)
 e.val = value;
 break;
 }
 Node<K,V> pred = e;
 if ((e = e.next) == null) {
 pred.next = new Node<K,V>(hash, key,
 value, null);
 break;
 }
 }
 }
 else if (f instanceof TreeBin) {
//说明是红黑树,在红黑树上操作
 Node<K,V> p;
 binCount = 2;
 if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
 value)) != null) {
 oldVal = p.val;
 if (!onlyIfAbsent)
 p.val = value;
 }
 }
 }
 }
 if (binCount != 0) {
 if (binCount >= TREEIFY_THRESHOLD)
//$8:这一步:说明链表的长度到了红黑树的阀门,但是是否转变成红黑树,需要看此时数组的长度是否大于了64,如果小于64:则进行扩容,如果大于64则链表才能转变成红黑树,这一个小知识点需要大家掌握。
 treeifyBin(tab, i);
 if (oldVal != null)
 return oldVal;
 break;
 }
 }
 }
//$9:这一步:判断是否需要扩容,这个在扩容时详细讲解
 addCount(1L, binCount);
 return null;
}
 

对于put方法的内容总结如下:

1:首先判断是否初始化,如果没有初始化则进入$3进行初始化工作
2:如果已经初始化了,进入无限循环,$4原子判断key对应的数组下标是否有值了
3:如果key对应的下标没有值,则通过$5:CAS原理插入,插入成功则退出循环,插入失败则继续循环,所以这种无锁的机制都是利用无限循环+CAS操作。
4:如果key对应的下标已经存在值,判断此时hash==MOVED,则进入$6帮助扩容。
5:如果key对应的下标已经存在值,但是hash!=MOVED,则需要$7对数组的这个下标进行加锁了,以保证 线程 的安全。
6:如果数组的这个下标是一个链表,则对操作链表(判断链表用hash>=0)
7:如果数组的这个下标是一个红黑树,则操作红黑树。
8:插入成功后,如果链表的长度已经达到了红黑树的阀门8,则首先判断此时数组的长度是否大于64,如果小于64则进行扩容,如果大于等于64则链表变成红黑树
9:判断容器是否扩容
 

以上就是ConcurrentHashMap的put操作步骤,接下来我们对一些重要的内容进行讲解。

二、第一个重要的知识点$1:spread()方法

//h:指的是key.hashCode()
static final int spread(int h) {
//高16位和低16位进行异或后,在与HASH_BITS进行逻辑与操作。
HASH_BITS=0x7fffffff:用二进制表示:1111111111111111111111111111111
 return (h ^ (h >>> 16)) & HASH_BITS;
}
对于HashMap:只是高16位和低16位进行异或
static final int hash(Object key) {
 int h;
 return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
}
 

那么为什么ConcurrentHashMap比HashMap多了一个逻辑与操作呢,我上面提到过,hash==MOVED=-1时是正在扩容呢,hash==TREEBIN=-2是红黑树,我猜想ConcurrentHashMap加上逻辑与操作,是为了保证key计算的hash值大于等于0.大家想象一下,如果不进行最后的逻辑与操作,如果高16位和低16位通过异或操作后恰好等于-1了,岂不是和正在扩容的判断冲突了。

三、第二个重要知识点$3:初始化:initTable()

private final Node<K,V>[] initTable() {
 Node<K,V>[] tab; int sc;
 while ((tab = table) == null || tab.length == 0) {
 if ((sc = sizeCtl) < 0)
//这一步:说明已经有一个线程正在初始化,当前线程不需要在进行初始化了,所以可以让出CPU。
 Thread.yield(); // lost initialization race; just spin
 else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
//这一步:说明当前线程通过CAS操作把sizeCtrl变成了-1.然后进行初始化工作
 try {
 if ((tab = table) == null || tab.length == 0) {
 int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
 @SuppressWarnings("unchecked")
 Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
 table = tab = nt;
//n-(n>>>2)和0.75n相同
 sc = n - (n >>> 2);
 }
 } finally {
 sizeCtl = sc;
 }
 break;
 }
 }
 return tab;
}
 

对于扩容总结如下:

1:通过CAS操作:多个线程竞争把sizeCtrl从0改成-1,因为是原子操作,只能有一个线程竞争成功。
2:竞争成功的线程开始对线程初始化
3:竞争失败的线程,调用Tread.yield()让出当前CPU。
 

所以初始化工作并没有看到加锁,通过CAS操作保证只有一个线程进行初始化工作。

四、第三个重要知识点$6:t帮助扩容helpTransfer()

final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
 Node<K,V>[] nextTab; int sc;
 //这一步:判断是否正在扩容,ForwardingNode节点只有在扩容是出现
 if (tab != null && (f instanceof ForwardingNode) &&
 (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
 //获取当前数组长度的标识符。
 int rs = resizeStamp(tab.length);
 while (nextTab == nextTable && table == tab &&
 (sc = sizeCtl) < 0) {
 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
 sc == rs + MAX_RESIZERS || transferIndex <= 0)
 break;
 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
 //原子操作sizeCtrl,成功则加1,然后进行协助扩容,失败继续,扩容会有专门的文章
 transfer(tab, nextTab);
 break;
 }
 }
 return nextTab;
 }
 return table;
}
 

大部分同学刚开始读这个帮助方法helpTransfer时,好多的内容总是搞不清楚是干什么的,接下来我会一一解答,但是扩容机制我在这就详细说了,后面会有一篇文章去详细介绍。

五、第一个比较疑惑的地方:获取当前数组长度的标识符:resizeStamp.

static final int resizeStamp(int n) {
 return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}
rs=resizeStamp(n);
1: Integer.numberOfLeadingZeros:表示一个整数二进制补码表示形式最高位1前的0的个数
2:1 << (RESIZE_STAMP_BITS - 1)二进制表示:1000 0000 0000 0000
两者逻辑或后,会让 Integer.numberOfLeadingZeros(n)的二进制最高位是1,所rs<<16位后,rs一定是一个负数。
 

如图所示:

当数组的长度不同时,则计算的rs也是不同的,那这个方法计算的值rs到底有什么用处呢?说是数组长度的标识符,但是还是一头雾水。我通过搜索ConcurrentHashMap的代码,在addCount()中发现了一段代码如下:

else if (U.compareAndSwapInt(this, SIZECTL, sc,
 (rs << RESIZE_STAMP_SHIFT) + 2))
//RESIZE_STAMP_SHIFT:等于16
 

上面的代码在第一个线程扩容前会执行,它把rs左移16位后+2,并存入到了sizeCtrl,上面说了rs<<16后一定是一个负数,sizeCtrl此时一定是一个负数:

1:sizeCtrl的高16位就是rs
2:sizeCtrl的低16位就是扩容的线程数+1,所以上面的+2:表示有一个线程在扩容
sizeCtrl表示如下:
-1:表示正在初始化
-(N+1)表示正在有N个线程进行扩容,+2:-(1+1):一个线程正在扩容。
>0:表示下一次扩容的阀门
 

上面的文字用一个简单的图示如下:

通过上面的描述,大家是否很容易读懂sc代表什么意思了,rs代表什么意思了吧

六、第二个比较疑惑的地方这些判断:好多同学完全没有头绪

if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
 sc == rs + MAX_RESIZERS || transferIndex <= 0)
条件1:sc>>>RESIZE_STAMP_SHIFT:sc>>>16:就是rs了,如果两者不相等,说明底层数组的长度改变了,说明扩容结束了,所以直接跳出循环,不在需要进行协助扩容了
条件2:sc==rs+1:表示扩容线程已经达到最大值,在ConcurrentHashMap永远不相等
条件3:sc==rs+MAX_RESIZERS:表示扩容线程已经达到最大值,在ConcurrentHashMap永远不相等
条件4: transferIndex <= 0:表示所有的扩容任务都被领完了,不需要在协助扩容了,这个会在扩容时详解。
 

上面的条件1和条件4好理解,但是条件2和条件3就非常的看不懂了,sc一定是负数,但是rs上面计算的不是正数吗?两者一定不会相等啊。这几个代码我看了几十遍才从注释上有一点理解,为什么大神Doug Lea会这样判断呢?

我们在仔细看一下resizeStamp方法,里面有一个RESIZE_STAMP_BITS,我们看一下怎样定义的:

/**
 * The number of bits used for generation stamp in sizeCtl.
 * Must be at least 6 for 32bit arrays.
 *
 */private static int RESIZE_STAMP_BITS = 16;
1:表示生成sizeCtrl的标识符
2:最小是6位。那说明这个不是一个常量啊,可以变化啊
--------------------------------------------------------------------------------------------------------
/**
 * The maximum number of threads that can help resize.
 * Must fit in 32 - RESIZE_STAMP_BITS bits.
 */private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
1:表示扩容的最大线程
2:因为RESIZE_STAMP_BITS可以变,所以它也不是一个常量
-------------------------------------------------------------------------------------------------------------
/**
 * The bit shift for recording size stamp in sizeCtl.
 */private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
1:表示移动的位数
2:这个也不是一个常量
 

如果RESIZE_STAMP_BITS=32的话,那么MAX_RESIZERS=0,RESIZE_STAMP_SHIFT=0,那么计算出的rs也应该是一个负数 ,如图所示:

sc == rs+1和sc==rs+MAX_RESIZERS是不是很好理解了,但是在ConcurrentHashMap中RESIZE_STAMP_BITS=16,并没有地方可以去修改它,所以这两个条件在ConcurrentHashMap中是没有意义的,永远不会为true,这是我的理解,不知道对不对,由于技术有限,不知道Doug Lea为什么会这样处理,可能是为了以后的扩展吧。

上面解释后是不是知道这些判断语句是干甚么的了,其实就是判断需要不需要协助扩容,如果不需要直接跳出循环,如果需要协助扩容,则调用transfer()进行协助扩容,在协助前,需要把扩容的线程数+1.

if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
 transfer(tab, nextTab);
 break;
}
 

文章来源:智云一二三科技

文章标题:Java集合系列-ConcurrentHashMap-put方法的全面解析

文章地址:https://www.zhihuclub.com/191778.shtml

关于作者: 智云科技

热门文章

网站地图