MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java中的并发集合类:ConcurrentHashMap

2023-04-276.0k 阅读

1. 引言:为什么需要 ConcurrentHashMap

在多线程编程环境中,普通的 HashMap 由于不是线程安全的,在多线程并发访问和修改时会出现数据不一致等问题。例如,当多个线程同时对 HashMap 进行扩容操作时,可能会形成环形链表,导致后续的查询操作陷入死循环。

Hashtable 虽然是线程安全的,但它在实现线程安全时采用了对整个哈希表加锁的方式,这使得在高并发场景下,所有线程都竞争同一把锁,性能较低。ConcurrentHashMap 应运而生,它既保证了线程安全,又通过更细粒度的锁机制和优化的数据结构,提供了比 Hashtable 更好的并发性能。

2. ConcurrentHashMap 的结构

ConcurrentHashMap 在 JDK 1.7 和 JDK 1.8 中有不同的实现结构。

2.1 JDK 1.7 的结构

在 JDK 1.7 中,ConcurrentHashMap 采用了分段锁(Segment)的设计。它将整个哈希表分为多个 Segment,每个 Segment 类似于一个小型的 HashMap,拥有自己独立的锁。当多个线程访问不同的 Segment 时,不会相互竞争锁,从而提高了并发性能。

每个 Segment 包含一个 HashEntry 数组,HashEntry 用于存储键值对。这种结构使得 ConcurrentHashMap 可以支持并发读操作,因为读操作不需要获取锁(除了在某些需要保证一致性的场景下),并且多个写操作可以并发执行在不同的 Segment 上。

2.2 JDK 1.8 的结构

JDK 1.8 对 ConcurrentHashMap 进行了重大改进,摒弃了分段锁的设计,采用了与 HashMap 类似的数组 + 链表 + 红黑树的结构。

在 JDK 1.8 中,ConcurrentHashMap 仍然使用数组来存储哈希桶。当链表长度超过一定阈值(默认为 8)时,链表会转换为红黑树,以提高查找性能。

在并发控制方面,JDK 1.8 采用了 CAS(Compare - And - Swap)操作和 synchronized 关键字相结合的方式。在插入新元素时,首先通过 CAS 操作尝试将元素插入到数组的指定位置。如果 CAS 操作失败,说明可能有其他线程同时在操作该位置,此时会使用 synchronized 关键字对该数组位置进行加锁,确保操作的原子性。

3. ConcurrentHashMap 的核心方法分析

3.1 put 方法

在 JDK 1.7 中,put 方法首先会根据 key 的哈希值计算出该 key 所在的 Segment,然后获取该 Segment 的锁,再进行插入操作。

public V put(K key, V value) {
    Segment<K, V> s;
    if (value == null)
        throw new NullPointerException();
    int hash = hash(key);
    int j = (hash >>> segmentShift) & (segmentMask);
    if ((s = (Segment<K, V>)UNSAFE.getObject
         (segments, (j << SSHIFT) + SBASE)) == null)
        s = ensureSegment(j);
    return s.put(key, hash, value, false);
}

在 JDK 1.8 中,put 方法流程如下:

  1. 计算 key 的哈希值。
  2. 根据哈希值找到数组的对应位置。
  3. 如果该位置为空,使用 CAS 操作将新的节点插入到该位置。
  4. 如果该位置不为空,且当前节点的 hash 为 -1,表示正在进行扩容操作,当前线程协助扩容。
  5. 否则,使用 synchronized 关键字对该节点加锁,然后判断该节点是链表节点还是红黑树节点,进行相应的插入操作。
public V put(K key, V value) {
    return putVal(key, value, false);
}

