Chapter 04

并发集合

ConcurrentHashMap、CopyOnWriteArrayList、BlockingQueue——Java 并发集合完全指南

为什么不能在多线程中使用普通集合?

HashMapArrayListHashSet 等标准集合类不是线程安全的。 在多线程环境中直接使用,可能出现:

Collections.synchronizedXxx() 的局限

虽然 Collections.synchronizedMap(new HashMap<>()) 使每个方法线程安全, 但复合操作(如「不存在则插入」)仍非原子性,且对整个集合加锁导致并发度极低。 推荐使用 java.util.concurrent 包下的专用并发集合。

ConcurrentHashMap

从 Java 7 到 Java 8 的演进

Java 7:Segment 分段锁

  • 内部分为 16 个 Segment(可配置)
  • 每个 Segment 是一个独立的 ReentrantLock
  • 不同 Segment 的操作可并行(最大并发度 = Segment 数)
  • 结构复杂,每个 Segment 是一个 HashTable

Java 8:CAS + synchronized

  • 去掉 Segment,退化为 Node 数组 + 链表/红黑树
  • 插入时用 CAS 操作(无锁路径)
  • 冲突时只锁定单个 bucket(synchronized 链表头节点)
  • 并发度更高(最大并发 = 桶数量),代码更简洁
Java 8 ConcurrentHashMap 结构: Node[](桶数组) ┌────┬────┬────┬────┬────┬────┬────┬────┐ │ 0 │ 1 │ 2 │ 3 │ 4 │...│ │N-1 │ └──┬─┴────┴────┴──┬─┴────┴────┴────┴────┘ │ │ ▼ ▼ 节点数 ≥ 8 时转红黑树 Node(k,v) TreeBin │ ├── TreeNode Node(k,v) ├── TreeNode │ └── TreeNode null 并发策略: · 桶为空 → CAS 直接写入(无锁) · 桶不空 → synchronized 锁桶头节点(只锁一个桶)

核心操作

ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();

// 基本 get/put(线程安全)
map.put("a", 1);
Integer val = map.get("a"); // 返回 1

// putIfAbsent:原子性「不存在则插入」
map.putIfAbsent("b", 2);  // 原子操作,不需要额外锁

// compute:原子性地计算并更新值
map.compute("a", (k, v) -> v == null ? 1 : v + 1); // 原子 increment

// computeIfAbsent:不存在时才计算(常用于缓存)
map.computeIfAbsent("user:123", key -> loadUserFromDB(key));

// merge:合并值(线程安全的 word count)
String[] words = {"apple", "banana", "apple"};
for (String word : words) {
    map.merge(word, 1, Integer::sum); // 不存在则放1,存在则累加
}
// 结果:{apple=2, banana=1}

// size() 是近似值,非精确值(高并发下)
// 需要精确计数用 mappingCount()(返回 long)
long exactSize = map.mappingCount();

ConcurrentHashMap 的注意事项

// 错误:复合操作不是原子性的
if (!map.containsKey("key")) {     // 检查
    map.put("key", computeValue()); // 写入 —— 两步不原子!
}

// 正确:使用原子复合操作
map.computeIfAbsent("key", k -> computeValue()); // 原子

// 错误:在迭代时修改(虽不抛异常,但语义不明确)
// 正确做法:收集需要删除的 key,迭代后批量删除
map.entrySet().removeIf(e -> e.getValue() < 0); // 安全的批量删除

CopyOnWriteArrayList

写时复制(Copy-On-Write)是一种并发策略:读操作无锁直接访问原始数组; 写操作(add/set/remove)时,先复制一份新数组,在新数组上完成修改,然后通过 volatile 变量将引用切换到新数组。

CopyOnWriteArrayList 写操作流程: 初始状态:array ──► [A, B, C] 线程 A 执行 add("D"): 1. 加锁(独占写) 2. 复制:newArray = Arrays.copyOf(array, len + 1) newArray ──► [A, B, C, _] 3. 写入:newArray[3] = "D" newArray ──► [A, B, C, D] 4. 切换:array = newArray(volatile 写,立即可见) 5. 解锁 期间读操作:直接读旧 array [A, B, C],不阻塞 切换后读操作:读到新 array [A, B, C, D]
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();

list.add("A");
list.add("B");
list.add("C");

// 读操作完全无锁,多线程读没有任何性能开销
for (String s : list) {
    System.out.println(s);
    // 迭代过程中另一线程 add("D") 也不会 ConcurrentModificationException
    // 但此次迭代看不到 "D"(快照迭代器,基于创建时的数组快照)
}

// 适合:事件监听器列表(读多写少,读多在遍历)
CopyOnWriteArrayList<EventListener> listeners = new CopyOnWriteArrayList<>();

void addListener(EventListener l) { listeners.add(l); }     // 偶尔写
void fireEvent(Event e) {
    for (EventListener l : listeners) { // 频繁读遍历,无锁
        l.onEvent(e);
    }
}
CopyOnWriteArrayList 的代价

每次写操作都要复制整个数组,时间和空间复杂度均为 O(n)。若列表很大且写操作频繁,性能会很差。 只推荐用于读远多于写(如监听器列表)、列表较小的场景。 写多读少的场景应使用 ConcurrentLinkedDequeBlockingQueue

BlockingQueue 阻塞队列体系

