0
点赞
收藏
分享

微信扫一扫

Storm详解二、写第一个Storm应用


Storm详解二、写第一个Storm应用



分类: Storm 2014-08-04 15:10  305人阅读  评论(0)  收藏  举报



storm demo 例子 实例



     在全面介绍Storm之前,我们先通过一个简单的Demo让大家整体感受一下什么是Storm。


Storm运行模式:


  1. 本地模式(Local Mode): 即Topology(相当于一个任务,后续会详细讲解)  运行在本地机器的单一JVM上,这个模式主要用来开发、调试。
  2. 远程模式(Remote Mode):在这个模式,我们把我们的Topology提交到集群,在这个模式中,Storm的所有组件都是线程安全的,因为它们都会运行在不同的Jvm或物理机器上,这个模式就是正式的生产模式。

写一个HelloWord Storm



     我们现在创建这么一个应用,统计文本文件中的单词个数,详细学习过Hadoop的朋友都应该写过。那么我们需要具体创建这样一个Topology,用一个spout负责读取文本文件,用第一个bolt来解析成单词,用第二个bolt来对解析出的单词计数,整体结构如图所示:



可以从这里下载源码:https://github.com/storm-book/examples-ch02-getting_started/zipball/master




Storm详解二、写第一个Storm应用_ide


     写一个可运行的Demo很简单,我们只需要三步:


  1. 创建一个Spout读取数据
  2. 创建bolt处理数据
  3. 创建一个Topology提交到集群

下面我们就写一下,以下代码拷贝到eclipse(依赖的jar包到官网下载即可)即可运行。


1.创建一个Spout作为数据源


     Spout作为数据源,它实现了IRichSpout接口,功能是读取一个文本文件并把它的每一行内容发送给bolt。

 

1. package storm.demo.spout;  
2.   
3. import java.io.BufferedReader;  
4. import java.io.FileNotFoundException;  
5. import java.io.FileReader;  
6. import java.util.Map;  
7. import backtype.storm.spout.SpoutOutputCollector;  
8. import backtype.storm.task.TopologyContext;  
9. import backtype.storm.topology.IRichSpout;  
10. import backtype.storm.topology.OutputFieldsDeclarer;  
11. import backtype.storm.tuple.Fields;  
12. import backtype.storm.tuple.Values;  
13. public class WordReader implements IRichSpout {  
14. private static final long serialVersionUID = 1L;  
15. private SpoutOutputCollector collector;  
16. private FileReader fileReader;  
17. private boolean completed = false;  
18.   
19. public boolean isDistributed() {  
20. return false;  
21.     }  
22. /**
23.      * 这是第一个方法,里面接收了三个参数,第一个是创建Topology时的配置,
24.      * 第二个是所有的Topology数据,第三个是用来把Spout的数据发射给bolt
25.      * **/  
26. @Override  
27. public void open(Map conf, TopologyContext context,  
28.             SpoutOutputCollector collector) {  
29. try {  
30. //获取创建Topology时指定的要读取的文件路径  
31. this.fileReader = new FileReader(conf.get("wordsFile").toString());  
32. catch (FileNotFoundException e) {  
33. throw new RuntimeException("Error reading file ["  
34. "wordFile") + "]");  
35.         }  
36. //初始化发射器  
37. this.collector = collector;  
38.   
39.     }  
40. /**
41.      * 这是Spout最主要的方法,在这里我们读取文本文件,并把它的每一行发射出去(给bolt)
42.      * 这个方法会不断被调用,为了降低它对CPU的消耗,当任务完成时让它sleep一下
43.      * **/  
44. @Override  
45. public void nextTuple() {  
46. if (completed) {  
47. try {  
48. 1000);  
49. catch (InterruptedException e) {  
50. // Do nothing  
51.             }  
52. return;  
53.         }  
54.         String str;  
55. // Open the reader  
56. new BufferedReader(fileReader);  
57. try {  
58. // Read all lines  
59. while ((str = reader.readLine()) != null) {  
60. /**
61.                  * 发射每一行,Values是一个ArrayList的实现
62.                  */  
63. this.collector.emit(new Values(str), str);  
64.             }  
65. catch (Exception e) {  
66. throw new RuntimeException("Error reading tuple", e);  
67. finally {  
68. true;  
69.         }  
70.   
71.     }  
72. @Override  
73. public void declareOutputFields(OutputFieldsDeclarer declarer) {  
74. new Fields("line"));  
75.   
76.     }  
77. @Override  
78. public void close() {  
79. // TODO Auto-generated method stub  
80.     }  
81.       
82. @Override  
83. public void activate() {  
84. // TODO Auto-generated method stub  
85.   
86.     }  
87. @Override  
88. public void deactivate() {  
89. // TODO Auto-generated method stub  
90.   
91.     }  
92. @Override  
93. public void ack(Object msgId) {  
94. "OK:" + msgId);  
95.     }  
96. @Override  
97. public void fail(Object msgId) {  
98. "FAIL:" + msgId);  
99.   
100.     }  
101. @Override  
102. public Map<String, Object> getComponentConfiguration() {  
103. // TODO Auto-generated method stub  
104. return null;  
105.     }  
106. }


