使用synchronized-notifyAll-wait实现
/**
 * Single-slot producer/consumer hand-off built on the intrinsic monitor
 * ({@code synchronized} + {@code wait} + {@code notifyAll}).
 *
 * <p>{@code num} counts the items in the slot. A producer waits until the slot is empty
 * before adding one item and a consumer waits until it is non-empty before removing one,
 * so the value only ever alternates between 0 and 1.
 *
 * <p>Both operations respond to interruption: if the calling thread is interrupted while
 * it waits, the call returns without changing {@code num} and leaves the thread's
 * interrupt status set.
 */
public class ProducerConsumerDemo {
    /** Number of produce/consume calls each demo thread performs in {@link #main}. */
    private static final int ROUNDS = 5;

    // Read and written only inside synchronized methods, so the monitor already provides
    // the required memory visibility; volatile is not needed.
    private int num = 0;

    /**
     * Waits until the slot is empty, puts one item into it, prints
     * {@code "<thread name>: <num>"} and wakes every thread waiting on this monitor.
     *
     * <p>Returns early, without producing, if the thread is interrupted while waiting.
     */
    public synchronized void produce() {
        while (num != 0) {
            try {
                // Slot is occupied: production is not allowed yet.
                wait();
            } catch (InterruptedException e) {
                // Looping back into wait() would make this thread impossible to cancel;
                // restore the flag so the caller can see the interrupt and give up.
                Thread.currentThread().interrupt();
                return;
            }
        }
        // Slot is free: produce.
        ++num;
        System.out.println(Thread.currentThread().getName() + ": " + num);
        // Wake all waiters (consumers and any other producers); each re-checks its own
        // condition in its while loop.
        notifyAll();
    }

