1. 简介

ConcurrentHashMap 是 Java 中的线程安全的哈希表实现,专门用于解决多线程环境下的并发访问问题。它在 JDK 1.5 中首次引入,用来替代同步的 HashMap 和 HashTable。相比于 HashTable 采用 synchronized 关键字对整个哈希表进行加锁的方式,ConcurrentHashMap 采用了分段锁的设计,大大提高了并发性能。

2. 重要的内部结构

2.1 Node 节点

  • 基本存储单元,继承自 Map.Entry
  • 包含 key、value、hash 值和下一个节点的引用
  • volatile 修饰 value 和 next 引用,保证可见性

2.2 数据结构

  • JDK 1.8 之前: Segment 数组 + HashEntry 数组
  • JDK 1.8 之后: Node 数组 + 链表/红黑树
    • 链表长度超过 8 时转换为红黑树
    • 红黑树节点数小于 6 时转回链表

2.3 内部结构示意图(JDK 1.8+)

+-------------------+
| ConcurrentHashMap |
+-------------------+
         |
         v
+-----------------+
| Node[] table    |  // volatile修饰的数组
+-----------------+
 |       |       |
 v       v       v
+---+   +---+   +---+
| 0 |-->| 1 |-->| n |  // 数组索引(桶)
+---+   +---+   +---+
         |
         v
       +---------+    +---------+    +---------+
       | Node    |--->| TreeNode|--->| Node    |  // 链表或红黑树
       | hash=123|    | hash=456|    | hash=789|
       | key=k1  |    | key=k2  |    | key=k3  |
       | val=v1  |    | val=v2  |    | val=v3  |
       +---------+    +---------+    +---------+

3. 关键技术点

3.1 并发控制

  1. CAS(Compare And Swap)操作

    • 用于实现乐观锁
    • 主要应用在 table 初始化、节点添加等操作
  2. synchronized 同步

    • 锁粒度:单个桶(bucket)
    • 只在 hash 冲突时才会使用
  3. volatile 变量

    • 保证 Node 的 value 和 next 的可见性
    • table 数组用 volatile 修饰,保证扩容时的可见性

3.2 重要操作的实现

put 操作

  1. 计算 key 的 hash 值
  2. 如果桶为空,使用 CAS 新建 Node
  3. 如果桶已经初始化:
    • 首节点加 synchronized 锁
    • 遍历链表或红黑树
    • 找到相同 key 则更新,否则插入新节点
    • 检查是否需要树化

put 操作详细流程