2.创建两个bolt来处理Spout发射出的数据


     Spout已经成功读取文件并把每一行作为一个tuple(在Storm数据以tuple的形式传递)发射过来,我们这里需要创建两个bolt分别来负责解析每一行和对单词计数。


     Bolt中最重要的是execute方法,每当一个tuple传过来时它便会被调用。


     第一个bolt:WordNormalizer

 

1. package storm.demo.bolt;  
2. import java.util.ArrayList;  
3. import java.util.List;  
4. import java.util.Map;  
5. import backtype.storm.task.OutputCollector;  
6. import backtype.storm.task.TopologyContext;  
7. import backtype.storm.topology.IRichBolt;  
8. import backtype.storm.topology.OutputFieldsDeclarer;  
9. import backtype.storm.tuple.Fields;  
10. import backtype.storm.tuple.Tuple;  
11. import backtype.storm.tuple.Values;  
12. public class WordNormalizer implements IRichBolt {  
13. private OutputCollector collector;  
14. @Override  
15. public void prepare(Map stormConf, TopologyContext context,  
16.             OutputCollector collector) {  
17. this.collector = collector;  
18.     }  
19. /**这是bolt中最重要的方法,每当接收到一个tuple时,此方法便被调用
20.      * 这个方法的作用就是把文本文件中的每一行切分成一个个单词,并把这些单词发射出去(给下一个bolt处理)
21.      * **/  
22. @Override  
23. public void execute(Tuple input) {  
24. 0);  
25. " ");  
26. for (String word : words) {  
27.             word = word.trim();  
28. if (!word.isEmpty()) {  
29.                 word = word.toLowerCase();  
30. // Emit the word  
31. new ArrayList();  
32.                 a.add(input);  
33. new Values(word));  
34.             }  
35.         }  
36. //确认成功处理一个tuple  
37.         collector.ack(input);  
38.     }  
39. @Override  
40. public void declareOutputFields(OutputFieldsDeclarer declarer) {  
41. new Fields("word"));  
42.   
43.     }  
44. @Override  
45. public void cleanup() {  
46. // TODO Auto-generated method stub  
47.   
48.     }  
49. @Override  
50. public Map<String, Object> getComponentConfiguration() {  
51. // TODO Auto-generated method stub  
52. return null;  
53.     }  
54. }


     第二个bolt:WordCounter


 

1. package storm.demo.bolt;  
2. import java.util.HashMap;  
3. import java.util.Map;  
4. import backtype.storm.task.OutputCollector;  
5. import backtype.storm.task.TopologyContext;  
6. import backtype.storm.topology.IRichBolt;  
7. import backtype.storm.topology.OutputFieldsDeclarer;  
8. import backtype.storm.tuple.Tuple;  
9.   
10. public class WordCounter implements IRichBolt {  
11.     Integer id;  
12.     String name;  
13.     Map<String, Integer> counters;  
14. private OutputCollector collector;  
15.   
16. @Override  
17. public void prepare(Map stormConf, TopologyContext context,  
18.             OutputCollector collector) {  
19. this.counters = new HashMap<String, Integer>();  
20. this.collector = collector;  
21. this.name = context.getThisComponentId();  
22. this.id = context.getThisTaskId();  
23.   
24.     }  
25. @Override  
26. public void execute(Tuple input) {  
27. 0);  
28. if (!counters.containsKey(str)) {  
29. 1);  
30. else {  
31. 1;  
32.             counters.put(str, c);  
33.         }  
34. // 确认成功处理一个tuple  
35.         collector.ack(input);  
36.     }  
37. /**
38.      * Topology执行完毕的清理工作,比如关闭连接、释放资源等操作都会写在这里
39.      * 因为这只是个Demo,我们用它来打印我们的计数器
40.      * */  
41. @Override  
42. public void cleanup() {  
43. "-- Word Counter [" + name + "-" + id + "] --");  
44. for (Map.Entry<String, Integer> entry : counters.entrySet()) {  
45. ": " + entry.getValue());  
46.         }  
47.         counters.clear();  
48.     }  
49. @Override  
50. public void declareOutputFields(OutputFieldsDeclarer declarer) {  
51. // TODO Auto-generated method stub  
52.   
53.     }  
54. @Override  
55. public Map<String, Object> getComponentConfiguration() {  
56. // TODO Auto-generated method stub  
57. return null;  
58.     }  
59. }


