Storm是一个分布式调度系统,用户按照接口实现一个计算任务,然后把这个任务交给Storm系统。Storm将这个任务跑起来,并且7*24小时运行,一旦中间一个Worker挂了,调度器会立即重启一个Work替换这个失效的Worker。
本文所说的Storm均指阿里开发的JStorm。
把任务交给Storm的好处有:接口简单,开发快;当速度慢时,配置一下并发数,实现性能的线性扩展;健壮性好,自动进行故障恢复;可以启动Ack机制,保证数据不丢失;延时低。
Storm最适合的场景是无状态计算,即计算依赖的数据全部在接收的消息里,且一个消息的计算不依赖于另一个消息。典型应用场景有:日志分析、管道系统、消息转化器、统计分析器。
nextTuple
,用于获取下一条消息。Storm框架会不断地调用这个接口,从数据源拉取数据,发送给下游Bolt。execute
,内容是具体业务逻辑的实现。Spout和Bolt组合起来就是一个任务,称为Topology。向Storm集群提交一个Topology后,框架会对它进行调度和执行。
提交Topology的时候,用户可以指定总worker数,即系统处理这个任务所使用的进程数。用户还可以指定每个组件的并行度。比如一个Topology含有一个Spout和一个Bolt,Spout的并行度是5,Bolt的并行度是10。那么我们最终会有15个线程,5个Spout线程,10个Bolt线程。Storm中,每个执行线程都有一个task id,从1开始递增,每一个组件的task id是连续的。
于是这个Topology共有3个进程和15个线程。线程会分布在3个进程中。Storm使用了一种均匀的调度算法,因此在执行的时候,你会看到,每个进程分别都各有5个线程在执行。当然,由于spout是5个线程,不能均匀地分配到3个进程中,会出现一个进程只有1个spout线程的情况;同样地,也会出现一个进程中有4个bolt线程的情况。
Spout的消息会发送给特定的Bolt,Bolt也可以发送给其他的Bolt,这之间的通信机制如下:从Spout发送消息的时候,Storm会计算出消息要发送的目标task id列表,然后看目标task id是在本进程中还是在其他进程中。如果在本进程中,直接走进程内部通信;如果是跨进程,会通过netty将消息发送到目标task中。
fieldsGrouping
:保证相同的key的数据会发送到相同的task,原理是对某个或某几个字段做hash,然后用hash结果求模得到目标task id。globalGrouping
:发送到目标组件的第一个task。shuffleGrouping
:平均分配tuple到目标组件的各个task。noneGrouping
:随机分配tuple到目标组件的各个task,无法保证平均。allGrouping
:发送给目标组件的所有task。directGrouping
:发送指定目标task。Maven的pom.xml如下:
<dependency>
<groupId>com.alibaba.jstorm</groupId>
<artifactId>jstorm-core</artifactId>
<version>2.1.1</version>
<scope>compile</scope>
<exclusions>
<exclusion>
<artifactId>slf4j-nop</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>slf4j-jdk14</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>log4j-over-slf4j</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>log4j</artifactId>
<groupId>log4j</groupId>
</exclusion>
<exclusion>
<artifactId>commons-logging</artifactId>
<groupId>commons-logging</groupId>
</exclusion>
</exclusions>
</dependency>
SentenceSpout.java:
public class SentenceSpout extends BaseRichSpout {
private SpoutOutputCollector spoutOutputCollector;
private int index = 0;
private static final String[] sentences = {
"The logic for a realtime application is packaged into a Storm topology",
"A Storm topology is analogous to a MapReduce job",
"One key difference is that a MapReduce job eventually finishes whereas a topology runs forever",
" A topology is a graph of spouts and bolts that are connected with stream groupings"
};
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
this.spoutOutputCollector = spoutOutputCollector;
}
public void nextTuple() {
spoutOutputCollector.emit(new Values(sentences[index]));
++index;
if(index >= sentences.length) {
index = 0;
}
}
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("sentences"));
}
}
SPlitSentenceBolt.java
public class SplitSentenceBolt extends BaseRichBolt {
private OutputCollector outputCollector;
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
this.outputCollector = outputCollector;
}
public void execute(Tuple tuple) {
String sentence = tuple.getStringByField("sentences");
String[] words = sentence.split(" ");
for(String word: words) {
outputCollector.emit(new Values(word));
}
}
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("word"));
}
}
WordCount.java
public class WordCountBolt extends BaseRichBolt {
private Map<String, Long> wordCount = null;
private OutputCollector outputCollector;
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
this.outputCollector = outputCollector;
wordCount = new HashMap<String, Long>();
}
public void execute(Tuple tuple) {
String word = tuple.getStringByField("word");
Long count = wordCount.get(word);
if(count == null) {
count = 0L;
}
++count;
wordCount.put(word, count);
outputCollector.emit(new Values(word, count));
}
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("word", "count"));
}
}
ReportBolt.java
public class ReportBolt extends BaseRichBolt {
private static final Logger log = LoggerFactory.getLogger(ReportBolt.class);
private Map<String, Long> counts = null;
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
counts = new HashMap<String, Long>();
}
public void execute(Tuple tuple) {
String word = tuple.getStringByField("word");
Long count = tuple.getLongByField("count");
counts.put(word, count);
printReport();
}
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
}
private void printReport() {
log.info("-------------begin------------");
Set<String> words = counts.keySet();
for(String word: words) {
log.info("@report-bolt@: " + word + " ---> " + counts.get(word));
}
log.info("-------------end--------------");
}
}
WordCountTopology.java
public class WordCountTopology {
private static final Logger log = LoggerFactory.getLogger(WordCountTopology.class);
private final static String SETENCE_SPOUT_ID = "sentence-spout";
private final static String SPLIT_SENTENCE_BOLT_ID = "split-bolt";
private final static String WORD_COUNT_BOLT_ID = "count-bolt";
private final static String REPORT_BOLT_ID = "report-bolt";
private final static String TOPOLOGY_NAME = "word-count-topology";
public static void main(String[] args) throws Exception{
log.info(".......begining.......");
SentenceSpout sentenceSpout = new SentenceSpout();
SplitSentenceBolt splitSentenceBolt = new SplitSentenceBolt();
WordCountBolt wordCountBolt = new WordCountBolt();
ReportBolt reportBolt = new ReportBolt();
TopologyBuilder topologyBuilder = new TopologyBuilder();
topologyBuilder.setSpout(SETENCE_SPOUT_ID, sentenceSpout);
topologyBuilder.setBolt(SPLIT_SENTENCE_BOLT_ID, splitSentenceBolt).shuffleGrouping(SETENCE_SPOUT_ID);
topologyBuilder.setBolt(WORD_COUNT_BOLT_ID, wordCountBolt).fieldsGrouping(SPLIT_SENTENCE_BOLT_ID, new Fields("word"));
topologyBuilder.setBolt(REPORT_BOLT_ID, reportBolt).globalGrouping(WORD_COUNT_BOLT_ID);
Config config = new Config();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(TOPOLOGY_NAME, config, topologyBuilder.createTopology());
TimeUnit.DAYS.sleep(365);
cluster.shutdown();
}
}
第一种情况,发送相同的tuple到下一级不同的bolt。这个很简单,对每个bolt设定接收同一个源即可。
第二种情况,发送不同的tuple到下一级不同的bolt。这个时候,需要引入stream的概念,发送方发送a消息给接收方A’时使用stream A,发送b消息给B’时使用stream B。
在Topology提交时:
builder.setBolt(SPLIT_BOLT_NAME, new SplitRecord(), 2).shuffleGrouping(
SEQUENCE_SPOUT_NAME);
builder.setBolt(TRADE_BOLT_NAME, new PairCount(), 1).shuffleGrouping(
SPLIT_BOLT_NAME, // --- 发送方名字
TRADE_STREAM_ID); // --- 接收发送方该stream 的tuple
builder.setBolt(CUSTOMER_BOLT_NAME, new PairCount(), 1).shuffleGrouping(
SPLIT_BOLT_NAME, // --- 发送方名字
CUSTOMER_STREAM_ID); // --- 接收发送方该stream 的tuple
在发送消息时,需要注明消息属于哪个流:
public void execute(Tuple tuple, BasicOutputCollector collector) {
Long tupleId = tuple.getLong(0);
Object obj = tuple.getValue(1);
if (obj instanceof TradeCustomer) {
TradeCustomer tradeCustomer = (TradeCustomer)obj;
Pair trade = tradeCustomer.getTrade();
Pair customer = tradeCustomer.getCustomer();
collector.emit(TRADE_STREAM_ID,
new Values(tupleId, trade)); //TRADE_STREAM_ID就是流名称
collector.emit(CUSTOMER_STREAM_ID,
new Values(tupleId, customer)); // CUSTOMER_STREAM_ID 就是流名称
}else if (obj != null){
LOG.info("Unknow type " + obj.getClass().getName());
}else {
LOG.info("Nullpointer " );
}
}
定义输出流格式:
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declareStream(TRADE_STREAM_ID, new Fields("ID", "TRADE"));
declarer.declareStream(CUSTOMER_STREAM_ID, new Fields("ID", "CUSTOMER"));
}
接收消息时要判断:
if (input.getSourceStreamId().equals(TRADE_STREAM_ID) ) {
customer = pair;
customerTuple = input;
tradeTuple = tradeMap.get(tupleId);
if (tradeTuple == null) {
customerMap.put(tupleId, input);
return;
}
trade = (Pair) tradeTuple.getValue(1);
}
合流很简单,发送方无需做什么改动,提交Topology的时候,让合流的Bolt同时接受上游多个Bolt或Spout的消息即可。
合流Bolt接受消息时判断消息来自哪个源组件即可。
if (input.getSourceComponent().equals(CUSTOMER_BOLT_NAME) ) {
customer = pair;
customerTuple = input;
tradeTuple = tradeMap.get(tupleId);
if (tradeTuple == null) {
customerMap.put(tupleId, input);
return;
}
trade = (Pair) tradeTuple.getValue(1);
} else if (input.getSourceComponent().equals(TRADE_BOLT_NAME)) {
trade = pair;
tradeTuple = input;
customerTuple = customerMap.get(tupleId);
if (customerTuple == null) {
tradeMap.put(tupleId, input);
return;
}
customer = (Pair) customerTuple.getValue(1);
}