0
点赞
收藏
分享

微信扫一扫

生产者消费者模式

Java架构领域 2021-10-15 阅读 51
技术分享
使用synchronized-notifyAll-wait实现
/**
 * Producer/consumer demo built on intrinsic locking ({@code synchronized},
 * {@code wait}, {@code notifyAll}).
 *
 * <p>The shared state is a single counter that toggles between 0 (empty) and
 * 1 (full): producers wait while it is non-zero, consumers wait while it is zero.
 */
public class ProducerConsumerDemo {
    // Only read and written inside synchronized methods, so the monitor already
    // guarantees visibility; volatile is not needed.
    private int num = 0;

    /**
     * Waits until the slot is empty, then increments the counter, prints the
     * current thread name with the new value, and wakes all waiting threads.
     *
     * <p>If the calling thread is interrupted while waiting, the interrupt flag
     * is restored and the method returns without producing.
     */
    public synchronized void produce() {
        // Loop (not "if") so the condition is re-checked after every wake-up.
        while (num != 0) {
            try {
                // Slot is full: production is not allowed yet.
                wait();
            } catch (InterruptedException e) {
                // Going back to wait() would swallow the interruption and could
                // block forever; hand the signal back to the caller instead.
                Thread.currentThread().interrupt();
                return;
            }
        }
        // Produce one item.
        ++num;
        System.out.println(Thread.currentThread().getName() + ": " + num);
        // Wake every thread waiting on this monitor (producers and consumers);
        // each one re-checks its own condition.
        notifyAll();
    }

