
Java Interview Compendium (Part 6)

Chapter 4: Java Multithreading and the Concurrency Library

3.3 ConcurrentHashMap: a Non-Blocking Hash Collection

- In JDK 1.7, concurrent reads and writes to a ConcurrentHashMap are supported through Segment-based lock striping. The map is composed of multiple Segments, each holding a portion of the data. Segment extends ReentrantLock, giving mutually exclusive writes and shared, lock-free reads: with N Segments, up to N threads can write concurrently at any moment, while any number of threads may read. Inside a Segment, write operations acquire the ReentrantLock; read operations take no lock and rely on volatile for cross-thread visibility.
- In JDK 1.8, the Segment design was dropped to further improve performance. Structurally, ConcurrentHashMap uses a single hash table (the table array) just like HashMap: array + linked list, or array + red-black tree. Thread safety is achieved with CAS, which builds on the hardware-level atomic operations exposed by the JDK's Unsafe class. On top of the HashMap layout, the table array and the val/next fields of the list nodes are declared volatile to provide cross-thread visibility.
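
As a quick illustration of the thread-safety guarantee described above (a minimal sketch, not tied to either JDK's internals): four writer threads insert disjoint keys, and no insertion is lost.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ConcurrentPutDemo {
    public static void main(String[] args) throws InterruptedException {
        Map<Integer, Integer> map = new ConcurrentHashMap<>();
        Thread[] writers = new Thread[4];
        for (int t = 0; t < writers.length; t++) {
            final int base = t * 10_000;
            writers[t] = new Thread(() -> {
                for (int i = 0; i < 10_000; i++)
                    map.put(base + i, i);   // distinct keys per thread
            });
            writers[t].start();
        }
        for (Thread w : writers) w.join();
        System.out.println(map.size()); // 40000: no lost updates
    }
}
```

Running the same loop against a plain HashMap can lose updates or corrupt the table, which is exactly what the locking/CAS machinery below prevents.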


ConcurrentHashMap (JDK 1.7)
Segment: lock striping
1. HashMap uses a single hash table: an array table whose elements are linked-list nodes. ConcurrentHashMap instead uses multiple hash tables, each one wrapped in a Segment. Segment extends ReentrantLock and therefore carries its own lock; each Segment is effectively a small HashMap that uses ReentrantLock for concurrency control to stay thread-safe.
2. Segment is a static inner class of ConcurrentHashMap that extends ReentrantLock. It holds a HashEntry array named table, where HashEntry is the linked-list node type. The table field is volatile, so when one thread installs a new node (a new head, or a node added to an existing list), the change is visible to other threads.
 /**
 * Segments are specialized versions of hash tables.  This
 * subclasses from ReentrantLock opportunistically, just to
 * simplify some locking and avoid separate construction.
 */
static final class Segment<K,V> extends ReentrantLock implements Serializable {
    
    ...
    
    /**
     * The maximum number of times to tryLock in a prescan before
     * possibly blocking on acquire in preparation for a locked
     * segment operation. On multiprocessors, using a bounded
     * number of retries maintains cache acquired while locating
     * nodes.
     */
    static final int MAX_SCAN_RETRIES =
        Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;

    /**
     * The per-segment table. Elements are accessed via
     * entryAt/setEntryAt providing volatile semantics.
     */
    transient volatile HashEntry<K,V>[] table;

    ...
    
}  
3. HashEntry is defined as follows: it holds the key, the value, the key's hash, and the next pointer to the following node in the list.

/**
 * ConcurrentHashMap list entry. Note that this is never exported
 * out as a user-visible Map.Entry.
 */
static final class HashEntry<K,V> {
    final int hash;
    final K key;
    volatile V value;
    volatile HashEntry<K,V> next;
    
    ...
    
}

As the definition shows, value and next are volatile: when multiple threads share the Segment containing a HashEntry, one thread's modification of a node's value or next pointer is visible to the others. hash and key are final, because a new HashEntry is attached to a particular bucket list based on the key's hash (the hash determines the index into the table array), so these fields must not change once the node has been added.
Capacity of a Segment's table array
1. MIN_SEGMENT_TABLE_CAPACITY: the minimum capacity of the table array, 2 by default.

/**
 * The minimum capacity for per-segment tables.  Must be a power
 * of two, at least two to avoid immediate resizing on next use
 * after lazy construction.
 */
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;

The actual size is computed in the ConcurrentHashMap constructor: the map's overall capacity initialCapacity is divided by the size of the segments array, giving the size of each Segment's internal table array.

public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    if (concurrencyLevel > MAX_SEGMENTS)
        concurrencyLevel = MAX_SEGMENTS;
    // Find power-of-two sizes best matching arguments
    int sshift = 0;
    int ssize = 1;
    
    // ssize: size of the segments array;
    // must not be smaller than concurrencyLevel (default 16)
    while (ssize < concurrencyLevel) {
        ++sshift;
        ssize <<= 1;
    }
    this.segmentShift = 32 - sshift;
    this.segmentMask = ssize - 1;
    if (initialCapacity > MAXIMUM_CAPACITY)
        initialCapacity = MAXIMUM_CAPACITY;
    int c = initialCapacity / ssize;
    if (c * ssize < initialCapacity)
        ++c;
        
    // cap: size of each Segment's internal HashEntry array,
    // at least MIN_SEGMENT_TABLE_CAPACITY (default 2).
    // The actual size is derived from c above,
    // where c = initialCapacity / ssize (rounded up),
    // i.e. the overall capacity divided by the number of
    // Segments gives each Segment's table size
    int cap = MIN_SEGMENT_TABLE_CAPACITY;
    while (cap < c)
        cap <<= 1;
    // create segments and segments[0]
    Segment<K,V> s0 =
        new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                         (HashEntry<K,V>[])new HashEntry[cap]);
    Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
    UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
    this.segments = ss;
}
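
To make the sizing arithmetic concrete, the following standalone sketch reproduces the constructor's power-of-two math (the class and method names here are illustrative, not JDK API):

```java
public class SegmentSizing {
    // Reproduces the power-of-two sizing from the JDK 1.7 constructor.
    // Returns { ssize, cap }: segments-array size and per-segment table size.
    static int[] size(int initialCapacity, int concurrencyLevel) {
        int ssize = 1;                       // number of segments
        while (ssize < concurrencyLevel)
            ssize <<= 1;
        int c = initialCapacity / ssize;     // per-segment share, rounded up
        if (c * ssize < initialCapacity)
            ++c;
        int cap = 2;                         // MIN_SEGMENT_TABLE_CAPACITY
        while (cap < c)
            cap <<= 1;
        return new int[] { ssize, cap };
    }

    public static void main(String[] args) {
        // Defaults: 16 segments, each with a table of 2 slots.
        System.out.println(java.util.Arrays.toString(size(16, 16)));   // [16, 2]
        // initialCapacity 100 over 16 segments -> ceil(100/16) = 7 -> cap 8
        System.out.println(java.util.Arrays.toString(size(100, 16)));  // [16, 8]
    }
}
```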

Concurrency control
ConcurrentHashMap contains an array of Segments, allowing multiple threads to read and write different Segments simultaneously, which is what raises its concurrency.
Concurrency level: by default DEFAULT_CONCURRENCY_LEVEL determines the number of Segments, 16, i.e. a Segment array of size 16 is created, so at any moment up to 16 threads can write to the ConcurrentHashMap at once (one writer per Segment). The level can also be set via the concurrencyLevel argument of the ConcurrentHashMap constructor.
MAX_SEGMENTS: the maximum size of the Segment array, 2^16 by default.

/**
 * The default concurrency level for this table, used when not
 * otherwise specified in a constructor.
 */
static final int DEFAULT_CONCURRENCY_LEVEL = 16;

/**
 * The maximum number of segments to allow; used to bound
 * constructor arguments. Must be power of two less than 1 << 24.
 */
static final int MAX_SEGMENTS = 1 << 16; // slightly conservative

put (write)
First the key's hash determines the index into the segments array, i.e. which Segment the entry belongs to. Then that Segment's put method writes the entry into one of the bucket lists of the Segment's internal table array; the bucket is likewise chosen by taking the key's hash modulo the Segment's table size.
ConcurrentHashMap's own put takes no lock; the locking happens inside Segment's put, via ReentrantLock:
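
For the default of 16 segments, ssize = 16 = 1 << 4, so segmentShift = 28 and segmentMask = 15: the segment index comes from the top 4 bits of the hash. A minimal sketch of that index calculation (constants below assume the defaults; the method name is illustrative):

```java
public class SegmentIndex {
    // Defaults for concurrencyLevel = 16: ssize = 16 = 1 << 4, sshift = 4
    static final int segmentShift = 32 - 4; // 28
    static final int segmentMask  = 16 - 1; // 15

    static int segmentFor(int hash) {
        // The top bits select the segment, as in ConcurrentHashMap.put (JDK 1.7)
        return (hash >>> segmentShift) & segmentMask;
    }

    public static void main(String[] args) {
        System.out.println(segmentFor(0x00000000)); // 0
        System.out.println(segmentFor(0x30000000)); // 3
        System.out.println(segmentFor(0xF0000000)); // 15
    }
}
```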

/**
 * Maps the specified key to the specified value in this table.
 * Neither the key nor the value can be null.
 *
 * <p> The value can be retrieved by calling the <tt>get</tt> method
 * with a key that is equal to the original key.
 *
 * @param key key with which the specified value is to be associated
 * @param value value to be associated with the specified key
 * @return the previous value associated with <tt>key</tt>, or
 *         <tt>null</tt> if there was no mapping for <tt>key</tt>
 * @throws NullPointerException if the specified key or value is null
 */
@SuppressWarnings("unchecked")
public V put(K key, V value) {
    Segment<K,V> s;
    if (value == null)
        throw new NullPointerException();
    int hash = hash(key);
    int j = (hash >>> segmentShift) & segmentMask;
    
    // Use the key's hash value to locate the Segment
    if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
         (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
        
        // If no Segment exists at index j of the segments array yet,
        // create the Segment instance for that slot
        s = ensureSegment(j);
    
    // Delegate the write to that Segment's put
    return s.put(key, hash, value, false);
}

Segment's put: first acquire the lock, then use the key's hash to compute the index into the Segment's internal HashEntry table, which yields the corresponding bucket list (specifically, its head node).

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    
    // tryLock: non-blocking attempt to take the lock; if no other thread holds
    // this Segment's lock it succeeds and node stays null, and we continue below.
    // scanAndLockForPut: the lock is held by another thread, so spin with
    // tryLock up to MAX_SCAN_RETRIES times (64 on multiprocessors, 1 on a
    // single core), then block on lock(); returns a pre-created node if any.
    HashEntry<K,V> node = tryLock() ? null :
        scanAndLockForPut(key, hash, value);
    V oldValue;
    try {
        HashEntry<K,V>[] tab = table;
        int index = (tab.length - 1) & hash;
        
        // head node of this bucket's list
        HashEntry<K,V> first = entryAt(tab, index);
        for (HashEntry<K,V> e = first;;) {
        
            // key already present: update the value
            if (e != null) {
                K k;
                if ((k = e.key) == key ||
                    (e.hash == hash && key.equals(k))) {
                    oldValue = e.value;
                    if (!onlyIfAbsent) {
                        e.value = value;
                        // modCount is incremented even for a value update,
                        // whereas HashMap increments it only for structural changes
                        ++modCount;
                    }
                    break;
                }
                e = e.next;
            }
            else {
                // Note: new nodes are inserted at the head, i.e. the most
                // recently added node becomes the list head (JDK 1.8's HashMap
                // appends at the tail instead)
                if (node != null)
                    node.setNext(first);
                else
                    node = new HashEntry<K,V>(hash, key, value, first);
                int c = count + 1;
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                    rehash(node);
                else
                // install the new node as this bucket's head in the table
                    setEntryAt(tab, index, node);
                ++modCount;
                // update this Segment's element count
                count = c;
                oldValue = null;
                break;
            }
        }
    } finally {
        // release the lock
        unlock();
    }
    // return the previous value
    return oldValue;
}
The implementation of scanAndLockForPut: it acquires the Segment's lock so the put can proceed, and at the same time locates the HashEntry node for the value being written, either finding an existing node or creating a new one.

/**
 * Scans for a node containing given key while trying to
 * acquire lock, creating and returning one if not found. Upon
 * return, guarantees that lock is held. Unlike in most
 * methods, calls to method equals are not screened: Since
 * traversal speed doesn't matter, we might as well help warm
 * up the associated code and accesses as well.
 *
 * @return a new node if key not found, else null
 */
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
    HashEntry<K,V> first = entryForHash(this, hash);
    HashEntry<K,V> e = first;
    HashEntry<K,V> node = null;
    int retries = -1; // negative while locating node
    
    // spin on tryLock without blocking
    while (!tryLock()) {
        HashEntry<K,V> f; // to recheck first below
        if (retries < 0) {
            if (e == null) {
                if (node == null) // speculatively create node
                    node = new HashEntry<K,V>(hash, key, value, null);
                retries = 0;
            }
            else if (key.equals(e.key))
                retries = 0;
            else
                e = e.next;
        }
        
        // after more than MAX_SCAN_RETRIES failed tries (64 on multiprocessors,
        // 1 on a single core), the current thread blocks waiting on lock()
        else if (++retries > MAX_SCAN_RETRIES) {
            lock();
            break;
        }
        else if ((retries & 1) == 0 &&
                 (f = entryForHash(this, hash)) != first) {
            e = first = f; // re-traverse if entry changed
            retries = -1;
        }
    }
    return node;
}

get (read)
Retrieves the value for the given key. A get takes no lock at all; thread visibility is guaranteed by the volatile variants of the Unsafe accessors. The implementation:

/**
 * Returns the value to which the specified key is mapped,
 * or {@code null} if this map contains no mapping for the key.
 *
 * <p>More formally, if this map contains a mapping from a key
 * {@code k} to a value {@code v} such that {@code key.equals(k)},
 * then this method returns {@code v}; otherwise it returns
 * {@code null}.  (There can be at most one such mapping.)
 *
 * @throws NullPointerException if the specified key is null
 */
public V get(Object key) {
    Segment<K,V> s; // manually integrate access methods to reduce overhead
    HashEntry<K,V>[] tab;
    int h = hash(key);
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
    
    // locate the Segment
    if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
        (tab = s.table) != null) {
        
        // compute the table index from the hash to get the bucket's head node,
        // then traverse the list from that head
        for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                 (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
             e != null; e = e.next) {
            K k;
            if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                return e.value;
        }
    }
    return null;
}

1. From the key's hash, UNSAFE.getObjectVolatile is invoked; being the volatile variant, it guarantees cross-thread visibility (per the happens-before rules), so the Segment holding the key is read from the up-to-date segments array.
2. The key's hash then gives the index into that Segment's internal table array, yielding the head node of the bucket list for the key. The list is traversed from the head; if a matching node is found its value is returned, otherwise null.

size computation
The size method counts the total number of elements in the map, i.e. it sums the nodes of every bucket list in every Segment's internal table.
The logic: the sum is first computed without locking the Segments, twice in a row; if the map was not modified between the two passes, the result is returned directly, otherwise one more pass is made with every Segment locked.
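
The same optimistic pattern, stripped down to one counter guarded by a single lock and a modification stamp, can be sketched like this (a simplified illustration, not JDK code):

```java
import java.util.concurrent.locks.ReentrantLock;

public class OptimisticSize {
    private final ReentrantLock lock = new ReentrantLock();
    private volatile int count;
    private volatile int modCount;

    void add() {
        lock.lock();
        try { count++; modCount++; } finally { lock.unlock(); }
    }

    // Lock-free passes first: if two consecutive passes observe the same
    // modCount, no write happened in between and the value is trustworthy.
    int size() {
        int last = -1;
        for (int retries = 0; retries < 2; retries++) {
            int sum = modCount;
            int sz = count;
            if (sum == last)
                return sz;
            last = sum;
        }
        // Writes kept interleaving: fall back to an exact, locked read.
        lock.lock();
        try { return count; } finally { lock.unlock(); }
    }
}
```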

/**
 * Returns the number of key-value mappings in this map.  If the
 * map contains more than <tt>Integer.MAX_VALUE</tt> elements, returns
 * <tt>Integer.MAX_VALUE</tt>.
 *
 * @return the number of key-value mappings in this map
 */
public int size() {
    // Try a few times to get accurate count. On failure due to
    // continuous async changes in table, resort to locking.

    final Segment<K,V>[] segments = this.segments;
    int size;
    boolean overflow; // true if size overflows 32 bits

    // accumulated modCounts; reset to 0 on every pass
    long sum;         // sum of modCounts

    // modCount sum from the previous pass
    long last = 0L;   // previous sum
    int retries = -1; // first iteration isn't retry
    try {
        for (;;) {

            // RETRIES_BEFORE_LOCK is 2 and retries starts at -1, so
            // retries++ == RETRIES_BEFORE_LOCK means this is the third pass:
            // the first two passes disagreed on modCounts (writes happened
            // in between), so lock everything for an exact count
            if (retries++ == RETRIES_BEFORE_LOCK) {
                for (int j = 0; j < segments.length; ++j)
                    // lock every Segment; no writes can proceed now
                    ensureSegment(j).lock(); // force creation
            }
            // reset sum for this pass
            sum = 0L;
            size = 0;
            overflow = false;

            // iterate over the segments
            for (int j = 0; j < segments.length; ++j) {
                Segment<K,V> seg = segmentAt(segments, j);
                if (seg != null) {

                    // accumulate each segment's modCount so it can be
                    // compared with the previous pass to detect writes
                    sum += seg.modCount;

                    // count is the total number of nodes across all of
                    // this segment's bucket lists
                    int c = seg.count;
                    // size accumulates the node totals and is the return value
                    if (c < 0 || (size += c) < 0)
                        overflow = true;
                }
            }

            // two consecutive passes agree, so no writes
            // happened in between and the result can be
            // returned directly
            if (sum == last)
                break;
            last = sum;
        }
    } finally {

        // retries > RETRIES_BEFORE_LOCK means the locked
        // pass ran, so the locks must be released
        if (retries > RETRIES_BEFORE_LOCK) {
            for (int j = 0; j < segments.length; ++j)
                segmentAt(segments, j).unlock();
        }
    }
    return overflow ? Integer.MAX_VALUE : size;
}

ConcurrentHashMap (JDK 1.8)

Core fields
1. The hash table: as in HashMap, it is a Node array named table; the difference is that here the array is declared volatile.
2. baseCount and counterCells: track how many elements the ConcurrentHashMap currently holds. When nodes are added or removed, baseCount is updated by default; if several threads contend on such updates at the same time, the losing threads give up updating baseCount and instead add a CounterCell to the counterCells array. Later, size() adds baseCount to the sum over the counterCells.
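
This baseCount/counterCells scheme is adapted from LongAdder (the CounterCell comment in the JDK source says as much), so the same striped-counting behavior can be observed directly with java.util.concurrent.atomic.LongAdder:

```java
import java.util.concurrent.atomic.LongAdder;

public class StripedCounting {
    public static void main(String[] args) throws InterruptedException {
        LongAdder count = new LongAdder();
        Runnable task = () -> {
            for (int i = 0; i < 10_000; i++)
                count.increment();   // contended updates spread over cells
        };
        Thread t1 = new Thread(task), t2 = new Thread(task);
        t1.start(); t2.start();
        t1.join(); t2.join();
        // sum() folds the base value and all cells together, like sumCount()
        System.out.println(count.sum()); // 20000
    }
}
```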

/**
 * The array of bins. Lazily initialized upon first insertion.
 * Size is always a power of two. Accessed directly by iterators.
 */
transient volatile Node<K,V>[] table;

/**
 * Base counter value, used mainly when there is no contention,
 * but also as a fallback during table initialization
 * races. Updated via CAS.
 */
private transient volatile long baseCount;

/**
 * Table of counter cells. When non-null, size is a power of 2.
 */
private transient volatile CounterCell[] counterCells;

The list-node definition, Node: val and next are declared volatile for cross-thread visibility.

/**
 * Key-value entry.  This class is never exported out as a
 * user-mutable Map.Entry (i.e., one supporting setValue; see
 * MapEntry below), but can be used for read-only traversals used
 * in bulk tasks.  Subclasses of Node with a negative hash field
 * are special, and contain null keys and values (but are never
 * exported).  Otherwise, keys and vals are never null.
 */
static class Node<K,V> implements Map.Entry<K,V> {
    final int hash;
    final K key;
    volatile V val;
    volatile Node<K,V> next;

    Node(int hash, K key, V val, Node<K,V> next) {
        this.hash = hash;
        this.key = key;
        this.val = val;
        this.next = next;
    }

    public final K getKey()       { return key; }
    public final V getValue()     { return val; }
 
    ...
    
    /**
     * Virtualized support for map.get(); overridden in subclasses.
     */
    Node<K,V> find(int h, Object k) {
        Node<K,V> e = this;
        if (k != null) {
            do {
                K ek;
                if (e.hash == h &&
                    ((ek = e.key) == k || (ek != null && k.equals(ek))))
                    return e;
            } while ((e = e.next) != null);
        }
        return null;
    }
}

Core methods
Unsafe: hardware-level atomic operations
The methods below read, update, and add list nodes. They are built on the hardware-level atomic operations provided by the Unsafe class rather than on locking (the synchronized keyword or ReentrantLock), i.e. they are lock-free.

/* ---------------- Table element access -------------- */
/*
 * Volatile access methods are used for table elements as well as
 * elements of in-progress next table while resizing.  All uses of
 * the tab arguments must be null checked by callers.  All callers
 * also paranoically precheck that tab's length is not zero (or an
 * equivalent check), thus ensuring that any index argument taking
 * the form of a hash value anded with (length - 1) is a valid
 * index.  Note that, to be correct wrt arbitrary concurrency
 * errors by users, these checks must operate on local variables,
 * which accounts for some odd-looking inline assignments below.
 * Note that calls to setTabAt always occur within locked regions,
 * and so in principle require only release ordering, not
 * full volatile semantics, but are currently coded as volatile
 * writes to be conservative.
 */
@SuppressWarnings("unchecked")

// volatile (atomic) read of the node at index i
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
    return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}

// CAS-update or install a node
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                    Node<K,V> c, Node<K,V> v) {
    return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}

// volatile write of a node (always called within locked regions)
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
    U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}

put (write)
The write path is defined in putVal, whose logic largely mirrors HashMap's putVal, except that the related operations (reading a node, updating a node's value, adding a node) go through Unsafe's hardware-level atomic operations, and that updating a value in, or appending to, an already-existing bucket list is made thread-safe with a synchronized block.

/**
 * Maps the specified key to the specified value in this table.
 * Neither the key nor the value can be null.
 *
 * <p>The value can be retrieved by calling the {@code get} method
 * with a key that is equal to the original key.
 *
 * @param key key with which the specified value is to be associated
 * @param value value to be associated with the specified key
 * @return the previous value associated with {@code key}, or
 *         {@code null} if there was no mapping for {@code key}
 * @throws NullPointerException if the specified key or value is null
 */
public V put(K key, V value) {
    return putVal(key, value, false);
}

/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        
        // i is the key's index into the table array;
        // null means this bucket (specifically its head node)
        // does not exist in the table yet
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
        
            // CAS the new head node into the empty bucket
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            
            // f is the bucket's head node; synchronizing on it
            // effectively locks the entire bucket list
            synchronized (f) {
            
                // double check that the head is still unchanged,
                // i.e. the bucket was not modified before entering the block
                if (tabAt(tab, i) == f) {
                
                    // hash >= 0: a regular linked-list bin
                    if (fh >= 0) {
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                 
                                // node already exists: just update the value
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            
                            // no node exists for this key yet:
                            // append a new node at the tail of the list
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    
                    // a tree bin (red-black tree):
                    // update or insert the node via the tree
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            
            // check whether the list should be converted to a red-black tree
            if (binCount != 0) {
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    
    // update the map's total node count
    addCount(1L, binCount);
    return null;
}

1. If the bucket for the key does not exist in the table yet (no list has been created for that hash), casTabAt installs the new head node via CAS, which guards against another thread installing a head at the same slot concurrently; if the CAS fails, the loop spins and falls through to step 2.
2. If the bucket already exists, tabAt reads the current head node f with volatile semantics. Although f is a local variable, it refers to the shared head node of that bucket in the table, so synchronized (f) serializes all threads working on the bucket. Inside the synchronized block the list is traversed as in HashMap: an existing node for the key is updated, otherwise a new node is appended at the tail.
3. Why synchronized is needed: when the bucket already exists, tabAt yields the latest head node, but deciding whether the key is present requires traversing the list; without locking the head, another thread could insert the same key during the traversal, producing a duplicate. Locking the head node guarantees a single writer per bucket at any time.
4. Narrower lock scope: with JDK 1.7's Segment striping, every write had to take the Segment's lock first, whether it (1) installed a head node for a missing bucket, (2) updated an existing node, or (3) appended to an existing list. In JDK 1.8, case (1), installing a head node into an empty slot, needs no lock at all: it only fills one slot of the table array with no traversal involved, so a casTabAt CAS on that slot (checking whether a head node is already there) suffices, and this is the first case the write path checks. For cases (2) and (3) only the bucket's head node is locked with synchronized, whereas JDK 1.7 locked the Segment's entire internal table. Shrinking the lock from a whole Segment to a single bucket is one of the main performance gains.
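
The per-bin pattern described above can be sketched independently of the JDK internals, using AtomicReferenceArray in place of Unsafe (all names in this sketch are illustrative, and it covers only put/get):

```java
import java.util.concurrent.atomic.AtomicReferenceArray;

public class PerBinLocking {
    static final class Node {
        final String key;
        volatile String val;
        volatile Node next;
        Node(String key, String val, Node next) {
            this.key = key; this.val = val; this.next = next;
        }
    }

    final AtomicReferenceArray<Node> table = new AtomicReferenceArray<>(16);

    void put(String key, String val) {
        int i = (table.length() - 1) & key.hashCode();
        for (;;) {
            Node head = table.get(i);               // volatile read, like tabAt
            if (head == null) {
                // Empty bin: lock-free CAS install, like casTabAt;
                // if the CAS loses, another thread installed a head -> retry
                if (table.compareAndSet(i, null, new Node(key, val, null)))
                    return;
            } else {
                synchronized (head) {               // lock only this bin
                    if (table.get(i) != head)
                        continue;                   // head changed: re-check
                    for (Node e = head; ; e = e.next) {
                        if (e.key.equals(key)) { e.val = val; return; }
                        if (e.next == null) {       // append at the tail
                            e.next = new Node(key, val, null);
                            return;
                        }
                    }
                }
            }
        }
    }

    String get(String key) {                        // lock-free read
        for (Node e = table.get((table.length() - 1) & key.hashCode());
             e != null; e = e.next)
            if (e.key.equals(key)) return e.val;
        return null;
    }
}
```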

get (read)
A get only looks up and reads node data from the hash table and performs no writes, so the happens-before guarantees of volatile (one thread's writes become visible to other threads) are enough to ensure get observes the latest node for the key; no locking is needed anywhere on the path.

Concretely, table and the val/next fields of Node are all declared volatile.

/**
 * Returns the value to which the specified key is mapped,
 * or {@code null} if this map contains no mapping for the key.
 *
 * <p>More formally, if this map contains a mapping from a key
 * {@code k} to a value {@code v} such that {@code key.equals(k)},
 * then this method returns {@code v}; otherwise it returns
 * {@code null}.  (There can be at most one such mapping.)
 *
 * @throws NullPointerException if the specified key is null
 */
public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    int h = spread(key.hashCode());
    
    // read the bucket's head node
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {
        
        // the head node itself matches: return immediately
        if ((eh = e.hash) == h) {
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;
        
        // otherwise walk the list looking for the key
        while ((e = e.next) != null) {
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}

size computation
1. The size method returns the total number of nodes in the ConcurrentHashMap. Unlike JDK 1.7, which must iterate over the segments array on every call, JDK 1.8 keeps the statistics up to date via baseCount and the counterCells array as nodes are added and removed, so size() simply reads them back. No locking is involved.
2. Handling contended updates: each CounterCell records a delta. When a thread updating baseCount detects that other threads are updating it at the same time (its CAS fails), it leaves baseCount unchanged and instead adds a CounterCell to the counterCells array; size() then sums the counterCells values and adds baseCount to obtain the exact total.

/**
 * {@inheritDoc}
 */
public int size() {
    long n = sumCount();
    return ((n < 0L) ? 0 :
            (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
            (int)n);
}

/* ---------------- Counter support -------------- */
/**
 * A padded cell for distributing counts.  Adapted from LongAdder
 * and Striped64.  See their internal docs for explanation.
 */
@sun.misc.Contended static final class CounterCell {
    volatile long value;
    CounterCell(long x) { value = x; }
}

final long sumCount() {
    CounterCell[] as = counterCells; CounterCell a;
    
    // sum starts from baseCount
    long sum = baseCount;
    if (as != null) {
    
        // iterate over counterCells and add each cell's value to sum
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}

3. addCount: called after a put write to increase the count. putVal calls addCount(1L, binCount), adding one; a bulk update could pass the batch size as the first argument, e.g. addCount(100L, binCount).

/**
 * Adds to count, and if table is too small and not already
 * resizing, initiates transfer. If already resizing, helps
 * perform transfer if work is available.  Rechecks occupancy
 * after a transfer to see if another resize is already needed
 * because resizings are lagging additions.
 *
 * @param x the count to add
 * @param check if <0, don't check resize, if <= 1 only check if uncontended
 */
private final void addCount(long x, int check) {
    CounterCell[] as; long b, s;
    if ((as = counterCells) != null ||
    
        // a failed CAS on baseCount indicates contention
        !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        CounterCell a; long v; int m;
        boolean uncontended = true;
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
            !(uncontended =
              U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
              
            // on CAS failure, record the delta in a CounterCell via fullAddCount
            fullAddCount(x, uncontended);
            return;
        }
        if (check <= 1)
            return;
        s = sumCount();
    }
    if (check >= 0) {
        Node<K,V>[] tab, nt; int n, sc;
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
               (n = tab.length) < MAXIMUM_CAPACITY) {
            int rs = resizeStamp(n);
            if (sc < 0) {
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
            s = sumCount();
        }
    }
}

// installs a CounterCell in the counterCells array
// See LongAdder version for explanation
private final void fullAddCount(long x, boolean wasUncontended) {
    int h;
    if ((h = ThreadLocalRandom.getProbe()) == 0) {
        ThreadLocalRandom.localInit();      // force initialization
        h = ThreadLocalRandom.getProbe();
        wasUncontended = true;
    }
    boolean collide = false;                // True if last slot nonempty
    for (;;) {
        CounterCell[] as; CounterCell a; int n; long v;
        if ((as = counterCells) != null && (n = as.length) > 0) {
            if ((a = as[(n - 1) & h]) == null) {
                if (cellsBusy == 0) {            // Try to attach new Cell
                    CounterCell r = new CounterCell(x); // Optimistic create
                    if (cellsBusy == 0 &&
                        U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                        boolean created = false;
                        try {               // Recheck under lock
                            CounterCell[] rs; int m, j;
                            if ((rs = counterCells) != null &&
                                (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {
                                rs[j] = r;
                                created = true;
                            }
                        } finally {
                            cellsBusy = 0;
                        }
                        if (created)
                            break;
                        continue;           // Slot is now non-empty
                    }
                }
                collide = false;
            }
            else if (!wasUncontended)       // CAS already known to fail
                wasUncontended = true;      // Continue after rehash
            else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
                break;
            else if (counterCells != as || n >= NCPU)
                collide = false;            // At max size or stale
            else if (!collide)
                collide = true;
            else if (cellsBusy == 0 &&
                     U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                try {
                    if (counterCells == as) {// Expand table unless stale
                        CounterCell[] rs = new CounterCell[n << 1];
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];
                        counterCells = rs;
                    }
                } finally {
                    cellsBusy = 0;
                }
                collide = false;
                continue;                   // Retry with expanded table
            }
            h = ThreadLocalRandom.advanceProbe(h);
        }
        else if (cellsBusy == 0 && counterCells == as &&
                 U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
            boolean init = false;
            try {                           // Initialize table
                if (counterCells == as) {
                    CounterCell[] rs = new CounterCell[2];
                    rs[h & 1] = new CounterCell(x);
                    counterCells = rs;
                    init = true;
                }
            } finally {
                cellsBusy = 0;
            }
            if (init)
                break;
        }
        else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
            break;                          // Fall back on using base
    }
}  

3.4 ConcurrentHashMap (JDK 1.7 and JDK 1.8), Hashtable, and Collections.synchronizedMap

Hashtable
Hashtable is the synchronized counterpart of HashMap. Its internal data structure is the same chained hash table, i.e. array + linked list (the JDK 1.8 HashMap adds the array + red-black tree optimization on top).
Hashtable achieves thread safety by marking its methods synchronized, using the Hashtable instance itself as the monitor. Because the lock is method-level, read-read, read-write, and write-write accesses all exclude one another: only one thread can access the Hashtable at any moment, so its concurrency is poor.
Node definition: like HashMap's, it implements Map.Entry. value and next need no volatile here, because synchronized provides both mutual exclusion and visibility.

/**
 * Hashtable bucket collision list entry
 */
private static class Entry<K,V> implements Map.Entry<K,V> {
    final int hash;
    final K key;
    V value;
    Entry<K,V> next;

    protected Entry(int hash, K key, V value, Entry<K,V> next) {
        this.hash = hash;
        this.key =  key;
        this.value = value;
        this.next = next;
    }

Method-level synchronized: get, put, and the other public methods are all declared synchronized.

// synchronized get (read)
public synchronized V get(Object key) {
    Entry<?,?> tab[] = table;
    int hash = key.hashCode();
    
    // index via % (remainder); HashMap instead uses a bit mask, which is faster
    int index = (hash & 0x7FFFFFFF) % tab.length;
    for (Entry<?,?> e = tab[index] ; e != null ; e = e.next) {
        if ((e.hash == hash) && e.key.equals(key)) {
            return (V)e.value;
        }
    }
    return null;
}

// synchronized put (write)
public synchronized V put(K key, V value) {
    // Make sure the value is not null
    if (value == null) {
        throw new NullPointerException();
    }

    // Makes sure the key is not already in the hashtable.
    Entry<?,?> tab[] = table;
    int hash = key.hashCode();
    int index = (hash & 0x7FFFFFFF) % tab.length;
    // update the value if the key already exists
    @SuppressWarnings("unchecked")
    Entry<K,V> entry = (Entry<K,V>)tab[index];
    for(; entry != null ; entry = entry.next) {
        if ((entry.hash == hash) && entry.key.equals(key)) {
            V old = entry.value;
            entry.value = value;
            return old;
        }
    }

// insert a new node at the head of the bucket
    addEntry(hash, key, value, index);
    return null;
}

private void addEntry(int hash, K key, V value, int index) {
    modCount++;

    Entry<?,?> tab[] = table;
    if (count >= threshold) {
        // Rehash the table if the threshold is exceeded
        rehash();

        tab = table;
        hash = key.hashCode();
        index = (hash & 0x7FFFFFFF) % tab.length;
    }

    // Creates the new entry.
    @SuppressWarnings("unchecked")
    Entry<K,V> e = (Entry<K,V>) tab[index];
    tab[index] = new Entry<>(hash, key, value, e);
    count++;
}

If you need a thread-safe HashMap, prefer ConcurrentHashMap, or wrap a HashMap with Collections.synchronizedMap to obtain a thread-safe SynchronizedMap. Beyond locking granularity, ConcurrentHashMap and HashMap are implemented more efficiently: for example, when mapping a key's hash to a table index, Hashtable uses the % remainder operator, whereas ConcurrentHashMap and HashMap use a bit mask, which is faster, so their overall performance is higher than Hashtable's.
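This index-computation difference is easy to verify: when the table length is a power of two, as it always is in HashMap, masking with length - 1 selects the same bucket as the remainder operator. A small sketch (the class and method names here are illustrative, not from the JDK):

```java
public class IndexDemo {
    // Hashtable's style: strip the sign bit, then take the remainder.
    static int modIndex(int hash, int n) {
        return (hash & 0x7FFFFFFF) % n;
    }

    // HashMap's style: a single AND instruction, valid when n is a power of two.
    static int maskIndex(int hash, int n) {
        return hash & (n - 1);
    }

    public static void main(String[] args) {
        int n = 16; // power of two, like HashMap's default capacity
        for (int hash = 0; hash < 1000; hash++) {
            if (modIndex(hash, n) != maskIndex(hash, n)) {
                throw new AssertionError("mismatch at hash " + hash);
            }
        }
        System.out.println("mask and mod agree for power-of-two length " + n);
    }
}
```

HashMap keeps its table length a power of two precisely so that this single AND can replace the division Hashtable performs on every access.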

Collections.synchronizedMap
Collections.synchronizedMap is defined as follows: it wraps the map passed in and returns a SynchronizedMap instance, which is thread-safe. This is an application of the wrapper (decorator) pattern.
/**
 * Returns a synchronized (thread-safe) map backed by the specified
 * map.  In order to guarantee serial access, it is critical that
 * <strong>all</strong> access to the backing map is accomplished
 * through the returned map.<p>
 *
 * It is imperative that the user manually synchronize on the returned
 * map when iterating over any of its collection views:
 * <pre>
 *  Map m = Collections.synchronizedMap(new HashMap());
 *      ...
 *  Set s = m.keySet();  // Needn't be in synchronized block
 *      ...
 *  synchronized (m) {  // Synchronizing on m, not s!
 *      Iterator i = s.iterator(); // Must be in synchronized block
 *      while (i.hasNext())
 *          foo(i.next());
 *  }
 * </pre>
 * Failure to follow this advice may result in non-deterministic behavior.
 *
 * <p>The returned map will be serializable if the specified map is
 * serializable.
 *
 * @param <K> the class of the map keys
 * @param <V> the class of the map values
 * @param  m the map to be "wrapped" in a synchronized map.
 * @return a synchronized view of the specified map.
 */
public static <K,V> Map<K,V> synchronizedMap(Map<K,V> m) {
    return new SynchronizedMap<>(m);
}
The definition of SynchronizedMap: internally it holds a reference to the wrapped map and a lock object, mutex.

Each method performs its operation on the wrapped map inside a synchronized (mutex) block, rather than declaring the method synchronized as Hashtable does. The locked region still covers the whole call, so throughput is similar to Hashtable's; the gains are flexibility: any Map implementation can be wrapped, and the two-argument constructor lets callers supply an external mutex.
private static class SynchronizedMap<K,V>
    implements Map<K,V>, Serializable {
    private static final long serialVersionUID = 1978198479659022715L;

    private final Map<K,V> m;     // Backing Map
    final Object      mutex;        // Object on which to synchronize

    SynchronizedMap(Map<K,V> m) {
        this.m = Objects.requireNonNull(m);
        mutex = this;
    }

    SynchronizedMap(Map<K,V> m, Object mutex) {
        this.m = m;
        this.mutex = mutex;
    }

    public int size() {
        synchronized (mutex) {return m.size();}
    }
    public boolean isEmpty() {
        synchronized (mutex) {return m.isEmpty();}
    }
    public boolean containsKey(Object key) {
        synchronized (mutex) {return m.containsKey(key);}
    }
    public boolean containsValue(Object value) {
        synchronized (mutex) {return m.containsValue(value);}
    }
    public V get(Object key) {
        synchronized (mutex) {return m.get(key);}
    }

    public V put(K key, V value) {
        synchronized (mutex) {return m.put(key, value);}
    }
    public V remove(Object key) {
        synchronized (mutex) {return m.remove(key);}
    }
    public void putAll(Map<? extends K, ? extends V> map) {
        synchronized (mutex) {m.putAll(map);}
    }
    public void clear() {
        synchronized (mutex) {m.clear();}
    }

...
}  
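As the synchronizedMap javadoc above stresses, each individual call on the wrapper is synchronized, but iteration spans many calls, so the caller must hold the wrapper's monitor for the whole traversal. A minimal usage sketch (the SyncMapDemo class is illustrative):

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class SyncMapDemo {
    // Sums the values while holding the wrapper's monitor, as required
    // when iterating over any of its collection views.
    static int sumValues(Map<String, Integer> m) {
        int sum = 0;
        synchronized (m) {
            for (int v : m.values()) {
                sum += v;
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        Map<String, Integer> m = Collections.synchronizedMap(new HashMap<>());
        m.put("a", 1); // put() itself is synchronized internally
        m.put("b", 2);
        System.out.println("sum = " + sumValues(m)); // prints "sum = 3"
    }
}
```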

3.5 AtomicBoolean: Atomic Boolean

 AtomicBoolean is an atomic variable class from the java.util.concurrent.atomic package, which provides a family of atomic classes. Their basic property is that when multiple threads invoke methods on the same instance concurrently, the operations are mutually exclusive: once a thread starts an atomic operation, other threads cannot interleave with it and, conceptually, spin until it completes, after which another thread proceeds. That is only a logical model, though; the implementation relies on hardware-level instructions and does not block threads at the JVM level (any waiting happens at the hardware level).
For AtomicBoolean, this means nothing can be interposed while the boolean value changes; the operation stays atomic.
The key method, explained with examples below:
boolean compareAndSet(expectedValue, updateValue) does two things:
1. It compares the AtomicBoolean's value with expectedValue; only if they match does it proceed, essentially an if statement.
2. It sets the AtomicBoolean's value to updateValue. Most importantly, the two steps are performed as one indivisible action: no statement, internal or external, can run between them. This is what makes it a building block for coordinating threads, as the code below illustrates.

1. Getting the value of an AtomicBoolean
You can read an AtomicBoolean's current value with the get() method.
Example:
AtomicBoolean atomicBoolean = new AtomicBoolean(true);
boolean value = atomicBoolean.get();

2. Setting the value of an AtomicBoolean
You can set an AtomicBoolean's value with the set() method.
Example:
AtomicBoolean atomicBoolean = new AtomicBoolean(true);
atomicBoolean.set(false);

After this code runs, the AtomicBoolean holds false.

3. Swapping the value of an AtomicBoolean
getAndSet() atomically installs a new value and returns the value the AtomicBoolean held before.
Example:
AtomicBoolean atomicBoolean = new AtomicBoolean(true);
boolean oldValue = atomicBoolean.getAndSet(false);

After this code runs, oldValue is true and atomicBoolean holds false: the call atomically swapped the current value true for false.

4. Comparing and setting the value of an AtomicBoolean
compareAndSet() compares the AtomicBoolean's current value with an expected value and, only if they are equal, installs a new value. The call is atomic, so only one thread can succeed at a time, which makes compareAndSet() usable as the basis of simple lock-like synchronization.
A compareAndSet() example:
AtomicBoolean atomicBoolean = new AtomicBoolean(true);
boolean expectedValue = true;
boolean newValue = false;
boolean wasNewValueSet = atomicBoolean.compareAndSet(expectedValue, newValue);

This example compares the AtomicBoolean's current value against true; since they are equal, the value is updated to false and wasNewValueSet is true.
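As a concrete illustration of the lock-like use just mentioned, compareAndSet can guard one-time initialization: however many threads race, exactly one wins the false-to-true transition. A sketch (the OnceDemo class and its helpers are illustrative):

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class OnceDemo {
    private static final AtomicBoolean initialized = new AtomicBoolean(false);
    static int initCount = 0;

    // Only the first caller flips false -> true; every later call sees the
    // flag already true and skips the one-time work.
    static void initOnce() {
        if (initialized.compareAndSet(false, true)) {
            initCount++; // safe: only one thread ever gets here
        }
    }

    // Starts the given number of threads racing on initOnce() and returns
    // how many actually performed the initialization.
    static int runConcurrently(int threads) {
        Thread[] ts = new Thread[threads];
        for (int i = 0; i < ts.length; i++) {
            ts[i] = new Thread(OnceDemo::initOnce);
            ts[i].start();
        }
        for (Thread t : ts) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return initCount;
    }

    public static void main(String[] args) {
        System.out.println("initCount = " + runConcurrently(8)); // prints "initCount = 1"
    }
}
```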
  

3.6 AtomicInteger: Atomic Integer

 AtomicInteger provides atomic operations on an int value. In Java, ++i and i++ are not thread-safe, so plain counters would otherwise need the synchronized keyword; AtomicInteger instead offers a thread-safe add, increment, and decrement interface.

// Atomically adds delta to the current value (AtomicInteger's value field) and returns the result
public final int addAndGet(int delta)
// If the current value equals expect, atomically sets it to update
public final boolean compareAndSet(int expect, int update)
// Eventually sets the value to newValue; after lazySet, other threads may
// still read the old value for a short while
public final void lazySet(int newValue)
// Returns the current value
public final int get()
// Atomically sets the value to newValue and returns the old value
public final int getAndSet(int newValue)
// Atomically increments by one; note: returns the value BEFORE the increment
public final int getAndIncrement()
// Atomically adds delta and returns the value BEFORE the addition
public final int getAndAdd(int delta)
// Atomically increments by one and returns the value AFTER the increment
public final int incrementAndGet()
// Atomically decrements by one and returns the value AFTER the decrement
public final int decrementAndGet()
// Atomically decrements by one and returns the value BEFORE the decrement
public final int getAndDecrement()

3.7 AtomicIntegerArray: Atomic Integer Array

 The AtomicIntegerArray class supports atomic reads and writes of the elements of an underlying int[] array, along with higher-level atomic operations. Its get and set methods act like volatile reads and writes on each element: a set of one index happens-before any subsequent get of that index, and the atomic compareAndSet method has the same memory-consistency effects. In essence, AtomicIntegerArray wraps an int[] and uses the Unsafe class's CAS operations to keep per-element updates safe under concurrent access.

// Returns the element at index i
public final int get(int i)
// Returns the length of the array
public final int length()
// Atomically sets element i to newValue and returns the old value
public final int getAndSet(int i, int newValue)
// CAS: if element i equals expect, sets it to update; returns true on success
public final boolean compareAndSet(int i, int expect, int update)
// Atomically increments element i by 1 and returns the previous value
public final int getAndIncrement(int i)
// Atomically decrements element i by 1 and returns the previous value
public final int getAndDecrement(int i)
// Atomically adds delta (which may be negative) to element i and returns the previous value
public final int getAndAdd(int i, int delta)
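A short sketch exercising these per-element operations (the class name is illustrative):

```java
import java.util.concurrent.atomic.AtomicIntegerArray;

public class AtomicArrayDemo {
    public static void main(String[] args) {
        // The constructor copies the array, so later changes to the source
        // int[] do not affect the atomic view.
        AtomicIntegerArray arr = new AtomicIntegerArray(new int[] {10, 20, 30});

        arr.getAndIncrement(0);                         // slot 0: 10 -> 11
        int old = arr.getAndSet(1, 99);                 // slot 1: 20 -> 99, returns 20
        boolean swapped = arr.compareAndSet(2, 30, 31); // slot 2: 30 -> 31

        System.out.println(arr.get(0)); // prints "11"
        System.out.println(old);        // prints "20"
        System.out.println(swapped);    // prints "true"
        System.out.println(arr.get(2)); // prints "31"
    }
}
```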

3.8 AtomicLong and AtomicLongArray: Atomic Long and Atomic Long Array

 The APIs of AtomicLong and AtomicLongArray are used in much the same way as those of AtomicInteger and AtomicIntegerArray; the difference is simply that the former atomically update a long value and a long array, while the latter update an int value and an int array. Both AtomicLong and AtomicInteger extend Number, so tools and utilities that handle number-based classes can access them uniformly. In practice they are chosen for different value ranges.

import java.util.*;
import java.util.concurrent.atomic.AtomicLong;

public class Test {

    public static void main(String[] agrs) {
        final AtomicLong orderIdGenerator = new AtomicLong(0);
        final List<Item> orders = Collections.synchronizedList(new ArrayList<Item>());
        for (int i = 0; i < 10; i++) {
            Thread orderCreationThread = new Thread(new Runnable() {
                public void run() {
                    for (int i = 0; i < 10; i++) {
                        long orderId = orderIdGenerator.incrementAndGet();
                        Item order = new Item(Thread.currentThread().getName(), orderId);
                        orders.add(order);
                    }
                }
            });
            orderCreationThread.setName("Order Creation Thread " + i);
            orderCreationThread.start();
        }
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        Set<Long> orderIds = new HashSet<Long>();
        for (Item order : orders) {
            orderIds.add(order.getID());
            System.out.println("Order name:" + order.getItemName() + "----" + "Order id:" + order.getID());
        }
    }
}

class Item {
    String itemName;
    long id;

    Item(String n, long id) {
        this.itemName = n;
        this.id = id;
    }

    public String getItemName() {
        return itemName;
    }

    public long getID() {
        return id;
    }
}

Sample output (the exact interleaving varies from run to run):
Order name:Order Creation Thread 0----Order id:1
Order name:Order Creation Thread 1----Order id:2
Order name:Order Creation Thread 0----Order id:4
Order name:Order Creation Thread 1----Order id:5
Order name:Order Creation Thread 3----Order id:3
Order name:Order Creation Thread 0----Order id:7
Order name:Order Creation Thread 1----Order id:6
........
Order name:Order Creation Thread 2----Order id:100  

3.9 ReentrantLock: Reentrant Lock

 Reentrant: the same thread may acquire the lock again while already holding it, but must release it once per acquisition
Interruptible: lock.lockInterruptibly() lets a thread waiting for the lock be interrupted
Timed: tryLock with a timeout returns false instead of waiting forever, avoiding indefinite blocking on deadlock
Fair mode: first come, first served via public ReentrantLock(boolean fair); the default is non-fair, where waiting threads simply compete for the lock

import java.util.concurrent.locks.ReentrantLock;

public class ReenterLock implements Runnable {
    public static ReentrantLock lock = new ReentrantLock();
    public static int i = 0;

    @Override
    public void run() {
        for (int j = 0; j < 10000; j++) {
            lock.lock();
             // timed acquisition (note: this overload throws InterruptedException)
//            lock.tryLock(5, TimeUnit.SECONDS);
            try {
                i++;
            } finally {
                // release in finally; if lock() was called twice above, unlock() must be called twice as well
                lock.unlock();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ReenterLock tl = new ReenterLock();
        Thread t1 = new Thread(tl);
        Thread t2 = new Thread(tl);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println(i);
    }
}
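The timed acquisition commented out in the example can be sketched as follows: tryLock with a timeout gives up and returns false when the lock cannot be obtained in time, rather than blocking forever (the class and method names are illustrative):

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class TryLockDemo {
    // While the caller holds the lock, a worker thread attempts a timed
    // acquisition; returns whether the worker got the lock in time.
    static boolean acquireWhileHeld(long timeoutMillis) {
        final ReentrantLock lock = new ReentrantLock();
        final boolean[] got = {false};
        lock.lock(); // main thread holds the lock for the whole attempt
        try {
            Thread worker = new Thread(() -> {
                try {
                    got[0] = lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS);
                    if (got[0]) {
                        lock.unlock();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            worker.start();
            worker.join(); // worker must time out: the lock is still held
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
        return got[0];
    }

    public static void main(String[] args) {
        System.out.println(acquireWhileHeld(200) ? "got the lock" : "timed out"); // prints "timed out"
    }
}
```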

Interrupting a deadlock
Thread 1 and thread 2 acquire lock1 and lock2 in opposite orders, producing a deadlock; DeadlockChecker then detects it and breaks it by interrupting the threads.

import java.util.concurrent.locks.ReentrantLock;

public class DeadLock implements Runnable{

    public static ReentrantLock lock1 = new ReentrantLock();
    public static ReentrantLock lock2 = new ReentrantLock();
    int lock;

    public DeadLock(int lock) {
        this.lock = lock;
    }

    @Override
    public void run() {
        try {
            if (lock == 1){
                lock1.lockInterruptibly();
                try {
                    Thread.sleep(500);
                }catch (InterruptedException e){}
                lock2.lockInterruptibly();

            }else {
                lock2.lockInterruptibly();
                try {
                    Thread.sleep(500);
                }catch (InterruptedException e){}
                lock1.lockInterruptibly();

            }
        }catch (InterruptedException e){
            e.printStackTrace();
        }finally {
            if (lock1.isHeldByCurrentThread())
                lock1.unlock();
            if (lock2.isHeldByCurrentThread())
                lock2.unlock();
            System.out.println(Thread.currentThread().getId() + " thread interrupted");
        }
    }

    public static void main(String[] args) throws InterruptedException {
        DeadLock deadLock1 = new DeadLock(1);
        DeadLock deadLock2 = new DeadLock(2);
        // thread 1 and thread 2 acquire lock1 and lock2 first, respectively, causing deadlock
        Thread t1 = new Thread(deadLock1);
        Thread t2 = new Thread(deadLock2);
        t1.start();
        t2.start();
        Thread.sleep(1000);
        // deadlock check; triggers the interrupts
        DeadlockChecker.check();

    }
}

import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

public class DeadlockChecker {
    private final static ThreadMXBean mbean = ManagementFactory.getThreadMXBean();
    final static Runnable deadLockCheck = new Runnable() {
        @Override
        public void run() {
            while (true) {
                long[] deadlockedThreadlds = mbean.findDeadlockedThreads();

                if (deadlockedThreadlds != null) {
                    ThreadInfo[] threadInfos = mbean.getThreadInfo(deadlockedThreadlds);
                    for (Thread t : Thread.getAllStackTraces().keySet()) {
                        for (int i = 0; i < threadInfos.length; i++) {
                            if (t.getId() == threadInfos[i].getThreadId()) {
                                t.interrupt();
                                try {
                                    Thread.sleep(5000);
                                } catch (InterruptedException e) {
                                }
                            }
                        }
                    }
                }
            }
        }
    };

    public static void check() {
        Thread t = new Thread(deadLockCheck);
        t.setDaemon(true);
        t.start();
    }
}

Condition
Similar to Object.wait() and Object.notify(); a Condition must be used together with the ReentrantLock it was created from (lock.newCondition()).

// await() makes the current thread wait and releases the lock; when another
// thread calls signal() or signalAll(), the waiting thread reacquires the lock
// and continues. It also stops waiting if the thread is interrupted.
// This closely mirrors Object.wait().
void await() throws InterruptedException;
// awaitUninterruptibly() behaves like await() but does not respond to
// interruption while waiting.
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
// signal() wakes up one waiting thread; signalAll() wakes up all waiting
// threads. This mirrors Object.notify()/notifyAll().
void signal();
void signalAll();

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class ReenterLockCondition implements Runnable{

    public static ReentrantLock lock = new ReentrantLock();
    public static Condition condition = lock.newCondition();

    @Override
    public void run() {
        try {
            lock.lock();
            condition.await();
            System.out.println("Thread is going on");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // release in finally
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ReenterLockCondition t1 = new ReenterLockCondition();
        Thread tt = new Thread(t1);
        tt.start();
        Thread.sleep(2000);
        System.out.println("after sleep, signal!");
        // wake thread tt; signaling also requires holding the lock
        lock.lock();
        condition.signal();
        lock.unlock();
    }
}

Semaphore

Locks are generally mutually exclusive, whereas a semaphore can be viewed as a shared lock: it admits up to N threads into the critical section at once, and threads beyond the permit count must wait. With N = 1 it behaves much like a lock. The core API is below: acquire a permit with acquire() and return it with release().

public void acquire()
public void acquireUninterruptibly()
public boolean tryAcquire()
public boolean tryAcquire(long timeout, TimeUnit unit)
public void release()

The demo runs 20 threads against a semaphore with only 5 permits,
so "Job done!" is printed in batches of 5, one batch roughly every 2 seconds.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class SemapDemo implements Runnable{

    // 5 permits
    final Semaphore semp = new Semaphore(5);

    @Override
    public void run() {
        try {
            semp.acquire();
            // simulate time-consuming work
            Thread.sleep(2000L);
            System.out.println("Job done! " + Thread.currentThread().getId());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            semp.release();
        }
    }

    public static void main(String[] args){
        ExecutorService service = Executors.newFixedThreadPool(20);
        final SemapDemo demo = new SemapDemo();
        for (int i = 0; i < 20; i++) {
            service.submit(demo);
        }
        service.shutdown(); // let the pool threads exit once the tasks finish
    }
}  

Source: 智云一二三科技

Title: Java面试大全(六)

URL: https://www.zhihuclub.com/200433.shtml

Author: 智云科技