final V putVal(K key, V value, boolean onlyIfAbsent) {
    // 不允许键值为null
    if (key == null || value == null) throw new NullPointerException();
    // 计算键的哈希值
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        // 如果表为空,初始化表
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        // 如果目标桶为空,直接CAS插入新节点
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // 插入成功,跳出循环
        }
        // 如果桶头是ForwardingNode,表示正在扩容,则协助扩容
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            // 对桶头节点加锁,保证并发安全
            synchronized (f) {
                // 再次检查桶头节点是否变化
                if (tabAt(tab, i) == f) {
                    // 如果是普通链表节点
                    if (fh >= 0) {
                        binCount = 1;
                        // 遍历链表
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            // 找到相同的键,更新值
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            // 到达链表末尾,插入新节点
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    // 如果是红黑树节点
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        // 调用红黑树的插入方法
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            // 检查是否需要转换为红黑树
            if (binCount != 0) {
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    // 增加计数并检查是否需要扩容
    addCount(1L, binCount);
    return null;
}

put 操作详细流程

putVal 方法是 ConcurrentHashMap 中实现键值对插入的核心方法,其执行流程可以分为以下几个关键步骤:

1. 参数校验和哈希计算

执行步骤:

  1. 检查键值是否为 null

    • ConcurrentHashMap 不允许键或值为 null,这与 HashMap 不同
    • 如果为 null,直接抛出 NullPointerException 异常
  2. 计算键的哈希值

    • 使用 spread 方法对原始哈希码进行再哈希
    • spread 方法通过位运算将哈希值分散到不同的桶中,减少冲突
    • 这种设计使得哈希分布更均匀,提高了查找效率

2. 表初始化检查

执行步骤:

  1. 检查哈希表是否已初始化
    • 如果表为 null 或长度为 0,则调用 initTable 方法初始化
    • initTable 方法使用 CAS 和自旋保证只有一个线程执行初始化
    • 这种延迟初始化的策略避免了不必要的内存分配

3. 无冲突时的快速插入

执行步骤:

  1. 检查目标桶是否为空

    • 使用 tabAt 方法原子地获取桶头节点
    • tabAt 基于 Unsafe.getObjectVolatile,保证内存可见性
  2. 使用 CAS 操作插入新节点

    • 如果桶为空,尝试通过 CAS 原子地插入新节点
    • 这是一种无锁优化,避免了不必要的同步开销
    • 如果 CAS 成功,直接跳出循环;失败则重试整个过程

4. 协助扩容处理

执行步骤:

  1. 检查是否遇到 ForwardingNode

    • ForwardingNode 的哈希值为 MOVED(-1),表示当前桶已被迁移
    • 这是识别正在进行扩容的标志
  2. 调用 helpTransfer 协助扩容

    • 当前线程会加入扩容过程,协助完成数据迁移
    • 这种多线程协同扩容的设计大大提高了扩容效率
    • 扩容完成后返回新表,继续进行插入操作

5. 有冲突时的同步插入

执行步骤:

  1. 对桶头节点加锁

    • 使用 synchronized 锁定桶头节点,确保同一时刻只有一个线程操作该桶
    • 这种细粒度锁设计大大提高了并发度
  2. 二次检查桶头节点

    • 获取锁后再次检查桶头是否变化,这是典型的双重检查模式
    • 如果桶头已变化,则放弃当前操作,重新进入循环
  3. 根据节点类型执行不同的插入逻辑

    • 对于普通链表节点(hash >= 0):

      • 遍历链表查找是否存在相同的键
      • 如果找到相同的键,则更新值
      • 如果到达链表末尾,则在末尾添加新节点
    • 对于红黑树节点(instanceof TreeBin):

      • 调用 putTreeVal 方法在红黑树中插入或更新节点
      • 红黑树的查找效率为 O(log n),适合处理长链表

6. 转换结构和计数更新

执行步骤:

  1. 检查是否需要将链表转换为红黑树

    • 如果链表长度达到或超过 TREEIFY_THRESHOLD(8),调用 treeifyBin
    • treeifyBin 会检查表的大小,如果小于 MIN_TREEIFY_CAPACITY(64),会优先扩容而非转树
    • 这种设计平衡了空间和时间效率
  2. 增加计数并检查是否需要扩容

    • 调用 addCount 方法增加元素计数并检查是否需要扩容
    • addCount 使用分离计数器机制,避免了单一计数器的竞争
    • 如果需要扩容,会在 addCount 中触发

为什么 put 操作设计得如此复杂?

  1. 并发安全:通过细粒度锁和 CAS 操作保证多线程环境下的数据一致性
  2. 性能优化:针对不同场景采用不同策略,如空桶 CAS、非空桶加锁
  3. 协作扩容:多线程协同完成扩容,提高资源利用率
  4. 动态调整:根据负载情况动态调整数据结构(链表/红黑树)

put 操作与 HashMap 的区别

  1. 并发控制:ConcurrentHashMap 使用 CAS 和 synchronized 保证并发安全,HashMap 非线程安全
  2. null 值处理:ConcurrentHashMap 不允许键值为 null,HashMap 允许
  3. 扩容机制:ConcurrentHashMap 支持多线程协同扩容,HashMap 单线程扩容
  4. 锁粒度:ConcurrentHashMap 锁定单个桶,HashMap 无锁设计

get 操作

  1. 计算 hash 定位到桶
  2. 遍历桶中的链表或红黑树
  3. 利用 volatile 的 value 和 next 保证读取的可见性
  4. 不需要加锁,保证了较高的并发性

get 操作详细流程

flowchart TD
    Start[开始get操作] --> CalcHash[计算key的hash值]
    CalcHash --> LocateBucket[定位到对应桶位置]
    LocateBucket --> CheckBucket{桶是否为空?}
    CheckBucket --是--> ReturnNull[返回null]
    CheckBucket --否--> Traverse[遍历链表/红黑树]
    Traverse --> CompareKey[比较key的hash和equals]
    CompareKey --匹配--> ReturnValue[返回value]
    CompareKey --不匹配--> NextNode{是否有下一个节点?}
    NextNode --是--> Traverse
    NextNode --否--> ReturnNull

size 操作

  1. JDK 1.8 使用 baseCount 和 counterCells 数组
  2. 通过累加 baseCount 和所有 counterCell 来计算
  3. 使用 CAS 更新计数,失败则创建新的 counterCell

4. 扩容机制

4.1 触发条件

  • 当前容量超过阈值
  • 链表长度超过 8 且数组长度小于 64

4.2 扩容步骤

  1. 创建新的 table 数组,大小为原来的 2 倍
  2. 将原数组的节点迁移到新数组
  3. 支持多线程并发扩容
  4. 使用 ForwardingNode 标识已迁移的桶

5. 扩容期间 get 操作实现机制

5.1 双数组查询机制

// ForwardingNode的find方法实现
Node<K,V> find(int h, Object k) {
    outer: for (Node<K,V>[] tab = nextTable;;) {
        Node<K,V> e; int n;
        if (k == null || tab == null || (n = tab.length) == 0)
            break;
        // 重新计算哈希定位新桶位置
        int i = (n - 1) & h;
        if ((e = tabAt(tab, i)) == null)
            break;
        for (;;) {
            int eh; K ek;
            if ((eh = e.hash) == h &&
                ((ek = e.key) == k || (ek != null && k.equals(ek))))
                return e;
            if (eh < 0) {
                if (e instanceof ForwardingNode) {
                    tab = ((ForwardingNode<K,V>)e).nextTable;
                    continue outer;
                }
                else
                    return e.find(h, k);
            }
            if ((e = e.next) == null)
                break;
        }
    }
    return null;
}

5.2 查询流程示意图

sequenceDiagram
    participant GetThread
    participant OldArray
    participant ForwardingNode
    participant NewArray

    GetThread->>OldArray: 发起get请求
    OldArray-->>GetThread: 返回ForwardingNode
    GetThread->>ForwardingNode: 查询nextTable
    ForwardingNode->>NewArray: 转发请求
    NewArray-->>GetThread: 返回查询结果

5.3 实现特点

  1. 无锁读取:全程不使用 synchronized
  2. 版本感知:自动检测数组版本切换
  3. 路径回溯:遇到嵌套扩容时递归查询
  4. 内存屏障:通过 UNSAFE.loadFence 保证读取顺序

5.4 性能优化策略

  • 热点数据优先:首先检查原桶位置
  • 避免缓存穿透:通过 ForwardingNode 路由到新数组
  • 并行探测:允许同时查询新旧数组结构

6. 性能优势

  1. 分段锁设计

    • 细粒度的锁控制
    • 提高并发访问效率
  2. 读操作完全不加锁

    • volatile 保证可见性
    • 大大提升读性能
  3. CAS + synchronized 结合

    • 减少锁竞争
    • 提高并发效率

7. 使用建议

  1. 初始容量设置

    • 根据预估数据量设置初始容量
    • 避免频繁扩容
  2. 负载因子选择

    • 默认 0.75
    • 可根据实际需求调整
  3. 合适的应用场景

    • 高并发读写
    • 对性能要求较高的场景

8. 注意事项

  1. 非空键值

    • 不支持 null 键和 null 值
    • 避免歧义
  2. 迭代器弱一致性

    • iterator 反映创建时的数据状态
    • 不会抛出 ConcurrentModificationException
  3. 内存占用

    • 比 HashMap 占用更多内存
    • 红黑树结构需要额外空间

9. 与其他集合的对比

9.1 vs HashMap

  • 线程安全 vs 非线程安全
  • 空间占用较大 vs 空间占用较小
  • 不允许 null vs 允许 null

9.2 vs HashTable

  • 分段锁 vs 全表锁
  • 性能更高 vs 性能较差
  • 不允许 null vs 不允许 null

10. 总结

ConcurrentHashMap 通过精心的设计实现了高并发性能:

  1. 分段锁机制避免了全表锁定
  2. CAS 操作减少了锁竞争
  3. volatile 保证了可见性
  4. 红黑树优化了查找性能

这些特性使其成为了 Java 并发编程中最重要的数据结构之一。在需要线程安全的场景下,ConcurrentHashMap 通常是最佳选择。

11. ConcurrentHashMap 深度解析

11.1 设计演进

  • JDK7 分段锁设计

    • 采用 Segment 数组 + HashEntry 数组结构
    • 默认 16 个 Segment(并发级别)
    • 每个 Segment 独立 ReentrantLock
  • JDK8 优化设计

    • 数组 + 链表/红黑树
    • 抛弃分段锁,采用 CAS + synchronized
    • 锁粒度细化到链表头节点

11.2 核心数据结构

// JDK8 Node定义
static class Node<K,V> implements Map.Entry<K,V> {
    final int hash;
    final K key;
    volatile V val;
    volatile Node<K,V> next;
}

11.3 并发控制机制

11.3.1 CAS 操作应用场景

  • 数组初始化
  • 空桶插入
  • 扩容触发

11.3.2 synchronized 使用

  • 链表头节点加锁
  • 红黑树根节点加锁

11.4 关键方法解析

11.4.1 put 方法流程

  1. 计算 key 的 hash
  2. 定位数组下标
  3. 根据不同情况处理:
    • 空桶:CAS 插入
    • 非空:synchronized 锁头节点
  4. 检查是否需要树化

11.4.2 get 方法特点

  • 无锁读操作
  • 通过 volatile 保证可见性

11.5 扩容机制

  • 多线程协同扩容
  • 迁移期间保证读写访问
  • 扩容触发条件:
    • 元素数量超过阈值
    • 链表长度超过 8(且数组长度 ≥64)

11.6 性能优化点

  • 减少锁竞争
  • 降低锁粒度
  • 无锁读操作
  • 并发扩容

11.7 与其他集合的对比

特性 HashMap ConcurrentHashMap
线程安全 非线程安全 完全线程安全
锁机制 CAS + synchronized
空值存储 允许 不允许
迭代器 fast-fail 弱一致性

11.8 使用场景

  • 高并发读写
  • 需要保证线程安全的统计场景
  • 缓存实现(结合弱引用)

11.9 示例代码

ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();

// 线程安全的putIfAbsent
map.putIfAbsent("key", 1);

// 并行处理
map.forEach(1, (k, v) -> System.out.println(k + ": " + v));

11.10 最佳实践

  1. 合理设置初始容量
  2. 避免过度依赖 size() 方法
  3. 使用 search 方法进行并行查找
  4. 优先使用 computeIfAbsent 原子操作

12. 高频面试题

12.1 ConcurrentHashMap 在 JDK1.8 中为什么放弃分段锁?

  1. 内存消耗:分段锁需要维护多个 Segment 对象,占用更多内存
  2. 并发粒度:分段锁的并发度受 Segment 数量限制(默认 16),而 JDK1.8 的锁粒度细化到桶级别
  3. 性能优化:synchronized 在 JDK1.6 后做了大量优化(偏向锁、轻量级锁),性能接近 ReentrantLock
  4. 数据结构:链表转红黑树的优化需要更细粒度的同步控制

12.2 ConcurrentHashMap 的 size()方法为什么可能不准确?

  1. 无锁设计:使用 baseCount 和 CounterCell[]组合计数,避免全局锁
  2. 并发更新:统计时可能有线程正在更新 counterCells
  3. 最终一致性:返回的是近似值而非精确快照
  4. 性能权衡:精确统计需要全局锁,会降低并发性能

12.3 多线程如何协作完成扩容?

  1. 线程协助:当 put 操作遇到 ForwardingNode 时,当前线程会加入扩容
  2. 任务划分:每个线程负责 16 个桶的迁移(通过 transferIndex 分配)
  3. 进度跟踪:通过 sizeCtl 字段记录扩容状态和线程数量
  4. 重复检测:使用 ForwardingNode 标识已处理的桶

12.4 ForwardingNode 的作用是什么?

  1. 扩容标识:标记当前桶已完成迁移
  2. 查询转发:get 操作遇到时自动跳转到新数组
  3. 线程协调:引导其他线程参与扩容工作
  4. 哈希优化:特殊 hash 值(MOVED=-1)快速识别

12.5 为什么使用 synchronized 而不是 ReentrantLock?

  1. 内存开销:synchronized 的锁升级机制更节省内存
  2. JVM 优化:锁消除、锁粗化等 JVM 内置优化
  3. 性能提升:JDK1.6 后 synchronized 性能大幅提升
  4. 代码简化:避免显式加锁/解锁的代码复杂度

12.6 initTable 方法如何保证线程安全?

  1. sizeCtl 控制:通过 CAS 将 sizeCtl 设为-1 标识初始化中
  2. 双重检查锁
if ((tab = table) == null || tab.length == 0) {
    while ((sc = sizeCtl) < 0)
        Thread.yield();
    if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
        try {
            // 初始化逻辑
        } finally {
            sizeCtl = sc;
        }
    }
}
  1. volatile 可见性:table 字段用 volatile 修饰
  2. CAS 保障:确保只有一个线程执行初始化

12.7 为什么链表转红黑树的阈值是 8?

  1. 泊松分布:根据哈希碰撞概率,链表长度达到 8 的概率小于千万分之一
  2. 空间权衡:树节点占用空间是普通节点的两倍
  3. 性能平衡:查找时间复杂度从 O(n)降到 O(logn)
  4. 退化机制:删除节点时若长度<6 会转回链表

12.8 get 操作为什么不需要加锁?

  1. volatile 保证:Node 的 val 和 next 都用 volatile 修饰
  2. 不可变对象:key 和 hash 在创建后不可修改
  3. 安全访问:通过 UNSAFE.getObjectVolatile 保证可见性
  4. 结构稳定性:查询时遇到扩容会自动转发到新表

12.9 ConcurrentHashMap 的迭代器有什么特点?

  1. 弱一致性:反映创建迭代器时或之后的更新
  2. 无快速失败:不会抛出 ConcurrentModificationException
  3. 分段遍历:按桶顺序遍历,可能跳过正在修改的桶
  4. 安全隔离:迭代器持有数组的快照引用

12.10 如何保证多线程 put 操作的原子性?

  1. CAS 初始化:空桶插入使用 CAS
  2. synchronized 锁:哈希冲突时锁住桶头节点
  3. 扩容协作:遇到扩容时先协助完成迁移
  4. 版本控制:通过 sizeCtl 检测并发修改
final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
                break;
        }
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else {
            // 同步代码块...
        }
    }
    addCount(1L, binCount);
    return null;
}

