0
点赞
收藏
分享

微信扫一扫

深入学习Java多线程:wait和notify的全面解析

第一部分:wait和notify的基础

1.1 什么是wait和notify?

在Java中,waitnotify(包括notifyAll)是Object类中定义的本地方法,用于实现线程间的通信和协调,基于对象的监视器(Monitor)机制:

  • wait():使当前线程释放当前对象的锁并进入等待状态(WAITINGTIMED_WAITING),直到其他线程调用该对象的notify()notifyAll()方法,或者等待超时(若使用wait(long timeout))。
  • notify():唤醒一个在该对象监视器上等待的线程,使其进入BLOCKED状态,等待重新获取锁。
  • notifyAll():唤醒所有在该对象监视器上等待的线程。

关键特性

  • 必须在同步块或方法中调用waitnotify方法只能在持有对象锁的synchronized块或方法内调用,否则抛出IllegalMonitorStateException
  • 线程状态转换
  • 调用wait():线程释放锁,进入WAITING状态,加入对象的等待集(Wait Set)。
  • 调用notify():从等待集中随机唤醒一个线程。
  • 调用notifyAll():唤醒等待集中的所有线程。
  • 虚假唤醒:线程可能因系统调度或中断被意外唤醒,需在循环中检查条件。

基本示例

public class WaitNotifyExample {
    private final Object lock = new Object();
    private boolean condition = false;

    public void waitForCondition() throws InterruptedException {
        synchronized (lock) {
            while (!condition) {
                lock.wait(); // 释放锁,进入等待
            }
            System.out.println("Condition met, proceeding...");
        }
    }

    public void signalCondition() {
        synchronized (lock) {
            condition = true;
            lock.notify(); // 唤醒一个等待线程
        }
    }

