0
点赞
收藏
分享

微信扫一扫

storm的代码实现

彪悍的鼹鼠 2022-06-17 阅读 39

先模拟产生一些数据

storm的代码实现_android

 

 storm的代码实现_ios_02

我把这些数据摘一部分下来

1 2017-06-10 18:25:56,092 [main] [org.apache.kafka.common.utils.AppInfoParser] [INFO] - Kafka version : 0.9.0.1
2 2017-06-10 18:25:56,092 [main] [org.apache.kafka.common.utils.AppInfoParser] [INFO] - Kafka commitId : 23c69d62a0cabf06
3 {"id":"865456863256327","vid":"1495267869123452","uid":"964226522333222","gold":3936,"platform":"ios","timestamp":1497090356094}
4 {"id":"865456863256323","vid":"1495267869123454","uid":"964226522333224","gold":6824,"platform":"android","timestamp":1497090356194}
5 {"id":"865456863256323","vid":"1495267869123452","uid":"964226522333222","gold":9389,"platform":"ios","timestamp":1497090356294}
6 {"id":"865456863256326","vid":"1495267869123452","uid":"964226522333222","gold":3054,"platform":"ios","timestamp":1497090356394}
7 {"id":"865456863256329","vid":"1495267869123454","uid":"964226522333224","gold":1518,"platform":"android","timestamp":1497090356494}
8 {"id":"865456863256324","vid":"1495267869123452","uid":"964226522333222","gold":7668,"platform":"ios","timestamp":1497090356594}
9 {"id":"865456863256321","vid":"1495267869123454","uid":"964226522333224","gold":1665,"platform":"android","timestamp":1497090356694}
10 {"id":"865456863256323","vid":"1495267869123452","uid":"964226522333222","gold":1727,"platform":"ios","timestamp":1497090356794}
11 {"id":"865456863256324","vid":"1495267869123451","uid":"964226522333221","gold":6371,"platform":"ios","timestamp":1497090356894}
12 {"id":"865456863256328","vid":"1495267869123452","uid":"964226522333222","gold":495,"platform":"android","timestamp":1497090356994}
13 {"id":"865456863256323","vid":"1495267869123454","uid":"964226522333224","gold":7543,"platform":"ios","timestamp":1497090417094}
14 {"id":"865456863256322","vid":"1495267869123454","uid":"964226522333224","gold":1901,"platform":"android","timestamp":1497090417194}
15 {"id":"865456863256329","vid":"1495267869123452","uid":"964226522333222","gold":8043,"platform":"ios","timestamp":1497090417294}
16 {"id":"865456863256321","vid":"1495267869123452","uid":"964226522333222","gold":9325,"platform":"ios","timestamp":1497090417394}
17 {"id":"865456863256327","vid":"1495267869123452","uid":"964226522333222","gold":4408,"platform":"android","timestamp":1497090417494}
18 {"id":"865456863256320","vid":"1495267869123450","uid":"964226522333220","gold":8715,"platform":"android","timestamp":1497090417594}
19 {"id":"865456863256321","vid":"1495267869123450","uid":"964226522333220","gold":592,"platform":"ios","timestamp":1497090417694}
20 {"id":"865456863256321","vid":"1495267869123450","uid":"964226522333220","gold":4319,"platform":"android","timestamp":1497090417794}
21 {"id":"865456863256321","vid":"1495267869123453","uid":"964226522333223","gold":416,"platform":"ios","timestamp":1497090417894}
22 {"id":"865456863256329","vid":"1495267869123454","uid":"964226522333224","gold":4410,"platform":"android","timestamp":1497090417994}
23 {"id":"865456863256324","vid":"1495267869123451","uid":"964226522333221","gold":7197,"platform":"ios","timestamp":1497090478095}
24 {"id":"865456863256327","vid":"1495267869123451","uid":"964226522333221","gold":1737,"platform":"ios","timestamp":1497090478195}
25 {"id":"865456863256324","vid":"1495267869123453","uid":"964226522333223","gold":2425,"platform":"android","timestamp":1497090478295}
26 {"id":"865456863256326","vid":"1495267869123454","uid":"964226522333224","gold":6847,"platform":"ios","timestamp":1497090478395}
27 {"id":"865456863256322","vid":"1495267869123454","uid":"964226522333224","gold":1932,"platform":"android","timestamp":1497090478495}
28 {"id":"865456863256324","vid":"1495267869123454","uid":"964226522333224","gold":4428,"platform":"ios","timestamp":1497090478595}
29 {"id":"865456863256320","vid":"1495267869123453","uid":"964226522333223","gold":3708,"platform":"android","timestamp":1497090478695}
30 {"id":"865456863256321","vid":"1495267869123452","uid":"964226522333222","gold":5290,"platform":"ios","timestamp":1497090478795}
31 {"id":"865456863256328","vid":"1495267869123452","uid":"964226522333222","gold":5080,"platform":"android","timestamp":1497090478895}
32 {"id":"865456863256321","vid":"1495267869123453","uid":"964226522333223","gold":9643,"platform":"android","timestamp":1497090478995}
33 {"id":"865456863256324","vid":"1495267869123452","uid":"964226522333222","gold":3766,"platform":"ios","timestamp":1497090539095}
34 {"id":"865456863256326","vid":"1495267869123451","uid":"964226522333221","gold":3758,"platform":"android","timestamp":1497090539195}
35 {"id":"865456863256328","vid":"1495267869123451","uid":"964226522333221","gold":2522,"platform":"android","timestamp":1497090539295}
36 {"id":"865456863256322","vid":"1495267869123450","uid":"964226522333220","gold":8746,"platform":"android","timestamp":1497090539395}
37 {"id":"865456863256328","vid":"1495267869123451","uid":"964226522333221","gold":7616,"platform":"ios","timestamp":1497090539495}
38 {"id":"865456863256325","vid":"1495267869123454","uid":"964226522333224","gold":527,"platform":"android","timestamp":1497090539595}
39 {"id":"865456863256327","vid":"1495267869123451","uid":"964226522333221","gold":3887,"platform":"ios","timestamp":1497090539695}
40 {"id":"865456863256325","vid":"1495267869123450","uid":"964226522333220","gold":2137,"platform":"ios","timestamp":1497090539795}
41 {"id":"865456863256329","vid":"1495267869123453","uid":"964226522333223","gold":6965,"platform":"android","timestamp":1497090539895}
42 {"id":"865456863256325","vid":"1495267869123451","uid":"964226522333221","gold":350,"platform":"android","timestamp":1497090539995}
43 {"id":"865456863256323","vid":"1495267869123454","uid":"964226522333224","gold":863,"platform":"android","timestamp":1497090600096}
44 {"id":"865456863256320","vid":"1495267869123454","uid":"964226522333224","gold":9597,"platform":"ios","timestamp":1497090600196}
45 {"id":"865456863256324","vid":"1495267869123454","uid":"964226522333224","gold":9504,"platform":"ios","timestamp":1497090600296}
46 {"id":"865456863256322","vid":"1495267869123451","uid":"964226522333221","gold":1598,"platform":"ios","timestamp":1497090600396}
47 {"id":"865456863256325","vid":"1495267869123451","uid":"964226522333221","gold":1126,"platform":"android","timestamp":1497090600496}
48 {"id":"865456863256324","vid":"1495267869123453","uid":"964226522333223","gold":3606,"platform":"android","timestamp":1497090600596}
49 {"id":"865456863256326","vid":"1495267869123450","uid":"964226522333220","gold":1866,"platform":"ios","timestamp":1497090600696}
50 {"id":"865456863256323","vid":"1495267869123453","uid":"964226522333223","gold":1282,"platform":"android","timestamp":1497090600796}
51 {"id":"865456863256325","vid":"1495267869123450","uid":"964226522333220","gold":542,"platform":"ios","timestamp":1497090600896}
52 {"id":"865456863256326","vid":"1495267869123450","uid":"964226522333220","gold":4168,"platform":"android","timestamp":1497090600996}
53 {"id":"865456863256321","vid":"1495267869123453","uid":"964226522333223","gold":4766,"platform":"android","timestamp":1497090661096}
54 {"id":"865456863256323","vid":"1495267869123451","uid":"964226522333221","gold":3867,"platform":"ios","timestamp":1497090661196}
55 {"id":"865456863256324","vid":"1495267869123451","uid":"964226522333221","gold":7825,"platform":"ios","timestamp":1497090661296}
56 {"id":"865456863256320","vid":"1495267869123454","uid":"964226522333224","gold":4518,"platform":"ios","timestamp":1497090661396}
57 {"id":"865456863256326","vid":"1495267869123453","uid":"964226522333223","gold":4280,"platform":"ios","timestamp":1497090661496}
58 {"id":"865456863256323","vid":"1495267869123452","uid":"964226522333222","gold":4909,"platform":"android","timestamp":1497090661596}
59 {"id":"865456863256325","vid":"1495267869123452","uid":"964226522333222","gold":7227,"platform":"ios","timestamp":1497090661696}
60 {"id":"865456863256324","vid":"1495267869123451","uid":"964226522333221","gold":9937,"platform":"android","timestamp":1497090661796}
61 {"id":"865456863256321","vid":"1495267869123451","uid":"964226522333221","gold":7840,"platform":"ios","timestamp":1497090661896}
62 {"id":"865456863256326","vid":"1495267869123453","uid":"964226522333223","gold":2762,"platform":"ios","timestamp":1497090661996}
63 {"id":"865456863256322","vid":"1495267869123454","uid":"964226522333224","gold":7941,"platform":"ios","timestamp":1497090722097}
64 {"id":"865456863256320","vid":"1495267869123452","uid":"964226522333222","gold":6188,"platform":"android","timestamp":1497090722197}
65 {"id":"865456863256325","vid":"1495267869123454","uid":"964226522333224","gold":2387,"platform":"android","timestamp":1497090722297}
66 {"id":"865456863256322","vid":"1495267869123450","uid":"964226522333220","gold":2980,"platform":"ios","timestamp":1497090722397}
67 {"id":"865456863256321","vid":"1495267869123452","uid":"964226522333222","gold":9403,"platform":"android","timestamp":1497090722497}
68 {"id":"865456863256323","vid":"1495267869123453","uid":"964226522333223","gold":3482,"platform":"android","timestamp":1497090722597}
69 {"id":"865456863256324","vid":"1495267869123454","uid":"964226522333224","gold":3290,"platform":"android","timestamp":1497090722697}
70 {"id":"865456863256323","vid":"1495267869123454","uid":"964226522333224","gold":1439,"platform":"android","timestamp":1497090722797}
71 {"id":"865456863256323","vid":"1495267869123452","uid":"964226522333222","gold":6758,"platform":"ios","timestamp":1497090722897}
72 {"id":"865456863256327","vid":"1495267869123452","uid":"964226522333222","gold":3501,"platform":"ios","timestamp":1497090722997}
73 {"id":"865456863256325","vid":"1495267869123454","uid":"964226522333224","gold":7904,"platform":"ios","timestamp":1497090783097}
74 {"id":"865456863256326","vid":"1495267869123453","uid":"964226522333223","gold":9900,"platform":"android","timestamp":1497090783197}
75 {"id":"865456863256320","vid":"1495267869123452","uid":"964226522333222","gold":1841,"platform":"ios","timestamp":1497090783297}
76 {"id":"865456863256322","vid":"1495267869123453","uid":"964226522333223","gold":8857,"platform":"ios","timestamp":1497090783397}
77 {"id":"865456863256328","vid":"1495267869123450","uid":"964226522333220","gold":7855,"platform":"android","timestamp":1497090783497}
78 {"id":"865456863256324","vid":"1495267869123451","uid":"964226522333221","gold":7165,"platform":"android","timestamp":1497090783597}
79 {"id":"865456863256326","vid":"1495267869123450","uid":"964226522333220","gold":2247,"platform":"ios","timestamp":1497090783697}
80 {"id":"865456863256329","vid":"1495267869123454","uid":"964226522333224","gold":1742,"platform":"android","timestamp":1497090783797}
81 {"id":"865456863256323","vid":"1495267869123452","uid":"964226522333222","gold":9122,"platform":"ios","timestamp":1497090783897}
82 {"id":"865456863256325","vid":"1495267869123453","uid":"964226522333223","gold":1623,"platform":"android","timestamp":1497090783997}
83 {"id":"865456863256324","vid":"1495267869123450","uid":"964226522333220","gold":8354,"platform":"ios","timestamp":1497090844098}
84 {"id":"865456863256321","vid":"1495267869123454","uid":"964226522333224","gold":3808,"platform":"ios","timestamp":1497090844198}
85 {"id":"865456863256326","vid":"1495267869123451","uid":"964226522333221","gold":9875,"platform":"android","timestamp":1497090844298}
86 {"id":"865456863256327","vid":"1495267869123452","uid":"964226522333222","gold":2714,"platform":"ios","timestamp":1497090844398}
87 {"id":"865456863256326","vid":"1495267869123454","uid":"964226522333224","gold":3660,"platform":"ios","timestamp":1497090844498}
88 {"id":"865456863256323","vid":"1495267869123452","uid":"964226522333222","gold":8545,"platform":"ios","timestamp":1497090844598}
89 {"id":"865456863256325","vid":"1495267869123453","uid":"964226522333223","gold":5757,"platform":"android","timestamp":1497090844698}
90 {"id":"865456863256320","vid":"1495267869123450","uid":"964226522333220","gold":7898,"platform":"android","timestamp":1497090844798}
91 {"id":"865456863256329","vid":"1495267869123453","uid":"964226522333223","gold":3633,"platform":"ios","timestamp":1497090844898}
92 {"id":"865456863256329","vid":"1495267869123452","uid":"964226522333222","gold":6500,"platform":"android","timestamp":1497090844998}
93 {"id":"865456863256323","vid":"1495267869123450","uid":"964226522333220","gold":8859,"platform":"ios","timestamp":1497090905098}
94 {"id":"865456863256322","vid":"1495267869123452","uid":"964226522333222","gold":3897,"platform":"android","timestamp":1497090905198}
95 {"id":"865456863256326","vid":"1495267869123451","uid":"964226522333221","gold":5786,"platform":"ios","timestamp":1497090905298}
96 {"id":"865456863256321","vid":"1495267869123451","uid":"964226522333221","gold":2667,"platform":"android","timestamp":1497090905398}
97 {"id":"865456863256321","vid":"1495267869123453","uid":"964226522333223","gold":4038,"platform":"android","timestamp":1497090905499}
98 {"id":"865456863256328","vid":"1495267869123451","uid":"964226522333221","gold":361,"platform":"android","timestamp":1497090905599}
99 {"id":"865456863256326","vid":"1495267869123454","uid":"964226522333224","gold":7074,"platform":"android","timestamp":1497090905699}
100 {"id":"865456863256323","vid":"1495267869123451","uid":"964226522333221","gold":89,"platform":"android","timestamp":1497090905799}
101 {"id":"865456863256325","vid":"1495267869123450","uid":"964226522333220","gold":1354,"platform":"android","timestamp":1497090905899}
102 {"id":"865456863256326","vid":"1495267869123452","uid":"964226522333222","gold":221,"platform":"ios","timestamp":1497090905999}
103 {"id":"865456863256325","vid":"1495267869123451","uid":"964226522333221","gold":436,"platform":"android","timestamp":1497090966099}
104 {"id":"865456863256327","vid":"1495267869123451","uid":"964226522333221","gold":8000,"platform":"android","timestamp":1497090966199}
105 {"id":"865456863256324","vid":"1495267869123453","uid":"964226522333223","gold":9952,"platform":"android","timestamp":1497090966299}
106 {"id":"865456863256321","vid":"1495267869123451","uid":"964226522333221","gold":2216,"platform":"android","timestamp":1497090966400}
107 {"id":"865456863256320","vid":"1495267869123452","uid":"964226522333222","gold":2042,"platform":"android","timestamp":1497090966500}
108 {"id":"865456863256329","vid":"1495267869123451","uid":"964226522333221","gold":8739,"platform":"ios","timestamp":1497090966600}
109 {"id":"865456863256322","vid":"1495267869123452","uid":"964226522333222","gold":2500,"platform":"ios","timestamp":1497090966701}
110 {"id":"865456863256323","vid":"1495267869123452","uid":"964226522333222","gold":9803,"platform":"ios","timestamp":1497090966801}
111 {"id":"865456863256328","vid":"1495267869123450","uid":"964226522333220","gold":7246,"platform":"android","timestamp":1497090966901}
112 {"id":"865456863256320","vid":"1495267869123454","uid":"964226522333224","gold":5220,"platform":"android","timestamp":1497090967001}

 

 

 参考代码KafkaProducer.java

