博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Apache Storm技术实战之1 -- WordCountTopology
阅读量:6626 次
发布时间:2019-06-25

本文共 4219 字,大约阅读时间需要 14 分钟。

欢迎转载,转载请注意出处,徽沪一郎。

“源码走读系列”从代码层面分析了storm的具体实现,接下来通过具体的实例来说明storm的使用。因为目前storm已经正式迁移到Apache,文章系列也由twitter storm转为apache storm.

WordCountTopology 使用storm来统计文件中的每个单词的出现次数。

通过该例子来说明tuple发送时的几个要素

  1. source component   发送源
  2. destination component 接收者
  3. stream 消息通道
  4. tuple    消息本身

本文涉及到的开发环境搭建可以参考前面的两篇博文。

awk实现

其实对文件中的单词进行统计是Linux下一个很常见的任务,用awk就可以轻松的解决(如果文件不是太大的话),下面是进行word counting的awk脚本,将其保存为名为wordcount.awk文件。

wordcount.awk

{  for (i = 1; i<=NF; i++)    freq[$i]++}END{  for (word in freq)    printf "%s\t%d\n",word,freq[word]}

运行该脚本,对文件中的单词进行统计

gawk -f wordcount.awk filename

原始版本

从github上复制内容

git clone https://github.com/nathanmarz/storm-starter.git

编译运行

lein depslein compilejava -cp $(lein classpath) WordCountTopology

main函数

main函数的主要内容

TopologyBuilder builder = new TopologyBuilder();    builder.setSpout("spout", new RandomSentenceSpout(), 5);    builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");    builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));

注意:grouping操作的时候,如果没有显示指定stream id,则使用的是default stream. 如shuffleGrouping("spout")表示从名为spout的component中接收从default stream发送过来的tuple.

改进版本

在原始版本中,spout不停的向split bolt随机发送句子,Count bolt统计每个单词出现的次数。

那么能不能让Spout在读取完文件之后,通知下游的bolt显示最柊的统计结果呢?

要想达到上述的改进目标,采用如上图所示的结构即可。改变的地方如下,

  1. 在Spout中添加一个SUCCESS_STREAM
  2. 添加只有一个运行实例的statistics bolt
  3. 当spout读取完文件内容之后,通过SUCCESS_STREAM告诉statistics bolt,文件已经处理完毕,可以打印当前的统计结果

RandomSentenceSpout.java

declareOutputFields

添加SUCCESS_STREAM

@Override  public void declareOutputFields(OutputFieldsDeclarer declarer) {    declarer.declare(new Fields("word"));    declarer.declareStream("SUCCESS_STREAM",new Fields("word"));  }

nextTuple

使用SUCCESS_STREAM通知下游,文件处理完毕

@Override  public void nextTuple() {    Utils.sleep(100);    String[] sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away",        "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };    if ( count == sentences.length )     {      System.out.println(count+" try to emit tuple by success_stream");      _collector.emit("SUCCESS_STREAM",new Values(sentences[0]));      count++;    }else if ( count < sentences.length ){      _collector.emit(new Values(sentences[count]));      count++;    }  }

WordCountTopology.java

添加静态类WordCount2

public static class WordCount2 extends BaseBasicBolt {    Map
counts = new HashMap
(); @Override public void execute(Tuple tuple, BasicOutputCollector collector) { if ( tuple.getSourceStreamId() == "SUCCESS_STREAM" ) { System.out.println("prepare to print the statistics"); for (String key : counts.keySet()) { System.out.println(key+"\t"+counts.get(key)); } System.out.println("finish printing"); }else { String word = tuple.getString(0); Integer count = counts.get(word); if (count == null) count = 0; count++; counts.put(word, count); } }

 

main函数

将spout的并行数由5改为1

builder.setSpout("spout", new RandomSentenceSpout(), 1);

在原有的Topology中添加WordCount2 Bolt

builder.setBolt("count2", new WordCount2(), 1).globalGrouping("count").globalGrouping("spout","SUCCESS_STREAM");

WordCount2 Bolt会接收从Count Bolt通过default stream发送的tuple,同时接收Spout通过SUCCESS_STREAM发送的tuple,也就是说wordcount2会接收从两个stream来的数据。

编译

编译修改后的源文件

cd $STROM_STARTER lein compile storm.starter

可能会出现以下异常信息,该异常可以忽略。

Exception in thread "main" java.io.FileNotFoundException: Could not locate storm/starter/WordCountTopology__init.class or storm/starter/WordCountTopology.clj on classpath:

运行

在local模式下运行修改后的WordCountTopology

java -cp $(lein classpath) storm.starter.WordCountTopology

如果一切正常,日志如下所示,线程的名字可能会有所不同。

moon    1 score    1 cow    1 doctor    1 over    1 nature    1 snow    1 four    1 keeps    1 with    1 a    1 white    1 dwarfs    1 at    1 the    4 and    2 i    1 two    1 away    1 seven    2 apple    1 am    1 an    1 jumped    1 day    1 years    1 ago    1

 结果验证

可以将WordCountTopology的运行结果和awk脚本的运行结果相比对,结果应该是一致的。

小技巧

  1. awk脚本的执行结果存为一个文件result1.log, WordCountTopology的输出中单词统计部分存为result2.log
  2. 用vim打开result1.log,进行sorting,保存结果;用vim打开result2.log,进行sorting,保存。
  3. 然后用vimdiff来进行比较 vimdiff result1.log result2.log

转载于:https://www.cnblogs.com/hseagle/p/3505938.html

你可能感兴趣的文章
php实现按utf8编码对字符串进行分割
查看>>
Ftp的断点下载实现
查看>>
[转载] ubuntu Authentication failure
查看>>
Ring0 - 链表
查看>>
修改数组之----splice
查看>>
a版本冲刺第五天
查看>>
Arduino示例教程超声波测距实验
查看>>
Redis操作hash
查看>>
轻松搞定个人虚拟桌面部署之5-在客户端测试远程桌面
查看>>
Linux中chkconfig使用介绍
查看>>
二进制方式快速安装MySQL数据库
查看>>
Centos5上部署udev
查看>>
挑战WORD极限排版之模板与加载项
查看>>
Tomcat配置多数据源
查看>>
(转)快速搭建PHP开发环境WAMP+ZendStudio+ZendDebugger
查看>>
js string format
查看>>
httpHandlers和httpModules接口介绍 (3)
查看>>
《大话数据结构》第9章 排序 9.1 开场白
查看>>
Xgcalendar 新增Php demo
查看>>
poj2774
查看>>