    public static void main(String[] args) {
        WaitNotifyExample example = new WaitNotifyExample();
        Thread waiter = new Thread(() -> {
            try {
                example.waitForCondition();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread signaler = new Thread(example::signalCondition);

        waiter.start();
        try { Thread.sleep(100); } catch (InterruptedException e) {}
        signaler.start();
    }
}

输出示例

Condition met, proceeding...

说明

  • waiter线程等待条件,释放lock并进入WAITING状态。
  • signaler线程设置条件并调用notify(),唤醒waiter

1.2 wait和notify与线程通信

线程通信是指线程间通过共享资源协调行为。waitnotify通过对象监视器实现:

  • 等待/通知模式:一个线程等待条件(wait),其他线程通知条件满足(notifynotifyAll)。
  • 典型场景
  • 生产者-消费者:生产者生产数据后通知消费者,消费者等待数据可用。
  • 线程池:任务队列为空时,工作线程等待新任务。
  • 状态同步:线程等待特定状态(如初始化完成)。

synchronized的关系

  • waitnotify依赖synchronized提供的互斥锁。
  • 锁确保线程安全,waitnotify实现条件同步。

1.3 wait和notify的核心挑战

  • 虚假唤醒:线程可能被意外唤醒,需在while循环中检查条件。
  • 死锁风险:不当使用可能导致线程无限等待。
  • 性能开销:频繁的waitnotify可能导致上下文切换。
  • 复杂性:多线程协调需要精确的状态管理。

1.4 Java对wait和notify的支持

Java通过以下机制支持waitnotify

  • Object类:所有对象继承waitnotifynotifyAll方法。
  • Monitor机制:JVM通过监视器管理锁和等待集。
  • 并发工具:如LockCondition,提供更灵活的替代方案。
  • 诊断工具:JVisualVM、JStack分析线程状态和等待情况。

第二部分:wait和notify的实现与原理

2.1 wait和notify的运行机制

waitnotify基于JVM的监视器锁(Monitor Lock)实现:

  • Monitor结构:每个对象有一个监视器,包含锁计数器、等待集(Wait Set)和进入队列(Entry List)。
  • wait执行过程
  1. 线程持有对象锁(通过synchronized)。
  2. 调用wait(),释放锁,进入等待集,线程状态变为WAITING
  3. 等待notifynotifyAll、超时或中断。
  • notify执行过程
  1. 线程持有对象锁。
  2. 调用notify(),从等待集中随机移出一个线程到进入队列。
  3. 被唤醒线程等待锁释放后重新竞争锁。
  • notifyAll执行过程:将等待集中的所有线程移到进入队列。

状态转换图

[RUNNING] --> [WAITING] (wait)
[WAITING] --> [BLOCKED] (notify/notifyAll)
[BLOCKED] --> [RUNNING] (获取锁)

2.2 底层原理

  • 本地方法waitnotifynotifyAllnative方法,依赖JVM实现。
  • 操作系统支持:通过操作系统条件变量(如pthread_cond_wait)实现等待和通知。
  • 字节码waitnotify在字节码中对应monitorentermonitorexit指令。
  • 锁优化:JVM使用偏向锁、轻量级锁和重量级锁优化性能。

字节码示例(简化):

synchronized (obj) {
    obj.wait();
}

对应字节码

0: monitorenter
1: aload_0
2: invokevirtual #2 // Method java/lang/Object.wait:()V
5: monitorexit

2.3 wait和notify的局限性

  • 单一条件:一个对象只能关联一个等待条件,难以支持复杂条件。
  • 虚假唤醒:需在while循环中检查条件。
  • 不可中断wait()不支持直接响应中断(需使用wait(long timeout))。
  • 低效通知notify()随机唤醒,可能导致不必要的线程竞争。

第三部分:wait和notify的实现方式

3.1 生产者-消费者模型

经典的生产者-消费者模型使用waitnotify实现:

import java.util.LinkedList;
import java.util.Queue;

public class ProducerConsumer {
    private final Queue<Integer> queue = new LinkedList<>();
    private final int capacity = 10;
    private final Object lock = new Object();

    public void produce(int value) throws InterruptedException {
        synchronized (lock) {
            while (queue.size() == capacity) {
                lock.wait(); // 队列满,等待
            }
            queue.offer(value);
            System.out.println("Produced: " + value);
            lock.notifyAll(); // 通知消费者
        }
    }

    public int consume() throws InterruptedException {
        synchronized (lock) {
            while (queue.isEmpty()) {
                lock.wait(); // 队列空,等待
            }
            int value = queue.poll();
            System.out.println("Consumed: " + value);
            lock.notifyAll(); // 通知生产者
            return value;
        }
    }

    public static void main(String[] args) {
        ProducerConsumer pc = new ProducerConsumer();
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 20; i++) {
                    pc.produce(i);
                    Thread.sleep(100);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < 20; i++) {
                    pc.consume();
                    Thread.sleep(150);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
    }
}

输出示例

Produced: 0
Consumed: 0
Produced: 1
Consumed: 1
...

说明

  • 生产者等待队列不满,消费者等待队列不空,使用notifyAll确保所有线程被唤醒。

3.2 线程池任务等待

waitnotify用于线程池任务调度:

import java.util.LinkedList;
import java.util.Queue;

public class SimpleThreadPool {
    private final Queue<Runnable> tasks = new LinkedList<>();
    private final Object lock = new Object();
    private volatile boolean running = true;

    public void submit(Runnable task) {
        synchronized (lock) {
            tasks.offer(task);
            lock.notify(); // 通知工作线程
        }
    }

    public void startWorker() {
        new Thread(() -> {
            while (running) {
                Runnable task;
                synchronized (lock) {
                    while (tasks.isEmpty() && running) {
                        try {
                            lock.wait(); // 等待任务
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    }
                    task = tasks.poll();
                }
                if (task != null) {
                    task.run();
                }
            }
        }).start();
    }

    public void shutdown() {
        synchronized (lock) {
            running = false;
            lock.notifyAll();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        SimpleThreadPool pool = new SimpleThreadPool();
        pool.startWorker();
        pool.submit(() -> System.out.println("Task 1 executed"));
        pool.submit(() -> System.out.println("Task 2 executed"));
        Thread.sleep(100);
        pool.shutdown();
    }
}

输出示例

Task 1 executed
Task 2 executed

说明

  • 工作线程等待任务,提交任务后通过notify唤醒。

3.3 wait与notifyAll的对比

  • notify:随机唤醒一个线程,可能导致其他线程饥饿。
  • notifyAll:唤醒所有等待线程,适合多条件场景。

示例(notifyAll优势)

public class NotifyAllExample {
    private final Object lock = new Object();
    private int resource = 0;

    public void producer() throws InterruptedException {
        synchronized (lock) {
            resource++;
            System.out.println("Produced: " + resource);
            lock.notifyAll(); // 唤醒所有消费者
        }
    }

    public void consumer(String name) throws InterruptedException {
        synchronized (lock) {
            while (resource == 0) {
                lock.wait();
            }
            System.out.println(name + " consumed: " + resource);
            resource--;
        }
    }

    public static void main(String[] args) {
        NotifyAllExample example = new NotifyAllExample();
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 5; i++) {
                    example.producer();
                    Thread.sleep(100);
                }
            } catch (InterruptedException e) {}
        });
        Thread consumer1 = new Thread(() -> {
            try {
                for (int i = 0; i < 5; i++) {
                    example.consumer("C1");
                }
            } catch (InterruptedException e) {}
        });
        Thread consumer2 = new Thread(() -> {
            try {
                for (int i = 0; i < 5; i++) {
                    example.consumer("C2");
                }
            } catch (InterruptedException e) {}
        });

        producer.start();
        consumer1.start();
        consumer2.start();
    }
}

输出示例

Produced: 1
C1 consumed: 1
Produced: 1
C2 consumed: 1
...

说明