1 package yehua.kafkaDemo;
2
3 import java.util.Properties;
4 import java.util.Random;
5
6 import org.apache.kafka.clients.producer.Producer;
7 import org.apache.kafka.clients.producer.ProducerRecord;
8
9 public class KafkaProducer {
10
11 public static void main(String[] args) throws Exception {
12 Properties props = new Properties();
13 props.put("bootstrap.servers", "master:9092");
14 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
15 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
16 //String topic = "gold_log_r2p5";
17 String topic = "test";
18
19 Producer<String, String> producer = new org.apache.kafka.clients.producer.KafkaProducer<String, String>(props);
20 int count = 0;
21 //{"id":"865456863256326","vid":"1495267869123456","uid":"965406863256326","gold":150,"platform":"ios","timestamp":1495267869}
22 //模拟送礼人id
23 String[] idArr = {"865456863256320","865456863256321","865456863256322","865456863256323","865456863256324","865456863256325","865456863256326","865456863256327","865456863256328","865456863256329"};
24 //模拟直播间视频id
25 String[] vidArr = {"1495267869123450","1495267869123451","1495267869123452","1495267869123453","1495267869123454"};
26 //模拟直播用户id
27 String[] uidArr = {"964226522333220","964226522333221","964226522333222","964226522333223","964226522333224"};
28 //模拟用户手机平台
29 String[] platformArr = {"android","ios"};
30 Random random = new Random();
31 while(true){
32 int rint1 = random.nextInt(10);
33 int rint2 = random.nextInt(5);
34 int rint3 = random.nextInt(2);
35 String log = "{\"id\":\""+idArr[rint1]+"\",\"vid\":\""+vidArr[rint2]+"\",\"uid\":\""+uidArr[rint2]+"\",\"gold\":"+random.nextInt(10000)+",\"platform\":\""+platformArr[rint3]+"\",\"timestamp\":"+System.currentTimeMillis()+"}";
36 //producer.send(new ProducerRecord<String, String>(topic, log));
37 System.out.println(log);
38 count++;
39 Thread.sleep(100);
40 if(count%10 == 0){
41 //break;
42 Thread.sleep(1000*60);
43 }
44 }
45 }
46
47 }

 

 

 