    /**
     * Waits until the slot holds an item, removes it, prints
     * {@code "<thread name>: <num>"} and wakes every thread waiting on this monitor.
     *
     * <p>Returns early, without consuming, if the thread is interrupted while waiting.
     */
    public synchronized void consume() {
        while (num == 0) {
            try {
                // Slot is empty: consumption is not allowed yet.
                wait();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
        // An item is available: consume.
        --num;
        System.out.println(Thread.currentThread().getName() + ": " + num);
        // Wake all waiters (producers and any other consumers).
        notifyAll();
    }

    /**
     * Demo entry point: starts two producer threads (100 ms pause before each call) and
     * two consumer threads (200 ms pause before each call) sharing one instance.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ProducerConsumerDemo cs1 = new ProducerConsumerDemo();
        startWorker("+++生产者线程AAA", 100, cs1::produce);
        startWorker("+++生产者线程BBB", 100, cs1::produce);
        startWorker("---消费者线程CCC", 200, cs1::consume);
        startWorker("---消费者线程DDD", 200, cs1::consume);
    }

    /**
     * Starts a thread that sleeps {@code pauseMillis} and then runs {@code action},
     * {@link #ROUNDS} times, stopping early if the thread is interrupted.
     */
    private static void startWorker(String name, long pauseMillis, Runnable action) {
        new Thread(() -> {
            for (int i = 0; i < ROUNDS; i++) {
                try {
                    Thread.sleep(pauseMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                action.run();
            }
        }, name).start();
    }
}
使用lock-signalAll-await实现
/**
 * Single-slot producer/consumer hand-off built on {@link ReentrantLock} and
 * {@link Condition} ({@code lock} + {@code await} + {@code signalAll}).
 *
 * <p>{@code num} counts the items in the slot. A producer waits until the slot is empty
 * before adding one item and a consumer waits until it is non-empty before removing one,
 * so the value only ever alternates between 0 and 1.
 *
 * <p>Producers and consumers wait on separate conditions, so a state change only wakes
 * the threads that can actually make progress.
 *
 * <p>Both operations respond to interruption: if the calling thread is interrupted while
 * it waits, the call returns without changing {@code num} and leaves the thread's
 * interrupt status set. The lock is always released.
 */
public class ProducerConsumerDemo {
    /** Number of produce/consume calls each demo thread performs in {@link #main}. */
    private static final int ROUNDS = 10;

    // Guarded by lock.
    private int num = 0;
    private final ReentrantLock lock = new ReentrantLock();
    /** Producers wait here while the slot is occupied. */
    private final Condition notFull = lock.newCondition();
    /** Consumers wait here while the slot is empty. */
    private final Condition notEmpty = lock.newCondition();

    /**
     * Waits until the slot is empty, puts one item into it, prints
     * {@code "<thread name>: <num>"} and wakes the waiting consumers.
     *
     * <p>Returns early, without producing, if the thread is interrupted while waiting.
     */
    public void produce() {
        lock.lock();
        try {
            while (num != 0) {
                try {
                    // Slot is occupied: cannot produce yet.
                    notFull.await();
                } catch (InterruptedException e) {
                    // Looping back into await() would make this thread impossible to
                    // cancel; restore the flag and give up.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
            // Slot is free: produce.
            ++num;
            System.out.println(Thread.currentThread().getName() + ": " + num);
            notEmpty.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Waits until the slot holds an item, removes it, prints
     * {@code "<thread name>: <num>"} and wakes the waiting producers.
     *
     * <p>Returns early, without consuming, if the thread is interrupted while waiting.
     */
    public void consume() {
        lock.lock();
        try {
            while (num == 0) {
                try {
                    // Slot is empty: cannot consume yet.
                    notEmpty.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
            // An item is available: consume.
            --num;
            System.out.println(Thread.currentThread().getName() + ": " + num);
            notFull.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Demo entry point: starts two producer threads (100 ms pause after each call) and
     * two consumer threads (200 ms pause after each call) sharing one instance.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ProducerConsumerDemo cs2 = new ProducerConsumerDemo();
        startWorker("+++生产者线程AAA", 100, cs2::produce);
        startWorker("+++生产者线程BBB", 100, cs2::produce);
        startWorker("---消费者线程CCC", 200, cs2::consume);
        startWorker("---消费者线程DDD", 200, cs2::consume);
    }

    /**
     * Starts a thread that runs {@code action} and then sleeps {@code pauseMillis},
     * {@link #ROUNDS} times, stopping early if the thread is interrupted.
     */
    private static void startWorker(String name, long pauseMillis, Runnable action) {
        new Thread(() -> {
            for (int i = 0; i < ROUNDS; i++) {
                action.run();
                try {
                    // Also throws immediately if action returned with the interrupt flag set.
                    Thread.sleep(pauseMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }, name).start();
    }
}
使用阻塞队列实现
/**
 * Producer/consumer demo that delegates all blocking and hand-off to a
 * {@link BlockingQueue}.
 *
 * <p>Producers and consumers keep running while the shared {@code isWork} flag is true.
 * The flag is cleared by {@link #stop()} and also by any consumer that receives nothing
 * within its two-second poll timeout, so a single starved consumer shuts down every
 * worker that shares this instance.
 *
 * <p>Both loops respond to interruption: an interrupted worker restores its interrupt
 * status and leaves its loop.
 */
public class ProducerConsumerDemo {
    // volatile: written by one thread (stop() or a timed-out consumer) and read by the
    // worker loops of other threads without any lock.
    private volatile boolean isWork = true;
    private final BlockingQueue<String> blockingQueue;
    // Source of the payloads; each produce iteration takes the next integer.
    private final AtomicInteger ai = new AtomicInteger();

    /**
     * @param blockingQueue queue shared by all producers and consumers of this instance
     */
    public ProducerConsumerDemo(BlockingQueue<String> blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

    /** Asks every produce/consume loop on this instance to finish after its current iteration. */
    public void stop() {
        isWork = false;
    }

    /**
     * Repeatedly offers the next counter value (as a string) to the queue, waiting up to
     * two seconds for space, reports success or failure, then pauses 500 ms.
     *
     * <p>Runs until {@code isWork} becomes false or the thread is interrupted, and prints
     * a "stopped working" line on exit. A value whose offer times out is not retried.
     */
    public void produce() {
        String name = Thread.currentThread().getName();
        while (isWork) {
            String data = ai.incrementAndGet() + "";
            try {
                boolean res = blockingQueue.offer(data, 2L, TimeUnit.SECONDS);
                if (res) {
                    System.out.println(name + ":添加元素:" + data + "成功");
                } else {
                    System.out.println(name + ":添加元素:" + data + "失败");
                }
                Thread.sleep(500);
            } catch (InterruptedException e) {
                // Swallowing the interrupt would keep the loop running; restore the flag
                // and stop producing.
                Thread.currentThread().interrupt();
                break;
            }
        }
        System.out.println(name + ":停止工作");
    }

    /**
     * Repeatedly polls the queue, waiting up to two seconds for an element, then pauses
     * 200 ms and reports the element taken.
     *
     * <p>If nothing (or an empty string) arrives within the timeout, clears
     * {@code isWork}, which also stops the producers and other consumers, and returns.
     * Also returns if the thread is interrupted.
     */
    public void consume() {
        String res;
        while (isWork) {
            try {
                res = blockingQueue.poll(2L, TimeUnit.SECONDS);
                if (res == null || "".equals(res)) {
                    isWork = false;
                    System.out.println(Thread.currentThread().getName() + ":超过两秒未获取数据,即将退出");
                    return;
                }
                Thread.sleep(200);
                System.out.println(Thread.currentThread().getName() + ":取出元素:" + res + "成功");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /**
     * Demo entry point: runs two producers and two consumers against a queue of
     * capacity 3 for eight seconds, then asks them to stop.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted during the eight-second sleep
     */
    public static void main(String[] args) throws InterruptedException {
        ProducerConsumerDemo cs3 = new ProducerConsumerDemo(new LinkedBlockingQueue<>(3));
        new Thread(cs3::produce, "+++生产者线程AAA").start();
        new Thread(cs3::produce, "+++生产者线程BBB").start();
        // These two threads run consume(), so they are labelled as consumers.
        new Thread(cs3::consume, "---消费者线程CCC").start();
        new Thread(cs3::consume, "---消费者线程DDD").start();
        Thread.sleep(8 * 1000);
        System.out.println("终止生产者-消费者线程");
        cs3.stop();
    }
}