Fork me on GitHub

ConcurrentHashMap1.7实现

今天我们将深入剖析一个比Hashtable性能更优的线程安全的Map类,它就是ConcurrentHashMap,本文基于Java1.7的源码做剖析。

为什么要使用ConcurrentHashMap?   

  1. 多线程环境下,HashMap会处于不安全状态。例如put操作可能会引起程序死循环,Cpu占有率达百分百,原因是多线程会导致HashMap的Entry链表形成环形数据结构,如此一来,他的next结点将永不为空,就会产生死循环获取Entry。既然不能使用HashMap,在多线程环境下就给并发容器有了登场机会。具体请参考谈谈HashMap线程不安全的体现 .  
  2. Hashtable效率低下。Hashtable相信很多人都知道,相比HashMap就增加了线程安全机制,在过去Hashtable就会被使用,但由于它的实现是利用synchronized来保证线程安全的,在线程竞争激烈的情况下,效率就非常低下了。原因在于当有线程访问Hashtable的同步方法时,其他线程也来访问就会进行阻塞或者轮询状态。因此效率低下,如今开发中已经鲜有人使用Hashtable了。
  3. ConcurrentHashMap采用锁分段技术可有效提高并发访问率。下面来做深入剖析。

实现原理及结构图

ConcurrentHashMap使用分段锁技术,将数据分成一段一段的存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问,能够实现真正的并发访问。如下图是ConcurrentHashMap的内部结构图:
mark
从图中可以看到,ConcurrentHashMap内部分为很多个Segment,然后每个Segment(继承ReentrantLock,这样就自带了锁的功能)下面包含一个HashEntry数组,数组元素是一个节点为HashEntry链表。

初始化

初始化函数参数有三个: int initialCapacity,float loadFactor, int concurrencyLevel。

  • initialCapacity:表示新创建的这个ConcurrentHashMap的初始容量,也就是上面的结构图中的所有HashEntry节点的总数量。默认值为static final int DEFAULT_INITIAL_CAPACITY = 16;
  • loadFactor:表示负载因子,就是当ConcurrentHashMap中的元素个数大于loadFactor * 最大容量时就需要rehash,进行扩容操作。默认值为static final float DEFAULT_LOAD_FACTOR = 0.75f;
  • concurrencyLevel:表示并发级别,这个值用来确定Segment的个数,Segment的个数是大于等于concurrencyLevel的第一个2的n次方的数。比如,如果concurrencyLevel为12,13,14,15,16这些数,则Segment的数目为16(2的4次方)。默认值为static final int DEFAULT_CONCURRENCY_LEVEL = 16;。理想情况下ConcurrentHashMap的真正的并发访问量能够达到concurrencyLevel,因为有concurrencyLevel个Segment,假如有concurrencyLevel个线程需要访问Map,并且需要访问的数据都恰好分别落在不同的Segment中,则这些线程能够无竞争地自由访问(因为他们不需要竞争同一把锁),达到同时访问的效果。这也是为什么这个参数起名为“并发级别”的原因。

源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
//1.若参数不正确,抛出异常
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
//MAX_SEGMENTS=1 << 30
//2.判断并发级别是否大于MAX_SEGMENTS,若大于,把并发级别置为MAX_SEGMENTS
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
int sshift = 0;
//3.根据并发级别得出最终segments数组的大小
int ssize = 1;
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
//4.segmentShift和segmentMask分别表示段偏移量和段掩码,这两个和segment的定位比较相关。
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;
//5.根据initialCapacity计算每个HashEntry数组的大小,是2的幂次方,最小值为2,
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;
//6.最后创建一个Segment实例,将其当做Segment数组的第一个元素
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0);
this.segments = ss;
}

put操作

源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public V put(K key, V value) {
Segment<K,V> s;
//1.value为null,则直接抛出空指针异常
if (value == null)
throw new NullPointerException();
//2.对key进行hash,文末详解这个hash方法
int hash = hash(key);
//segmentShift:段偏移量 ,segmentMask:段掩码.
//3.将hash右移segmentShift,然后与segmentMask做位与运算得到segment的索引。
int j = (hash >>> segmentShift) & segmentMask;
//4.使用Unsafe方式得到segment,若segment未初始化,则调用ensureSegment方法,通过CAS进行赋值
if ((s = (Segment<K,V>)UNSAFE.getObject(segments, (j << SSHIFT) + SBASE)) == null)
s = ensureSegment(j);
//5.执行Segment的put方法通过加锁机制插入数据
return s.put(key, hash, value, false);
}
//Segment的put方法
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
//使用tryLock()方法得到锁,没有得到锁的执行scanAndLockForPut方法,通过重复执行tryLock()方法尝试获取锁
HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value);
V oldValue;
try {
//插入数据的具体逻辑.....
} finally {
//执行完插入操作时,会通过unlock()方法释放锁,接着唤醒阻塞线程继续执行
unlock();
}
return oldValue;
}
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
HashEntry<K,V> node = null;
int retries = -1; // negative while locating node
//重复执行尝试获得锁
while (!tryLock()) {
//获得锁的具体逻辑
//....
//当执行tryLock()方法的次数超过上限时,则执行lock()方法挂起线程B;
//在多处理器环境下,重复次数为64,单处理器重复次数为1
//static final int MAX_SCAN_RETRIES = Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
if (++retries > MAX_SCAN_RETRIES) {
lock();
break;
}
//....
}
return node;
}

