第一部分:Concurrent集合的基础
1.1 什么是Concurrent集合?
java.util.concurrent
包中的Concurrent集合是为高并发场景设计的线程安全数据结构,旨在提供高效的并发访问能力。它们通过细粒度锁、无锁算法、写时复制等技术,减少锁竞争和上下文切换,提升性能。主要Concurrent集合包括:
ConcurrentHashMap
:线程安全的哈希表,支持高并发读写,适合缓存、配置管理等场景。CopyOnWriteArrayList
:写时复制的线程安全列表,适合读多写少的场景,如事件监听器列表。CopyOnWriteArraySet
:基于CopyOnWriteArrayList
的线程安全集合,适合无重复元素的管理。ConcurrentLinkedQueue
:无锁的线程安全队列,适合任务队列、消息传递等场景。- **
ConcurrentSkipListMap
和ConcurrentSkipListSet
:基于跳表的线程安全有序映射和集合,适合排序需求。 BlockingQueue
及其实现(如ArrayBlockingQueue
、LinkedBlockingQueue
):支持阻塞操作的线程安全队列,适合生产者-消费者模式。
基本示例(ConcurrentHashMap):
import java.util.concurrent.ConcurrentHashMap;
public class ConcurrentHashMapExample {
private final ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
public void increment(String key) {
map.compute(key, (k, v) -> v == null ? 1 : v + 1);
System.out.println(Thread.currentThread().getName() + " updated: " + key + "=" + map.get(key));
}
public static void main(String[] args) {
Concurrent完成后HashMapExample example = new ConcurrentHashMapExample();
Runnable task = () -> example.increment("counter");
Thread[] threads = new Thread[5];
for (int i = 0; i < 5; i++) {
threads[i] = new Thread(task, "Thread-" + i);
threads[i].start();
}
}
}
输出示例:
Thread-0 updated: counter=1
Thread-1 updated: counter=2
Thread-2 updated: counter=3
Thread-3 updated: counter=4
Thread-4 updated: counter=5
说明:
ConcurrentHashMap
支持多个线程并发调用compute
,确保线程安全。- 原子操作(如
compute
)避免显式锁,提高性能。
1.2 Concurrent集合与传统同步集合的对比
特性 | Concurrent集合 | 传统同步集合(如Collections.synchronizedMap) |
锁机制 | 细粒度锁、无锁、写时复制 | 全局锁 |
并发性能 | 高,适合高并发 | 低,锁竞争严重 |
迭代安全性 | 支持弱一致性迭代 | 可能抛出ConcurrentModificationException |
复杂性 | 提供原子操作,简化并发逻辑 | 需手动加锁 |
适用场景 | 高并发读写、任务队列等 | 简单线程安全场景 |
适用场景:
- 传统同步集合:适合低并发、简单同步需求的场景。
- Concurrent集合:适合高并发、读多写少或任务队列的场景。
1.3 Concurrent集合的核心优势
- 高并发性:通过细粒度锁或无锁算法减少竞争。
- 线程安全:内置线程安全,无需外部同步。
- 原子操作:提供
compute
、putIfAbsent
等方法,简化并发逻辑。 - 弱一致性迭代:迭代器支持并发修改,适合动态数据场景。
- 多样性:不同集合针对不同场景(如读多写少、队列)优化。
1.4 Concurrent集合的核心挑战
- 弱一致性:迭代器可能反映不完整的更新。
- 复杂性:需理解每种集合的适用场景和性能特性。
- 内存开销:如
CopyOnWriteArrayList
的写时复制可能导致高内存消耗。 - 无条件变量:部分集合不支持复杂线程协调。
第二部分:Concurrent集合的实现与原理
2.1 ConcurrentHashMap的运行机制
ConcurrentHashMap
是高并发场景中最常用的集合,基于分段锁(Java 7)或CAS+同步(Java 8+)实现。
- Java 7:使用
Segment
数组实现分段锁,每个段管理部分桶,读操作无锁,写操作锁住段。 - Java 8+:
- 结构:桶数组+链表/红黑树(桶元素过多时转为红黑树)。
- 锁机制:写操作使用CAS或
synchronized
锁单个桶,读操作无锁。 - 原子操作:
putIfAbsent
、compute
等通过CAS实现。 - 扩容:支持并发扩容,多个线程协同转移桶。
源码分析(简化):
public class ConcurrentHashMap<K, V> {
static class Node<K, V> {
final K key;
V value;
Node<K, V> next;
}
private transient volatile Node<K, V>[] table;
public V putIfAbsent(K key, V value) {
return putVal(key, value, true);
}
private V putVal(K key, V value, boolean onlyIfAbsent) {
int hash = hash(key);
int binCount = 0;
Node<K, V>[] tab = table;
for (;;) {
Node<K, V> e;
int n, i;
if (tab == null || (n = tab.length) == 0) {
tab = initTable();
} else if ((e = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null, new Node<>(hash, key, value, null))) {
break;
}
} else if ((e.hash & hash) == 0) {
synchronized (e) {
// 更新或插入
}
}
}
return null;
}
}
说明:
- CAS操作(如
casTabAt
)实现无锁插入。 synchronized
锁定单个桶,减少锁粒度。- 红黑树优化高冲突场景的查询性能。
2.2 CopyOnWriteArrayList的运行机制
CopyOnWriteArrayList
采用写时复制策略,适合读多写少场景。
- 写操作:复制底层tuowrite整个数组,修改副本后替换原数组,锁保护写操作。
- 读操作:直接访问数组,无锁,性能高。
- 迭代器:返回数组快照,支持弱一致性迭代。
源码分析(简化):
public class CopyOnWriteArrayList<E> {
private transient volatile Object[] array;
private final ReentrantLock lock = new ReentrantLock();
public boolean add(E e) {
lock.lock();
try {
Object[] elements = array;
Object[] newElements = new Object[elements.length + 1];
System.arraycopy(elements, 0, newElements, 0, elements.length);
newElements[elements.length] = e;
array = newElements;
return true;
} finally {
lock.unlock();
}
}
}
说明:
- 写操作复制整个数组,线程安全但内存开销大。
- 读操作直接访问
array
,无锁高效。
2.3 ConcurrentLinkedQueue的运行机制
ConcurrentLinkedQueue
基于无锁算法(Michael-Scott算法),使用CAS实现线程安全队列。
- 结构:单向链表,
head
和tail
指针。 - 入队:CAS更新
tail
指针和节点。 - 出队:CAS更新
head
指针。
源码分析(简化):
public class ConcurrentLinkedQueue<E> {
private static class Node<E> {
E item;
volatile Node<E> next;
}
private volatile Node<E> head;
private volatile Node<E> tail;
public boolean offer(E e) {
Node<E> newNode = new Node<>(e);
for (Node<E> t = tail, p = t;;) {
Node<E> next = p.next;
if (next == null) {
if (p.casNext(null, newNode)) {
casTail(t, newNode);
return true;
}
} else {
p = next;
}
}
}
}
说明:
- CAS操作确保入队/出队原子性。
- 无锁设计提升高并发性能。
第三部分:Concurrent集合的实现方式
3.1 缓存系统(ConcurrentHashMap)
ConcurrentHashMap
实现高并发缓存:
import java.util.concurrent.ConcurrentHashMap;
public class CacheSystem {
private final ConcurrentHashMap<String, String> cache = new ConcurrentHashMap<>();
public String get(String key) {
String value = cache.get(key);
if (value == null) {
value = "Value-from-DB-" + key; // 模拟数据库查询
cache.putIfAbsent(key, value);
System.out.println(Thread.currentThread().getName() + " cache miss: " + key);
} else {
System.out.println(Thread.currentThread().getName() + " cache hit: " + key);
}
return value;
}
public static void main(String[] args) {
CacheSystem cache = new CacheSystem();
Runnable task = () -> cache.get("user1");
Thread[] threads = new Thread[5];
for (int i = 0; i < 5; i++) {
threads[i] = new Thread(task, "Thread-" + i);
threads[i].start();
}
}
}
输出示例:
Thread-0 cache miss: user1
Thread-1 cache hit: user1
Thread-2 cache hit: user1
Thread-3 cache hit: user1
Thread-4 cache hit: user1
说明:
putIfAbsent
确保缓存初始化线程安全。- 无锁读操作提升并发性能。
3.2 事件监听器(CopyOnWriteArrayList)
CopyOnWriteArrayList
管理事件监听器:
import java.util.concurrent.CopyOnWriteArrayList;
public class EventListenerManager {
private final CopyOnWriteArrayList<Runnable> listeners = new CopyOnWriteArrayList<>();
public void addListener(Runnable listener) {
listeners.add(listener);
}
public void fireEvent() {
for (Runnable listener : listeners) {
listener.run();
}
}
public static void main(String[] args) {
EventListenerManager manager = new EventListenerManager();
manager.addListener(() -> System.out.println(Thread.currentThread().getName() + " received event"));
Runnable task = manager::fireEvent;
Thread[] threads = new Thread[3];
for (int i = 0; i < 3; i++) {
threads[i] = new Thread(task, "Thread-" + i);
threads[i].start();
}
}
}
输出示例:
Thread-0 received event
Thread-1 received event
Thread-2 received event
说明:
- 写时复制确保添加监听器不影响迭代。
- 弱一致性迭代器适合动态监听器列表。
3.3 任务队列(ConcurrentLinkedQueue)
ConcurrentLinkedQueue
实现任务队列:
import java.util.concurrent.ConcurrentLinkedQueue;
public class TaskQueue {
private final ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();
private final ExecutorService executor = Executors.newFixedThreadPool(2);
public void submit(Runnable task) {
queue.offer(task);
}
public void startProcessing() {
executor.submit(() -> {
while (!Thread.currentThread().isInterrupted()) {
Runnable task = queue.poll();
if (task != null) {
task.run();
}
}
});
}
public void shutdown() {
executor.shutdown();
}
public static void main(String[] args) throws InterruptedException {
TaskQueue taskQueue = new TaskQueue();
taskQueue.startProcessing();
for (int i = 0; i < 5; i++) {
taskQueue.submit(() -> System.out.println(Thread.currentThread().getName() + " processing task"));
}
Thread.sleep(100);
taskQueue.shutdown();
}
}
输出示例:
pool-1-thread-1 processing task
pool-1-thread-2 processing task
pool-1-thread-1 processing task
pool-1-thread-2 processing task
pool-1-thread-1 processing task
说明:
- 无锁队列支持高并发任务提交。
poll
和offer
操作线程安全。
3.4 有序配置管理(ConcurrentSkipListMap)
ConcurrentSkipListMap
管理有序配置:
import java.util.concurrent.ConcurrentSkipListMap;
public class ConfigManager {
private final ConcurrentSkipListMap<String, String> config = new ConcurrentSkipListMap<>();
public void updateConfig(String key, String value) {
config.put(key, value);
System.out.println(Thread.currentThread().getName() + " updated: " + key + "=" + value);
}
public void printConfig() {
for (var entry : config.entrySet()) {
System.out.println(Thread.currentThread().getName() + ": " + entry.getKey() + "=" + entry.getValue());
}
}
public static void main(String[] args) {
ConfigManager manager = new ConfigManager();
Runnable updateTask = () -> manager.updateConfig("key" + Thread.currentThread().getName(), "value");
Runnable printTask = manager::printConfig;
Thread[] threads = new Thread[3];
threads[0] = new Thread(updateTask, "Thread-0");
threads[1] = new Thread(updateTask, "Thread-1");
threads[2] = new Thread(printTask, "Thread-2");
for (Thread t : threads) {
t.start();
}
}
}
输出示例:
Thread-0 updated: keyThread-0=value
Thread-1 updated: keyThread-1=value
Thread-2: keyThread-0=value
Thread-2: keyThread-1=value
说明:
- 跳表结构确保键有序。
- 并发更新和迭代线程安全。
第四部分:Concurrent集合的性能分析
4.1 ConcurrentHashMap与Collections.synchronizedMap性能对比
测试ConcurrentHashMap
与Collections.synchronizedMap
的性能:
import java.util.*;
import java.util.concurrent.*;
public class ConcurrentHashMapPerformance {
private final ConcurrentHashMap<String, Integer> concurrentMap = new ConcurrentHashMap<>();
private final Map<String, Integer> syncMap = Collections.synchronizedMap(new HashMap<>());
public void testConcurrentMap() {
concurrentMap.compute("key", (k, v) -> v == null ? 1 : v + 1);
}
public void testSyncMap() {
synchronized (syncMap) {
syncMap.compute("key", (k, v) -> v == null ? 1 : v + 1);
}
}
public static void main(String[] args) throws InterruptedException {
ConcurrentHashMapPerformance test = new ConcurrentHashMapPerformance();
int iterations = 100_000;
int threadCount = 5;
// ConcurrentHashMap性能
long startTime = System.nanoTime();
Thread[] concurrentThreads = new Thread[threadCount];
for (int i = 0; i < threadCount; i++) {
concurrentThreads[i] = new Thread(() -> {
for (int j = 0; j < iterations; j++) {
test.testConcurrentMap();
}
});
concurrentThreads[i].start();
}
for (Thread t : concurrentThreads) {
t.join();
}
long concurrentTime = System.nanoTime() - startTime;
// SynchronizedMap性能
startTime = System.nanoTime();
Thread[] syncThreads = new Thread[threadCount];
for (int i = 0; i < threadCount; i++) {
syncThreads[i] = new Thread(() -> {
for (int j = 0; j < iterations; j++) {
test.testSyncMap();
}
});
syncThreads[i].start();
}
for (Thread t : syncThreads) {
t.join();
}
long syncTime = System.nanoTime() - startTime;
System.out.println("ConcurrentHashMap Time: " + concurrentTime / 1_000_000 + " ms");
System.out.println("SynchronizedMap Time: " + syncTime / 1_000_000 + " ms");
}
}
输出示例:
ConcurrentHashMap Time: 70 ms
SynchronizedMap Time: 120 ms
分析:
ConcurrentHashMap
的细粒度锁和CAS操作提升并发性能。Collections.synchronizedMap
因全局锁导致竞争严重。
4.2 CopyOnWriteArrayList与SynchronizedList性能
测试CopyOnWriteArrayList
与Collections.synchronizedList
的读写性能:
import java.util.*;
import java.util.concurrent.*;
public class ListPerformance {
private final CopyOnWriteArrayList<Integer> cowList = new CopyOnWriteArrayList<>();
private final List<Integer> syncList = Collections.synchronizedList(new ArrayList<>());
public void testCowList() {
cowList.add(1);
}
public void testSyncList() {
syncList.add(1);
}
public static void main(String[] args) throws InterruptedException {
ListPerformance test = new ListPerformance();
int iterations = 100_000;
int threadCount = 5;
// CopyOnWriteArrayList性能
long startTime = System.nanoTime();
Thread[] cowThreads = new Thread[threadCount];
for (int i = 0; i < threadCount; i++) {
cowThreads[i] = new Thread(() -> {
for (int j = 0; j < iterations; j++) {
test.testCowList();
}
});
cowThreads[i].start();
}
for (Thread t : cowThreads) {
t.join();
}
long cowTime = System.nanoTime() - startTime;
// SynchronizedList性能
startTime = System.nanoTime();
Thread[] syncThreads = new Thread[threadCount];
for (int i = 0; i < threadCount; i++) {
syncThreads[i] = new Thread(() -> {
for (int j = 0; j < iterations; j++) {
test.testSyncList();
}
});
syncThreads[i].start();
}
for (Thread t : syncThreads) {
t.join();
}
long syncTime = System.nanoTime() - startTime;
System.out.println("CopyOnWriteArrayList Time: " + cowTime / 1_000_000 + " ms");
System.out.println("SynchronizedList Time: " + syncTime / 1_000_000 + " ms");
}
}
输出示例:
CopyOnWriteArrayList Time: 150 ms
SynchronizedList Time: 100 ms
分析:
CopyOnWriteArrayList
的写时复制导致写操作开销大。Collections.synchronizedList
因全局锁在写多场景下性能较优。
4.3 性能分析图表
以下是ConcurrentHashMap
、CopyOnWriteArrayList
与传统同步集合的性能对比柱状图:
{
"type": "bar",
"data": {
"labels": ["ConcurrentHashMap", "SynchronizedMap", "CopyOnWriteArrayList", "SynchronizedList"],
"datasets": [{
"label": "Execution Time (ms)",
"data": [70, 120, 150, 100],
"backgroundColor": ["#36A2EB", "#FF6384", "#FFCE56", "#4BC0C0"],
"borderColor": ["#36A2EB", "#FF6384", "#FFCE56", "#4BC0C0"],
"borderWidth": 1
}]
},
"options": {
"scales": {
"y": {
"beginAtZero": true,
"title": {
"display": true,
"text": "Time (ms)"
}
},
"x": {
"title": {
"display": true,
"text": "Collection Type"
}
}
}
}
}
分析:
ConcurrentHashMap
在高并发读写场景中性能最佳。CopyOnWriteArrayList
写性能较差,适合读多写少。- 传统同步集合因全局锁在高并发下性能较低。
第五部分:Concurrent集合的实际应用
5.1 高并发缓存(ConcurrentHashMap)
ConcurrentHashMap
实现分布式缓存:
import java.util.concurrent.ConcurrentHashMap;
public class DistributedCache {
private final ConcurrentHashMap<String, String> cache = new ConcurrentHashMap<>();
public String getOrCompute(String key) {
return cache.computeIfAbsent(key, k -> {
System.out.println(Thread.currentThread().getName() + " computing: " + k);
return "Value-from-DB-" + k;
});
}
public static void main(String[] args) {
DistributedCache cache = new DistributedCache();
Runnable task = () -> cache.getOrCompute("user1");
Thread[] threads = new Thread[5];
for (int i = 0; i < 5; i++) {
threads[i] = new Thread(task, "Thread-" + i);
threads[i].start();
}
}
}
输出示例:
Thread-0 computing: user1
Thread-1: Value-from-DB-user1
Thread-2: Value-from-DB-user1
Thread-3: Value-from-DB-user1
Thread-4: Value-from-DB-user1
说明:
computeIfAbsent
确保缓存初始化原子性。- 高并发读操作无锁,提升性能。
5.2 动态事件处理(CopyOnWriteArrayList)
CopyOnWriteArrayList
管理动态事件监听器:
import java.util.concurrent.CopyOnWriteArrayList;
public class DynamicEventManager {
private final CopyOnWriteArrayList<Runnable> listeners = new CopyOnWriteArrayList<>();
public void addListener(Runnable listener) {
listeners.add(listener);
}
public void removeListener(Runnable listener) {
listeners.remove(listener);
}
public void fireEvent() {
for (Runnable listener : listeners) {
listener.run();
}
}
public static void main(String[] args) {
DynamicEventManager manager = new DynamicEventManager();
Runnable listener = () -> System.out.println(Thread.currentThread().getName() + " received event");
manager.addListener(listener);
Thread t1 = new Thread(manager::fireEvent, "Thread-1");
Thread t2 = new Thread(() -> {
manager.fireEvent();
manager.removeListener(listener);
manager.fireEvent();
}, "Thread-2");
t1.start();
t2.start();
}
}
输出示例:
Thread-1 received event
Thread-2 received event
说明:
- 写时复制支持动态添加/移除监听器。
- 迭代器弱一致性适应并发修改。
5.3 消息队列(ConcurrentLinkedQueue)
ConcurrentLinkedQueue
实现消息队列:
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class MessageQueue {
private final ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
private final ExecutorService executor = Executors.newFixedThreadPool(2);
public void publish(String message) {
queue.offer(message);
}
public void startConsuming() {
executor.submit(() -> {
while (!Thread.currentThread().isInterrupted()) {
String message = queue.poll();
if (message != null) {
System.out.println(Thread.currentThread().getName() + " consumed: " + message);
}
}
});
}
public void shutdown() {
executor.shutdown();
}
public static void main(String[] args) throws InterruptedException {
MessageQueue queue = new MessageQueue();
queue.startConsuming();
for (int i = 0; i < 5; i++) {
queue.publish("Message-" + i);
}
Thread.sleep(100);
queue.shutdown();
}
}
输出示例:
pool-1-thread-1 consumed: Message-0
pool-1-thread-2 consumed: Message-1
pool-1-thread-1 consumed: Message-2
pool-1-thread-2 consumed: Message-3
pool-1-thread-1 consumed: Message-4
说明:
- 无锁队列支持高并发消息发布。
poll
和offer
操作高效且线程安全。
第六部分:Concurrent集合的最佳实践
6.1 使用原子操作
优先使用ConcurrentHashMap
的compute
、putIfAbsent
等方法:
map.computeIfAbsent(key, k -> computeValue(k));
6.2 选择合适的集合
- 读多写少:使用
CopyOnWriteArrayList
或ConcurrentHashMap
。 - 队列场景:使用
ConcurrentLinkedQueue
或BlockingQueue
。 - 有序需求:使用
ConcurrentSkipListMap
。
6.3 避免迭代器依赖
迭代器是弱一致性的,避免依赖其最新状态:
// 可能不反映最新更新
for (String key : map.keySet()) {
System.out.println(key);
}
6.4 管理内存开销
CopyOnWriteArrayList
写操作复制数组,需监控内存使用:
if (list.size() > MAX_SIZE) {
// 清理或限制添加
}
6.5 诊断与监控
使用size()
、isEmpty()
等方法检查状态:
System.out.println("Queue size: " + queue.size());
第七部分:总结与展望
7.1 总结
本文全面解析了Java Concurrent集合的原理、实现、性能和应用场景。ConcurrentHashMap
适合高并发键值存储,CopyOnWriteArrayList
适合读多写少的列表管理,ConcurrentLinkedQueue
适合任务队列。开发者需根据场景选择合适的集合,并注意弱一致性和内存开销。
7.2 展望
随着Project Loom的虚拟线程和结构化并发的引入,Concurrent集合将与虚拟线程结合,提供更高的并发性能。开发者应关注无锁编程、响应式编程的进展,以应对未来挑战。