12.11 addCount 方法的工作原理是什么?

  1. 双重计数机制:使用 baseCount 和 counterCells 数组实现高并发计数
  2. CAS 优先:优先尝试更新 baseCount,失败后才使用 counterCells
  3. 热点分散:通过线程哈希值选择不同的 CounterCell,减少竞争
  4. 扩容触发:同时负责检查并触发表扩容操作
private final void addCount(long x, int check) {
    CounterCell[] as; long b, s;
    // 尝试直接更新baseCount
    if ((as = counterCells) != null ||
        !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        CounterCell a; long v; int m;
        boolean uncontended = true;
        // 如果counterCells为空,或者当前线程对应的cell为空,
        // 或者CAS更新cell失败,则调用fullAddCount
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
            !(uncontended =
              U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
            fullAddCount(x, uncontended);
            return;
        }
        if (check <= 1)
            return;
        s = sumCount();
    }
    // 检查是否需要扩容
    if (check >= 0) {
        Node<K,V>[] tab, nt; int n, sc;
        // 当元素数量超过阈值,并且表长度小于最大容量时进行扩容
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
               (n = tab.length) < MAXIMUM_CAPACITY) {
            int rs = resizeStamp(n);
            // 如果已经在扩容中
            if (sc < 0) {
                // 检查是否可以加入扩容
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                // 加入扩容
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            // 当前没有线程在扩容,则发起扩容
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
            s = sumCount();
        }
    }
}

