一、介绍
CountDownLatch,CyclicBarrier和Semaphore都是java并发包concurrent提供的并发编程的工具类,是比synchrorized(关键字)更高效的同步结构。
CountDownLatch:某个线程阻塞等待,直到其他线程执行完,他才被唤醒执行
CyclicBarrier:一组线程相互等待到某个状态,然后这一组线程再同时执行
Semaphore:实现多个共享资源的互斥使用,同时控制线程的并发数
二、三个类的使用
1、CountDownLatch
让调用await()的线程等待,直到指定数量的线程执行完(每执行完一个countDown(),直到减为0),唤醒等待的线程执行
- 构造器:
//count为计数值
public CountDownLatch(int count) {}
- 常用方法:
//使调用该方法的线程阻塞,直到latch减到0,或该线程中断interrupted
public void await(){ }
/**
*使调用该方法的线程阻塞,直到latch减到0,或超出指定阻塞时间,或该线程中断interrupted
* timeout 等待时间
* unit 等待的时间的单位
*/
public boolean await(long timeout, TimeUnit unit) {}
//count减1,直到减为0,释放所有调用await()的线程
public void countDown() {}
- 使用实例
public class Test {
public static void main(String[] args) {
final CountDownLatch latch = new CountDownLatch(2);
new Thread(){
public void run() {
try {
System.out.println("子线程"+Thread.currentThread().getName()+"正在执行");
Thread.sleep(3000);
System.out.println("子线程"+Thread.currentThread().getName()+"执行完毕");
latch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
};
}.start();
new Thread(){
public void run() {
try {
System.out.println("子线程"+Thread.currentThread().getName()+"正在执行");
Thread.sleep(3000);
System.out.println("子线程"+Thread.currentThread().getName()+"执行完毕");
latch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
};
}.start();
try {
System.out.println("等待2个子线程执行完毕...");
latch.await();
System.out.println("2个子线程已经执行完毕");
System.out.println("继续执行主线程");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
- 执行结果:
等待2个子线程执行完毕...
子线程Thread-0正在执行
子线程Thread-1正在执行
子线程Thread-1执行完毕
子线程Thread-0执行完毕
2个子线程已经执行完毕
继续执行主线程
2、CyclicBarrier
让一组正在执行的线程调用await()(相当于给线程加了一个Barrier)等待,当达到指定数量时在全部同时执行。
- 构造器:
//parties:指让多少个线程或者任务等待至barrier状态
//barrierAction:当这些线程都达到barrier状态时会执行的内容。
public CyclicBarrier(int parties) {}
public CyclicBarrier(int parties, Runnable barrierAction) {}
- 常用方法:
//挂起当前线程,直至所有线程都到达barrier状态再同时执行后续任务
public int await(){};
//让这些线程等待至一定的时间,如果还有线程没有到达barrier状态就直接让到达barrier的线程执行后续任务
public int await(long timeout, TimeUnit unit){ }
- 使用实例
//假若有若干个线程都要进行写数据操作,并且只有所有线程都完成写数据操作之后,这些线程才能继续做后面的事情
public class Test {
public static void main(String[] args) {
int N = 4;
CyclicBarrier barrier = new CyclicBarrier(N,new Runnable() {
@Override
public void run() {
System.out.println("当前线程"+Thread.currentThread().getName());
}
});
for(int i=0;i<N;i++)
new Writer(barrier).start();
}
static class Writer extends Thread{
private CyclicBarrier cyclicBarrier;
public Writer(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
System.out.println("线程"+Thread.currentThread().getName()+"正在写入数据...");
try {
Thread.sleep(5000); //以睡眠来模拟写入数据操作
System.out.println("线程"+Thread.currentThread().getName()+"写入数据完毕,等待其他线程写入完毕");
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
}catch(BrokenBarrierException e){
e.printStackTrace();
}
System.out.println("所有线程写入完毕,继续处理其他任务...");
}
}
}
- 运行结果:
线程Thread-0正在写入数据...
线程Thread-1正在写入数据...
线程Thread-2正在写入数据...
线程Thread-3正在写入数据...
线程Thread-0写入数据完毕,等待其他线程写入完毕
线程Thread-1写入数据完毕,等待其他线程写入完毕
线程Thread-2写入数据完毕,等待其他线程写入完毕
线程Thread-3写入数据完毕,等待其他线程写入完毕
当前线程Thread-3
所有线程写入完毕,继续处理其他任务...
所有线程写入完毕,继续处理其他任务...
所有线程写入完毕,继续处理其他任务...
所有线程写入完毕,继续处理其他任务...
3、Semaphore
实现多个资源互斥访问,并控制线程并发数,通过 acquire() 获取一个许可,如果没有就等待,而 release() 释放一个许可
正常的锁(concurrency.locks或synchronized锁)在任何时刻都只允许一个任务访问一项资源,而 Semaphore允许n个任务同时访问这个资源
- 构造函数:
//参数permits表示某资源的数量,即同时可以允许多少线程进行访问
public Semaphore(int permits) {}
//fair表示是否是公平的,若为公平,则先来的限制性,不公平,抢占式执行
public Semaphore(int permits, boolean fair) {
sync = (fair)? new FairSync(permits) : new NonfairSync(permits);
}
- 常用方法:
//获取一个资源,若没有则等待直到获取为止。
public void acquire(){ }
//获取指定个数的资源,若没有则等待直到获取为止。
public void acquire(int permits){ }
//释放一个资源
public void release() { }
//释放指定个数的资源
public void release(int permits) { }
//下述方法会返回执行结果
//尝试获取一个资源,若获取成功,则立即返回true,若获取失败,则立即返回false
public boolean tryAcquire() { };
//尝试获取一个许可,若在指定的时间内获取成功,则立即返回true,否则则立即返回false
public boolean tryAcquire(long timeout, TimeUnit unit){ };
//尝试获取permits个许可,若获取成功,则立即返回true,若获取失败,则立即返回false
public boolean tryAcquire(int permits) { };
//尝试获取permits个许可,若在指定的时间内获取成功,则立即返回true,否则则立即返回false
public boolean tryAcquire(int permits, long timeout, TimeUnit unit){ };
//获取当前可用的资源数目
public int availablePermits() {}
- 使用实例
// 假若一个工厂有5台机器,但是有8个工人,一台机器同时只能被一个工人使用,只有使用完了,其他工人才能继续使用。
public class Test {
public static void main(String[] args) {
int N = 8; //工人数
Semaphore semaphore = new Semaphore(5); //机器数目
for(int i=0;i<N;i++)
new Worker(i,semaphore).start();
}
static class Worker extends Thread{
private int num;
private Semaphore semaphore;
public Worker(int num,Semaphore semaphore){
this.num = num;
this.semaphore = semaphore;
}
@Override
public void run() {
try {
semaphore.acquire();
System.out.println("工人"+this.num+"占用一个机器在生产...");
Thread.sleep(2000);
System.out.println("工人"+this.num+"释放出机器");
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
- 执行结果:
工人0占用一个机器在生产...
工人1占用一个机器在生产...
工人2占用一个机器在生产...
工人4占用一个机器在生产...
工人5占用一个机器在生产...
工人0释放出机器
工人2释放出机器
工人3占用一个机器在生产...
工人7占用一个机器在生产...
工人4释放出机器
工人5释放出机器
工人1释放出机器
工人6占用一个机器在生产...
工人3释放出机器
工人7释放出机器
工人6释放出机器