0
点赞
收藏
分享

微信扫一扫

Storm应用系列之——Topology部署


本系列源码地址: https://github.com/EdisonXu/storm-samples

https://github.com/baijian/storm-java

https://github.com/ashrithr/storm-helloworld

根据前文介绍,我们知道,storm的任务是包装在topology类中,由nimbus提交分配到整个cluster。

Topology有两种大类提交部署方式:

 

  • 提交到本地模式,一般用于调试。该模式下由于是起在同一个JVM进程下,所以不要让其负载太高。
  • 提交到集群模式。

提交到本地模式



这个非常的简单。



1. 编写代码


public static void main(String[] args) throws Exception {  
  
new LocalRunningTopology();  
new Config();  
true);  
  
new LocalCluster();  
"test", conf, topo.buildTopology());  
100000);  
"test");  
        cluster.shutdown();  
    }  
}



提交到集群



1. 编写代码


public static void main(String[] args) throws Exception {  
  
"test";  
          
new ClusterRunningTopology();  
new Config();  
true);  
  
3);  
  
        StormSubmitter.submitTopology(topoName, conf, topo.buildTopology());  
    }  
}



 



2. 编译jar包



mvn clean install 生成jar



 



3. 上传至nimbus部署


./storm jar storm-samples.jar com.edi.storm.topos.ClusterRunningTopology



实际开发时常用提交模式



实际开发时,我们往往是把本地和集群混绑在一起,用传入参数以示区别


public static void main(String[] args) throws Exception {  
          
new ExclaimBasicTopo();  
new Config();  
false);  
  
if (args != null && args.length > 0) {  
3);  
  
0], conf, topo.buildTopology());  
else {  
  
new LocalCluster();  
"test", conf, topo.buildTopology());  
100000);  
"test");  
            cluster.shutdown();  
        }  
    }



./storm jar storm-samples.jar com.edi.storm.topos.ClusterRunningTopology <TOPO_NAME>



填上一个集群唯一的<TOPO_NAME>即可。





有人又说了,这样还不是很方便,我能不能直接在IDE里面提交到storm集群?

可以。

 

IDE直接提交至集群

修改上面提交集群的代码如下:

 

public static void main(String[] args) throws Exception {  
  
"test";  
new ExclaimBasicTopo();  
new Config();  
false);  
  
class.getClassLoader().getResource("").getPath());  
        ClassLoader classLoader = EJob.getClassLoader();  
        Thread.currentThread().setContextClassLoader(classLoader);  
          
//System.setProperty("storm.jar", Class.forName("com.edi.storm.topos.RemoteRunningTopology").getProtectionDomain().getCodeSource().getLocation().getPath());  
"storm.jar", jarFile.toString());  
5);  
false);  
"10.1.110.24");  
//conf.put(Config.NIMBUS_THRIFT_PORT, 8889);  
        StormSubmitter.submitTopology(topoName, conf, topo.buildTopology());  
    }


起作用的部分主要有三点:

 

1. 设置系统变量"storm.jar"。这个变量的值代表要部署的Topology的jar包地址。

这个地址必须是文件,所以,我们就可以写完代码后自己打个jar包放在某个固定位置,然后IDE直接运行该topology去集群提交部署。

当然,也可以在代码中打jar,所以我这里的代码中加入了一个打包的Utilities类,EJob。

 

2. 设置参数Config.NIMBUS_HOST,其值为nimbus的hostname或ip地址。

3. 设置参数Config.NIMBUS_THRIFT_PORT,其值为nimbus上Thrift接口的地址,也就是nimbus的conf/storm.yaml中参数nimbus.thrift.port的值,前提是你配了。如果没配,可以不设。

 

这样就可以直接在IDE里面运行提交上去了。

 

Topology提交原理

Topology提交后发生了什么呢?这个原理要放在这里讲了。因为这直接关系到对Strom运行概念的理解。

1. Nimbus$Iface的beginFileUpload,uploadChunk以及finishFileUpload方法将运行的包上传至其数据目录(storm.yaml中storm.local.dir对应的目录)下的inbox目录。

 

/{storm.local.dir}  
  |  
  | - /nimbus  
        |  
        | - /inbox  
              |  
              | - /stormjar-{uuid}.jar

 

不论上传的包名字是什么,最终会变成stormjar-{uuid}.jar。

2. Nimbus$Iface的submitTopology方法会负责对这个topology进行处理,首先是对Storm本身及topology进行一些校验:

a. 检查Storm状态是否active

b. 检查是否有同名topology在运行

c. 检查是否有同id的spout和bolt,以及其id是否合法。任何一个id都不能以"_"开头,这种命名方式是系统保留的。

3. 建立topology的本地目录

 



/{storm.local.dir}  
    |  
    | - /nimbus  
          |  
          | - /inbox  
          | - /stormdist  
                |  
                | - /{topology-id}  
                        |  
                        | - /stormjar.jar -- 包含这个topology所有代码的jar包(从nimbus/inbox挪过来)  
                        |  
                        | -/stormcode.ser -- 这个topology对象的序列化  
                        | -/stormconf.ser -- 运行这个topology的配置

 

4. 建立该topology在zookeeper上的心跳目录

nimbus老兄是个有责任心的人,它虽然最终会把任务分成一个个task让supervisor去做,但是它时刻在关注着大家的情况,所以它要求每个task每隔一定时间就要给它打个招呼(心跳信息),让它知道事情还在正常发展。如果有task超时不打招呼,nimbus会人为这个task不行了,然后进行重新分配。zookeeper上的心跳目录:

 



/<span style="font-family: Consolas, 'Liberation Mono', Courier, monospace;">{storm.zookeeper.root}</span>  
  
  |  
  | - /workerbeats  
         |  
         | - {topology-id}  
                |  
                | - /{task-id}  -- task的心跳信息,包括心跳的时间,task运行时间以及一些统计信息


 

 

5. 计算topology的工作量

nimbus会根据topology中给的parallelism hint参数,来给spout/bolt设定task数目,并分配相应的task-id,然后把分配号的task信息写到zookeeper上去:

 

/{storm.zookeeper.root}  
  |  
  | - /assignments  
        |  
        | - /{topology-id}



6. 保存toplogy信息到zookeeper

 

/{storm.zookeeper.root}  
    |  
    | - /storms  
          |  
          | - /{topology-id}


 

7. supervisor因为监听了zookeeper上的目录,所以当它发现有topology时,会先把所有的topology的信息如jar等下到本地,并删除不再运行的topology的本地信息

 

/{storm.local.dir}  
    |  
    | - /supervisor  
          |  
          | - stormdist  
               |  
               | - {topology-id}  
                      |  
                      | - stormcode.ser  
                      | - stormconf.ser  
                      | - stormjar.jar



8. supervisor根据分配的任务,去启动worker去处理assignment

 

9. worker启动后,会去zookeeper上找其对应的task。同时根据task的outbound信息建立对外的socket连接,将来发送tuple就是从这些socket连接发出去的。

到这里,一个topology就已经完全部署和运转起来了。

 

举报

相关推荐

0 条评论