0
点赞
收藏
分享

微信扫一扫

Java并发编程: 使用CountDownLatch协调子线程

言午栩 2023-05-29 阅读 70


1. CountDownLatch工具类介绍 

CountDownLatch是一个同步工具类,它允许一个或多个线程处于等待状态直到在其它线程中运行的一组操作完成为止。CountDownLatch用一个给定的计数来实现初始化。Await方法会一直处于阻塞状态,直到countDown方法调用而使当前计数达到零。当计数为零之后,所有处于等待的线程将被释放,await的任何后续调用将立即返回。这种现象只出现一次,计数是不能被重置的。如果你需要一个可以重置计数的版本,需要考虑使用CyclicBarrie. 

上面的介绍来自于CountDownLatch类的注释。 



1. /**
2.  * A synchronization aid that allows one or more threads to wait until
3.  * a set of operations being performed in other threads completes.
4.  *
5.  * <p>A {@code CountDownLatch} is initialized with a given [i]count[/i].
6.  * The {@link #await await} methods block until the current count reaches
7.  * zero due to invocations of the {@link #countDown} method, after which
8.  * all waiting threads are released and any subsequent invocations of
9.  * {@link #await await} return immediately.  This is a one-shot phenomenon
10.  * -- the count cannot be reset.  If you need a version that resets the
11.  * count, consider using a {@link CyclicBarrier}.
12.  *
13.  */



CountDownLatch中定义了一个内部类Sync,该类继承AbstractQueuedSynchronizer。从代码中可以看出,CountDownLatch的await,countDown以及getCount方法都调用了Sync的方法。CountDownLatch工具类相关的类图以及详细代码如下: 



Java并发编程: 使用CountDownLatch协调子线程_ide

 




