0
点赞
收藏
分享

微信扫一扫

第三章 - 共享模型之管程(三)

第三章 - 共享模型之管程(三)

wait-notify

小故事 - 为什么需要 wait

  • 由于条件不满足,小南不能继续进行计算
  • 但小南如果一直占用着锁,其它人就得一直阻塞,效率太低

Untitled

  • 于是老王单开了一间休息室(调用 wait 方法),让小南到休息室(WaitSet)等着去了,但这时锁释放开,其它人可以由老王随机安排进屋
  • 直到小M将烟送来,大叫一声 [ 你的烟到了 ] (调用 notify 方法)

Untitled

  • 小南于是可以离开休息室,重新进入竞争锁的队列

Untitled

原理之 wait / notify

Untitled

  • Owner 线程发现条件不满足,调用 wait 方法,即可进入 WaitSet 变为 WAITING 状态
  • BLOCKED 和 WAITING 的线程都处于阻塞状态,不占用 CPU 时间片
  • BLOCKED 线程会在 Owner 线程释放锁时唤醒
  • WAITING 线程会在 Owner 线程调用 notify 或 notifyAll 时唤醒,但唤醒后并不意味着立刻获得锁,仍需进入EntryList 重新竞争

API 介绍

  • obj.wait( ) 让进入 object 监视器的线程到 waitSet 等待
  • obj.notify( ) 在 object 上正在 waitSet 等待的线程中挑一个唤醒
  • obj.notifyAll( ) 让 object 上正在 waitSet 等待的所有线程全部唤醒
  • 直接调用wait( )方法
@Slf4j(topic = "c.Test18")
public class Test18 {
    static final Object lock = new Object();
    public static void main(String[] args) {

            try {
                lock.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        
    }
}

Untitled