storm的代码实现_android_03

 

 先在kafka创建topic

storm的代码实现_android_04

两个副本5个分区

 storm的代码实现_android_05

可以看到topic创建成功

storm的代码实现_ios_06

总结一下前面的流程

storm的代码实现_ios_07

下面再新建一个maven项目stormpProject0521

依赖文件:

storm的代码实现_android_08

storm的代码实现_apache_09

 

 依赖文件pom.xml参考代码

<!-- Maven build for the stormpProject0521 demo: declares the Storm / Kafka / JSON / JDBC / Redis
     dependencies and packages a jar-with-dependencies during the package phase. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>yehua</groupId>
<artifactId>stormpProject0521</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>

<name>stormpProject0521</name>
<url>http://maven.apache.org</url>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>
<!-- Needed only at compile time; not needed at runtime because the Storm cluster already provides it -->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.0.2</version>
<!-- "provided": used when compiling, left out of the packaged jar -->
<scope>provided</scope>
</dependency>
<!-- Mainly for KafkaSpout -->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka</artifactId>
<version>1.0.2</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.9.0.1</version>
<!-- Exclude slf4j-log4j12 -->
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Note: from Kafka 0.9 on, the kafka-clients dependency must be declared here, otherwise
the build fails at runtime with java.lang.NoSuchMethodError:
org.apache.kafka.common.network.NetworkSend.<init>(Ljava/lang/String;[Ljava/nio/ByteBuffer;)V -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.9.0.1</version>
</dependency>
<dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
<version>3.1</version>
</dependency>
<!-- dbutils -->
<dependency>
<groupId>commons-dbutils</groupId>
<artifactId>commons-dbutils</artifactId>
<version>1.6</version>
</dependency>
<!-- mysql -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.29</version>
</dependency>
<!-- json -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.10</version>
</dependency>
<!-- redis -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Compiler plugin: sets the JDK source/target level -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
<configuration>
<encoding>UTF-8</encoding>
<source>1.8</source>
<target>1.8</target>
<showWarnings>true</showWarnings>
</configuration>
</plugin>
<!-- Builds a jar-with-dependencies during the package phase.
     NOTE(review): no <version> is pinned for this plugin; consider pinning one. -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

  

参考代码LogProcessTopology.java

1 package yehua.stormpProject0521;
2
3 import org.apache.storm.Config;
4 import org.apache.storm.LocalCluster;
5 import org.apache.storm.StormSubmitter;
6 import org.apache.storm.generated.AlreadyAliveException;
7 import org.apache.storm.generated.AuthorizationException;
8 import org.apache.storm.generated.InvalidTopologyException;
9 import org.apache.storm.generated.StormTopology;
10 import org.apache.storm.kafka.BrokerHosts;
11 import org.apache.storm.kafka.KafkaSpout;
12 import org.apache.storm.kafka.SpoutConfig;
13 import org.apache.storm.kafka.StringScheme;
14 import org.apache.storm.kafka.ZkHosts;
15 import org.apache.storm.spout.SchemeAsMultiScheme;
16 import org.apache.storm.topology.TopologyBuilder;
17
18 import yehua.stormpProject0521.bolt.LogProcessBolt1;
19 import yehua.stormpProject0521.bolt.LogProcessBolt2;
20 import yehua.stormpProject0521.bolt.ParseLogBolt;
21
22 public class LogProcessTopology {
23
24 public static void main(String[] args) {
25 TopologyBuilder topologyBuilder = new TopologyBuilder();
26 String topology_name = LogProcessTopology.class.getSimpleName();
27 String SPOUT_ID = KafkaSpout.class.getSimpleName();
28 String BOLT_ID_1 = ParseLogBolt.class.getSimpleName();
29 String BOLT_ID_2 = LogProcessBolt1.class.getSimpleName();
30 String BOLT_ID_3 = LogProcessBolt2.class.getSimpleName();
31
32
33
34
35 BrokerHosts hosts = new ZkHosts("hadoop100:2181");//设置zk地址,为了找到kafka
36 String topic = "gold_log_r2p5";//topic
37 String zkRoot = "/kafkaSpout";//storm会通过kafkaspout消费kafka中的数据,具体消费的offset信息会保存到这个节点下面
38 String id = "consumer_gold_log";//可以理解为groupid
39 SpoutConfig spoutConf = new SpoutConfig(hosts, topic, zkRoot, id);
40 //表示吧spout输出的数据使用字符串进行解析,这样在bolt中取数据的时候,就可以之间获取字符串了
41 spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
42 //注意:第一次消费数据的话,默认会从topic的最早的数据进行消费
43 //storm通过kafkaspout消费topic里面数据的时候,如果zkRoot中没有保存消费的offset,那么久会根据startOffsetTime的值来消费topic中的数据
44 //spoutConf.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();/最早的数据
45 spoutConf.startOffsetTime = kafka.api.OffsetRequest.LatestTime();//最新的数据
46
47
48
49 topologyBuilder.setSpout(SPOUT_ID,new KafkaSpout(spoutConf),5);
50 //可以实现多个kafkaspout
51 //topologyBuilder.setSpout("newSpout",new KafkaSpout(spoutConfNew),5);
52
53 topologyBuilder.setBolt(BOLT_ID_1, new ParseLogBolt(),2).setNumTasks(6).shuffleGrouping(SPOUT_ID);
54 //LogProcessBolt1 这个bolt只能使用一个线程执行 globalGrouping可以保证数据只让一个线程去处理
55 topologyBuilder.setBolt(BOLT_ID_2, new LogProcessBolt1()).globalGrouping(BOLT_ID_1);
56 topologyBuilder.setBolt(BOLT_ID_3, new LogProcessBolt2(),2).shuffleGrouping(BOLT_ID_1);
57
58 Config config = new Config();
59 //config.setNumWorkers(2);//使用两个worker
60 config.put(Config.TOPOLOGY_WORKER_CHILDOPTS, "-Xmx1024m");//给worker指定内存
61 config.setMaxSpoutPending(1000);//限制内存中未处理的tuple个数最多为1000
62 StormTopology createTopology = topologyBuilder.createTopology();
63
64 if(args.length==0){
65 LocalCluster localCluster = new LocalCluster();
66 localCluster.submitTopology(topology_name, config, createTopology);
67 }else{
68 try {
69 StormSubmitter.submitTopology(topology_name, config, createTopology);
70 } catch (AlreadyAliveException e) {
71 e.printStackTrace();
72 } catch (InvalidTopologyException e) {
73 e.printStackTrace();
74 } catch (AuthorizationException e) {
75 e.printStackTrace();
76 }
77 }
78
79 }
80
81 }

 

 

参考代码ParseLogBolt.java

1 package yehua.stormpProject0521.bolt;
2
3 import java.util.HashMap;
4 import java.util.Map;
5
6 import org.apache.storm.Config;
7 import org.apache.storm.Constants;
8 import org.apache.storm.task.OutputCollector;
9 import org.apache.storm.task.TopologyContext;
10 import org.apache.storm.topology.OutputFieldsDeclarer;
11 import org.apache.storm.topology.base.BaseRichBolt;
12 import org.apache.storm.tuple.Fields;
13 import org.apache.storm.tuple.Tuple;
14 import org.apache.storm.tuple.Values;
15
16 import com.alibaba.fastjson.JSON;
17 import com.alibaba.fastjson.JSONObject;
18
19 /**
20 * 主要对数据进行解析,把关键字段解析出来,发射出去
21 * @author yehua
22 *
23 */
24 public class ParseLogBolt extends BaseRichBolt {
25 private Map stormConf;
26 private TopologyContext context;
27 private OutputCollector collector;
28 private Map<String, String> idCountryMap;
29 @Override
30 public void prepare(Map stormConf, TopologyContext context,
31 OutputCollector collector) {
32 this.stormConf = stormConf;
33 this.context = context;
34 this.collector = collector;
35 //在初始化的时候从redis中把送礼人id和省份信息加载过来,后期在storm的定时任务中每半个小时同步一次,把新注册用户的信息拉取过来
36 /*RedisUtils redisUtils = new RedisUtils();
37 List<String> list = redisUtils.lrange("all_id_province", 0, -1);
38 for (String id_country : list) {
39 String[] splits = id_country.split("\t");
40 idCountryMap.put(splits[0], splits[1]);
41 }
42 redisUtils.close();*/
43 idCountryMap = new HashMap<String, String>();
44 idCountryMap.put("865456863256320", "京");
45 idCountryMap.put("865456863256321", "津");
46 idCountryMap.put("865456863256322", "冀");
47 idCountryMap.put("865456863256323", "晋");
48 idCountryMap.put("865456863256324", "辽");
49 idCountryMap.put("865456863256325", "黑");
50 idCountryMap.put("865456863256326", "沪");
51 idCountryMap.put("865456863256327", "苏");
52 idCountryMap.put("865456863256328", "浙");
53 idCountryMap.put("865456863256329", "皖");
54 }
55
56 @Override
57 public void execute(Tuple input) {
58 if(input.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)){
59 //执行定时同步用户静态信息的代码
60 //定时向idCountryMap中更新数据,每次更新只需要把新增的数据读取过来即可,属于增量读取
61 /*RedisUtils redisUtils = new RedisUtils();
62 String poll = redisUtils.poll("new_id_country");
63 while(poll!=null){
64 String[] splits = poll.split("\t");
65 idCountryMap.put(splits[0], splits[1]);
66 poll = redisUtils.poll("new_id_country");
67 }*/
68 }else{
69 try {
70 //String log = new String(input.getBinaryByField("bytes"));
71 String log = input.getStringByField("str");
72 JSONObject logObj = JSON.parseObject(log);
73 String id = logObj.getString("id");
74 String province = idCountryMap.getOrDefault(id, "其它");//用户省份信息
75 Integer gold = logObj.getInteger("gold");//金币
76 this.collector.emit(new Values(province,gold));
77 this.collector.ack(input);
78 } catch (Exception e) {
79 this.collector.fail(input);
80 }
81 }
82 }
83
84 @Override
85 public void declareOutputFields(OutputFieldsDeclarer declarer) {
86 declarer.declare(new Fields("province","gold"));
87 }
88
89 @Override
90 public Map<String, Object> getComponentConfiguration() {
91 HashMap<String, Object> hashMap = new HashMap<String, Object>();
92 hashMap.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 60*30);
93 return hashMap;
94 }
95
96
97
98 }

 

