0
点赞
收藏
分享

微信扫一扫

8.Condition 控制线程通信

8.Condition 控制线程通信

前言

前一篇我们讲述了 同步锁 Lock,那么下面肯定就要讲解一下 同步锁 Lock 如何控制线程之间的通讯。

不过,在讲解 同步锁 Lock 通讯之间,我们首先来回顾一下 基本同步控制之间的线程声明周期,如下图:


8.Condition 控制线程通信_并发编程

image-20200822082951771

可以看到上面有很多通讯的方法. 其中最主要的就是以下的 wait() notify() notifyAll() 方法。这些就是控制线程间通讯的方法。

wait() 与 notify() 和 notifyAll()

  • **wait()**:令当前线程挂起并放弃CPU、同步资源并等待,使别的线程可访问并修改共享资源,而当前线程排队等候其他线程调用notify()或notifyAll()方法唤醒,唤醒后等待重新获得对监视器的所有权后才能继续执行。
  • **notify()**:唤醒正在排队等待同步资源的线程中优先级最高者结束等待
  • **notifyAll ()**:唤醒正在排队等待资源的所有线程结束等待.
说明:
1. wait(),notify(),notifyAll()三个方法必须使用在同步代码块或同步方法中。
2. wait(),notify(),notifyAll()三个方法的调用者必须是同步代码块或同步方法中的同步监视器。
否则,会出现IllegalMonitorStateException异常
3. wait(),notify(),notifyAll()三个方法是定义在java.lang.Object类中。

wait() 方法

  • 在当前线程中调用方法:对象名.wait()
  • 使当前线程进入等待(某对象)状态 ,直到另一线程对该对象发出 notify (或notifyAll) 为止。
  • 调用方法的必要条件:当前线程必须具有对该对象的监控权(加锁)
  • 调用此方法后,当前线程将释放对象监控权 ,然后进入等待
  • 在当前线程被notify后,要重新获得监控权,然后从断点处继续代码的执行。

notify()/notifyAll()

  • 在当前线程中调用方法:对象名.notify()
  • 功能:唤醒等待该对象监控权的一个/所有线程。
  • 调用方法的必要条件:当前线程必须具有对该对象的监控权(加锁)

生产者消费者案例 - 虚假唤醒

现在我们来使用 生产者消费者的案例 来演示 虚假唤醒(notifyAll)的问题。

生产者(​​Productor​​​)将产品交给店员(​​Clerk​​)

而消费者(​​Customer​​​)从店员(​​Clerk​​)处取走产品

店员(​​Clerk​​)一次只能持有固定数量的产品(比如:1)

如果生产者试图生产更多的产品,店员会叫生产者停一下 (wait) ,如果店中有空位放产品了再通知 (notifyAll) 生产者继续生产

如果店中没有产品了,店员会告诉消费者等一下 (wait),如果店中有产品了再通知 (notifyAll) 消费者来取走产品。

1.首先写一个店员类 Clerk 收货以及卖货的方法

// 店员类
class Clerk {

//定义商品的数量
private int product = 0; // 当前商品的数量
private int total = 10; // 货架允许存放的商品总数量

//进货的同步方法:提供给生产者线程调用
public synchronized void get() {
if (product > total) {
// 如果商品超出 货架允许存放商品的数量,则提示产品已摆满!
System.out.println("产品已摆满!");
} else {
// 如果还可以摆放产品,则继续摆放,设置产品的数量++,然后打印出来
System.out.println(Thread.currentThread().getName() + ":" + ++product);
}
}

//卖货的同步方法:提供给消费者线程调用
public synchronized void sale() {
if (product <= 0) {
//如果商品数量小于0,说明缺货了
System.out.println("缺货了");
} else {
//如果商品数量大于0,说明还有商品可以卖(商品的数量--)
System.out.println(Thread.currentThread().getName() + " :" + --product);
}
}

}

2. 创建生产者线程类

//生产者
class Productor implements Runnable {
//定义员工类对象
private Clerk clerk;

//构造器:接收传递的员工类对象
public Productor(Clerk clerk) {
this.clerk = clerk;
}

//生产者线程
@Override
public void run() {
//循环20次,调用员工收货,增加货物的数量
for (int i = 0; i < 20; i++) {
// 休眠200毫秒
try {
Thread.sleep(200);
} catch (InterruptedException e) {
}
//调用员工收货
clerk.get();
}
}
}

3.创建消费者线程类

//消费者
class Consumer implements Runnable {
//定义员工类对象
private Clerk clerk;

//构造器:接收传递的员工类对象
public Consumer(Clerk clerk) {
this.clerk = clerk;
}

//消费者线程
@Override
public void run() {
//循环20次,调用员工卖货,减少货物的数量
for (int i = 0; i < 20; i++) {
clerk.sale();
}
}
}

4.创建生产者和消费者的线程,执行测试