addCount 方法执行流程

addCount 方法的执行可以分为两个主要部分:计数更新和扩容检查。

1. 计数更新部分

执行步骤:

  1. 首先尝试直接通过 CAS 操作更新 baseCount

    • 这是最简单高效的路径,适用于并发度较低的情况
    • 使用 CAS 避免了锁的开销,提高了性能
  2. 如果 baseCount 更新失败或 counterCells 已存在,则尝试更新 counterCells

    • 当多线程同时更新时,CAS 操作容易失败,此时需要分散热点
    • counterCells 数组将计数压力分散到多个单元格,减少竞争
  3. 根据当前线程的哈希值选择一个 CounterCell

    • 使用 ThreadLocalRandom.getProbe()获取线程标识
    • 不同线程倾向于更新不同的 CounterCell,降低冲突概率
  4. 如果选中的 CounterCell 为空或 CAS 更新失败,则调用 fullAddCount 方法处理

    • fullAddCount 会处理更复杂的情况,如创建新的 CounterCell
    • 包含退避和重试机制,确保最终能够成功更新计数
  5. 如果 check 参数小于等于 1,直接返回,否则计算总数

    • 这是一个优化,避免不必要的总数计算
    • 只有在可能需要扩容时才计算总数

2. 扩容检查部分

执行步骤:

  1. 当 check >= 0 时,检查是否需要扩容

    • check 参数通常是 binCount(桶中的节点数量)
    • 负值表示不需要检查扩容,是一种优化
  2. 计算当前表的大小标记值 resizeStamp(n)

    • 这个值包含了数组长度的信息
    • 用于标识特定大小的扩容操作,确保扩容的一致性
  3. 如果 sizeCtl 为负数,表示已经有线程在进行扩容

    • sizeCtl < 0 是扩容进行中的标志
    • 此时检查是否可以协助扩容,而不是启动新的扩容
  4. 检查多个条件决定是否可以协助扩容

    • 验证扩容标记是否匹配当前表大小
    • 检查是否达到最大扩容线程数
    • 确认 nextTable 不为空且 transferIndex > 0
  5. 如果条件允许,则通过 CAS 增加扩容线程计数,并调用 transfer 方法协助扩容

    • 多线程协同扩容是 ConcurrentHashMap 的一大特色
    • 这种设计使得扩容操作能够并行执行,提高效率
  6. 如果 sizeCtl 不为负数,表示当前没有线程在扩容

    • 此时需要启动新的扩容操作
    • 通过 CAS 将 sizeCtl 设置为负数,标记开始扩容
  7. 调用 transfer 方法开始扩容操作

    • transfer 方法负责实际的节点迁移工作
    • 传入 null 作为 nextTable 参数,表示需要创建新表
  8. 扩容完成后,重新计算元素总数,检查是否需要再次扩容

    • 这是一个循环检查,确保容量满足需求
    • 在高并发添加的情况下可能需要连续多次扩容

