Java中的并发集合类:ConcurrentHashMap
1. 引言:为什么需要 ConcurrentHashMap
在多线程编程环境中,普通的 HashMap
由于不是线程安全的,在多线程并发访问和修改时会出现数据不一致等问题。例如,当多个线程同时对 HashMap
进行扩容操作时,可能会形成环形链表,导致后续的查询操作陷入死循环。
而 Hashtable
虽然是线程安全的,但它在实现线程安全时采用了对整个哈希表加锁的方式,这使得在高并发场景下,所有线程都竞争同一把锁,性能较低。ConcurrentHashMap
应运而生,它既保证了线程安全,又通过更细粒度的锁机制和优化的数据结构,提供了比 Hashtable
更好的并发性能。
2. ConcurrentHashMap 的结构
ConcurrentHashMap
在 JDK 1.7 和 JDK 1.8 中有不同的实现结构。
2.1 JDK 1.7 的结构
在 JDK 1.7 中,ConcurrentHashMap
采用了分段锁(Segment)的设计。它将整个哈希表分为多个 Segment
,每个 Segment
类似于一个小型的 HashMap
,拥有自己独立的锁。当多个线程访问不同的 Segment
时,不会相互竞争锁,从而提高了并发性能。
每个 Segment
包含一个 HashEntry
数组,HashEntry
用于存储键值对。这种结构使得 ConcurrentHashMap
可以支持并发读操作,因为读操作不需要获取锁(除了在某些需要保证一致性的场景下),并且多个写操作可以并发执行在不同的 Segment
上。
2.2 JDK 1.8 的结构
JDK 1.8 对 ConcurrentHashMap
进行了重大改进,摒弃了分段锁的设计,采用了与 HashMap
类似的数组 + 链表 + 红黑树的结构。
在 JDK 1.8 中,ConcurrentHashMap
仍然使用数组来存储哈希桶。当链表长度超过一定阈值(默认为 8)时,链表会转换为红黑树,以提高查找性能。
在并发控制方面,JDK 1.8 采用了 CAS
(Compare - And - Swap)操作和 synchronized
关键字相结合的方式。在插入新元素时,首先通过 CAS
操作尝试将元素插入到数组的指定位置。如果 CAS
操作失败,说明可能有其他线程同时在操作该位置,此时会使用 synchronized
关键字对该数组位置进行加锁,确保操作的原子性。
3. ConcurrentHashMap 的核心方法分析
3.1 put 方法
在 JDK 1.7 中,put
方法首先会根据 key 的哈希值计算出该 key 所在的 Segment
,然后获取该 Segment
的锁,再进行插入操作。
public V put(K key, V value) {
Segment<K, V> s;
if (value == null)
throw new NullPointerException();
int hash = hash(key);
int j = (hash >>> segmentShift) & (segmentMask);
if ((s = (Segment<K, V>)UNSAFE.getObject
(segments, (j << SSHIFT) + SBASE)) == null)
s = ensureSegment(j);
return s.put(key, hash, value, false);
}
在 JDK 1.8 中,put
方法流程如下:
- 计算 key 的哈希值。
- 根据哈希值找到数组的对应位置。
- 如果该位置为空,使用
CAS
操作将新的节点插入到该位置。 - 如果该位置不为空,且当前节点的 hash 为
-1
,表示正在进行扩容操作,当前线程协助扩容。 - 否则,使用
synchronized
关键字对该节点加锁,然后判断该节点是链表节点还是红黑树节点,进行相应的插入操作。
public V put(K key, V value) {
return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K, V>[] tab = table;;) {
Node<K, V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K, V>(hash, key, value, null)))
break;
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K, V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K, V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K, V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
Node<K, V> p;
binCount = 2;
if ((p = ((TreeBin<K, V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
3.2 get 方法
在 JDK 1.7 中,get
方法首先根据 key 的哈希值找到对应的 Segment
,然后在该 Segment
中进行查找。由于读操作不需要获取锁(除非涉及到 size
等需要保证一致性的操作),因此 get
方法在多线程环境下具有较高的性能。
public V get(Object key) {
Segment<K, V> s;
HashEntry<K, V>[] tab;
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
if ((s = (Segment<K, V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
for (HashEntry<K, V> e = (HashEntry<K, V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}
在 JDK 1.8 中,get
方法流程相对简单。首先计算 key 的哈希值,然后根据哈希值找到数组的对应位置。如果该位置是链表,遍历链表查找;如果是红黑树,在红黑树中查找。整个过程不需要加锁,因为 Node
节点中的 value
和 next
字段都使用了 volatile
关键字修饰,保证了可见性。
public V get(Object key) {
Node<K, V>[] tab; Node<K, V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
else if (eh < 0)
return (p = e.find(h, key)) != null? p.val : null;
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
3.3 size 方法
在 JDK 1.7 中,size
方法需要遍历所有的 Segment
,为了保证准确性,会先尝试两次不加锁地统计所有 Segment
的元素个数。如果两次统计结果不一致,说明在统计过程中有其他线程对 ConcurrentHashMap
进行了修改,此时会对所有 Segment
加锁,再进行统计。
public int size() {
final Segment<K, V>[] segments = this.segments;
int size;
boolean overflow; // true if size overflows 32 bits
long sum; // sum of modCounts
long last = 0L; // previous sum
int retries = -1; // first iteration isn't retry
try {
for (;;) {
if (retries++ == RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
sum = 0L;
size = 0;
overflow = false;
for (int j = 0; j < segments.length; ++j) {
Segment<K, V> seg = segmentAt(segments, j);
if (seg != null) {
sum += seg.modCount;
int c = seg.count;
if (c < 0 || (size += c) < 0)
overflow = true;
}
}
if (sum == last)
break;
last = sum;
}
} finally {
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
return overflow? Integer.MAX_VALUE : size;
}
在 JDK 1.8 中,size
方法通过一个 baseCount
变量来记录元素总数,并且在每次插入、删除操作时,会通过 CAS
操作更新 baseCount
。size
方法直接返回 baseCount
的值,不需要对整个哈希表加锁,提高了性能。
public int size() {
long n = sumCount();
return ((n < 0L)? 0 :
(n > (long)Integer.MAX_VALUE)? Integer.MAX_VALUE :
(int)n);
}
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
4. ConcurrentHashMap 的应用场景
- 缓存:在多线程环境下的缓存系统中,
ConcurrentHashMap
可以作为缓存的存储结构。例如,在一个 Web 应用中,多个线程可能同时请求缓存中的数据,并且可能有线程会更新缓存。ConcurrentHashMap
的线程安全特性和高并发性能可以保证缓存操作的正确性和高效性。 - 分布式系统:在分布式系统中,各个节点之间可能需要共享一些数据,
ConcurrentHashMap
可以作为节点内的数据存储结构。例如,在分布式计算框架中,每个节点可能需要维护一个任务队列,ConcurrentHashMap
可以用来存储任务相关的信息,多个线程可以并发地提交任务和获取任务。 - 统计信息收集:在一些需要收集统计信息的场景下,比如统计网站的访问量、用户行为等。多个线程可能同时记录不同的事件,
ConcurrentHashMap
可以用来存储这些统计数据,保证在高并发情况下数据的准确性和一致性。
5. 与其他并发集合类的比较
- 与 Hashtable 的比较:
- 线程安全机制:
Hashtable
使用synchronized
关键字对整个哈希表加锁,而ConcurrentHashMap
在 JDK 1.7 采用分段锁,JDK 1.8 采用CAS
和synchronized
结合的更细粒度锁机制。这使得ConcurrentHashMap
在高并发场景下的性能明显优于Hashtable
。 - 迭代器:
Hashtable
的迭代器是 fail - fast 的,在迭代过程中如果集合结构发生改变,会抛出ConcurrentModificationException
。ConcurrentHashMap
的迭代器是弱一致性的,在迭代过程中允许集合结构发生改变,不会抛出异常,但可能会反映出部分修改。
- 线程安全机制:
- 与 ConcurrentSkipListMap 的比较:
- 数据结构:
ConcurrentHashMap
采用哈希表结构,适合快速的查找、插入和删除操作,时间复杂度平均为 O(1)。ConcurrentSkipListMap
采用跳表结构,时间复杂度为 O(log n)。 - 有序性:
ConcurrentHashMap
不保证元素的顺序,而ConcurrentSkipListMap
可以按照 key 的自然顺序或自定义顺序维护元素,适用于需要有序遍历的场景。
- 数据结构:
6. 总结
ConcurrentHashMap
是 Java 并发包中一个非常重要的集合类,它通过巧妙的设计,在保证线程安全的同时,提供了高效的并发性能。无论是在高并发的 Web 应用、分布式系统,还是统计信息收集等场景下,都有着广泛的应用。理解 ConcurrentHashMap
的结构和核心方法,对于编写高效、可靠的多线程程序至关重要。在实际应用中,需要根据具体的业务需求,合理选择 ConcurrentHashMap
或其他并发集合类,以达到最佳的性能和功能平衡。
通过本文对 ConcurrentHashMap
的深入分析,希望读者对其在 Java 多线程编程中的应用有更清晰的认识,并能够在实际项目中灵活运用,提升系统的并发处理能力。同时,随着 JDK 的不断发展和优化,ConcurrentHashMap
也可能会有进一步的改进和变化,开发者需要持续关注相关的技术动态,以更好地适应新的编程需求。