public class TestProductorAndConsumer {

public static void main(String[] args) {
//创建员工类对象
Clerk clerk = new Clerk();
//创建生产者线程类对象
Productor productor = new Productor(clerk);
//创建消费者线程类对象
Consumer consumer = new Consumer(clerk);

//创建线程
new Thread(productor, "生产者A").start();
new Thread(consumer, "消费者B").start();
}

}

执行测试如下:

8.Condition 控制线程通信_多线程_02


image-20201103211614315

5.存在的问题:消费者就算没货,依然不断去消费;生产者已经摆满产品,依然不断去生产产品

那么怎么去解决这个问题呢?

这个时候,就需要使用线程的通信 wait() 和 notifyAll() 的方法来处理了。处理思路如下:

  • 生产者:当生产已经摆满产品,那么就设置线程阻塞 wait() ;反之,使用 notifyall() 开启生产
  • 消费者:当产品已经缺货,那么久设置线程阻塞 wait(); 反之,使用 notifyall() 开启消费

5.1 生产者:当生产已经摆满产品,那么就设置线程阻塞 wait() ;反之,使用 notifyall() 开启生产

8.Condition 控制线程通信_接口_03


image-20201103212511552

5.2 消费者:当产品已经缺货,那么久设置线程阻塞 wait(); 反之,使用 notifyall() 开启消费

8.Condition 控制线程通信_thread_04


image-20201103212538344

5.3 执行效果

8.Condition 控制线程通信_java_05


image-20201103212926441

6.存在问题:当我们将生产产品的总数量调小为 1,双线程将会无法停止执行。根本的原因是最后一个 wait 没有被唤醒

6.1 首先我们将生产产品的总数量调小为 1,如下

8.Condition 控制线程通信_java_06


image-20201103213322161

6.2 再次执行生产者 与 消费者 的线程,发现无法停止

8.Condition 控制线程通信_thread_07


image-20201103213544563

8.Condition 控制线程通信_java_08


image-20201103214105426

其实根本的原因是线程在循环获取的时候,由于全部都走入了 ​​if(product > total)​​​ 中,无法走到 ​​else​​ 分支唤醒线程,导致一直阻塞。

解决的办法很简单,只要去掉 ​​else​​ 分支即可,将唤醒的步骤放在最下方一定会执行的地方。

  • 去除生产的 get() 方法 else 分支

8.Condition 控制线程通信_thread_09


image-20201103214316925

  • 去除消费的 sale() 方法 else 分支

8.Condition 控制线程通信_接口_10


image-20201103214402805

6.3 执行效果

8.Condition 控制线程通信_java_11


image-20201103214446056

7.存在问题:虚假唤醒。

上面的我们单个生产者,单个消费者执行是没有问题的。但是如果有多个生产者和消费者,将会出现 并发虚假唤醒的情况。

7.1 创建两个生产者、消费者线程执行

8.Condition 控制线程通信_java_12


image-20201103215958829

这个为什么会出现这种情况呢?

这是因为线程的 notifyAll() 方法会将所有并发的 消费者 或者 生产者 线程进行唤醒,导致重复计算 产品的数量,从而导致数量的错误。

而这种同时将多个阻塞线程唤醒的情况,就是虚假唤醒。

7.2 查看API文档,确认如何解决虚假唤醒

在文档中搜索 ​​object​​​ 类,查看 ​​wait()​​ 方法如下:

8.Condition 控制线程通信_接口_13


image-20201103220510583

8.Condition 控制线程通信_接口_14


image-20201103220601394

7.3 在 ​​wait() ​​​方法调用的地方,改为 ​​while​​ 循环

8.Condition 控制线程通信_thread_15


image-20201103220704583

8.Condition 控制线程通信_多线程_16


image-20201103220746383

8.Condition 控制线程通信_并发编程_17


image-20201103220804721

7.4 测试执行

在生产与消费方法中,使用 while 解决了 虚假唤醒之后,下面来执行看看,如下:

8.Condition 控制线程通信_thread_18


image-20201103220928349

8.完整演示源码

/**
* 生产者和消费者案例
*/
public class TestProductorAndConsumer {

public static void main(String[] args) {
//创建员工类对象
Clerk clerk = new Clerk();
//创建生产者线程类对象
Productor productor = new Productor(clerk);
//创建消费者线程类对象
Consumer consumer = new Consumer(clerk);

//创建线程
new Thread(productor, "生产者A").start();
new Thread(consumer, "消费者B").start();

new Thread(productor, "生产者c").start();
new Thread(consumer, "消费者D").start();
}

}