参考代码LogProcessBolt1.java

1 package yehua.stormpProject0521.bolt;
2
3 import java.sql.Connection;
4 import java.sql.SQLException;
5 import java.sql.Statement;
6 import java.util.Date;
7 import java.util.HashMap;
8 import java.util.Map;
9
10 import org.apache.storm.Config;
11 import org.apache.storm.Constants;
12 import org.apache.storm.task.OutputCollector;
13 import org.apache.storm.task.TopologyContext;
14 import org.apache.storm.topology.OutputFieldsDeclarer;
15 import org.apache.storm.topology.base.BaseRichBolt;
16 import org.apache.storm.trident.operation.builtin.Sum;
17 import org.apache.storm.tuple.Tuple;
18
19 import yehua.stormpProject0521.utils.MyDateUtils;
20 import yehua.stormpProject0521.utils.MyDbUtils;
21
22 /**
23 * 统计一下全网金币消耗数据(2分钟)(折线图)
24 * 每隔两分钟统计一下全网金币消耗数据(2分钟)(折线图)
25 * 1 1526 2017-01-01 00:00:00
26 * 2 2560 2017-01-01 00:02:00
27 * 3 1560 2017-01-01 00:04:00
28 * 4 1960 2017-01-01 00:06:00
29 * @author yehua
30 *
31 */
32 public class LogProcessBolt1 extends BaseRichBolt {
33
34 @Override
35 public void prepare(Map stormConf, TopologyContext context,
36 OutputCollector collector) {
37
38 }
39 int sum = 0;
40 private Connection connection = null;
41 @Override
42 public void execute(Tuple input) {
43 if(input.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)){
44 //定时任务
45 try {
46 String curr_time = MyDateUtils.formatDate2(new Date());
47 connection = MyDbUtils.getConnection();
48 Statement state = connection.createStatement();
49 state.executeUpdate("insert into result1 (gold,time) values("+sum+",'"+curr_time+"')");
50 System.out.println("入库成功!");
51 sum = 0;//注意,需要把sum重置为0
52 } catch (SQLException e) {
53 System.out.println("执行错误!");
54 }finally{
55 if(connection!=null){
56 try {
57 connection.close();
58 } catch (SQLException e) {
59 e.printStackTrace();
60 }
61 }
62 }
63 }else{
64 Integer gold = input.getIntegerByField("gold");
65 sum+=gold;
66 }
67
68 }
69
70 @Override
71 public void declareOutputFields(OutputFieldsDeclarer declarer) {
72
73 }
74
75 @Override
76 public Map<String, Object> getComponentConfiguration() {
77 HashMap<String, Object> hashMap = new HashMap<String, Object>();
78 hashMap.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 60*2);
79 return hashMap;
80 }
81
82
83 }

 

 

