第四章、 Java 的多线程和并发库

3.3 ConcurrentHashMap 非阻塞 Hash 集合

 - 在JDK1.7主要通过定义 Segment 分段锁来实现多个 线程 对 ConcurrentHashMap 的数据的并发读写操作。整个ConcurrentHashMap 由多个 Segment 组成,每个 Segment保存整个 ConcurrentHashMap 的一部分数据,Segment 结合 Reentrant lock  ,即 Segment 继承于 ReentrantLock ,来实现写互斥,读共享,具体为有多少个 Segment ,则任何时候可以最多支持这么多个线程同时进行写操作,任意多个线程进行读操作。在 ConcurrentHashMap 的Segment实现中,对写操作使用 ReentrantLock 来进行加锁,读操作不加锁,通过volatile 来实现线程之间的可见性。
- 在JDK1.8中为了进一步优化 Concurrent HashMap  的性能,去掉了 Segment 分段锁的设计,在数据结构方面,则是跟 HashMap 一样,使用一个 哈希表  table 数组,即数组 + 链表或者数组 + 红黑树。而线程安全方面是结合 CAS 机制来实现的,CAS 底层依赖JDK的 UNSAFE 所提供的硬件级别的 原子操作 。同时在 HashMap 的基础上,对哈希表table 数组和链表节点的 value,next 指针等使用 volatile 来修饰,从而实现线程可见性。


 segment :分段锁