    /**
     * Waits until the slot is full, then decrements the counter, prints the
     * current thread name with the new value, and wakes all waiting threads.
     *
     * <p>If the calling thread is interrupted while waiting, the interrupt flag
     * is restored and the method returns without consuming.
     */
    public synchronized void consume() {
        while (num == 0) {
            try {
                // Slot is empty: consumption is not allowed yet.
                wait();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
        // Consume one item.
        --num;
        System.out.println(Thread.currentThread().getName() + ": " + num);
        // Wake every thread waiting on this monitor (producers and consumers).
        notifyAll();
    }

    /**
     * Starts a named thread that performs {@code rounds} iterations of
     * "sleep {@code pauseMillis}, then run {@code action}". The thread stops
     * early if it is interrupted.
     */
    private static void startWorker(String name, int rounds, long pauseMillis, Runnable action) {
        new Thread(() -> {
            for (int i = 0; i < rounds; i++) {
                try {
                    Thread.sleep(pauseMillis);
                    action.run();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }, name).start();
    }

    /**
     * Runs two producers (5 rounds, 100 ms pause) and two consumers
     * (5 rounds, 200 ms pause) against one shared demo instance.
     */
    public static void main(String[] args) {
        ProducerConsumerDemo cs1 = new ProducerConsumerDemo();
        startWorker("+++生产者线程AAA", 5, 100, cs1::produce);
        startWorker("+++生产者线程BBB", 5, 100, cs1::produce);
        startWorker("---消费者线程CCC", 5, 200, cs1::consume);
        startWorker("---消费者线程DDD", 5, 200, cs1::consume);
    }
}
使用lock-signalAll-await实现
/**
 * Producer/consumer demo built on an explicit {@link ReentrantLock} and a
 * single {@link Condition} ({@code await} / {@code signalAll}).
 *
 * <p>The shared state is a single counter that toggles between 0 (empty) and
 * 1 (full): producers wait while it is non-zero, consumers wait while it is zero.
 */
public class ProducerConsumerDemo {
    // Guarded by lock.
    private int num = 0;
    private final ReentrantLock lock = new ReentrantLock();
    // Shared by producers and consumers; every waiter re-checks its own condition.
    private final Condition condition = lock.newCondition();

    /**
     * Waits until the slot is empty, then increments the counter, prints the
     * current thread name with the new value, and signals all waiting threads.
     *
     * <p>If the calling thread is interrupted while waiting, the interrupt flag
     * is restored and the method returns without producing. The lock is always
     * released.
     */
    public void produce() {
        lock.lock();
        try {
            // Loop (not "if") so the condition is re-checked after every wake-up.
            while (num != 0) {
                try {
                    // Slot is full: production is not allowed yet.
                    condition.await();
                } catch (InterruptedException e) {
                    // Awaiting again would swallow the interruption and could
                    // block forever; hand the signal back to the caller instead.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
            // Produce one item.
            ++num;
            System.out.println(Thread.currentThread().getName() + ": " + num);
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Waits until the slot is full, then decrements the counter, prints the
     * current thread name with the new value, and signals all waiting threads.
     *
     * <p>If the calling thread is interrupted while waiting, the interrupt flag
     * is restored and the method returns without consuming. The lock is always
     * released.
     */
    public void consume() {
        lock.lock();
        try {
            while (num == 0) {
                try {
                    // Slot is empty: consumption is not allowed yet.
                    condition.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
            // Consume one item.
            --num;
            System.out.println(Thread.currentThread().getName() + ": " + num);
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Starts a named thread that performs {@code rounds} iterations of
     * "run {@code action}, then sleep {@code pauseMillis}". The thread stops
     * early if it is interrupted.
     */
    private static void startWorker(String name, int rounds, long pauseMillis, Runnable action) {
        new Thread(() -> {
            for (int i = 0; i < rounds; i++) {
                action.run();
                try {
                    Thread.sleep(pauseMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }, name).start();
    }

    /**
     * Runs two producers (10 rounds, 100 ms pause) and two consumers
     * (10 rounds, 200 ms pause) against one shared demo instance.
     */
    public static void main(String[] args) {
        ProducerConsumerDemo cs2 = new ProducerConsumerDemo();
        startWorker("+++生产者线程AAA", 10, 100, cs2::produce);
        startWorker("+++生产者线程BBB", 10, 100, cs2::produce);
        startWorker("---消费者线程CCC", 10, 200, cs2::consume);
        startWorker("---消费者线程DDD", 10, 200, cs2::consume);
    }
}
使用阻塞队列实现
/**
 * Producer/consumer demo built on a {@link BlockingQueue}.
 *
 * <p>Producers and consumers loop while a shared {@code volatile} flag is set;
 * {@link #stop()} clears the flag. A consumer that receives nothing within its
 * poll timeout also clears the flag, which stops every worker sharing this
 * instance.
 */
public class ProducerConsumerDemo {
    // volatile: written by stop()/consume() and read by worker loops on other threads.
    private volatile boolean isWork = true;
    private final BlockingQueue<String> blockingQueue;
    // Source of the sequence numbers used as payload.
    private final AtomicInteger ai = new AtomicInteger();

    /**
     * @param blockingQueue queue shared between the producer and consumer threads
     */
    public ProducerConsumerDemo(BlockingQueue<String> blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

    /** Asks all producer and consumer loops to finish after their current iteration. */
    public void stop() {
        isWork = false;
    }

    /**
     * Repeatedly offers the next sequence number to the queue, waiting up to
     * 2 seconds for space, reports success or failure, then sleeps 500 ms.
     * Runs until the work flag is cleared or the thread is interrupted; on
     * interruption the interrupt flag is restored before leaving the loop.
     */
    public void produce() {
        String data;
        while (isWork) {
            data = ai.incrementAndGet() + "";
            try {
                boolean res = blockingQueue.offer(data, 2L, TimeUnit.SECONDS);
                String name = Thread.currentThread().getName();
                if (res) {
                    System.out.println(name + ":添加元素:" + data + "成功");
                } else {
                    System.out.println(name + ":添加元素:" + data + "失败");
                }
                Thread.sleep(500);
            } catch (InterruptedException e) {
                // Treat interruption as a request to stop instead of swallowing it.
                Thread.currentThread().interrupt();
                break;
            }
        }
        System.out.println(Thread.currentThread().getName() + ":停止工作");
    }

    /**
     * Repeatedly polls the queue, waiting up to 2 seconds for an element, then
     * sleeps 200 ms and reports the element taken. If nothing (or an empty
     * string) arrives within the timeout, the shared work flag is cleared and
     * the method returns. Also ends when the work flag is cleared elsewhere or
     * the thread is interrupted; on interruption the interrupt flag is restored.
     */
    public void consume() {
        String res;
        while (isWork) {
            try {
                res = blockingQueue.poll(2L, TimeUnit.SECONDS);
                if (res == null || "".equals(res)) {
                    // NOTE(review): this clears the shared flag, so one starved
                    // consumer shuts down every producer and consumer.
                    isWork = false;
                    System.out.println(Thread.currentThread().getName() + ":超过两秒未获取数据,即将退出");
                    return;
                }
                Thread.sleep(200);
                System.out.println(Thread.currentThread().getName() + ":取出元素:" + res + "成功");
            } catch (InterruptedException e) {
                // Treat interruption as a request to stop instead of swallowing it.
                Thread.currentThread().interrupt();
                break;
            }
        }
    }

    /**
     * Runs two producers and two consumers over a queue of capacity 3 for
     * 8 seconds, then requests shutdown.
     */
    public static void main(String[] args) throws InterruptedException {
        ProducerConsumerDemo cs3 = new ProducerConsumerDemo(new LinkedBlockingQueue<>(3));
        new Thread(cs3::produce, "+++生产者线程AAA").start();
        new Thread(cs3::produce, "+++生产者线程BBB").start();
        // These two threads consume, so they are named as consumers.
        new Thread(cs3::consume, "---消费者线程CCC").start();
        new Thread(cs3::consume, "---消费者线程DDD").start();
        Thread.sleep(8 * 1000);
        System.out.println("终止生产者-消费者线程");
        cs3.stop();
    }
}
举报

相关推荐

0 条评论