1. /*
2.  * @(#)CountDownLatch.java  1.5 04/02/09
3.  *
4.  * Copyright 2004 Sun Microsystems, Inc. All rights reserved.
5.  * SUN PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
6.  */
7.   
8. package
9. import
10. import
11.   
12. /**
13.  * A synchronization aid that allows one or more threads to wait until
14.  * a set of operations being performed in other threads completes.
15.  *
16.  * <p>A <tt>CountDownLatch</tt> is initialized with a given
17.  * [i]count[/i].  The {@link #await await} methods block until the current
18.  * {@link #getCount count} reaches zero due to invocations of the
19.  * {@link #countDown} method, after which all waiting threads are
20.  * released and any subsequent invocations of {@link #await await} return
21.  * immediately. This is a one-shot phenomenon -- the count cannot be
22.  * reset.  If you need a version that resets the count, consider using
23.  * a {@link CyclicBarrier}.
24.  *
25.  * <p>A <tt>CountDownLatch</tt> is a versatile synchronization tool
26.  * and can be used for a number of purposes.  A
27.  * <tt>CountDownLatch</tt> initialized with a count of one serves as a
28.  * simple on/off latch, or gate: all threads invoking {@link #await await}
29.  * wait at the gate until it is opened by a thread invoking {@link
30.  * #countDown}.  A <tt>CountDownLatch</tt> initialized to [i]N[/i]
31.  * can be used to make one thread wait until [i]N[/i] threads have
32.  * completed some action, or some action has been completed N times.
33.  * <p>A useful property of a <tt>CountDownLatch</tt> is that it
34.  * doesn't require that threads calling <tt>countDown</tt> wait for
35.  * the count to reach zero before proceeding, it simply prevents any
36.  * thread from proceeding past an {@link #await await} until all
37.  * threads could pass.
38.  *
39.  * <p><b>Sample usage:</b> Here is a pair of classes in which a group
40.  * of worker threads use two countdown latches:
41.  * [list]
42.  * <li>The first is a start signal that prevents any worker from proceeding
43.  * until the driver is ready for them to proceed;
44.  * <li>The second is a completion signal that allows the driver to wait
45.  * until all workers have completed.
46.  * [/list]
47.  *
48.  * <pre>
49.  * class Driver { // ...
50.  *   void main() throws InterruptedException {
51.  *     CountDownLatch startSignal = new CountDownLatch(1);
52.  *     CountDownLatch doneSignal = new CountDownLatch(N);
53.  *
54.  *     for (int i = 0; i < N; ++i) // create and start threads
55.  *       new Thread(new Worker(startSignal, doneSignal)).start();
56.  *
57.  *     doSomethingElse();            // don't let run yet
58.  *     startSignal.countDown();      // let all threads proceed
59.  *     doSomethingElse();
60.  *     doneSignal.await();           // wait for all to finish
61.  *   }
62.  * }
63.  *
64.  * class Worker implements Runnable {
65.  *   private final CountDownLatch startSignal;
66.  *   private final CountDownLatch doneSignal;
67.  *   Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
68.  *      this.startSignal = startSignal;
69.  *      this.doneSignal = doneSignal;
70.  *   }
71.  *   public void run() {
72.  *      try {
73.  *        startSignal.await();
74.  *        doWork();
75.  *        doneSignal.countDown();
76.  *      } catch (InterruptedException ex) {} // return;
77.  *   }
78.  *
79.  *   void doWork() { ... }
80.  * }
81.  *
82.  * </pre>
83.  *
84.  * <p>Another typical usage would be to divide a problem into N parts,
85.  * describe each part with a Runnable that executes that portion and
86.  * counts down on the latch, and queue all the Runnables to an
87.  * Executor.  When all sub-parts are complete, the coordinating thread
88.  * will be able to pass through await. (When threads must repeatedly
89.  * count down in this way, instead use a {@link CyclicBarrier}.)
90.  *
91.  * <pre>
92.  * class Driver2 { // ...
93.  *   void main() throws InterruptedException {
94.  *     CountDownLatch doneSignal = new CountDownLatch(N);
95.  *     Executor e = ...
96.  *
97.  *     for (int i = 0; i < N; ++i) // create and start threads
98.  *       e.execute(new WorkerRunnable(doneSignal, i));
99.  *
100.  *     doneSignal.await();           // wait for all to finish
101.  *   }
102.  * }
103.  *
104.  * class WorkerRunnable implements Runnable {
105.  *   private final CountDownLatch doneSignal;
106.  *   private final int i;
107.  *   WorkerRunnable(CountDownLatch doneSignal, int i) {
108.  *      this.doneSignal = doneSignal;
109.  *      this.i = i;
110.  *   }
111.  *   public void run() {
112.  *      try {
113.  *        doWork(i);
114.  *        doneSignal.countDown();
115.  *      } catch (InterruptedException ex) {} // return;
116.  *   }
117.  *
118.  *   void doWork() { ... }
119.  * }
120.  *
121.  * </pre>
122.  *
123.  * @since 1.5
124.  * @author Doug Lea
125.  */
126. public class
127. /**
128.      * Synchronization control For CountDownLatch.
129.      * Uses AQS state to represent count.
130.      */
131. private static final class Sync extends
132. int
133.             setState(count);   
134.         }  
135.           
136. int
137. return
138.         }  
139.   
140. public int tryAcquireShared(int
141. return getState() == 0? 1 : -1;  
142.         }  
143.           
144. public boolean tryReleaseShared(int
145. // Decrement count; signal when transition to zero
146. for
147. int
148. if (c == 0)  
149. return false;  
150. int nextc = c-1;  
151. if
152. return nextc == 0;  
153.             }  
154.         }  
155.     }  
156.   
157. private final
158. /**
159.      * Constructs a <tt>CountDownLatch</tt> initialized with the given
160.      * count.
161.      * 
162.      * @param count the number of times {@link #countDown} must be invoked
163.      * before threads can pass through {@link #await}.
164.      *
165.      * @throws IllegalArgumentException if <tt>count</tt> is less than zero.
166.      */
167. public CountDownLatch(int
168. if (count < 0) throw new IllegalArgumentException("count < 0");  
169. this.sync = new
170.     }  
171.   
172. /**
173.      * Causes the current thread to wait until the latch has counted down to 
174.      * zero, unless the thread is {@link Thread#interrupt interrupted}.
175.      *
176.      * <p>If the current {@link #getCount count} is zero then this method
177.      * returns immediately.
178.      * <p>If the current {@link #getCount count} is greater than zero then
179.      * the current thread becomes disabled for thread scheduling 
180.      * purposes and lies dormant until one of two things happen:
181.      * [list]
182.      * <li>The count reaches zero due to invocations of the
183.      * {@link #countDown} method; or
184.      * <li>Some other thread {@link Thread#interrupt interrupts} the current
185.      * thread.
186.      * [/list]
187.      * <p>If the current thread:
188.      * [list]
189.      * <li>has its interrupted status set on entry to this method; or 
190.      * <li>is {@link Thread#interrupt interrupted} while waiting, 
191.      * [/list]
192.      * then {@link InterruptedException} is thrown and the current thread's 
193.      * interrupted status is cleared. 
194.      *
195.      * @throws InterruptedException if the current thread is interrupted
196.      * while waiting.
197.      */
198. public void await() throws
199. 1);  
200.     }  
201.   
202. /**
203.      * Causes the current thread to wait until the latch has counted down to 
204.      * zero, unless the thread is {@link Thread#interrupt interrupted},
205.      * or the specified waiting time elapses.
206.      *
207.      * <p>If the current {@link #getCount count} is zero then this method
208.      * returns immediately with the value <tt>true</tt>.
209.      *
210.      * <p>If the current {@link #getCount count} is greater than zero then
211.      * the current thread becomes disabled for thread scheduling 
212.      * purposes and lies dormant until one of three things happen:
213.      * [list]
214.      * <li>The count reaches zero due to invocations of the
215.      * {@link #countDown} method; or
216.      * <li>Some other thread {@link Thread#interrupt interrupts} the current
217.      * thread; or
218.      * <li>The specified waiting time elapses.
219.      * [/list]
220.      * <p>If the count reaches zero then the method returns with the
221.      * value <tt>true</tt>.
222.      * <p>If the current thread:
223.      * [list]
224.      * <li>has its interrupted status set on entry to this method; or 
225.      * <li>is {@link Thread#interrupt interrupted} while waiting, 
226.      * [/list]
227.      * then {@link InterruptedException} is thrown and the current thread's 
228.      * interrupted status is cleared. 
229.      *
230.      * <p>If the specified waiting time elapses then the value <tt>false</tt>
231.      * is returned.
232.      * If the time is 
233.      * less than or equal to zero, the method will not wait at all.
234.      *
235.      * @param timeout the maximum time to wait
236.      * @param unit the time unit of the <tt>timeout</tt> argument.
237.      * @return <tt>true</tt> if the count reached zero  and <tt>false</tt>
238.      * if the waiting time elapsed before the count reached zero.
239.      *
240.      * @throws InterruptedException if the current thread is interrupted
241.      * while waiting.
242.      */
243. public boolean await(long
244. throws
245. return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));  
246.     }  
247.   
248. /**
249.      * Decrements the count of the latch, releasing all waiting threads if
250.      * the count reaches zero.
251.      * <p>If the current {@link #getCount count} is greater than zero then
252.      * it is decremented. If the new count is zero then all waiting threads
253.      * are re-enabled for thread scheduling purposes.
254.      * <p>If the current {@link #getCount count} equals zero then nothing
255.      * happens.
256.      */
257. public void
258. 1);  
259.     }  
260.   
261. /**
262.      * Returns the current count.
263.      * <p>This method is typically used for debugging and testing purposes.
264.      * @return the current count.
265.      */
266. public long
267. return
268.     }  
269.   
270. /**
271.      * Returns a string identifying this latch, as well as its state.
272.      * The state, in brackets, includes the String 
273.      * "Count =" followed by the current count.
274.      * @return a string identifying this latch, as well as its
275.      * state
276.      */
277. public
278. return super.toString() + "[Count = " + sync.getCount() + "]";  
279.     }  
280.   
281. }


