0
点赞
收藏
分享

微信扫一扫

Java多线程并发编程之构建自定义同步工具

陆佃 2023-01-10 阅读 119



当Java类库没有提供适合的同步工具时,就需要构建自定义同步工具。



可阻塞状态依赖操作的结构


​​acquir lock on object state;//请求获取锁​​
1.

​​while(precondition does not hold){//没有满足前提条件​​
1.

​​ release lock;//先释放锁​​
1.

​​ wait until precondition might hold;//等待满足前提条件​​
1.

​​ optionlly fail if interrupted or timeout expires;//因为中断或者超时执行失败​​
1.

​​ reacquire lock;//重新尝试获取锁​​
1.

​​}​​
1.


1.

​​perform action//执行​​
1.

​​ release lock;//释放锁​​





有界缓存实现基类示例


​​public class BaseBoundBuffer<V> {​​
1.

​​ private final V[] buf;​​
1.

​​ private int tail;​​
1.

​​ private int head;​​
1.

​​ private int count;​​
1.


1.

​​ @SuppressWarnings("unchecked")​​
1.

​​ public BaseBoundBuffer(int capacity) {​​
1.

​​ buf = (V[]) new Object[capacity];​​
1.

​​ }​​
1.


1.

​​ public synchronized void doPut(V v) {​​
1.

​​ buf[tail] = v;​​
1.

​​ if (++tail == buf.length)​​
1.

​​ tail = 0;​​
1.

​​ count++;​​
1.

​​ }​​
1.


1.

​​ public synchronized V doTake() {​​
1.

​​ V v = buf[head];
​​
1.

​​ if (++head == buf.length)​​
1.

​​ head = 0;​​
1.

​​ count--;​​
1.

​​ return v;​​
1.

​​ }​​
1.


1.

​​ public final synchronized boolean isFull() {​​
1.

​​ return count == buf.length;​​
1.

​​ }​​
1.


1.

​​ public final synchronized boolean isEmpty() {​​
1.

​​ return count == 0;​​
1.

​​ }​​
1.

​​}​​


阻塞实现方式一:抛异常给调用者


​​  public synchronized void put1(V v)  throws Exception{​​
1.

​​ if(isFull())​​
1.

​​ throw new Exception("full error");​​
1.

​​ doPut(v);​​
1.

​​ }​​


需要调用者是处理前提条件失败的情况,并没有解决根本问题。


阻塞实现方式二:通过轮询和休眠


​​  public void put2(V v) throws InterruptedException {​​
1.

​​ while (true) {//轮询​​
1.

​​ synchronized (this) {​​
1.

​​ if (!isFull()) {​​
1.

​​ doPut(v);​​
1.

​​ return; ​​
1.


​​ }​​
1.

​​ }​​
1.

​​SLEEP_TIME);//休眠​​
1.

​​ }​​
1.

​​ }​​


分析:很难权衡休眠时间SLEEP_TIME设置。如果设置过小,CPU可能会轮询多次,消耗CPU资源也越高;如果设置过大,响应性就越低。


阻塞实现方式三:条件队列


条件队列中的元素是一个个等待相关条件的线程。每个Java对象都可以作为一个锁,每个对象同样可以作为一个条件队列, 并且Object中的wait、notify、notifyAll方法就构成了内部条件队列的API。Object.wait会自动释放锁,并请求操作系统挂起当前线程,从而使其它线程能获得这个锁并修改对象的状态。Object.notify和Object.notifyAll能唤醒正在等待线程,从条件队列中选取一个线程唤醒并尝试重新获取锁。


​​  public synchronized void put3(V v) throws InterruptedException {​​
1.

​​ while(isFull())​​
1.

​​ wait();​​
1.

​​ doput(v);​​
1.

​​ notifyAll();​​
1.

​​ }​​


分析:获得较好响应,简单易用。




使用条件队列

1.条件谓词

  • 定义:条件谓词是使某个操作成为状态依赖操作的前提条件。条件谓词是由类中各个状态变量构成的表达式。例如,对于put方法的条件谓词就是“缓存不为空”。
  • 关系:在条件等待中存在一种重要的三元关系,包括加锁、wait方法和一个条件谓词。在条件谓词中包含多个状态变量,而每个状态变量必须由一个锁来保护,因此在测试条件谓词之前必须先持有这个锁。锁对象和条件队列对象(及调用wait和notify等方法所在的对象)必须是同一个对象。
  • 约束:每次调用wait都会隐式地和特定的条件谓词相关联,当调用特定条件谓词时,调用者必须已经持有与条件队列相关的锁,这个锁必须还保护这组成条件谓词的状态变量