final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K, V>[] tab = table;;) {
        Node<K, V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            if (casTabAt(tab, i, null,
                         new Node<K, V>(hash, key, value, null)))
                break;
        }
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) {
                        binCount = 1;
                        for (Node<K, V> e = f;; ++binCount) {
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K, V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node<K, V>(hash, key,
                                                           value, null);
                                break;
                            }
                        }
                    }
                    else if (f instanceof TreeBin) {
                        Node<K, V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K, V>)f).putTreeVal(hash, key,
                                                              value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            if (binCount != 0) {
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    addCount(1L, binCount);
    return null;
}

3.2 get 方法

在 JDK 1.7 中,get 方法首先根据 key 的哈希值找到对应的 Segment,然后在该 Segment 中进行查找。由于读操作不需要获取锁(除非涉及到 size 等需要保证一致性的操作),因此 get 方法在多线程环境下具有较高的性能。

public V get(Object key) {
    Segment<K, V> s;
    HashEntry<K, V>[] tab;
    int h = hash(key);
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
    if ((s = (Segment<K, V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
        (tab = s.table) != null) {
        for (HashEntry<K, V> e = (HashEntry<K, V>) UNSAFE.getObjectVolatile
             (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
             e != null; e = e.next) {
            K k;
            if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                return e.value;
        }
    }
    return null;
}

在 JDK 1.8 中,get 方法流程相对简单。首先计算 key 的哈希值,然后根据哈希值找到数组的对应位置。如果该位置是链表,遍历链表查找;如果是红黑树,在红黑树中查找。整个过程不需要加锁,因为 Node 节点中的 valuenext 字段都使用了 volatile 关键字修饰,保证了可见性。

public V get(Object key) {
    Node<K, V>[] tab; Node<K, V> e, p; int n, eh; K ek;
    int h = spread(key.hashCode());
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {
        if ((eh = e.hash) == h) {
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        else if (eh < 0)
            return (p = e.find(h, key)) != null? p.val : null;
        while ((e = e.next) != null) {
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}

3.3 size 方法

在 JDK 1.7 中,size 方法需要遍历所有的 Segment,为了保证准确性,会先尝试两次不加锁地统计所有 Segment 的元素个数。如果两次统计结果不一致,说明在统计过程中有其他线程对 ConcurrentHashMap 进行了修改,此时会对所有 Segment 加锁,再进行统计。

public int size() {
    final Segment<K, V>[] segments = this.segments;
    int size;
    boolean overflow; // true if size overflows 32 bits
    long sum;         // sum of modCounts
    long last = 0L;   // previous sum
    int retries = -1; // first iteration isn't retry
    try {
        for (;;) {
            if (retries++ == RETRIES_BEFORE_LOCK) {
                for (int j = 0; j < segments.length; ++j)
                    ensureSegment(j).lock(); // force creation
            }
            sum = 0L;
            size = 0;
            overflow = false;
            for (int j = 0; j < segments.length; ++j) {
                Segment<K, V> seg = segmentAt(segments, j);
                if (seg != null) {
                    sum += seg.modCount;
                    int c = seg.count;
                    if (c < 0 || (size += c) < 0)
                        overflow = true;
                }
            }
            if (sum == last)
                break;
            last = sum;
        }
    } finally {
        if (retries > RETRIES_BEFORE_LOCK) {
            for (int j = 0; j < segments.length; ++j)
                segmentAt(segments, j).unlock();
        }
    }
    return overflow? Integer.MAX_VALUE : size;
}

在 JDK 1.8 中,size 方法通过一个 baseCount 变量来记录元素总数,并且在每次插入、删除操作时,会通过 CAS 操作更新 baseCountsize 方法直接返回 baseCount 的值,不需要对整个哈希表加锁,提高了性能。

public int size() {
    long n = sumCount();
    return ((n < 0L)? 0 :
            (n > (long)Integer.MAX_VALUE)? Integer.MAX_VALUE :
            (int)n);
}

final long sumCount() {
    CounterCell[] as = counterCells; CounterCell a;
    long sum = baseCount;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}

4. ConcurrentHashMap 的应用场景

  1. 缓存:在多线程环境下的缓存系统中,ConcurrentHashMap 可以作为缓存的存储结构。例如,在一个 Web 应用中,多个线程可能同时请求缓存中的数据,并且可能有线程会更新缓存。ConcurrentHashMap 的线程安全特性和高并发性能可以保证缓存操作的正确性和高效性。
  2. 分布式系统:在分布式系统中,各个节点之间可能需要共享一些数据,ConcurrentHashMap 可以作为节点内的数据存储结构。例如,在分布式计算框架中,每个节点可能需要维护一个任务队列,ConcurrentHashMap 可以用来存储任务相关的信息,多个线程可以并发地提交任务和获取任务。
  3. 统计信息收集:在一些需要收集统计信息的场景下,比如统计网站的访问量、用户行为等。多个线程可能同时记录不同的事件,ConcurrentHashMap 可以用来存储这些统计数据,保证在高并发情况下数据的准确性和一致性。

5. 与其他并发集合类的比较

  1. 与 Hashtable 的比较
    • 线程安全机制Hashtable 使用 synchronized 关键字对整个哈希表加锁,而 ConcurrentHashMap 在 JDK 1.7 采用分段锁,JDK 1.8 采用 CASsynchronized 结合的更细粒度锁机制。这使得 ConcurrentHashMap 在高并发场景下的性能明显优于 Hashtable
    • 迭代器Hashtable 的迭代器是 fail - fast 的,在迭代过程中如果集合结构发生改变,会抛出 ConcurrentModificationExceptionConcurrentHashMap 的迭代器是弱一致性的,在迭代过程中允许集合结构发生改变,不会抛出异常,但可能会反映出部分修改。
  2. 与 ConcurrentSkipListMap 的比较
    • 数据结构ConcurrentHashMap 采用哈希表结构,适合快速的查找、插入和删除操作,时间复杂度平均为 O(1)。ConcurrentSkipListMap 采用跳表结构,时间复杂度为 O(log n)。
    • 有序性ConcurrentHashMap 不保证元素的顺序,而 ConcurrentSkipListMap 可以按照 key 的自然顺序或自定义顺序维护元素,适用于需要有序遍历的场景。

6. 总结

ConcurrentHashMap 是 Java 并发包中一个非常重要的集合类,它通过巧妙的设计,在保证线程安全的同时,提供了高效的并发性能。无论是在高并发的 Web 应用、分布式系统,还是统计信息收集等场景下,都有着广泛的应用。理解 ConcurrentHashMap 的结构和核心方法,对于编写高效、可靠的多线程程序至关重要。在实际应用中,需要根据具体的业务需求,合理选择 ConcurrentHashMap 或其他并发集合类,以达到最佳的性能和功能平衡。

通过本文对 ConcurrentHashMap 的深入分析,希望读者对其在 Java 多线程编程中的应用有更清晰的认识,并能够在实际项目中灵活运用,提升系统的并发处理能力。同时,随着 JDK 的不断发展和优化,ConcurrentHashMap 也可能会有进一步的改进和变化,开发者需要持续关注相关的技术动态,以更好地适应新的编程需求。