  • 因为还没有获得对象的锁,直接调用会报错。
  • 应该先获取对象的锁,才能调用wait( )方法
@Slf4j(topic = "c.Test18")
public class Test18 {
    static final Object lock = new Object();
    public static void main(String[] args) {
        synchronized (lock) {
            try {
                lock.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

notify与notifyAll

@Slf4j(topic = "c.TestWaitNotify")
public class TestWaitNotify {
    final static Object obj = new Object();

    public static void main(String[] args) {

        new Thread(() -> {
            synchronized (obj) {
                log.debug("执行....");
                try {
                    obj.wait(); // 让线程在obj上一直等待下去
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.debug("其它代码....");
            }
        },"t1").start();

        new Thread(() -> {
            synchronized (obj) {
                log.debug("执行....");
                try {
                    obj.wait(); // 让线程在obj上一直等待下去
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.debug("其它代码....");
            }
        },"t2").start();

        // 主线程两秒后执行
        sleep(2);
        log.debug("唤醒 obj 上其它线程");
        synchronized (obj) {
//            obj.notify(); // 唤醒obj上一个线程
            obj.notifyAll(); // 唤醒obj上所有等待线程
        }
    }
}
16:46:22.477 c.TestWaitNotify [t1] - 执行....
16:46:22.480 c.TestWaitNotify [t2] - 执行....
16:46:24.480 c.TestWaitNotify [main] - 唤醒 obj 上其它线程
16:46:24.481 c.TestWaitNotify [t1] - 其它代码....
16:47:23.572 c.TestWaitNotify [t1] - 执行....
16:47:23.574 c.TestWaitNotify [t2] - 执行....
16:47:25.575 c.TestWaitNotify [main] - 唤醒 obj 上其它线程
16:47:25.575 c.TestWaitNotify [t2] - 其它代码....
16:47:25.576 c.TestWaitNotify [t1] - 其它代码....
  • wait( ) 方法会释放对象的锁,进入 WaitSet 等待区,从而让其他线程就机会获取对象的锁。无限制等待,直到notify 为止
  • wait(long n) 有时限的等待, 到 n 毫秒后结束等待,或是被 notify
@Slf4j(topic = "c.TestWaitNotify")
public class TestWaitNotify {
    final static Object obj = new Object();

    public static void main(String[] args) {
        new Thread(() -> {
            synchronized (obj) {
                log.debug("执行....");
                try {
                    //如果1秒钟内没有同一个对象的其他线程唤醒t1,那么1秒钟后t1就继续向下执行
                    obj.wait(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.debug("其它代码....");
            }
        },"t1").start();
		}

}
16:50:20.502 c.TestWaitNotify [t1] - 执行....
16:50:21.506 c.TestWaitNotify [t1] - 其它代码....

wait notify 的正确姿势

sleep(long n) 和 wait(long n) 的区别

  • sleep 是 Thread 的方法,而 wait 是 Object 的方法
  • sleep 不需要强制和 synchronized 配合使用,但 wait 需要和 synchronized 一起用
  • sleep 在睡眠的同时,不会释放对象锁的,但 wait 在等待的时候会释放对象锁
  • 它们状态都是 TIMED_WAITING
@Slf4j(topic = "c.Test19")
public class Test19 {

    static final Object lock = new Object(); //建议共享变量都加上final
    public static void main(String[] args) {
        new Thread(() -> {
            synchronized (lock) {
                log.debug("获得锁");
                try {
//                    Thread.sleep(20000);
                    lock.wait(20000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "t1").start();

        Sleeper.sleep(1);
        synchronized (lock) {
            log.debug("获得锁");
        }
    }
}

Untitled

Untitled

step 1

@Slf4j(topic = "c.TestCorrectPosture")
public class TestCorrectPostureStep1 {
    static final Object room = new Object();
    static boolean hasCigarette = false; // 有没有烟
    static boolean hasTakeout = false;

    public static void main(String[] args) {
        new Thread(() -> {
            synchronized (room) {
                log.debug("有烟没?[{}]", hasCigarette);
                if (!hasCigarette) {
                    log.debug("没烟,先歇会!");
                    sleep(2);
                }
                log.debug("有烟没?[{}]", hasCigarette);
                if (hasCigarette) {
                    log.debug("可以开始干活了");
                }
            }
        }, "小南").start();

        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                synchronized (room) {
                    log.debug("可以开始干活了");
                }
            }, "其它人").start();
        }

        sleep(1);
        new Thread(() -> {
          // 这里能不能加 synchronized (room)?
          hasCigarette = true;
          log.debug("烟到了噢!");
        }, "送烟的").start();
    }

}

Untitled

  • 其它干活的线程,都要一直阻塞,效率太低
  • 小南线程必须睡足 2s 后才能醒来,就算烟提前送到,也无法立刻醒来
  • 加了 synchronized (room) 后,就好比小南在里面反锁了门睡觉,烟根本没法送进门,main 没加synchronized 就好像 main 线程是翻窗户进来的
  • 解决方法,使用 wait - notify 机制
		sleep(1);
    new Thread(() -> {
        // 这里能不能加 synchronized (room)?
        synchronized (room) {
            hasCigarette = true;
            log.debug("烟到了噢!");
        }
    }, "送烟的").start();
}

Untitled

step 2

@Slf4j(topic = "c.TestCorrectPosture")
public class TestCorrectPostureStep2 {
    static final Object room = new Object();
    static boolean hasCigarette = false;
    static boolean hasTakeout = false;

    public static void main(String[] args) {
        new Thread(() -> {
            synchronized (room) {
                log.debug("有烟没?[{}]", hasCigarette);
                if (!hasCigarette) {
                    log.debug("没烟,先歇会!");
                    try {
                        room.wait(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                log.debug("有烟没?[{}]", hasCigarette);
                if (hasCigarette) {
                    log.debug("可以开始干活了");
                }
            }
        }, "小南").start();

        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                synchronized (room) {
                    log.debug("可以开始干活了");
                }
            }, "其它人").start();
        }

        sleep(1);
        new Thread(() -> {
            synchronized (room) {
                hasCigarette = true;
                log.debug("烟到了噢!");
                room.notify();
            }
        }, "送烟的").start();
    }

}

Untitled

  • 解决了其它干活的线程阻塞的问题
  • 但如果有其它线程也在等待条件呢?送烟的主线程notify会不会错误地叫醒其他线程呢?

step 3

@Slf4j(topic = "c.TestCorrectPosture")
public class TestCorrectPostureStep3 {
    static final Object room = new Object();
    static boolean hasCigarette = false;
    static boolean hasTakeout = false;

    // 虚假唤醒
    public static void main(String[] args) {
        new Thread(() -> {
            synchronized (room) {
                log.debug("有烟没?[{}]", hasCigarette);
                if (!hasCigarette) {
                    log.debug("没烟,先歇会!");
                    try {
                        room.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                log.debug("有烟没?[{}]", hasCigarette);
                if (hasCigarette) {
                    log.debug("可以开始干活了");
                } else {
                    log.debug("没干成活...");
                }
            }
        }, "小南").start();

        new Thread(() -> {
            synchronized (room) {
                Thread thread = Thread.currentThread();
                log.debug("外卖送到没?[{}]", hasTakeout);
                if (!hasTakeout) {
                    log.debug("没外卖,先歇会!");
                    try {
                        room.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                log.debug("外卖送到没?[{}]", hasTakeout);
                if (hasTakeout) {
                    log.debug("可以开始干活了");
                } else {
                    log.debug("没干成活...");
                }
            }
        }, "小女").start();

        sleep(1);
        new Thread(() -> {
            synchronized (room) {
                hasTakeout = true;
                log.debug("外卖到了噢!");
//                room.notifyAll();
                room.notify();
            }
        }, "送外卖的").start();

    }

}

Untitled

  • notify 只能随机唤醒一个 WaitSet 中的线程,这时如果有其它线程也在等待,那么就可能唤醒不了正确的线程,称之为【虚假唤醒】
  • 解决方法,改为 notifyAll

step 4

sleep(1);
new Thread(() -> {
    synchronized (room) {
        hasTakeout = true;
        log.debug("外卖到了噢!");
        room.notifyAll();
    }
}, "送外卖的").start();

Untitled

  • 用 notifyAll 仅解决某个线程的唤醒问题,但使用 if + wait 判断仅有一次机会,一旦条件不成立,就没有重新判断的机会了
  • 解决方法,用 while + wait,当条件不成立,再次 wait

step 5

new Thread(() -> {
    synchronized (room) {
        log.debug("有烟没?[{}]", hasCigarette);
        if (!hasCigarette) {
            log.debug("没烟,先歇会!");
            try {
                room.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        log.debug("有烟没?[{}]", hasCigarette);
        if (hasCigarette) {
            log.debug("可以开始干活了");
        } else {
            log.debug("没干成活...");
        }
    }
}, "小南").start();
  • 改动后
new Thread(() -> {
    synchronized (room) {
        log.debug("有烟没?[{}]", hasCigarette);
        while (!hasCigarette) {
            log.debug("没烟,先歇会!");
            try {
                room.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        log.debug("有烟没?[{}]", hasCigarette);
        if (hasCigarette) {
            log.debug("可以开始干活了");
        } else {
            log.debug("没干成活...");
        }
    }
}, "小南").start();

Untitled

正确姿势

synchronized(lock) {
  while(条件不成立) {
    lock.wait();
  }
  // 干活
}

//另一个线程
synchronized(lock) {
  lock.notifyAll();
}

同步模式之保护性暂停

定义

  • 有一个结果需要从一个线程传递到另一个线程,让他们关联同一个 GuardedObject
  • 如果有结果不断从一个线程到另一个线程那么可以使用消息队列(见生产者/消费者)
  • JDK 中,join 的实现、Future 的实现,采用的就是此模式
  • 因为要等待另一方的结果,因此归类到同步模式

Untitled

实现

/**
 * @author xiexu
 * @create 2022-01-31 10:38 AM
 */
@Slf4j(topic = "c.Test20")
public class Test20 {

		// t1 等待 t2 的下载结果
    public static void main(String[] args) {
        GuardedObject guardedObject = new GuardedObject();
        new Thread(() -> {
            // 等待结果
            log.debug("等待结果");
            List<String> list = (List<String>) guardedObject.get();
            log.debug("结果大小:{}", list.size());
        }, "t1").start();

        new Thread(() -> {
            log.debug("执行下载");
            try {
                List<String> list = Downloader.download();
                //产生结果
                guardedObject.complete(list);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }, "t2").start();
    }

}

class GuardedObject {

    // 结果
    private Object response;
    private final Object lock = new Object();

    // 获取结果
    public Object get() {
        synchronized (lock) {
            // 没有结果则一直等待
						// 防止虚假唤醒
            while (response == null) {
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            return response;
        }
    }

    // 产生结果
    public void complete(Object response) {
        synchronized (lock) {
            // 给成员变量赋值
            this.response = response;
            // 产生结果,通知等待线程
            lock.notifyAll();
        }
    }

}
public class Downloader {
    public static List<String> download() throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL("https://www.baidu.com/").openConnection();
        List<String> lines = new ArrayList<>();
        try (BufferedReader reader =
                     new BufferedReader(new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
        }
        return lines;
    }
}

Untitled

带超时版 GuardedObject

// 增加超时效果
class GuardedObject1 {

    // 结果
    private Object response;
    private final Object lock = new Object();

    // 获取结果
    // timeout:表示要等待多久
    public Object get(long timeout) {
        synchronized (lock) {
            // 1、开始时间
            long begin = System.currentTimeMillis();
            // 2、经历的时间
            long passesTime = 0;
						// 没有结果则一直等待
						// 防止虚假唤醒
            while (response == null) {
                // 这一轮循环应该等待的时间(假设 timeout 是 1000,结果在 400 时被唤醒了,那么还有 600 要等)
                long waitTime = timeout - passesTime;
                // 经历的时间超过了最大等待时间时,退出循环
                if (waitTime <= 0) {
                    break;
                }
                try {
										// this.wait(timeout)的问题: 虚假唤醒在15:00:01的时候,此时response还是null, 此时经历时间就为1s,
                    // 进入while循环的时候response还是空,此时判断1s <= timeout 2s,此时再次this.wait(2s)吗,此时已经经历了1s,
                    // 所以只要再等1s就可以了. 所以等待的时间应该是 超时时间(timeout) - 经历的时间(passedTime)
                    lock.wait(waitTime);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // 3、求得经历时间(如果提前被唤醒,这时已经经历的时间假设为 400)
                passesTime = System.currentTimeMillis() - begin;
            }
            return response;
        }
    }

    // 产生结果
    public void complete(Object response) {
        synchronized (lock) {
            // 给成员变量赋值
            this.response = response;
            // 产生结果,通知等待线程
            lock.notifyAll();
        }
    }

}
		// t1 等待 t2 的下载结果
    public static void main(String[] args) {
        GuardedObject1 guardedObject = new GuardedObject1();
        new Thread(() -> {
            // 等待结果
            log.debug("等待结果");
            Object response = guardedObject.get(2000);
            log.debug("结果大小:{}", response);
        }, "t1").start();

        new Thread(() -> {
            log.debug("执行下载");
            try {
                Thread.sleep(1000);
                //产生结果
                guardedObject.complete(new Object());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "t2").start();
    }

Untitled

new Thread(() -> {
            log.debug("执行下载");
            try {
                Thread.sleep(3000);
                //产生结果
                guardedObject.complete(new Object());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "t2").start();

Untitled

new Thread(() -> {
            log.debug("执行下载");
            try {
                Thread.sleep(1000);
                //产生结果
                guardedObject.complete(null);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "t2").start();

Untitled

原理之 join

  • 是调用者轮询检查线程 alive 状态
t1.join();
  • 等价于下面的代码
synchronized (t1) {
  // 调用者线程进入 t1 的 waitSet 等待, 直到 t1 运行结束
  // 此处t1线程对象作为了锁
  while (t1.isAlive()) {
    // 调用线程进了锁t1的waitSet
    // 注意,调用线程不是t1,t1此处是作为锁而不是作为线程
    // 调用线程是其他线程,一般是主线程
    t1.wait(0);
  }
}

join源码

public final void join() throws InterruptedException {
    join(0);
}
public final synchronized void join(long millis)
    throws InterruptedException {
        long base = System.currentTimeMillis();
        long now = 0;

        if (millis < 0) {
            throw new IllegalArgumentException("timeout value is negative");
        }

        if (millis == 0) {
            while (isAlive()) {
                wait(0);
            }
        } else {
						// 和上面的超时增强原理一样
            while (isAlive()) {
                long delay = millis - now;
                if (delay <= 0) {
                    break;
                }
                wait(delay);
                now = System.currentTimeMillis() - base;
            }
        }
    }
  • 从代码中,我们可以发现。当millis==0时,会进入while( isAlive( ) )循环;即只要子线程是活的,主线程就不停的等待。
  • wait( )的作用是让“当前线程”等待,而这里的“当前线程”是指当前运行的线程。虽然是调用子线程的wait( )方法,但是它是通过“主线程”去调用的;所以,休眠的是主线程,而不是“子线程”!
  • 这样理解: 例子中的Thread t只是一个对象 , isAlive( )判断当前对象(例子中的t对象)是否存活, wait()阻塞的是当前执行的线程(一般是main方法)
  • 可以看出,Join方法实现是通过wait( )。 当main线程调用t.join时候,main线程会获得线程对象t的锁(wait 意味着拿到该对象的锁),调用该对象的wait( ),直到该对象唤醒main线程 ,比如退出后。这就意味着main 线程调用t.join时,必须能够拿到线程t对象的锁。

多任务版 GuardedObject

  • 图中 Futures 就好比居民楼一层的信箱(每个信箱有房间编号),左侧的 t0,t2,t4 就好比等待邮件的居民,右侧的 t1,t3,t5 就好比邮递员
  • 如果需要在多个类之间使用 GuardedObject 对象,作为参数传递不是很方便,因此设计一个用来解耦的中间类,这样不仅能够解耦【结果等待者】和【结果生产者】,还能够同时支持多个任务的管理

Untitled

class GuardedObject {

    // 标识 GuardedObject
    private int id;

    public GuardedObject(int id) {
        this.id = id;
    }

    public int getId() {
        return id;
    }

    // 结果
    private Object response;
    private final Object lock = new Object();

    // 获取结果
    // timeout:表示要等待多久
    public Object get(long timeout) {
        synchronized (lock) {
            // 1、开始时间
            long begin = System.currentTimeMillis();
            // 2、经历的时间
            long passesTime = 0;
            // 没有结果则一直等待
            // 防止虚假唤醒
            while (response == null) {
                // 这一轮循环应该等待的时间(假设 timeout 是 1000,结果在 400 时被唤醒了,那么还有 600 要等)
                long waitTime = timeout - passesTime;
                // 经历的时间超过了最大等待时间时,退出循环
                if (waitTime <= 0) {
                    break;
                }
                try {
                    // this.wait(timeout)的问题: 虚假唤醒在15:00:01的时候,此时response还是null, 此时经历时间就为1s,
                    // 进入while循环的时候response还是空,此时判断1s <= timeout 2s,此时再次this.wait(2s)吗,此时已经经历了1s,
                    // 所以只要再等1s就可以了. 所以等待的时间应该是 超时时间(timeout) - 经历的时间(passedTime)
                    lock.wait(waitTime);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // 3、求得经历时间(如果提前被唤醒,这时已经经历的时间假设为 400)
                passesTime = System.currentTimeMillis() - begin;
            }
            return response;
        }
    }
    
    // 产生结果
    public void complete(Object response) {
        synchronized (lock) {
            // 给成员变量赋值
            this.response = response;
            // 产生结果,通知等待线程
            lock.notifyAll();
        }
    }

}
// 信箱类
class Mailboxes {

    // Hashtable 是线程安全的
    private static Map<Integer, GuardedObject> boxes = new Hashtable<>();

    private static int id = 1;

    // 生产唯一 id
    private static synchronized int generateId() {
        return id++;
    }

    // HashTable线程安全的,不用加synchronized
    public static GuardedObject getGuardedObject(int id) {
				//根据id获取到box并删除对应的key和value,避免堆内存爆了
        return boxes.remove(id);
    }

    // HashTable线程安全的,不用加synchronized
    public static GuardedObject createGuardedObject() {
        GuardedObject go = new GuardedObject(generateId());
        boxes.put(go.getId(), go);
        return go;
    }

    // HashTable线程安全的,不用加synchronized
    public static Set<Integer> getIds() {
        return boxes.keySet();
    }

}
@Slf4j(topic = "c.People")
// 居民类
class People extends Thread {

    @Override
    public void run() {
        // 收信
        GuardedObject guardedObject = Mailboxes.createGuardedObject();
        log.debug("开始收信 id:{}", guardedObject.getId());
        Object mail = guardedObject.get(5000);
        log.debug("收到信 id:{}, 内容:{}", guardedObject.getId(), mail);
    }
}
@Slf4j(topic = "c.Postman")
// 邮寄员类
class Postman extends Thread {

    private int id;
    private String mail;

    public Postman(int id, String mail) {
        this.id = id;
        this.mail = mail;
    }

    @Override
    public void run() {
        GuardedObject guardedObject = Mailboxes.getGuardedObject(id);
        log.debug("送信 id:{}, 内容:{}", id, mail);
        guardedObject.complete(mail);
    }
}
/**
 * @author xiexu
 * @create 2022-01-31 10:38 AM
 */
@Slf4j(topic = "c.GuardedObjectTest")
public class GuardedObjectTest {

    public static void main(String[] args) {
        // 创建3个居民
        for (int i = 0; i < 3; i++) {
            new People().start();
        }
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        for (Integer id : Mailboxes.getIds()) {
            new Postman(id, "内容" + id).start();
        }
    }

}

Untitled

异步模式之生产者/消费者 (重点)

定义

  • 与前面的保护性暂停中的 GuardObject 不同,不需要产生结果和消费结果的线程一一对应(一个生产一个消费)
  • 消费队列可以用来平衡生产和消费的线程资源
  • 生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据
  • 消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据
  • JDK 中各种阻塞队列,采用的就是这种模式

Untitled

实现

/**
 * @author xiexu
 * @create 2022-01-31 12:36 PM
 */
@Slf4j(topic = "c.ProductConsumerTest")
public class ProductConsumerTest {

    public static void main(String[] args) {
        MessageQueue queue = new MessageQueue(2);
        for (int i = 0; i < 3; i++) {
            int id = i;
            new Thread(() -> {
                queue.put(new Message(id, "值" + id));
            }, "生产者" + i).start();
        }

        new Thread(() -> {
            while (true) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                Message message = queue.take();
            }
        }, "消费者").start();
    }

}

// 消息队列类,在java线程之间通信
@Slf4j(topic = "c.MessageQueue")
class MessageQueue {
    // 消息的队列集合
    private LinkedList<Message> list = new LinkedList<>();
    // 队列的容量
    private int capcity;

    public MessageQueue(int capcity) {
        this.capcity = capcity;
    }

    // 获取消息
    public Message take() {
        // 检查队列是否为空
        synchronized (list) {
            while (list.isEmpty()) {
                try {
                    log.debug("队列为空, 消费者线程等待");
                    list.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            // 从队列头部获取消息并返回
            Message message = list.removeFirst();
            log.debug("已消费消息 {}", message);
            list.notifyAll();
            return message;
        }
    }

    // 存入消息
    public void put(Message message) {
        synchronized (list) {
            // 检查队列是否已满
            while (list.size() == capcity) {
                try {
                    log.debug("队列已满, 生产者线程等待");
                    list.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            // 将消息加入队列尾部
            list.addLast(message);
            log.debug("已生产消息 {}", message);
            list.notifyAll();
        }
    }

}

final class Message {
    private int id;
    private Object value;

    public Message(int id, Object value) {
        this.id = id;
        this.value = value;
    }

    public int getId() {
        return id;
    }

    public Object getValue() {
        return value;
    }

    @Override
    public String toString() {
        return "Message{" + "id=" + id + ", value=" + value + '}';
    }
}

Untitled

Park & Unpark

基本使用

  • 它们是 LockSupport 类中的方法
// 暂停当前线程
LockSupport.park(); 
// 恢复某个线程的运行
LockSupport.unpark(暂停线程对象)
@Slf4j(topic = "c.TestParkUnpark")
public class TestParkUnpark {

    public static void main(String[] args) {
        Thread t1 = new Thread(() -> {
            log.debug("start...");
            sleep(1);
            log.debug("park...");
            LockSupport.park();
            log.debug("resume...");
        }, "t1");
        t1.start();

        sleep(2);
        log.debug("unpark...");
        LockSupport.unpark(t1);
    }

}

Untitled

@Slf4j(topic = "c.TestParkUnpark")
public class TestParkUnpark {

    public static void main(String[] args) {
        Thread t1 = new Thread(() -> {
            log.debug("start...");
            sleep(2); // 2秒后再调用park
            log.debug("park...");
            LockSupport.park();
            log.debug("resume...");
        }, "t1");
        t1.start();

        sleep(1); // 1秒后调用unpark
        log.debug("unpark...");
        LockSupport.unpark(t1);
    }

}

Untitled

特点

  • wait,notify 和 notifyAll 必须配合 Object Monitor 一起使用,而 park,unpark 不必
  • park & unpark 是以线程为单位来【阻塞】和【唤醒】线程,而 notify 只能随机唤醒一个等待线程,notifyAll 是唤醒所有等待线程,就不那么【精确】
  • park & unpark 可以先 unpark,而 wait & notify 不能先 notify

原理之 park & unpark

  • 线程就像一个旅人,Parker 就像他随身携带的背包,条件变量就好比背包中的帐篷。_counter 就好比背包中的备用干粮(0 为耗尽,1 为充足)
  • 调用 park 就是要看需不需要停下来歇息
    • 如果备用干粮耗尽,那么钻进帐篷歇息
    • 如果备用干粮充足,那么不需停留,继续前进
  • 调用 unpark,就好比令干粮充足
    • 如果这时线程还在帐篷,就唤醒让他继续前进
    • 如果这时线程还在运行,那么下次他调用 park 时,仅是消耗掉备用干粮,不需停留继续前进
    • 因为背包空间有限,多次调用 unpark 仅会补充一份备用干粮
  • 先调用park的情况
    • 当前线程调用 Unsafe.park( ) 方法
    • 检查 _counter, 本情况为0, 这时, 获得_mutex 互斥锁(mutex对象有个等待队列 _cond)
    • 线程进入 _cond 条件变量阻塞
    • 设置_counter = 0 (没干粮了)

Untitled

  • 调用unpark
    • 调用Unsafe.unpark(Thread_0)方法,设置_counter 为 1
    • 唤醒 _cond 条件变量中的 Thread_0
    • Thread_0 恢复运行
    • 设置 _counter 为 0

Untitled

  • 调用 Unsafe.unpark(Thread_0)方法,设置 _counter 为 1
  • 当前线程调用 Unsafe.park() 方法
  • 检查 _counter,本情况为 1,这时线程 无需阻塞,继续运行
  • 设置 _counter 为 0

Untitled

举报

相关推荐

0 条评论