3.在main函数中创建一个Topology


     在这里我们要创建一个Topology和一个LocalCluster对象,还有一个Config对象做一些配置。   


 

1. package storm.demo;  
2.   
3. import storm.demo.bolt.WordCounter;  
4. import storm.demo.bolt.WordNormalizer;  
5. import storm.demo.spout.WordReader;  
6. import backtype.storm.Config;  
7. import backtype.storm.LocalCluster;  
8. import backtype.storm.topology.TopologyBuilder;  
9. import backtype.storm.tuple.Fields;  
10. public class WordCountTopologyMain {  
11. public static void main(String[] args) throws InterruptedException {  
12. //定义一个Topology  
13. new TopologyBuilder();  
14. "word-reader",new WordReader());  
15. "word-normalizer", new WordNormalizer())  
16. "word-reader");  
17. "word-counter", new WordCounter(),2)  
18. "word-normalizer", new Fields("word"));  
19. //配置  
20. new Config();  
21. "wordsFile", "d:/text.txt");  
22. false);  
23. //提交Topology  
24. 1);  
25. //创建一个本地模式cluster  
26. new LocalCluster();  
27. "Getting-Started-Toplogie", conf,  
28.         builder.createTopology());  
29. 1000);  
30.         cluster.shutdown();  
31.     }  
32. }


     运行这个函数我们即可看到后台打印出来的单词个数。


    (ps:因为是Local模式,运行开始可能会打印很多错误log,这个先不用管)


 


Storm详解二、写第一个Storm应用_ide_02


 


  • 上一篇Storm详解一、Storm 概述
  • 下一篇Storm专题一、Storm DRPC 分布式计算

0


     在全面介绍Storm之前,我们先通过一个简单的Demo让大家整体感受一下什么是Storm。

Storm运行模式:

  1. 本地模式(Local Mode): 即Topology(相当于一个任务,后续会详细讲解)  运行在本地机器的单一JVM上,这个模式主要用来开发、调试。
  2. 远程模式(Remote Mode):在这个模式,我们把我们的Topology提交到集群,在这个模式中,Storm的所有组件都是线程安全的,因为它们都会运行在不同的Jvm或物理机器上,这个模式就是正式的生产模式。

写一个HelloWord Storm


     我们现在创建这么一个应用,统计文本文件中的单词个数,详细学习过Hadoop的朋友都应该写过。那么我们需要具体创建这样一个Topology,用一个spout负责读取文本文件,用第一个bolt来解析成单词,用第二个bolt来对解析出的单词计数,整体结构如图所示:


可以从这里下载源码:https://github.com/storm-book/examples-ch02-getting_started/zipball/master


Storm详解二、写第一个Storm应用_ide


     写一个可运行的Demo很简单,我们只需要三步:


  1. 创建一个Spout读取数据
  2. 创建bolt处理数据
  3. 创建一个Topology提交到集群

下面我们就写一下,以下代码拷贝到eclipse(依赖的jar包到官网下载即可)即可运行。


1.创建一个Spout作为数据源


     Spout作为数据源,它实现了IRichSpout接口,功能是读取一个文本文件并把它的每一行内容发送给bolt。


 

