目录
Alibaba JStorm集群部署
部署JStorm集群主要分为2个步骤,首先部署JStorm引擎然后再部署JStorm UI。集群部署目前支持如下三种模式:
- Standalone,JStorm集群单独部署不依赖外部系统,比如Yarn或Docker Swarm;
- Hadoop Yarn,该模式下JStorm将运行在Yarn上,JStorm客户端会在Yarn提交一个Application, 这个Application就是一个JStorm 逻辑集群;
- Docker,在集群中创建一批docker, 这批docker 组成一个JStorm 逻辑集群;
Standalone 部署方式
Standalone是最简单、 最轻量、 最稳定也是最常用的部署方式,集群整体规模不超过300台时建议采用该方式。安装步骤(假设依赖的Zookeeper集群已经搭建完毕)如下:
安装Java
强烈建议安装JDK8,如果机器上已经安装了JDK8则无需再安装。需要注意的是,(1)如果当前系统是64位系统则需要下载64位JDK,如果是32为系统则下载32位JDK;(2)JStorm 2.x 版本开始,要求JDK版本必须等于或高于JDK7。
检查java版本时在命令行下执行java -version即可, 如果找不到java或java版本低于7,则需要设置PATH环境变量或安装JDK7 (强烈建议JDK8)。
检查本机IP
检查机器ip是否正确在命令行执行hostname -i即可,如果返回“127.0.0.1”则说明机器没有配置正确的ip, 需要设定/etc/hosts或网卡配置直到hostname -i返回一个正确的ip;
部署JStorm集群
以jstorm-2.1.1.zip为例进行部署,解压二进制软件包,修改环境变量并保存:
[staff@workstation ~]$ unzip jstorm-2.1.1.zip [staff@workstation ~]$ vi ~/.bashrc export JSTORM_HOME=/XXXXX/XXXX export PATH=$PATH:$JSTORM_HOME/bin
编辑JStorm配置文件$JSTORM_HOME/conf/storm.yaml,基本配置项如下:
storm.zookeeper.servers: 表示Zookeeper的地址 storm.zookeeper.root: 表示JStorm在Zookeeper中的根目录,当多个JStorm共享一个Zookeeper时需要设置该选项,默认路径为"/jstorm" nimbus.host: 表示nimbus的地址,填写ip storm.local.dir: 表示JStorm临时数据存放目录,需要保证JStorm程序对该目录有写权限
其他详细配置,请参考配置详解:https://github.com/alibaba/jstorm/wiki/JStorm-Configuration。此外部署其他节点时,请确保其他节点的“临时数据存放”目录为空,“临时数据存放”目录通过$JSTORM_HOME/conf/storm.yaml中storm.local.dir选项指定。
启动JStorm集群
nimbus节点上执行 “nohup jstorm nimbus &”,查看$JSTORM_HOME/logs/nimbus.log检查有无错误。
supervisor节点上执行 “nohup jstorm supervisor &”,查看$JSTORM_HOME/logs/supervisor.log检查有无错误。
Hadoop Yarn 部署方式
当规模超过300台时,可以考虑基于Hadoop Yarn方式进行部署。
优点:
- 天然支持多租户,一个物理集群上可以创建多个JStorm逻辑集群,每一个JStorm逻辑集群可以认为是一个bu或team服务,并且逻辑集群和逻辑集群之间天然支持资源隔离;
- 提高资源利用率,大量的小standalone 集群, 必然出现有的集群利用率低有的集群利用率高的情况,并且利用率低的集群资源无法共享给利用率高的集群, 大家集中在一起必然可以削峰填谷以提供更多的资源;
- 集群运维便利性, 创建新集群,以及后续升级、扩容及缩容都会极为方便,而且集群资源还可以和SparkStreaming、Flink等计算引擎混用;
缺点:
- 稳定性比standalone有所下降,如果Yarn集群与离线应用混在一起,离线应用很容易将网卡打满进而抢占在线应用的计算资源,因此建议实时和离线应用依托的Yarn集群分开部署,实现物理集群隔离;
- 外部依赖比较重, 依赖一个重量级的Hadoop Yarn集群,集群规模至少在30台以上才有必要采用Yarn模式部署;
Docker部署方式
资源池比较林散且公司有成熟的docker 运维平台时,可以考虑容器化部署方式。容器化部署优缺点如下:
优点:
- 天然支持多租户,一个物理集群上可以创建多个JStorm逻辑集群,每个JStorm逻辑集群就可以认为是一个bu或team服务,并且逻辑集群和逻辑集群之间天然支持资源隔离;
- 提高资源利用率,大量的小standalone 集群, 必然出现有的集群利用率低有的集群利用率高的情况,并且利用率低的集群资源无法共享给利用率高的集群, 大家集中在一起必然可以削峰填谷以提供更多的资源;
- 集群运维便利性, 创建新集群,以及后续升级、扩容及缩容都会极为方便;
缺点:
- 稳定性比standalone有所下降,因为standalone是完全独占机器或资源,而Docker 很有可能会出现凌晨多个docker挤在一台机器上,而白天再部署到多台机器上,会有一个颠簸时间;
- 外部依赖比较重, 需要外部有一个功能完善的Docker运维管控平台才能完成;
- 相对直接在物理机上运行standalone的方式,Docker部署时性能会有一点点的损耗,因为Docker增加了一层虚拟化,不过只有在极限测试的情况下才能看得出来。真实情况下一般性能瓶颈在应用方是感受不出来, 因此使用方无需担心该问题。
目前公司采用的是Apache Mesos+Marathon+Docker方式来部署JStorm集群,其中Mesos+Marathon负责集群资源调度,后续将说明采用这种部署方式的优点。
JStorm WebUI 部署
概述
WebUI的安装部署和JStorm是完全独立,并不要求WebUI的机器必须在JStorm集群的机器上。JStorm 0.9.6.1 后一个Web UI可以管理多个集群,只需在Web UI的配置文件中增加新集群的配置即可。
随着集群规模不断增大, 集群的数量可能也会越来越多,尤其是一些小集群也逐渐增多,如果继续一个集群一个Web UI,PE的工作量自然非常巨大。对于一些线上应用每搭建一个Web UI需要层层审批, 比如VIP 申请、开放端口申请、域名申请等每一个环节都十分痛苦,因此强烈需要一个Web UI管理多个JStorm集群。
安装Web UI
tomcat必须使用7.x版本, 需要注意是不要忘记拷贝 ~/.jstorm/storm.yaml,此外Web UI可以和Nimbus不在同一个节点。
[staff@workstation ~]$ mkdir ~/.jstorm [staff@workstation ~]$ cp -f $JSTORM_HOME/conf/storm.yaml ~/.jstorm # 下载tomcat 7.x (以apache-tomcat-7.0.37 为例) [staff@workstation ~]$ tar -xzf apache-tomcat-7.0.37.tar.gz [staff@workstation ~]$ cd apache-tomcat-7.0.37 [staff@workstation apache-tomcat-7.0.37]$ cd webapps [staff@workstation webapps]$ cp $JSTORM_HOME/jstorm-ui-2.1.1.war ./ [staff@workstation webapps]$ mv ROOT ROOT.old [staff@workstation webapps]$ ln -s jstorm-ui-2.1.1 ROOT #这个地方可能变化需要根据实际JStorm版本来确定,当0.9.6.1时执行ln -s jstorm-0.9.6.1 ROOT,而不是ln -s jstorm-ui-0.9.6.3.war ROOT 这个要注意 [staff@workstation webapps]$ cd ../bin [staff@workstation bin]$ ./startup.sh
WebUI增加新集群
Web UI版本必须和集群中JStorm最高的版本保持一致,运行Web UI的机器上需要修改~/.jstorm/storm.yaml,把默认的ui.clusters注释给去掉并补充:
# UI MultiCluster # Following is an example of multicluster UI configuration ui.clusters: - { name: "jstorm.share", zkRoot: "/jstorm", zkServers: [ "zk.test1.com", "zk.test2.com", "zk.test3.com"], zkPort: 2181, } - { name: "jstorm.bu1", zkRoot: "/jstorm.dw", zkServers: [ "10.125.100.101", "10.125.100.101", "10.125.100.101"], zkPort: 2181, }
其中每一个JSON节点都代表一个小的逻辑集群:
- { name: "jstorm.bu1", --- 这个是集群名字,每个集群名字必须不一样 zkRoot: "/jstorm.dw", --- 这个集群Zookeeper根节点,可以参考$JSTORM_HOME/con/storm.yaml中“storm.zookeeper.root”字段 zkServers: [ "10.125.100.101", "10.125.100.101", "10.125.100.101"], -- Zookeeper机器列表 zkPort: 2181, -- Zookeeper客户端端口 }
目前公司采用的就是一个Web UI同时管理多个JStorm逻辑集群的方式。
JStorm应用开发入门
本文帮助读者快速实现一个JStorm例子, 初学者可以试运行源码包里的Example,以获得更直接的感受:Example源码
生成Topology
Map conf = new HashMap(); //topology所有自定义的配置均放入这个Map TopologyBuilder builder = new TopologyBuilder(); //创建topology的生成器 int spoutParal = get("spout.parallel", 1); //获取spout的并发设置 SpoutDeclarer spout = builder.setSpout(SequenceTopologyDef.SEQUENCE_SPOUT_NAME,new SequenceSpout(), spoutParal); //创建Spout, 其中new SequenceSpout() 为真正spout对象,SequenceTopologyDef.SEQUENCE_SPOUT_NAME 为spout的名字,注意名字中不要含有空格 int boltParal = get("bolt.parallel", 1); //获取bolt的并发设置 BoltDeclarer totalBolt = builder.setBolt(SequenceTopologyDef.TOTAL_BOLT_NAME, new TotalCount(),boltParal).shuffleGrouping(SequenceTopologyDef.SEQUENCE_SPOUT_NAME); //创建bolt, SequenceTopologyDef.TOTAL_BOLT_NAME 为bolt名字,TotalCount 为bolt对象,boltParal为bolt并发数, //shuffleGrouping(SequenceTopologyDef.SEQUENCE_SPOUT_NAME), //表示接收SequenceTopologyDef.SEQUENCE_SPOUT_NAME的数据,并且以shuffle方式, //即每个spout随机轮询发送tuple到下一级bolt中 int ackerParal = get("acker.parallel", 1); Config.setNumAckers(conf, ackerParal); //设置表示acker的并发数 int workerNum = get("worker.num", 10); conf.put(Config.TOPOLOGY_WORKERS, workerNum); //表示整个topology将使用几个worker conf.put(Config.STORM_CLUSTER_MODE, "distributed"); //设置topolog模式为分布式,这样topology就可以放到JStorm集群上运行 StormSubmitter.submitTopology(streamName, conf,builder.createTopology()); //提交topology
实现Spout接口
IRichSpout为最简单的Spout接口
IRichSpout{ @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { } @Override public void close() { } @Override public void activate() { } @Override public void deactivate() { } @Override public void nextTuple() { } @Override public void ack(Object msgId) { } @Override public void fail(Object msgId) { } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { } @Override public Map<String, Object> getComponentConfiguration() { return null; }
其中需要注意的是:
- JStorm默认采用Kryo进行序列化和反序列化,要求所有Spout、Bolt或Tuple必须实现Serializable接口;
- Spout可以有构造方法但是构造方法只会执行一次,是在提交任务时创建Spout对象,task被分配到具体Worker之前的初始化工作可以在此处完成,一旦完成,初始化的内容将携带到每一个task内(因为提交任务时将Spout序列化到文件中去,在Worker起来时再将Spout从文件中反序列化出来)。
- open是当task起来后执行的初始化动作;
- close是当task被shutdown后执行的动作;
- activate是当task被激活时触发的动作;
- deactivate是task被deactive时触发的动作;
- nextTuple是Spout实现核心,nextuple完成自己的逻辑即每一次取消息后用collector 将消息emit出去;
- ack,当spout收到一条ack消息时,触发的动作,详情可以参考ack机制;
- fail,当spout收到一条fail消息时,触发的动作,详情可以参考ack机制;
- declareOutputFields,定义Spout发送数据中每个字段的含义;
- getComponentConfiguration获取本spout的component 配置;
实现Bolt接口
RichBolt { @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { } @Override public void execute(Tuple input) { } @Override public void cleanup() { } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { } @Override public Map<String, Object> getComponentConfiguration() { return null; } }
其中需要注意的是:
- JStorm默认采用Kryo进行序列化和反序列化,要求所有Spout、Bolt或Tuple必须实现Serializable接口;
- Bolt可以有构造方法但是构造方法只会执行一次,是在提交任务时创建Bolt对象,task分配到具体Worker之前的初始化工作可以在此处完成,一旦完成,初始化的内容将携带到每一个task内(因为提交任务时将Bolt序列化到文件中去,在Worker起来时再将Bolt从文件中反序列化出来)。
- prepare是当task起来后执行的初始化动作,通常在此处进行一些黑/白名单,第三方组件客户端,公共变量的初始化工作;
- cleanup是当task被shutdown后执行的动作,资源关闭动作可以在此处进行;
- execute是bolt实现核心,接受每一次取消息并处理完后,有可能用collector 将产生的新消息emit出去;
- 在executor中,当程序处理一条消息时需要执行collector.ack,详情可以参考 ack机制
- 在executor中,当程序无法处理一条消息时或出错时,需要执行collector.fail,详情可以参考 ack机制
- declareOutputFields,中每个字段的含义的含义;
- getComponentConfiguration 获取本Bolt的component配置;
编译打包代码
在Maven中配置JStorm GVA:
<dependency> <groupId>com.alibaba.jstorm</groupId> <artifactId>jstorm-core</artifactId> <version>${jstorm.version}</version> <scope>provided</scope> </dependency>
打包时需要将所有依赖打入到一个包中:
<build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>1.7.1</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <transformers> <!-- <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> <resource>META-INF/spring.handlers</resource> </transformer> <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> <resource>META-INF/spring.schemas</resource> </transformer> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass>${这里请定义好你的启动拓扑的全类名,例如:com.alibaba.aloha.pandora.PandoraTopology}</mainClass> </transformer> --> </transformers> </configuration> </execution> </executions> </plugin> </plugins> </build>
提交jar包到集群
jstorm jar xxxxxx.jar com.alibaba.xxxx.xx parameter
- xxxx.jar 为打包后的jar包全名;
- com.alibaba.xxxx.xx 为入口类,即拥有main方法的任务类;
- parameter即为提交参数;
Apache Storm升级到JStorm
JStorm是Apache Storm API的超集,JStorm默认支持Storm 的所有客户端接口。基于Apache Storm 0.9.5 版本的应用可以直接运行在JStorm 2.1.0或2.1.1 的集群环境中。对于其他版本的Storm则需要把应用中pom.xml的依赖修改为JStorm对应的版本即可。
<dependency> <groupId>com.alibaba.jstorm</groupId> <artifactId>jstorm-core</artifactId> <version>2.1.1</version> <scope>provided</scope> </dependency>
替换完成后执行mvn clean package -DskipTest重新编译打包即可,其中版本2.1.1修改为JStorm集群的运行版本即可。
spout多线程方式
一般情况下JStorm Spout 内部执行ack/fail 和nextTuple是分别2个线程,而不是像Storm一样都运行在一个线程中。
当topology.max.spout.pending 设置不为1时(包括topology.max.spout.pending设置为null),Spout内部将额外启动一个线程单独执行ack或fail操作,从而使得nextTuple在单独一个线程中执行,此时允许在nextTuple中执行block动作。而原生的Storm,nextTuple/ack/fail都在一个线程中执行,当数据量不大时nextTuple立即返回,而ack/fail同样也容易没有数据进而导致CPU 大量空转白白浪费CPU,而在JStorm中nextTuple可以以block方式获取数据,比如从disruptor中或BlockingQueue中获取数据,当没有数据时直接block住节省了大量CPU。
当topology.max.spout.pending为1时, 恢复为spout一个线程即nextTuple/ack/fail 运行在一个线程中。
编译Alibaba JStorm
JStorm的编译相对于Spark来说还是相对简单的:
[staff@workstation ~] git clone https://github.com/alibaba/jstorm [staff@workstation ~] cd jstorm [staff@workstation jstorm] mvn clean package assembly:assembly -Dmaven.test.skip=true
参考资料
https://github.com/alibaba/jstorm
https://groups.google.com/g/jstorm-user
https://storm.apache.org/index.html
转载请注明:雪后西塘 » JStorm集群搭建与应用入门