2.条件队列使用规则

  • 通常都有一个条件谓词
  • 永远在调用wait之前测试条件谓词,并且在wait中返回后再次测试;
  • 永远在循环中调用wait;
  • 确保构成条件谓词的状态变量被锁保护,而这个锁必须与这个条件队列相关联;
  • 当调用wait、notify和notifyAll时,要持有与条件队列相关联的锁;
  • 在检查条件谓词之后,开始执行被保护的逻辑之前,不要释放锁;



3.通知

尽量使用notifyAll,而不是nofify.因为nofify会随机唤醒一个线程从休眠状态变为Blocked状态(Blocked状态是种线程一直处于尝试获取锁的状态,即一旦发现锁可用,马上持有锁),而notifyAll会唤醒条件队列中所有的线程从休眠状态变为Blocked状态.考虑这么种情况,假如线程A因为条件谓词Pa进入休眠状态,线程B因为条件谓词Pb进入休眠状态.这时Pb为真,线程C执行单一的notify.如果JVM随机选择了线程A进行唤醒,那么线程A检查条件谓词Pa不为真后又进入了休眠状态.从这以后再也没有其它线程能被唤醒,程序会一直处于休眠状态.如果使用notifyAll就不一样了,JVM会唤醒条件队列中所有等待线程从休眠状态变为Blocked状态,即使随机选出一个线程一因为条件谓词不为真进入休眠状态,其它线程也会去竞争锁从而继续执行下去.


4.状态依赖方法的标准形式


​​void stateDependentMethod throws InterruptedException{​​
​​ synchronized(lock){​​
​​
​​
​​ while(!conditionPredicate))​​
​​ lock.wait();​​
​​ ​​
​​ //dosomething();​​
​​ ....​​
​​
​​
​​ notifyAll();​​
​​}​​


显示Condition对象


显示的Condition对象是一种更灵活的选择,提供了更丰富的功能:在每个锁上可以存在多个等待,条件等待可以是中断的获不可中断的,基于时限的等待,以及公平的或非公平的队列操作。一个Condition可以和一个Lock关联起来,就像一个条件队列和一个内置锁关联起来一样。要创建一个Condition,可以在相关联的Lock上调用Lock.newCondition方法。以下用显示条件变量重新实现有界缓存


​​public class ConditionBoundedBuffer<V> {​​
​​ private final V[] buf;​​
​​ private int tail;​​
​​ private int head;​​
​​ private int count;​​
​​ private Lock lock = new ReentrantLock();​​
​​ private Condition notFullCondition = lock.newCondition();​​
​​ private Condition notEmptyCondition = lock.newCondition();​​
​​
​​
​​ @SuppressWarnings("unchecked")​​
​​ public ConditionBoundedBuffer(int capacity) {​​
​​ buf = (V[]) new Object[capacity];​​
​​ }​​
​​ ​​
​​ public void doPut(V v) throws InterruptedException {​​
​​ try {​​
​​ lock.lock();​​
​​ while (count == buf.length)​​
​​ notFullCondition.await();​​
​​ buf[tail] = v;​​
​​ if (++tail == buf.length)​​
​​ tail = 0;​​
​​ count++;​​
​​ notEmptyCondition.signal();​​
​​ } finally {​​
​​ lock.unlock();​​
​​ }​​
​​ }​​
​​ ​​
​​ public V doTake() throws InterruptedException {​​
​​ try {​​
​​ lock.lock();​​
​​ while (count == 0)​​
​​ notEmptyCondition.await();​​
​​ V v = buf[head];​​
​​ buf[head] = null;​​
​​ if (++head == buf.length)​​
​​ head = 0;​​
​​ count--;​​
​​ notFullCondition.signal();​​
​​ return v;​​
​​ } finally {​​
​​ lock.unlock();​​
​​ }​​
​​ }​​
​​}​​

举报

相关推荐

0 条评论