2. CountDownLatch工具类的使用案例 

CountDownLatch的作用是控制一个计数器,每个线程在运行完毕后执行countDown,表示自己运行结束,这对于多个子任务的计算特别有效,比如一个异步任务需要拆分成10个子任务执行,主任务必须知道子任务是否完成,所有子任务完成后才能进行合并计算,从而保证了一个主任务逻辑的正确性。(此段摘自于<<改善Java程序的151个建议>>, P254) 

CountDownLatch最重要的方法是countDown()和await(),前者主要是倒数一次,后者是等待倒数到0,如果没有到达0,就只有阻塞等待了。 

本实例主要使用CountDownLatch工具类来实现10个线程对1~100的求和,每个线程对10个数进行求和。第一个线程对1 – 10求和 
第二个线程对 11 – 20求和 
第三个线程对21 – 30 求和 

….. 
第十个线程对91 – 100求和。 

具体的代码如下: 

1. package
2.   
3. import
4. import
5.   
6. public class Calculator implements
7.   
8. //开始信号
9. private final
10.       
11. //结束信号
12. private final
13.       
14. private int groupNumber = 0;  
15.   
16. /**
17.      * @param startSignal
18.      * @param endSignal
19.      * @param groupId
20.      */
21. public
22. int
23. this.startSignal = startSignal;  
24. this.doneSignal = doneSignal;  
25. this.groupNumber = groupNumber;  
26.     }  
27.   
28. public Integer call() throws
29.   
30.         startSignal.await();  
31.   
32.         Integer result = sum(groupNumber);  
33.   
34.         printCompleteInfor(groupNumber,result);  
35.           
36.         doneSignal.countDown();  
37.   
38. return
39.     }  
40.   
41. private Integer sum(int
42. if (groupNumber < 1) {  
43. throw new
44.         }  
45.   
46. int sum = 0;  
47. int start = (groupNumber - 1) * 10 + 1;  
48. int end = groupNumber * 10;  
49. for (int
50.             sum += i;  
51.         }  
52. return
53.     }  
54.       
55. private void printCompleteInfor(int groupNumber, int
56.     {  
57. "Group %d is finished, the sum in this gropu is %d", groupNumber, sum));  
58.     }  
59.   
60. }