1. package storm.demo.spout;  
2.   
3. import java.io.BufferedReader;  
4. import java.io.FileNotFoundException;  
5. import java.io.FileReader;  
6. import java.util.Map;  
7. import backtype.storm.spout.SpoutOutputCollector;  
8. import backtype.storm.task.TopologyContext;  
9. import backtype.storm.topology.IRichSpout;  
10. import backtype.storm.topology.OutputFieldsDeclarer;  
11. import backtype.storm.tuple.Fields;  
12. import backtype.storm.tuple.Values;  
13. public class WordReader implements IRichSpout {  
14. private static final long serialVersionUID = 1L;  
15. private SpoutOutputCollector collector;  
16. private FileReader fileReader;  
17. private boolean completed = false;  
18.   
19. public boolean isDistributed() {  
20. return false;  
21.     }  
22. /**
23.      * 这是第一个方法,里面接收了三个参数,第一个是创建Topology时的配置,
24.      * 第二个是所有的Topology数据,第三个是用来把Spout的数据发射给bolt
25.      * **/  
26. @Override  
27. public void open(Map conf, TopologyContext context,  
28.             SpoutOutputCollector collector) {  
29. try {  
30. //获取创建Topology时指定的要读取的文件路径  
31. this.fileReader = new FileReader(conf.get("wordsFile").toString());  
32. catch (FileNotFoundException e) {  
33. throw new RuntimeException("Error reading file ["  
34. "wordFile") + "]");  
35.         }  
36. //初始化发射器  
37. this.collector = collector;  
38.   
39.     }  
40. /**
41.      * 这是Spout最主要的方法,在这里我们读取文本文件,并把它的每一行发射出去(给bolt)
42.      * 这个方法会不断被调用,为了降低它对CPU的消耗,当任务完成时让它sleep一下
43.      * **/  
44. @Override  
45. public void nextTuple() {  
46. if (completed) {  
47. try {  
48. 1000);  
49. catch (InterruptedException e) {  
50. // Do nothing  
51.             }  
52. return;  
53.         }  
54.         String str;  
55. // Open the reader  
56. new BufferedReader(fileReader);  
57. try {  
58. // Read all lines  
59. while ((str = reader.readLine()) != null) {  
60. /**
61.                  * 发射每一行,Values是一个ArrayList的实现
62.                  */  
63. this.collector.emit(new Values(str), str);  
64.             }  
65. catch (Exception e) {  
66. throw new RuntimeException("Error reading tuple", e);  
67. finally {  
68. true;  
69.         }  
70.   
71.     }  
72. @Override  
73. public void declareOutputFields(OutputFieldsDeclarer declarer) {  
74. new Fields("line"));  
75.   
76.     }  
77. @Override  
78. public void close() {  
79. // TODO Auto-generated method stub  
80.     }  
81.       
82. @Override  
83. public void activate() {  
84. // TODO Auto-generated method stub  
85.   
86.     }  
87. @Override  
88. public void deactivate() {  
89. // TODO Auto-generated method stub  
90.   
91.     }  
92. @Override  
93. public void ack(Object msgId) {  
94. "OK:" + msgId);  
95.     }  
96. @Override  
97. public void fail(Object msgId) {  
98. "FAIL:" + msgId);  
99.   
100.     }  
101. @Override  
102. public Map<String, Object> getComponentConfiguration() {  
103. // TODO Auto-generated method stub  
104. return null;  
105.     }  
106. }

2.创建两个bolt来处理Spout发射出的数据


     Spout已经成功读取文件并把每一行作为一个tuple(在Storm数据以tuple的形式传递)发射过来,我们这里需要创建两个bolt分别来负责解析每一行和对单词计数。


     Bolt中最重要的是execute方法,每当一个tuple传过来时它便会被调用。


     第一个bolt:WordNormalizer


1. package storm.demo.bolt;  
2. import java.util.ArrayList;  
3. import java.util.List;  
4. import java.util.Map;  
5. import backtype.storm.task.OutputCollector;  
6. import backtype.storm.task.TopologyContext;  
7. import backtype.storm.topology.IRichBolt;  
8. import backtype.storm.topology.OutputFieldsDeclarer;  
9. import backtype.storm.tuple.Fields;  
10. import backtype.storm.tuple.Tuple;  
11. import backtype.storm.tuple.Values;  
12. public class WordNormalizer implements IRichBolt {  
13. private OutputCollector collector;  
14. @Override  
15. public void prepare(Map stormConf, TopologyContext context,  
16.             OutputCollector collector) {  
17. this.collector = collector;  
18.     }  
19. /**这是bolt中最重要的方法,每当接收到一个tuple时,此方法便被调用
20.      * 这个方法的作用就是把文本文件中的每一行切分成一个个单词,并把这些单词发射出去(给下一个bolt处理)
21.      * **/  
22. @Override  
23. public void execute(Tuple input) {  
24. 0);  
25. " ");  
26. for (String word : words) {  
27.             word = word.trim();  
28. if (!word.isEmpty()) {  
29.                 word = word.toLowerCase();  
30. // Emit the word  
31. new ArrayList();  
32.                 a.add(input);  
33. new Values(word));  
34.             }  
35.         }  
36. //确认成功处理一个tuple  
37.         collector.ack(input);  
38.     }  
39. @Override  
40. public void declareOutputFields(OutputFieldsDeclarer declarer) {  
41. new Fields("word"));  
42.   
43.     }  
44. @Override  
45. public void cleanup() {  
46. // TODO Auto-generated method stub  
47.   
48.     }  
49. @Override  
50. public Map<String, Object> getComponentConfiguration() {  
51. // TODO Auto-generated method stub  
52. return null;  
53.     }  
54. }


     第二个bolt:WordCounter


 

