1. Storm Integration with HDFS
1.1 Project Structure
Source code for this example: storm-hdfs-integration
1.2 Main Project Dependencies
The main dependencies of the project are listed below. Two points need attention:
- Since the Hadoop installed on my server is the CDH distribution, the CDH versions of the dependencies are imported as well, which requires a <repository> tag pointing to Cloudera's repository;
- hadoop-common, hadoop-client, and hadoop-hdfs all need to exclude the slf4j-log4j12 dependency, because storm-core already ships with it; without the exclusion there is a risk of JAR conflicts.
```xml
<properties>
    <storm.version>1.2.2</storm.version>
</properties>

<repositories>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
</repositories>

<dependencies>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>${storm.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-hdfs</artifactId>
        <version>${storm.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.6.0-cdh5.15.2</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.6.0-cdh5.15.2</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>2.6.0-cdh5.15.2</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>
```
1.3 DataSourceSpout
```java
public class DataSourceSpout extends BaseRichSpout {

    private List<String> list = Arrays.asList("Spark", "Hadoop", "HBase", "Storm", "Flink", "Hive");

    private SpoutOutputCollector spoutOutputCollector;

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.spoutOutputCollector = spoutOutputCollector;
    }

    @Override
    public void nextTuple() {
        String lineData = productData();
        spoutOutputCollector.emit(new Values(lineData));
        Utils.sleep(1000);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("line"));
    }

    private String productData() {
        Collections.shuffle(list);
        Random random = new Random();
        int endIndex = random.nextInt(list.size()) % (list.size()) + 1;
        return StringUtils.join(list.toArray(), "\t", 0, endIndex);
    }
}
```
The generated mock data looks like this:
```
Spark   HBase
Hive    Flink   Storm   Hadoop
HBase   Spark   Flink
HBase   Storm
HBase   Hadoop  Hive    Flink
HBase   Flink   Hive    Storm
Hive    Flink   Hadoop
HBase   Hive    Hadoop  Spark
HBase   Storm
```
1.4 Writing the Data to HDFS
Here both the HDFS address and the output path are hard-coded. In real development they can be passed in as external parameters, which makes the program more flexible (a minimal sketch of that approach follows the example below).
```java
public class DataToHdfsApp {

    private static final String DATA_SOURCE_SPOUT = "dataSourceSpout";
    private static final String HDFS_BOLT = "hdfsBolt";

    public static void main(String[] args) {

        // Specify the Hadoop user name, otherwise writing to HDFS may fail with a permission exception
        System.setProperty("HADOOP_USER_NAME", "root");

        // Delimiter between output fields
        RecordFormat format = new DelimitedRecordFormat()
                .withFieldDelimiter("|");

        // Sync policy: flush buffered data to HDFS after every 100 tuples
        SyncPolicy syncPolicy = new CountSyncPolicy(100);

        // Rotation policy: roll over to a new file once the current file reaches 1 MB
        FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(1.0f, Units.MB);

        // Output path on HDFS
        FileNameFormat fileNameFormat = new DefaultFileNameFormat()
                .withPath("/storm-hdfs/");

        // Define the HdfsBolt
        HdfsBolt hdfsBolt = new HdfsBolt()
                .withFsUrl("hdfs://hadoop001:8020")
                .withFileNameFormat(fileNameFormat)
                .withRecordFormat(format)
                .withRotationPolicy(rotationPolicy)
                .withSyncPolicy(syncPolicy);

        // Wire up the topology
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(DATA_SOURCE_SPOUT, new DataSourceSpout());
        builder.setBolt(HDFS_BOLT, hdfsBolt, 1).shuffleGrouping(DATA_SOURCE_SPOUT);

        // Submit to the cluster if "cluster" is passed as the first argument, otherwise run in local mode
        if (args.length > 0 && args[0].equals("cluster")) {
            try {
                StormSubmitter.submitTopology("ClusterDataToHdfsApp", new Config(), builder.createTopology());
            } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
                e.printStackTrace();
            }
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("LocalDataToHdfsApp", new Config(), builder.createTopology());
        }
    }
}
```
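As a minimal sketch of the parameterization mentioned above (the argument layout and fallback values here are assumptions for illustration, not part of the original project), the HDFS URL and output path could simply be read from the command line and then used to build the same HdfsBolt:

```java
// Hypothetical argument layout: args[0] = run mode, args[1] = HDFS URL, args[2] = output path
String fsUrl = args.length > 1 ? args[1] : "hdfs://hadoop001:8020";
String outputPath = args.length > 2 ? args[2] : "/storm-hdfs/";

HdfsBolt hdfsBolt = new HdfsBolt()
        .withFsUrl(fsUrl)
        .withFileNameFormat(new DefaultFileNameFormat().withPath(outputPath))
        .withRecordFormat(new DelimitedRecordFormat().withFieldDelimiter("|"))
        .withRotationPolicy(new FileSizeRotationPolicy(1.0f, Units.MB))
        .withSyncPolicy(new CountSyncPolicy(100));
// ... build and submit the topology exactly as in DataToHdfsApp above
```

The rest of the topology stays the same; only the two values passed to withFsUrl and withPath change between environments.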
1.5 Running and Testing
You can run the topology directly in local mode, or package it and submit it to the server cluster. The source code in this repository uses the maven-shade-plugin for packaging by default; the packaging command is as follows:
```shell
# mvn clean package -D maven.test.skip=true
```
After the topology runs, the data is written to the /storm-hdfs directory on HDFS. The output can be inspected with the following commands:
```shell
# list the output directory
hadoop fs -ls /storm-hdfs
# follow the contents of a file as it is written
hadoop fs -tail -f /storm-hdfs/<file name>
```
2. Storm Integration with HBase
2.1 Project Structure
Integration use case: perform a word-frequency count and store the final result in HBase.
Source code for this example: storm-hbase-integration
2.2 Main Project Dependencies
```xml
<properties>
    <storm.version>1.2.2</storm.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>${storm.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-hbase</artifactId>
        <version>${storm.version}</version>
    </dependency>
</dependencies>
```
2.3 DataSourceSpout
```java
public class DataSourceSpout extends BaseRichSpout {

    private List<String> list = Arrays.asList("Spark", "Hadoop", "HBase", "Storm", "Flink", "Hive");

    private SpoutOutputCollector spoutOutputCollector;

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.spoutOutputCollector = spoutOutputCollector;
    }

    @Override
    public void nextTuple() {
        String lineData = productData();
        spoutOutputCollector.emit(new Values(lineData));
        Utils.sleep(1000);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("line"));
    }

    private String productData() {
        Collections.shuffle(list);
        Random random = new Random();
        int endIndex = random.nextInt(list.size()) % (list.size()) + 1;
        return StringUtils.join(list.toArray(), "\t", 0, endIndex);
    }
}
```
The generated mock data looks like this:
```
Spark   HBase
Hive    Flink   Storm   Hadoop
HBase   Spark   Flink
HBase   Storm
HBase   Hadoop  Hive    Flink
HBase   Flink   Hive    Storm
Hive    Flink   Hadoop
HBase   Hive    Hadoop  Spark
HBase   Storm
```
2.4 SplitBolt
```java
public class SplitBolt extends BaseRichBolt {

    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        String line = input.getStringByField("line");
        String[] words = line.split("\t");
        for (String word : words) {
            collector.emit(tuple(word, 1));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}
```
2.5 CountBolt
```java
public class CountBolt extends BaseRichBolt {

    private Map<String, Integer> counts = new HashMap<>();

    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        String word = input.getStringByField("word");
        Integer count = counts.get(word);
        if (count == null) {
            count = 0;
        }
        count++;
        counts.put(word, count);
        // emit the word and its running total; note the count is emitted as a String here
        collector.emit(new Values(word, String.valueOf(count)));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}
```
2.6 WordCountToHBaseApp
```java
public class WordCountToHBaseApp {

    private static final String DATA_SOURCE_SPOUT = "dataSourceSpout";
    private static final String SPLIT_BOLT = "splitBolt";
    private static final String COUNT_BOLT = "countBolt";
    private static final String HBASE_BOLT = "hbaseBolt";

    public static void main(String[] args) {

        // HBase connection settings, passed to the HBaseBolt through the topology config
        Config config = new Config();
        Map<String, Object> hbConf = new HashMap<>();
        hbConf.put("hbase.rootdir", "hdfs://hadoop001:8020/hbase");
        hbConf.put("hbase.zookeeper.quorum", "hadoop001:2181");
        config.put("hbase.conf", hbConf);

        // Map tuples to HBase: "word" is the row key, both fields are written to the "info" column family
        SimpleHBaseMapper mapper = new SimpleHBaseMapper()
                .withRowKeyField("word")
                .withColumnFields(new Fields("word", "count"))
                .withColumnFamily("info");

        // The bolt writes to the WordCount table, using the configuration stored under "hbase.conf"
        HBaseBolt hbase = new HBaseBolt("WordCount", mapper)
                .withConfigKey("hbase.conf");

        // Wire up the topology
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(DATA_SOURCE_SPOUT, new DataSourceSpout(), 1);
        builder.setBolt(SPLIT_BOLT, new SplitBolt(), 1).shuffleGrouping(DATA_SOURCE_SPOUT);
        builder.setBolt(COUNT_BOLT, new CountBolt(), 1).shuffleGrouping(SPLIT_BOLT);
        builder.setBolt(HBASE_BOLT, hbase, 1).shuffleGrouping(COUNT_BOLT);

        // Submit to the cluster if "cluster" is passed as the first argument, otherwise run in local mode
        if (args.length > 0 && args[0].equals("cluster")) {
            try {
                StormSubmitter.submitTopology("ClusterWordCountToRedisApp", config, builder.createTopology());
            } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
                e.printStackTrace();
            }
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("LocalWordCountToRedisApp", config, builder.createTopology());
        }
    }
}
```
2.7 Running and Testing
You can run the topology directly in local mode, or package it and submit it to the server cluster. The source code in this repository uses the maven-shade-plugin for packaging by default; the packaging command is as follows:
```shell
# mvn clean package -D maven.test.skip=true
```
After the topology runs, the results are stored in the WordCount table in HBase. Use the following command to view the table contents:
```shell
hbase > scan 'WordCount'
```
2.8 withCounterFields
In the example above we implemented the word count by hand and then stored the final result in HBase. Alternatively, when building the SimpleHBaseMapper you can designate the count field with withCounterFields; the designated fields are then incremented automatically by HBase, which also yields a word count. Note that the fields specified by withCounterFields must be of type Long, not String (a sketch of the matching bolt change follows the snippet below).
```java
SimpleHBaseMapper mapper = new SimpleHBaseMapper()
        .withRowKeyField("word")
        .withColumnFields(new Fields("word"))
        .withCounterFields(new Fields("count"))
        .withColumnFamily("cf");
```
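Because the counter field must carry a Long value, the bolt feeding the HBaseBolt has to emit the count as a long rather than a String. Below is a minimal sketch (an assumption for illustration, not code from the original project) of an execute method compatible with this mapper; the field names follow the bolts above and the rest of the bolt is unchanged:

```java
@Override
public void execute(Tuple input) {
    String word = input.getStringByField("word");
    // Emit the increment as a long (1L); with withCounterFields, HBase adds it to the
    // existing counter value, so the in-memory counts map used in CountBolt is no longer needed.
    collector.emit(new Values(word, 1L));
}
```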
References
- Apache HDFS Integration
- Apache HBase Integration