最新消息:天气越来越冷,记得加一件厚衣裳

JStorm集群搭建与应用入门

JStorm w3sun 1023浏览 0评论

Alibaba JStorm集群部署

部署JStorm集群主要分为2个步骤,首先部署JStorm引擎然后再部署JStorm UI。集群部署目前支持如下三种模式:

  • Standalone,JStorm集群单独部署不依赖外部系统,比如Yarn或Docker Swarm;
  • Hadoop Yarn,该模式下JStorm将运行在Yarn上,JStorm客户端会在Yarn提交一个Application, 这个Application就是一个JStorm 逻辑集群;
  • Docker,在集群中创建一批docker, 这批docker 组成一个JStorm 逻辑集群;

Standalone 部署方式

Standalone是最简单、 最轻量、 最稳定也是最常用的部署方式,集群整体规模不超过300台时建议采用该方式。安装步骤(假设依赖的Zookeeper集群已经搭建完毕)如下:

安装Java

强烈建议安装JDK8,如果机器上已经安装了JDK8则无需再安装。需要注意的是,(1)如果当前系统是64位系统则需要下载64位JDK,如果是32为系统则下载32位JDK;(2)JStorm 2.x 版本开始,要求JDK版本必须等于或高于JDK7。

检查java版本时在命令行下执行java -version即可, 如果找不到java或java版本低于7,则需要设置PATH环境变量或安装JDK7  (强烈建议JDK8)。

检查本机IP

检查机器ip是否正确在命令行执行hostname -i即可,如果返回“127.0.0.1”则说明机器没有配置正确的ip, 需要设定/etc/hosts或网卡配置直到hostname -i返回一个正确的ip;

部署JStorm集群

以jstorm-2.1.1.zip为例进行部署,解压二进制软件包,修改环境变量并保存:

[staff@workstation ~]$ unzip jstorm-2.1.1.zip	
[staff@workstation ~]$ vi ~/.bashrc
export JSTORM_HOME=/XXXXX/XXXX
export PATH=$PATH:$JSTORM_HOME/bin

编辑JStorm配置文件$JSTORM_HOME/conf/storm.yaml,基本配置项如下:

storm.zookeeper.servers: 表示Zookeeper的地址
storm.zookeeper.root:    表示JStorm在Zookeeper中的根目录,当多个JStorm共享一个Zookeeper时需要设置该选项,默认路径为"/jstorm"
nimbus.host:             表示nimbus的地址,填写ip
storm.local.dir:         表示JStorm临时数据存放目录,需要保证JStorm程序对该目录有写权限

其他详细配置,请参考配置详解:https://github.com/alibaba/jstorm/wiki/JStorm-Configuration。此外部署其他节点时,请确保其他节点的“临时数据存放”目录为空,“临时数据存放”目录通过$JSTORM_HOME/conf/storm.yaml中storm.local.dir选项指定。

启动JStorm集群

nimbus节点上执行 “nohup jstorm nimbus &”,查看$JSTORM_HOME/logs/nimbus.log检查有无错误。

supervisor节点上执行 “nohup jstorm supervisor &”,查看$JSTORM_HOME/logs/supervisor.log检查有无错误。

Hadoop Yarn 部署方式

当规模超过300台时,可以考虑基于Hadoop Yarn方式进行部署。

优点:

  • 天然支持多租户,一个物理集群上可以创建多个JStorm逻辑集群,每一个JStorm逻辑集群可以认为是一个bu或team服务,并且逻辑集群和逻辑集群之间天然支持资源隔离;
  • 提高资源利用率,大量的小standalone 集群, 必然出现有的集群利用率低有的集群利用率高的情况,并且利用率低的集群资源无法共享给利用率高的集群, 大家集中在一起必然可以削峰填谷以提供更多的资源;
  • 集群运维便利性, 创建新集群,以及后续升级、扩容及缩容都会极为方便,而且集群资源还可以和SparkStreaming、Flink等计算引擎混用;

缺点:

  • 稳定性比standalone有所下降,如果Yarn集群与离线应用混在一起,离线应用很容易将网卡打满进而抢占在线应用的计算资源,因此建议实时和离线应用依托的Yarn集群分开部署,实现物理集群隔离;
  • 外部依赖比较重, 依赖一个重量级的Hadoop Yarn集群,集群规模至少在30台以上才有必要采用Yarn模式部署;