为什么需要这么复杂的计数机制?

  1. 避免热点竞争:如果只使用一个计数器,在高并发下会成为性能瓶颈
  2. 减少 CAS 失败:通过分散计数到多个 cell,减少了 CAS 操作的冲突概率
  3. 提高并发度:不同线程可以同时更新不同的 CounterCell,提高并发性能
  4. 弱一致性:size()方法不需要获取全局锁,只需要累加所有计数值

与 LongAdder 的关系

ConcurrentHashMap 的计数机制与 JDK 8 中的 LongAdder 实现原理非常相似,都采用了分散热点的设计思想:

  1. 都使用 baseCount/base 作为初始计数器
  2. 都使用 Cell/CounterCell 数组分散计数
  3. 都通过线程探针值(probe)选择对应的 Cell
  4. 都在 CAS 失败时进行退避和重试

12.11 fullAddCount 方法的工作原理是什么?

  1. 复杂计数的后备方案:当 addCount 中的快速路径失败时被调用
  2. 创建和初始化 CounterCells:负责 CounterCells 数组的初始化
  3. 处理竞争:通过退避和重试机制处理高并发下的竞争
  4. 动态扩容:在竞争激烈时扩容 CounterCells 数组
private final void fullAddCount(long x, boolean wasUncontended) {
    int h;
    if ((h = ThreadLocalRandom.getProbe()) == 0) {
        ThreadLocalRandom.localInit();      // force initialization
        h = ThreadLocalRandom.getProbe();
        wasUncontended = true;
    }
    boolean collide = false;                // True if last slot nonempty
    for (;;) {
        CounterCell[] as; CounterCell a; int n; long v;
        if ((as = counterCells) != null && (n = as.length) > 0) {
            if ((a = as[(n - 1) & h]) == null) {
                if (cellsBusy == 0) {            // Try to attach new Cell
                    CounterCell r = new CounterCell(x); // Optimistic create
                    if (cellsBusy == 0 &&
                        U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                        boolean created = false;
                        try {               // Recheck under lock
                            CounterCell[] rs; int m, j;
                            if ((rs = counterCells) != null &&
                                (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {
                                rs[j] = r;
                                created = true;
                            }
                        } finally {
                            cellsBusy = 0;
                        }
                        if (created)
                            break;
                        continue;           // Slot is now non-empty
                    }
                }
                collide = false;
            }
            else if (!wasUncontended)       // CAS already known to fail
                wasUncontended = true;      // Continue after rehash
            else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
                break;
            else if (counterCells != as || n >= NCPU)
                collide = false;            // At max size or stale
            else if (!collide)
                collide = true;
            else if (cellsBusy == 0 &&
                     U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                try {
                    if (counterCells == as) {// Expand table unless stale
                        CounterCell[] rs = new CounterCell[n << 1];
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];
                        counterCells = rs;
                    }
                } finally {
                    cellsBusy = 0;
                }
                collide = false;
                continue;                   // Retry with expanded table
            }
            h = ThreadLocalRandom.advanceProbe(h);
        }
        else if (cellsBusy == 0 && counterCells == as &&
                 U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
            boolean init = false;
            try {                           // Initialize table
                if (counterCells == as) {
                    CounterCell[] rs = new CounterCell[2];
                    rs[h & 1] = new CounterCell(x);
                    counterCells = rs;
                    init = true;
                }
            } finally {
                cellsBusy = 0;
            }
            if (init)
                break;
        }
        else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
            break;                          // Fall back on using base
    }
}

fullAddCount 方法执行流程

fullAddCount 方法的执行流程比 addCount 更加复杂,它需要处理多种情况:

1. 线程标识初始化

执行步骤:

  1. 检查当前线程是否有有效的探针值(probe)

    • 探针值用于确定线程应该使用哪个 CounterCell
    • 如果探针值为 0,表示尚未初始化
  2. 如果探针值为 0,则初始化线程的随机数生成器

    • 调用 ThreadLocalRandom.localInit()强制初始化
    • 获取新的探针值,并将 wasUncontended 设为 true
    • 这确保每个线程都有唯一的标识,减少冲突

2. 主循环处理

fullAddCount 的核心是一个无限循环,根据不同情况执行不同的分支:

2.1 CounterCells 数组已存在

当 counterCells 数组已经初始化时:

  1. 尝试在对应槽位创建新的 CounterCell

    • 如果当前线程对应的槽位为空,尝试创建新的 CounterCell
    • 使用 cellsBusy 作为锁,确保只有一个线程可以修改结构
    • 创建成功后直接退出循环
  2. 处理已知的竞争情况

    • 如果 wasUncontended 为 false,表示之前已经发生过竞争
    • 将 wasUncontended 设为 true,准备在 rehash 后重试
  3. 尝试 CAS 更新已存在的 CounterCell

    • 如果对应槽位已有 CounterCell,尝试直接 CAS 更新其值
    • 成功则退出循环,失败则继续处理
  4. 检查是否需要扩容

    • 如果 CAS 失败且数组未过期且未达到最大大小,考虑扩容
    • 使用 collide 标志控制是否进行扩容
    • 第一次冲突时设置 collide 为 true,第二次冲突时才扩容
  5. 执行扩容操作

    • 获取 cellsBusy 锁后,创建一个两倍大小的新数组
    • 将原数组元素复制到新数组
    • 扩容后重置 collide 标志并继续循环
  6. 更新线程探针值

    • 如果以上操作都未成功,则更新线程的探针值
    • 使用 ThreadLocalRandom.advanceProbe(h)生成新的哈希值
    • 这相当于 rehash,目的是减少下次冲突的可能性

2.2 CounterCells 数组不存在

当 counterCells 数组尚未初始化时:

  1. 尝试初始化 CounterCells 数组

    • 获取 cellsBusy 锁后创建大小为 2 的新数组
    • 根据线程探针值选择一个槽位并初始化
    • 初始化成功后退出循环
  2. 回退到 baseCount

    • 如果无法获取锁或初始化失败,尝试直接更新 baseCount
    • 这是最后的回退方案,确保计数一定能够更新

为什么需要 fullAddCount 方法?

  1. 处理复杂场景:addCount 方法只处理简单场景,复杂情况下委托给 fullAddCount
  2. 初始化 CounterCells:负责 CounterCells 数组的创建和初始化
  3. 动态调整:根据并发情况动态扩容 CounterCells 数组
  4. 退避与重试:实现了退避和重试机制,减少高并发下的冲突

fullAddCount 与 LongAdder 的 longAccumulate 方法

fullAddCount 方法的设计与 JDK 8 中 LongAdder 的 longAccumulate 方法几乎相同:

  1. 都使用无限循环处理各种情况
  2. 都实现了 Cell 数组的初始化、更新和扩容
  3. 都使用线程探针值进行哈希定位
  4. 都实现了退避和重试机制
  5. 都在最后提供了回退到 base 值的机制

这种设计使得 ConcurrentHashMap 的计数操作在高并发环境下具有出色的性能,避免了单一计数器可能导致的严重竞争问题。

13. 最佳实践

  1. 建议设置初始容量避免频繁扩容
  2. 预估并发量合理设置并发级别
  3. 尽量使用不可变对象作为 key
  4. 关注 size()与 mappingCount()的区别
  5. 使用 forEach 系列方法进行批量操作

11.5 扩容机制

  • 多线程协同扩容
  • 迁移期间保证读写访问
  • 扩容触发条件:
    • 元素数量超过阈值
    • 链表长度超过 8(且数组长度 ≥64)

11.5.1 transfer 方法源码

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    // 计算每个线程应该处理的桶数量,最小为16
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; // 预设值16
    // 如果新表为空,则创建一个原表两倍大小的新表
    if (nextTab == null) {
        try {
            @SuppressWarnings("unchecked")
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {      // 处理内存溢出
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        nextTable = nextTab;
        transferIndex = n;  // 用于分配任务的索引
    }
    int nextn = nextTab.length;
    // 创建一个特殊节点,标记当前桶已迁移完成
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    // 标记是否需要继续推进任务
    boolean advance = true;
    // 标记是否已完成所有迁移工作
    boolean finishing = false;
    // i是当前处理的桶索引,bound是当前任务的下界
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;
        // 分配或前进任务
        while (advance) {
            int nextIndex, nextBound;
            // 如果已经处理完当前区间或正在完成
            if (--i >= bound || finishing)
                advance = false;
            // 如果没有更多的桶需要处理
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            // CAS尝试更新transferIndex,为当前线程分配任务区间
            else if (U.compareAndSwapInt
                     (this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ?
                                   nextIndex - stride : 0))) {
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }
        // 检查是否完成迁移
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            // 如果所有桶都已处理完
            if (finishing) {
                nextTable = null;
                table = nextTab;  // 替换为新表
                sizeCtl = (n << 1) - (n >>> 1); // 新的阈值为新容量的0.75倍
                return;
            }
            // 当前线程完成任务,尝试减少活动扩容线程计数
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                // 检查是否是最后一个扩容线程
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    return;
                // 所有线程都已完成,标记结束
                finishing = advance = true;
                i = n; // 再次检查所有桶
            }
        }
        // 如果当前桶为空,放入ForwardingNode标记
        else if ((f = tabAt(tab, i)) == null)
            advance = casTabAt(tab, i, null, fwd);
        // 如果当前桶已经是ForwardingNode,说明已被处理
        else if ((fh = f.hash) == MOVED)
            advance = true;
        // 处理当前桶
        else {
            // 锁定当前桶的头节点
            synchronized (f) {
                // 再次检查头节点是否变化
                if (tabAt(tab, i) == f) {
                    Node<K,V> ln, hn;
                    // 如果是链表节点
                    if (fh >= 0) {
                        // 根据节点hash值的高位(n的大小对应的位),将链表分为两部分
                        int runBit = fh & n;
                        Node<K,V> lastRun = f;
                        // 寻找最后一段相同高位的节点序列
                        for (Node<K,V> p = f.next; p != null; p = p.next) {
                            int b = p.hash & n;
                            if (b != runBit) {
                                runBit = b;
                                lastRun = p;
                            }
                        }
                        // 根据高位是否为0,决定放入低位表还是高位表
                        if (runBit == 0) {
                            ln = lastRun;
                            hn = null;
                        }
                        else {
                            hn = lastRun;
                            ln = null;
                        }
                        // 遍历链表,根据高位分组
                        for (Node<K,V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            if ((ph & n) == 0)
                                ln = new Node<K,V>(ph, pk, pv, ln);
                            else
                                hn = new Node<K,V>(ph, pk, pv, hn);
                        }
                        // 将低位链表放入新表的i位置
                        setTabAt(nextTab, i, ln);
                        // 将高位链表放入新表的i+n位置
                        setTabAt(nextTab, i + n, hn);
                        // 在旧表中标记该桶已迁移
                        setTabAt(tab, i, fwd);
                        // 继续处理下一个桶
                        advance = true;
                    }
                    // 如果是红黑树节点
                    else if (f instanceof TreeBin) {
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> lo = null, loTail = null;
                        TreeNode<K,V> hi = null, hiTail = null;
                        int lc = 0, hc = 0;
                        // 遍历树节点,根据高位分组
                        for (Node<K,V> e = t.first; e != null; e = e.next) {
                            int h = e.hash;
                            TreeNode<K,V> p = new TreeNode<K,V>
                                (h, e.key, e.val, null, null);
                            if ((h & n) == 0) {
                                if ((p.prev = loTail) == null)
                                    lo = p;
                                else
                                    loTail.next = p;
                                loTail = p;
                                ++lc;
                            }
                            else {
                                if ((p.prev = hiTail) == null)
                                    hi = p;
                                else
                                    hiTail.next = p;
                                hiTail = p;
                                ++hc;
                            }
                        }
                        // 根据节点数量决定是构建红黑树还是链表
                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                            (hc != 0) ? new TreeBin<K,V>(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                            (lc != 0) ? new TreeBin<K,V>(hi) : t;
                        // 将低位树/链表放入新表的i位置
                        setTabAt(nextTab, i, ln);
                        // 将高位树/链表放入新表的i+n位置
                        setTabAt(nextTab, i + n, hn);
                        // 在旧表中标记该桶已迁移
                        setTabAt(tab, i, fwd);
                        // 继续处理下一个桶
                        advance = true;
                    }
                }
            }
        }
    }
}

