ZhouXiangの博客 后端工程师&机器学习爱好者

Java学习系列之集合ConcurrentHashMap

2019-10-05

本文系记录对Java中集合类ConcurrentHashMap的学习资料,如有异议,欢迎联系我讨论修改。PS:图侵删!如果看不清图请访问https://github.com/zx950519/zx950519.github.io/edit/master/_posts/2019-10-05-Java%E5%AD%A6%E4%B9%A0%E7%B3%BB%E5%88%97%E4%B9%8B%E9%9B%86%E5%90%88ConcurrentHashMap.md

1 并发安全的哈希

为什么需要ConcurrentHashMap

HashMap在多线程环境下非线程安全,而HashTable通多对方法使用Synchronized加以修饰从而达到线程安全,但是HashTable实现同步的方法比较暴力,相当于所有读写线程均去读取一把锁,效率比较低下。另外一种同步Map的方法是使用Collections工具类,例如:

Map<Integer, Integer> map = Collections.synchronizedMap(new HashMap<>());

该种方法与HashTable实现方式类似,也是通过锁住整表来实现同步的。而ConcurrentHashMap则避免了上述两种Map同步方式锁住全表的问题。在JDK1.7中,通过引入分段锁Segment(实现了ReentrantLock),保证了一定粒度的并行;在JDK1.8中则抛弃Segment的设计理念,将粒度完全降低到桶数量,基于CAS与Synchronized。

ConcurrentHashMap的基本设计思想

ConcurrentHashMap和HashMap实现上类似,最主要的差别是ConcurrentHashMap 采用了分段锁(Segment),每个分段锁维护着几个桶(HashEntry),多个线程可以同时访问不同分段锁上的桶,从而使其并发度更高(并发度就是Segment的个数)。其中Segment继承自ReentrantLock。默认的并发级别为16,也就是说默认创建16个Segment。JDK1.7使用分段锁机制来实现并发更新操作,核心类为Segment,它继承自重入锁ReentrantLock,并发度与Segment数量相等。JDK1.8使用了CAS操作来支持更高的并发度,在CAS操作失败时使用内置锁synchronized。并且 JDK 1.8 的实现也在链表过长时会转换为红黑树。

1.2 JDK7的ConcurrentHashMap

基本结构

http://ww1.sinaimg.cn/large/005L0VzSly1g7ndp1emulj30md0beac3.jpg

Segment

Segment是JDK1.7中ConcurrentHashMap的核心设计,通过引入分段达成提高并行处理度的效果。易于看出,Segment继承了ReentrantLock并实现了序列化接口,说明Segment的锁是可重入的。

static final class Segment<K,V> extends ReentrantLock implements Serializable {
    transient volatile int count;   // Segment中元素的数量,由volatile修饰,支持内存可见性
    transient int modCount;         // 对table的大小造成影响的操作的数量(比如put或者remove操作)
    transient int threshold;        // 扩容阈值
    transient volatile HashEntry<K,V>[] table;    // 链表数组,数组中的每一个元素代表了一个链表的头部
    final float loadFactor;         // 负载因子
}

HashEntry

Segment中的元素是以HashEntry的形式存放在链表数组中的,其结构与普通HashMap的HashEntry基本一致,不同的是Segment的HashEntry,其value由volatile修饰,以支持内存可见性,即写操作对其他读线程即时可见。

static final class HashEntry<K,V> {
    final K key;
    final int hash;
    volatile V value;
    final HashEntry<K,V> next;    
}

值得注意的是,key,hash值与next域都是由final修饰的,无法修改其引用。

初始化过程

下面以一个ConcurrentHashMap构造函数为例进行分析:

public ConcurrentHashMap(int initialCapacity,
                         float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    if (concurrencyLevel > MAX_SEGMENTS)
        concurrencyLevel = MAX_SEGMENTS;
    // 根据传入的concurrencyLevel更新sshift与ssize
    // 如concurrencyLevel=5 则天花板方向上离2^3=8最近,则sshift=3,ssize=8
    int sshift = 0;
    int ssize = 1;
    while (ssize < concurrencyLevel) {
        ++sshift;
        ssize <<= 1;
    }
    // segmentShift和segmentMask元素定位Segment下标
    segmentShift = 32 - sshift;   // 计算Segment下标时首先令hash值的位数
    segmentMask = ssize - 1;      // 计算Segment下标时随后求余数的操作数
    // 按照ssize的大小对segment数组进行初始化
    this.segments = Segment.newArray(ssize);
    if (initialCapacity > MAXIMUM_CAPACITY)
        initialCapacity = MAXIMUM_CAPACITY;
    int c = initialCapacity / ssize;
    // 若initialCapacity / ssize不整除,则将c=c+1
    if (c * ssize < initialCapacity)
        ++c;
    int cap = 1;
    // cap为每个segment的初始容量,其值为离c天花板方向最近的2^n
    // 例:c为5,cap则为2^3=8;c为12,cap则为2^4=16
    while (cap < c)
        cap <<= 1;
    // 创建Segment
    for (int i = 0; i < this.segments.length; ++i)
        this.segments[i] = new Segment<K,V>(cap, loadFactor);
}

