ConcurrentHashMap详解
1. 简介
ConcurrentHashMap 是 Java 中的线程安全的哈希表实现,专门用于解决多线程环境下的并发访问问题。它在 JDK 1.5 中首次引入,用来替代同步的 HashMap 和 HashTable。相比于 HashTable 采用 synchronized 关键字对整个哈希表进行加锁的方式,ConcurrentHashMap 采用了分段锁的设计,大大提高了并发性能。
2. 重要的内部结构
2.1 Node 节点
- 基本存储单元,继承自 Map.Entry
- 包含 key、value、hash 值和下一个节点的引用
- volatile 修饰 value 和 next 引用,保证可见性
2.2 数据结构
- JDK 1.8 之前: Segment 数组 + HashEntry 数组
- JDK 1.8 之后: Node 数组 + 链表/红黑树
- 链表长度超过 8 时转换为红黑树
- 红黑树节点数小于 6 时转回链表
2.3 内部结构示意图(JDK 1.8+)
+-------------------+
| ConcurrentHashMap |
+-------------------+
|
v
+-----------------+
| Node[] table | // volatile修饰的数组
+-----------------+
| | |
v v v
+---+ +---+ +---+
| 0 |-->| 1 |-->| n | // 数组索引(桶)
+---+ +---+ +---+
|
v
+---------+ +---------+ +---------+
| Node |--->| TreeNode|--->| Node | // 链表或红黑树
| hash=123| | hash=456| | hash=789|
| key=k1 | | key=k2 | | key=k3 |
| val=v1 | | val=v2 | | val=v3 |
+---------+ +---------+ +---------+
3. 关键技术点
3.1 并发控制
-
CAS(Compare And Swap)操作
- 用于实现乐观锁
- 主要应用在 table 初始化、节点添加等操作
-
synchronized 同步
- 锁粒度:单个桶(bucket)
- 只在 hash 冲突时才会使用
-
volatile 变量
- 保证 Node 的 value 和 next 的可见性
- table 数组用 volatile 修饰,保证扩容时的可见性
3.2 重要操作的实现
put 操作
- 计算 key 的 hash 值
- 如果桶为空,使用 CAS 新建 Node
- 如果桶已经初始化:
- 首节点加 synchronized 锁
- 遍历链表或红黑树
- 找到相同 key 则更新,否则插入新节点
- 检查是否需要树化
put 操作详细流程
final V putVal(K key, V value, boolean onlyIfAbsent) {
// 不允许键值为null
if (key == null || value == null) throw new NullPointerException();
// 计算键的哈希值
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 如果表为空,初始化表
if (tab == null || (n = tab.length) == 0)
tab = initTable();
// 如果目标桶为空,直接CAS插入新节点
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // 插入成功,跳出循环
}
// 如果桶头是ForwardingNode,表示正在扩容,则协助扩容
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
// 对桶头节点加锁,保证并发安全
synchronized (f) {
// 再次检查桶头节点是否变化
if (tabAt(tab, i) == f) {
// 如果是普通链表节点
if (fh >= 0) {
binCount = 1;
// 遍历链表
for (Node<K,V> e = f;; ++binCount) {
K ek;
// 找到相同的键,更新值
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
// 到达链表末尾,插入新节点
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
// 如果是红黑树节点
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
// 调用红黑树的插入方法
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
// 检查是否需要转换为红黑树
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
// 增加计数并检查是否需要扩容
addCount(1L, binCount);
return null;
}
put 操作详细流程
putVal 方法是 ConcurrentHashMap 中实现键值对插入的核心方法,其执行流程可以分为以下几个关键步骤:
1. 参数校验和哈希计算
执行步骤:
-
检查键值是否为 null
- ConcurrentHashMap 不允许键或值为 null,这与 HashMap 不同
- 如果为 null,直接抛出 NullPointerException 异常
-
计算键的哈希值
- 使用 spread 方法对原始哈希码进行再哈希
- spread 方法通过位运算将哈希值分散到不同的桶中,减少冲突
- 这种设计使得哈希分布更均匀,提高了查找效率
2. 表初始化检查
执行步骤:
- 检查哈希表是否已初始化
- 如果表为 null 或长度为 0,则调用 initTable 方法初始化
- initTable 方法使用 CAS 和自旋保证只有一个线程执行初始化
- 这种延迟初始化的策略避免了不必要的内存分配
3. 无冲突时的快速插入
执行步骤:
-
检查目标桶是否为空
- 使用 tabAt 方法原子地获取桶头节点
- tabAt 基于 Unsafe.getObjectVolatile,保证内存可见性
-
使用 CAS 操作插入新节点
- 如果桶为空,尝试通过 CAS 原子地插入新节点
- 这是一种无锁优化,避免了不必要的同步开销
- 如果 CAS 成功,直接跳出循环;失败则重试整个过程
4. 协助扩容处理
执行步骤:
-
检查是否遇到 ForwardingNode
- ForwardingNode 的哈希值为 MOVED(-1),表示当前桶已被迁移
- 这是识别正在进行扩容的标志
-
调用 helpTransfer 协助扩容
- 当前线程会加入扩容过程,协助完成数据迁移
- 这种多线程协同扩容的设计大大提高了扩容效率
- 扩容完成后返回新表,继续进行插入操作
5. 有冲突时的同步插入
执行步骤:
-
对桶头节点加锁
- 使用 synchronized 锁定桶头节点,确保同一时刻只有一个线程操作该桶
- 这种细粒度锁设计大大提高了并发度
-
二次检查桶头节点
- 获取锁后再次检查桶头是否变化,这是典型的双重检查模式
- 如果桶头已变化,则放弃当前操作,重新进入循环
-
根据节点类型执行不同的插入逻辑
-
对于普通链表节点(hash >= 0):
- 遍历链表查找是否存在相同的键
- 如果找到相同的键,则更新值
- 如果到达链表末尾,则在末尾添加新节点
-
对于红黑树节点(instanceof TreeBin):
- 调用 putTreeVal 方法在红黑树中插入或更新节点
- 红黑树的查找效率为 O(log n),适合处理长链表
-
6. 转换结构和计数更新
执行步骤:
-
检查是否需要将链表转换为红黑树
- 如果链表长度达到或超过 TREEIFY_THRESHOLD(8),调用 treeifyBin
- treeifyBin 会检查表的大小,如果小于 MIN_TREEIFY_CAPACITY(64),会优先扩容而非转树
- 这种设计平衡了空间和时间效率
-
增加计数并检查是否需要扩容
- 调用 addCount 方法增加元素计数并检查是否需要扩容
- addCount 使用分离计数器机制,避免了单一计数器的竞争
- 如果需要扩容,会在 addCount 中触发
为什么 put 操作设计得如此复杂?
- 并发安全:通过细粒度锁和 CAS 操作保证多线程环境下的数据一致性
- 性能优化:针对不同场景采用不同策略,如空桶 CAS、非空桶加锁
- 协作扩容:多线程协同完成扩容,提高资源利用率
- 动态调整:根据负载情况动态调整数据结构(链表/红黑树)
put 操作与 HashMap 的区别
- 并发控制:ConcurrentHashMap 使用 CAS 和 synchronized 保证并发安全,HashMap 非线程安全
- null 值处理:ConcurrentHashMap 不允许键值为 null,HashMap 允许
- 扩容机制:ConcurrentHashMap 支持多线程协同扩容,HashMap 单线程扩容
- 锁粒度:ConcurrentHashMap 锁定单个桶,HashMap 无锁设计
get 操作
- 计算 hash 定位到桶
- 遍历桶中的链表或红黑树
- 利用 volatile 的 value 和 next 保证读取的可见性
- 不需要加锁,保证了较高的并发性
get 操作详细流程
flowchart TD
Start[开始get操作] --> CalcHash[计算key的hash值]
CalcHash --> LocateBucket[定位到对应桶位置]
LocateBucket --> CheckBucket{桶是否为空?}
CheckBucket --是--> ReturnNull[返回null]
CheckBucket --否--> Traverse[遍历链表/红黑树]
Traverse --> CompareKey[比较key的hash和equals]
CompareKey --匹配--> ReturnValue[返回value]
CompareKey --不匹配--> NextNode{是否有下一个节点?}
NextNode --是--> Traverse
NextNode --否--> ReturnNull
size 操作
- JDK 1.8 使用 baseCount 和 counterCells 数组
- 通过累加 baseCount 和所有 counterCell 来计算
- 使用 CAS 更新计数,失败则创建新的 counterCell
4. 扩容机制
4.1 触发条件
- 当前容量超过阈值
- 链表长度超过 8 且数组长度小于 64
4.2 扩容步骤
- 创建新的 table 数组,大小为原来的 2 倍
- 将原数组的节点迁移到新数组
- 支持多线程并发扩容
- 使用 ForwardingNode 标识已迁移的桶
5. 扩容期间 get 操作实现机制
5.1 双数组查询机制
// ForwardingNode的find方法实现
Node<K,V> find(int h, Object k) {
outer: for (Node<K,V>[] tab = nextTable;;) {
Node<K,V> e; int n;
if (k == null || tab == null || (n = tab.length) == 0)
break;
// 重新计算哈希定位新桶位置
int i = (n - 1) & h;
if ((e = tabAt(tab, i)) == null)
break;
for (;;) {
int eh; K ek;
if ((eh = e.hash) == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
if (eh < 0) {
if (e instanceof ForwardingNode) {
tab = ((ForwardingNode<K,V>)e).nextTable;
continue outer;
}
else
return e.find(h, k);
}
if ((e = e.next) == null)
break;
}
}
return null;
}
5.2 查询流程示意图
sequenceDiagram
participant GetThread
participant OldArray
participant ForwardingNode
participant NewArray
GetThread->>OldArray: 发起get请求
OldArray-->>GetThread: 返回ForwardingNode
GetThread->>ForwardingNode: 查询nextTable
ForwardingNode->>NewArray: 转发请求
NewArray-->>GetThread: 返回查询结果
5.3 实现特点
- 无锁读取:全程不使用 synchronized
- 版本感知:自动检测数组版本切换
- 路径回溯:遇到嵌套扩容时递归查询
- 内存屏障:通过 UNSAFE.loadFence 保证读取顺序
5.4 性能优化策略
- 热点数据优先:首先检查原桶位置
- 避免缓存穿透:通过 ForwardingNode 路由到新数组
- 并行探测:允许同时查询新旧数组结构
6. 性能优势
-
分段锁设计
- 细粒度的锁控制
- 提高并发访问效率
-
读操作完全不加锁
- volatile 保证可见性
- 大大提升读性能
-
CAS + synchronized 结合
- 减少锁竞争
- 提高并发效率
7. 使用建议
-
初始容量设置
- 根据预估数据量设置初始容量
- 避免频繁扩容
-
负载因子选择
- 默认 0.75
- 可根据实际需求调整
-
合适的应用场景
- 高并发读写
- 对性能要求较高的场景
8. 注意事项
-
非空键值
- 不支持 null 键和 null 值
- 避免歧义
-
迭代器弱一致性
- iterator 反映创建时的数据状态
- 不会抛出 ConcurrentModificationException
-
内存占用
- 比 HashMap 占用更多内存
- 红黑树结构需要额外空间
9. 与其他集合的对比
9.1 vs HashMap
- 线程安全 vs 非线程安全
- 空间占用较大 vs 空间占用较小
- 不允许 null vs 允许 null
9.2 vs HashTable
- 分段锁 vs 全表锁
- 性能更高 vs 性能较差
- 不允许 null vs 不允许 null
10. 总结
ConcurrentHashMap 通过精心的设计实现了高并发性能:
- 分段锁机制避免了全表锁定
- CAS 操作减少了锁竞争
- volatile 保证了可见性
- 红黑树优化了查找性能
这些特性使其成为了 Java 并发编程中最重要的数据结构之一。在需要线程安全的场景下,ConcurrentHashMap 通常是最佳选择。
11. ConcurrentHashMap 深度解析
11.1 设计演进
-
JDK7 分段锁设计
- 采用 Segment 数组 + HashEntry 数组结构
- 默认 16 个 Segment(并发级别)
- 每个 Segment 独立 ReentrantLock
-
JDK8 优化设计
- 数组 + 链表/红黑树
- 抛弃分段锁,采用 CAS + synchronized
- 锁粒度细化到链表头节点
11.2 核心数据结构
// JDK8 Node定义
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
}
11.3 并发控制机制
11.3.1 CAS 操作应用场景
- 数组初始化
- 空桶插入
- 扩容触发
11.3.2 synchronized 使用
- 链表头节点加锁
- 红黑树根节点加锁
11.4 关键方法解析
11.4.1 put 方法流程
- 计算 key 的 hash
- 定位数组下标
- 根据不同情况处理:
- 空桶:CAS 插入
- 非空:synchronized 锁头节点
- 检查是否需要树化
11.4.2 get 方法特点
- 无锁读操作
- 通过 volatile 保证可见性
11.5 扩容机制
- 多线程协同扩容
- 迁移期间保证读写访问
- 扩容触发条件:
- 元素数量超过阈值
- 链表长度超过 8(且数组长度 ≥64)
11.6 性能优化点
- 减少锁竞争
- 降低锁粒度
- 无锁读操作
- 并发扩容
11.7 与其他集合的对比
| 特性 | HashMap | ConcurrentHashMap |
|---|---|---|
| 线程安全 | 非线程安全 | 完全线程安全 |
| 锁机制 | 无 | CAS + synchronized |
| 空值存储 | 允许 | 不允许 |
| 迭代器 | fast-fail | 弱一致性 |
11.8 使用场景
- 高并发读写
- 需要保证线程安全的统计场景
- 缓存实现(结合弱引用)
11.9 示例代码
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
// 线程安全的putIfAbsent
map.putIfAbsent("key", 1);
// 并行处理
map.forEach(1, (k, v) -> System.out.println(k + ": " + v));
11.10 最佳实践
- 合理设置初始容量
- 避免过度依赖 size() 方法
- 使用 search 方法进行并行查找
- 优先使用 computeIfAbsent 原子操作
12. 高频面试题
12.1 ConcurrentHashMap 在 JDK1.8 中为什么放弃分段锁?
答:
- 内存消耗:分段锁需要维护多个 Segment 对象,占用更多内存
- 并发粒度:分段锁的并发度受 Segment 数量限制(默认 16),而 JDK1.8 的锁粒度细化到桶级别
- 性能优化:synchronized 在 JDK1.6 后做了大量优化(偏向锁、轻量级锁),性能接近 ReentrantLock
- 数据结构:链表转红黑树的优化需要更细粒度的同步控制
12.2 ConcurrentHashMap 的 size()方法为什么可能不准确?
答:
- 无锁设计:使用 baseCount 和 CounterCell[]组合计数,避免全局锁
- 并发更新:统计时可能有线程正在更新 counterCells
- 最终一致性:返回的是近似值而非精确快照
- 性能权衡:精确统计需要全局锁,会降低并发性能
12.3 多线程如何协作完成扩容?
答:
- 线程协助:当 put 操作遇到 ForwardingNode 时,当前线程会加入扩容
- 任务划分:每个线程负责 16 个桶的迁移(通过 transferIndex 分配)
- 进度跟踪:通过 sizeCtl 字段记录扩容状态和线程数量
- 重复检测:使用 ForwardingNode 标识已处理的桶
12.4 ForwardingNode 的作用是什么?
答:
- 扩容标识:标记当前桶已完成迁移
- 查询转发:get 操作遇到时自动跳转到新数组
- 线程协调:引导其他线程参与扩容工作
- 哈希优化:特殊 hash 值(MOVED=-1)快速识别
12.5 为什么使用 synchronized 而不是 ReentrantLock?
答:
- 内存开销:synchronized 的锁升级机制更节省内存
- JVM 优化:锁消除、锁粗化等 JVM 内置优化
- 性能提升:JDK1.6 后 synchronized 性能大幅提升
- 代码简化:避免显式加锁/解锁的代码复杂度
12.6 initTable 方法如何保证线程安全?
答:
- sizeCtl 控制:通过 CAS 将 sizeCtl 设为-1 标识初始化中
- 双重检查锁:
if ((tab = table) == null || tab.length == 0) {
while ((sc = sizeCtl) < 0)
Thread.yield();
if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
// 初始化逻辑
} finally {
sizeCtl = sc;
}
}
}
- volatile 可见性:table 字段用 volatile 修饰
- CAS 保障:确保只有一个线程执行初始化
12.7 为什么链表转红黑树的阈值是 8?
答:
- 泊松分布:根据哈希碰撞概率,链表长度达到 8 的概率小于千万分之一
- 空间权衡:树节点占用空间是普通节点的两倍
- 性能平衡:查找时间复杂度从 O(n)降到 O(logn)
- 退化机制:删除节点时若长度<6 会转回链表
12.8 get 操作为什么不需要加锁?
答:
- volatile 保证:Node 的 val 和 next 都用 volatile 修饰
- 不可变对象:key 和 hash 在创建后不可修改
- 安全访问:通过 UNSAFE.getObjectVolatile 保证可见性
- 结构稳定性:查询时遇到扩容会自动转发到新表
12.9 ConcurrentHashMap 的迭代器有什么特点?
答:
- 弱一致性:反映创建迭代器时或之后的更新
- 无快速失败:不会抛出 ConcurrentModificationException
- 分段遍历:按桶顺序遍历,可能跳过正在修改的桶
- 安全隔离:迭代器持有数组的快照引用
12.10 如何保证多线程 put 操作的原子性?
答:
- CAS 初始化:空桶插入使用 CAS
- synchronized 锁:哈希冲突时锁住桶头节点
- 扩容协作:遇到扩容时先协助完成迁移
- 版本控制:通过 sizeCtl 检测并发修改
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
break;
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
// 同步代码块...
}
}
addCount(1L, binCount);
return null;
}
12.11 addCount 方法的工作原理是什么?
答:
- 双重计数机制:使用 baseCount 和 counterCells 数组实现高并发计数
- CAS 优先:优先尝试更新 baseCount,失败后才使用 counterCells
- 热点分散:通过线程哈希值选择不同的 CounterCell,减少竞争
- 扩容触发:同时负责检查并触发表扩容操作
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
// 尝试直接更新baseCount
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
// 如果counterCells为空,或者当前线程对应的cell为空,
// 或者CAS更新cell失败,则调用fullAddCount
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
// 检查是否需要扩容
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
// 当元素数量超过阈值,并且表长度小于最大容量时进行扩容
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
// 如果已经在扩容中
if (sc < 0) {
// 检查是否可以加入扩容
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
// 加入扩容
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
// 当前没有线程在扩容,则发起扩容
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}
addCount 方法执行流程
addCount 方法的执行可以分为两个主要部分:计数更新和扩容检查。
1. 计数更新部分
执行步骤:
-
首先尝试直接通过 CAS 操作更新 baseCount
- 这是最简单高效的路径,适用于并发度较低的情况
- 使用 CAS 避免了锁的开销,提高了性能
-
如果 baseCount 更新失败或 counterCells 已存在,则尝试更新 counterCells
- 当多线程同时更新时,CAS 操作容易失败,此时需要分散热点
- counterCells 数组将计数压力分散到多个单元格,减少竞争
-
根据当前线程的哈希值选择一个 CounterCell
- 使用 ThreadLocalRandom.getProbe()获取线程标识
- 不同线程倾向于更新不同的 CounterCell,降低冲突概率
-
如果选中的 CounterCell 为空或 CAS 更新失败,则调用 fullAddCount 方法处理
- fullAddCount 会处理更复杂的情况,如创建新的 CounterCell
- 包含退避和重试机制,确保最终能够成功更新计数
-
如果 check 参数小于等于 1,直接返回,否则计算总数
- 这是一个优化,避免不必要的总数计算
- 只有在可能需要扩容时才计算总数
2. 扩容检查部分
执行步骤:
-
当 check >= 0 时,检查是否需要扩容
- check 参数通常是 binCount(桶中的节点数量)
- 负值表示不需要检查扩容,是一种优化
-
计算当前表的大小标记值 resizeStamp(n)
- 这个值包含了数组长度的信息
- 用于标识特定大小的扩容操作,确保扩容的一致性
-
如果 sizeCtl 为负数,表示已经有线程在进行扩容
- sizeCtl < 0 是扩容进行中的标志
- 此时检查是否可以协助扩容,而不是启动新的扩容
-
检查多个条件决定是否可以协助扩容
- 验证扩容标记是否匹配当前表大小
- 检查是否达到最大扩容线程数
- 确认 nextTable 不为空且 transferIndex > 0
-
如果条件允许,则通过 CAS 增加扩容线程计数,并调用 transfer 方法协助扩容
- 多线程协同扩容是 ConcurrentHashMap 的一大特色
- 这种设计使得扩容操作能够并行执行,提高效率
-
如果 sizeCtl 不为负数,表示当前没有线程在扩容
- 此时需要启动新的扩容操作
- 通过 CAS 将 sizeCtl 设置为负数,标记开始扩容
-
调用 transfer 方法开始扩容操作
- transfer 方法负责实际的节点迁移工作
- 传入 null 作为 nextTable 参数,表示需要创建新表
-
扩容完成后,重新计算元素总数,检查是否需要再次扩容
- 这是一个循环检查,确保容量满足需求
- 在高并发添加的情况下可能需要连续多次扩容
为什么需要这么复杂的计数机制?
- 避免热点竞争:如果只使用一个计数器,在高并发下会成为性能瓶颈
- 减少 CAS 失败:通过分散计数到多个 cell,减少了 CAS 操作的冲突概率
- 提高并发度:不同线程可以同时更新不同的 CounterCell,提高并发性能
- 弱一致性:size()方法不需要获取全局锁,只需要累加所有计数值
与 LongAdder 的关系
ConcurrentHashMap 的计数机制与 JDK 8 中的 LongAdder 实现原理非常相似,都采用了分散热点的设计思想:
- 都使用 baseCount/base 作为初始计数器
- 都使用 Cell/CounterCell 数组分散计数
- 都通过线程探针值(probe)选择对应的 Cell
- 都在 CAS 失败时进行退避和重试
12.11 fullAddCount 方法的工作原理是什么?
答:
- 复杂计数的后备方案:当 addCount 中的快速路径失败时被调用
- 创建和初始化 CounterCells:负责 CounterCells 数组的初始化
- 处理竞争:通过退避和重试机制处理高并发下的竞争
- 动态扩容:在竞争激烈时扩容 CounterCells 数组
private final void fullAddCount(long x, boolean wasUncontended) {
int h;
if ((h = ThreadLocalRandom.getProbe()) == 0) {
ThreadLocalRandom.localInit(); // force initialization
h = ThreadLocalRandom.getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
CounterCell[] as; CounterCell a; int n; long v;
if ((as = counterCells) != null && (n = as.length) > 0) {
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // Try to attach new Cell
CounterCell r = new CounterCell(x); // Optimistic create
if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean created = false;
try { // Recheck under lock
CounterCell[] rs; int m, j;
if ((rs = counterCells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
break;
else if (counterCells != as || n >= NCPU)
collide = false; // At max size or stale
else if (!collide)
collide = true;
else if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
try {
if (counterCells == as) {// Expand table unless stale
CounterCell[] rs = new CounterCell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
counterCells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = ThreadLocalRandom.advanceProbe(h);
}
else if (cellsBusy == 0 && counterCells == as &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean init = false;
try { // Initialize table
if (counterCells == as) {
CounterCell[] rs = new CounterCell[2];
rs[h & 1] = new CounterCell(x);
counterCells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
break; // Fall back on using base
}
}
fullAddCount 方法执行流程
fullAddCount 方法的执行流程比 addCount 更加复杂,它需要处理多种情况:
1. 线程标识初始化
执行步骤:
-
检查当前线程是否有有效的探针值(probe)
- 探针值用于确定线程应该使用哪个 CounterCell
- 如果探针值为 0,表示尚未初始化
-
如果探针值为 0,则初始化线程的随机数生成器
- 调用 ThreadLocalRandom.localInit()强制初始化
- 获取新的探针值,并将 wasUncontended 设为 true
- 这确保每个线程都有唯一的标识,减少冲突
2. 主循环处理
fullAddCount 的核心是一个无限循环,根据不同情况执行不同的分支:
2.1 CounterCells 数组已存在
当 counterCells 数组已经初始化时:
-
尝试在对应槽位创建新的 CounterCell
- 如果当前线程对应的槽位为空,尝试创建新的 CounterCell
- 使用 cellsBusy 作为锁,确保只有一个线程可以修改结构
- 创建成功后直接退出循环
-
处理已知的竞争情况
- 如果 wasUncontended 为 false,表示之前已经发生过竞争
- 将 wasUncontended 设为 true,准备在 rehash 后重试
-
尝试 CAS 更新已存在的 CounterCell
- 如果对应槽位已有 CounterCell,尝试直接 CAS 更新其值
- 成功则退出循环,失败则继续处理
-
检查是否需要扩容
- 如果 CAS 失败且数组未过期且未达到最大大小,考虑扩容
- 使用 collide 标志控制是否进行扩容
- 第一次冲突时设置 collide 为 true,第二次冲突时才扩容
-
执行扩容操作
- 获取 cellsBusy 锁后,创建一个两倍大小的新数组
- 将原数组元素复制到新数组
- 扩容后重置 collide 标志并继续循环
-
更新线程探针值
- 如果以上操作都未成功,则更新线程的探针值
- 使用 ThreadLocalRandom.advanceProbe(h)生成新的哈希值
- 这相当于 rehash,目的是减少下次冲突的可能性
2.2 CounterCells 数组不存在
当 counterCells 数组尚未初始化时:
-
尝试初始化 CounterCells 数组
- 获取 cellsBusy 锁后创建大小为 2 的新数组
- 根据线程探针值选择一个槽位并初始化
- 初始化成功后退出循环
-
回退到 baseCount
- 如果无法获取锁或初始化失败,尝试直接更新 baseCount
- 这是最后的回退方案,确保计数一定能够更新
为什么需要 fullAddCount 方法?
- 处理复杂场景:addCount 方法只处理简单场景,复杂情况下委托给 fullAddCount
- 初始化 CounterCells:负责 CounterCells 数组的创建和初始化
- 动态调整:根据并发情况动态扩容 CounterCells 数组
- 退避与重试:实现了退避和重试机制,减少高并发下的冲突
fullAddCount 与 LongAdder 的 longAccumulate 方法
fullAddCount 方法的设计与 JDK 8 中 LongAdder 的 longAccumulate 方法几乎相同:
- 都使用无限循环处理各种情况
- 都实现了 Cell 数组的初始化、更新和扩容
- 都使用线程探针值进行哈希定位
- 都实现了退避和重试机制
- 都在最后提供了回退到 base 值的机制
这种设计使得 ConcurrentHashMap 的计数操作在高并发环境下具有出色的性能,避免了单一计数器可能导致的严重竞争问题。
13. 最佳实践
- 建议设置初始容量避免频繁扩容
- 预估并发量合理设置并发级别
- 尽量使用不可变对象作为 key
- 关注 size()与 mappingCount()的区别
- 使用 forEach 系列方法进行批量操作
11.5 扩容机制
- 多线程协同扩容
- 迁移期间保证读写访问
- 扩容触发条件:
- 元素数量超过阈值
- 链表长度超过 8(且数组长度 ≥64)
11.5.1 transfer 方法源码
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
// 计算每个线程应该处理的桶数量,最小为16
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // 预设值16
// 如果新表为空,则创建一个原表两倍大小的新表
if (nextTab == null) {
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // 处理内存溢出
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n; // 用于分配任务的索引
}
int nextn = nextTab.length;
// 创建一个特殊节点,标记当前桶已迁移完成
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
// 标记是否需要继续推进任务
boolean advance = true;
// 标记是否已完成所有迁移工作
boolean finishing = false;
// i是当前处理的桶索引,bound是当前任务的下界
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
// 分配或前进任务
while (advance) {
int nextIndex, nextBound;
// 如果已经处理完当前区间或正在完成
if (--i >= bound || finishing)
advance = false;
// 如果没有更多的桶需要处理
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
// CAS尝试更新transferIndex,为当前线程分配任务区间
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
// 检查是否完成迁移
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
// 如果所有桶都已处理完
if (finishing) {
nextTable = null;
table = nextTab; // 替换为新表
sizeCtl = (n << 1) - (n >>> 1); // 新的阈值为新容量的0.75倍
return;
}
// 当前线程完成任务,尝试减少活动扩容线程计数
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
// 检查是否是最后一个扩容线程
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
// 所有线程都已完成,标记结束
finishing = advance = true;
i = n; // 再次检查所有桶
}
}
// 如果当前桶为空,放入ForwardingNode标记
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
// 如果当前桶已经是ForwardingNode,说明已被处理
else if ((fh = f.hash) == MOVED)
advance = true;
// 处理当前桶
else {
// 锁定当前桶的头节点
synchronized (f) {
// 再次检查头节点是否变化
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
// 如果是链表节点
if (fh >= 0) {
// 根据节点hash值的高位(n的大小对应的位),将链表分为两部分
int runBit = fh & n;
Node<K,V> lastRun = f;
// 寻找最后一段相同高位的节点序列
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
// 根据高位是否为0,决定放入低位表还是高位表
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
// 遍历链表,根据高位分组
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
// 将低位链表放入新表的i位置
setTabAt(nextTab, i, ln);
// 将高位链表放入新表的i+n位置
setTabAt(nextTab, i + n, hn);
// 在旧表中标记该桶已迁移
setTabAt(tab, i, fwd);
// 继续处理下一个桶
advance = true;
}
// 如果是红黑树节点
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
// 遍历树节点,根据高位分组
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
// 根据节点数量决定是构建红黑树还是链表
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
// 将低位树/链表放入新表的i位置
setTabAt(nextTab, i, ln);
// 将高位树/链表放入新表的i+n位置
setTabAt(nextTab, i + n, hn);
// 在旧表中标记该桶已迁移
setTabAt(tab, i, fwd);
// 继续处理下一个桶
advance = true;
}
}
}
}
}
}
11.5.2 扩容流程详解
ConcurrentHashMap 的扩容是一个复杂的过程,它支持多线程协同工作,主要流程如下:
1. 任务分配机制
执行步骤:
- 计算任务步长
- 根据 CPU 核心数计算每个线程应处理的桶数量(stride)
- 最小为 16 个桶,确保任务粒度合适
- 创建新表
- 如果是第一个扩容线程,创建一个大小为原表两倍的新表
- 将 nextTable 引用指向新表,作为全局共享变量
- 任务分配
- 使用 transferIndex 作为分配指针,初始值为原表长度
- 线程通过 CAS 更新 transferIndex,为自己分配一段连续的桶区间
- 每个线程处理的区间为[nextBound, nextIndex-1]
2. 数据迁移策略
执行步骤:
- 创建 ForwardingNode
- 这是一个特殊节点,哈希值为 MOVED(-1)
- 用于标记当前桶已完成迁移,并引导其他操作到新表
- 处理空桶
- 如果桶为空,直接放入 ForwardingNode 标记已处理
- 使用 CAS 操作确保并发安全
- 处理非空桶
- 对桶头节点加 synchronized 锁,确保独占访问
- 再次检查桶头是否变化,实现双重检查锁定模式
- 链表迁移
- 根据节点哈希值与桶数量 n 的按位与结果,将链表分为两部分:
- 结果为 0 的节点:放入新表的相同位置 i
- 结果为 1 的节点:放入新表的 i+n 位置
- 这种分配方式确保了哈希分布的均匀性
- 红黑树迁移
- 类似链表,将树节点分为两组
- 根据分组后的节点数量决定是构建新的红黑树还是转回链表
- 如果节点数小于等于 UNTREEIFY_THRESHOLD(6),则转为链表
- 标记完成
- 处理完当前桶后,将旧表中的桶设置为 ForwardingNode
- 这样其他线程的读操作会被转发到新表
3. 扩容完成处理
执行步骤:
- 线程完成检测
- 当线程完成自己的任务区间,通过 CAS 减少 sizeCtl 中的线程计数
- 检查是否是最后一个扩容线程
- 最终检查
- 最后一个线程会设置 finishing 标志
- 再次检查所有桶,确保没有遗漏
- 表切换
- 所有桶处理完成后,将 table 引用指向新表
- 设置新的 sizeCtl 阈值(新容量的 0.75 倍)
- 清除 nextTable 引用
4. 并发控制机制
- 读操作处理
- 读操作遇到 ForwardingNode 时会自动跳转到新表继续查找
- 这确保了扩容过程中读操作的正确性
- 写操作处理
- 写操作遇到 ForwardingNode 时会帮助扩容
- 这种协作机制加速了扩容过程
- 任务协调
- 通过 sizeCtl 字段记录参与扩容的线程数
- 通过 resizeStamp 记录当前扩容操作的标识
11.5.3 扩容触发条件
ConcurrentHashMap 的扩容会在以下情况触发:
- 元素数量超过阈值
- 在 put 操作的最后会调用 addCount 方法增加计数
- 当元素数量超过 sizeCtl(阈值)时触发扩容
- 链表过长
- 当链表长度达到 TREEIFY_THRESHOLD(8)时
- 如果表长度小于 MIN_TREEIFY_CAPACITY(64),优先触发扩容而非树化
11.5.4 为什么采用多线程扩容?
- 提高扩容效率
- 在大容量表的情况下,单线程扩容会导致严重的性能下降
- 多线程协同工作可以充分利用多核 CPU 资源
- 减少阻塞时间
- 扩容过程中,其他线程可以协助完成扩容,而不是等待
- 这种设计减少了因扩容导致的操作阻塞
- 负载均衡
- 通过 stride 步长控制,合理分配任务
- 避免单个线程负担过重
11.5.5 扩容过程中的并发安全
- ForwardingNode 机制
- 标记已迁移的桶,引导读写操作到新表
- 确保扩容过程中的操作正确性
- 双重检查锁定
- 在获取锁后再次检查桶头节点,避免不必要的操作
- CAS 操作
- 使用 CAS 更新 transferIndex 分配任务
- 使用 CAS 更新 sizeCtl 记录扩容状态和线程数
- volatile 保证
- nextTable 和 transferIndex 等字段使用 volatile 修饰
- 确保多线程之间的可见性