1. package
2.   
3. import
4. import
5. import
6. import
7. import
8. import
9. import
10.   
11. public class
12.   
13. public static void main(String[] args) throws
14. /**
15.          * 1-100求和,分10个线程来计算,每个线程对10个数求和。
16.          */
17. int numOfGroups = 10;  
18. new CountDownLatch(1);  
19.           
20. new
21.           
22.         ExecutorService service = Executors.newFixedThreadPool(numOfGroups);  
23. new
24.   
25.         submit(futures, numOfGroups, service, startSignal, doneSignal);  
26.           
27. /**
28.          * 开始,让所有的求和计算线程运行
29.          */
30.         startSignal.countDown();  
31.           
32. /**
33.          * 阻塞,知道所有计算线程完成计算
34.          */
35.         doneSignal.await();  
36.   
37.         shutdown(service);  
38.           
39.         printResult(futures);  
40.     }  
41.   
42. private static void submit(List<Future<Integer>> futures, int
43.             ExecutorService service, CountDownLatch startSignal,  
44.             CountDownLatch doneSignal) {  
45. for (int groupNumber = 1; groupNumber <= numOfGroups; groupNumber++) {  
46. new
47.                     groupNumber)));  
48.         }  
49.     }  
50.   
51. private static int
52. throws
53. int result = 0;  
54. for
55.             result += f.get();  
56.         }  
57. return
58.     }  
59.   
60. private static void
61. throws
62. "[1,100] Sum is :"
63.     }  
64.       
65. private static void
66.     {  
67.         service.shutdown();  
68.     }  
69.   
70. }


一次的执行结果如下: 
Group 8 is finished, the sum in this gropu is 755 
Group 2 is finished, the sum in this gropu is 155 
Group 10 is finished, the sum in this gropu is 955 
Group 5 is finished, the sum in this gropu is 455 
Group 7 is finished, the sum in this gropu is 655 
Group 3 is finished, the sum in this gropu is 255 
Group 9 is finished, the sum in this gropu is 855 
Group 1 is finished, the sum in this gropu is 55 
Group 4 is finished, the sum in this gropu is 355 
Group 6 is finished, the sum in this gropu is 555 
[1,100] Sum is :5050 

举报

相关推荐

0 条评论