11.5.2 扩容流程详解

ConcurrentHashMap 的扩容是一个复杂的过程,它支持多线程协同工作,主要流程如下:

1. 任务分配机制

执行步骤:

  1. 计算任务步长
  • 根据 CPU 核心数计算每个线程应处理的桶数量(stride)
  • 最小为 16 个桶,确保任务粒度合适
  1. 创建新表
  • 如果是第一个扩容线程,创建一个大小为原表两倍的新表
  • 将 nextTable 引用指向新表,作为全局共享变量
  1. 任务分配
  • 使用 transferIndex 作为分配指针,初始值为原表长度
  • 线程通过 CAS 更新 transferIndex,为自己分配一段连续的桶区间
  • 每个线程处理的区间为[nextBound, nextIndex-1]

2. 数据迁移策略

执行步骤:

  1. 创建 ForwardingNode
  • 这是一个特殊节点,哈希值为 MOVED(-1)
  • 用于标记当前桶已完成迁移,并引导其他操作到新表
  1. 处理空桶
  • 如果桶为空,直接放入 ForwardingNode 标记已处理
  • 使用 CAS 操作确保并发安全
  1. 处理非空桶
  • 对桶头节点加 synchronized 锁,确保独占访问
  • 再次检查桶头是否变化,实现双重检查锁定模式
  1. 链表迁移
  • 根据节点哈希值与桶数量 n 的按位与结果,将链表分为两部分:
  • 结果为 0 的节点:放入新表的相同位置 i
  • 结果为 1 的节点:放入新表的 i+n 位置
  • 这种分配方式确保了哈希分布的均匀性
  1. 红黑树迁移
  • 类似链表,将树节点分为两组
  • 根据分组后的节点数量决定是构建新的红黑树还是转回链表
  • 如果节点数小于等于 UNTREEIFY_THRESHOLD(6),则转为链表
  1. 标记完成
  • 处理完当前桶后,将旧表中的桶设置为 ForwardingNode
  • 这样其他线程的读操作会被转发到新表

