0
点赞
收藏
分享

微信扫一扫

Java多线程--使用阻塞队列实现顺序消费--方法/实例


简介

说明

        本文用示例介绍使用阻塞队列来实现顺序消费。

需求

机器要对手机按顺序做如下任务:生产、打包、发货。消费者等待收货。

代码

手机产品

package org.example.a;

import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * A phone moving through the production pipeline.
 *
 * <p>Each phone carries an immutable id and a mutable {@link Status}. The status may only
 * advance along {@code PRODUCED -> PACKED -> DELIVERED}; any other transition is rejected
 * with an {@link IllegalStateException}.
 *
 * <p>This class performs no synchronization of its own.
 */
public class Phone {
    /**
     * Lifecycle states of a phone.
     * <ul>
     *   <li>{@code PRODUCED}: the phone has been manufactured</li>
     *   <li>{@code PACKED}: the phone has been packed</li>
     *   <li>{@code DELIVERED}: the phone has been shipped</li>
     * </ul>
     * <p>The only legal progression is {@code PRODUCED -> PACKED -> DELIVERED}.
     */
    public enum Status {
        PRODUCED, PACKED, DELIVERED
    }

    // A freshly created phone is always in the PRODUCED state.
    private Status status = Status.PRODUCED;

    private final int id;

    /**
     * Creates a phone in the {@code PRODUCED} state.
     *
     * @param id identifier of this phone
     */
    public Phone(int id) {
        this.id = id;
    }

    /**
     * Marks the phone as packed.
     *
     * @throws IllegalStateException if the phone is not currently {@code PRODUCED}
     */
    public void pack() {
        advance(Status.PRODUCED, Status.PACKED);
    }

    /**
     * Marks the phone as delivered.
     *
     * @throws IllegalStateException if the phone is not currently {@code PACKED}
     */
    public void deliver() {
        advance(Status.PACKED, Status.DELIVERED);
    }

    /** Moves from {@code required} to {@code next}, rejecting any out-of-order transition. */
    private void advance(Status required, Status next) {
        if (status != required) {
            throw new IllegalStateException(
                    "Phone id: " + id + " cannot go from " + status + " to " + next);
        }
        status = next;
    }

    /**
     * @return the current lifecycle state
     */
    public Status getStatus() {
        return status;
    }

    /**
     * @return the identifier given at construction
     */
    public int getId() {
        return id;
    }

    /**
     * @return a description of the form {@code "Phone id: <id>, status: <status>"}
     */
    @Override
    public String toString() {
        return "Phone id: " + id + ", status: " + status;
    }
}

队列

import java.util.concurrent.LinkedBlockingQueue;

/**
 * A blocking FIFO queue of {@link Phone} objects.
 *
 * <p>Adds nothing to {@link LinkedBlockingQueue}; it only gives the element type a shorter,
 * domain-specific name.
 */
public class PhoneQueue extends LinkedBlockingQueue<Phone> {
}

任务

生产手机的任务

/**
 * Task that manufactures phones and places them on a queue.
 *
 * <p>Phones are created with consecutive ids starting at 0, so they enter the queue in id
 * order. The task runs until its thread is interrupted.
 */
public class Producer implements Runnable {
    private final PhoneQueue phoneQueue;

    // Fixed seed: the sequence of pauses is the same on every run.
    private final Random random = new Random(47);

    // Id to assign to the next phone.
    private int count = 0;

    /**
     * @param queue queue that receives each newly produced phone
     */
    public Producer(PhoneQueue queue) {
        this.phoneQueue = queue;
    }

    /**
     * Produces phones until interrupted, then prints the shutdown messages.
     */
    @Override
    public void run() {
        try {
            while (true) {
                if (Thread.interrupted()) {
                    break;
                }
                produceNext();
            }
        } catch (InterruptedException e) {
            System.out.println("Producer interrupted.");
        }
        System.out.println("Producer off.");
    }

    /** Waits 300-799 ms, then creates, prints and enqueues the next phone. */
    private void produceNext() throws InterruptedException {
        long pauseMillis = 300 + random.nextInt(500);
        TimeUnit.MILLISECONDS.sleep(pauseMillis);
        // Ids are handed out sequentially, which is what makes the phones ordered.
        Phone produced = new Phone(count++);
        System.out.println(produced);
        phoneQueue.put(produced);
    }
}

打包的任务

/**
 * Task that packs phones.
 *
 * <p>Takes each phone from the produced queue, marks it packed, prints it, and forwards it to
 * the packed queue. The task runs until its thread is interrupted.
 */
public class Packer implements Runnable {
    private final PhoneQueue producedQueue;
    private final PhoneQueue packedQueue;

    /**
     * @param producedQueue source of phones waiting to be packed
     * @param packedQueue   destination for phones once packed
     */
    public Packer(PhoneQueue producedQueue, PhoneQueue packedQueue) {
        this.producedQueue = producedQueue;
        this.packedQueue = packedQueue;
    }

    /**
     * Packs phones until interrupted, then prints the shutdown messages.
     */
    @Override
    public void run() {
        try {
            while (true) {
                if (Thread.interrupted()) {
                    break;
                }
                packNext();
            }
        } catch (InterruptedException e) {
            System.out.println("Packer interrupted.");
        }
        System.out.println("Packer off.");
    }