1、在HashMap中,是使用一个哈希表,即元素为链表结点 Node 组成的数组 table,而在 ConcurrentHashMap 中是使用多个哈希表,具体为通过定义一个Segment来封装这个哈希表其中 Segment 继承于 ReentrantLock,故自带 lock 的功能。即每个 Segment 其实就是相当于一个 HashMap,只是结合使用了 ReentrantLock 来进行并发控制,实现线程安全。
2、Segment 定义如下:ConcurrentHashMap 的一个静态内部类,继承于 ReentrantLock,在内部定义了一个HashEntry 数组 table,HashEntry 是链表的节点定义,其中 table 使用 volatile 修饰,保证某个线程对table 进行新增链表节点(头结点或者在已经存在的链表新增一个节点)对其他线程可见。  
 * Segments are specialized versions of hash tables.  This
 * subclasses from ReentrantLock opportunistically, just to
 * simplify some locking and avoid separate construction.
 */static final class Segment<K,V> extends ReentrantLock implements Serializable {
     * The maximum number of times to tryLock in a prescan before
     * possibly blocking on acquire in preparation for a locked
     * segment operation. On multiprocessors, using a bounded
     * number of retries maintains cache acquired while locating
     * nodes.
     */    static final int MAX_SCAN_RETRIES =
        Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;

     * The per-segment table. Elements are accessed via
     * entryAt/setEntryAt providing volatile semantics.
     */    transient volatile HashEntry<K,V>[] table;

 3、HashEntry 的定义如下:包含key,value,key的hash,所在链表的下一个节点next。

 * ConcurrentHashMap list entry. Note that this is never exported
 * out as a user-visible Map.Entry.
 */static final class HashEntry<K,V> {
    final int hash;
    final K key;
    volatile V value;
    volatile HashEntry<K,V> next;


 * The default concurrency level for this table, used when not
 * otherwise specified in a constructor.
 */static final int DEFAULT_CONCURRENCY_LEVEL = 16;

 * The maximum number of segments to allow; used to bound
 * constructor arguments. Must be power of two less than 1 << 24.
 */static final int MAX_SEGMENTS = 1 << 16; // slightly conservative


public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    if (concurrencyLevel > MAX_SEGMENTS)
        concurrencyLevel = MAX_SEGMENTS;
    // Find power-of-two sizes best matching arguments
    int sshift = 0;
    int ssize = 1;
    // ssize:segments数组的大小
    // 不能小于concurrencyLevel,默认为16
    while (ssize < concurrencyLevel) {
        ssize <<= 1;
    this.segmentShift = 32 - sshift;
    this.segmentMask = ssize - 1;
    if (initialCapacity > MAXIMUM_CAPACITY)
        initialCapacity = MAXIMUM_CAPACITY;
    int c = initialCapacity / ssize;
    if (c * ssize < initialCapacity)
    // cap:Segment内部HashEntry数组的大小
    // 实际大小根据c来计算,而c是由上面代码,
    // 根据initialCapacity / ssize得到,
    // 即整体容量大小除以Segment数组的数量,则
    // 得到每个Segment内部的table的大小
    while (cap < c)
        cap <<= 1;
    // create segments and segments[0]
    Segment<K,V> s0 =
        new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                         (HashEntry<K,V>[])new HashEntry[cap]);
    Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
    UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
    this.segments = ss;


 * Maps the specified key to the specified value in this table.
 * Neither the key nor the value can be null.
 * <p> The value can be retrieved by calling the <tt>get</tt> method
 * with a key that is equal to the original key.
 * @param key key with which the specified value is to be associated
 * @param value value to be associated with the specified key
 * @return the previous value associated with <tt>key</tt>, or
 *         <tt>null</tt> if there was no mapping for <tt>key</tt>
 * @throws NullPointerException if the specified key or value is null
public V put(K key, V value) {
    Segment<K,V> s;
    if (value == null)
        throw new NullPointerException();
    int hash = hash(key);
    int j = (hash >>> segmentShift) & segmentMask;
    // 根据key的hash只,确定具体的Segment
    if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
         (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
        // 如果segments数组的该位置还没segment
        // 则数组下标j对应的segment实例
        s = ensureSegment(j);
    // 往该segment实例设值
    return s.put(key, hash, value,  false );


final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    // tryLock:非阻塞获取lock,如果当前没有其他线程持有该Segment的锁,则返回null,继续往下执行;
    // scanAndLockForPut:该segment锁被其他线程持有了,则非阻塞重试3次,超过3次则阻塞等待锁。之后返回对应的链表节点。
    HashEntry<K,V> node = tryLock() ? null :
        scanAndLockForPut(key, hash, value);
    V oldValue;
    try {
        HashEntry<K,V>[] tab = table;
        int index = (tab.length - 1) & hash;
        // 链表头结点
        HashEntry<K,V> first = entryAt(tab, index);
        for (HashEntry<K,V> e = first;;) {
            // 已经存在,则更新value值
            if (e != null) {
                K k;
                if ((k = e.key) == key ||
                    (e.hash == hash && key.equals(k))) {
                    oldValue = e.value;
                    if (!onlyIfAbsent) {
                        e.value = value;
                        // 更新value时,也递增modCount,而在HashMap中是结构性修改才递增。
                     break ;
                e = e.next;
            else {
                // 注意新增节点时,是在头部添加的,即最后添加的节点是链表头结点
                // 这个与HashMap是不一样的,HashMap是在链表尾部新增节点。
                if (node != null)
                    node = new HashEntry<K,V>(hash, key, value, first);
                int c = count + 1;
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                // 当前新增的该节点作为链表头结点放在哈希表table数组中
                    setEntryAt(tab, index, node);
                // 递增当前整体的元素个数
                count = c;
                oldValue = null;
    } finally {
        // 释放lock锁
    // 返回旧值
    return oldValue;

 * Scans for a node containing given key while trying to
 * acquire lock, creating and returning one if not found. Upon
 * return, guarantees that lock is held. UNlike in most
 * methods, calls to method equals are not screened: Since
 * traversal speed doesn't matter, we might as well help warm
 * up the associated code and accesses as well.
 * @return a new node if key not found, else null
 */private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
    HashEntry<K,V> first = entryForHash(this, hash);
    HashEntry<K,V> e = first;
    HashEntry<K,V> node = null;
    int retries = -1; // negative while locating node
    // 非阻塞自旋获取lock锁
    while (!tryLock()) {
        HashEntry<K,V> f; // to recheck first below
        if (retries < 0) {
            if (e == null) {
                if (node == null) // speculatively create node
                    node = new HashEntry<K,V>(hash, key, value, null);
                retries = 0;
            else if (key.equals(e.key))
                retries = 0;
                e = e.next;
        // MAX_SCAN_RETRIES为2,尝试3次后,则当前线程阻塞等待lock锁
        else if (++retries > MAX_SCAN_RETRIES) {
        else if ((retries & 1) == 0 &&
                 (f = entryForHash(this, hash)) != first) {
            e = first = f; // re-traverse if entry changed
            retries = -1;
    return node;


 * Returns the value to which the specified key is mapped,
 * or {@code null} if this map contains no mapping for the key.
 * <p>More formally, if this map contains a mapping from a key
 * {@code k} to a value {@code v} such that {@code key.equals(k)},
 * then this method returns {@code v}; otherwise it returns
 * {@code null}.  (There can be at most one such mapping.)
 * @throws NullPointerException if the specified key is null
 */public V get(Object key) {
    Segment<K,V> s; // manually integrate access methods to reduce overhead
    HashEntry<K,V>[] tab;
    int h = hash(key);
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
    // 获取segment
    if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
        (tab = s.table) != null) {
        // 通过hash值计算哈希表table数组的下标,从而获取对应链表的头结点
        // 从链表头结点遍历链表
        for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                 (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
             e != null; e = e.next) {
            K k;
            if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                return e.value;
    return null;



 * Returns the number of key-value mappings in this map.  If the
 * map contains more than <tt> Integer .MAX_VALUE</tt> elements, returns
 * <tt>Integer.MAX_VALUE</tt>.
 * @return the number of key-value mappings in this map
 */public int size() {
    // Try a few times to get accurate count. On failure due to
    // continuous async changes in table, resort to locking.
    // 所有的segments
    final Segment<K,V>[] segments = this.segments;
    int size;
    boolean overflow; // true if size overflows 32 bits
    // 累加modCounts,这个在每次计算都是重置为0
    long sum;         // sum of modCounts
    // 记录前一次累加的modCounts
    long last = 0L;   // previous sum
    int retries = -1; // first iteration isn't retry
    try {
        for (;;) {
            // RETRIES_BEFORE_LOCK值为2
            // retries初始值为-1
            // retries++ == RETRIES_BEFORE_LOCK,表示已经是第三次了,故需要加锁,前两次计算modCounts不一样,即期间有写操作。
            if (retries++ == RETRIES_BEFORE_LOCK) {
                for (int j = 0; j < segments.length; ++j)
                // 每个segment都加锁,此时不能执行写操作了
                    ensureSegment(j).lock(); // force creation
            // sum重置为0
            sum = 0L;
            size = 0;
            overflow = false;
            // 遍历每个segment
            for (int j = 0; j < segments.length; ++j) {
                Segment<K,V> seg = segmentAt(segments, j);
                if (seg != null) {
                    // 累加各个segment的modCount,以便与上一次的modCount进行比较,
                    // 看在这期间是否对segment修改过
                    sum += seg.modCount;
                    // segment使用count记录该segment的内部的所有链表的所有节点的总个数
                    int c = seg.count;
                    // size为记录所有节点的个数,用于作为返回值,使用size += c来累加
                    if (c < 0 || (size += c) < 0)
                        overflow = true;
            // 前后两次都相等,
            // 则说明在这期间没有写的操作,
            // 故可以直接返回了
            if (sum == last)
            last = sum;
    } finally {
        // retries 大于 RETRIES_BEFORE_LOCK,
        // 说明加锁计算过了,需要释放锁
        if (retries > RETRIES_BEFORE_LOCK) {
            for (int j = 0; j < segments.length; ++j)
                segmentAt(segments, j).unlock();
    return overflow ? Integer.MAX_VALUE : size;


2、baseCount 和counterCells:用来记录当前 ConcurrentHashMap 存在多少个元素使用的,在进行增删链表节点时,默认是更新baseCount的值即可,如果同时存在多个线程并发进行对链表节点的增删操作,则放弃更新 baseCount,而是 counterCells 数组中添加一个 CounterCell ,之后在计算 size 的时候,累加 baseCount 和遍历并累加counterCells。

 * The array of bins. Lazily initialized upon first insertion.
 * Size is always a power of two. Accessed directly by iterators.
 */transient volatile Node<K,V>[] table;

 * Base counter value, used mainly when there is no contention,
 * but also as a fallback during table initialization
 * races. Updated via CAS.
 */private transient volatile long baseCount;

/**Table of counter cells. When non-null, size is a power of 2.*/private transient volatile CounterCell[] counterCells;


 * Key-value entry.  This class is never exported out as a
 * user-mutable Map.Entry (i.e., one supporting setValue; see
 * MapEntry below), but can be used for read-only traversals used
 * in bulk tasks.  Subclasses of Node with a negative hash field
 * are special, and contain null keys and values (but are never
 * exported).  Otherwise, keys and vals are never null.
 */static class Node<K,V> implements Map.Entry<K,V> {
    final int hash;
    final K key;
    volatile V val;
    volatile Node<K,V> next;

    Node(int hash, K key, V val, Node<K,V> next) {
        this.hash = hash;
        this.key = key;
        this.val = val;
        this.next = next;

    public final K getKey()       { return key; }
    public final V getValue()     { return val; }
     * Virtualized support for map.get(); overridden in subclasses.
     */    Node<K,V> find(int h, Object k) {
        Node<K,V> e = this;
        if (k != null) {
            do {
                K ek;
                if (e.hash == h &&
                    ((ek = e.key) == k || (ek != null && k.equals(ek))))
                    return e;
            } while ((e = e.next) != null);
        return null;

主要定义了获取,更新,添加链表节点Node的方法,具体为基于UNSAFE类提供的硬件级别的原子操作来保证线程安全,而不是通过加锁机制,如 synchronized 关键字,ReentrantLock重入锁来实现,即无锁化。

/* ---------------- Table element access -------------- */
 * Volatile access methods are used for table elements as well as
 * elements of in-progress next table while resizing.  All uses of
 * the tab arguments must be null checked by callers.  All callers
 * also paranoically precheck that tab's length is not zero (or an
 * equivalent check), thus ensuring that any index argument taking
 * the form of a hash value anded with (length - 1) is a valid
 * index.  Note that, to be correct wrt arbitrary concurrency
 * errors by users, these checks must operate on local variables,
 * which accounts for some odd-looking inline assignments below.
 * Note that calls to setTabAt always occur within locked regions,
 * and so in principle require only release ordering, not
 * full volatile semantics, but are currently coded as volatile
 * writes to be conservative.

// 原子获取链表节点
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
    return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);

// CAS更新或新增链表节点
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                    Node<K,V> c, Node<K,V> v) {
    return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);

// 原子新增链表节点
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
    U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);


 * Maps the specified key to the specified value in this table.
 * Neither the key nor the value can be null.
 * <p>The value can be retrieved by calling the {@code get} method
 * with a key that is equal to the original key.
 * @param key key with which the specified value is to be associated
 * @param value value to be associated with the specified key
 * @return the previous value associated with {@code key}, or
 *         {@code null} if there was no mapping for {@code key}
 * @throws NullPointerException if the specified key or value is null
 */public V put(K key, V value) {
    return putVal(key, value, false);

/** Implementation for put and putIfAbsent */final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        // i为该key在table数组的下标
        // null表示该key对应的链表(具体为链表头结点)
        // 在哈希表table中还不存在
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            // 新增链表头结点,cas方式添加到哈希表table
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            // f为链表头结点,使用synchronized加锁
            // 则整条链表则被加锁了
            synchronized (f) {
                // 再次检查,即double check
                // 即避免进入同步块之前,链表被修改了
                if (tabAt(tab, i) == f) {
                    // hash值大于0
                    if (fh >= 0) {
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                // 节点已经存在,更新value即可
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                            // 该key对应的节点不存在,
                            // 则新增节点并添加到该链表的末尾
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                    // 红黑树节点,
                    // 则往该红黑树更新或添加该节点即可
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
            // 判断是否需要将链表转为红黑树
            if (binCount != 0) {
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
    // 递增ConcurrentHashMap的节点总个数
    addCount(1L, binCount);
    return null;




 * Returns the value to which the specified key is mapped,
 * or {@code null} if this map contains no mapping for the key.
 * <p>More formally, if this map contains a mapping from a key
 * {@code k} to a value {@code v} such that {@code key.equals(k)},
 * then this method returns {@code v}; otherwise it returns
 * {@code null}.  (There can be at most one such mapping.)
 * @throws NullPointerException if the specified key is null
 */public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    int h = spread(key.hashCode());
    // 获取链表头结点
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {
        // 头结点的key相等说明找到了,直接返回
        if ((eh = e.hash) == h) {
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;
        // 遍历该链表,查找对应的节点
        while ((e = e.next) != null) {
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
    return null;


 * {@inheritDoc}
 */public int size() {
    long n = sumCount();
    return ((n < 0L) ? 0 :
            (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :

/* ---------------- Counter support -------------- */
 * A padded cell for distributing counts.  Adapted from LongAdder
 * and Striped64.  See their internal docs for explanation.
 */@sun.misc.Contended static final class CounterCell {
    volatile long value;
    CounterCell(long x) { value = x; }

final long sumCount() {
    CounterCell[] as = counterCells; CounterCell a;
    // sum初始化为baseCount
    long sum = baseCount;
    if (as != null) {
        // 遍历counterCells并累加其value到sum
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
    return sum;

3、addCount:在put写操作之后,递增baseCount值。在putVal中调用addCount(1L, binCount);,即递增1,在其批量操作中,则可以是批量的数量作为参数,如addCount(100L, binCount)

 * Adds to count, and if table is too small and not already
 * resizing, initiates transfer. If already resizing, helps
 * perform transfer if work is available.  Rechecks occupancy
 * after a transfer to see if another resize is already needed
 * because resizings are lagging additions.
 * @param x the count to add
 * @param check if <0, don't check resize, if <= 1 only check if uncontended
 */private final void addCount(long x, int check) {
    CounterCell[] as; long b, s;
    if ((as = counterCells) != null ||
        // CAS更新baseCount失败,表示存在并发异常
        !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        CounterCell a; long v; int m;
        boolean uncontended = true;
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
            !(uncontended =
              U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
            // CAS更新失败时,在counterCells数组添加一个counterCell对象
            fullAddCount(x, uncontended);
        if (check <= 1)
        s = sumCount();
    if (check >= 0) {
        Node<K,V>[] tab, nt; int n, sc;
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
               (n = tab.length) < MAXIMUM_CAPACITY) {
            int rs = resizeStamp(n);
            if (sc < 0) {
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
            s = sumCount();

// 往counterCells数组添加counterCell对象
// See LongAdder version for explanation
private final void fullAddCount(long x, boolean wasUncontended) {
    int h;
    if ((h = ThreadLocalRandom.getProbe()) == 0) {
        ThreadLocalRandom.localInit();      // force initialization
        h = ThreadLocalRandom.getProbe();
        wasUncontended = true;
    boolean collide = false;                // True if last slot nonempty
    for (;;) {
        CounterCell[] as; CounterCell a; int n; long v;
        if ((as = counterCells) != null && (n = as.length) > 0) {
            if ((a = as[(n - 1) & h]) == null) {
                if (cellsBusy == 0) {            // Try to attach new Cell
                    CounterCell r = new CounterCell(x); // Optimistic create
                    if (cellsBusy == 0 &&
                        U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                        boolean created = false;
                        try {               // Recheck under lock
                            CounterCell[] rs; int m, j;
                            if ((rs = counterCells) != null &&
                                (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {
                                rs[j] = r;
                                created = true;
                        } finally {
                            cellsBusy = 0;
                        if (created)
                        continue;           // Slot is now non-empty
                collide = false;
            else if (!wasUncontended)       // CAS already known to fail
                wasUncontended = true;      // Continue after rehash
            else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
            else if (counterCells != as || n >= NCPU)
                collide = false;            // At max size or stale
            else if (!collide)
                collide = true;
            else if (cellsBusy == 0 &&
                     U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                try {
                    if (counterCells == as) {// Expand table unless stale
                        CounterCell[] rs = new CounterCell[n << 1];
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];
                        counterCells = rs;
                } finally {
                    cellsBusy = 0;
                collide = false;
                continue;                   // Retry with expanded table
            h = ThreadLocalRandom.advanceProbe(h);
        else if (cellsBusy == 0 && counterCells == as &&
                 U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
            boolean init = false;
            try {                           // Initialize table
                if (counterCells == as) {
                    CounterCell[] rs = new CounterCell[2];
                    rs[h & 1] = new CounterCell(x);
                    counterCells = rs;
                    init = true;
            } finally {
                cellsBusy = 0;
            if (init)
        else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
            break;                          // Fall back on using base

3.4 ConcurrentHashMap(JDK1.7和JDK1.8), HashTable 与Collections.synchronizedMap

HashTable 是同步版本的 HashMap,内部数据结构与 HashMap 一样也都是使用链式哈希来实现的,即数组 + 链表(JDK1.8版本的HashMap新增了数组 + 红黑树的优化)。
链表节点定义:与 HashMap 一样都是继承于Map.Entry:value和next都是不需要使用volatile修饰的,因为synchronized 同时具有线程同步和线程可见性的作用。

 * Hashtable bucket collision list entry
 */private static class Entry<K,V> implements Map.Entry<K,V> {
    final int hash;
    final K key;
    V value;
    Entry<K,V> next;

    protected Entry(int hash, K key, V value, Entry<K,V> next) {
        this.hash = hash;
        this.key =  key;
        this.value = value;
        this.next = next;

方法级别的 synchronized 同步:get,put等方法均是使用 synchronized 修饰

// get读操作
public synchronized V get(Object key) {
    Entry<?,?> tab[] = table;
    int hash = key.hashCode();
// %取模,而HashMap为通过位运算取模,性能较高
    int index = (hash & 0x7FFFFFFF) % tab.length;
    for (Entry<?,?> e = tab[index] ; e != null ; e = e.next) {
        if ((e.hash == hash) && e.key.equals(key)) {
            return (V)e.value;
    return null;

// put写操作
public synchronized V put(K key, V value) {
    // Make sure the value is not null
    if (value == null) {
        throw new NullPointerException();

    // Makes sure the key is not already in the hashtable.
    Entry<?,?> tab[] = table;
    int hash = key.hashCode();
    int index = (hash & 0x7FFFFFFF) % tab.length;

// 更新
    Entry<K,V> entry = (Entry<K,V>)tab[index];
    for(; entry != null ; entry = entry.next) {
        if ((entry.hash == hash) && entry.key.equals(key)) {
            V old = entry.value;
            entry.value = value;
            return old;

// 新增节点
    addEntry(hash, key, value, index);
    return null;

private void addEntry(int hash, K key, V value, int index) {

    Entry<?,?> tab[] = table;
    if (count >= threshold) {
        // Rehash the table if the threshold is exceeded

        tab = table;
        hash = key.hashCode();
        index = (hash & 0x7FFFFFFF) % tab.length;

    // Creates the new entry.
    Entry<K,V> e = (Entry<K,V>) tab[index];
    tab[index] = new Entry<>(hash, key, value, e);

如果需要使用线程安全的 HashMap,则优先考虑使用ConcurrentHashMap,或者使用 Collections.synchronizedMap 来将HashMap 包装成线程安全的SynchronizedMap。因为ConcurrentHashMap和HashMap的实现更加高效,如根据key的hash计算哈希表table数组的下标的实现,HashTable 是使用%取余,而ConcurrentHashMap和HashMap都是使用位运算,效率更高,所以整体性能高于HashTable。

Collections.synchronizedMap 的方法定义如下:将传入的Map使用 SynchronizedMap 包装后返回一个SynchronizedMap实例,SynchronizedMap是线程安全的,这是一种包装器设计模式的使用。
 * Returns a synchronized (thread-safe) map backed by the specified
 * map.  In order to guarantee serial access, it is critical that
 * <strong>all</strong> access to the backing map is accomplished
 * through the returned map.<p>
 * It is imperative that the user manually synchronize on the returned
 * map when iterating over any of its collection views:
 * <pre>
 *  Map m = Collections.synchronizedMap(new HashMap());
 *      ...
 *  Set s = m.keySet();  // Needn't be in synchronized block
 *      ...
 *  synchronized (m) {  // Synchronizing on m, not s!
 *      Iterator i = s.iterator(); // Must be in synchronized block
 *      while (i.hasNext())
 *          foo(i.next());
 *  }
 * </pre>
 * Failure to follow this advice may result in non-deterministic behavior.
 * <p>The returned map will be serializable if the specified map is
 * serializable.
 * @param <K> the class of the map keys
 * @param <V> the class of the map values
 * @param  m the map to be "wrapped" in a synchronized map.
 * @return a synchronized view of the specified map.
 */public static <K,V> Map<K,V> synchronizedMap(Map<K,V> m) {
    return new SynchronizedMap<>(m);

SynchronizedMap的定义:SynchronizedMap 的实现主要是在内部定义了一个被包装的map的引用,一个对象锁mutex;


private static class SynchronizedMap<K,V>
    implements Map<K,V>, Serializable {
    private static final long serialVersionUID = 1978198479659022715L;

    private final Map<K,V> m;     // Backing Map
    final Object      mutex;        // Object on which to synchronize

    SynchronizedMap(Map<K,V> m) {
        this.m = Objects.requireNonNull(m);
        mutex = this;

    SynchronizedMap(Map<K,V> m, Object mutex) {
        this.m = m;
        this.mutex = mutex;

    public int size() {
        synchronized (mutex) {return m.size();}
    public boolean isEmpty() {
        synchronized (mutex) {return m.isEmpty();}
    public boolean containsKey(Object key) {
        synchronized (mutex) {return m.containsKey(key);}
    public boolean containsValue(Object value) {
        synchronized (mutex) {return m.containsValue(value);}
    public V get(Object key) {
        synchronized (mutex) {return m.get(key);}

    public V put(K key, V value) {
        synchronized (mutex) {return m.put(key, value);}
    public V remove(Object key) {
        synchronized (mutex) {return m.remove(key);}
    public void putAll(Map<? extends K, ? extends V> map) {
        synchronized (mutex) {m.putAll(map);}
    public void clear() {
        synchronized (mutex) {m.clear();}


3.5 AtomicBoolean 原子性布尔

 AtomicBoolean 是 java.util.concurrent.atomic 包下的原子变量,这个包里面提供了一组原子类。其基本的特性就是在多线程环境下,当有多个线程同时执行这些类的实例包含的方法时,具有排他性,即当某个线程进入方法,执行其中的指令时,不会被其他线程打断,而别的线程就像自旋锁一样,一直等到该方法执行完成,才由 JVM 从等待队列中选择一个另一个线程进入,这只是一种逻辑上的理解。实际上是借助硬件的相关指令来实现的,不会阻塞线程(或者说只是在硬件级别上阻塞了)。
AtomicBoolean,在这个 Boolean 值的变化的时候不允许在之间插入,保持操作的原子性。
boolean compareAndSet(expectedValue, updateValue), 这个方法主要两个作用:
1.比较 AtomicBoolean 和 expect 的值,如果一致,执行方法内的语句。其实就是一个 if 语句
2.把 AtomicBoolean 的值设成 update,比较最要的是这两件事是一气呵成的,这连个动作之间不会被打断,任何内部或者外部的语句都不可能在两个动作之间运行。为多线程的控制提供了解决的方案下面我们从代码上解释。

1、获得 AtomicBoolean 的值
你可以通过使用 get() 方法来获取一个 AtomicBoolean 的值。
AtomicBoolean atomicBoolean = new AtomicBoolean(true);
boolean value = atomicBoolean.get();

2、设置 AtomicBoolean 的值
你可以通过使用 set() 方法来设置一个 AtomicBoolean 的值。
AtomicBoolean atomicBoolean = new AtomicBoolean(true);

以上代码执行后 AtomicBoolean 的值为 false。

3、交换 AtomicBoolean 的值
你可以通过 getAndSet() 方法来交换一个 AtomicBoolean 实例的值 getAndSet() 方法将返回 AtomicBoolean 当前的值,并将为 AtomicBoolean 设置一个新值。
AtomicBoolean atomicBoolean = new AtomicBoolean(true);
boolean oldValue = atomicBoolean.getAndSet(false);

以上代码执行后 oldValue 变量的值为true,atomicBoolean 实例将持有false值。代码成功将AtomicBoolean 当前值 ture 交换为 false。

4、比较并设置 AtomicBoolean 的值
compareAndSet() 方法允许你对 AtomicBoolean 的当前值与一个期望值进行比较,如果当前值等于期望值的话,将会对 AtomicBoolean 设定一个新值。 compareAndSet() 方法是原子性的,因此在同一时间之内有单个线程执行它。因此 compareAndSet() 方法可被用于一些类似 于锁的同步的简单实现。
以下一个compareAndSet() 示例:
AtomicBoolean atomicBoolean = new AtomicBoolean(true);
boolean expectedValue = true;
boolean newValue = false;
boolean wasNewValueSet = atomicBoolean.compareAndSet(expectedValue, newValue);

本示例对 AtomicBoolean 的当前值与 true 值进行比较,如果相等,将 AtomicBoolean 的值更新为 false

3.6 AtomicInteger 原子性整型

 AtomicInteger,一个提供原子操作的 Integer 的类。在 Java 语言中, ++i 和 i++操作并不是线程安全的,
在使用的时候,不可避免的会用到 synchronized 关键字。而 AtomicInteger 则通过一种线程安全的加减操

public final int addAndGet(int delta)
public final boolean compareAndSet(int expect, int update)
public final void lazySet(int newValue)
public final int get()
public final int getAndSet(int newValue)
public final int getAndIncrement()
public final int getAndAdd(int delta)
public final int incrementAndget()
public final int decrementAndGet()
public final int getAndDecrement()  

3.7 AtomicIntegerArray 原子性整型数组

 AtomicIntegerArray 类提供了可以以原子方式读取和写入的底层 int 数组的操作,还包含高级原子操作。AtomicIntegerArray 支持对底层 int 数组变量的原子操作。 它具有获取和设置方法,如在变量上的读取和写入。也就是说,一个集合与同一变量上的任何后续 get 相关联。 原子 compareAndSet 方法也具有这些内存一致性功能。AtomicIntegerArray 本质上是对 int[]类型的封装。使用 Unsafe 类通过 CAS 的方式控制 int[]在多线程下的安全性。

//获得数组第 i 个下标的元素
public final int get(int i)
public final int length()
//将数组第 i 个下标设置为 newValue,并返回旧的值
public final int getAndSet(int i, int newValue)
//进行 CAS 操作,如果第 i 个下标的元素等于 expect,则设置为 update,设置成功返回 true
public final boolean compareAndSet(int i, int expect, int update)
//将第 i 个下标的元素加 1
public final int getAndIncrement(int i)
//将第 i 个下标的元素减 1
public final int getAndDecrement(int i)
//将第 i 个下标的元素增加 delta(delta 可以是负数)
public final int getAndAdd(int i, int delta)  

3.8 AtomicLong、 AtomicLongArray 原子性整型数组

 AtomicLong、 AtomicLongArray 的 API 跟 AtomicInteger、 AtomicIntegerArray 在使用方法都是差不多的。区别在于用前者是使用原子方式更新的 long 值和 long 数组,后者是使用原子方式更新的 Integer 值和 Integer 数组。两者的相同处在于它们此类确实扩展了 Number,允许那些处理基于数字类的工具和实用工具进行统一访问。在实际开发中,它们分别用于不同的场景。

public class Test {

    public static void main(String[] agrs) {
        final AtomicLong orderIdGenerator = new AtomicLong(0);
        final List<Item> orders = Collections.synchronizedList(new ArrayList<Item>());
        for (int i = 0; i < 10; i++) {
            Thread orderCreationThread = new Thread(new Runnable() {
                public void run() {
                    for (int i = 0; i < 10; i++) {
                        long orderId = orderIdGenerator.incrementAndGet();
                        Item order = new Item(Thread.currentThread().getName(), orderId);
            orderCreationThread.setName("Order Creation Thread " + i);
        try {
        } catch (InterruptedException e) {
        Set<Long> orderIds = new HashSet<Long>();
        for (Item order : orders) {
            System.out.println("Order name:" + order.getItemName() + "----" + "Order id:" + order.getID());

class Item {
    String itemName;
    long id;

    Item(String n, long id) {
        this.itemName = n;
        this.id = id;

    public String getItemName() {
        return itemName;

    public long getID() {
        return id;

Order name:Order Creation Thread 0----Order id:1
Order name:Order Creation Thread 1----Order id:2
Order name:Order Creation Thread 0----Order id:4
Order name:Order Creation Thread 1----Order id:5
Order name:Order Creation Thread 3----Order id:3
Order name:Order Creation Thread 0----Order id:7
Order name:Order Creation Thread 1----Order id:6
Order name:Order Creation Thread 2----Order id:100  

3.9 ReentrantLock重入锁

 可重入: 单线程可以重复进入,但要重复退出
可中断: lock.lockInterruptibly()
可限时: 超时不能获得锁,就返回 false,不会永久等待构成死锁
公平锁: 先来先得, public ReentrantLock(boolean fair), 默认锁不公平的, 根据线程优先级竞争

public class ReenterLock implements Runnable {
    public static ReentrantLock lock = new ReentrantLock();
    public static int i = 0;

    public void run() {
        for (int j = 0; j < 10000; j++) {
             // 超时设置
//            lock.tryLock(5, TimeUnit.SECONDS);
            try {
            } finally {
                // 需要放在finally里释放, 如果上面lock了两次, 这边也要unlock两次

    public static void main(String[] args) throws InterruptedException {
        ReenterLock tl = new ReenterLock();
        Thread t1 = new Thread(tl);
        Thread t2 = new Thread(tl);

线程1, 线程2分别去获取lock1, lock2, 触发死锁. 最终通过 DeadlockChecker 来触发线程中断.

public class DeadLock implements Runnable{

    public static ReentrantLock lock1 = new ReentrantLock();
    public static ReentrantLock lock2 = new ReentrantLock();
    int lock;

    public DeadLock(int lock) {
        this.lock = lock;

    public void run() {
        try {
            if (lock == 1){
                try {
                }catch (InterruptedException e){}

            }else {
                try {
                }catch (InterruptedException e){}

        }catch (InterruptedException e){
        }finally {
            if (lock1.isHeldByCurrentThread())
            if (lock2.isHeldByCurrentThread())
            System.out.println(Thread.currentThread().getId() + "线程中断");

    public static void main(String[] args) throws InterruptedException {
        DeadLock deadLock1 = new DeadLock(1);
        DeadLock deadLock2 = new DeadLock(2);
        // 线程1, 线程2分别去获取lock1, lock2. 导致死锁
        Thread t1 = new Thread(deadLock1);
        Thread t2 = new Thread(deadLock2);
        // 死锁检查, 触发中断


public class DeadlockChecker {
    private final static ThreadMXBean mbean = ManagementFactory.getThreadMXBean();
    final static Runnable deadLockCheck = new Runnable() {
        public void run() {
            while (true) {
                long[] deadlockedThreadlds = mbean.findDeadlockedThreads();

                if (deadlockedThreadlds != null) {
                    ThreadInfo[] threadInfos = mbean.getThreadInfo(deadlockedThreadlds);
                    for (Thread t : Thread.getAllStackTraces().keySet()) {
                        for (int i = 0; i < threadInfos.length; i++) {
                            if (t.getId() == threadInfos[i].getThreadId()) {
                                try {
                                } catch (InterruptedException e) {

    public static void check() {
        Thread t = new Thread(deadLockCheck);

类似于 Object.wait() 和 Object.notify(), 需要与 ReentrantLock 结合使用

// await()方法会使当前线程等待,同时释放当前锁,当其他线程中使用signal()时或者signalAll()方法时,
// 线程会重新获得锁并继续执行。或者当线程被中断时,也能跳出等待。这和Object.wait()方法很相似。
void await() throws InterruptedException;
// awaitUninterruptibly()方法与await()方法基本相同,但是它并不会再等待过程中响应中断。
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
// singal()方法用于唤醒一个在等待中的线程。相对的singalAll()方法会唤醒所有在等待中的线程。
// 这和Obejct.notify()方法很类似。
void signal();
void signalAll();

public class ReenterLockCondition implements Runnable{

    public static ReentrantLock lock = new ReentrantLock();
    public static Condition condition = lock.newCondition();

    public void run() {
        try {
            System.out.println("Thread is going on");
        } catch (InterruptedException e) {
        } finally {
            // 注意放到finally中释放

    public static void main(String[] args) throws InterruptedException {
        ReenterLockCondition t1 = new ReenterLockCondition();
        Thread tt = new Thread(t1);
        System.out.println("after sleep, signal!");
        // 通知线程tt继续执行. 唤醒同样需要重新获得锁

Semaphore 信号量

锁一般都是互斥排他的, 而信号量可以认为是一个共享锁,允许N个线程同时进入临界区, 但是超出许可范围的只能等待.如果N = 1, 则类似于lock.具体API如下, 通过acquire获取信号量, 通过release释放。

public void acquire()
public void acquireUninterruptibly()
public boolean tryAcquire()
public boolean tryAcquire(long timeout, TimeUnit unit)
public void release()

模拟20个线程, 但是信号量只设置了5个许可.
因此线程是按序每2秒5个的打印job done.

public class SemapDemo implements Runnable{

    // 设置5个许可
    final Semaphore semp = new Semaphore(5);

    public void run() {
        try {
            // 模拟线程耗时操作
            System.out.println("Job done! " + Thread.currentThread().getId());
        } catch (InterruptedException e) {
        } finally {

    public static void main(String[] args){
        ExecutorService service = Executors.newFixedThreadPool(20);
        final SemapDemo demo = new SemapDemo();
        for (int i = 0; i < 20; i++) {




