为什么不能在多线程中使用普通集合?
HashMap、ArrayList、HashSet 等标准集合类不是线程安全的。
在多线程环境中直接使用,可能出现:
- 数据丢失:两个线程同时 put,触发 HashMap 扩容,可能导致条目丢失。
- 无限循环:Java 7 的 HashMap 在多线程扩容时会产生循环链表,导致 CPU 100% 占用(Java 8 已修复,但仍非线程安全)。
- ConcurrentModificationException:遍历集合时另一线程修改,触发快速失败(fail-fast)机制。
- 内存可见性问题:一个线程的修改对另一线程不可见。
虽然 Collections.synchronizedMap(new HashMap<>()) 使每个方法线程安全,
但复合操作(如「不存在则插入」)仍非原子性,且对整个集合加锁导致并发度极低。
推荐使用 java.util.concurrent 包下的专用并发集合。
ConcurrentHashMap
从 Java 7 到 Java 8 的演进
Java 7:Segment 分段锁
- 内部分为 16 个 Segment(可配置)
- 每个 Segment 是一个独立的 ReentrantLock
- 不同 Segment 的操作可并行(最大并发度 = Segment 数)
- 结构复杂,每个 Segment 是一个 HashTable
Java 8:CAS + synchronized
- 去掉 Segment,退化为 Node 数组 + 链表/红黑树
- 插入时用 CAS 操作(无锁路径)
- 冲突时只锁定单个 bucket(synchronized 链表头节点)
- 并发度更高(最大并发 = 桶数量),代码更简洁
核心操作
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
// 基本 get/put(线程安全)
map.put("a", 1);
Integer val = map.get("a"); // 返回 1
// putIfAbsent:原子性「不存在则插入」
map.putIfAbsent("b", 2); // 原子操作,不需要额外锁
// compute:原子性地计算并更新值
map.compute("a", (k, v) -> v == null ? 1 : v + 1); // 原子 increment
// computeIfAbsent:不存在时才计算(常用于缓存)
map.computeIfAbsent("user:123", key -> loadUserFromDB(key));
// merge:合并值(线程安全的 word count)
String[] words = {"apple", "banana", "apple"};
for (String word : words) {
map.merge(word, 1, Integer::sum); // 不存在则放1,存在则累加
}
// 结果:{apple=2, banana=1}
// size() 是近似值,非精确值(高并发下)
// 需要精确计数用 mappingCount()(返回 long)
long exactSize = map.mappingCount();
ConcurrentHashMap 的注意事项
// 错误:复合操作不是原子性的
if (!map.containsKey("key")) { // 检查
map.put("key", computeValue()); // 写入 —— 两步不原子!
}
// 正确:使用原子复合操作
map.computeIfAbsent("key", k -> computeValue()); // 原子
// 错误:在迭代时修改(虽不抛异常,但语义不明确)
// 正确做法:收集需要删除的 key,迭代后批量删除
map.entrySet().removeIf(e -> e.getValue() < 0); // 安全的批量删除
CopyOnWriteArrayList
写时复制(Copy-On-Write)是一种并发策略:读操作无锁直接访问原始数组; 写操作(add/set/remove)时,先复制一份新数组,在新数组上完成修改,然后通过 volatile 变量将引用切换到新数组。
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
list.add("A");
list.add("B");
list.add("C");
// 读操作完全无锁,多线程读没有任何性能开销
for (String s : list) {
System.out.println(s);
// 迭代过程中另一线程 add("D") 也不会 ConcurrentModificationException
// 但此次迭代看不到 "D"(快照迭代器,基于创建时的数组快照)
}
// 适合:事件监听器列表(读多写少,读多在遍历)
CopyOnWriteArrayList<EventListener> listeners = new CopyOnWriteArrayList<>();
void addListener(EventListener l) { listeners.add(l); } // 偶尔写
void fireEvent(Event e) {
for (EventListener l : listeners) { // 频繁读遍历,无锁
l.onEvent(e);
}
}
每次写操作都要复制整个数组,时间和空间复杂度均为 O(n)。若列表很大且写操作频繁,性能会很差。
只推荐用于读远多于写(如监听器列表)、列表较小的场景。
写多读少的场景应使用 ConcurrentLinkedDeque 或 BlockingQueue。
BlockingQueue 阻塞队列体系
BlockingQueue 是并发编程中生产者-消费者模式的核心工具,
提供阻塞的 put/take 操作:队列满时 put 阻塞,队列空时 take 阻塞。
BlockingQueue 的四类方法
| 操作 | 抛出异常 | 返回特殊值 | 阻塞 | 超时 |
|---|---|---|---|---|
| 插入 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
| 删除 | remove() | poll() | take() | poll(time, unit) |
| 检查 | element() | peek() | - | - |
常用实现类
生产者-消费者实战
import java.util.concurrent.*;
public class ProducerConsumer {
// 有界队列:控制内存使用,防止生产者跑太快压垮消费者
private static final BlockingQueue<String> queue =
new LinkedBlockingQueue<>(100); // 容量 100
public static void main(String[] args) {
// 3 个生产者
for (int i = 0; i < 3; i++) {
final int id = i;
new Thread(() -> {
for (int j = 0; j < 10; j++) {
try {
String item = "item-" + id + "-" + j;
queue.put(item); // 队列满时阻塞
System.out.println("生产:" + item);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}, "producer-" + i).start();
}
// 2 个消费者
for (int i = 0; i < 2; i++) {
new Thread(() -> {
while (true) {
try {
String item = queue.poll(1, TimeUnit.SECONDS); // 超时取
if (item == null) break; // 超时退出
System.out.println("消费:" + item);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}, "consumer-" + i).start();
}
}
}
DelayQueue 实战:定时任务
import java.util.concurrent.*;
class DelayedTask implements Delayed {
private final String name;
private final long executeAt; // 执行时间(纳秒)
public DelayedTask(String name, long delayMs) {
this.name = name;
this.executeAt = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMs);
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(executeAt - System.nanoTime(), TimeUnit.NANOSECONDS);
}
@Override
public int compareTo(Delayed o) {
return Long.compare(this.executeAt, ((DelayedTask) o).executeAt);
}
public String getName() { return name; }
}
// 使用
DelayQueue<DelayedTask> delayQueue = new DelayQueue<>();
delayQueue.put(new DelayedTask("清理缓存", 5000)); // 5秒后执行
delayQueue.put(new DelayedTask("发送邮件", 1000)); // 1秒后执行
// 消费者:take() 阻塞直到最近任务到期
while (true) {
DelayedTask task = delayQueue.take(); // 阻塞等待
System.out.println("执行:" + task.getName());
}
其他并发集合
ConcurrentSkipListMap / ConcurrentSkipListSet
跳表(Skip List)是一种基于概率的有序数据结构,复杂度 O(log n)。
ConcurrentSkipListMap 是线程安全的 TreeMap 替代品,
通过 CAS 实现无锁的并发有序 Map。
// 线程安全的有序 Map(按 key 排序)
ConcurrentSkipListMap<String, Integer> sortedMap = new ConcurrentSkipListMap<>();
sortedMap.put("banana", 2);
sortedMap.put("apple", 1);
sortedMap.put("cherry", 3);
sortedMap.firstKey(); // "apple"
sortedMap.headMap("b").keySet(); // ["apple"](小于 "b" 的所有 key)
选择并发集合的决策树
本章小结
- 普通集合(HashMap、ArrayList)非线程安全,
Collections.synchronizedXxx()粒度太粗,推荐使用java.util.concurrent专用并发集合。 - ConcurrentHashMap(Java 8)用 CAS + 桶级 synchronized 实现高并发读写,
compute/merge/computeIfAbsent提供原子复合操作。 - CopyOnWriteArrayList 读无锁,写时复制,适合读多写少的小列表(如事件监听器)。
- BlockingQueue 是生产者-消费者模式的核心,提供阻塞的 put/take,有 ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue 等多种实现。
- ConcurrentSkipListMap 提供线程安全的有序 Map;DelayQueue 支持延迟任务调度。