// 店员类
class Clerk {

//定义商品的数量
private int product = 0; // 当前商品的数量
private int total = 1; // 货架允许存放的商品总数量

//进货的同步方法:提供给生产者线程调用
public synchronized void get() {
while (product > total) {
// 如果商品超出 货架允许存放商品的数量,则提示产品已摆满!
System.out.println("产品已摆满!");
// 已经摆满货物,需要停止生产,设置线程阻塞
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

//反之开始生产,唤醒线程执行操作
this.notifyAll();
// 如果还可以摆放产品,则继续摆放,设置产品的数量++,然后打印出来
System.out.println(Thread.currentThread().getName() + ":" + ++product);

}

//卖货的同步方法:提供给消费者线程调用
public synchronized void sale() {
while (product <= 0) {
//如果商品数量小于0,说明缺货了
System.out.println("缺货了");
//已经缺货,需要停止消费,设置线程阻塞
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

//有货物可以消费,唤醒线程
this.notifyAll();
//如果商品数量大于0,说明还有商品可以卖(商品的数量--)
System.out.println(Thread.currentThread().getName() + " :" + --product);

}

}

//生产者
class Productor implements Runnable {
//定义员工类对象
private Clerk clerk;

//构造器:接收传递的员工类对象
public Productor(Clerk clerk) {
this.clerk = clerk;
}

//生产者线程
@Override
public void run() {
//循环20次,调用员工收货,增加货物的数量
for (int i = 0; i < 20; i++) {
// 休眠200毫秒
try {
Thread.sleep(200);
} catch (InterruptedException e) {
}
//调用员工收货
clerk.get();
}
}
}

//消费者
class Consumer implements Runnable {
//定义员工类对象
private Clerk clerk;

//构造器:接收传递的员工类对象
public Consumer(Clerk clerk) {
this.clerk = clerk;
}

//消费者线程
@Override
public void run() {
//循环20次,调用员工卖货,减少货物的数量
for (int i = 0; i < 20; i++) {
clerk.sale();
}
}
}

Condition

- Condition 接口描述了可能会与锁有关联的条件变量。这些变量在用法上与使用 Object.wait 访问的隐式监视器类似,但提供了更强大的
功能。需要特别指出的是,单个 Lock 可能与多个 Condition 对象关联。为了避免兼容性问题,Condition 方法的名称与对应的 Object
本中的不同。

- Condition 对象中,与 wait、notify notifyAll 方法对应的分别是 await、signal signalAll。

- Condition 实例实质上被绑定到一个锁上。要为特定 Lock 实例获得 Condition 实例,请使用其 newCondition() 方法。

生产者消费者案例 - 改用同步锁 以及 Condition 通讯

上面我们讲诉了生产者消费者的案例,在这种案例中我们采用的是 synchronized 同步方法来阻止线程安全问题的,如下:

8.Condition 控制线程通信_多线程_19


image-20201103223435108

当然除了 同步方法,我们还可以使用锁 Lock 来阻止线程安全问题。下面我们将代码改写为使用 Lock 的实现方式。

1.该案例中的同步方法,改为 Lock 实现

8.Condition 控制线程通信_java_20


image-20201103225302110

在改完了锁之后,我们可以看到线程通讯依然是使用 ​​this.wait()​​​ 和 ​​this.notifyAll()​​​ ,那么对于 ​​lock​​ 来说,有没有对应的 线程通讯 方法呢?

当然有,就是上面介绍的 Condition 通讯锁。

2.通过lock获取condition

private final ReentrantLock lock = new ReentrantLock(); // 创建锁
private Condition condition = lock.newCondition(); // 通过lock获取condition

8.Condition 控制线程通信_多线程_21


image-20201103225757510

3.使用 condition 来设置线程通讯:在 Condition 对象中,与 wait、notify 和 notifyAll 方法对应的分别是 await、signal 和 signalAll。

8.Condition 控制线程通信_多线程_22


image-20201103225929904

4.修改后的员工类

// 店员类
class Clerk {

//定义商品的数量
private int product = 0; // 当前商品的数量
private int total = 1; // 货架允许存放的商品总数量
private final ReentrantLock lock = new ReentrantLock(); // 创建锁
private Condition condition = lock.newCondition(); // 通过lock获取condition

//进货的同步方法:提供给生产者线程调用
public void get() {
//加锁
lock.lock();
try {
while (product > total) {
// 如果商品超出 货架允许存放商品的数量,则提示产品已摆满!
System.out.println("产品已摆满!");
// 已经摆满货物,需要停止生产,设置线程阻塞
try {
// this.wait();
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//反之开始生产,唤醒线程执行操作
// this.notifyAll();
condition.signalAll();
// 如果还可以摆放产品,则继续摆放,设置产品的数量++,然后打印出来
System.out.println(Thread.currentThread().getName() + ":" + ++product);
} finally {
lock.unlock(); // 释放锁
}

}

//卖货的同步方法:提供给消费者线程调用
public void sale() {
lock.lock(); // 加锁
try {
while (product <= 0) {
//如果商品数量小于0,说明缺货了
System.out.println("缺货了");
//已经缺货,需要停止消费,设置线程阻塞
try {
// this.wait();
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//有货物可以消费,唤醒线程
// this.notifyAll();
condition.signalAll();
//如果商品数量大于0,说明还有商品可以卖(商品的数量--)
System.out.println(Thread.currentThread().getName() + " :" + --product);
} finally {
lock.unlock(); // 释放锁
}

}

}


举报

相关推荐

0 条评论