Java并发容器和框架
参考书籍:《Java并发编程的艺术》
1 ConcurrentHashMap
ConcurrentHashMap时线程安全且高效的HashMap。
1.1 CurrentHashMap的使用原因
1.1.1 HashMap线程不安全
多线程下,使用HashMap进行put会引起死循环,所以并发情况下不能使用HashMap,例子如下:
final HashMap<String, String> map = new HashMap<String,String >(2);
Thread t = new Thread(new Runnable() {
@Override
public void run() {
for(int i = 0; i < 10000; i++){
new Thread(new Runnable() {
@Override
public void run() {
map.put(UUID.randomUUID().toString()," ");
}
},"ftf" + i).start();
}
}
},"ftf");
t.start();
t.join();
原因是HashMap再并发执行put操作时会引起死循环,多线程导致HashMap的Entry链表形成唤醒数据结构,一旦形成环形数据结构,Entry的next节点永远不为空,就会产生死循环获取Entry。
1.1.2 HashTable效率低下
HashTable容器使用synchronized保证线程安全,但是线程竞争激烈情况下,HashTable效率非常低下。因为同一时刻只能有一个线程使用put元素添加元素,所以竞争越激烈,效率越低。
1.1.3 ConcurrentHashMap的锁分段技术
HashTable效率低下的原因是所有线程都必须竞争一把锁。而ConcurrentHashMap使用了锁分段技术,每把锁用于容器中的一部分数据,这样多线程访问容器里不同数据段时,线程之间就不会竞争,有效提高并发效率。
1.2 ConcurrentHashMap的结构
从下面的类图可以看出,ConcurrentHashMap是有Segment数据结构和HashEntry数组结构组成的。Segment是一种可重入锁,HashEntry用于存储键值对。一个Segment里面包含了一个HashEntry数组,HashEntry是一个链表结构的元素。因此每个Segment守护着一个HashEntry数组的元素,当对HashEntry数组的数据进行修改时,首先要获得它对应的Segment锁。
1.3 ConcurrentHashMap的初始化
1.3.1 初始化segment数组
segment数组的长度ssize是通过concurrencyLevel计算得出的。ssize大小为2的N次方,这是为了保证通过按位与的散列算法来定位segment数组的索引。第一个大于等于currencyLevel的2的N次方指就是ssize。例如currencyLevel等于14时,ssize等于16.
1.3.2 初始化segmentShift和SegmentMask
首先需要知道sshift等于ssize从1左移到大于等于currencyLevel的次数,比如currencyLevel等于14,ssize等于16,sshift等于4,那么segmentShift等于32-ssift,segmentMask等于ssize-1。32是因为hash方法最大数位32位的。
1.3.3 初始化每个segment
输入参数initialCapacity是初始化容量,loadfactor是每个segment的负载因子,在构造方法中需要通过两个参数来初始化每个segment。默认情况下segmengt容量16,负载因子为0.75。
1.4 定位Segment
要定位到Segment,首先将hashCode使用Wng/Jenkins hash的变种算法进行再散列。目的是减少散列冲突,使元素均匀地分布在不同地Segment上,从而提高容器的存取效率。然后进行散列计算。
1.5 ConcurrentHashMap的操作
1.5.1 get操作
get操作先经过一次再散列,然后使用散列值通过散列运算定位到Segment,再通过散列算法定位到元素。get操作不需要加锁,这是因为get方法里要使用的共享变量都定义成volatile,能够保证再线程之间的可见性,只能被单线程写,而读不需要加锁。这是用volatile替换锁的经典应用场景。
1.5.2 put操作
put操作首先定义到Segment位置,然后经历两步:第一步判断是否需要对Segment里的HashEntry数组进行扩容,第二步定位添加元素的位置,然后将其放在HashEntry数组里。
1、是否需要扩容
在插入元素前会判断是否超过容量,超过则扩容。相比之下,HashMap扩容机制是插入元素后判断元素是否已经到达容量,到达了就进行扩容,但是很有可能扩容后没有新元素插入,这样HashMap就进行了一次无效扩容。
2、如何扩容
扩容的时候,首先会创建容量是原来容量的两倍的数组,然后将原数组的元素进行再散列后插入新的数组,为了高效,ConcurrentHashMap不会对整容器扩容,而是对某个Segment扩容。
1.5.3 size操作
Concurrent容器内元素的个数,需要统计每个Segment里面的元素个数,每个Segment里维护了volatile修饰的count全局变量,那么size操作仅仅是把所有count加起来就行吗?答案是否定的,因为如果累加count的过程中,前面计算过的Segment插入新元素,那么size就并不准确。那么直接把所有Segment锁住再统计不就行了吗?这样的话size就大大降低了并发度。
实际上,size操作是先尝试通过不锁住Segment的方式来统计所有Segment的元素个数,如果count不发生变化,则统计成功,如果count发生了变化统计结果不同,那么就会采用加锁的方式来统计所有Segment的元素个数。如何判断count发生了变化呢?使用modCount变量,在put、remove、clean方法操作元素时都会把modCount变量加一,统计前后比较modCount是否发生变化,从而得知容器大小是否变化。
2 ConcurrentLinkedQueue
要实现线程安全的队列有两种方式:一种是使用阻塞算法,另一种是使用非阻塞散发。阻塞算法使用锁实现,非阻塞算法使用CAS实现。
ConcurrentLinkedQueue是一种基于链接节点的无界线程安全队列,采用先进先出的规则对节点排序,采用”wait-free“算法(CAS)实现。
2.1 ConcurrentLinkedQueue的结构
ConcurrentLinkedQueue的类图如下所示,ConcurrentLinkedQueue由head节点和tail节点组成,每个节点由节点元素和指向下一个节点的指针组成,从而组成单向链表结构的队列。
2.2 入队
2.2.1 入队的过程
入队就是把入队节点添加到队列的尾部。主要做两件事,第一是将入队节点设置为当前队列尾节点的下一个节点,第二个是更新tail节点,这里和普通队列并不相同,如果tail节点的next节点为空,则将入队节点作为tail节点的next节点,如果tail节点的next节点不为空,则将入队节点作为tail节点,具体看下图。其中第二步会使用CAS算法,将入队节点设置成尾节点的next节点,不成功则重试。
2.2.2 定位尾节点
上面介绍可知,tail节点并不总是尾节点,每次入队需要通过tail节点来找到尾节点,尾节点可能是tail节点,也有可能是tail节点的next节点。
2.2.3 HOPS的设计意图
可以看到tail节点并不总是尾节点,那么为什么不让tail节点永远作为队列的尾节点呢?这样不是代码量少而且逻辑清晰易懂吗?原因是:这样做每次都要循环CAS更新tail节点,减少CAS更新tail的次数,就能提高入队的效率。使用HOPS常量来控制tail节点和尾节点的距离(默认为1),当距离大于等于1时,才会更新tail节点。距离长带来的负面效果时每次入队定位尾节点的时间越长,但本质上它通过增加对volatile变量的读操作来减少对volatile变量的写操作,而写操作开销远大于读操作的开销,因此入队效率会有所提升。
2.3 出队
出队就是从队列里返回一个节点的元素,并清空该节点的元素引用。和入队类似,出队并不会每次更新head节点,只有当head节点里没有元素时,出队操作才会更新head节点。注意head节点一开始是指向空节点的。步骤如下:首先获取头节点的元素,判断头节点元素是否为空,如果为空,表示另外一个线程已经进行了一次出队操作将该节点的元素取走,循环获取;如果不为空,使用CAS方式将头节点的引用设置为null,如果CAS成功,直接返回头节点元素,如果不成功,表示一个线程已经进行了一次出队操作更新了head节点,导致元素变化,需要重新获取头节点。注意头节点不一定等于head节点,head节点只有距离大于等于HOPS时才会更新。
3 Java中的阻塞队列
3.1 阻塞队列定义
阻塞队列(BlockingQueue)是一个支持两个附加操作的队列:
1、支持阻塞的插入方法:当队列满时,队列会阻塞插入元素的进程,直到队列不满
2、支持阻塞的移除方法:当队列为空时,获取元素的线程会等待队列变成非空
阻塞队列常用于生产者消费者模式,生产者向队列里添加元素,消费者从队列取元素。阻塞队列就是用来存放元素、获取元素的容器。
3.2 Java里的阻塞队列
1、ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。
2、LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列。
3、PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。
4、DelayQueue:一个使用优先级队列实现的无界阻塞队列。
5、SynchronousQueue:一个不存储元素的阻塞队列。
6、LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
7、LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
3.3 阻塞队列的实现原理
==使用通知模式实现。==所谓通知模式,就是当生产者往满的队列里添加元素时会阻塞生产者,当消费者消费了一个队列中的元素后,会通知生产者当前队列可用。以JDK中的ArrayBlockingQueue为例,使用了Condition来实现。
当往队列里插入一个元素,如果队列不可用,那么主要通过LockSupport.park(this)来实现阻塞生产者。当线程被阻塞队列阻塞时,线程会进入WAITING状态。
4 Fork/Join框架
Fork/Join是Java7提供的并行执行任务的框架,Fork就是把一个大任务切分成若干个子任务并行地执行,Join就是合并这些子任务地执行结果,最后得到这个大任务地结果。
4.1 工作窃取算法
工作窃取算法是指某个线程从其他队列里窃取任务来执行。对于一个比较大地任务,可以把这个任务分成若干不依赖的子任务,每个子任务放到不同的队列中,队列和线程一一对应。存在一种情况,某一队列的任务被某一线程完成了,而其他队列还有任务未完成,这时为了尽快完成任务,已经完成任务的线程会从未完成的队列中窃取一些任务来执行。通常会使用双端队列,被窃取的线程从队列头部拿任务,窃取线程从队列尾部拿任务。
工作窃取算法的优点是:充分利用线程进行并行计算,减少了线程间的竞争。
工作窃取算法的缺点是:在某些情况下还是存在竞争,例如双端队列里只有一个任务时。并且该算法会消耗更多的系统资源,比如创建多个线程和多个双端队列。
4.2 使用Fork/Join框架的例子
下面以一个简单例子演示如何使用Fork/Join框架。需求是:计算1+2+3+4的结果(实际情况不会这么简单,仅仅是作为演示)。
首先,考虑把总任务fork成两个子任务,一个子任务计算1+2,另一个子任务计算3+4,然后再join两个任务的结果。
ForkJoinTask与一般任务的主要区别在于它需要实现compute方法,在这个方法里,需要判断任务是否足够小,如果足够小就直接执行任务,如果不够小,就分割成两个子任务,每个子任务调用fork方法时,又会进入compute方法,判断当前子任务是否需要继续分割,如果不需要继续分割,就执行当前子任务并返回结果。使用join方法会等待子任务执行完并得到结果。
public class CountTask extends RecursiveTask<Integer> {
private static final int THRESHOLD = 2;
private int start;
private int end;
public CountTask(int start, int end){
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
int sum = 0;
boolean canCompute = (end-start)<=THRESHOLD;
if(canCompute){
for(int i=start;i<=end;i++){
sum+=i;
}
}
else{
int middle = (start+end)/2;
CountTask leftTask = new CountTask(start,middle);
CountTask rightTask = new CountTask(middle+1,end);
leftTask.fork();
rightTask.fork();
int leftResult = leftTask.join();
int rightResult = rightTask.join();
sum = leftResult + rightResult;
}
return sum;
}
public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool();
CountTask task = new CountTask(1,4);
Future<Integer> result = forkJoinPool.submit(task);
try {
System.out.println(result.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
4.3 Fork/Join框架的实现原理
ForkJoinPool由ForkJoinTask数组和ForkJoinWorkerThread数组组成,ForkJoinTask负责将存放程序提交给ForkJoinPool的任务,而ForkJoinWorkerThread数组负责执行这些任务。
4.3.1 ForkJoinTask的fork方法
当调用fork方法时,程序会调用ForkJoinWorkerThread的pushTask方法异步的执行这个任务,然后立即返回结果。pushTask方法把当前任务存放在ForkJoinTask数组队列里,再调用ForkJoinPool的signalWork方法唤醒或创建一个工作进程来执行任务。
4.3.2 ForkJoinTask的join方法
Join方法主要是阻塞当前线程并等待获取结果。首先,调用doJoin方法,通过它来得到当前任务的状态,从而判断返回什么结果,任务状态有四种:已完成(NORMAL),被取消(CANCELED),信号(SIGNAL)和出现异常(EXCEPTIONAL)。