Maven 依赖添加
<dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-redis</artifactId> <version>${storm.version}</version> <type>jar</type> </dependency>
package com.waiting; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.redis.bolt.RedisStoreBolt; import org.apache.storm.redis.common.config.JedisPoolConfig; import org.apache.storm.redis.common.mapper.RedisDataTypeDescription; import org.apache.storm.redis.common.mapper.RedisStoreMapper; import org.apache.storm.shade.org.apache.commons.io.FileUtils; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.ITuple; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import org.apache.storm.utils.Utils; import java.io.File; import java.io.IOException; import java.util.*; public class LocalWordCountRedisStormTopology { public static class DataSourceSpout extends BaseRichSpout { private SpoutOutputCollector collector; @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; } public static final String[] words = new String[]{"apple", "orange", "pineapple", "bannaer"}; @Override public void nextTuple() { Random random = new Random(); String word = words[random.nextInt(words.length)]; this.collector.emit(new Values(word)); System.out.println("word:" + word); Utils.sleep(1000); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("line") ); } } public static class SplitBolt extends BaseRichBolt{ private OutputCollector collector; @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } @Override public void execute(Tuple input) { String word = input.getStringByField("line"); this.collector.emit(new Values(word)); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } } public static class CountBolt extends BaseRichBolt{ private OutputCollector collector; @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } Map<String, Integer> map = new HashMap<String, Integer>(); @Override public void execute(Tuple input) { String word = input.getStringByField("word"); Integer count = map.get(word); if(count == null){ count = 0; } count ++; map.put(word, count); this.collector.emit(new Values(word, map.get(word))); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word", "count")); } } public static class WordCountStoreMapper implements RedisStoreMapper { private RedisDataTypeDescription description; private final String hashKey = "wordCount"; public WordCountStoreMapper() { description = new RedisDataTypeDescription( RedisDataTypeDescription.RedisDataType.HASH, hashKey); } @Override public RedisDataTypeDescription getDataTypeDescription() { return description; } @Override public String getKeyFromTuple(ITuple tuple) { return tuple.getStringByField("word"); } @Override public String getValueFromTuple(ITuple tuple) { return tuple.getIntegerByField("count") + ""; } } public static void main(String[] args){ TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("DataSourceSpout", new DataSourceSpout()); builder.setBolt("SplitBolt", new SplitBolt()).shuffleGrouping("DataSourceSpout"); builder.setBolt("CountBolt", new CountBolt()).shuffleGrouping("SplitBolt"); JedisPoolConfig poolConfig = new JedisPoolConfig.Builder() .setHost("127.0.0.1").setPort(6379).setPassword("test").build(); RedisStoreMapper storeMapper = new WordCountStoreMapper(); RedisStoreBolt storeBolt = new RedisStoreBolt(poolConfig, storeMapper); builder.setBolt("RedisStoreBolt", storeBolt).shuffleGrouping("CountBolt"); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("LocalWordCountStormTopology", new Config(), builder.createTopology()); } }4521