第四章、 Java 的多线程和并发库
3.3 ConcurrentHashMap 非阻塞 Hash 集合
- 在JDK1.7主要通过定义 Segment 分段锁来实现多个 线程 对 ConcurrentHashMap 的数据的并发读写操作。整个ConcurrentHashMap 由多个 Segment 组成,每个 Segment保存整个 ConcurrentHashMap 的一部分数据,Segment 结合 Reentrant lock ,即 Segment 继承于 ReentrantLock ,来实现写互斥,读共享,具体为有多少个 Segment ,则任何时候可以最多支持这么多个线程同时进行写操作,任意多个线程进行读操作。在 ConcurrentHashMap 的Segment实现中,对写操作使用 ReentrantLock 来进行加锁,读操作不加锁,通过volatile 来实现线程之间的可见性。
- 在JDK1.8中为了进一步优化 Concurrent HashMap 的性能,去掉了 Segment 分段锁的设计,在数据结构方面,则是跟 HashMap 一样,使用一个 哈希表 table 数组,即数组 + 链表或者数组 + 红黑树。而线程安全方面是结合 CAS 机制来实现的,CAS 底层依赖JDK的 UNSAFE 所提供的硬件级别的 原子操作 。同时在 HashMap 的基础上,对哈希表table 数组和链表节点的 value,next 指针等使用 volatile 来修饰,从而实现线程可见性。
———————————————
ConcurrentHashMap(JDK1.7版本)
segment :分段锁
1、在HashMap中,是使用一个哈希表,即元素为链表结点 Node 组成的数组 table,而在 ConcurrentHashMap 中是使用多个哈希表,具体为通过定义一个Segment来封装这个哈希表其中 Segment 继承于 ReentrantLock,故自带 lock 的功能。即每个 Segment 其实就是相当于一个 HashMap,只是结合使用了 ReentrantLock 来进行并发控制,实现线程安全。
2、Segment 定义如下:ConcurrentHashMap 的一个静态内部类,继承于 ReentrantLock,在内部定义了一个HashEntry 数组 table,HashEntry 是链表的节点定义,其中 table 使用 volatile 修饰,保证某个线程对table 进行新增链表节点(头结点或者在已经存在的链表新增一个节点)对其他线程可见。
/**
* Segments are specialized versions of hash tables. This
* subclasses from ReentrantLock opportunistically, just to
* simplify some locking and avoid separate construction.
*/static final class Segment<K,V> extends ReentrantLock implements Serializable {
...
/**
* The maximum number of times to tryLock in a prescan before
* possibly blocking on acquire in preparation for a locked
* segment operation. On multiprocessors, using a bounded
* number of retries maintains cache acquired while locating
* nodes.
*/ static final int MAX_SCAN_RETRIES =
Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
/**
* The per-segment table. Elements are accessed via
* entryAt/setEntryAt providing volatile semantics.
*/ transient volatile HashEntry<K,V>[] table;
...
}
3、HashEntry 的定义如下:包含key,value,key的hash,所在链表的下一个节点next。
/**
* ConcurrentHashMap list entry. Note that this is never exported
* out as a user-visible Map.Entry.
*/static final class HashEntry<K,V> {
final int hash;
final K key;
volatile V value;
volatile HashEntry<K,V> next;
...
}
由定义可知,value和next均为使用volatile修饰,当多个线程共享该HashEntry所在的Segment时,其中一个线程对该Segment内部的某个链表节点HashEntry的value或下一个节点next修改能够对其他线程可见。而hash和key均使用final修饰,因为创建一个链表节点HashEntry,是根据key的hash值来确定附加到哈希表数组table的某个链表的,即根据hash计算对应的table数组的下标,故一旦添加后是不可变的。
Segment的哈希表table数组的容量
1、MIN_SEGMENT_TABLE_CAPACITY:table数组的容量最小量,默认为2。
/**
* The default concurrency level for this table, used when not
* otherwise specified in a constructor.
*/static final int DEFAULT_CONCURRENCY_LEVEL = 16;
/**
* The maximum number of segments to allow; used to bound
* constructor arguments. Must be power of two less than 1 << 24.
*/static final int MAX_SEGMENTS = 1 << 16; // slightly conservative
在ConcurrentHashMap的构造函数定义实际大小:使用ConcurrentHashMap的整体容量initialCapacity除以Segments数组的大小,得到每个Segment内部的table数组的实际大小。
public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// Find power-of-two sizes best matching arguments
int sshift = 0;
int ssize = 1;
// ssize:segments数组的大小
// 不能小于concurrencyLevel,默认为16
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
// cap:Segment内部HashEntry数组的大小
// 最小为MIN_SEGMENT_TABLE_CAPACITY,默认为2
// 实际大小根据c来计算,而c是由上面代码,
// 根据initialCapacity / ssize得到,
// 即整体容量大小除以Segment数组的数量,则
// 得到每个Segment内部的table的大小
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;
// create segments and segments[0]
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
}
并发性控制
ConcurrentHashMap通过多个Segment,即内部包含一个Segment数组,来支持多个线程同时分别对这些Segment进行读写操作,从而提高并发性。
并发性:默认通过DEFAULT_CONCURRENCY_LEVEL来定义Segment的数量,默认为16,即创建大小为16的Segment数组,这样在任何时刻最多可以支持16个线程同时对ConcurrentHashMap进行写操作,即每个Segment都可以有一个线程在进行写操作。也可以在ConcurrentHashMap的构造函数中指定concurrencyLevel的值。
MAX_SEGMENTS:定义Segment数组的容量最大值,默认值为2的16次方。
/**
* The default concurrency level for this table, used when not
* otherwise specified in a constructor.
*/static final int DEFAULT_CONCURRENCY_LEVEL = 16;
/**
* The maximum number of segments to allow; used to bound
* constructor arguments. Must be power of two less than 1 << 24.
*/static final int MAX_SEGMENTS = 1 << 16; // slightly conservative
put写操作
首先通过key的hash确定segments数组的下标,即需要往哪个segment存放数据。确定好segment之后,则调用该segment的put方法,写到该segment内部的哈希表table数组的某个链表中,链表的确定也是根据key的hash值和segment内部table大小取模。
在ConcurrentHashMap中的put操作是没有加锁的,而在Segment中的put操作,通过ReentrantLock加锁:
/**
* Maps the specified key to the specified value in this table.
* Neither the key nor the value can be null.
*
* <p> The value can be retrieved by calling the <tt>get</tt> method
* with a key that is equal to the original key.
*
* @param key key with which the specified value is to be associated
* @param value value to be associated with the specified key
* @return the previous value associated with <tt>key</tt>, or
* <tt>null</tt> if there was no mapping for <tt>key</tt>
* @throws NullPointerException if the specified key or value is null
*/@SuppressWarnings("unchecked")
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
int hash = hash(key);
int j = (hash >>> segmentShift) & segmentMask;
// 根据key的hash只,确定具体的Segment
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
// 如果segments数组的该位置还没segment
// 则数组下标j对应的segment实例
s = ensureSegment(j);
// 往该segment实例设值
return s.put(key, hash, value, false );
}
Segment类的put操作定义:首先获取lock锁,然后根据key的hash值,获取在segment内部的HashEntry数组table的下标,从而获取对应的链表,具体为链表头。
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
// tryLock:非阻塞获取lock,如果当前没有其他线程持有该Segment的锁,则返回null,继续往下执行;
// scanAndLockForPut:该segment锁被其他线程持有了,则非阻塞重试3次,超过3次则阻塞等待锁。之后返回对应的链表节点。
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
// 链表头结点
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) {
// 已经存在,则更新value值
if (e != null) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
// 更新value时,也递增modCount,而在HashMap中是结构性修改才递增。
++modCount;
}
break ;
}
e = e.next;
}
else {
// 注意新增节点时,是在头部添加的,即最后添加的节点是链表头结点
// 这个与HashMap是不一样的,HashMap是在链表尾部新增节点。
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
// 当前新增的该节点作为链表头结点放在哈希表table数组中
setEntryAt(tab, index, node);
++modCount;
// 递增当前整体的元素个数
count = c;
oldValue = null;
break;
}
}
} finally {
// 释放lock锁
unlock();
}
// 返回旧值
return oldValue;
}
scanAndLockForPut的实现:获取该segment锁,从而可以进行put写操作,同时获取当前需要写的值对应的HashEntry链表节点,查找获取之前已经存在的或者创建一个新的。
/**
* Scans for a node containing given key while trying to
* acquire lock, creating and returning one if not found. Upon
* return, guarantees that lock is held. UNlike in most
* methods, calls to method equals are not screened: Since
* traversal speed doesn't matter, we might as well help warm
* up the associated code and accesses as well.
*
* @return a new node if key not found, else null
*/private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
HashEntry<K,V> node = null;
int retries = -1; // negative while locating node
// 非阻塞自旋获取lock锁
while (!tryLock()) {
HashEntry<K,V> f; // to recheck first below
if (retries < 0) {
if (e == null) {
if (node == null) // speculatively create node
node = new HashEntry<K,V>(hash, key, value, null);
retries = 0;
}
else if (key.equals(e.key))
retries = 0;
else
e = e.next;
}
// MAX_SCAN_RETRIES为2,尝试3次后,则当前线程阻塞等待lock锁
else if (++retries > MAX_SCAN_RETRIES) {
lock();
break;
}
else if ((retries & 1) == 0 &&
(f = entryForHash(this, hash)) != first) {
e = first = f; // re-traverse if entry changed
retries = -1;
}
}
return node;
}
get读操作
获取指定的key对应的value,get读操作是不用获取lock锁的,即不加锁的,通过使用UNSAFE的volatile版本的方法保证线程可见性。实现如下:
/**
* Returns the value to which the specified key is mapped,
* or {@code null} if this map contains no mapping for the key.
*
* <p>More formally, if this map contains a mapping from a key
* {@code k} to a value {@code v} such that {@code key.equals(k)},
* then this method returns {@code v}; otherwise it returns
* {@code null}. (There can be at most one such mapping.)
*
* @throws NullPointerException if the specified key is null
*/public V get(Object key) {
Segment<K,V> s; // manually integrate access methods to reduce overhead
HashEntry<K,V>[] tab;
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
// 获取segment
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
// 通过hash值计算哈希表table数组的下标,从而获取对应链表的头结点
// 从链表头结点遍历链表
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}
1、通过key的hash值,调用UNSAFE.getObjectVolatile方法,由于是volatile版本,可以实现线程之间的可见性(遵循happend-before原则),故可以从最新的segments数组中获取该key所在的segment;
2、然后根据key的hash值,获取该segment内部的哈希表table数组的下标,从而获取该key所在的链表的头结点,然后从链表头结点开始遍历该链表,最终如果没有找到,则返回null或者找到对应的链表节点,返回该链表节点的value。
size容量计算
size方法主要是计算当前hashmap中存放的元素的总个数,即累加各个segments的内部的哈希表table数组内的所有链表的所有链表节点的个数。
实现逻辑为:整个计算过程刚开始是不对segments加锁的,重复计算两次,如果前后两次hashmap都没有修改过,则直接返回计算结果,如果修改过了,则再加锁计算一次。
/**
* Returns the number of key-value mappings in this map. If the
* map contains more than <tt> Integer .MAX_VALUE</tt> elements, returns
* <tt>Integer.MAX_VALUE</tt>.
*
* @return the number of key-value mappings in this map
*/public int size() {
// Try a few times to get accurate count. On failure due to
// continuous async changes in table, resort to locking.
// 所有的segments
final Segment<K,V>[] segments = this.segments;
int size;
boolean overflow; // true if size overflows 32 bits
// 累加modCounts,这个在每次计算都是重置为0
long sum; // sum of modCounts
// 记录前一次累加的modCounts
long last = 0L; // previous sum
int retries = -1; // first iteration isn't retry
try {
for (;;) {
// RETRIES_BEFORE_LOCK值为2
// retries初始值为-1
// retries++ == RETRIES_BEFORE_LOCK,表示已经是第三次了,故需要加锁,前两次计算modCounts不一样,即期间有写操作。
if (retries++ == RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
// 每个segment都加锁,此时不能执行写操作了
ensureSegment(j).lock(); // force creation
}
// sum重置为0
sum = 0L;
size = 0;
overflow = false;
// 遍历每个segment
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) {
// 累加各个segment的modCount,以便与上一次的modCount进行比较,
// 看在这期间是否对segment修改过
sum += seg.modCount;
// segment使用count记录该segment的内部的所有链表的所有节点的总个数
int c = seg.count;
// size为记录所有节点的个数,用于作为返回值,使用size += c来累加
if (c < 0 || (size += c) < 0)
overflow = true;
}
}
// 前后两次都相等,
// 则说明在这期间没有写的操作,
// 故可以直接返回了
if (sum == last)
break;
last = sum;
}
} finally {
// retries 大于 RETRIES_BEFORE_LOCK,
// 说明加锁计算过了,需要释放锁
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
return overflow ? Integer.MAX_VALUE : size;
}
ConcurrentHashMap(JDK1.8版本)
核心字段
1、哈希表table数组,如下与HashMap一样也是使用一个Node类型的数组table来定义的,不同之处是使用volatile修饰该数组。
2、baseCount 和counterCells:用来记录当前 ConcurrentHashMap 存在多少个元素使用的,在进行增删链表节点时,默认是更新baseCount的值即可,如果同时存在多个线程并发进行对链表节点的增删操作,则放弃更新 baseCount,而是 counterCells 数组中添加一个 CounterCell ,之后在计算 size 的时候,累加 baseCount 和遍历并累加counterCells。
/**
* The array of bins. Lazily initialized upon first insertion.
* Size is always a power of two. Accessed directly by iterators.
*/transient volatile Node<K,V>[] table;
/**
* Base counter value, used mainly when there is no contention,
* but also as a fallback during table initialization
* races. Updated via CAS.
*/private transient volatile long baseCount;
/**Table of counter cells. When non-null, size is a power of 2.*/private transient volatile CounterCell[] counterCells;
链表节点Node的定义:value,next使用volatile修饰保证线程可见性。
/**
* Key-value entry. This class is never exported out as a
* user-mutable Map.Entry (i.e., one supporting setValue; see
* MapEntry below), but can be used for read-only traversals used
* in bulk tasks. Subclasses of Node with a negative hash field
* are special, and contain null keys and values (but are never
* exported). Otherwise, keys and vals are never null.
*/static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
Node(int hash, K key, V val, Node<K,V> next) {
this.hash = hash;
this.key = key;
this.val = val;
this.next = next;
}
public final K getKey() { return key; }
public final V getValue() { return val; }
...
/**
* Virtualized support for map.get(); overridden in subclasses.
*/ Node<K,V> find(int h, Object k) {
Node<K,V> e = this;
if (k != null) {
do {
K ek;
if (e.hash == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
} while ((e = e.next) != null);
}
return null;
}
}
核心方法
UNSAFE:硬件级别的原子操作
主要定义了获取,更新,添加链表节点Node的方法,具体为基于UNSAFE类提供的硬件级别的原子操作来保证线程安全,而不是通过加锁机制,如 synchronized 关键字,ReentrantLock重入锁来实现,即无锁化。
/* ---------------- Table element access -------------- */
/*
* Volatile access methods are used for table elements as well as
* elements of in-progress next table while resizing. All uses of
* the tab arguments must be null checked by callers. All callers
* also paranoically precheck that tab's length is not zero (or an
* equivalent check), thus ensuring that any index argument taking
* the form of a hash value anded with (length - 1) is a valid
* index. Note that, to be correct wrt arbitrary concurrency
* errors by users, these checks must operate on local variables,
* which accounts for some odd-looking inline assignments below.
* Note that calls to setTabAt always occur within locked regions,
* and so in principle require only release ordering, not
* full volatile semantics, but are currently coded as volatile
* writes to be conservative.
*/
@SuppressWarnings("unchecked")
// 原子获取链表节点
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
// CAS更新或新增链表节点
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
// 原子新增链表节点
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}
put写操作
写操作主要在putVal方法定义,实现逻辑与HashMap的putVal基本一致,只是相关操作,如获取链表节点,更新链表节点的值value和新增链表节点,都会使用到UNSAFE提供的硬件级别的原子操作,而如果是更新链表节点的值或者在一个已经存在的链表中新增节点,则是通过synchronized同步锁来实现线程安全性。
/**
* Maps the specified key to the specified value in this table.
* Neither the key nor the value can be null.
*
* <p>The value can be retrieved by calling the {@code get} method
* with a key that is equal to the original key.
*
* @param key key with which the specified value is to be associated
* @param value value to be associated with the specified key
* @return the previous value associated with {@code key}, or
* {@code null} if there was no mapping for {@code key}
* @throws NullPointerException if the specified key or value is null
*/public V put(K key, V value) {
return putVal(key, value, false);
}
/** Implementation for put and putIfAbsent */final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
// i为该key在table数组的下标
// null表示该key对应的链表(具体为链表头结点)
// 在哈希表table中还不存在
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 新增链表头结点,cas方式添加到哈希表table
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
// f为链表头结点,使用synchronized加锁
// 则整条链表则被加锁了
synchronized (f) {
// 再次检查,即double check
// 即避免进入同步块之前,链表被修改了
if (tabAt(tab, i) == f) {
// hash值大于0
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
// 节点已经存在,更新value即可
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
// 该key对应的节点不存在,
// 则新增节点并添加到该链表的末尾
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
// 红黑树节点,
// 则往该红黑树更新或添加该节点即可
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
// 判断是否需要将链表转为红黑树
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
// 递增ConcurrentHashMap的节点总个数
addCount(1L, binCount);
return null;
}
1、如果当前需要put的key对应的链表在哈希表table中还不存在,即还没添加过该key的hash值对应的链表,则调用UNSAFE的casTabAt方法,基于CAS机制来实现添加该链表头结点到哈希表table中,避免该线程在添加该链表头结的时候,其他线程也在添加的并发问题;如果CAS失败,则进行自旋,通过继续第2步的操作;
2、如果需要添加的链表已经存在哈希表table中,则通过UNSAFE的tabAt方法,基于volatile机制,获取当前最新的链表头结点f,由于f指向的是ConcurrentHashMap的哈希表table的某条链表的头结点,故虽然f是临时变量,由于是引用共享的该链表头结点,所以可以使用synchronized关键字来同步多个线程对该链表的访问。在synchronized(f)同步块里面,则是与HashMap一样遍历该链表,如果该key对应的链表节点已经存在,则更新,否则在链表的末尾新增该key对应的链表节点。
3、使用synchronized同步锁的原因:因为如果该key对应的节点所在的链表已经存在的情况下,可以通过UNSAFE的tabAt方法基于volatile获取到该链表最新的头节点,但是需要通过遍历该链表来判断该节点是否存在,如果不使用synchronized对链表头结点进行加锁,则在遍历过程中,其他线程可能会添加这个节点,导致重复添加的并发问题。故通过synchronized锁住链表头结点的方式,保证任何时候只存在一个线程对该链表进行更新操作。
4、锁的范围缩小:相对于JDK1.7的Segment分段锁,即分段锁的写操作,在操作之前需要先获取lock锁,即不管是(1)链表不存在,添加链表头结点,(2)还是更新链表节点,(3)还是在已经存在的链表中添加节点,都需要先获取lock锁,而在JDK1.8的写操作中,(1)如果该链表不存在,添加链表头的时候是不需要加锁的,因为是往哈希表table数组的某个位置填充值,不需要遍历链表之类的,所以可以基于UNSAFE的casTabAt方法,即CAS机制检查table数组的该位置是否存在元素(链表头结点)来实现线程安全,这是写操作最先检查的;如果该链表已经存在,即(2)(3)则需要通过synchronized来锁住该链表头结点,而在JDK1.7的实现中是锁住该Segment内部的整个哈希表table数组,所以这里也是一个性能提升的地方,缩小了锁的范围。
get读操作
get读操作由于是从哈希表中查找并读取链表节点数据,不会对链表进行写更新操作,故基于volatile的happend-before原则保证的线程可见性(即一个线程的操作对其他线程可见),即可保证get读取到该key对应的最新链表节点,整个过程不需要进行加锁。
具体为table和Node的value和next均是volatile修饰。
/**
* Returns the value to which the specified key is mapped,
* or {@code null} if this map contains no mapping for the key.
*
* <p>More formally, if this map contains a mapping from a key
* {@code k} to a value {@code v} such that {@code key.equals(k)},
* then this method returns {@code v}; otherwise it returns
* {@code null}. (There can be at most one such mapping.)
*
* @throws NullPointerException if the specified key is null
*/public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
// 获取链表头结点
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
// 头结点的key相等说明找到了,直接返回
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
// 遍历该链表,查找对应的节点
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
size容量计算
1、size方法为计算当前ConcurrentHashMap一共存在多少链表节点,与JDK1.7中每次需要遍历segments数组来计算不同的是,在JDK1.8中,使用baseCount和counterCells数组,在增删链表节点时,实时更新来统计,在size方法中直接返回即可。整个过程不需要加锁。
2、并发修改异常处理:CounterCell的value值为1,作用是某个线程在更新baseCount时,如果存在其他线程同时在更新,则放弃更新baseCount的值,即保持baseCount不变,而是各自往counterCells数组添加一个counterCell元素,在size方法中,累加counterCells数组的value,然后与baseCount相加,从而获取准确的大小。
/**
* {@inheritDoc}
*/public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}
/* ---------------- Counter support -------------- */
/**
* A padded cell for distributing counts. Adapted from LongAdder
* and Striped64. See their internal docs for explanation.
*/@sun.misc.Contended static final class CounterCell {
volatile long value;
CounterCell(long x) { value = x; }
}
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
// sum初始化为baseCount
long sum = baseCount;
if (as != null) {
// 遍历counterCells并累加其value到sum
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
3、addCount:在put写操作之后,递增baseCount值。在putVal中调用addCount(1L, binCount);,即递增1,在其批量操作中,则可以是批量的数量作为参数,如addCount(100L, binCount)
/**
* Adds to count, and if table is too small and not already
* resizing, initiates transfer. If already resizing, helps
* perform transfer if work is available. Rechecks occupancy
* after a transfer to see if another resize is already needed
* because resizings are lagging additions.
*
* @param x the count to add
* @param check if <0, don't check resize, if <= 1 only check if uncontended
*/private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
if ((as = counterCells) != null ||
// CAS更新baseCount失败,表示存在并发异常
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
// CAS更新失败时,在counterCells数组添加一个counterCell对象
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}
// 往counterCells数组添加counterCell对象
// See LongAdder version for explanation
private final void fullAddCount(long x, boolean wasUncontended) {
int h;
if ((h = ThreadLocalRandom.getProbe()) == 0) {
ThreadLocalRandom.localInit(); // force initialization
h = ThreadLocalRandom.getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
CounterCell[] as; CounterCell a; int n; long v;
if ((as = counterCells) != null && (n = as.length) > 0) {
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // Try to attach new Cell
CounterCell r = new CounterCell(x); // Optimistic create
if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean created = false;
try { // Recheck under lock
CounterCell[] rs; int m, j;
if ((rs = counterCells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
break;
else if (counterCells != as || n >= NCPU)
collide = false; // At max size or stale
else if (!collide)
collide = true;
else if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
try {
if (counterCells == as) {// Expand table unless stale
CounterCell[] rs = new CounterCell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
counterCells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = ThreadLocalRandom.advanceProbe(h);
}
else if (cellsBusy == 0 && counterCells == as &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean init = false;
try { // Initialize table
if (counterCells == as) {
CounterCell[] rs = new CounterCell[2];
rs[h & 1] = new CounterCell(x);
counterCells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
break; // Fall back on using base
}
}
3.4 ConcurrentHashMap(JDK1.7和JDK1.8), HashTable 与Collections.synchronizedMap
HashTable
HashTable 是同步版本的 HashMap,内部数据结构与 HashMap 一样也都是使用链式哈希来实现的,即数组 + 链表(JDK1.8版本的HashMap新增了数组 + 红黑树的优化)。
HashTable通过在方法中使用synchronized关键字修饰来进行线程同步,实现线程安全,但是由于是方法级别使用synchronized,即是使用HashTable对象自身作为monitor锁,故多个线程的读读,读写,写写都是互斥,即任何使用只能存在一个线程对HashTable对象进行访问,所以HashTable的并发性能是较低的。
链表节点定义:与 HashMap 一样都是继承于Map.Entry:value和next都是不需要使用volatile修饰的,因为synchronized 同时具有线程同步和线程可见性的作用。
/**
* Hashtable bucket collision list entry
*/private static class Entry<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
V value;
Entry<K,V> next;
protected Entry(int hash, K key, V value, Entry<K,V> next) {
this.hash = hash;
this.key = key;
this.value = value;
this.next = next;
}
方法级别的 synchronized 同步:get,put等方法均是使用 synchronized 修饰
// get读操作
public synchronized V get(Object key) {
Entry<?,?> tab[] = table;
int hash = key.hashCode();
// %取模,而HashMap为通过位运算取模,性能较高
int index = (hash & 0x7FFFFFFF) % tab.length;
for (Entry<?,?> e = tab[index] ; e != null ; e = e.next) {
if ((e.hash == hash) && e.key.equals(key)) {
return (V)e.value;
}
}
return null;
}
// put写操作
public synchronized V put(K key, V value) {
// Make sure the value is not null
if (value == null) {
throw new NullPointerException();
}
// Makes sure the key is not already in the hashtable.
Entry<?,?> tab[] = table;
int hash = key.hashCode();
int index = (hash & 0x7FFFFFFF) % tab.length;
@SuppressWarnings("unchecked")
// 更新
Entry<K,V> entry = (Entry<K,V>)tab[index];
for(; entry != null ; entry = entry.next) {
if ((entry.hash == hash) && entry.key.equals(key)) {
V old = entry.value;
entry.value = value;
return old;
}
}
// 新增节点
addEntry(hash, key, value, index);
return null;
}
private void addEntry(int hash, K key, V value, int index) {
modCount++;
Entry<?,?> tab[] = table;
if (count >= threshold) {
// Rehash the table if the threshold is exceeded
rehash();
tab = table;
hash = key.hashCode();
index = (hash & 0x7FFFFFFF) % tab.length;
}
// Creates the new entry.
@SuppressWarnings("unchecked")
Entry<K,V> e = (Entry<K,V>) tab[index];
tab[index] = new Entry<>(hash, key, value, e);
count++;
}
如果需要使用线程安全的 HashMap,则优先考虑使用ConcurrentHashMap,或者使用 Collections.synchronizedMap 来将HashMap 包装成线程安全的SynchronizedMap。因为ConcurrentHashMap和HashMap的实现更加高效,如根据key的hash计算哈希表table数组的下标的实现,HashTable 是使用%取余,而ConcurrentHashMap和HashMap都是使用位运算,效率更高,所以整体性能高于HashTable。
Collections.synchronizedMap
Collections.synchronizedMap 的方法定义如下:将传入的Map使用 SynchronizedMap 包装后返回一个SynchronizedMap实例,SynchronizedMap是线程安全的,这是一种包装器设计模式的使用。
/**
* Returns a synchronized (thread-safe) map backed by the specified
* map. In order to guarantee serial access, it is critical that
* <strong>all</strong> access to the backing map is accomplished
* through the returned map.<p>
*
* It is imperative that the user manually synchronize on the returned
* map when iterating over any of its collection views:
* <pre>
* Map m = Collections.synchronizedMap(new HashMap());
* ...
* Set s = m.keySet(); // Needn't be in synchronized block
* ...
* synchronized (m) { // Synchronizing on m, not s!
* Iterator i = s.iterator(); // Must be in synchronized block
* while (i.hasNext())
* foo(i.next());
* }
* </pre>
* Failure to follow this advice may result in non-deterministic behavior.
*
* <p>The returned map will be serializable if the specified map is
* serializable.
*
* @param <K> the class of the map keys
* @param <V> the class of the map values
* @param m the map to be "wrapped" in a synchronized map.
* @return a synchronized view of the specified map.
*/public static <K,V> Map<K,V> synchronizedMap(Map<K,V> m) {
return new SynchronizedMap<>(m);
}
```
```
SynchronizedMap的定义:SynchronizedMap 的实现主要是在内部定义了一个被包装的map的引用,一个对象锁mutex;
在方法内使用synchronized(mutex),即使用同步代码块的方式来替代HashTable的使用同步方法的方式,缩小同步范围,提高性能,在代码块中对被包装的map进行操作,从而实现线程安全。
```
```java
private static class SynchronizedMap<K,V>
implements Map<K,V>, Serializable {
private static final long serialVersionUID = 1978198479659022715L;
private final Map<K,V> m; // Backing Map
final Object mutex; // Object on which to synchronize
SynchronizedMap(Map<K,V> m) {
this.m = Objects.requireNonNull(m);
mutex = this;
}
SynchronizedMap(Map<K,V> m, Object mutex) {
this.m = m;
this.mutex = mutex;
}
public int size() {
synchronized (mutex) {return m.size();}
}
public boolean isEmpty() {
synchronized (mutex) {return m.isEmpty();}
}
public boolean containsKey(Object key) {
synchronized (mutex) {return m.containsKey(key);}
}
public boolean containsValue(Object value) {
synchronized (mutex) {return m.containsValue(value);}
}
public V get(Object key) {
synchronized (mutex) {return m.get(key);}
}
public V put(K key, V value) {
synchronized (mutex) {return m.put(key, value);}
}
public V remove(Object key) {
synchronized (mutex) {return m.remove(key);}
}
public void putAll(Map<? extends K, ? extends V> map) {
synchronized (mutex) {m.putAll(map);}
}
public void clear() {
synchronized (mutex) {m.clear();}
}
...
}
3.5 AtomicBoolean 原子性布尔
AtomicBoolean 是 java.util.concurrent.atomic 包下的原子变量,这个包里面提供了一组原子类。其基本的特性就是在多线程环境下,当有多个线程同时执行这些类的实例包含的方法时,具有排他性,即当某个线程进入方法,执行其中的指令时,不会被其他线程打断,而别的线程就像自旋锁一样,一直等到该方法执行完成,才由 JVM 从等待队列中选择一个另一个线程进入,这只是一种逻辑上的理解。实际上是借助硬件的相关指令来实现的,不会阻塞线程(或者说只是在硬件级别上阻塞了)。
AtomicBoolean,在这个 Boolean 值的变化的时候不允许在之间插入,保持操作的原子性。
下面将解释重点方法并举例:
boolean compareAndSet(expectedValue, updateValue), 这个方法主要两个作用:
1.比较 AtomicBoolean 和 expect 的值,如果一致,执行方法内的语句。其实就是一个 if 语句
2.把 AtomicBoolean 的值设成 update,比较最要的是这两件事是一气呵成的,这连个动作之间不会被打断,任何内部或者外部的语句都不可能在两个动作之间运行。为多线程的控制提供了解决的方案下面我们从代码上解释。
1、获得 AtomicBoolean 的值
你可以通过使用 get() 方法来获取一个 AtomicBoolean 的值。
示例如下:
AtomicBoolean atomicBoolean = new AtomicBoolean(true);
boolean value = atomicBoolean.get();
2、设置 AtomicBoolean 的值
你可以通过使用 set() 方法来设置一个 AtomicBoolean 的值。
示例如下:
AtomicBoolean atomicBoolean = new AtomicBoolean(true);
atomicBoolean.set(false);
以上代码执行后 AtomicBoolean 的值为 false。
3、交换 AtomicBoolean 的值
你可以通过 getAndSet() 方法来交换一个 AtomicBoolean 实例的值 getAndSet() 方法将返回 AtomicBoolean 当前的值,并将为 AtomicBoolean 设置一个新值。
示例如下:
AtomicBoolean atomicBoolean = new AtomicBoolean(true);
boolean oldValue = atomicBoolean.getAndSet(false);
以上代码执行后 oldValue 变量的值为true,atomicBoolean 实例将持有false值。代码成功将AtomicBoolean 当前值 ture 交换为 false。
4、比较并设置 AtomicBoolean 的值
compareAndSet() 方法允许你对 AtomicBoolean 的当前值与一个期望值进行比较,如果当前值等于期望值的话,将会对 AtomicBoolean 设定一个新值。 compareAndSet() 方法是原子性的,因此在同一时间之内有单个线程执行它。因此 compareAndSet() 方法可被用于一些类似 于锁的同步的简单实现。
以下一个compareAndSet() 示例:
AtomicBoolean atomicBoolean = new AtomicBoolean(true);
boolean expectedValue = true;
boolean newValue = false;
boolean wasNewValueSet = atomicBoolean.compareAndSet(expectedValue, newValue);
本示例对 AtomicBoolean 的当前值与 true 值进行比较,如果相等,将 AtomicBoolean 的值更新为 false
3.6 AtomicInteger 原子性整型
AtomicInteger,一个提供原子操作的 Integer 的类。在 Java 语言中, ++i 和 i++操作并不是线程安全的,
在使用的时候,不可避免的会用到 synchronized 关键字。而 AtomicInteger 则通过一种线程安全的加减操
作接口。
//以原子方式将输入的数值与实例中的值(AtomicInteger里的value)相加,并返回结果
public final int addAndGet(int delta)
//如果输入的数值等于预期值,则以原子方式将该值设置为输入的值
public final boolean compareAndSet(int expect, int update)
//最终会设置成newValue,使用lazySet设置值后,可能导致其他线程在之后的一小段时间内还是可以读到旧的值
public final void lazySet(int newValue)
//获取当前的值
public final int get()
//获取当前的值,并设置新的值,以原子方式设置为newValue的值,并返回旧值
public final int getAndSet(int newValue)
//获取当前的值,并自增,以原子方式将当前值加1,注意:这里返回的是自增前的值
public final int getAndIncrement()
//获取当前的值,并加上预期的值,返回的是增加以前值
public final int getAndAdd(int delta)
//自减,并获得自减后的值
public final int incrementAndget()
//将值减一,并返回减一后的值
public final int decrementAndGet()
//将值减一,返回减一之前的值
public final int getAndDecrement()
3.7 AtomicIntegerArray 原子性整型数组
AtomicIntegerArray 类提供了可以以原子方式读取和写入的底层 int 数组的操作,还包含高级原子操作。AtomicIntegerArray 支持对底层 int 数组变量的原子操作。 它具有获取和设置方法,如在变量上的读取和写入。也就是说,一个集合与同一变量上的任何后续 get 相关联。 原子 compareAndSet 方法也具有这些内存一致性功能。AtomicIntegerArray 本质上是对 int[]类型的封装。使用 Unsafe 类通过 CAS 的方式控制 int[]在多线程下的安全性。
//获得数组第 i 个下标的元素
public final int get(int i)
//获得数组的长度
public final int length()
//将数组第 i 个下标设置为 newValue,并返回旧的值
public final int getAndSet(int i, int newValue)
//进行 CAS 操作,如果第 i 个下标的元素等于 expect,则设置为 update,设置成功返回 true
public final boolean compareAndSet(int i, int expect, int update)
//将第 i 个下标的元素加 1
public final int getAndIncrement(int i)
//将第 i 个下标的元素减 1
public final int getAndDecrement(int i)
//将第 i 个下标的元素增加 delta(delta 可以是负数)
public final int getAndAdd(int i, int delta)
3.8 AtomicLong、 AtomicLongArray 原子性整型数组
AtomicLong、 AtomicLongArray 的 API 跟 AtomicInteger、 AtomicIntegerArray 在使用方法都是差不多的。区别在于用前者是使用原子方式更新的 long 值和 long 数组,后者是使用原子方式更新的 Integer 值和 Integer 数组。两者的相同处在于它们此类确实扩展了 Number,允许那些处理基于数字类的工具和实用工具进行统一访问。在实际开发中,它们分别用于不同的场景。
public class Test {
public static void main(String[] agrs) {
final AtomicLong orderIdGenerator = new AtomicLong(0);
final List<Item> orders = Collections.synchronizedList(new ArrayList<Item>());
for (int i = 0; i < 10; i++) {
Thread orderCreationThread = new Thread(new Runnable() {
public void run() {
for (int i = 0; i < 10; i++) {
long orderId = orderIdGenerator.incrementAndGet();
Item order = new Item(Thread.currentThread().getName(), orderId);
orders.add(order);
}
}
});
orderCreationThread.setName("Order Creation Thread " + i);
orderCreationThread.start();
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Set<Long> orderIds = new HashSet<Long>();
for (Item order : orders) {
orderIds.add(order.getID());
System.out.println("Order name:" + order.getItemName() + "----" + "Order id:" + order.getID());
}
}
}
class Item {
String itemName;
long id;
Item(String n, long id) {
this.itemName = n;
this.id = id;
}
public String getItemName() {
return itemName;
}
public long getID() {
return id;
}
}
运行结果:
Order name:Order Creation Thread 0----Order id:1
Order name:Order Creation Thread 1----Order id:2
Order name:Order Creation Thread 0----Order id:4
Order name:Order Creation Thread 1----Order id:5
Order name:Order Creation Thread 3----Order id:3
Order name:Order Creation Thread 0----Order id:7
Order name:Order Creation Thread 1----Order id:6
........
Order name:Order Creation Thread 2----Order id:100
3.9 ReentrantLock重入锁
可重入: 单线程可以重复进入,但要重复退出
可中断: lock.lockInterruptibly()
可限时: 超时不能获得锁,就返回 false,不会永久等待构成死锁
公平锁: 先来先得, public ReentrantLock(boolean fair), 默认锁不公平的, 根据线程优先级竞争
public class ReenterLock implements Runnable {
public static ReentrantLock lock = new ReentrantLock();
public static int i = 0;
@Override
public void run() {
for (int j = 0; j < 10000; j++) {
lock.lock();
// 超时设置
// lock.tryLock(5, TimeUnit.SECONDS);
try {
i++;
} finally {
// 需要放在finally里释放, 如果上面lock了两次, 这边也要unlock两次
lock.unlock();
}
}
}
public static void main(String[] args) throws InterruptedException {
ReenterLock tl = new ReenterLock();
Thread t1 = new Thread(tl);
Thread t2 = new Thread(tl);
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println(i);
}
}
中断死锁
线程1, 线程2分别去获取lock1, lock2, 触发死锁. 最终通过 DeadlockChecker 来触发线程中断.
public class DeadLock implements Runnable{
public static ReentrantLock lock1 = new ReentrantLock();
public static ReentrantLock lock2 = new ReentrantLock();
int lock;
public DeadLock(int lock) {
this.lock = lock;
}
@Override
public void run() {
try {
if (lock == 1){
lock1.lockInterruptibly();
try {
Thread.sleep(500);
}catch (InterruptedException e){}
lock2.lockInterruptibly();
}else {
lock2.lockInterruptibly();
try {
Thread.sleep(500);
}catch (InterruptedException e){}
lock1.lockInterruptibly();
}
}catch (InterruptedException e){
e.printStackTrace();
}finally {
if (lock1.isHeldByCurrentThread())
lock1.unlock();
if (lock2.isHeldByCurrentThread())
lock2.unlock();
System.out.println(Thread.currentThread().getId() + "线程中断");
}
}
public static void main(String[] args) throws InterruptedException {
DeadLock deadLock1 = new DeadLock(1);
DeadLock deadLock2 = new DeadLock(2);
// 线程1, 线程2分别去获取lock1, lock2. 导致死锁
Thread t1 = new Thread(deadLock1);
Thread t2 = new Thread(deadLock2);
t1.start();
t2.start();
Thread.sleep(1000);
// 死锁检查, 触发中断
DeadlockChecker.check();
}
}
public class DeadlockChecker {
private final static ThreadMXBean mbean = ManagementFactory.getThreadMXBean();
final static Runnable deadLockCheck = new Runnable() {
@Override
public void run() {
while (true) {
long[] deadlockedThreadlds = mbean.findDeadlockedThreads();
if (deadlockedThreadlds != null) {
ThreadInfo[] threadInfos = mbean.getThreadInfo(deadlockedThreadlds);
for (Thread t : Thread.getAllStackTraces().keySet()) {
for (int i = 0; i < threadInfos.length; i++) {
if (t.getId() == threadInfos[i].getThreadId()) {
t.interrupt();
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
}
}
}
}
}
}
}
};
public static void check() {
Thread t = new Thread(deadLockCheck);
t.setDaemon(true);
t.start();
}
}
Condition
类似于 Object.wait() 和 Object.notify(), 需要与 ReentrantLock 结合使用
// await()方法会使当前线程等待,同时释放当前锁,当其他线程中使用signal()时或者signalAll()方法时,
// 线程会重新获得锁并继续执行。或者当线程被中断时,也能跳出等待。这和Object.wait()方法很相似。
void await() throws InterruptedException;
// awaitUninterruptibly()方法与await()方法基本相同,但是它并不会再等待过程中响应中断。
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
// singal()方法用于唤醒一个在等待中的线程。相对的singalAll()方法会唤醒所有在等待中的线程。
// 这和Obejct.notify()方法很类似。
void signal();
void signalAll();
public class ReenterLockCondition implements Runnable{
public static ReentrantLock lock = new ReentrantLock();
public static Condition condition = lock.newCondition();
@Override
public void run() {
try {
lock.lock();
condition.await();
System.out.println("Thread is going on");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 注意放到finally中释放
lock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
ReenterLockCondition t1 = new ReenterLockCondition();
Thread tt = new Thread(t1);
tt.start();
Thread.sleep(2000);
System.out.println("after sleep, signal!");
// 通知线程tt继续执行. 唤醒同样需要重新获得锁
lock.lock();
condition.signal();
lock.unlock();
}
}
Semaphore 信号量
锁一般都是互斥排他的, 而信号量可以认为是一个共享锁,允许N个线程同时进入临界区, 但是超出许可范围的只能等待.如果N = 1, 则类似于lock.具体API如下, 通过acquire获取信号量, 通过release释放。
public void acquire()
public void acquireUninterruptibly()
public boolean tryAcquire()
public boolean tryAcquire(long timeout, TimeUnit unit)
public void release()
模拟20个线程, 但是信号量只设置了5个许可.
因此线程是按序每2秒5个的打印job done.
public class SemapDemo implements Runnable{
// 设置5个许可
final Semaphore semp = new Semaphore(5);
@Override
public void run() {
try {
semp.acquire();
// 模拟线程耗时操作
Thread.sleep(2000L);
System.out.println("Job done! " + Thread.currentThread().getId());
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semp.release();
}
}
public static void main(String[] args){
ExecutorService service = Executors.newFixedThreadPool(20);
final SemapDemo demo = new SemapDemo();
for (int i = 0; i < 20; i++) {
service.submit(demo);
}
}
}