Java ConcurrentHashMap的分段锁机制
Java ConcurrentHashMap的分段锁机制
在多线程编程的场景中,ConcurrentHashMap
是Java提供的一个线程安全的哈希表实现,它通过分段锁机制来提高并发性能。相较于传统的 Hashtable
以及 synchronizedMap
,ConcurrentHashMap
能够在高并发环境下提供更好的性能和扩展性。
1. 分段锁机制概述
ConcurrentHashMap
内部采用了分段(Segment)的设计,每个分段都是一个独立的哈希表,并且每个分段都有自己独立的锁。这样,当多个线程访问 ConcurrentHashMap
时,如果它们访问的是不同的分段,就可以同时进行操作,而不需要竞争同一个锁,从而大大提高了并发访问的效率。
在早期的Java版本(如JDK 1.7)中,ConcurrentHashMap
实现为一个由多个 Segment
组成的数组。每个 Segment
继承自 ReentrantLock
,因此具有锁的功能。当一个线程需要对 ConcurrentHashMap
进行写操作(如 put
方法)时,它只需要获取对应 Segment
的锁,而不会影响其他 Segment
的访问。对于读操作(如 get
方法),在大多数情况下不需要获取锁,因为 ConcurrentHashMap
采用了一些优化手段,如使用 volatile
关键字保证数据的可见性。
2. 实现原理分析
2.1 数据结构
在JDK 1.7中,ConcurrentHashMap
的数据结构主要由 Segment
数组和 HashEntry
数组构成。
final Segment<K,V>[] segments;
static final class Segment<K,V> extends ReentrantLock implements Serializable {
transient volatile HashEntry<K,V>[] table;
transient int count;
// 其他相关字段和方法
}
static final class HashEntry<K,V> {
final int hash;
final K key;
volatile V value;
volatile HashEntry<K,V> next;
// 构造函数
HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
this.hash = hash;
this.key = key;
this.value = value;
this.next = next;
}
}
segments
数组是 ConcurrentHashMap
的核心,每个 Segment
内部维护着一个 HashEntry
数组,用于存储键值对。
2.2 初始化
在 ConcurrentHashMap
的构造函数中,会初始化 segments
数组以及每个 Segment
的 table
数组。
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
int cap = 1;
while (cap < c)
cap <<= 1;
for (int i = 0; i < this.segments.length; ++i)
this.segments[i] = new Segment<K,V>(cap, loadFactor);
}
在上述代码中,首先根据 concurrencyLevel
确定 segments
数组的大小(ssize
),并计算出 segmentShift
和 segmentMask
,用于后续定位 Segment
。然后根据 initialCapacity
计算每个 Segment
内部 table
数组的初始大小(cap
),并初始化每个 Segment
。
2.3 put 操作
put
方法用于向 ConcurrentHashMap
中插入键值对。它首先通过哈希算法定位到对应的 Segment
,然后获取该 Segment
的锁,再进行插入操作。
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
int hash = hash(key);
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject
(segments, (j << SSHIFT) + SBASE)) == null)
s = ensureSegment(j);
return s.put(key, hash, value, false);
}
在上述代码中,首先计算键的哈希值,并通过 segmentShift
和 segmentMask
定位到对应的 Segment
。如果该 Segment
为空,则调用 ensureSegment
方法创建。然后调用该 Segment
的 put
方法进行插入操作。
Segment
的 put
方法如下:
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
HashEntry<K,V> node = tryLock()? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) {
if (e != null) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}
在 Segment
的 put
方法中,首先尝试获取锁。如果获取锁失败,则调用 scanAndLockForPut
方法尝试在扫描链表的过程中获取锁。获取锁后,根据哈希值定位到 table
数组中的位置,并遍历链表查找是否存在相同键的节点。如果存在,则更新值;否则,创建新的节点插入到链表头部。如果插入后 Segment
的元素个数超过阈值(threshold
),则调用 rehash
方法进行扩容。最后释放锁并返回旧值。
2.4 get 操作
get
方法用于从 ConcurrentHashMap
中获取指定键的值。由于 HashEntry
中的 value
字段使用了 volatile
关键字修饰,因此读操作在大多数情况下不需要获取锁,从而提高了并发性能。
public V get(Object key) {
Segment<K,V> s;
HashEntry<K,V>[] tab;
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}
在上述代码中,首先通过哈希算法定位到对应的 Segment
,然后根据哈希值定位到 table
数组中的位置,并遍历链表查找是否存在相同键的节点。如果存在,则返回对应的值;否则,返回 null
。
3. JDK 1.8的改进
在JDK 1.8中,ConcurrentHashMap
的实现进行了重大改进。它摒弃了 Segment
的设计,采用了类似于 HashMap
的数组 + 链表 + 红黑树的数据结构,并使用 CAS
(Compare and Swap)操作和 synchronized
关键字来保证线程安全。
在JDK 1.8中,ConcurrentHashMap
的数据结构如下:
transient volatile Node<K,V>[] table;
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
// 其他相关方法
}
static final class TreeNode<K,V> extends Node<K,V> {
TreeNode<K,V> parent;
TreeNode<K,V> left;
TreeNode<K,V> right;
TreeNode<K,V> prev;
boolean red;
// 其他相关方法
}
put
方法的实现如下:
public V put(K key, V value) {
return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break;
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeNode) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeNode<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
在JDK 1.8的 put
方法中,首先计算键的哈希值。如果 table
为空,则调用 initTable
方法进行初始化。然后根据哈希值定位到数组中的位置。如果该位置为空,则使用 CAS
操作直接插入新节点。如果该位置的节点正在进行扩容(fh == MOVED
),则调用 helpTransfer
方法协助扩容。否则,对该位置的节点加 synchronized
锁,然后进行插入或更新操作。如果链表长度超过阈值(TREEIFY_THRESHOLD
),则将链表转换为红黑树。
get
方法的实现如下:
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
else if (eh < 0)
return (p = e.find(h, key)) != null? p.val : null;
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
JDK 1.8的 get
方法与JDK 1.7类似,也是通过哈希算法定位到数组中的位置,然后遍历链表或红黑树查找对应的节点。由于 Node
中的 val
字段使用了 volatile
关键字修饰,因此读操作不需要加锁。
4. 性能对比与应用场景
JDK 1.7的 ConcurrentHashMap
通过分段锁机制在一定程度上提高了并发性能,但由于每个 Segment
是一个独立的哈希表,存在一定的空间浪费。同时,在高并发写操作时,可能会因为多个线程竞争同一个 Segment
的锁而导致性能瓶颈。
JDK 1.8的 ConcurrentHashMap
采用了更细粒度的锁机制(synchronized
锁加在每个链表或红黑树的头节点上)和 CAS
操作,在高并发场景下具有更好的性能。它不仅减少了锁的竞争,还在一定程度上提高了空间利用率。
在应用场景方面,ConcurrentHashMap
适用于多线程环境下需要高效读写的哈希表场景。例如,在缓存系统、高并发的计数器等场景中,ConcurrentHashMap
能够提供良好的性能和线程安全性。
5. 总结
ConcurrentHashMap
的分段锁机制是Java多线程编程中的一个重要设计模式,它通过将数据分段并为每个分段提供独立的锁,有效地提高了并发性能。从JDK 1.7到JDK 1.8,ConcurrentHashMap
的实现不断优化,采用了更先进的技术来进一步提升性能和降低锁的竞争。在实际应用中,我们应该根据具体的场景和需求选择合适版本的 ConcurrentHashMap
,以充分发挥其性能优势。
通过深入理解 ConcurrentHashMap
的分段锁机制及其实现原理,我们能够更好地编写高效、线程安全的多线程程序,提升系统的整体性能和稳定性。同时,这也为我们理解其他并发数据结构和多线程编程技术提供了有益的参考。