多读书多实践,勤思考善领悟

Storm编程模型

本文于1689天之前发表,文中内容可能已经过时。

一、简介

下图为Strom的运行流程图,在开发Storm流处理程序时,我们需要采用内置或自定义实现spout(数据源)和bolt(处理单元),并通过TopologyBuilder将它们之间进行关联,形成Topology

二、IComponent接口

IComponent接口定义了Topology中所有组件(spout/bolt)的公共方法,自定义的spout或bolt必须直接或间接实现这个接口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public interface IComponent extends Serializable {

/**
* 声明此拓扑的所有流的输出模式。
* @param declarer这用于声明输出流id,输出字段以及每个输出流是否是直接流(direct stream)
*/
void declareOutputFields(OutputFieldsDeclarer declarer);

/**
* 声明此组件的配置。
*
*/
Map<String, Object> getComponentConfiguration();

}

三、Spout

3.1 ISpout接口

自定义的spout需要实现ISpout接口,它定义了spout的所有可用方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public interface ISpout extends Serializable {
/**
* 组件初始化时候被调用
*
* @param conf ISpout的配置
* @param context 应用上下文,可以通过其获取任务ID和组件ID,输入和输出信息等。
* @param collector 用来发送spout中的tuples,它是线程安全的,建议保存为此spout对象的实例变量
*/
void open(Map conf, TopologyContext context, SpoutOutputCollector collector);

/**
* ISpout将要被关闭的时候调用。但是其不一定会被执行,如果在集群环境中通过kill -9 杀死进程时其就无法被执行。
*/
void close();

/**
* 当ISpout从停用状态激活时被调用
*/
void activate();

/**
* 当ISpout停用时候被调用
*/
void deactivate();

/**
* 这是一个核心方法,主要通过在此方法中调用collector将tuples发送给下一个接收器,这个方法必须是非阻塞的。
* nextTuple/ack/fail/是在同一个线程中执行的,所以不用考虑线程安全方面。当没有tuples发出时应该让
* nextTuple休眠(sleep)一下,以免浪费CPU。
*/
void nextTuple();

/**
* 通过msgId进行tuples处理成功的确认,被确认后的tuples不会再次被发送
*/
void ack(Object msgId);

/**
* 通过msgId进行tuples处理失败的确认,被确认后的tuples会再次被发送进行处理
*/
void fail(Object msgId);
}

3.2 BaseRichSpout抽象类

通常情况下,我们实现自定义的Spout时不会直接去实现ISpout接口,而是继承BaseRichSpoutBaseRichSpout继承自BaseCompont,同时实现了IRichSpout接口。

IRichSpout接口继承自ISpoutIComponent,自身并没有定义任何方法:

1
2
3
public interface IRichSpout extends ISpout, IComponent {

}

BaseComponent抽象类空实现了IComponentgetComponentConfiguration方法:

1
2
3
4
5
6
public abstract class BaseComponent implements IComponent {
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}

BaseRichSpout继承自BaseCompont类并实现了IRichSpout接口,并且空实现了其中部分方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public abstract class BaseRichSpout extends BaseComponent implements IRichSpout {
@Override
public void close() {}

@Override
public void activate() {}

@Override
public void deactivate() {}

@Override
public void ack(Object msgId) {}

@Override
public void fail(Object msgId) {}
}

通过这样的设计,我们在继承BaseRichSpout实现自定义spout时,就只有三个方法必须实现:

  • open : 来源于ISpout,可以通过此方法获取用来发送tuples的SpoutOutputCollector
  • nextTuple :来源于ISpout,必须在此方法内部发送tuples;
  • declareOutputFields :来源于IComponent,声明发送的tuples的名称,这样下一个组件才能知道如何接受。

四、Bolt

bolt接口的设计与spout的类似:

4.1 IBolt 接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
 /**
* 在客户端计算机上创建的IBolt对象。会被被序列化到topology中(使用Java序列化),并提交给集群的主机(Nimbus)。
* Nimbus启动workers反序列化对象,调用prepare,然后开始处理tuples。
*/