Docker部署方式

资源池比较林散且公司有成熟的docker 运维平台时,可以考虑容器化部署方式。容器化部署优缺点如下:

优点:

  • 天然支持多租户,一个物理集群上可以创建多个JStorm逻辑集群,每个JStorm逻辑集群就可以认为是一个bu或team服务,并且逻辑集群和逻辑集群之间天然支持资源隔离;
  • 提高资源利用率,大量的小standalone 集群, 必然出现有的集群利用率低有的集群利用率高的情况,并且利用率低的集群资源无法共享给利用率高的集群, 大家集中在一起必然可以削峰填谷以提供更多的资源;
  • 集群运维便利性, 创建新集群,以及后续升级、扩容及缩容都会极为方便;

缺点:

  • 稳定性比standalone有所下降,因为standalone是完全独占机器或资源,而Docker 很有可能会出现凌晨多个docker挤在一台机器上,而白天再部署到多台机器上,会有一个颠簸时间;
  • 外部依赖比较重, 需要外部有一个功能完善的Docker运维管控平台才能完成;
  • 相对直接在物理机上运行standalone的方式,Docker部署时性能会有一点点的损耗,因为Docker增加了一层虚拟化,不过只有在极限测试的情况下才能看得出来。真实情况下一般性能瓶颈在应用方是感受不出来, 因此使用方无需担心该问题。

目前公司采用的是Apache Mesos+Marathon+Docker方式来部署JStorm集群,其中Mesos+Marathon负责集群资源调度,后续将说明采用这种部署方式的优点。

JStorm WebUI 部署

概述

WebUI的安装部署和JStorm是完全独立,并不要求WebUI的机器必须在JStorm集群的机器上。JStorm 0.9.6.1 后一个Web UI可以管理多个集群,只需在Web UI的配置文件中增加新集群的配置即可。

随着集群规模不断增大, 集群的数量可能也会越来越多,尤其是一些小集群也逐渐增多,如果继续一个集群一个Web UI,PE的工作量自然非常巨大。对于一些线上应用每搭建一个Web UI需要层层审批, 比如VIP 申请、开放端口申请、域名申请等每一个环节都十分痛苦,因此强烈需要一个Web UI管理多个JStorm集群。

安装Web UI

tomcat必须使用7.x版本, 需要注意是不要忘记拷贝 ~/.jstorm/storm.yaml,此外Web UI可以和Nimbus不在同一个节点。

[staff@workstation ~]$ mkdir ~/.jstorm
[staff@workstation ~]$ cp -f $JSTORM_HOME/conf/storm.yaml ~/.jstorm
# 下载tomcat 7.x (以apache-tomcat-7.0.37 为例)
[staff@workstation ~]$ tar -xzf apache-tomcat-7.0.37.tar.gz
[staff@workstation ~]$ cd apache-tomcat-7.0.37
[staff@workstation apache-tomcat-7.0.37]$ cd webapps
[staff@workstation webapps]$ cp $JSTORM_HOME/jstorm-ui-2.1.1.war ./
[staff@workstation webapps]$ mv ROOT ROOT.old
[staff@workstation webapps]$ ln -s jstorm-ui-2.1.1 ROOT  #这个地方可能变化需要根据实际JStorm版本来确定,当0.9.6.1时执行ln -s jstorm-0.9.6.1 ROOT,而不是ln -s jstorm-ui-0.9.6.3.war ROOT 这个要注意
[staff@workstation webapps]$ cd ../bin
[staff@workstation bin]$ ./startup.sh

WebUI增加新集群

Web UI版本必须和集群中JStorm最高的版本保持一致,运行Web UI的机器上需要修改~/.jstorm/storm.yaml,把默认的ui.clusters注释给去掉并补充:

# UI MultiCluster
# Following is an example of multicluster UI configuration
 ui.clusters:
     - {
         name: "jstorm.share",
         zkRoot: "/jstorm",
         zkServers:
             [ "zk.test1.com", "zk.test2.com", "zk.test3.com"],
         zkPort: 2181,
       }
     - {
         name: "jstorm.bu1",
         zkRoot: "/jstorm.dw",
         zkServers:
             [ "10.125.100.101", "10.125.100.101", "10.125.100.101"],
         zkPort: 2181,
       }

