Storm介绍

Storm是一个分布式调度系统,用户按照接口实现一个计算任务,然后把这个任务交给Storm系统。Storm将这个任务跑起来,并且7*24小时运行,一旦中间一个Worker挂了,调度器会立即重启一个Work替换这个失效的Worker。

本文所说的Storm均指阿里开发的JStorm。

把任务交给Storm的好处有:接口简单,开发快;当速度慢时,配置一下并发数,实现性能的线性扩展;健壮性好,自动进行故障恢复;可以启动Ack机制,保证数据不丢失;延时低。

Storm最适合的场景是无状态计算,即计算依赖的数据全部在接收的消息里,且一个消息的计算不依赖于另一个消息。典型应用场景有:日志分析、管道系统、消息转化器、统计分析器。

基本组件

  • Spout:数据源,可以是Kafka、HBase、HDFS等。核心接口是nextTuple,用于获取下一条消息。Storm框架会不断地调用这个接口,从数据源拉取数据,发送给下游Bolt。
  • Bolt:任务处理逻辑;Bolt之间可以建立连接关系,从而形成一个处理流水线;通常最后一个Bolt会做一些数据的存储工作,如写入DB、HBase等,供前台业务查询和展现。核心接口是execute,内容是具体业务逻辑的实现。

Spout和Bolt组合起来就是一个任务,称为Topology。向Storm集群提交一个Topology后,框架会对它进行调度和执行。

调度和执行

提交Topology的时候,用户可以指定总worker数,即系统处理这个任务所使用的进程数。用户还可以指定每个组件的并行度。比如一个Topology含有一个Spout和一个Bolt,Spout的并行度是5,Bolt的并行度是10。那么我们最终会有15个线程,5个Spout线程,10个Bolt线程。Storm中,每个执行线程都有一个task id,从1开始递增,每一个组件的task id是连续的。

于是这个Topology共有3个进程和15个线程。线程会分布在3个进程中。Storm使用了一种均匀的调度算法,因此在执行的时候,你会看到,每个进程分别都各有5个线程在执行。当然,由于spout是5个线程,不能均匀地分配到3个进程中,会出现一个进程只有1个spout线程的情况;同样地,也会出现一个进程中有4个bolt线程的情况。

消息通信

Spout的消息会发送给特定的Bolt,Bolt也可以发送给其他的Bolt,这之间的通信机制如下:从Spout发送消息的时候,Storm会计算出消息要发送的目标task id列表,然后看目标task id是在本进程中还是在其他进程中。如果在本进程中,直接走进程内部通信;如果是跨进程,会通过netty将消息发送到目标task中。

Grouping方式

  • fieldsGrouping:保证相同的key的数据会发送到相同的task,原理是对某个或某几个字段做hash,然后用hash结果求模得到目标task id。
  • globalGrouping:发送到目标组件的第一个task。
  • shuffleGrouping:平均分配tuple到目标组件的各个task。
  • noneGrouping:随机分配tuple到目标组件的各个task,无法保证平均。
  • allGrouping:发送给目标组件的所有task。
  • directGrouping:发送指定目标task。

WordCount

Maven的pom.xml如下:

<dependency>
    <groupId>com.alibaba.jstorm</groupId>
    <artifactId>jstorm-core</artifactId>
    <version>2.1.1</version>
    <scope>compile</scope>
    <exclusions>
        <exclusion>
            <artifactId>slf4j-nop</artifactId>
            <groupId>org.slf4j</groupId>
        </exclusion>
        <exclusion>
            <artifactId>slf4j-jdk14</artifactId>
            <groupId>org.slf4j</groupId>
        </exclusion>
        <exclusion>
            <artifactId>log4j-over-slf4j</artifactId>
            <groupId>org.slf4j</groupId>
        </exclusion>
        <exclusion>
            <artifactId>log4j</artifactId>
            <groupId>log4j</groupId>
        </exclusion>
        <exclusion>
            <artifactId>commons-logging</artifactId>
            <groupId>commons-logging</groupId>
        </exclusion>
    </exclusions>
</dependency>

SentenceSpout.java:

public class SentenceSpout extends BaseRichSpout {
    private SpoutOutputCollector spoutOutputCollector;
    private int index = 0;

    private static final String[] sentences = {
            "The logic for a realtime application is packaged into a Storm topology",
            "A Storm topology is analogous to a MapReduce job",
            "One key difference is that a MapReduce job eventually finishes whereas a topology runs forever",
            " A topology is a graph of spouts and bolts that are connected with stream groupings"
    };

    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.spoutOutputCollector = spoutOutputCollector;
    }

    public void nextTuple() {
        spoutOutputCollector.emit(new Values(sentences[index]));
        ++index;
        if(index >= sentences.length) {
            index = 0;
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("sentences"));
    }
}

SPlitSentenceBolt.java

public class SplitSentenceBolt extends BaseRichBolt {
    private OutputCollector outputCollector;

    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.outputCollector = outputCollector;
    }

    public void execute(Tuple tuple) {
        String sentence = tuple.getStringByField("sentences");
        String[] words = sentence.split(" ");
        for(String word: words) {
            outputCollector.emit(new Values(word));
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word"));
    }
}

WordCount.java

public class WordCountBolt extends BaseRichBolt {
    private Map<String, Long> wordCount = null;
    private OutputCollector outputCollector;

    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.outputCollector = outputCollector;
        wordCount = new HashMap<String, Long>();
    }

    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("word");
        Long count = wordCount.get(word);
        if(count == null) {
            count = 0L;
        }
        ++count;
        wordCount.put(word, count);
        outputCollector.emit(new Values(word, count));
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word", "count"));
    }
}

ReportBolt.java