BlockingQueue 是并发编程中生产者-消费者模式的核心工具, 提供阻塞的 put/take 操作:队列满时 put 阻塞,队列空时 take 阻塞。

BlockingQueue 的四类方法

操作 抛出异常 返回特殊值 阻塞 超时
插入 add(e) offer(e) put(e) offer(e, time, unit)
删除 remove() poll() take() poll(time, unit)
检查 element() peek() - -

常用实现类

ArrayBlockingQueue
基于数组的有界阻塞队列,容量固定(创建时指定)。生产者和消费者共用同一把锁(ReentrantLock),可选公平性。内存局部性好。
LinkedBlockingQueue
基于链表的(可选)有界阻塞队列,默认容量为 Integer.MAX_VALUE(近似无界)。生产者和消费者分别用不同的锁(putLock/takeLock),并发度更高。
PriorityBlockingQueue
基于堆的无界优先级阻塞队列,元素按优先级排序(需实现 Comparable 或提供 Comparator)。take 返回最高优先级元素。
DelayQueue
无界延迟队列,元素只有在其延迟期满后才能被取出(元素需实现 Delayed 接口)。常用于定时任务调度。
SynchronousQueue
容量为零的特殊队列,每个 put 必须等待一个 take,反之亦然。直接在生产者和消费者之间传递数据,常用于线程池的任务传递(Executors.newCachedThreadPool() 内部使用)。
LinkedTransferQueue
Java 7 引入,无界。支持 transfer()(生产者等待消费者直接取走)和 tryTransfer()(非阻塞),是 SynchronousQueue 的超集。
ConcurrentLinkedQueue
无界非阻塞队列,基于 CAS 的无锁实现(Michael-Scott 算法)。不阻塞,poll() 队列为空时返回 null 而非阻塞。适合低延迟场景。

生产者-消费者实战

import java.util.concurrent.*;

public class ProducerConsumer {
    // 有界队列:控制内存使用,防止生产者跑太快压垮消费者
    private static final BlockingQueue<String> queue =
        new LinkedBlockingQueue<>(100); // 容量 100

    public static void main(String[] args) {
        // 3 个生产者
        for (int i = 0; i < 3; i++) {
            final int id = i;
            new Thread(() -> {
                for (int j = 0; j < 10; j++) {
                    try {
                        String item = "item-" + id + "-" + j;
                        queue.put(item); // 队列满时阻塞
                        System.out.println("生产:" + item);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            }, "producer-" + i).start();
        }

        // 2 个消费者
        for (int i = 0; i < 2; i++) {
            new Thread(() -> {
                while (true) {
                    try {
                        String item = queue.poll(1, TimeUnit.SECONDS); // 超时取
                        if (item == null) break; // 超时退出
                        System.out.println("消费:" + item);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }, "consumer-" + i).start();
        }
    }
}

DelayQueue 实战:定时任务

import java.util.concurrent.*;

class DelayedTask implements Delayed {
    private final String name;
    private final long executeAt; // 执行时间(纳秒)

    public DelayedTask(String name, long delayMs) {
        this.name = name;
        this.executeAt = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMs);
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(executeAt - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        return Long.compare(this.executeAt, ((DelayedTask) o).executeAt);
    }

    public String getName() { return name; }
}

// 使用
DelayQueue<DelayedTask> delayQueue = new DelayQueue<>();
delayQueue.put(new DelayedTask("清理缓存", 5000));  // 5秒后执行
delayQueue.put(new DelayedTask("发送邮件", 1000));  // 1秒后执行

// 消费者:take() 阻塞直到最近任务到期
while (true) {
    DelayedTask task = delayQueue.take(); // 阻塞等待
    System.out.println("执行:" + task.getName());
}

其他并发集合

ConcurrentSkipListMap / ConcurrentSkipListSet

跳表(Skip List)是一种基于概率的有序数据结构,复杂度 O(log n)。 ConcurrentSkipListMap 是线程安全的 TreeMap 替代品, 通过 CAS 实现无锁的并发有序 Map。

// 线程安全的有序 Map(按 key 排序)
ConcurrentSkipListMap<String, Integer> sortedMap = new ConcurrentSkipListMap<>();
sortedMap.put("banana", 2);
sortedMap.put("apple", 1);
sortedMap.put("cherry", 3);

sortedMap.firstKey(); // "apple"
sortedMap.headMap("b").keySet(); // ["apple"](小于 "b" 的所有 key)

选择并发集合的决策树

需要线程安全集合? │ ├── 键值映射(Map) │ ├── 需要排序 → ConcurrentSkipListMap │ └── 不需要排序 → ConcurrentHashMap │ ├── 列表(List) │ ├── 读多写少(如监听器)→ CopyOnWriteArrayList │ └── 读写均衡 → 考虑 BlockingQueue 或 ConcurrentLinkedDeque │ ├── 集合(Set) │ ├── 需要排序 → ConcurrentSkipListSet │ └── 不需要排序 → ConcurrentHashMap.newKeySet() │ └── 队列(Queue) ├── 需要阻塞 → BlockingQueue(LinkedBlockingQueue 通用) ├── 需要优先级 → PriorityBlockingQueue ├── 需要延迟 → DelayQueue ├── 直接传递 → SynchronousQueue / LinkedTransferQueue └── 非阻塞高性能 → ConcurrentLinkedQueue

本章小结