3. 扩容完成处理

执行步骤:

  1. 线程完成检测
  • 当线程完成自己的任务区间,通过 CAS 减少 sizeCtl 中的线程计数
  • 检查是否是最后一个扩容线程
  1. 最终检查
  • 最后一个线程会设置 finishing 标志
  • 再次检查所有桶,确保没有遗漏
  1. 表切换
  • 所有桶处理完成后,将 table 引用指向新表
  • 设置新的 sizeCtl 阈值(新容量的 0.75 倍)
  • 清除 nextTable 引用

4. 并发控制机制

  1. 读操作处理
  • 读操作遇到 ForwardingNode 时会自动跳转到新表继续查找
  • 这确保了扩容过程中读操作的正确性
  1. 写操作处理
  • 写操作遇到 ForwardingNode 时会帮助扩容
  • 这种协作机制加速了扩容过程
  1. 任务协调
  • 通过 sizeCtl 字段记录参与扩容的线程数
  • 通过 resizeStamp 记录当前扩容操作的标识

11.5.3 扩容触发条件

ConcurrentHashMap 的扩容会在以下情况触发:

  1. 元素数量超过阈值
  • 在 put 操作的最后会调用 addCount 方法增加计数
  • 当元素数量超过 sizeCtl(阈值)时触发扩容
  1. 链表过长
  • 当链表长度达到 TREEIFY_THRESHOLD(8)时
  • 如果表长度小于 MIN_TREEIFY_CAPACITY(64),优先触发扩容而非树化

11.5.4 为什么采用多线程扩容?

  1. 提高扩容效率
  • 在大容量表的情况下,单线程扩容会导致严重的性能下降
  • 多线程协同工作可以充分利用多核 CPU 资源
  1. 减少阻塞时间
  • 扩容过程中,其他线程可以协助完成扩容,而不是等待
  • 这种设计减少了因扩容导致的操作阻塞
  1. 负载均衡
  • 通过 stride 步长控制,合理分配任务
  • 避免单个线程负担过重

11.5.5 扩容过程中的并发安全

  1. ForwardingNode 机制
  • 标记已迁移的桶,引导读写操作到新表
  • 确保扩容过程中的操作正确性
  1. 双重检查锁定
  • 在获取锁后再次检查桶头节点,避免不必要的操作
  1. CAS 操作
  • 使用 CAS 更新 transferIndex 分配任务
  • 使用 CAS 更新 sizeCtl 记录扩容状态和线程数
  1. volatile 保证
  • nextTable 和 transferIndex 等字段使用 volatile 修饰
  • 确保多线程之间的可见性