第三章 - 共享模型之管程(三)
wait-notify
小故事 - 为什么需要 wait
- 由于条件不满足,小南不能继续进行计算
- 但小南如果一直占用着锁,其它人就得一直阻塞,效率太低

- 于是老王单开了一间休息室(调用 wait 方法),让小南到休息室(WaitSet)等着去了,但这时锁释放开,其它人可以由老王随机安排进屋
- 直到小M将烟送来,大叫一声 [ 你的烟到了 ] (调用 notify 方法)


原理之 wait / notify

- Owner 线程发现条件不满足,调用 wait 方法,即可进入 WaitSet 变为 WAITING 状态
- BLOCKED 和 WAITING 的线程都处于阻塞状态,不占用 CPU 时间片
- BLOCKED 线程会在 Owner 线程释放锁时唤醒
- WAITING 线程会在 Owner 线程调用 notify 或 notifyAll 时唤醒,但唤醒后并不意味着立刻获得锁,仍需进入EntryList 重新竞争
API 介绍
- obj.wait( ) 让进入 object 监视器的线程到 waitSet 等待
- obj.notify( ) 在 object 上正在 waitSet 等待的线程中挑一个唤醒
- obj.notifyAll( ) 让 object 上正在 waitSet 等待的所有线程全部唤醒
@Slf4j(topic = "c.Test18")
public class Test18 {
static final Object lock = new Object();
public static void main(String[] args) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

- 因为还没有获得对象的锁,直接调用会报错。
- 应该先获取对象的锁,才能调用wait( )方法
@Slf4j(topic = "c.Test18")
public class Test18 {
static final Object lock = new Object();
public static void main(String[] args) {
synchronized (lock) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
notify与notifyAll
@Slf4j(topic = "c.TestWaitNotify")
public class TestWaitNotify {
final static Object obj = new Object();
public static void main(String[] args) {
new Thread(() -> {
synchronized (obj) {
log.debug("执行....");
try {
obj.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("其它代码....");
}
},"t1").start();
new Thread(() -> {
synchronized (obj) {
log.debug("执行....");
try {
obj.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("其它代码....");
}
},"t2").start();
sleep(2);
log.debug("唤醒 obj 上其它线程");
synchronized (obj) {
obj.notifyAll();
}
}
}
16:46:22.477 c.TestWaitNotify [t1] - 执行....
16:46:22.480 c.TestWaitNotify [t2] - 执行....
16:46:24.480 c.TestWaitNotify [main] - 唤醒 obj 上其它线程
16:46:24.481 c.TestWaitNotify [t1] - 其它代码....
16:47:23.572 c.TestWaitNotify [t1] - 执行....
16:47:23.574 c.TestWaitNotify [t2] - 执行....
16:47:25.575 c.TestWaitNotify [main] - 唤醒 obj 上其它线程
16:47:25.575 c.TestWaitNotify [t2] - 其它代码....
16:47:25.576 c.TestWaitNotify [t1] - 其它代码....
- wait( ) 方法会释放对象的锁,进入 WaitSet 等待区,从而让其他线程就机会获取对象的锁。无限制等待,直到notify 为止
- wait(long n) 有时限的等待, 到 n 毫秒后结束等待,或是被 notify
@Slf4j(topic = "c.TestWaitNotify")
public class TestWaitNotify {
final static Object obj = new Object();
public static void main(String[] args) {
new Thread(() -> {
synchronized (obj) {
log.debug("执行....");
try {
obj.wait(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("其它代码....");
}
},"t1").start();
}
}
16:50:20.502 c.TestWaitNotify [t1] - 执行....
16:50:21.506 c.TestWaitNotify [t1] - 其它代码....
wait notify 的正确姿势
sleep(long n) 和 wait(long n) 的区别
- sleep 是 Thread 的方法,而 wait 是 Object 的方法
- sleep 不需要强制和 synchronized 配合使用,但 wait 需要和 synchronized 一起用
- sleep 在睡眠的同时,不会释放对象锁的,但 wait 在等待的时候会释放对象锁
- 它们状态都是 TIMED_WAITING
@Slf4j(topic = "c.Test19")
public class Test19 {
static final Object lock = new Object();
public static void main(String[] args) {
new Thread(() -> {
synchronized (lock) {
log.debug("获得锁");
try {
lock.wait(20000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "t1").start();
Sleeper.sleep(1);
synchronized (lock) {
log.debug("获得锁");
}
}
}


step 1
@Slf4j(topic = "c.TestCorrectPosture")
public class TestCorrectPostureStep1 {
static final Object room = new Object();
static boolean hasCigarette = false;
static boolean hasTakeout = false;
public static void main(String[] args) {
new Thread(() -> {
synchronized (room) {
log.debug("有烟没?[{}]", hasCigarette);
if (!hasCigarette) {
log.debug("没烟,先歇会!");
sleep(2);
}
log.debug("有烟没?[{}]", hasCigarette);
if (hasCigarette) {
log.debug("可以开始干活了");
}
}
}, "小南").start();
for (int i = 0; i < 5; i++) {
new Thread(() -> {
synchronized (room) {
log.debug("可以开始干活了");
}
}, "其它人").start();
}
sleep(1);
new Thread(() -> {
hasCigarette = true;
log.debug("烟到了噢!");
}, "送烟的").start();
}
}

- 其它干活的线程,都要一直阻塞,效率太低
- 小南线程必须睡足 2s 后才能醒来,就算烟提前送到,也无法立刻醒来
- 加了 synchronized (room) 后,就好比小南在里面反锁了门睡觉,烟根本没法送进门,main 没加synchronized 就好像 main 线程是翻窗户进来的
- 解决方法,使用 wait - notify 机制
sleep(1);
new Thread(() -> {
synchronized (room) {
hasCigarette = true;
log.debug("烟到了噢!");
}
}, "送烟的").start();
}

step 2
@Slf4j(topic = "c.TestCorrectPosture")
public class TestCorrectPostureStep2 {
static final Object room = new Object();
static boolean hasCigarette = false;
static boolean hasTakeout = false;
public static void main(String[] args) {
new Thread(() -> {
synchronized (room) {
log.debug("有烟没?[{}]", hasCigarette);
if (!hasCigarette) {
log.debug("没烟,先歇会!");
try {
room.wait(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("有烟没?[{}]", hasCigarette);
if (hasCigarette) {
log.debug("可以开始干活了");
}
}
}, "小南").start();
for (int i = 0; i < 5; i++) {
new Thread(() -> {
synchronized (room) {
log.debug("可以开始干活了");
}
}, "其它人").start();
}
sleep(1);
new Thread(() -> {
synchronized (room) {
hasCigarette = true;
log.debug("烟到了噢!");
room.notify();
}
}, "送烟的").start();
}
}

- 解决了其它干活的线程阻塞的问题
- 但如果有其它线程也在等待条件呢?送烟的主线程notify会不会错误地叫醒其他线程呢?
step 3
@Slf4j(topic = "c.TestCorrectPosture")
public class TestCorrectPostureStep3 {
static final Object room = new Object();
static boolean hasCigarette = false;
static boolean hasTakeout = false;
public static void main(String[] args) {
new Thread(() -> {
synchronized (room) {
log.debug("有烟没?[{}]", hasCigarette);
if (!hasCigarette) {
log.debug("没烟,先歇会!");
try {
room.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("有烟没?[{}]", hasCigarette);
if (hasCigarette) {
log.debug("可以开始干活了");
} else {
log.debug("没干成活...");
}
}
}, "小南").start();
new Thread(() -> {
synchronized (room) {
Thread thread = Thread.currentThread();
log.debug("外卖送到没?[{}]", hasTakeout);
if (!hasTakeout) {
log.debug("没外卖,先歇会!");
try {
room.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("外卖送到没?[{}]", hasTakeout);
if (hasTakeout) {
log.debug("可以开始干活了");
} else {
log.debug("没干成活...");
}
}
}, "小女").start();
sleep(1);
new Thread(() -> {
synchronized (room) {
hasTakeout = true;
log.debug("外卖到了噢!");
room.notify();
}
}, "送外卖的").start();
}
}

- notify 只能随机唤醒一个 WaitSet 中的线程,这时如果有其它线程也在等待,那么就可能唤醒不了正确的线程,称之为【虚假唤醒】
- 解决方法,改为 notifyAll
step 4
sleep(1);
new Thread(() -> {
synchronized (room) {
hasTakeout = true;
log.debug("外卖到了噢!");
room.notifyAll();
}
}, "送外卖的").start();

- 用 notifyAll 仅解决某个线程的唤醒问题,但使用 if + wait 判断仅有一次机会,一旦条件不成立,就没有重新判断的机会了
- 解决方法,用 while + wait,当条件不成立,再次 wait
step 5
new Thread(() -> {
synchronized (room) {
log.debug("有烟没?[{}]", hasCigarette);
if (!hasCigarette) {
log.debug("没烟,先歇会!");
try {
room.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("有烟没?[{}]", hasCigarette);
if (hasCigarette) {
log.debug("可以开始干活了");
} else {
log.debug("没干成活...");
}
}
}, "小南").start();
new Thread(() -> {
synchronized (room) {
log.debug("有烟没?[{}]", hasCigarette);
while (!hasCigarette) {
log.debug("没烟,先歇会!");
try {
room.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("有烟没?[{}]", hasCigarette);
if (hasCigarette) {
log.debug("可以开始干活了");
} else {
log.debug("没干成活...");
}
}
}, "小南").start();

正确姿势
synchronized(lock) {
while(条件不成立) {
lock.wait();
}
}
synchronized(lock) {
lock.notifyAll();
}
同步模式之保护性暂停
定义
- 有一个结果需要从一个线程传递到另一个线程,让他们关联同一个 GuardedObject
- 如果有结果不断从一个线程到另一个线程那么可以使用消息队列(见生产者/消费者)
- JDK 中,join 的实现、Future 的实现,采用的就是此模式
- 因为要等待另一方的结果,因此归类到同步模式

实现
@Slf4j(topic = "c.Test20")
public class Test20 {
public static void main(String[] args) {
GuardedObject guardedObject = new GuardedObject();
new Thread(() -> {
log.debug("等待结果");
List<String> list = (List<String>) guardedObject.get();
log.debug("结果大小:{}", list.size());
}, "t1").start();
new Thread(() -> {
log.debug("执行下载");
try {
List<String> list = Downloader.download();
guardedObject.complete(list);
} catch (IOException e) {
e.printStackTrace();
}
}, "t2").start();
}
}
class GuardedObject {
private Object response;
private final Object lock = new Object();
public Object get() {
synchronized (lock) {
while (response == null) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return response;
}
}
public void complete(Object response) {
synchronized (lock) {
this.response = response;
lock.notifyAll();
}
}
}
public class Downloader {
public static List<String> download() throws IOException {
HttpURLConnection conn = (HttpURLConnection) new URL("https://www.baidu.com/").openConnection();
List<String> lines = new ArrayList<>();
try (BufferedReader reader =
new BufferedReader(new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
String line;
while ((line = reader.readLine()) != null) {
lines.add(line);
}
}
return lines;
}
}

带超时版 GuardedObject
class GuardedObject1 {
private Object response;
private final Object lock = new Object();
public Object get(long timeout) {
synchronized (lock) {
long begin = System.currentTimeMillis();
long passesTime = 0;
while (response == null) {
long waitTime = timeout - passesTime;
if (waitTime <= 0) {
break;
}
try {
lock.wait(waitTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
passesTime = System.currentTimeMillis() - begin;
}
return response;
}
}
public void complete(Object response) {
synchronized (lock) {
this.response = response;
lock.notifyAll();
}
}
}
public static void main(String[] args) {
GuardedObject1 guardedObject = new GuardedObject1();
new Thread(() -> {
log.debug("等待结果");
Object response = guardedObject.get(2000);
log.debug("结果大小:{}", response);
}, "t1").start();
new Thread(() -> {
log.debug("执行下载");
try {
Thread.sleep(1000);
guardedObject.complete(new Object());
} catch (Exception e) {
e.printStackTrace();
}
}, "t2").start();
}

new Thread(() -> {
log.debug("执行下载");
try {
Thread.sleep(3000);
guardedObject.complete(new Object());
} catch (Exception e) {
e.printStackTrace();
}
}, "t2").start();

new Thread(() -> {
log.debug("执行下载");
try {
Thread.sleep(1000);
guardedObject.complete(null);
} catch (Exception e) {
e.printStackTrace();
}
}, "t2").start();

原理之 join
t1.join();
synchronized (t1) {
while (t1.isAlive()) {
t1.wait(0);
}
}
join源码
public final void join() throws InterruptedException {
join(0);
}
public final synchronized void join(long millis)
throws InterruptedException {
long base = System.currentTimeMillis();
long now = 0;
if (millis < 0) {
throw new IllegalArgumentException("timeout value is negative");
}
if (millis == 0) {
while (isAlive()) {
wait(0);
}
} else {
while (isAlive()) {
long delay = millis - now;
if (delay <= 0) {
break;
}
wait(delay);
now = System.currentTimeMillis() - base;
}
}
}
- 从代码中,我们可以发现。当millis==0时,会进入while( isAlive( ) )循环;即只要子线程是活的,主线程就不停的等待。
- wait( )的作用是让“当前线程”等待,而这里的“当前线程”是指当前运行的线程。虽然是调用子线程的wait( )方法,但是它是通过“主线程”去调用的;所以,休眠的是主线程,而不是“子线程”!
- 这样理解: 例子中的Thread t只是一个对象 , isAlive( )判断当前对象(例子中的t对象)是否存活, wait()阻塞的是当前执行的线程(一般是main方法)
- 可以看出,Join方法实现是通过wait( )。 当main线程调用t.join时候,main线程会获得线程对象t的锁(wait 意味着拿到该对象的锁),调用该对象的wait( ),直到该对象唤醒main线程 ,比如退出后。这就意味着main 线程调用t.join时,必须能够拿到线程t对象的锁。
多任务版 GuardedObject
- 图中 Futures 就好比居民楼一层的信箱(每个信箱有房间编号),左侧的 t0,t2,t4 就好比等待邮件的居民,右侧的 t1,t3,t5 就好比邮递员
- 如果需要在多个类之间使用 GuardedObject 对象,作为参数传递不是很方便,因此设计一个用来解耦的中间类,这样不仅能够解耦【结果等待者】和【结果生产者】,还能够同时支持多个任务的管理

class GuardedObject {
private int id;
public GuardedObject(int id) {
this.id = id;
}
public int getId() {
return id;
}
private Object response;
private final Object lock = new Object();
public Object get(long timeout) {
synchronized (lock) {
long begin = System.currentTimeMillis();
long passesTime = 0;
while (response == null) {
long waitTime = timeout - passesTime;
if (waitTime <= 0) {
break;
}
try {
lock.wait(waitTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
passesTime = System.currentTimeMillis() - begin;
}
return response;
}
}
public void complete(Object response) {
synchronized (lock) {
this.response = response;
lock.notifyAll();
}
}
}
class Mailboxes {
private static Map<Integer, GuardedObject> boxes = new Hashtable<>();
private static int id = 1;
private static synchronized int generateId() {
return id++;
}
public static GuardedObject getGuardedObject(int id) {
return boxes.remove(id);
}
public static GuardedObject createGuardedObject() {
GuardedObject go = new GuardedObject(generateId());
boxes.put(go.getId(), go);
return go;
}
public static Set<Integer> getIds() {
return boxes.keySet();
}
}
@Slf4j(topic = "c.People")
class People extends Thread {
@Override
public void run() {
GuardedObject guardedObject = Mailboxes.createGuardedObject();
log.debug("开始收信 id:{}", guardedObject.getId());
Object mail = guardedObject.get(5000);
log.debug("收到信 id:{}, 内容:{}", guardedObject.getId(), mail);
}
}
@Slf4j(topic = "c.Postman")
class Postman extends Thread {
private int id;
private String mail;
public Postman(int id, String mail) {
this.id = id;
this.mail = mail;
}
@Override
public void run() {
GuardedObject guardedObject = Mailboxes.getGuardedObject(id);
log.debug("送信 id:{}, 内容:{}", id, mail);
guardedObject.complete(mail);
}
}
@Slf4j(topic = "c.GuardedObjectTest")
public class GuardedObjectTest {
public static void main(String[] args) {
for (int i = 0; i < 3; i++) {
new People().start();
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
for (Integer id : Mailboxes.getIds()) {
new Postman(id, "内容" + id).start();
}
}
}

异步模式之生产者/消费者 (重点)
定义
- 与前面的保护性暂停中的 GuardObject 不同,不需要产生结果和消费结果的线程一一对应(一个生产一个消费)
- 消费队列可以用来平衡生产和消费的线程资源
- 生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据
- 消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据
- JDK 中各种阻塞队列,采用的就是这种模式

实现
@Slf4j(topic = "c.ProductConsumerTest")
public class ProductConsumerTest {
public static void main(String[] args) {
MessageQueue queue = new MessageQueue(2);
for (int i = 0; i < 3; i++) {
int id = i;
new Thread(() -> {
queue.put(new Message(id, "值" + id));
}, "生产者" + i).start();
}
new Thread(() -> {
while (true) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Message message = queue.take();
}
}, "消费者").start();
}
}
@Slf4j(topic = "c.MessageQueue")
class MessageQueue {
private LinkedList<Message> list = new LinkedList<>();
private int capcity;
public MessageQueue(int capcity) {
this.capcity = capcity;
}
public Message take() {
synchronized (list) {
while (list.isEmpty()) {
try {
log.debug("队列为空, 消费者线程等待");
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Message message = list.removeFirst();
log.debug("已消费消息 {}", message);
list.notifyAll();
return message;
}
}
public void put(Message message) {
synchronized (list) {
while (list.size() == capcity) {
try {
log.debug("队列已满, 生产者线程等待");
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
list.addLast(message);
log.debug("已生产消息 {}", message);
list.notifyAll();
}
}
}
final class Message {
private int id;
private Object value;
public Message(int id, Object value) {
this.id = id;
this.value = value;
}
public int getId() {
return id;
}
public Object getValue() {
return value;
}
@Override
public String toString() {
return "Message{" + "id=" + id + ", value=" + value + '}';
}
}

Park & Unpark
基本使用
LockSupport.park();
LockSupport.unpark(暂停线程对象)
@Slf4j(topic = "c.TestParkUnpark")
public class TestParkUnpark {
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
log.debug("start...");
sleep(1);
log.debug("park...");
LockSupport.park();
log.debug("resume...");
}, "t1");
t1.start();
sleep(2);
log.debug("unpark...");
LockSupport.unpark(t1);
}
}

@Slf4j(topic = "c.TestParkUnpark")
public class TestParkUnpark {
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
log.debug("start...");
sleep(2);
log.debug("park...");
LockSupport.park();
log.debug("resume...");
}, "t1");
t1.start();
sleep(1);
log.debug("unpark...");
LockSupport.unpark(t1);
}
}

特点
- wait,notify 和 notifyAll 必须配合 Object Monitor 一起使用,而 park,unpark 不必
- park & unpark 是以线程为单位来【阻塞】和【唤醒】线程,而 notify 只能随机唤醒一个等待线程,notifyAll 是唤醒所有等待线程,就不那么【精确】
- park & unpark 可以先 unpark,而 wait & notify 不能先 notify
原理之 park & unpark
- 线程就像一个旅人,Parker 就像他随身携带的背包,条件变量就好比背包中的帐篷。_counter 就好比背包中的备用干粮(0 为耗尽,1 为充足)
- 调用 park 就是要看需不需要停下来歇息
- 如果备用干粮耗尽,那么钻进帐篷歇息
- 如果备用干粮充足,那么不需停留,继续前进
- 调用 unpark,就好比令干粮充足
- 如果这时线程还在帐篷,就唤醒让他继续前进
- 如果这时线程还在运行,那么下次他调用 park 时,仅是消耗掉备用干粮,不需停留继续前进
- 因为背包空间有限,多次调用 unpark 仅会补充一份备用干粮
先调用park的情况
- 当前线程调用 Unsafe.park( ) 方法
- 检查 _counter, 本情况为
0
, 这时, 获得_mutex 互斥锁
(mutex对象有个等待队列 _cond
) - 线程进入
_cond
条件变量阻塞
- 设置
_counter = 0
(没干粮了)

调用unpark
- 调用
Unsafe.unpark(Thread_0)方法
,设置_counter 为 1
- 唤醒
_cond
条件变量中的 Thread_0 - Thread_0
恢复运行
- 设置
_counter
为 0

- 调用
Unsafe.unpark(Thread_0)
方法,设置 _counter 为 1
- 当前
线程
调用 Unsafe.park()
方法 - 检查
_counter
,本情况为 1
,这时线程 无需阻塞,继续运行
- 设置
_counter
为 0