其中每一个JSON节点都代表一个小的逻辑集群:

- {
         name: "jstorm.bu1",    --- 这个是集群名字,每个集群名字必须不一样
         zkRoot: "/jstorm.dw",  --- 这个集群Zookeeper根节点,可以参考$JSTORM_HOME/con/storm.yaml中“storm.zookeeper.root”字段
         zkServers:
             [ "10.125.100.101", "10.125.100.101", "10.125.100.101"],   -- Zookeeper机器列表
         zkPort: 2181,                                                  -- Zookeeper客户端端口
  }

目前公司采用的就是一个Web UI同时管理多个JStorm逻辑集群的方式。


JStorm应用开发入门

本文帮助读者快速实现一个JStorm例子, 初学者可以试运行源码包里的Example,以获得更直接的感受:Example源码

生成Topology

Map conf = new HashMap();
//topology所有自定义的配置均放入这个Map

TopologyBuilder builder = new TopologyBuilder();
//创建topology的生成器

int spoutParal = get("spout.parallel", 1);
//获取spout的并发设置

SpoutDeclarer spout = builder.setSpout(SequenceTopologyDef.SEQUENCE_SPOUT_NAME,new SequenceSpout(), spoutParal);
//创建Spout, 其中new SequenceSpout() 为真正spout对象,SequenceTopologyDef.SEQUENCE_SPOUT_NAME 为spout的名字,注意名字中不要含有空格

int boltParal = get("bolt.parallel", 1);
//获取bolt的并发设置

BoltDeclarer totalBolt = builder.setBolt(SequenceTopologyDef.TOTAL_BOLT_NAME, new TotalCount(),boltParal).shuffleGrouping(SequenceTopologyDef.SEQUENCE_SPOUT_NAME);
//创建bolt, SequenceTopologyDef.TOTAL_BOLT_NAME 为bolt名字,TotalCount 为bolt对象,boltParal为bolt并发数,
//shuffleGrouping(SequenceTopologyDef.SEQUENCE_SPOUT_NAME), 
//表示接收SequenceTopologyDef.SEQUENCE_SPOUT_NAME的数据,并且以shuffle方式,
//即每个spout随机轮询发送tuple到下一级bolt中

int ackerParal = get("acker.parallel", 1);
Config.setNumAckers(conf, ackerParal);
//设置表示acker的并发数

int workerNum = get("worker.num", 10);
conf.put(Config.TOPOLOGY_WORKERS, workerNum);
//表示整个topology将使用几个worker

conf.put(Config.STORM_CLUSTER_MODE, "distributed");
//设置topolog模式为分布式,这样topology就可以放到JStorm集群上运行

StormSubmitter.submitTopology(streamName, conf,builder.createTopology());
//提交topology

实现Spout接口

IRichSpout为最简单的Spout接口