  • notifyAll确保两个消费者都能被唤醒,避免饥饿。

3.4 wait和notify与Lock/Condition对比

LockCondition提供更灵活的替代方案:

import java.util.concurrent.locks.*;

public class LockConditionExample {
    private final Lock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();
    private boolean ready = false;

    public void waitForCondition() throws InterruptedException {
        lock.lock();
        try {
            while (!ready) {
                condition.await(); // 等待条件
            }
            System.out.println("Condition met");
        } finally {
            lock.unlock();
        }
    }

    public void signalCondition() {
        lock.lock();
        try {
            ready = true;
            condition.signalAll(); // 通知所有线程
        } finally {
            lock.unlock();
        }
    }
}

对比

特性

wait/notify

Lock/Condition

灵活性

单一条件

支持多条件

可中断性

需使用wait(timeout)

支持awaitInterruptibly

锁管理

依赖synchronized

显式锁管理

复杂性

简单

需手动管理锁

第四部分:wait和notify的性能分析

4.1 wait和notify的性能开销

以下测试wait/notifyLock/Condition的性能:

import java.util.concurrent.locks.*;

public class WaitNotifyPerformance {
    private final Object syncLock = new Object();
    private final Lock reentrantLock = new ReentrantLock();
    private final Condition condition = reentrantLock.newCondition();
    private boolean conditionMet = false;

    public void waitNotifyTest() throws InterruptedException {
        synchronized (syncLock) {
            while (!conditionMet) {
                syncLock.wait();
            }
        }
    }

    public void notifyTest() {
        synchronized (syncLock) {
            conditionMet = true;
            syncLock.notifyAll();
        }
    }

    public void lockConditionTest() throws InterruptedException {
        reentrantLock.lock();
        try {
            while (!conditionMet) {
                condition.await();
            }
        } finally {
            reentrantLock.unlock();
        }
    }

    public void signalTest() {
        reentrantLock.lock();
        try {
            conditionMet = true;
            condition.signalAll();
        } finally {
            reentrantLock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        WaitNotifyPerformance test = new WaitNotifyPerformance();
        int iterations = 1000;

        // wait/notify性能
        long startTime = System.nanoTime();
        Thread waiter = new Thread(() -> {
            try {
                for (int i = 0; i < iterations; i++) {
                    test.waitNotifyTest();
                }
            } catch (InterruptedException e) {}
        });
        Thread signaler = new Thread(() -> {
            for (int i = 0; i < iterations; i++) {
                test.notifyTest();
            }
        });
        waiter.start();
        signaler.start();
        waiter.join();
        signaler.join();
        long waitNotifyTime = System.nanoTime() - startTime;

        // Lock/Condition性能
        test.conditionMet = false;
        startTime = System.nanoTime();
        waiter = new Thread(() -> {
            try {
                for (int i = 0; i < iterations; i++) {
                    test.lockConditionTest();
                }
            } catch (InterruptedException e) {}
        });
        signaler = new Thread(() -> {
            for (int i = 0; i < iterations; i++) {
                test.signalTest();
            }
        });
        waiter.start();
        signaler.start();
        waiter.join();
        signaler.join();
        long lockConditionTime = System.nanoTime() - startTime;

        System.out.println("wait/notify Time: " + waitNotifyTime / 1_000_000 + " ms");
        System.out.println("Lock/Condition Time: " + lockConditionTime / 1_000_000 + " ms");
    }
}

输出示例(实际时间因硬件而异):

wait/notify Time: 150 ms
Lock/Condition Time: 170 ms

分析

