- ForkJoin(分支合并计算):(底层维护一个双向队列)
- 并发
public class HuLock{
//获取锁
public void lock(){}
//释放锁
public void unlock(){}
}
HuLock huLock = new HuLock();
public void doSomething(){
//获取锁,表示同一时刻只允许一个线程执行这个方法
huLock.lock();
try{
...
} finally{
//优雅,在finally中释放锁
huLock.unlock();
}
}
来个测试代码测试一下:
//构造一个,可能发生线程安全问题的共享变量
private static long count = 0;
//让两个线程,并发对count++
public static void main(String[] args) throws Exception{
Thread thread1 = new Thread(() -> add());
Thread thread2 = new Thread(() -> add());
//启动两个线程
thread1.start();
thread2.start();
//等待两个线程执行结束
thread1.join();
thread2.join();
System.out.println(count);
private static ExampleLock example
= new ExampleLock();
//循环执行count++;,进行100000次
private static void add(){
exampleLock.lock();
for(int i = 0; i < 100000; i++){
count++;
}
add2();
//要是没啥异常,我就直接释放锁了
exampleLock.unlock();
}
}
@Override
public boolean tryAcquire(int acquires) {
// CAS 方式尝试获取锁,成功返回true,失败返回false
if (compareAndSetState(0, 1)) {
return true;
}
return false;
}
这段代码在尝试获取锁时一上来判断并开始抢锁,一旦抢锁成功就返回true。那我可以这样呀,就是加入一些机制,让线程不要一有机会就抢锁,先考虑一下其他线程的感受再抢锁,公平一点,哥们,你是不是从来没抢到呀,那要不你先来这种
//此时就得研究一下AQS的内部实现逻辑了,也就是原理了,干他丫的
public abstract class AbstractQueuedSynchronizer {
private transient volatile Node head;
private transient volatile Node tail;
private volatile int state;
static final class Node {}
}
static final class Node {
// ... 省略一些暂不关注的
volatile Node prev;
volatile Node next;
volatile Thread thread;
}
//想要公平锁,调用的时候就传true进来
public HuLock(boolean fair){
sync = fair ? new FairSync() : new NonFairSync();
}
- J.U.C常用组件
- CountDownLatch:原理:每次有线程调用countDown()这个方法后,计数器就会数量-1,假设计数器数量==0,然后await()方法就会被唤醒并被执行
- CyclicBarrier:用来控制多个线程互相等待,只有当多个线程都到达时,这些线程才会继续执行
和CountdownLaych相似,都是通过维护计数器来实现的。线程执行await()方法之后计数器会-1,并进行等待,直到计数器减为0,所有await()方法执行完后,之前再等待的线程才能继续执行,CyclicBarrier类
- Semaphore(信号量):限流的时候用(Semaphore类似于OS中的信号量,可以控制对互斥资源的访问线程数(例子代码中模拟了对某个服务的并发请求,每次只能有3个客户端同时访问,请求总数为10))
此外, - java.util.concurrent类中提供了Condition类来实现线程之间的协调
- ReadWriteLock
写个缓存,没有用锁之前,读写操作出错了,本来咱们实例化了五个线程出来,虽然读操作可以多线程同时执行,但是写操作必须第一个写完第二个再写,你看咱们打印出的结果,说明不用锁时明显多线程写入时出错了
将刚才咱们写的缓存,加上读写锁,控制一下写入的顺序(用读写锁将写入操作保护为一个原子性的操作)
- BlockingQueue
- 同步队列:SynchronousQueue:容量为1,就是每次进如队列一个元素后,队列就满了,必须等待取出来那一个之后,才能再往里面放入一个元素
- ForkJoin
public class ForkJoinExample extends RecursiveTask<Integer>{
private final int threshod = 5;
private int first;
private int last;
public ForkJoinExample(int first, int last){
this.first = first;
this.last = last;
}
@Override
protected Integer computer(){
int result = 0;
if(last - first <= threshold){
//任务足够小则直接计算
for(int i = first; i < last; i++){
result += i;
}
} else{
//否则,代表任务太大,需拆分成小任务
int middle = first + (last - first) / 2;
ForkJoinExample leftTask = new ForkJoinExample(first, middle);
ForkJoinExample rightTask = new ForkJoinExample(middle + 1, last);
leftTask.fork();
rightTask.fork();
result = leftTask.join() + rightTask.join();
}
return result;
}
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
ForkJoinExample example = new ForkJoinExample(1, 10000);
ForkJoinPool forkJoinPool = new ForkJoinPool();
Future result = forkJoinPool.submit(example);
System.out.println(result.get());
}
-
并发集合,一部分,不全
- 多用并发集合少用同步集合,比如应该使用ConcurrentHashMap而少用Hashtable
-
ConcurrentHashMap
咱们一般:
Map<String, String> map = new HashMap<>();
这里经常会出现这种错误:
解决方法一:
解决方法二:
Map<String, String> map = new ConcurrentHashMap<>();
- CopyOnWriteArrayList
Notes:
凡是多线程总是会遇到java.util.ConcurrentModificationException并发修改异常
解决方法一:
List<String> list = new Vector<>();//Vector默认是线程安全的
解决方法二:
用Collections工具类里面的方法
解决方法三:
用JUC下面的类
用的是Lock模板圈实现锁
解决方法三(用CopyOnWrite)比解决方法一(用Vector)牛逼在哪里,源码里面CopyOnWrite的add()方法用的是lock模板圈,Vector的add()方法用的是synchronized,synchronized比lock模板圈慢一点
- CopyOnWriteArraySet
解决方法一:用Collections工具类里面的方法
解决方法二:用JUC下面的类
Set<String> set = new CopyOnWriteArraySet<>();
回过头来:
另外,还有一些零碎概念:
- 其实阻塞和唤醒就是通过改写state值去玩的, 再加上调度算法就可以实现状态的切换了