实质上,很多后台服务程序并发控制的基本原理都可以归纳为生产者/消费者模式,而这是恰恰是在本科操作系统课堂上老师反复讲解,而我们却视而不见不以为然的。在博文《一种面向作业流(工作流)的轻量级可复用的异步流水开发框架的设计与实现》中将介绍一种生产者/消费者模式的具体应用。
生产者消费者问题是研究多线程程序时绕不开的经典问题之一,它描述是有一块缓冲区作为仓库,生产者可以将产品放入仓库,消费者则可以从仓库中取走产品。解决生产者/消费者问题的方法可分为两类:(1)采用某种机制保护生产者和消费者之间的同步;(2)在生产者和消费者之间建立一个管道。第一种方式有较高的效率,并且易于实现,代码的可控制性较好,属于常用的模式。第二种管道缓冲区不易控制,被传输数据对象不易于封装等,实用性不强。因此本文只介绍同步机制实现的生产者/消费者问题。
同步问题核心在于:如何保证同一资源被多个线程并发访问时的完整性。常用的同步方法是采用信号或加锁机制,保证资源在任意时刻至多被一个线程访问。Java语言在多线程编程上实现了完全对象化,提供了对同步机制的良好支持。在Java中一共有四种方法支持同步,其中前三个是同步方法,一个是管道方法。
(1)wait() / notify()方法
(2)await() / signal()方法
(3)BlockingQueue阻塞队列方法
(4)PipedInputStream / PipedOutputStream
本文只介绍最常用的前三种,第四种暂不做讨论,有兴趣的读者可以自己去网上找答案。
一、wait() / notify()方法
wait() / nofity()方法是基类Object的两个方法,也就意味着所有Java类都会拥有这两个方法,这样,我们就可以为任何对象实现同步机制。
wait()方法:当缓冲区已满/空时,生产者/消费者线程停止自己的执行,放弃锁,使自己处于等等状态,让其他线程执行。
notify()方法:当生产者/消费者向缓冲区放入/取出一个产品时,向其他等待的线程发出可执行的通知,同时放弃锁,使自己处于等待状态。
光看文字可能不太好理解,咱来段代码就明白了:
1. import java.util.LinkedList;
2.
3. /**
4. * 仓库类Storage实现缓冲区
5. *
6. *
7. * @author MONKEY.D.MENG 2011-03-15
8. *
9. */
10. public class Storage
11. {
12. // 仓库最大存储量
13. private final int MAX_SIZE = 100;
14.
15. // 仓库存储的载体
16. private LinkedList<Object> list = new LinkedList<Object>();
17.
18. // 生产num个产品
19. public void produce(int num)
20. {
21. // 同步代码段
22. synchronized (list)
23. {
24. // 如果仓库剩余容量不足
25. while (list.size() + num > MAX_SIZE)
26. {
27. "【要生产的产品数量】:" + num + "/t【库存量】:"
28. "/t暂时不能执行生产任务!");
29. try
30. {
31. // 由于条件不满足,生产阻塞
32. list.wait();
33. }
34. catch (InterruptedException e)
35. {
36. e.printStackTrace();
37. }
38. }
39.
40. // 生产条件满足情况下,生产num个产品
41. for (int i = 1; i <= num; ++i)
42. {
43. new Object());
44. }
45.
46. "【已经生产产品数】:" + num + "/t【现仓储量为】:" + list.size());
47.
48. list.notifyAll();
49. }
50. }
51.
52. // 消费num个产品
53. public void consume(int num)
54. {
55. // 同步代码段
56. synchronized (list)
57. {
58. // 如果仓库存储量不足
59. while (list.size() < num)
60. {
61. "【要消费的产品数量】:" + num + "/t【库存量】:"
62. "/t暂时不能执行生产任务!");
63. try
64. {
65. // 由于条件不满足,消费阻塞
66. list.wait();
67. }
68. catch (InterruptedException e)
69. {
70. e.printStackTrace();
71. }
72. }
73.
74. // 消费条件满足情况下,消费num个产品
75. for (int i = 1; i <= num; ++i)
76. {
77. list.remove();
78. }
79.
80. "【已经消费产品数】:" + num + "/t【现仓储量为】:" + list.size());
81.
82. list.notifyAll();
83. }
84. }
85.
86. // get/set方法
87. public LinkedList<Object> getList()
88. {
89. return list;
90. }
91.
92. public void setList(LinkedList<Object> list)
93. {
94. this.list = list;
95. }
96.
97. public int getMAX_SIZE()
98. {
99. return MAX_SIZE;
100. }
101. }
102. /**
103. * 生产者类Producer继承线程类Thread
104. *
105. * Email:530025983@qq.com
106. *
107. * @author MONKEY.D.MENG 2011-03-15
108. *
109. */
110. public class Producer extends Thread
111. {
112. // 每次生产的产品数量
113. private int num;
114.
115. // 所在放置的仓库
116. private Storage storage;
117.
118. // 构造函数,设置仓库
119. public Producer(Storage storage)
120. {
121. this.storage = storage;
122. }
123.
124. // 线程run函数
125. public void run()
126. {
127. produce(num);
128. }
129.
130. // 调用仓库Storage的生产函数
131. public void produce(int num)
132. {
133. storage.produce(num);
134. }
135.
136. // get/set方法
137. public int getNum()
138. {
139. return num;
140. }
141.
142. public void setNum(int num)
143. {
144. this.num = num;
145. }
146.
147. public Storage getStorage()
148. {
149. return storage;
150. }
151.
152. public void setStorage(Storage storage)
153. {
154. this.storage = storage;
155. }
156. }
157. /**
158. * 消费者类Consumer继承线程类Thread
159. *
160. * Email:530025983@qq.com
161. *
162. * @author MONKEY.D.MENG 2011-03-15
163. *
164. */
165. public class Consumer extends Thread
166. {
167. // 每次消费的产品数量
168. private int num;
169.
170. // 所在放置的仓库
171. private Storage storage;
172.
173. // 构造函数,设置仓库
174. public Consumer(Storage storage)
175. {
176. this.storage = storage;
177. }
178.
179. // 线程run函数
180. public void run()
181. {
182. consume(num);
183. }
184.
185. // 调用仓库Storage的生产函数
186. public void consume(int num)
187. {
188. storage.consume(num);
189. }
190.
191. // get/set方法
192. public int getNum()
193. {
194. return num;
195. }
196.
197. public void setNum(int num)
198. {
199. this.num = num;
200. }
201.
202. public Storage getStorage()
203. {
204. return storage;
205. }
206.
207. public void setStorage(Storage storage)
208. {
209. this.storage = storage;
210. }
211. }
212. /**
213. * 测试类Test
214. *
215. * Email:530025983@qq.com
216. *
217. * @author MONKEY.D.MENG 2011-03-15
218. *
219. */
220. public class Test
221. {
222. public static void main(String[] args)
223. {
224. // 仓库对象
225. new Storage();
226.
227. // 生产者对象
228. new Producer(storage);
229. new Producer(storage);
230. new Producer(storage);
231. new Producer(storage);
232. new Producer(storage);
233. new Producer(storage);
234. new Producer(storage);
235.
236. // 消费者对象
237. new Consumer(storage);
238. new Consumer(storage);
239. new Consumer(storage);
240.
241. // 设置生产者产品生产数量
242. 10);
243. 10);
244. 10);
245. 10);
246. 10);
247. 10);
248. 80);
249.
250. // 设置消费者产品消费数量
251. 50);
252. 20);
253. 30);
254.
255. // 线程开始执行
256. c1.start();
257. c2.start();
258. c3.start();
259. p1.start();
260. p2.start();
261. p3.start();
262. p4.start();
263. p5.start();
264. p6.start();
265. p7.start();
266. }
267. }
268. 【要消费的产品数量】:50 【库存量】:0 暂时不能执行生产任务!
269. 【要消费的产品数量】:30 【库存量】:0 暂时不能执行生产任务!
270. 【要消费的产品数量】:20 【库存量】:0 暂时不能执行生产任务!
271. 【已经生产产品数】:10 【现仓储量为】:10
272. 【要消费的产品数量】:20 【库存量】:10 暂时不能执行生产任务!
273. 【要消费的产品数量】:30 【库存量】:10 暂时不能执行生产任务!
274. 【要消费的产品数量】:50 【库存量】:10 暂时不能执行生产任务!
275. 【已经生产产品数】:10 【现仓储量为】:20
276. 【要消费的产品数量】:50 【库存量】:20 暂时不能执行生产任务!
277. 【要消费的产品数量】:30 【库存量】:20 暂时不能执行生产任务!
278. 【已经消费产品数】:20 【现仓储量为】:0
279. 【已经生产产品数】:10 【现仓储量为】:10
280. 【已经生产产品数】:10 【现仓储量为】:20
281. 【已经生产产品数】:80 【现仓储量为】:100
282. 【要生产的产品数量】:10 【库存量】:100 暂时不能执行生产任务!
283. 【已经消费产品数】:30 【现仓储量为】:70
284. 【已经消费产品数】:50 【现仓储量为】:20
285. 【已经生产产品数】:10 【现仓储量为】:30
286. 【已经生产产品数】:10 【现仓储量为】:40
看完上述代码,对wait() / notify()方法实现的同步有了了解。你可能会对Storage类中为什么要定义public void produce(int num);和public void consume(int num);方法感到不解,为什么不直接在生产者类Producer和消费者类Consumer中实现这两个方法,却要调用Storage类中的实现呢?淡定,后文会有解释。我们先往下走。
二、await() / signal()方法
在JDK5.0之后,Java提供了更加健壮的线程处理机制,包括同步、锁定、线程池等,它们可以实现更细粒度的线程控制。await()和signal()就是其中用来做同步的两种方法,它们的功能基本上和wait() / nofity()相同,完全可以取代它们,但是它们和新引入的锁定机制Lock直接挂钩,具有更大的灵活性。通过在Lock对象上调用newCondition()方法,将条件变量和一个锁对象进行绑定,进而控制并发程序访问竞争资源的安全。下面来看代码:
1. import java.util.LinkedList;
2. import java.util.concurrent.locks.Condition;
3. import java.util.concurrent.locks.Lock;
4. import java.util.concurrent.locks.ReentrantLock;
5.
6. /**
7. * 仓库类Storage实现缓冲区
8. *
9. *
10. * @author MONKEY.D.MENG 2011-03-15
11. *
12. */
13. public class Storage
14. {
15. // 仓库最大存储量
16. private final int MAX_SIZE = 100;
17.
18. // 仓库存储的载体
19. private LinkedList<Object> list = new LinkedList<Object>();
20.
21. // 锁
22. private final Lock lock = new ReentrantLock();
23.
24. // 仓库满的条件变量
25. private final Condition full = lock.newCondition();
26.
27. // 仓库空的条件变量
28. private final Condition empty = lock.newCondition();
29.
30. // 生产num个产品
31. public void produce(int num)
32. {
33. // 获得锁
34. lock.lock();
35.
36. // 如果仓库剩余容量不足
37. while (list.size() + num > MAX_SIZE)
38. {
39. "【要生产的产品数量】:" + num + "/t【库存量】:" + list.size()
40. "/t暂时不能执行生产任务!");
41. try
42. {
43. // 由于条件不满足,生产阻塞
44. full.await();
45. }
46. catch (InterruptedException e)
47. {
48. e.printStackTrace();
49. }
50. }
51.
52. // 生产条件满足情况下,生产num个产品
53. for (int i = 1; i <= num; ++i)
54. {
55. new Object());
56. }
57.
58. "【已经生产产品数】:" + num + "/t【现仓储量为】:" + list.size());
59.
60. // 唤醒其他所有线程
61. full.signalAll();
62. empty.signalAll();
63.
64. // 释放锁
65. lock.unlock();
66. }
67.
68. // 消费num个产品
69. public void consume(int num)
70. {
71. // 获得锁
72. lock.lock();
73.
74. // 如果仓库存储量不足
75. while (list.size() < num)
76. {
77. "【要消费的产品数量】:" + num + "/t【库存量】:" + list.size()
78. "/t暂时不能执行生产任务!");
79. try
80. {
81. // 由于条件不满足,消费阻塞
82. empty.await();
83. }
84. catch (InterruptedException e)
85. {
86. e.printStackTrace();
87. }
88. }
89.
90. // 消费条件满足情况下,消费num个产品
91. for (int i = 1; i <= num; ++i)
92. {
93. list.remove();
94. }
95.
96. "【已经消费产品数】:" + num + "/t【现仓储量为】:" + list.size());
97.
98. // 唤醒其他所有线程
99. full.signalAll();
100. empty.signalAll();
101.
102. // 释放锁
103. lock.unlock();
104. }
105.
106. // set/get方法
107. public int getMAX_SIZE()
108. {
109. return MAX_SIZE;
110. }
111.
112. public LinkedList<Object> getList()
113. {
114. return list;
115. }
116.
117. public void setList(LinkedList<Object> list)
118. {
119. this.list = list;
120. }
121. }
122. 【要消费的产品数量】:50 【库存量】:0 暂时不能执行生产任务!
123. 【要消费的产品数量】:30 【库存量】:0 暂时不能执行生产任务!
124. 【已经生产产品数】:10 【现仓储量为】:10
125. 【已经生产产品数】:10 【现仓储量为】:20
126. 【要消费的产品数量】:50 【库存量】:20 暂时不能执行生产任务!
127. 【要消费的产品数量】:30 【库存量】:20 暂时不能执行生产任务!
128. 【已经生产产品数】:10 【现仓储量为】:30
129. 【要消费的产品数量】:50 【库存量】:30 暂时不能执行生产任务!
130. 【已经消费产品数】:20 【现仓储量为】:10
131. 【已经生产产品数】:10 【现仓储量为】:20
132. 【要消费的产品数量】:30 【库存量】:20 暂时不能执行生产任务!
133. 【已经生产产品数】:80 【现仓储量为】:100
134. 【要生产的产品数量】:10 【库存量】:100 暂时不能执行生产任务!
135. 【已经消费产品数】:50 【现仓储量为】:50
136. 【已经生产产品数】:10 【现仓储量为】:60
137. 【已经消费产品数】:30 【现仓储量为】:30
138. 【已经生产产品数】:10 【现仓储量为】:40
只需要更新仓库类Storage的代码即可,生产者Producer、消费者Consumer、测试类Test的代码均不需要进行任何更改。这样我们就知道为神马我要在Storage类中定义public void produce(int num);和public void consume(int num);方法,并在生产者类Producer和消费者类Consumer中调用Storage类中的实现了吧。将可能发生的变化集中到一个类中,不影响原有的构架设计,同时无需修改其他业务层代码。无意之中,我们好像使用了某种设计模式,具体是啥我忘记了,啊哈哈,等我想起来再告诉大家~
三、BlockingQueue阻塞队列方法
BlockingQueue是JDK5.0的新增内容,它是一个已经在内部实现了同步的队列,实现方式采用的是我们第2种await() / signal()方法。它可以在生成对象时指定容量大小。它用于阻塞操作的是put()和take()方法。
put()方法:类似于我们上面的生产者线程,容量达到最大时,自动阻塞。
take()方法:类似于我们上面的消费者线程,容量为0时,自动阻塞。
关于BlockingQueue的内容网上有很多,大家可以自己搜,我在这不多介绍。下面直接看代码,跟以往一样,我们只需要更改仓库类Storage的代码即可:
1. import java.util.concurrent.LinkedBlockingQueue;
2.
3. /**
4. * 仓库类Storage实现缓冲区
5. *
6. *
7. * @author MONKEY.D.MENG 2011-03-15
8. *
9. */
10. public class Storage
11. {
12. // 仓库最大存储量
13. private final int MAX_SIZE = 100;
14.
15. // 仓库存储的载体
16. private LinkedBlockingQueue<Object> list = new LinkedBlockingQueue<Object>(
17. 100);
18.
19. // 生产num个产品
20. public void produce(int num)
21. {
22. // 如果仓库剩余容量为0
23. if (list.size() == MAX_SIZE)
24. {
25. "【库存量】:" + MAX_SIZE + "/t暂时不能执行生产任务!");
26. }
27.
28. // 生产条件满足情况下,生产num个产品
29. for (int i = 1; i <= num; ++i)
30. {
31. try
32. {
33. // 放入产品,自动阻塞
34. new Object());
35. }
36. catch (InterruptedException e)
37. {
38. e.printStackTrace();
39. }
40.
41. "【现仓储量为】:" + list.size());
42. }
43. }
44.
45. // 消费num个产品
46. public void consume(int num)
47. {
48. // 如果仓库存储量不足
49. if (list.size() == 0)
50. {
51. "【库存量】:0/t暂时不能执行生产任务!");
52. }
53.
54. // 消费条件满足情况下,消费num个产品
55. for (int i = 1; i <= num; ++i)
56. {
57. try
58. {
59. // 消费产品,自动阻塞
60. list.take();
61. }
62. catch (InterruptedException e)
63. {
64. e.printStackTrace();
65. }
66. }
67.
68. "【现仓储量为】:" + list.size());
69. }
70.
71. // set/get方法
72. public LinkedBlockingQueue<Object> getList()
73. {
74. return list;
75. }
76.
77. public void setList(LinkedBlockingQueue<Object> list)
78. {
79. this.list = list;
80. }
81.
82. public int getMAX_SIZE()
83. {
84. return MAX_SIZE;
85. }
86. }
87. 【库存量】:0 暂时不能执行生产任务!
88. 【库存量】:0 暂时不能执行生产任务!
89. 【现仓储量为】:1
90. 【现仓储量为】:1
91. 【现仓储量为】:3
92. 【现仓储量为】:4
93. 【现仓储量为】:5
94. 【现仓储量为】:6
95. 【现仓储量为】:7
96. 【现仓储量为】:8
97. 【现仓储量为】:9
98. 【现仓储量为】:10
99. 【现仓储量为】:11
100. 【现仓储量为】:1
101. 【现仓储量为】:2
102. 【现仓储量为】:13
103. 【现仓储量为】:14
104. 【现仓储量为】:17
105. 【现仓储量为】:19
106. 【现仓储量为】:20
107. 【现仓储量为】:21
108. 【现仓储量为】:22
109. 【现仓储量为】:23
110. 【现仓储量为】:24
111. 【现仓储量为】:25
112. 【现仓储量为】:26
113. 【现仓储量为】:12
114. 【现仓储量为】:1
115. 【现仓储量为】:1
116. 【现仓储量为】:2
117. 【现仓储量为】:3
118. 【现仓储量为】:4
119. 【现仓储量为】:5
120. 【现仓储量为】:6
121. 【现仓储量为】:7
122. 【现仓储量为】:27
123. 【现仓储量为】:8
124. 【现仓储量为】:6
125. 【现仓储量为】:18
126. 【现仓储量为】:2
127. 【现仓储量为】:3
128. 【现仓储量为】:4
129. 【现仓储量为】:5
130. 【现仓储量为】:6
131. 【现仓储量为】:7
132. 【现仓储量为】:8
133. 【现仓储量为】:9
134. 【现仓储量为】:10
135. 【现仓储量为】:16
136. 【现仓储量为】:11
137. 【现仓储量为】:12
138. 【现仓储量为】:13
139. 【现仓储量为】:14
140. 【现仓储量为】:15
141. 【现仓储量为】:1
142. 【现仓储量为】:2
143. 【现仓储量为】:3
144. 【现仓储量为】:3
145. 【现仓储量为】:15
146. 【现仓储量为】:1
147. 【现仓储量为】:0
148. 【现仓储量为】:1
149. 【现仓储量为】:1
150. 【现仓储量为】:1
151. 【现仓储量为】:2
152. 【现仓储量为】:3
153. 【现仓储量为】:4
154. 【现仓储量为】:0
155. 【现仓储量为】:1
156. 【现仓储量为】:5
157. 【现仓储量为】:6
158. 【现仓储量为】:7
159. 【现仓储量为】:8
160. 【现仓储量为】:9
161. 【现仓储量为】:10
162. 【现仓储量为】:11
163. 【现仓储量为】:12
164. 【现仓储量为】:13
165. 【现仓储量为】:14
166. 【现仓储量为】:15
167. 【现仓储量为】:16
168. 【现仓储量为】:17
169. 【现仓储量为】:1
170. 【现仓储量为】:1
171. 【现仓储量为】:2
172. 【现仓储量为】:3
173. 【现仓储量为】:4
174. 【现仓储量为】:5
175. 【现仓储量为】:6
176. 【现仓储量为】:3
177. 【现仓储量为】:3
178. 【现仓储量为】:1
179. 【现仓储量为】:2
180. 【现仓储量为】:3
181. 【现仓储量为】:4
182. 【现仓储量为】:5
183. 【现仓储量为】:6
184. 【现仓储量为】:7
185. 【现仓储量为】:8
186. 【现仓储量为】:9
187. 【现仓储量为】:10
188. 【现仓储量为】:11
189. 【现仓储量为】:12
190. 【现仓储量为】:13
191. 【现仓储量为】:14
192. 【现仓储量为】:15
193. 【现仓储量为】:16
194. 【现仓储量为】:17
195. 【现仓储量为】:18
196. 【现仓储量为】:19
197. 【现仓储量为】:6
198. 【现仓储量为】:7
199. 【现仓储量为】:8
200. 【现仓储量为】:9
201. 【现仓储量为】:10
202. 【现仓储量为】:11
203. 【现仓储量为】:12
204. 【现仓储量为】:13
205. 【现仓储量为】:14
206. 【现仓储量为】:15
207. 【现仓储量为】:16
208. 【现仓储量为】:17
209. 【现仓储量为】:18
210. 【现仓储量为】:19
211. 【现仓储量为】:20
212. 【现仓储量为】:21
213. 【现仓储量为】:22
214. 【现仓储量为】:23
215. 【现仓储量为】:24
216. 【现仓储量为】:25
217. 【现仓储量为】:26
218. 【现仓储量为】:27
219. 【现仓储量为】:28
220. 【现仓储量为】:29
221. 【现仓储量为】:30
222. 【现仓储量为】:31
223. 【现仓储量为】:32
224. 【现仓储量为】:33
225. 【现仓储量为】:34
226. 【现仓储量为】:35
227. 【现仓储量为】:36
228. 【现仓储量为】:37
229. 【现仓储量为】:38
230. 【现仓储量为】:39
231. 【现仓储量为】:40
当然,你会发现这时对于public void produce(int num);和public void consume(int num);方法业务逻辑上的实现跟前面两个例子不太一样,没关系,这个例子只是为了说明BlockingQueue阻塞队列的使用。
有时使用BlockingQueue可能会出现put()和System.out.println()输出不匹配的情况,这是由于它们之间没有同步造成的。当缓冲区已满,生产者在put()操作时,put()内部调用了await()方法,放弃了线程的执行,然后消费者线程执行,调用take()方法,take()内部调用了signal()方法,通知生产者线程可以执行,致使在消费者的println()还没运行的情况下生产者的println()先被执行,所以有了输出不匹配的情况。
对于BlockingQueue大家可以放心使用,这可不是它的问题,只是在它和别的对象之间的同步有问题。