get方法

根据key获取value时,由于1.7中需要两次Hash过程,第一次需要定位到Segment;第二次需要定位到Segment中的桶下标。

public V get(Object key) {
    // 首先计算key的hash值
    int hash = hash(key.hashCode());
    // segmentFor(hash): 定位到key在哪个segment
    // 调用segment的get(key, hash)获取到指定key的value
    return segmentFor(hash).get(key, hash);
}
final Segment<K,V> segmentFor(int hash) {
    return segments[(hash >>> segmentShift) & segmentMask];
}
V get(Object key, int hash) {
    if (count != 0) { // read-volatile
        // 取得链表的头部,就是第2次hash过程
        HashEntry<K,V> e = getFirst(hash);
        // 链表搜索,直到hash与key均相等时,返回节点的value
        while (e != null) {
            if (e.hash == hash && key.equals(e.key)) {
                V v = e.value;
                if (v != null)
                    return v;
                return readValueUnderLock(e); // recheck
            }
            e = e.next;
        }
    }
    return null;
}
HashEntry<K,V> getFirst(int hash) {
    // 获取Segment的数组结构
    HashEntry<K,V>[] tab = table;
    // 第2次hash过程,确定key位于哪一个HashEntry
    return tab[hash & (tab.length - 1)];
}

在第二次查找具体元素时,首先对count做了非零判断,由于count是volatile修饰的,put、remove等操作会更新count的值,所以当竞争发生的时候,volatile的语义可以保证写操作在读操作之前,也就保证了写操作对后续的读操作都是可见的,这样后面get的后续操作就可以拿到完整的元素内容。

put方法

put操作也涉及2次hash定位过程,但是比get操作多了是否扩容、rehash等过程,2次哈希在此不做过多赘述。

V put(K key, int hash, V value, boolean onlyIfAbsent) {
    // 先加锁
    lock();
    try {
        int c = count;
        // 对c进行+1操作,获取新的数组容量
        // 如果新的数组容量高于阈值,则先进行扩容操作
        if (c++ > threshold) // ensure capacity
            rehash();
        // 获取Segment的数组table
        HashEntry<K,V>[] tab = table;
        // 2次hash过程确定桶下标
        int index = hash & (tab.length - 1);
        // 获取index位置的头结点
        HashEntry<K,V> first = tab[index];
        HashEntry<K,V> e = first;
        // 沿链表遍历,直到找到与元素key或者hash值相同的节点
        while (e != null && (e.hash != hash || !key.equals(e.key)))
            e = e.next;
        V oldValue;
        // 若key或者hash值相同的节点存在,则进行更新操作
        if (e != null) {
            // value也是volatile修饰的,所以内存即时可见
            oldValue = e.value;
            if (!onlyIfAbsent)
                e.value = value;
        }
        // 若key或者hash值相同的节点不存在,则新建节点,追加到当前链表的头部(头插法)
        else {
            oldValue = null;
            // 更新modCount的值,记录对table的大小造成影响的操作的数量
            ++modCount;
            tab[index] = new HashEntry<K,V>(key, hash, first, value);
            // 更新count的值,内存即时可见
            count = c; // write-volatile
        }
        // 返回旧值
        return oldValue;
    } finally {
        // 必须在finally中显示释放
        unlock();
    }
}

remove方法

