0
点赞
收藏
分享

微信扫一扫

"相对平均"分布算法


  有一个项目用来负责调度集群中的"cron任务",比如一个application中可以配置N个定时任务,这些任务信息最终注册到zookeeper上,并开发了一系列代码用于维护这些任务的"活性";当applicaton中一个server故障,那么这个server上接管的任务,需要迁移到其他server上,如果多个server存活的话,还需要这些任务能够"均衡"的分布.

   其中"负载均衡",很好理解,比如有6个任务,3个server,那么就需要每个server上尽可能的运行2个任务;其实这个事情想起来很简单,但是做起来似乎有些不得不考虑的问题:

    1) "相对平均"怎么设计

    2) 迁移任务时,是否会丢失任务的触发时机;比如一个任务凌晨3点执行,刚好此时运行了一次"均衡",任务在原来的server上没有触发,在新的server上又过了时间..

    3) 迁移任务时,还需要考虑"最少移动"次数,不能大面积迁移任务;只能从"负载高"的server上迁移到"负载低"的.

 

例如:

    sid1: w1 w2 w3 w4

    sid2: w5

    sid3:w6

期望迁移之后:

    sid1:w1 w2

    sid2:w5 w3

    sid3:w4 w6

 

而不是(这种结果迁移的面积太大,只需要把"多余"的任务迁移出去即可,而不是重新洗牌再均衡)

    sid1:w6 w5

    sid2:w2 w3

    sid3:w1 w4

   

经过提取,"相对平均"的设计代码如下,仅作备忘:


1. package
2.   
3. import
4. import
5. import
6.   
7. public class
8. private List<String> servers = new
9. private Map<String, List<String>> current = new
10. private Set<String> workers = new
11.   
12. public static void
13. new BufferedReader(new
14.         String line;  
15. new
16. new
17. try
18. while ((line = br.readLine()) != null) {  
19. if (line.startsWith("addWorker")) {  
20.                     balancer.addWorkers(line);  
21. else if (line.startsWith("addServer")) {  
22.                     balancer.addServers(line);  
23. else
24. "???");  
25. continue;  
26.                 }  
27.                 balancer.rebalance();  
28.             }  
29. catch
30.             e.printStackTrace();  
31.         }  
32. "--END---");  
33.     }  
34.   
35. public void
36. int index = source.indexOf(" ");  
37. if (index == -1) {  
38. return;  
39.         }  
40. 1).split(" ");  
41. if (values == null || values.length == 0) {  
42. return;  
43.         }  
44. for
45.             servers.add(server);  
46. if(current.get(server) == null){  
47. new
48.             }  
49.         }  
50.     }  
51.   
52. public void
53. int index = source.indexOf(" ");  
54. if (index == -1) {  
55. return;  
56.         }  
57. 1).split(" ");  
58. if (values == null || values.length == 0) {  
59. return;  
60.         }  
61. //当有新的worker提交时,将咱有一台机器接管
62. 0);  
63.         List<String> sw = current.get(sid);  
64. if(sw == null){  
65. new
66.         }  
67. for
68.             workers.add(worker);  
69.             sw.add(worker);  
70.         }  
71.   
72.     }  
73.   
74. public void
75. try
76. if
77. return;  
78.             }  
79. for
80. if (current.get(sid) == null) {  
81. new
82.                 }  
83.             }  
84. //根据每个sid上的worker个数,整理成一个排序的map
85. new
86. for
87. int
88.                 List<String> sl = counterMap.get(total);  
89. if (sl == null) {  
90. new
91.                     counterMap.put(total, sl);  
92.                 }  
93. //sid
94.             }  
95. int
96. int
97. int avg = totalWorkers / totalServers;//每个server实例可以接管任务的平均数
98. while (true) {  
99. //大于平均数的列表, >= avg + 1
100. //与平均数差值为2的 <= arg  - 1
101. //允许任务个数与avg上线浮动1各个,不是绝对的平均
102.   
103. if (gt == null || lt == null) {  
104. break;  
105.                 }  
106.                 Integer gtKey = gt.getKey();  
107.                 Integer ltKey = lt.getKey();  
108. if (gt.getKey() - lt.getKey() < 2) {  
109. break;  
110.                 }  
111. if (gt.getValue().size() == 0) {  
112.                     counterMap.remove(gt.getKey());  
113.                 }  
114. if (lt.getValue().size() == 0) {  
115.                     counterMap.remove(lt.getKey());  
116.                 }  
117. //sid列表
118. while
119.                     String _fromSid = it.next();  
120.                     List<String> _currentWorkers = current.get(_fromSid);  
121. if (_currentWorkers == null
122.                         it.remove();  
123.                         current.remove(_fromSid);  
124. continue;  
125.                     }  
126.                     List<String> _ltServers = lt.getValue();  
127. if
128.                         counterMap.remove(ltKey);  
129. break;  
130.                     }  
131. //取出需要交换出去的任务id
132. int
133. 1);  
134. 0);  
135. //从_fromSid的worker列表中移除低workerId
136. //注意:移除最后一个,和_ltWorkers.add(_wid)对应,_ltWorkers将新任务添加到list的尾部
137. //即从尾部移除,从尾部添加,基本保证"原任务,最少迁移次数"
138. 1);  
139.                     it.remove();  
140. 0);  
141. //将此workerId添加到_toSid的worker列表中
142.                     List<String> _ltWorkers = current.get(_toSid);  
143. if (_ltWorkers == null) {  
144. new
145.                         current.put(_toSid, _ltWorkers);  
146.                     }  
147.                     _ltWorkers.add(_wid);  
148. //将gt的key降低一个数字
149. 1);  
150. if (_next == null) {  
151. new
152. 1, _next);  
153.                     }  
154.                     _next.add(_fromSid);  
155. //将lt的key提升一个数字
156. 1);  
157. //从lt的countMap中移除,因为它将被放置在key + 1的新位置
158.                     Iterator<String> _ltIt = _ltServers.iterator();  
159. while
160. if
161.                             _ltIt.remove();  
162. break;  
163.                         }  
164.                     }  
165. if (_prev == null) {  
166. new
167. 1, _prev);  
168.                     }  
169.                     _prev.add(_toSid);  
170.                 }  
171.             }  
172. //dump info
173. for
174. "Sid:"
175.                 System.out.println(entry.getValue().toString());  
176.             }  
177. catch
178.             e.printStackTrace();  

举报

相关推荐

css 文本平均分布

滑动平均滤波算法

0 条评论