一、ConcurrentHashMap并发容器
1.ConcurrentHashMap
ConcurrentHashMap是HashMap的升级版,HashMap虽然效率高,但它是线程不安全的容器,不能再多线程环境使用,而HashTable虽然是线程安全的,但使用的是效率比较低的synchronized,并且synchronized是悲观锁,不论读写都是独占的。而ConcurrentHashMap的底层是使用CAS来实现并发访问的,效率较高。
ConcurrentHashMap的底层数据结构与HashMap相同,采用数组+链表/红黑树来实现,如下图:
2.链表结点的实现
链表结点是基本的存储单元,其实不用想太多,必然和HashMap中的Node相类似。
//可以看到Node实现了Map.Entry接口,与HashMap基本一样static class Nodeimplements Map.Entry { final int hash; //hash值,final修饰,表示hash码不可改变。 final K key; //key值 volatile V val; //value值,volatile保证可见性 volatile Node next; //链表的下个结点 Node(int hash, K key, V val, Node next) { this.hash = hash; this.key = key; this.val = val; this.next = next; } public final K getKey() { return key; } public final V getValue() { return val; } public final int hashCode() { return key.hashCode() ^ val.hashCode(); } public final String toString(){ return key + "=" + val; } public final V setValue(V value) { throw new UnsupportedOperationException(); } //重写equals方法 public final boolean equals(Object o) { Object k, v, u; Map.Entry e; return ((o instanceof Map.Entry) && (k = (e = (Map.Entry )o).getKey()) != null && (v = e.getValue()) != null && (k == key || k.equals(key)) && (v == (u = val) || v.equals(u))); } //查找结点,可以看出ConcurrentHashMap中不允许存放value==null的数据 Node find(int h, Object k) { Node e = this; if (k != null) { do { K ek; if (e.hash == h && ((ek = e.key) == k || (ek != null && k.equals(ek)))) return e; } while ((e = e.next) != null); } return null; }}//底层数组扩容时的过渡链表结点类型static final class ForwardingNode extends Node { final Node [] nextTable; ForwardingNode(Node [] tab) { super(MOVED, null, null, null); //标识结点的状态为MOVED,即转移扩容状态 this.nextTable = tab; } Node find(int h, Object k) { // loop to avoid arbitrarily deep recursion on forwarding nodes outer: for (Node [] tab = nextTable;;) { Node e; int n; if (k == null || tab == null || (n = tab.length) == 0 || (e = tabAt(tab, (n - 1) & h)) == null) return null; for (;;) { int eh; K ek; if ((eh = e.hash) == h && ((ek = e.key) == k || (ek != null && k.equals(ek)))) return e; if (eh < 0) { if (e instanceof ForwardingNode) { tab = ((ForwardingNode )e).nextTable; continue outer; } else return e.find(h, k); } if ((e = e.next) == null) return null; } } }}
3.红黑树结点的实现
红黑树结点的实现源码:
//红黑树的结点实现,继承了Node结点static final class TreeNodeextends Node { TreeNode parent; //父结点的引用 TreeNode left; //左孩子 TreeNode right; //右孩子 TreeNode prev; // needed to unlink next upon deletion boolean red; //颜色 TreeNode(int hash, K key, V val, Node next, TreeNode parent) { super(hash, key, val, next); this.parent = parent; } //查看树中是否存在hash值为h,value为k的结点 Node find(int h, Object k) { return findTreeNode(h, k, null); } //查找结点 final TreeNode findTreeNode(int h, Object k, Class kc) { //判断k是否null,不允许放null值 if (k != null) { TreeNode p = this; do { int ph, dir; K pk; TreeNode q; TreeNode pl = p.left, pr = p.right; if ((ph = p.hash) > h) p = pl; else if (ph < h) p = pr; else if ((pk = p.key) == k || (pk != null && k.equals(pk))) return p; else if (pl == null) p = pr; else if (pr == null) p = pl; else if ((kc != null || (kc = comparableClassFor(k)) != null) && (dir = compareComparables(kc, k, pk)) != 0) p = (dir < 0) ? pl : pr; else if ((q = pr.findTreeNode(h, k, kc)) != null) return q; else p = pl; } while (p != null); } return null; }}
但ConcurrentHashMap中数组存放的红黑树根结点并不是TreeNode类型,而是TreeBin类型,TreeNode被封装在TreeBin中
//TreeBin 用作树的头结点,只存储root和first节点,不存储节点的key、value值static final class TreeBinextends Node { TreeNode root; //根结点 volatile TreeNode first; //树的链式结构 volatile Thread waiter; //等待线程 volatile int lockState; //锁状态 // values for lockState static final int WRITER = 1; // 代表拥有写入锁 static final int WAITER = 2; // 代表等待获取写入锁 static final int READER = 4; // 用于设置读取锁的增加}
4.继承关系
知道了底层的数据结构,在来看看ConcurrentHashMap的继承关系,如下图所示,ConcurrentHashMap继承了AbstractMap抽象类,实现了ConcurrentMap结构。其中AbstractMap包含了对Map集合的增删改查等方法,之前对数据结构中HashMap和TreeMap的学习时已分析过,这里不再深入。另一个ConcurrentMap接口也相对简单,提供几个抽象的新方法,仍是对Map集合的操作,不同的是这些操作要求是原子操作。
//ConcurrentMap接口源码public interface ConcurrentMapextends Map { //返回指定的key对应的value值,若key不存在则返回默认提供的defaultValue @Override default V getOrDefault(Object key, V defaultValue) { V v; return ((v = get(key)) != null) ? v : defaultValue; } //遍历Map的方法,相当于for循环或foreach循环,主要是用于lambda表达式 @Override default void forEach(BiConsumer action) { Objects.requireNonNull(action); for (Map.Entry entry : entrySet()) { K k; V v; try { k = entry.getKey(); v = entry.getValue(); } catch(IllegalStateException ise) { // this usually means the entry is no longer in the map. continue; } action.accept(k, v); } } //如果map中不存在key对应的键值对,那么就新增当前传入的key和value。 //否则就返回map中key对应的value值 V putIfAbsent(K key, V value); //删除map中对应的key和value,当且仅当key和value都对应上时才删除 //若key的对应的值不是value则不删除 boolean remove(Object key, Object value); //替换key的value值,当key对应的旧值是oldValue时,替换为newValue //否则不替换 boolean replace(K key, V oldValue, V newValue); //若key存在,则将映射值替换为给定的value V replace(K key, V value); //将每个key映射的值替换成给定的调用函数的返回值 @Override default void replaceAll(BiFunction function) { Objects.requireNonNull(function); forEach((k,v) -> { //lambda表达式遍历map while(!replace(k, v, function.apply(k, v))) { // v changed or k is gone if ( (v = get(k)) == null) { // k is no longer in the map. break; } } }); } //如果map中不存在key(或key的映射为null)且调用key为参数的函数返回值newValue不为null //且将key与返回值newValue关联成功时,返回newValue,否则返回key原本的映射值 @Override default V computeIfAbsent(K key, Function mappingFunction) { Objects.requireNonNull(mappingFunction); V v, newValue; return ((v = get(key)) == null && (newValue = mappingFunction.apply(key)) != null && (v = putIfAbsent(key, newValue)) == null) ? newValue : v; } //调用函数返回的新映射值替换key对应的旧映射值,若果新映射值为null,则直接将键值对从map中删除 @Override default V computeIfPresent(K key, BiFunction remappingFunction) { Objects.requireNonNull(remappingFunction); V oldValue; while((oldValue = get(key)) != null) { V newValue = remappingFunction.apply(key, oldValue); if (newValue != null) { if (replace(key, oldValue, newValue)) return newValue; } else if (remove(key, oldValue)) return null; } return oldValue; } @Override default V compute(K key, BiFunction remappingFunction) { Objects.requireNonNull(remappingFunction); V oldValue = get(key); //获取旧映射值 for(;;) { V newValue = remappingFunction.apply(key, oldValue); //计算新映射值 if (newValue == null) { //判断新映射值是否为null //新值不存在且map中存在key,直接删除该键值对 if (oldValue != null || containsKey(key)) { //判断删除是否成功 if (remove(key, oldValue)) { // removed the old value as expected return null; } //删除失败在继续尝试 oldValue = get(key); } else { //key原本就不存在,直接返回 return null; } } else { // 新映射值不为null,且旧映射值存在。则直接替换旧映射值 if (oldValue != null) { // replace if (replace(key, oldValue, newValue)) { // replaced as expected. return newValue; } oldValue = get(key); } else { //旧映射值不存在,即key不存在,则直接向map中新增 if ((oldValue = putIfAbsent(key, newValue)) == null) { return newValue; } } } } } //如果map中存在key且映射为value,则用调用函数计算出的非空新映射值替换旧映射值 //若新映设置为null,则将旧键值对删除 //如果map中不存在key或key的旧映射为null,则直接新增键值对(key和value) @Override default V merge(K key, V value, BiFunction remappingFunction) { Objects.requireNonNull(remappingFunction); Objects.requireNonNull(value); V oldValue = get(key); for (;;) { if (oldValue != null) { V newValue = remappingFunction.apply(oldValue, value); if (newValue != null) { if (replace(key, oldValue, newValue)) return newValue; } else if (remove(key, oldValue)) { return null; } oldValue = get(key); } else { if ((oldValue = putIfAbsent(key, value)) == null) { return value; } } } }}
看完接口,在来看看ConcurrentHashMap中构造方法及一些重要的属性变量,了解ConcurrentHashMap是如何初始化的。
public class ConcurrentHashMapextends AbstractMap implements ConcurrentMap , Serializable { //map可达到的最大容量,2的31次方 private static final int MAXIMUM_CAPACITY = 1 << 30; //map初始的默认容量,没有指定map的容量时的默认值 private static final int DEFAULT_CAPACITY = 16; //数组的最大长度 static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; //map的默认并发级别,没有使用 private static final int DEFAULT_CONCURRENCY_LEVEL = 16; //map的默认负载因子 private static final float LOAD_FACTOR = 0.75f; //链表转换成红黑树的临界链表长度,即链表长度大于等于8时,链表将转成红黑树 static final int TREEIFY_THRESHOLD = 8; //红黑树转成链表的临界结点数,即红黑树的结点个数小于等于6时,由红黑树退化成链表 static final int UNTREEIFY_THRESHOLD = 6; //允许链表树化的最小容量值,即map的容量要大于64,才能将链表树化 static final int MIN_TREEIFY_CAPACITY = 64; //扩容线程所负责的区间大小最低为16,避免发生大量的内存冲突 private static final int MIN_TRANSFER_STRIDE = 16; //用于生成当前数组对应的基数戳 private static int RESIZE_STAMP_BITS = 16; //表示最多能有多少个线程能够帮助进行扩容,因为sizeCtl只有低16位用于标识,所以最多只有2^16-1个线程帮助扩容 private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1; //将基数戳左移的位数,保证左移后的基数戳为负值,然后再加上n+1,表示n个线程正在扩容 private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS; static final int MOVED = -1; // 表示正在转移,是一个forwardNode节点 static final int TREEBIN = -2; // 表示已经转换成树,是一个TreeBin节点 static final int RESERVED = -3; // hash for transient reservations static final int HASH_BITS = 0x7fffffff; // 用于生成hash值 //计算机的核心数 static final int NCPU = Runtime.getRuntime().availableProcessors(); //底层的table结点数组,存储键值对 transient volatile Node [] table; //只有当数组处于扩容过程时,nextTable才不为null;否则其他时刻,nextTable为null; //nextTable主要用于扩容过程中指向扩容后的新数组 private transient volatile Node [] nextTable; /** * 未初始化: * sizeCtl=0:表示没有指定初始容量。 * sizeCtl>0:表示初始容量。 * 初始化中: * sizeCtl=-1,标记作用,告知其他线程,正在初始化 * 正常状态: * sizeCtl=0.75n ,扩容阈值 * 扩容中: * sizeCtl < 0 : 表示有其他线程正在执行扩容 * sizeCtl = (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2 :表示此时只有一个线程在执行扩容 */ private transient volatile int sizeCtl; //用于扩容过程中,指示原数组下一个分割区间的上界位置,从后往前指示 private transient volatile int transferIndex; //key的视图,map中所有key的集合 private transient KeySetView keySet; //value的视图 private transient ValuesView values; //Entry视图 private transient EntrySetView entrySet; //空构造,啥也没干 public ConcurrentHashMap() { } //带指定容量的构造方法,虽然指定了容量,但map的容量的确定并不是指定的容量 //而是最接近该容量的2的幂次方数 public ConcurrentHashMap(int initialCapacity) { if (initialCapacity < 0) //判断容量是否合法 throw new IllegalArgumentException(); int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1)); this.sizeCtl = cap; } //扩容策略,容量的确定方法 private static final int tableSizeFor(int c) { int n = c - 1; n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16; return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1; } //带有map集合的构造方法 public ConcurrentHashMap(Map m) { this.sizeCtl = DEFAULT_CAPACITY; putAll(m); } //带有负载因子及初始化容量的构造方法 public ConcurrentHashMap(int initialCapacity, float loadFactor) { this(initialCapacity, loadFactor, 1); } //带有负载因子及初始化容量、并发等级的构造方法 public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); if (initialCapacity < concurrencyLevel) // 最低的容量不能小于并发级别 initialCapacity = concurrencyLevel; // as estimated threads long size = (long)(1.0 + (long)initialCapacity / loadFactor); int cap = (size >= (long)MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : tableSizeFor((int)size); this.sizeCtl = cap; }//不安全的属性变量,知道就行private static final sun.misc.Unsafe U; //不安全的底层操作类private static final long SIZECTL; //变量sizeCtl的内存地址偏移量private static final long TRANSFERINDEX; //变量transferIndex的内存地址偏移量private static final long BASECOUNT; //变量baseCount的内存地址偏移量private static final long CELLSBUSY; //变量cellsBusy的内存地址偏移量private static final long CELLVALUE; //变量value的内存地址偏移量private static final long ABASE; //变量ak的内存地址偏移量private static final int ASHIFT; static { try { U = sun.misc.Unsafe.getUnsafe(); Class k = ConcurrentHashMap.class; SIZECTL = U.objectFieldOffset (k.getDeclaredField("sizeCtl")); TRANSFERINDEX = U.objectFieldOffset (k.getDeclaredField("transferIndex")); BASECOUNT = U.objectFieldOffset (k.getDeclaredField("baseCount")); CELLSBUSY = U.objectFieldOffset (k.getDeclaredField("cellsBusy")); Class ck = CounterCell.class; CELLVALUE = U.objectFieldOffset (ck.getDeclaredField("value")); Class ak = Node[].class; ABASE = U.arrayBaseOffset(ak); int scale = U.arrayIndexScale(ak); if ((scale & (scale - 1)) != 0) throw new Error("data type scale not a power of two"); ASHIFT = 31 - Integer.numberOfLeadingZeros(scale); } catch (Exception e) { throw new Error(e); }}}
5.put过程
数据的put过程:
public V put(K key, V value) { return putVal(key, value, false); //真正执行put的方法}//若key不存在map中,则直接将key与value新增到map中,若key已存在,则根据onlyIfAbsent//决定是否覆盖,false为覆盖,true不覆盖final V putVal(K key, V value, boolean onlyIfAbsent) { //判断key或value是否为null,从这可以看出,ConcurrentHashMap的键与值均不能为null if (key == null || value == null) throw new NullPointerException(); int hash = spread(key.hashCode()); //计算key对应的hash值 int binCount = 0; //用来计算在这个链表或红黑树总共有多少个元素,用来控制扩容或者转移为树 for (Node[] tab = table;;) { Node f; int n, i, fh; //判断table是否初始化过,若未初始化,则进行初始化 if (tab == null || (n = tab.length) == 0) tab = initTable(); //初始哈数组的方法 //判断hash值在table中对应的索引位置是否已经有数据 //若没有数据,则直接公国CAS的方式更新数据(即新建一个结点放到数组table中) else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { if (casTabAt(tab, i, null, new Node (hash, key, value, null))) //向数组中新增结点 break; // 新增完毕,退出循环 } //判断数组table对应索引的结点的状态(即链表的表头或红黑树的根结点的hash值) //若为MOVED表示数组正在扩容的复制阶段,那么当前线程也前去帮忙复制 else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); //帮助转移复制数据 else { V oldVal = null; //对表头或树根结点对象加锁,即同时仅有一个线程能对该链表或红黑树进行新增操作 //从这里可以看出ConcurrentHashMap的并发安全是基于对链表或黑红树的独占操作实现的 synchronized (f) { //判断数组中的索引是否发生改变 //有可能结点的类型发生改变(链表转成红黑树) if (tabAt(tab, i) == f) { if (fh >= 0) { binCount = 1; //遍历链表 for (Node e = f;; ++binCount) { K ek; //判断要存放的key在链表中是否已经存在 //若是存在则根据onlyIfAbsent决定是否覆盖 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node pred = e; //若是key在链表中之前不存在,那么新建一个key和value构成的结点,向链表末尾新增 if ((e = e.next) == null) { pred.next = new Node (hash, key, value, null); break; } } } //判断结点是否是TreeBin类型,若是则表明链表已经树化了 //应该使用红黑树中的方法进行替换或新增 else if (f instanceof TreeBin) { Node p; binCount = 2; //putTreeVal是红黑树中的新增或替换方法, //若返回不为null,则表示key在树中已存在,返回对应的结点 //若返回是null,表明是新增结点 if ((p = ((TreeBin )f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } //判断链表是否需要树化成红黑树 if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) //判断链表的长度是否超过8 treeifyBin(tab, i); if (oldVal != null) //若是替换结点的value,则返回旧值 return oldVal; break; } } } addCount(1L, binCount); //到这必然新增一个node,该修改size的值了 return null;}//hash值的计算方法,即散列算法//h缩小2的16次方后与自身向异或,在与上HASH_BITS(0x7fffffff)static final int spread(int h) { return (h ^ (h >>> 16)) & HASH_BITS;}//初始化table数组private final Node [] initTable() { Node [] tab; int sc; //当table为空时,尝试对table进行初始化 while ((tab = table) == null || tab.length == 0) { //判断是否有其他线程正在对table数组进行初始化工作 //当sizeCtl==-1时,说明已经有线程正在对table进行初始化,当前线程应该暂停让出cpu资源 if ((sc = sizeCtl) < 0) Thread.yield(); //CAS方式尝试更新sizeCtl的值,将sizeCtl的值由sc更新成-1,-1表示table表要进行初始化了 else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { //再次确认table为初始化过 if ((tab = table) == null || tab.length == 0) { //初始化指定的容量,若未指定初始容量的大小则使用DEFAULT_CAPACITY(16) int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") Node [] nt = (Node [])new Node [n]; table = tab = nt; sc = n - (n >>> 2); //令sizeCtl的值为0.75倍的数组大小 } } finally { sizeCtl = sc; } break; } } return tab;}//安全的获取数组tab中索引为i的位置的数据static final Node tabAt(Node [] tab, int i) { return (Node )U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);}//辅助table数组的扩容转移工作final Node [] helpTransfer(Node [] tab, Node f) { Node [] nextTab; int sc; //判断数组table是否为null,且结点f是否是ForwardingNode类型,且nextTab是否为null //即判断map是不是处在扩容状态,若是进入帮助扩容 if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode )f).nextTable) != null) { int rs = resizeStamp(tab.length); while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) { //判断是否还有未转移的桶区间,若没有,则退出 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0) break; //若还有,则尝试参与扩容的线程数+1;成功就进行转移, if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { transfer(tab, nextTab); break; } } return nextTab; } return table;}//链表是否需要树化private final void treeifyBin(Node [] tab, int index) { Node b; int n, sc; if (tab != null) { //判断数组table的大小是否满足树化的最小容量要求 //若不满足,但链表长度又超过8,则进行数组扩容,扩大一倍 if ((n = tab.length) < MIN_TREEIFY_CAPACITY) tryPresize(n << 1); //扩容 else if ((b = tabAt(tab, index)) != null && b.hash >= 0) { //锁结点将链表树化。其实就是新建一棵红黑树。 synchronized (b) { if (tabAt(tab, index) == b) { TreeNode hd = null, tl = null; for (Node e = b; e != null; e = e.next) { TreeNode p = new TreeNode (e.hash, e.key, e.val, null, null); if ((p.prev = tl) == null) hd = p; else tl.next = p; tl = p; } setTabAt(tab, index, new TreeBin (hd)); //更新索引位置的结点为树的根结点 } } } }}//数组table扩容private final void tryPresize(int size) { //判断要扩大的容量是否越界,且是否是2的次方 int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(size + (size >>> 1) + 1); int sc; while ((sc = sizeCtl) >= 0) { Node [] tab = table; int n; //判断数组是否初始化过,若没有初始化过,则初始化数组table //这里和initTable方法是一样的 if (tab == null || (n = tab.length) == 0) { n = (sc > c) ? sc : c; if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { if (table == tab) { @SuppressWarnings("unchecked") Node [] nt = (Node [])new Node [n]; table = nt; sc = n - (n >>> 2); } } finally { sizeCtl = sc; } } } //判断是否需要扩容,c小于等于sc则表示无需扩容,没达到扩容的临界值,n大于MAXIMUM_CAPACITY表示不能再扩大了 else if (c <= sc || n >= MAXIMUM_CAPACITY) break; else if (tab == table) { int rs = resizeStamp(n); //不知道用来干嘛,艹 //判断是否在扩容或初始化,扩容或初始化是sc小于0 //若是初始化(sc==-1) if (sc < 0) { Node [] nt; /** * 1 (sc >>> RESIZE_STAMP_SHIFT) != rs :扩容线程数 > MAX_RESIZERS-1 * 2 sc == rs + 1 和 sc == rs + MAX_RESIZERS :看不懂.. * 3 (nt = nextTable) == null :表示nextTable正在初始化 * 4 transferIndex <= 0 :表示所有hash桶均分配出去 */ //如果不需要帮其扩容,直接返回 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; //transfer的线程数+1,该线程将帮忙转移 //在transfer的时候,sc表示在transfer工作的线程数 //第一个执行扩容操作的线程,将sizeCtl设置为:(resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2) if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } //没有在初始化或扩容,则开始扩容 else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) transfer(tab, null); } }}//这个方法计算n的二进制最高位前有几个零,然后和2的15次方按位或,得出个数,不知道有什么用static final int resizeStamp(int n) { return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));}/*** 数组扩容的核心方法* 把数组中的节点复制到新的数组的相同位置,或者移动到扩张部分的相同位置* 在这里首先会计算一个步长,表示一个线程处理的数组长度,用来控制对CPU的使用,* 每个CPU最少处理16个长度的数组元素,也就是说,如果一个数组的长度只有16,那只有一个线程会对其进行扩容的复制移动操作* 扩容的时候会一直遍历,直到复制完所有节点,每处理一个节点的时候会在链表的头部设置一个fwd节点,这样其他线程就会跳过他,* 复制后在新数组中的链表不是绝对的反序的* 简要过程* 1、获取遍历table的步长* 2、最先进行扩容的线程,会初始化nextTable(正常情况下,nextTable 是table的两倍大小)* 3、计算table某个位置索引 i,该位置上的数据将会被转移到nextTable 中。* 4、如果索引i 所对应的table的位置上没有存放数据,则放有ForwardingNode 数据,表明该table 正在进行扩容处理。* (如果有添加数据的线程添加数据到该位置上,将会发现table的状态)* 5、将索引i 位置上的数据进行转移,数据分成两部分,一部分就是数据 在nextTable 中索引没有变(仍然是i),* 另一部分则是其索引变成i+n的,将这两部分分别添加到nextTable 中。*/private final void transfer(Node [] tab, Node [] nextTab) { int n = tab.length, stride; //判断并设置每个cpu核心(即每个线程)需要转移的结点数量,最少为16个 if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range //nextTab为null,初始化一个table数组2倍长度的数组 if (nextTab == null) { // initiating try { @SuppressWarnings("unchecked") Node [] nt = (Node [])new Node [n << 1]; nextTab = nt; } catch (Throwable ex) { // try to cope with OOME sizeCtl = Integer.MAX_VALUE; return; } nextTable = nextTab; transferIndex = n; //transferIndex为原数组的终点,转移时,从后往前转移,控制原数组的转移 } int nextn = nextTab.length; //新数组长度 //新建ForwardingNode结点,用来控制并发的,当一个节点为空或已经被转移之后,就设置为ForwardingNode节点 //是个空的标志结点 ForwardingNode fwd = new ForwardingNode (nextTab); //advance 指的是做完了一个位置的迁移工作,可以准备做下一个位置的了 //表示是否继续向前查找的标志位 boolean advance = true; boolean finishing = false; //转移是否完成的标志位,完成前重新扫面数组,看看有没有遗留的 //计算和控制对table 表中位置i 的数据进行转移 //bound为数组区间下限值,i为当前转移数组的位置,--i处理转移下一个节点位置,从后往前处理 //1 逆序迁移已经获取到的hash桶集合,如果迁移完毕,则更新transferIndex,获取下一批待迁移的hash桶 //2 如果transferIndex=0,表示所以hash桶均被分配,将i置为-1,准备退出transfer方法 for (int i = 0, bound = 0;;) { Node f; int fh; while (advance) { int nextIndex, nextBound; //判断当前这一段桶区间stride是否转移完成 //i小于bound,就表示当前任务完成了 //finishing==true则表示没有转移任务结束了 if (--i >= bound || finishing) advance = false; //判断是否还有 转移任务可领取 //transferIndex==0表示,原数组中所有的桶区间都已经有线程在进行转移了 else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } //CAS操作修改transferIndex值,代表下一个线程要转移原数组的结点的起始索引位置 else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound; i = nextIndex - 1; advance = false; } } //判断i是否小于0,即是不是所有数据都转移完了 //且i是否越界,i是不应该大于原数组长度的,也不该大于新数组长度 if (i < 0 || i >= n || i + n >= nextn) { int sc; //判断转移操作是否完成 //完成的话就将新数组赋给table,sizeCtl变为0.75倍的新数组长度 //nextTable则赋null,扩容完成 if (finishing) { nextTable = null; table = nextTab; sizeCtl = (n << 1) - (n >>> 1); return; } //转移工作没有结束,尝试更新sizeCtl的值,更新成功,表示当前转移该桶区间数据的任务完成了 /** * 最先的线程,执行transfer方法时,会设置 * sizeCtl = (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2) (为什么是这个数?) * 后面辅助扩容的线程,执行transfer方法之前,会设置 sizeCtl = sizeCtl+1 * 每个线程转移完桶区间的数据后退出之前,会设置 sizeCtl = sizeCtl-1 * 那么最后一个线程退出时:必然有sc == (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2), * 即 (sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT */ if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { //判断是否到最后一个线程 if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; finishing = advance = true; i = n; //最后退出的线程要重新check下是否全部转移完毕 } } //判断原数组索引i处是否为null,为null就放个ForwardingNode结点,表示数组正在扩容转移 else if ((f = tabAt(tab, i)) == null) advance = casTabAt(tab, i, null, fwd); //判断索引i处的结点是否是ForwardingNode结点,若是,表示该结点已经转移了 else if ((fh = f.hash) == MOVED) advance = true; // already processed else { //对数组索引i的结点进行加锁,该结点进行数据转移,不要打扰 synchronized (f) { //再次判断头结点有没有被修改过,被修改过就循环重新来 if (tabAt(tab, i) == f) { //新建两个链表。用于存放原链表转移到新数组是分开的结点 //ln存放索引不变的结点,hn存放索引变为i+n的结点 Node ln, hn; //判断头结点时链表还是红黑树 /** * fh大于等于0表示当前索引下是链表结构,那么链表中的结点转移到新数组中 * 只有可能存放在与元素组相同的索引i处,或是i+n(n为原数组长度)处,这是因为 * 数组的扩容时原来的2倍,且数组的长度必须是2的幂次方,这就使得同一索引的结点在新数组 * 中的索引只有上述两种可能的去处(这里与hashmap是相同的) */ if (fh >= 0) { //计算hash值与数组长度的按位结果 /** * 我们知道hash&(n-1)是结点在数组中的索引位置 * 而hash&n,则是判断结点是在新数组中位置是否变化的一个依据 * 例子:e.hash=6,二进制为0000 0110。oldCap假设为16,二进制位0001 0000。newCap为32,二进制位0010 0000 * (e.hash & oldCap)=0000 0110 & 0001 0000=0000 0000 ; * (e.hash & oldCap-1)=0000 0110 & 0000 1111=0000 0110 ; * (e.hash & newCap)=0000 0110 & 0010 0000=0000 0000 * (e.hash & newCap-1)=0000 0110 & 0001 1111=0000 0110 ; * 例子:e.hash=19,二进制为0001 0011。oldCap假设为16,二进制位0001 0000。newCap为32,二进制位0010 0000 * (e.hash & oldCap)=0001 0011 & 0001 0000=0001 0000 ; * (e.hash & oldCap-1)=0001 0011 & 0000 1111=0000 0011 ; * (e.hash & newCap)=0001 0011 & 0010 0000=0000 0000 * (e.hash & newCap-1)=0001 0011 & 0001 1111=0001 0011 ; * 有上面就可以看出当(e.hash & oldCap) == 0时,(e.hash & oldCap-1)与(e.hash & newCap-1)的值时相同的,即在新数组中索引不变 * 而当(e.hash & oldCap) != 0 时,(e.hash & oldCap-1)与(e.hash & newCap-1)的值不相同的,即在新数组的索引变了 * 并且hash& n的结果只有0或n两种,可以借此判断是存放在hn中还是ln中 int runBit = fh & n; Node lastRun = f; //最后一次发生hash值改变的结点 //遍历链表查找结点的hash & n值最后一次有变化的结点 for (Node p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } //lastRun用作哪个链表的起点,runBit为0表示结点在新数组中的索引不变 //runBit不为0表示结点在新数组中发生改变,变为i+n if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } //将结点转移到新数组中 //遍历旧链表,按个判断是移动到i+n位置,还是索引不动,分别放入ln或hn链表中 //这里hn或ln哪个链表为null,就会出现结点顺序反转,假设ln为null,则 //有ln==null,那么越早加入链表就会排在越后面 //而hn==lastRun,则会有部分结点反转,即原数组中lastRun结点之前的顺序会反转,之后不变 for (Node p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node (ph, pk, pv, ln); else hn = new Node (ph, pk, pv, hn); } //将两个新的链表更新到新数组中 setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); //将原数组的索引位置结点用fwd替换 advance = true; } //这里就是红黑树的转移了 else if (f instanceof TreeBin) { TreeBin t = (TreeBin )f; TreeNode lo = null, loTail = null; TreeNode hi = null, hiTail = null; int lc = 0, hc = 0; for (Node e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode p = new TreeNode (h, e.key, e.val, null, null); if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } //判断两个树结点是不是小于等于6,要不要退化成链表 ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin (lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin (hi) : t; setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } } } } }}
6.size的更新
到这ConcurrentHashMap的put过程及扩容过程的分析就算告一段落,但是仍然还有一点小问题还没有解决,那就是ConcurrentHashMap键值对个数的统计size值的问题,在putVal方法的末尾有个addCount方法就是对size的更新,但在看这个方法前先要了解size是如何在并发环境下进行定义的:
//标识当前cell数组是否在初始化或扩容中的CAS标志位 private transient volatile int cellsBusy; //counterCells数组,总数值的分值分别存在每个cell中 private transient volatile CounterCell[] counterCells; //没有竞争的时候使用,或者在初始化的时候作为一个反馈 //总和值的获得方式为 base + 每个cell中分值之和在并发度较低的场景下,所有值都直接累加在base中 private transient volatile long baseCount;//CounterCell的定义@sun.misc.Contended static final class CounterCell { volatile long value; CounterCell(long x) { value = x; }}//计算总和,每个CounterCell中的值相加final long sumCount() { CounterCell[] as = counterCells; CounterCell a; long sum = baseCount; if (as != null) { for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) sum += a.value; } } return sum;}
了解了定义,再来看看addCount方法:
private final void addCount(long x, int check) { CounterCell[] as; long b, s; //如果counterCell数组为null,则直接尝试将增加的键值对数更新到baseCount的上 //若counterCell不为null if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { CounterCell a; long v; int m; // 是否冲突标志,默认未冲突 boolean uncontended = true; // 如果计数数组是空(尚未出现并发) // 如果随机取一个数组位置为空 或者 // 修改这个变量cellvalue失败(出现并发了) // 执行 fullAddCount 方法。并结束 if (as == null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { fullAddCount(x, uncontended); return; } if (check <= 1) return; //计算总和 s = sumCount(); } //判断是否需要扩容,从putVal方法进来(check必定大于等于0)必定要检查是否要扩容 //这里与上面分析的扩容过程类似,不在多说 if (check >= 0) { Node[] tab, nt; int n, sc; while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { int rs = resizeStamp(n); if (sc < 0) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) transfer(tab, null); s = sumCount(); } }}/*** 1、初始化probe,该值散列后用于获取counterCells数组的索引值(线程相关)* 2、如果counterCells数组不为空,尝试占据counterCells 数组,创建CounterCell存放到counterCells中的某个位置上,退出循环* 3、如果counterCells 中该位置上存在数据,如果有冲突,则重新循环,否则尝试累加数据到cell 中,成功则退出循环。* 4、如果counterCells 数组扩容,或者不需要扩容,则重新循环* 5、对counterCells 进行扩容操作(容量增加1倍),然后重新循环* 6、如果counterCells未初始化,则尝试进行初始化* 7、如果初始化失败,则有其它线程再初始化,更新baseCount,成功就退出,否则重新循环。*/private final void fullAddCount(long x, boolean wasUncontended) { int h; // 获取当前线程的probe值 // 如果为0,则初始化当前线程probe值 if ((h = ThreadLocalRandom.getProbe()) == 0) { ThreadLocalRandom.localInit(); // 生成一个线程所有的随机值 h = ThreadLocalRandom.getProbe(); //返回生成的probe值,用于选择counterCells数组的下标 wasUncontended = true; //重新生成了probe,未冲突标志位设置为true } boolean collide = false; // 标识是否是最后一个槽位 for (;;) { CounterCell[] as; CounterCell a; int n; long v; //判断counterCells数组是否初始化过,若未初始化则先初始化 //cells数组不为空,即表示初始化已经成功 if ((as = counterCells) != null && (n = as.length) > 0) { //判断当前线程对应的索引是否有数据 //没有数据则新建一个CounterCell存放x,并更新counterCells数组 if ((a = as[(n - 1) & h]) == null) { //判断counterCells数组在什么状态下 //为0表示不在扩容或初始化状态 if (cellsBusy == 0) { // Try to attach new Cell //新建当前线程对应的CounterCell对象 CounterCell r = new CounterCell(x); // Optimistic create // CAS设置cellsBusy,防止其它线程来破坏数据结构 if (cellsBusy == 0 && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { boolean created = false; //标志位,操作是否成功 try { // Recheck under lock CounterCell[] rs; int m, j; //更新counterCells数组中的数据 if ((rs = counterCells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { rs[j] = r; created = true; } } finally { cellsBusy = 0; //恢复标志位 } //更新成功,则退出死循环 if (created) break; continue; // Slot is now non-empty } } collide = false; } //对应索引中已经有数据了 //并且调用该函数前,cellvalue的CAS操作也已经失败(已经发生竞争) else if (!wasUncontended) // CAS already known to fail wasUncontended = true; // Continue after rehash //执行value的累加 else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x)) break; //成功更新就直接退出 //如果数组比较大了,则不需要扩容,继续重试,或者已经扩容了,重试 else if (counterCells != as || n >= NCPU) collide = false; // At max size or stale else if (!collide) collide = true; //对counterCells数组进行扩容,若有其他线程在扩容则重新循环 else if (cellsBusy == 0 && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { try { if (counterCells == as) {// Expand table unless stale CounterCell[] rs = new CounterCell[n << 1]; //扩大1倍 for (int i = 0; i < n; ++i) //直接转移数据 rs[i] = as[i]; counterCells = rs; } } finally { cellsBusy = 0; } collide = false; continue; // Retry with expanded table } h = ThreadLocalRandom.advanceProbe(h); //重新生成一个probe值 } //进行初始化 //线程要初始化counterCells数组的条件是:cellsBusy标志位要为0(说明不在初始化或扩容) //counterCells == as这个判断不知道有什么意义? //当前线程更新cellsBusy状态为初始化状态,那么其他线程就不能再进行初始化了 else if (cellsBusy == 0 && counterCells == as && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { boolean init = false; //标志初始化是否成功 try { // Initialize table if (counterCells == as) { CounterCell[] rs = new CounterCell[2]; //counterCells数组默认容量为2 rs[h & 1] = new CounterCell(x); //h&1是计算线程的probe值散列后的存放索引 counterCells = rs; init = true; //初始化成功 } } finally { cellsBusy = 0; //恢复 } if (init) //判断初始化是否成功,成功就退出 break; } //有其它线程占据counterCells数组,直接累加在base变量中 else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x)) break; // Fall back on using base }}
到这size的大小是如何计算更新的就差不多分析完了,可以看出size值只是一个瞬时值,大小随时都有可能变化,只能做参考,不要过度依赖使用
7.get和remove的过程
的剩余的get方法和remove方法就很简单了,看看就差不多都能理解,先来看看get方法:
//get方法的实现public V get(Object key) { Node[] tab; Node e, p; int n, eh; K ek; int h = spread(key.hashCode()); //计算key对应的hash值 //判断table数组是否为空,若不为空,h对应的索引是否有结点 //table不为空或索引位置没有结点说明key不存在,直接返回null if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { //判断该索引位置的结点的hash值是否与h相同(有可能头结点的hash值小于0,表示此时正在扩容) if ((eh = e.hash) == h) { //判断要查找的key是不是头结点,若是头结点俺么直接返回头结点的value值 if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } //eh小于0,该结点是ForwardingNode结点或TreeBin结点,调用find方法 //ForwardingNode的find方法比较简单就是遍历新数组来查找对应结点是否存在 //TreeBin的find方法则要复杂 else if (eh < 0) return (p = e.find(h, key)) != null ? p.val : null; //遍历链表来查找key对应的value while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null;}//ForwardingNode中的find方法,遍历新数组对应的索引查找keyNode find(int h, Object k) { // loop to avoid arbitrarily deep recursion on forwarding nodes outer: for (Node [] tab = nextTable;;) { Node e; int n; if (k == null || tab == null || (n = tab.length) == 0 || (e = tabAt(tab, (n - 1) & h)) == null) return null; for (;;) { int eh; K ek; if ((eh = e.hash) == h && ((ek = e.key) == k || (ek != null && k.equals(ek)))) return e; if (eh < 0) { if (e instanceof ForwardingNode) { tab = ((ForwardingNode )e).nextTable; continue outer; } else return e.find(h, k); } if ((e = e.next) == null) return null; } }}//TreeBin中的find方法final Node find(int h, Object k) { if (k != null) { // for (Node e = first; e != null; ) { int s; K ek; //判断TreeBin的状态,若是等待或写状态,则以链表的形式进行遍历查找 //((s = lockState) & (WAITER|WRITER)) == 0表示TreeBin现在处于读状态 if (((s = lockState) & (WAITER|WRITER)) != 0) { if (e.hash == h && ((ek = e.key) == k || (ek != null && k.equals(ek)))) return e; e = e.next; } //尝试更新读状态,即获取读锁,成功的话,就以红黑树的形式进行遍历查找 else if (U.compareAndSwapInt(this, LOCKSTATE, s, s + READER)) { TreeNode r, p; try { p = ((r = root) == null ? null : r.findTreeNode(h, k, null)); } finally { Thread w; if (U.getAndAddInt(this, LOCKSTATE, -READER) == (READER|WAITER) && (w = waiter) != null) LockSupport.unpark(w); } return p; } } } return null;}
再看remove方法:
public V remove(Object key) { return replaceNode(key, null, null);}final V replaceNode(Object key, V value, Object cv) { int hash = spread(key.hashCode()); //计算hash值 for (Node[] tab = table;;) { Node f; int n, i, fh; //判断table数组是否为空,或hash值对应索引上是否有结点 //table为空或索引上没有结点,表明key不存在,直接返回 if (tab == null || (n = tab.length) == 0 || (f = tabAt(tab, i = (n - 1) & hash)) == null) break; else if ((fh = f.hash) == MOVED) //如果当前table数组在扩容,当前线程先去辅助扩容,扩容完在删除结点 tab = helpTransfer(tab, f); else { V oldVal = null; boolean validated = false; synchronized (f) { //加锁,保证对应的链表或红黑树的并发操作 if (tabAt(tab, i) == f) { //再次判断头结点是否被改变 //判断是链表还是红黑树结构 //fh大于0表明是链表 if (fh >= 0) { validated = true; //遍历链表 for (Node e = f, pred = null;;) { K ek; //判断遍历到的当前结点是不是要删除的结点 //是要删除的结点就进行删除,并将删除结点的value返回 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { V ev = e.val; if (cv == null || cv == ev || (ev != null && cv.equals(ev))) { oldVal = ev; if (value != null) e.val = value; else if (pred != null) pred.next = e.next; else setTabAt(tab, i, e.next); } break; } pred = e; if ((e = e.next) == null) //判断下个结点 break; //链表到头了,说明不存在哟啊删除的结点,直接退出循环 } } //底层是红黑树结构,执行红黑树中删除节点的额方法,这里不做深入,有兴趣可以看源码具体实现 else if (f instanceof TreeBin) { validated = true; TreeBin t = (TreeBin )f; TreeNode r, p; //先查找出key对应的结点是否存在,不存在直接结束 //存在的话,获取到value再删除结点 if ((r = t.root) != null && (p = r.findTreeNode(hash, key, null)) != null) { V pv = p.val; if (cv == null || cv == pv || (pv != null && cv.equals(pv))) { oldVal = pv; if (value != null) p.val = value; else if (t.removeTreeNode(p)) setTabAt(tab, i, untreeify(t.first)); } } } } } //判断要删除的key是否存在 //存在就返回对应的value,不存在就返回null if (validated) { if (oldVal != null) { if (value == null) addCount(-1L, -1); return oldVal; } break; } } } return null;}
OK,到这ConcurrentHashMap中的重要的方法基本就分析完了,剩余的方法就更简单,不在多说,gg。