public interface IBolt extends Serializable {
/**
* 组件初始化时候被调用
*
* @param conf storm中定义的此bolt的配置
* @param context 应用上下文,可以通过其获取任务ID和组件ID,输入和输出信息等。
* @param collector 用来发送spout中的tuples,它是线程安全的,建议保存为此spout对象的实例变量
*/
void prepare(Map stormConf, TopologyContext context, OutputCollector collector);

/**
* 处理单个tuple输入。
*
* @param Tuple对象包含关于它的元数据(如来自哪个组件/流/任务)
*/
void execute(Tuple input);

/**
* IBolt将要被关闭的时候调用。但是其不一定会被执行,如果在集群环境中通过kill -9 杀死进程时其就无法被执行。
*/
void cleanup();

4.2 BaseRichBolt抽象类

同样的,在实现自定义bolt时,通常是继承BaseRichBolt抽象类来实现。BaseRichBolt继承自BaseComponent抽象类并实现了IRichBolt接口。

IRichBolt接口继承自IBoltIComponent,自身并没有定义任何方法:

1
2
3
public interface IRichBolt extends IBolt, IComponent {

}

通过这样的设计,在继承BaseRichBolt实现自定义bolt时,就只需要实现三个必须的方法:

  • prepare: 来源于IBolt,可以通过此方法获取用来发送tuples的OutputCollector
  • execute:来源于IBolt,处理tuples和发送处理完成的tuples;
  • declareOutputFields :来源于IComponent,声明发送的tuples的名称,这样下一个组件才能知道如何接收。

五、词频统计案例

5.1 案例简介

这里我们使用自定义的DataSourceSpout产生词频数据,然后使用自定义的SplitBoltCountBolt来进行词频统计。

案例源码下载地址:storm-word-count

5.2 代码实现

1. 项目依赖

1
2
3
4
5
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.2.2</version>
</dependency>

2. DataSourceSpout

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class DataSourceSpout extends BaseRichSpout {

private List<String> list = Arrays.asList("Spark", "Hadoop", "HBase", "Storm", "Flink", "Hive");

private SpoutOutputCollector spoutOutputCollector;

@Override
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
this.spoutOutputCollector = spoutOutputCollector;
}

@Override
public void nextTuple() {
// 模拟产生数据
String lineData = productData();
spoutOutputCollector.emit(new Values(lineData));
Utils.sleep(1000);
}

@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("line"));
}


/**
* 模拟数据
*/
private String productData() {
Collections.shuffle(list);
Random random = new Random();
int endIndex = random.nextInt(list.size()) % (list.size()) + 1;
return StringUtils.join(list.toArray(), "\t", 0, endIndex);
}

}

上面类使用productData方法来产生模拟数据,产生数据的格式如下:

1
2
3
4
5
6
7
8
9
Spark	HBase
Hive Flink Storm Hadoop HBase Spark
Flink
HBase Storm
HBase Hadoop Hive Flink
HBase Flink Hive Storm
Hive Flink Hadoop
HBase Hive
Hadoop Spark HBase Storm

3. SplitBolt

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class SplitBolt extends BaseRichBolt {

private OutputCollector collector;

@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector=collector;
}

@Override
public void execute(Tuple input) {
String line = input.getStringByField("line");
String[] words = line.split("\t");
for (String word : words) {
collector.emit(new Values(word));
}
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}

