Topologies
The logic for a realtime application is packaged into a Storm topology. A Storm topology is analogous to a MapReduce job. One key difference is that a MapReduce job eventually finishes, whereas a topology runs forever (or until you kill it, of course). A topology is a graph of spouts and bolts that are connected with stream groupings. These concepts are described below.
Spouts
A spout is a source of streams in a topology. Generally, spouts read tuples from an external source (e.g. a Kestrel queue or the Twitter API) and emit them into the topology. Spouts can be either reliable or unreliable. A reliable spout is capable of replaying a tuple if it failed to be processed by Storm, whereas an unreliable spout forgets about the tuple as soon as it is emitted.
Spouts can emit more than one stream. To do so, declare multiple streams using the declareStream method of OutputFieldsDeclarer and specify the stream to emit to when using the emit method on SpoutOutputCollector.
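As a sketch of the two calls just described, the spout below declares two output streams and picks one of them for each tuple. The class name ParitySpout and the stream names "even" and "odd" are hypothetical, and the code assumes the org.apache.storm package layout used by Storm 1.0 and later.

```java
import java.util.Map;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

// Hypothetical spout that routes each integer to an "even" or an "odd" stream.
public class ParitySpout extends BaseRichSpout {
    public static final String EVEN_STREAM = "even";
    public static final String ODD_STREAM = "odd";

    private SpoutOutputCollector collector;
    private int next = 0;

    // A raw Map parameter keeps this override valid against both the
    // Storm 1.x (raw Map) and 2.x (Map<String, Object>) signatures.
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        int n = next++;
        // The first argument to emit selects which declared stream receives the tuple.
        String stream = (n % 2 == 0) ? EVEN_STREAM : ODD_STREAM;
        collector.emit(stream, new Values(n));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // One declareStream call per output stream; each stream has its own fields.
        declarer.declareStream(EVEN_STREAM, new Fields("num"));
        declarer.declareStream(ODD_STREAM, new Fields("num"));
    }
}
```

Because no message ID is passed to emit, these tuples are unreliable. A downstream bolt would subscribe to "even" or "odd" by name.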
The main method on spouts is nextTuple. nextTuple either emits a new tuple into the topology or simply returns if there are no new tuples to emit. It is imperative that nextTuple does not block for any spout implementation, because Storm calls all the spout methods on the same thread.
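One common way to keep nextTuple non-blocking, sketched here under assumptions, is to let a separate thread fetch data into an in-memory buffer and have nextTuple drain that buffer with a non-blocking poll(). QueueSpout and its synthetic producer thread are hypothetical stand-ins for a real source client, such as a Kestrel or Twitter API reader.

```java
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

// Hypothetical spout: a background thread fills a buffer, nextTuple only drains it.
public class QueueSpout extends BaseRichSpout {
    private transient LinkedBlockingQueue<String> buffer;
    private transient Thread producer;
    private SpoutOutputCollector collector;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.buffer = new LinkedBlockingQueue<>(10_000);
        // Stand-in for a real source client. Blocking is acceptable here
        // because this is not the thread Storm uses for spout methods.
        this.producer = new Thread(() -> {
            int i = 0;
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    buffer.put("message-" + i++);
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        producer.setDaemon(true);
        producer.start();
    }

    @Override
    public void nextTuple() {
        // poll() returns null at once when the buffer is empty; take() would block
        // the same thread that Storm also uses to call ack() and fail().
        String message = buffer.poll();
        if (message == null) {
            return; // nothing to emit right now
        }
        collector.emit(new Values(message));
    }

    @Override
    public void close() {
        // Stop the producer when Storm shuts the spout down.
        if (producer != null) {
            producer.interrupt();
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("message"));
    }
}
```

When nextTuple returns without emitting anything, Storm applies a configurable spout wait strategy before calling it again, so an empty poll() does not spin the CPU at full speed.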
The other main methods on spouts are ack and fail. These are called when Storm detects that a tuple emitted from the spout either successfully completed through the topology or failed to be completed. ack and fail are only called for reliable spouts. See the Javadoc for more information.
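To make the ack/fail contract concrete, here is a minimal sketch of a reliable spout, assuming an in-memory map is acceptable for tracking in-flight tuples. ReplayingSpout and its "event" field are hypothetical. Emitting with a message ID is what makes a tuple tracked; ack forgets the tuple, and fail re-emits it. Since Storm calls nextTuple, ack, and fail on the same thread, a plain HashMap needs no locking.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

// Hypothetical reliable spout: keeps each emitted tuple until Storm acks it
// and replays it when Storm reports a failure.
public class ReplayingSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private final Map<Integer, Values> pending = new HashMap<>();
    private int nextId = 0;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        int msgId = nextId++;
        Values tuple = new Values("event-" + msgId);
        pending.put(msgId, tuple);
        // Passing a message ID makes the tuple tracked; omitting it makes the emit unreliable.
        collector.emit(tuple, msgId);
    }

    @Override
    public void ack(Object msgId) {
        // The tuple tree completed: the tuple no longer needs to be remembered.
        pending.remove(msgId);
    }

    @Override
    public void fail(Object msgId) {
        // The tuple tree failed or timed out: re-emit the same values under the same ID.
        Values tuple = pending.get(msgId);
        if (tuple != null) {
            collector.emit(tuple, msgId);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("event"));
    }
}
```

In practice, setting topology.max.spout.pending (for example with Config.setMaxSpoutPending) bounds how many tuples can sit in the pending map at once.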
Bolts
All processing in topologies is done in bolts. Bolts can do anything from filtering, functions, aggregations, and joins to talking to databases.
Bolts can do simple stream transformations. Doing complex stream transformations often requires multiple steps and thus multiple bolts. For example, transforming a stream of tweets into a stream of trending images requires at least two steps: a bolt to do a rolling count of retweets for each image, and one or more bolts to stream out the top X images (you can do this particular stream transformation in a more scalable way with three bolts than with two).
Bolts can emit more than one stream. To do so, declare multiple streams using the declareStream method of OutputFieldsDeclarer and specify the stream to emit to when using the emit method on OutputCollector.
When you declare a bolt’s input streams, you always subscribe to specific streams of another component. If you want to subscribe to all the streams of another component, you have to subscribe to each one individually. InputDeclarer has syntactic sugar for subscribing to streams declared on the default stream id. Calling declarer.shuffleGrouping("1") subscribes to the default stream on component "1" and is equivalent to declarer.shuffleGrouping("1", DEFAULT_STREAM_ID).
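The sketch below illustrates the two preceding paragraphs: a bolt that declares two named streams and emits to one of them, and wiring code that subscribes to each stream individually. All component IDs ("lines", "validate", "store", "audit") and class names are hypothetical; "lines" stands for some upstream spout registered elsewhere on the same TopologyBuilder.

```java
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class RoutingTopology {

    // Hypothetical bolt with two output streams: non-blank lines go to "valid", blank ones to "invalid".
    public static class ValidateBolt extends BaseRichBolt {
        private OutputCollector collector;

        @Override
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple input) {
            String line = input.getString(0);
            String stream = line.trim().isEmpty() ? "invalid" : "valid";
            // emit(streamId, anchor, values): choose the stream and anchor to the input.
            collector.emit(stream, input, new Values(line));
            collector.ack(input);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declareStream("valid", new Fields("line"));
            declarer.declareStream("invalid", new Fields("line"));
        }
    }

    // Hypothetical terminal bolt that prints which stream each tuple arrived on.
    public static class SinkBolt extends BaseBasicBolt {
        @Override
        public void execute(Tuple input, BasicOutputCollector collector) {
            System.out.println(input.getSourceStreamId() + ": " + input.getString(0));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // Emits nothing.
        }
    }

    // Adds the bolts to a builder that already has a component called "lines".
    public static void wire(TopologyBuilder builder) {
        // Sugar for shuffleGrouping("lines", Utils.DEFAULT_STREAM_ID).
        builder.setBolt("validate", new ValidateBolt()).shuffleGrouping("lines");
        // Subscribe to a single named stream.
        builder.setBolt("store", new SinkBolt()).shuffleGrouping("validate", "valid");
        // Receiving every stream of "validate" means subscribing to each one explicitly.
        builder.setBolt("audit", new SinkBolt())
               .shuffleGrouping("validate", "valid")
               .shuffleGrouping("validate", "invalid");
    }
}
```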
The main method in bolts is the execute method, which takes a new tuple as input. Bolts emit new tuples using the OutputCollector object. Bolts must call the ack method on the OutputCollector for every tuple they process so that Storm knows when tuples are completed (and can eventually determine that it's safe to ack the original spout tuples). For the common case of processing an input tuple, emitting 0 or more tuples based on that tuple, and then acking the input tuple, Storm provides an IBasicBolt interface which does the acking automatically.
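For the IBasicBolt case, a minimal sketch built on BaseBasicBolt (Storm's convenience base class for IBasicBolt) might look like the following; SplitSentenceBolt and its "word" field are hypothetical. Tuples emitted through BasicOutputCollector are anchored to the input automatically, and the input is acked when execute returns normally.

```java
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

// Hypothetical word-splitting bolt: no explicit ack() call is needed.
public class SplitSentenceBolt extends BaseBasicBolt {
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        // Each emitted word is anchored to `input`; Storm acks `input` after this method returns.
        for (String word : input.getString(0).split("\\s+")) {
            if (!word.isEmpty()) {
                collector.emit(new Values(word));
            }
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
```

To fail the input tuple instead of acking it, throw org.apache.storm.topology.FailedException from execute.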
It's perfectly fine to launch new threads in bolts that do processing asynchronously. OutputCollector is thread-safe and can be called at any time.
The following local-mode example ties these pieces together. DataSourceSpout is a reliable spout that emits an increasing integer once per second, using the number itself as the message ID. SumBolt keeps a running sum, acks tuples whose value is between 1 and 10, and fails all others, so both of the spout's ack and fail callbacks are exercised.

```java
package com.waiting;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.util.Map;

public class LocalSumStormAckerTopology {

    // Reliable spout: every tuple carries a message ID, so Storm calls ack/fail for it.
    public static class DataSourceSpout extends BaseRichSpout {
        private SpoutOutputCollector collector;
        private int number = 0;

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void nextTuple() {
            ++number;
            // The second argument is the message ID that Storm passes back to ack()/fail().
            this.collector.emit(new Values(number), number);
            System.out.println("Spout: " + number);
            Utils.sleep(1000); // demo throttle: one tuple per second
        }

        @Override
        public void ack(Object msgId) {
            System.out.println("ack invoked ..." + msgId);
        }

        @Override
        public void fail(Object msgId) {
            // This demo only logs the failure; it does not replay the tuple.
            System.out.println("fail invoked ..." + msgId);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("num"));
        }
    }

    // Sums incoming numbers; acks values in 1..10 and explicitly fails the rest.
    public static class SumBolt extends BaseRichBolt {
        private OutputCollector collector;
        private int sum = 0;

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple input) {
            Integer value = input.getIntegerByField("num");
            sum += value;
            if (value > 0 && value <= 10) {
                this.collector.ack(input);
            } else {
                this.collector.fail(input);
            }
            System.out.println("bolt: sum = [" + sum + "]");
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // Terminal bolt: declares no output streams.
        }
    }

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("DataSourceSpout", new DataSourceSpout());
        builder.setBolt("SumBolt", new SumBolt()).shuffleGrouping("DataSourceSpout");

        // Runs in-process; the topology keeps running until the JVM is stopped.
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("LocalSumStormAckerTopology", new Config(), builder.createTopology());
    }
}
```