  • wait/notify性能略优,因其直接基于JVM原生实现。
  • Lock/Condition提供更多功能,但锁管理增加开销。

4.2 不同线程数下的性能

测试多线程场景下的wait/notify性能:

public class MultiThreadWaitNotify {
    private final Object lock = new Object();
    private boolean ready = false;

    public void waitTask() throws InterruptedException {
        synchronized (lock) {
            while (!ready) {
                lock.wait();
            }
        }
    }

    public void notifyTask() {
        synchronized (lock) {
            ready = true;
            lock.notifyAll();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MultiThreadWaitNotify test = new MultiThreadWaitNotify();
        int[] threadCounts = {2, 4, 8};
        long[] times = new long[threadCounts.length];

        for (int i = 0; i < threadCounts.length; i++) {
            int threadCount = threadCounts[i];
            Thread[] waiters = new Thread[threadCount];
            long startTime = System.nanoTime();
            for (int j = 0; j < threadCount; j++) {
                waiters[j] = new Thread(() -> {
                    try {
                        test.waitTask();
                    } catch (InterruptedException e) {}
                });
                waiters[j].start();
            }
            Thread.sleep(100);
            test.notifyTask();
            for (Thread t : waiters) {
                t.join();
            }
            times[i] = System.nanoTime() - startTime;
            System.out.println("Threads: " + threadCount + ", Time: " + times[i] / 1_000_000 + " ms");
        }
    }
}

输出示例

Threads: 2, Time: 120 ms
Threads: 4, Time: 150 ms
Threads: 8, Time: 200 ms

4.3 性能分析图表

以下是不同线程数下wait/notify性能的柱状图:

{
  "type": "bar",
  "data": {
    "labels": ["2 Threads", "4 Threads", "8 Threads"],
    "datasets": [{
      "label": "Execution Time (ms)",
      "data": [120, 150, 200],
      "backgroundColor": ["#FF6384", "#36A2EB", "#FFCE56"],
      "borderColor": ["#FF6384", "#36A2EB", "#FFCE56"],
      "borderWidth": 1
    }]
  },
  "options": {
    "scales": {
      "y": {
        "beginAtZero": true,
        "title": {
          "display": true,
          "text": "Time (ms)"
        }
      },
      "x": {
        "title": {
          "display": true,
          "text": "Number of Threads"
        }
      }
    }
  }
}

分析

  • 线程数增加导致通知和锁竞争开销上升。
  • notifyAll在多线程场景下开销较大,需优化。

第五部分:wait和notify的实际应用

5.1 生产者-消费者系统

在消息队列系统中,waitnotify实现生产者-消费者:

import java.util.LinkedList;
import java.util.Queue;

public class MessageQueue {
    private final Queue<String> messages = new LinkedList<>();
    private final int capacity = 5;
    private final Object lock = new Object();

    public void send(String message) throws InterruptedException {
        synchronized (lock) {
            while (messages.size() == capacity) {
                lock.wait();
            }
            messages.offer(message);
            System.out.println("Sent: " + message);
            lock.notifyAll();
        }
    }

    public String receive() throws InterruptedException {
        synchronized (lock) {
            while (messages.isEmpty()) {
                lock.wait();
            }
            String message = messages.poll();
            System.out.println("Received: " + message);
            lock.notifyAll();
            return message;
        }
    }