4. CountBolt

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class CountBolt extends BaseRichBolt {

private Map<String, Integer> counts = new HashMap<>();

@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {

}

@Override
public void execute(Tuple input) {
String word = input.getStringByField("word");
Integer count = counts.get(word);
if (count == null) {
count = 0;
}
count++;
counts.put(word, count);
// 输出
System.out.print("当前实时统计结果:");
counts.forEach((key, value) -> System.out.print(key + ":" + value + "; "));
System.out.println();
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {

}
}

5. LocalWordCountApp

通过TopologyBuilder将上面定义好的组件进行串联形成 Topology,并提交到本地集群(LocalCluster)运行。通常在开发中,可先用本地模式进行测试,测试完成后再提交到服务器集群运行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class LocalWordCountApp {

public static void main(String[] args) {
TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("DataSourceSpout", new DataSourceSpout());

// 指明将 DataSourceSpout 的数据发送到 SplitBolt 中处理
builder.setBolt("SplitBolt", new SplitBolt()).shuffleGrouping("DataSourceSpout");

// 指明将 SplitBolt 的数据发送到 CountBolt 中 处理
builder.setBolt("CountBolt", new CountBolt()).shuffleGrouping("SplitBolt");

// 创建本地集群用于测试 这种模式不需要本机安装storm,直接运行该Main方法即可
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("LocalWordCountApp",
new Config(), builder.createTopology());
}

}

6. 运行结果

启动WordCountApp的main方法即可运行,采用本地模式Storm会自动在本地搭建一个集群,所以启动的过程会稍慢一点,启动成功后即可看到输出日志。

六、提交到服务器集群运行

6.1 代码更改

提交到服务器的代码和本地代码略有不同,提交到服务器集群时需要使用StormSubmitter进行提交。主要代码如下:

为了结构清晰,这里新建ClusterWordCountApp类来演示集群模式的提交。实际开发中可以将两种模式的代码写在同一个类中,通过外部传参来决定启动何种模式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class ClusterWordCountApp {

public static void main(String[] args) {
TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("DataSourceSpout", new DataSourceSpout());

// 指明将 DataSourceSpout 的数据发送到 SplitBolt 中处理
builder.setBolt("SplitBolt", new SplitBolt()).shuffleGrouping("DataSourceSpout");

// 指明将 SplitBolt 的数据发送到 CountBolt 中 处理
builder.setBolt("CountBolt", new CountBolt()).shuffleGrouping("SplitBolt");

// 使用StormSubmitter提交Topology到服务器集群
try {
StormSubmitter.submitTopology("ClusterWordCountApp", new Config(), builder.createTopology());
} catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
e.printStackTrace();
}
}

}

6.2 打包上传

打包后上传到服务器任意位置,这里我打包后的名称为storm-word-count-1.0.jar

1
# mvn clean package -Dmaven.test.skip=true

6.3 提交Topology

使用以下命令提交Topology到集群:

1
2
# 命令格式: storm jar jar包位置 主类的全路径 ...可选传参
storm jar /usr/appjar/storm-word-count-1.0.jar com.myhhub.wordcount.ClusterWordCountApp

出现successfully则代表提交成功:

6.4 查看Topology与停止Topology(命令行方式)

1
2
3
4
5
# 查看所有Topology
storm list

# 停止 storm kill topology-name [-w wait-time-secs]
storm kill ClusterWordCountApp -w 3

6.5 查看Topology与停止Topology(界面方式)

使用UI界面同样也可进行停止操作,进入WEB UI界面(8080端口),在Topology Summary中点击对应Topology 即可进入详情页面进行操作。

七、关于项目打包的扩展说明

mvn package的局限性

在上面的步骤中,我们没有在POM中配置任何插件,就直接使用mvn package进行项目打包,这对于没有使用外部依赖包的项目是可行的。但如果项目中使用了第三方JAR包,就会出现问题,因为package打包后的JAR中是不含有依赖包的,如果此时你提交到服务器上运行,就会出现找不到第三方依赖的异常。

这时候可能大家会有疑惑,在我们的项目中不是使用了storm-core这个依赖吗?其实上面之所以我们能运行成功,是因为在Storm的集群环境中提供了这个JAR包,在安装目录的lib目录下:

为了说明这个问题我在Maven中引入了一个第三方的JAR包,并修改产生数据的方法:

1
2
3
4
5
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.8.1</version>
</dependency>

StringUtils.join()这个方法在commons.lang3storm-core中都有,原来的代码无需任何更改,只需要在import时指明使用commons.lang3

1
2
3
4
5
6
7
8
import org.apache.commons.lang3.StringUtils;

private String productData() {
Collections.shuffle(list);
Random random = new Random();
int endIndex = random.nextInt(list.size()) % (list.size()) + 1;
return StringUtils.join(list.toArray(), "\t", 0, endIndex);
}

此时直接使用mvn clean package打包运行,就会抛出下图的异常。因此这种直接打包的方式并不适用于实际的开发,因为实际开发中通常都是需要第三方的JAR包。

想把依赖包一并打入最后的JAR中,maven提供了两个插件来实现,分别是maven-assembly-pluginmaven-shade-plugin。鉴于本篇文章篇幅已经比较长,且关于Storm打包还有很多需要说明的地方,所以关于Storm的打包方式单独整理至下一篇文章:

Storm三种打包方式对比分析

参考资料

  1. Running Topologies on a Production Cluster
  2. Pre-defined Descriptor Files