public class ReportBolt extends BaseRichBolt {
    private static final Logger log = LoggerFactory.getLogger(ReportBolt.class);
    private Map<String, Long> counts = null;

    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        counts = new HashMap<String, Long>();
    }

    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("word");
        Long count = tuple.getLongByField("count");
        counts.put(word, count);
        printReport();
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

    }

    private void printReport() {
        log.info("-------------begin------------");
        Set<String> words = counts.keySet();
        for(String word: words) {
            log.info("@report-bolt@: " + word + " ---> " + counts.get(word));
        }
        log.info("-------------end--------------");
    }

}

WordCountTopology.java

public class WordCountTopology {
    private static final Logger log = LoggerFactory.getLogger(WordCountTopology.class);

    private final static String SETENCE_SPOUT_ID = "sentence-spout";
    private final static String SPLIT_SENTENCE_BOLT_ID = "split-bolt";
    private final static String WORD_COUNT_BOLT_ID = "count-bolt";
    private final static String REPORT_BOLT_ID = "report-bolt";

    private final static String TOPOLOGY_NAME = "word-count-topology";

    public static void main(String[] args) throws Exception{
        log.info(".......begining.......");

        SentenceSpout sentenceSpout = new SentenceSpout();
        SplitSentenceBolt splitSentenceBolt = new SplitSentenceBolt();
        WordCountBolt wordCountBolt = new WordCountBolt();
        ReportBolt reportBolt = new ReportBolt();

        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setSpout(SETENCE_SPOUT_ID, sentenceSpout);
        topologyBuilder.setBolt(SPLIT_SENTENCE_BOLT_ID, splitSentenceBolt).shuffleGrouping(SETENCE_SPOUT_ID);
        topologyBuilder.setBolt(WORD_COUNT_BOLT_ID, wordCountBolt).fieldsGrouping(SPLIT_SENTENCE_BOLT_ID, new Fields("word"));
        topologyBuilder.setBolt(REPORT_BOLT_ID, reportBolt).globalGrouping(WORD_COUNT_BOLT_ID);

        Config config = new Config();
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(TOPOLOGY_NAME, config, topologyBuilder.createTopology());
        TimeUnit.DAYS.sleep(365);
        cluster.shutdown();
    }
}

数据分流与合流

分流

第一种情况,发送相同的tuple到下一级不同的bolt。这个很简单,对每个bolt设定接收同一个源即可。

第二种情况,发送不同的tuple到下一级不同的bolt。这个时候,需要引入stream的概念,发送方发送a消息给接收方A’时使用stream A,发送b消息给B’时使用stream B。

在Topology提交时:

builder.setBolt(SPLIT_BOLT_NAME, new SplitRecord(), 2).shuffleGrouping(
        SEQUENCE_SPOUT_NAME);
                
builder.setBolt(TRADE_BOLT_NAME, new PairCount(), 1).shuffleGrouping(
    SPLIT_BOLT_NAME,  // --- 发送方名字
    TRADE_STREAM_ID); // --- 接收发送方该stream 的tuple
            
builder.setBolt(CUSTOMER_BOLT_NAME, new PairCount(), 1).shuffleGrouping(
    SPLIT_BOLT_NAME, // --- 发送方名字
    CUSTOMER_STREAM_ID);      // --- 接收发送方该stream 的tuple

在发送消息时,需要注明消息属于哪个流:

public void execute(Tuple tuple, BasicOutputCollector collector) {
     Long tupleId = tuple.getLong(0);
     Object obj = tuple.getValue(1);
     
     if (obj instanceof TradeCustomer) {
     
         TradeCustomer tradeCustomer = (TradeCustomer)obj;
         
         Pair trade = tradeCustomer.getTrade();
         Pair customer = tradeCustomer.getCustomer();
            
         collector.emit(TRADE_STREAM_ID, 
                new Values(tupleId, trade)); //TRADE_STREAM_ID就是流名称
         collector.emit(CUSTOMER_STREAM_ID, 
                new Values(tupleId, customer)); // CUSTOMER_STREAM_ID 就是流名称
     }else if (obj != null){
         LOG.info("Unknow type " + obj.getClass().getName());
     }else {
         LOG.info("Nullpointer " );
     }
 }

定义输出流格式:

public void declareOutputFields(OutputFieldsDeclarer declarer) {
  declarer.declareStream(TRADE_STREAM_ID, new Fields("ID", "TRADE"));
  declarer.declareStream(CUSTOMER_STREAM_ID, new Fields("ID", "CUSTOMER"));
 }

接收消息时要判断:

if (input.getSourceStreamId().equals(TRADE_STREAM_ID) ) {
      customer = pair;
      customerTuple = input;
      
      tradeTuple = tradeMap.get(tupleId);
      if (tradeTuple == null) {
          customerMap.put(tupleId, input);
          return;
      }
      
      trade = (Pair) tradeTuple.getValue(1);
      
}

合流

合流很简单,发送方无需做什么改动,提交Topology的时候,让合流的Bolt同时接受上游多个Bolt或Spout的消息即可。

合流Bolt接受消息时判断消息来自哪个源组件即可。

   if (input.getSourceComponent().equals(CUSTOMER_BOLT_NAME) ) {
      customer = pair;
      customerTuple = input;
      
      tradeTuple = tradeMap.get(tupleId);
      if (tradeTuple == null) {
          customerMap.put(tupleId, input);
          return;
      }
      
      trade = (Pair) tradeTuple.getValue(1);
      
  } else if (input.getSourceComponent().equals(TRADE_BOLT_NAME)) {
      trade = pair;
      tradeTuple = input;
      
      customerTuple = customerMap.get(tupleId);
      if (customerTuple == null) {
          tradeMap.put(tupleId, input);
          return;
      }
      
      customer = (Pair) customerTuple.getValue(1);
  }