    public static void main(String[] args) {
        MessageQueue queue = new MessageQueue();
        Thread sender = new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    queue.send("Message " + i);
                    Thread.sleep(100);
                }
            } catch (InterruptedException e) {}
        });
        Thread receiver = new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    queue.receive();
                    Thread.sleep(150);
                }
            } catch (InterruptedException e) {}
        });

        sender.start();
        receiver.start();
    }
}

输出示例

Sent: Message 0
Received: Message 0
Sent: Message 1
Received: Message 1
...

说明

  • waitnotifyAll确保消息队列的线程安全操作。

5.2 任务调度系统

waitnotify用于任务调度:

public class TaskScheduler {
    private final Object lock = new Object();
    private boolean taskAvailable = false;

    public void scheduleTask(Runnable task) {
        synchronized (lock) {
            taskAvailable = true;
            lock.notify();
            task.run();
        }
    }

    public void waitForTask() throws InterruptedException {
        synchronized (lock) {
            while (!taskAvailable) {
                lock.wait();
            }
            taskAvailable = false;
        }
    }

    public static void main(String[] args) {
        TaskScheduler scheduler = new TaskScheduler();
        Thread worker = new Thread(() -> {
            try {
                scheduler.waitForTask();
                System.out.println("Task executed");
            } catch (InterruptedException e) {}
        });
        worker.start();
        scheduler.scheduleTask(() -> System.out.println("Running task"));
    }
}

输出示例

Running task
Task executed

说明

  • 工作线程等待任务,调度线程通知执行。

5.3 数据库连接池

waitnotify用于管理数据库连接:

import java.util.LinkedList;
import java.util.Queue;

public class ConnectionPool {
    private final Queue<String> connections = new LinkedList<>();
    private final int maxConnections = 3;
    private final Object lock = new Object();

    public ConnectionPool() {
        connections.offer("Conn1");
        connections.offer("Conn2");
        connections.offer("Conn3");
    }

    public String getConnection() throws InterruptedException {
        synchronized (lock) {
            while (connections.isEmpty()) {
                lock.wait();
            }
            String conn = connections.poll();
            System.out.println("Acquired: " + conn);
            return conn;
        }
    }

    public void releaseConnection(String conn) {
        synchronized (lock) {
            connections.offer(conn);
            System.out.println("Released: " + conn);
            lock.notifyAll();
        }
    }

    public static void main(String[] args) {
        ConnectionPool pool = new ConnectionPool();
        Runnable task = () -> {
            try {
                String conn = pool.getConnection();
                Thread.sleep(100);
                pool.releaseConnection(conn);
            } catch (InterruptedException e) {}
        };
        Thread[] threads = new Thread[5];
        for (int i = 0; i < 5; i++) {
            threads[i] = new Thread(task);
            threads[i].start();
        }
    }
}

输出示例

Acquired: Conn1
Acquired: Conn2
Acquired: Conn3
Released: Conn1
Acquired: Conn1
...

说明

  • 连接池使用waitnotifyAll管理连接的分配和释放。

第六部分:wait和notify的最佳实践

6.1 使用while循环检查条件

避免虚假唤醒:

synchronized (lock) {
    while (!condition) {
        lock.wait();
    }
}

6.2 优先使用notifyAll

notifyAll避免线程饥饿:

synchronized (lock) {
    condition = true;
    lock.notifyAll();
}

6.3 最小化锁持有时间

仅在必要时持有锁:

synchronized (lock) {
    // 关键操作
    lock.notify();
}

6.4 结合中断处理

处理中断异常:

try {
    lock.wait();
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
}

6.5 使用Lock和Condition替代

在复杂场景中,优先使用ReentrantLockCondition

lock.lock();
try {
    while (!condition) {
        condition.await();
    }
} finally {
    lock.unlock();
}

第七部分:总结与展望

7.1 总结

本文全面解析了Java中waitnotify的机制,涵盖原理、实现、性能、应用和最佳实践。waitnotify是线程通信的基础工具,适合简单场景,但在复杂场景中需结合LockCondition

7.2 展望

随着Project Loom的虚拟线程和结构化并发的引入,线程通信机制将更加高效。开发者应关注无锁编程和响应式编程的进展。

举报

相关推荐

0 条评论