    /** Handles one phone; blocks until a produced phone is available. */
    private void packNext() throws InterruptedException {
        Phone next = producedQueue.take();
        next.pack();
        System.out.println(next);
        packedQueue.put(next);
    }
}

发货的任务

/**
 * Task that ships phones.
 *
 * <p>Takes each phone from the packed queue, marks it delivered, prints it, and forwards it
 * to the delivered queue. The task runs until its thread is interrupted.
 */
public class Delivery implements Runnable {
    // Input: phones that have been packed and are waiting to be shipped.
    private final PhoneQueue packedQueue;
    // Output: phones that have been shipped.
    private final PhoneQueue deliveredQueue;

    /**
     * @param packedQueue    source of packed phones awaiting delivery
     * @param deliveredQueue destination for phones once delivered
     */
    public Delivery(PhoneQueue packedQueue, PhoneQueue deliveredQueue) {
        this.packedQueue = packedQueue;
        this.deliveredQueue = deliveredQueue;
    }

    /**
     * Delivers phones until interrupted, then prints the shutdown messages.
     */
    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                // Blocks until the next packed phone is available.
                Phone phone = packedQueue.take();
                phone.deliver();
                System.out.println(phone);
                deliveredQueue.put(phone);
            }
        } catch (InterruptedException e) {
            System.out.println("Deliverer interrupted.");
        }
        System.out.println("Deliverer off.");
    }
}

消费者(买手机的人)

/**
 * The buyer: task that receives delivered phones and uses them.
 *
 * <p>Every phone taken from the queue is checked: its id must equal the number of phones
 * received so far, and its status must be {@code DELIVERED}. A phone that fails the check is
 * reported and the JVM is terminated with exit status -1.
 */
public class Consumer implements Runnable {
    private final PhoneQueue finishedQueue;

    // Id expected on the next phone to arrive.
    private int count = 0;

    /**
     * @param finishedQueue queue from which phones are received
     */
    public Consumer(PhoneQueue finishedQueue) {
        this.finishedQueue = finishedQueue;
    }

    /**
     * Receives and checks phones until interrupted, then prints the shutdown messages.
     */
    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                // Blocks until the next phone is available.
                Phone received = finishedQueue.take();
                // The expected id advances on every phone, whether or not the check passes.
                int expectedId = count++;
                boolean valid = received.getId() == expectedId
                        && received.getStatus() == Phone.Status.DELIVERED;
                if (valid) {
                    // Use the phone.
                    System.out.println(received + "->Use");
                } else {
                    System.out.println("Error -> " + received);
                    System.exit(-1);
                }
            }
        } catch (InterruptedException e) {
            System.out.println("Consumer interrupted.");
        }
        System.out.println("Consumer off.");
    }
}

主类

/**
 * Entry point: wires the four pipeline stages together with three queues, lets them run for
 * five seconds, then interrupts all of them.
 */
public class Demo {
    public static void main(String[] args) {
        // One queue between each pair of adjacent stages.
        PhoneQueue producedQueue = new PhoneQueue();
        PhoneQueue packedQueue = new PhoneQueue();
        PhoneQueue deliveredQueue = new PhoneQueue();
        ExecutorService exec = Executors.newCachedThreadPool();
        try {
            exec.execute(new Producer(producedQueue));
            exec.execute(new Packer(producedQueue, packedQueue));
            exec.execute(new org.example.a.Delivery(packedQueue, deliveredQueue));
            exec.execute(new Consumer(deliveredQueue));
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to whoever called main.
            Thread.currentThread().interrupt();
        } finally {
            // Always interrupt the workers; otherwise the pool's threads keep the JVM alive.
            exec.shutdownNow();
        }
    }
}

 

执行结果(四个线程被同时中断,末尾的 interrupted/off 信息的先后顺序每次运行可能不同)

Phone id: 0, status: PRODUCED
Phone id: 0, status: PACKED
Phone id: 0, status: DELIVERED
Phone id: 0, status: DELIVERED->Use
Phone id: 1, status: PRODUCED
Phone id: 1, status: PACKED
Phone id: 1, status: DELIVERED
Phone id: 1, status: DELIVERED->Use
Phone id: 2, status: PRODUCED
Phone id: 2, status: PACKED
Phone id: 2, status: DELIVERED
Phone id: 2, status: DELIVERED->Use
Phone id: 3, status: PRODUCED
Phone id: 3, status: PACKED
Phone id: 3, status: DELIVERED
Phone id: 3, status: DELIVERED->Use
Phone id: 4, status: PRODUCED
Phone id: 4, status: PACKED
Phone id: 4, status: DELIVERED
Phone id: 4, status: DELIVERED->Use
Phone id: 5, status: PRODUCED
Phone id: 5, status: PACKED
Phone id: 5, status: DELIVERED
Phone id: 5, status: DELIVERED->Use
Phone id: 6, status: PRODUCED
Phone id: 6, status: PACKED
Phone id: 6, status: DELIVERED
Phone id: 6, status: DELIVERED->Use
Phone id: 7, status: PRODUCED
Phone id: 7, status: PACKED
Phone id: 7, status: DELIVERED
Phone id: 7, status: DELIVERED->Use
Consumer interrupted.
Packer interrupted.
Producer interrupted.
Producer off.
Deliverer interrupted.
Packer off.
Consumer off.
Deliverer off.

举报

相关推荐

0 条评论