get操作

源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public V get(Object key) {
Segment<K,V> s;
HashEntry<K,V>[] tab;
//1.通过对key进行hash,然后得到所在segment索引
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
//2.通过Unsafe得到segment
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
//3.然后得到HashEntry链表的索引,进行遍历得到
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}

size操作

先给3次机会,不lock所有的Segment,遍历所有Segment,累加各个Segment的大小得到整个Map的大小,如果某相邻的两次计算获取的所有Segment的更新的次数(每个Segment都有一个modCount变量,这个变量在Segment中的Entry被修改时会加一,通过这个值可以得到每个Segment的更新操作的次数)是一样的,说明计算过程中没有更新操作,则直接返回这个值。如果这三次不加锁的计算过程中Map的更新次数有变化,则之后的计算先对所有的Segment加锁,再遍历所有Segment计算Map大小,最后再解锁所有Segment。
源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public int size() {
final Segment<K,V>[] segments = this.segments;
int size;
boolean overflow; // true if size overflows 32 bits
long sum; // sum of modCounts
long last = 0L; // previous sum
int retries = -1; // first iteration isn't retry
try {
for (;;) {
if (retries++ == RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
sum = 0L;
size = 0;
overflow = false;
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) {
sum += seg.modCount;
int c = seg.count;
if (c < 0 || (size += c) < 0)
overflow = true;
}
}
if (sum == last)
break;
last = sum;
}
} finally {
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
return overflow ? Integer.MAX_VALUE : size;
}

关于hash

使用key定位Segment之前进要行一次hash操作,这次hash的作用是什么呢?看看hash的源代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private int hash(Object k) {
int h = hashSeed;
if ((0 != h) && (k instanceof String)) {
return sun.misc.Hashing.stringHash32((String) k);
}
h ^= k.hashCode();
// Spread bits to regularize both segment and index locations,
// using variant of single-word Wang/Jenkins hash.
h += (h << 15) ^ 0xffffcd7d;
h ^= (h >>> 10);
h += (h << 3);
h ^= (h >>> 6);
h += (h << 2) + (h << 14);
return h ^ (h >>> 16);
}

这里用到了Wang/Jenkins hash算法的变种,主要的目的是为了减少哈希冲突,使元素能够均匀的分布在不同的Segment上,从而提高容器的存取效率。假如哈希的质量差到极点,那么所有的元素都在一个Segment中,不仅存取元素缓慢,分段锁也会失去意义。
举个简单的例子:

1
2
3
4
System.out.println(Integer.parseInt("0001111", 2) & 15);
System.out.println(Integer.parseInt("0011111", 2) & 15);
System.out.println(Integer.parseInt("0111111", 2) & 15);
System.out.println(Integer.parseInt("1111111", 2) & 15);

这些数字得到的hash值都是一样的,全是15,所以如果不进行第一次预hash,发生冲突的几率还是很大的,但是如果我们先把上例中的二进制数字使用hash()函数先进行一次预hash,得到的结果是这样的:

1
2
3
4
0100|0111|0110|0111|1101|1010|0100|1110
1111|0111|0100|0011|0000|0001|1011|1000
0111|0111|0110|1001|0100|0110|0011|1110
1000|0011|0000|0000|1100|1000|0001|1010

可以看到每一位的数据都散开了,并且ConcurrentHashMap中是使用预hash值的高位参与运算的。比如put方法,int segmentIndex = (hash >>> segmentShift) & segmentMask;,先将hash值向右移segmentShift,然后与segmentMask做位与运算。以默认值28和15为例得到的结果都别为:4,15,7,8,没有冲突!

参考:

http://www.infoq.com/cn/articles/ConcurrentHashMap/
http://www.jianshu.com/p/e694f1e868ec

-------------本文结束感谢您的阅读-------------
坚持原创技术分享,您的支持将鼓励我继续创作!