参考代码LogProcessBolt2.java

1 package yehua.stormpProject0521.bolt;
2
3 import java.sql.Connection;
4 import java.sql.ResultSet;
5 import java.sql.SQLException;
6 import java.sql.Statement;
7 import java.util.Date;
8 import java.util.HashMap;
9 import java.util.Map;
10 import java.util.Map.Entry;
11
12 import org.apache.commons.collections.MapUtils;
13 import org.apache.storm.Config;
14 import org.apache.storm.Constants;
15 import org.apache.storm.task.OutputCollector;
16 import org.apache.storm.task.TopologyContext;
17 import org.apache.storm.topology.OutputFieldsDeclarer;
18 import org.apache.storm.topology.base.BaseRichBolt;
19 import org.apache.storm.tuple.Tuple;
20
21 import yehua.stormpProject0521.utils.DistributedLock;
22 import yehua.stormpProject0521.utils.MyDateUtils;
23 import yehua.stormpProject0521.utils.MyDbUtils;
24
25 /**
26 * 统计不同省份的金币消耗数据(1分钟)(柱状图)
27 * 1 京 9200 2017-01-01
28 * 2 津 5508 2017-01-01
29 * 3 京 8562 2017-01-02
30 * 4 津 4586 2017-01-02
31 * 5 京 8954 2017-01-03
32 * 6 津 2563 2017-01-03
33 *
34 * @author yehua
35 *
36 */
37 public class LogProcessBolt2 extends BaseRichBolt {
38 private DistributedLock lock;
39 @Override
40 public void prepare(Map stormConf, TopologyContext context,
41 OutputCollector collector) {
42 this.lock = new DistributedLock("hadoop100:2181","test");
43 }
44 private Map<String, Integer> province_gold_map = new HashMap<String, Integer>();
45 private Connection connection = null;
46 @Override
47 public void execute(Tuple input) {
48 if(input.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)){
49 //定时任务
50 String curr_time = MyDateUtils.formatDate4(new Date());
51 try {
52 connection = MyDbUtils.getConnection();
53 Statement state = connection.createStatement();
54 lock.lock();//上锁
55 for (Entry<String, Integer> entry : province_gold_map.entrySet()) {
56 String province = entry.getKey();
57 Integer gold = entry.getValue();
58 // 入库之前,需要先查询一下,如果有数据,则执行更新操作,如果没有,则插入
59 state.execute("select id,province,gold from result2 where province = '"+province+"' and time = '"+curr_time+"'");
60 ResultSet resultSet = state.getResultSet();
61 if(resultSet.next()){//有数据
62 int id = resultSet.getInt(1);
63 int count = resultSet.getInt(3);
64 count+=gold;
65 state.executeUpdate("update result2 set gold = "+count+" where id = "+id);
66 }else{
67 state.executeUpdate("insert into result2(province,gold,time) values('"+province+"',"+gold+",'"+curr_time+"')");
68 }
69 }
70 System.out.println("执行入库成功: "+province_gold_map.size());
71 province_gold_map.clear();//注意,一定要把临时结果清空
72 } catch (SQLException e) {
73 e.printStackTrace();
74 }finally{
75 if(connection!=null){
76 try {
77 connection.close();
78 } catch (SQLException e) {
79 e.printStackTrace();
80 }
81 }
82 lock.unlock();//释放锁
83 }
84
85 }else{
86 String province = input.getStringByField("province");//省份信息
87 Integer gold = input.getIntegerByField("gold");
88 province_gold_map.put(province, MapUtils.getInteger(province_gold_map, province, 0)+gold);
89 }
90
91 }
92
93 @Override
94 public void declareOutputFields(OutputFieldsDeclarer declarer) {
95
96 }
97
98 @Override
99 public Map<String, Object> getComponentConfiguration() {
100 HashMap<String, Object> hashMap = new HashMap<String, Object>();
101 hashMap.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 60);
102 return hashMap;
103 }
104
105 @Override
106 public void cleanup() {
107 this.lock.closeZk();//关闭分布式共享锁使用的zk链接
108 }
109
110
111
112 }

 

举报

相关推荐

0 条评论