IRichSpout{

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    }

    @Override
    public void close() {
    }

    @Override
    public void activate() {
    }

    @Override
    public void deactivate() {
    }

    @Override
    public void nextTuple() {
    }

    @Override
    public void ack(Object msgId) {
    }

    @Override
    public void fail(Object msgId) {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

其中需要注意的是:

  • JStorm默认采用Kryo进行序列化和反序列化,要求所有Spout、Bolt或Tuple必须实现Serializable接口;
  • Spout可以有构造方法但是构造方法只会执行一次,是在提交任务时创建Spout对象,task被分配到具体Worker之前的初始化工作可以在此处完成,一旦完成,初始化的内容将携带到每一个task内(因为提交任务时将Spout序列化到文件中去,在Worker起来时再将Spout从文件中反序列化出来)。
  • open是当task起来后执行的初始化动作;
  • close是当task被shutdown后执行的动作;
  • activate是当task被激活时触发的动作;
  • deactivate是task被deactive时触发的动作;
  • nextTuple是Spout实现核心,nextuple完成自己的逻辑即每一次取消息后用collector 将消息emit出去;
  • ack,当spout收到一条ack消息时,触发的动作,详情可以参考ack机制;
  • fail,当spout收到一条fail消息时,触发的动作,详情可以参考ack机制;
  • declareOutputFields,定义Spout发送数据中每个字段的含义;
  • getComponentConfiguration获取本spout的component 配置;

实现Bolt接口

RichBolt {

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    }

    @Override
    public void execute(Tuple input) {
    }

    @Override
    public void cleanup() {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

}

其中需要注意的是:

  • JStorm默认采用Kryo进行序列化和反序列化,要求所有Spout、Bolt或Tuple必须实现Serializable接口;
  • Bolt可以有构造方法但是构造方法只会执行一次,是在提交任务时创建Bolt对象,task分配到具体Worker之前的初始化工作可以在此处完成,一旦完成,初始化的内容将携带到每一个task内(因为提交任务时将Bolt序列化到文件中去,在Worker起来时再将Bolt从文件中反序列化出来)。
  • prepare是当task起来后执行的初始化动作,通常在此处进行一些黑/白名单,第三方组件客户端,公共变量的初始化工作;
  • cleanup是当task被shutdown后执行的动作,资源关闭动作可以在此处进行;
  • execute是bolt实现核心,接受每一次取消息并处理完后,有可能用collector 将产生的新消息emit出去;
    • 在executor中,当程序处理一条消息时需要执行collector.ack,详情可以参考 ack机制
    • 在executor中,当程序无法处理一条消息时或出错时,需要执行collector.fail,详情可以参考 ack机制
  • declareOutputFields,中每个字段的含义的含义;
  • getComponentConfiguration 获取本Bolt的component配置;

编译打包代码

在Maven中配置JStorm GVA:

<dependency>
   <groupId>com.alibaba.jstorm</groupId>
   <artifactId>jstorm-core</artifactId>
   <version>${jstorm.version}</version>
   <scope>provided</scope>
</dependency>

打包时需要将所有依赖打入到一个包中:

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>1.7.1</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <transformers>
                            <!-- 
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>META-INF/spring.handlers</resource>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>META-INF/spring.schemas</resource>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>${这里请定义好你的启动拓扑的全类名,例如:com.alibaba.aloha.pandora.PandoraTopology}</mainClass>
                                </transformer>
                            -->
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

提交jar包到集群

jstorm jar xxxxxx.jar com.alibaba.xxxx.xx parameter

  • xxxx.jar 为打包后的jar包全名;
  • com.alibaba.xxxx.xx 为入口类,即拥有main方法的任务类;
  • parameter即为提交参数;

Apache Storm升级到JStorm

JStorm是Apache Storm API的超集,JStorm默认支持Storm 的所有客户端接口。基于Apache Storm 0.9.5 版本的应用可以直接运行在JStorm 2.1.0或2.1.1 的集群环境中。对于其他版本的Storm则需要把应用中pom.xml的依赖修改为JStorm对应的版本即可。

<dependency>
    <groupId>com.alibaba.jstorm</groupId>
    <artifactId>jstorm-core</artifactId>
    <version>2.1.1</version>
    <scope>provided</scope>
</dependency>

替换完成后执行mvn clean package -DskipTest重新编译打包即可,其中版本2.1.1修改为JStorm集群的运行版本即可。

spout多线程方式

一般情况下JStorm Spout 内部执行ack/fail 和nextTuple是分别2个线程,而不是像Storm一样都运行在一个线程中。

当topology.max.spout.pending 设置不为1时(包括topology.max.spout.pending设置为null),Spout内部将额外启动一个线程单独执行ack或fail操作,从而使得nextTuple在单独一个线程中执行,此时允许在nextTuple中执行block动作。而原生的Storm,nextTuple/ack/fail都在一个线程中执行,当数据量不大时nextTuple立即返回,而ack/fail同样也容易没有数据进而导致CPU 大量空转白白浪费CPU,而在JStorm中nextTuple可以以block方式获取数据,比如从disruptor中或BlockingQueue中获取数据,当没有数据时直接block住节省了大量CPU。

当topology.max.spout.pending为1时, 恢复为spout一个线程即nextTuple/ack/fail 运行在一个线程中。


编译Alibaba JStorm

JStorm的编译相对于Spark来说还是相对简单的:

[staff@workstation ~] git clone https://github.com/alibaba/jstorm
[staff@workstation ~] cd jstorm
[staff@workstation jstorm] mvn clean package assembly:assembly -Dmaven.test.skip=true

参考资料

https://github.com/alibaba/jstorm

https://groups.google.com/g/jstorm-user

https://storm.apache.org/index.html

转载请注明:雪后西塘 » JStorm集群搭建与应用入门

发表我的评论
取消评论

表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址