V remove(Object key, int hash, Object value) {
    // 加锁,除了读取操作,其他操作均需要加锁
    lock();
    try {
        // 计算新的Segment元素数量
        int c = count - 1;
        // 获取Segment的数组table
        HashEntry<K,V>[] tab = table;
        // 第2次hash,确定在table的哪个位置
        int index = hash & (tab.length - 1);
        HashEntry<K,V> first = tab[index];
        HashEntry<K,V> e = first;
        // 沿链表遍历,直到找到与元素key或者hash值相同的节点
        while (e != null && (e.hash != hash || !key.equals(e.key)))
            e = e.next;
        V oldValue = null;
        if (e != null) {
            V v = e.value;
            if (value == null || value.equals(v)) {
                oldValue = v;
                // 更新modCount值              
                ++modCount;
                HashEntry<K,V> newFirst = e.next;
                // 将待删除元素的前面的元素全部复制一遍,然后头插到链表上去
                for (HashEntry<K,V> p = first; p != e; p = p.next)
                    newFirst = new HashEntry<K,V>(p.key, p.hash,
                                                  newFirst, p.value);
                tab[index] = newFirst;
                // 更新新的Segment元素数量,内存即时可见
                count = c; // write-volatile
            }
        }
        // 返回旧值
        return oldValue;
    } finally {
        // 释放锁
        unlock();
    }
}

由于,HashEntry中的next是final的,一经赋值以后就不可修改,在定位到待删除元素的位置以后,程序就将待删除元素前面的那一些元素全部复制一遍,然后再一个一个重新接到链表上去。使用final保证其不变性从而易于并发读取,但是当修改时,其成本就显得有些高了。

1.3 JDK8的ConcurrentHashMap

基本结构

http://ww1.sinaimg.cn/large/005L0VzSly1g7nehxnf22j30hy0fyab5.jpg
在JDK1.8的ConcurrentHashMap中,完全摒弃1.7中segment的概念,保持与1.8HashMap一致的设计,通过引入CAS与Synchronized来保证最大粒度的并行。

重要全局变量

    static final int MOVED     = -1; // 表示正在转移
    static final int TREEBIN   = -2; // 表示已经转换成树
    static final int RESERVED  = -3; // hash for transient reservations
    static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
    transient volatile Node<K,V>[] table;//默认没初始化的数组,用来保存元素
    private transient volatile Node<K,V>[] nextTable;//转移的时候用的数组
    /**
     * 用来控制表初始化和扩容的,默认值为0,当在初始化的时候指定了大小,这会将这个大小保存在sizeCtl中,大小为数组的0.75
     * 当为负的时候,说明表正在初始化或扩张,
     *     -1 表示初始化  
     *     -(1+n) n:表示活动的扩张线程
     */
    private transient volatile int sizeCtl;

Node内部类

static class Node<K,V> implements Map.Entry<K,V> {
    final int hash;    //key的hash值
    final K key;       //key
    volatile V val;    //value
    volatile Node<K,V> next; //表示链表中的下一个节点
    Node(int hash, K key, V val, Node<K,V> next) {
        this.hash = hash;
        this.key = key;
        this.val = val;
        this.next = next;
    }
    public final K getKey()       { return key; }
    public final V getValue()     { return val; }
    public final int hashCode()   { return key.hashCode() ^ val.hashCode(); }
}

TreeNode内部类

static final class TreeNode<K,V> extends Node<K,V> {
    TreeNode<K,V> parent;  // red-black tree links
    TreeNode<K,V> left;
    TreeNode<K,V> right;
    TreeNode<K,V> prev;    // needed to unlink next upon deletion
    boolean red;
    TreeNode(int hash, K key, V val, Node<K,V> next,
             TreeNode<K,V> parent) {
        super(hash, key, val, next);
        this.parent = parent;
    }
}

TreeBin内部类

static final class TreeBin<K,V> extends Node<K,V> {
    TreeNode<K,V> root;
    volatile TreeNode<K,V> first;
    volatile Thread waiter;
    volatile int lockState;
    // values for lockState
    static final int WRITER = 1; // set while holding write lock
    static final int WAITER = 2; // set when waiting for write lock
    static final int READER = 4; // increment value for setting read lock
}

初始化方法

初始化方法均采用延迟初始化的形式。

public ConcurrentHashMapDebug(int initialCapacity) {
    if (initialCapacity < 0)
        throw new IllegalArgumentException();
    int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
               MAXIMUM_CAPACITY :
               tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
    this.sizeCtl = cap;
}