1. package storm.demo.bolt;  
2. import java.util.HashMap;  
3. import java.util.Map;  
4. import backtype.storm.task.OutputCollector;  
5. import backtype.storm.task.TopologyContext;  
6. import backtype.storm.topology.IRichBolt;  
7. import backtype.storm.topology.OutputFieldsDeclarer;  
8. import backtype.storm.tuple.Tuple;  
9.   
10. public class WordCounter implements IRichBolt {  
11.     Integer id;  
12.     String name;  
13.     Map<String, Integer> counters;  
14. private OutputCollector collector;  
15.   
16. @Override  
17. public void prepare(Map stormConf, TopologyContext context,  
18.             OutputCollector collector) {  
19. this.counters = new HashMap<String, Integer>();  
20. this.collector = collector;  
21. this.name = context.getThisComponentId();  
22. this.id = context.getThisTaskId();  
23.   
24.     }  
25. @Override  
26. public void execute(Tuple input) {  
27. 0);  
28. if (!counters.containsKey(str)) {  
29. 1);  
30. else {  
31. 1;  
32.             counters.put(str, c);  
33.         }  
34. // 确认成功处理一个tuple  
35.         collector.ack(input);  
36.     }  
37. /**
38.      * Topology执行完毕的清理工作,比如关闭连接、释放资源等操作都会写在这里
39.      * 因为这只是个Demo,我们用它来打印我们的计数器
40.      * */  
41. @Override  
42. public void cleanup() {  
43. "-- Word Counter [" + name + "-" + id + "] --");  
44. for (Map.Entry<String, Integer> entry : counters.entrySet()) {  
45. ": " + entry.getValue());  
46.         }  
47.         counters.clear();  
48.     }  
49. @Override  
50. public void declareOutputFields(OutputFieldsDeclarer declarer) {  
51. // TODO Auto-generated method stub  
52.   
53.     }  
54. @Override  
55. public Map<String, Object> getComponentConfiguration() {  
56. // TODO Auto-generated method stub  
57. return null;  
58.     }  
59. }


3.在main函数中创建一个Topology


     在这里我们要创建一个Topology和一个LocalCluster对象,还有一个Config对象做一些配置。   


 

1. package storm.demo;  
2.   
3. import storm.demo.bolt.WordCounter;  
4. import storm.demo.bolt.WordNormalizer;  
5. import storm.demo.spout.WordReader;  
6. import backtype.storm.Config;  
7. import backtype.storm.LocalCluster;  
8. import backtype.storm.topology.TopologyBuilder;  
9. import backtype.storm.tuple.Fields;  
10. public class WordCountTopologyMain {  
11. public static void main(String[] args) throws InterruptedException {  
12. //定义一个Topology  
13. new TopologyBuilder();  
14. "word-reader",new WordReader());  
15. "word-normalizer", new WordNormalizer())  
16. "word-reader");  
17. "word-counter", new WordCounter(),2)  
18. "word-normalizer", new Fields("word"));  
19. //配置  
20. new Config();  
21. "wordsFile", "d:/text.txt");  
22. false);  
23. //提交Topology  
24. 1);  
25. //创建一个本地模式cluster  
26. new LocalCluster();  
27. "Getting-Started-Toplogie", conf,  
28.         builder.createTopology());  
29. 1000);  
30.         cluster.shutdown();  
31.     }  
32. }

     运行这个函数我们即可看到后台打印出来的单词个数。


    (ps:因为是Local模式,运行开始可能会打印很多错误log,这个先不用管)


 


Storm详解二、写第一个Storm应用_ide_02


举报

相关推荐

0 条评论