put方法

  • 先判断key与value是否为空。与HashMap不同,ConcurrentHashMap不允许null作为key或value,为什么这样设计? 因为ConcurrentHashmap是支持并发,这样会有一个问题,当你通过get(k)获取对应的value时,如果获取到的是null时,你无法判断,它是put(k,v)的时候value为null,还是这个key从来没有做过映射。HashMap是非并发的,可以通过contains(key)来做这个判断。而支持并发的Map在调用m.contains(key)和m.get(key),可能已经不同了;
  • 计算hash值来确定放在数组的哪个位置
  • 判断当前table是否为空,空的话初始化table
  • 根据重哈希算出的值通过与运算得到桶索引,利用Unsafe类直接获取内存内存中对应位置上的节点,若没有碰撞即桶中无结点CAS直接添加
  • 如果取出来的节点的hash值是MOVED(-1)的话,则表示当前正在对这个数组进行扩容,复制到新的数组,则当前线程也去帮助复制
  • 最后一种情况就是,如果这个节点,不为空,也不在扩容,则通过synchronized来加锁,进行添加操作
  • 其他部分同HashMap中的操作
final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();   //K,V都不能为空,否则的话跑出异常
    int hash = spread(key.hashCode());    //取得key的hash值
    int binCount = 0;                     //用来计算在这个节点总共有多少个元素,用来控制扩容或者转移为树
    for (Node<K,V>[] tab = table;;) {     //
        Node<K,V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0)    
            tab = initTable();    //第一次put的时候table没有初始化,则初始化table
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {    //通过哈希计算出一个表中的位置因为n是数组的长度,所以(n-1)&hash肯定不会出现数组越界
            if (casTabAt(tab, i, null,        //如果这个位置没有元素的话,则通过cas的方式尝试添加,注意这个时候是没有加锁的
                         new Node<K,V>(hash, key, value, null)))        //创建一个Node添加到数组中区,null表示的是下一个节点为空
                break;                   // no lock when adding to empty bin
        }
        /*
         * 如果检测到某个节点的hash值是MOVED,则表示正在进行数组扩张的数据复制阶段,
         * 则当前线程也会参与去复制,通过允许多线程复制的功能,一次来减少数组的复制所带来的性能损失
         */
        else if ((fh = f.hash) == MOVED)    
            tab = helpTransfer(tab, f);
        else {
            /*
             * 如果在这个位置有元素的话,就采用synchronized的方式加锁,
             *     如果是链表的话(hash大于0),就对这个链表的所有元素进行遍历,
             *         如果找到了key和key的hash值都一样的节点,则把它的值替换到
             *         如果没找到的话,则添加在链表的最后面
             *  否则,是树的话,则调用putTreeVal方法添加到树中去
             *  
             *  在添加完之后,会对该节点上关联的的数目进行判断,
             *  如果在8个以上的话,则会调用treeifyBin方法,来尝试转化为树,或者是扩容
             */
            V oldVal = null;
            synchronized (f) {
                if (tabAt(tab, i) == f) {        //再次取出要存储的位置的元素,跟前面取出来的比较
                    if (fh >= 0) {                //取出来的元素的hash值大于0,当转换为树之后,hash值为-2
                        binCount = 1;            
                        for (Node<K,V> e = f;; ++binCount) {    //遍历这个链表
                            K ek;
                            if (e.hash == hash &&        //要存的元素的hash,key跟要存储的位置的节点的相同的时候,替换掉该节点的value即可
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)        //当使用putIfAbsent的时候,只有在这个key没有设置值得时候才设置
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {    //如果不是同样的hash,同样的key的时候,则判断该节点的下一个节点是否为空,
                                pred.next = new Node<K,V>(hash, key,        //为空的话把这个要加入的节点设置为当前节点的下一个节点
                                                          value, null);
                                break;
                            }
                        }
                    }
                    else if (f instanceof TreeBin) {    //表示已经转化成红黑树类型了
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,    //调用putTreeVal方法,将该元素添加到树中去
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            if (binCount != 0) {
                if (binCount >= TREEIFY_THRESHOLD)    //当在同一个节点的数目达到8个的时候,则扩张数组或将给节点的数据转为tree
                    treeifyBin(tab, i);    
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    addCount(1L, binCount);    //计数
    return null;
}

关于put中对CAS和synchronized的使用:

  • CAS用于当桶为空时,使用cas尝试加入新的桶头结点
  • synchronized用于桶不为空时,向链表或树中put结点的情形

get方法

  • 当key为null的时候回抛出NullPointerException的异常
  • get操作通过首先计算key的hash值来确定该元素放在数组的哪个位置
  • 判断table是否为空且table长度大于0且下标不为空
  • 然后遍历该位置的所有节点
  • 如果均无法定位到key则返回null
    public V get(Object key) {
      Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
      int h = spread(key.hashCode());
      if ((tab = table) != null && (n = tab.length) > 0 &&
          (e = tabAt(tab, (n - 1) & h)) != null) {
          if ((eh = e.hash) == h) {
              if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                  return e.val;
          }
          else if (eh < 0)
              return (p = e.find(h, key)) != null ? p.val : null;
          while ((e = e.next) != null) {
              if (e.hash == h &&
                  ((ek = e.key) == key || (ek != null && key.equals(ek))))
                  return e.val;
          }
      }
      return null;
    }
    

resize方法

当需要扩容的时候,调用的时候tryPresize方法。在tryPresize方法中,并没有加锁,允许多个线程进入,如果数组正在扩张,则当前线程也去帮助扩容。值得注意的是,复制之后的新链表不是旧链表的绝对倒序;在扩容的时候每个线程都有处理的步长,最少为16,在这个步长范围内的数组节点只有自己一个线程来处理。整个操作是在持有段锁的情况下执行。

private final void tryPresize(int size) {
    int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
        tableSizeFor(size + (size >>> 1) + 1);
    int sc;
    while ((sc = sizeCtl) >= 0) {
        Node<K,V>[] tab = table; int n;
        /*
         * 如果数组table还没有被初始化,则初始化一个大小为sizeCtrl和刚刚算出来的c中较大的一个大小的数组
         * 初始化的时候,设置sizeCtrl为-1,初始化完成之后把sizeCtrl设置为数组长度的3/4
         * 为什么要在扩张的地方来初始化数组呢?这是因为如果第一次put的时候不是put单个元素,
         * 而是调用putAll方法直接put一个map的话,在putALl方法中没有调用initTable方法去初始化table,
         * 而是直接调用了tryPresize方法,所以这里需要做一个是不是需要初始化table的判断
         */
        if (tab == null || (n = tab.length) == 0) {
            n = (sc > c) ? sc : c;
            if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {    //初始化tab的时候,把sizeCtl设为-1
                try {
                    if (table == tab) {
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = nt;
                        sc = n - (n >>> 2);
                    }
                } finally {
                    sizeCtl = sc;
                }
            }
        }
        /*
         * 一直扩容到的c小于等于sizeCtl或者数组长度大于最大长度的时候,则退出
         * 所以在一次扩容之后,不是原来长度的两倍,而是2的n次方倍
         */
        else if (c <= sc || n >= MAXIMUM_CAPACITY) {
                break;    //退出扩张
        }
        else if (tab == table) {
            int rs = resizeStamp(n);
            /*
             * 如果正在扩容Table的话,则帮助扩容
             * 否则的话,开始新的扩容
             * 在transfer操作,将第一个参数的table中的元素,移动到第二个元素的table中去,
             * 虽然此时第二个参数设置的是null,但是,在transfer方法中,当第二个参数为null的时候,
             * 会创建一个两倍大小的table
             */
            if (sc < 0) {
                Node<K,V>[] nt;
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                /*
                 * transfer的线程数加一,该线程将进行transfer的帮忙
                 * 在transfer的时候,sc表示在transfer工作的线程数
                 */
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            /*
             * 没有在初始化或扩容,则开始扩容
             */
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2)) {
                    transfer(tab, null);
            }
        }
    }
}
  • 复制之后的新链表不是旧链表的绝对倒序
  • 在扩容的时候每个线程都有处理的步长,最少为16,在这个步长范围内的数组节点只有自己一个线程来处理

remove方法

remove操作首先要确定需要删除的元素的位置,不过删除元素不是简单地把待删除元素的前面的一个元素的next指向后面一个域这么简单。由于HashEntry中的next是final的,一经赋值以后就不可修改引用。因此,在定位到待删除元素的位置以后,程序就将待删除元素前面的那一些元素全部复制一遍,然后再一个一个重新接到链表上去。这其实是由entry的不变性来决定的,除了value,其他所有属性都是用final来修饰的,这意味着在第一次设置了next域之后便不能再改变它,取而代之的是将它之前的节点全都克隆一次。至于entry为什么要设置为不变性,这跟不变性的访问不需要同步从而节省时间有关。

V remove(Object key, int hash, Object value) { 
    lock(); 
    try { 
        int c = count - 1; 
        HashEntry<K,V>[] tab = table; 
        int index = hash & (tab.length - 1); 
        HashEntry<K,V> first = tab[index]; 
        HashEntry<K,V> e = first; 
        while (e != null && (e.hash != hash || !key.equals(e.key))) 
            e = e.next; 
        V oldValue = null; 
        if (e != null) { 
            V v = e.value; 
            if (value == null || value.equals(v)) { 
                oldValue = v; 
                // All entries following removed node can stay 
                // in list, but all preceding ones need to be 
                // cloned. 
                ++modCount; 
                HashEntry<K,V> newFirst = e.next; 
                for (HashEntry<K,V> p = first; p != e; p = p.next) 
                    newFirst = new HashEntry<K,V>(p.key, p.hash, 
                                                  newFirst, p.value); 
                tab[index] = newFirst; 
                count = c; // write-volatile 
            } 
        } 
        return oldValue; 
    } finally { 
        unlock(); 
    } 
}

size方法

计算ConcurrentHashMap的元素大小是一个有趣的问题,因为是并发操作的,就是在你计算size的时候,他还在并发的插入数据,可能会导致你计算出来的size和你实际的size有相差(在你return size的时候,插入了多个数据),要解决这个问题,JDK1.7版本用两种方案:

  • 第一种方案使用不加锁的模式去尝试多次计算ConcurrentHashMap的size,最多三次,比较前后两次计算的结果,结果一致就认为当前没有元素加入,计算的结果是准确的
  • 第二种方案是如果第一种方案不符合,就会给每个Segment加上锁,然后计算ConcurrentHashMap的size返回

1.4 其他

ConcurrentHashMap的工作原理以及版本差异

ConcurrentHashMap为了提高本身的并发能力,在内部采用了一个叫做Segment的结构,一个Segment其实就是一个类HashTable的结构,Segment内部维护了一个链表数组。ConcurrentHashMap定位一个元素的过程需要进行两次Hash操作,第一次 Hash 定位到 Segment,第二次Hash定位到元素所在的链表的头部,因此,这一种结构的带来的副作用是Hash的过程要比普通的HashMap要长,但是带来的好处是写操作的时候可以只对元素所在的Segment进行操作即可,不会影响到其他的Segment,这样,在最理想的情况下,ConcurrentHashMap 可以最高同时支持 Segment 数量大小的写操作(刚好这些写操作都非常平均地分布在所有的 Segment上),所以,通过这一种结构,ConcurrentHashMap的并发能力可以大大的提高。JAVA7之前ConcurrentHashMap主要采用锁机制,在对某个Segment进行操作时,将该Segment锁定,不允许对其进行非查询操作,而在JAVA8之后采用CAS无锁算法,这种乐观操作在完成前进行判断,如果符合预期结果才给予执行,对并发操作提供良好的优化。而1.8中做了进一步的优化:

  • JDK1.8的实现降低锁的粒度,JDK1.7版本锁的粒度是基于Segment的,包含多个HashEntry,而JDK1.8锁的粒度就是HashEntry(首节点)
  • JDK1.8版本的数据结构变得更加简单,使得操作也更加清晰流畅,因为已经使用synchronized来进行同步,所以不需要分段锁的概念,也就不需要Segment这种数据结构了,由于粒度的降低,实现的复杂度也增加了
  • JDK1.8使用红黑树来优化链表,基于长度很长的链表的遍历是一个很漫长的过程,而红黑树的遍历效率是很快的,代替一定阈值的链表,这样形成一个最佳拍档
  • JDK1.8为什么使用内置锁synchronized来代替重入锁ReentrantLock,我觉得有以下几点:因为粒度降低了,在相对而言的低粒度加锁方式,synchronized并不比ReentrantLock差,在粗粒度加锁中ReentrantLock可能通过Condition来控制各个低粒度的边界,更加的灵活,而在低粒度中,Condition的优势就没有了;在大量的数据操作下,对于JVM的内存压力,基于API的ReentrantLock会产生更多的内存开销。

ConcurrentHashMap中变量使用final和volatile修饰的作用

  • final可实现不变模式(immutable),是多线程安全里最简单的一种保障方式。不变模式主要通过final关键字来限定的。在JMM中final关键字还有特殊的语义。Final域使得确保初始化安全性(initialization safety)成为可能,初始化安全性让不可变形对象不需要同步就能自由地被访问和共享
  • 使用volatile来保证某个变量内存的改变对其他线程即时可见,在配合CAS可以实现不加锁对并发操作的支持remove执行的开始就将table赋给一个局部变量tab,将tab依次复制出来,最后直到该删除位置,将指针指向下一个变量

Similar Posts

上一篇 Java代理简介

Comments