Storm 整合 Kafka

1. 依赖(Maven pom.xml)

<!-- Curator framework; its transitive ZooKeeper artifact is excluded. -->
<dependency>
  <groupId>org.apache.curator</groupId>
  <artifactId>curator-framework</artifactId>
  <version>${curator.version}</version>
  <exclusions>
    <exclusion>
      <groupId>org.apache.zookeeper</groupId>
      <artifactId>zookeeper</artifactId>
    </exclusion>
  </exclusions>
</dependency>

<!-- Curator recipes, same version property as the other Curator artifacts. -->
<dependency>
  <groupId>org.apache.curator</groupId>
  <artifactId>curator-recipes</artifactId>
  <version>${curator.version}</version>
</dependency>

<!-- Curator client; its transitive ZooKeeper artifact is excluded. -->
<dependency>
  <groupId>org.apache.curator</groupId>
  <artifactId>curator-client</artifactId>
  <version>${curator.version}</version>
  <exclusions>
    <exclusion>
      <groupId>org.apache.zookeeper</groupId>
      <artifactId>zookeeper</artifactId>
    </exclusion>
  </exclusions>
</dependency>

<!-- storm-kafka spout; the Curator artifacts it pulls in are excluded
     because they are declared explicitly above. -->
<dependency>
  <groupId>org.apache.storm</groupId>
  <artifactId>storm-kafka</artifactId>
  <version>${storm.version}</version>
  <type>jar</type>
  <exclusions>
    <exclusion>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-client</artifactId>
    </exclusion>
    <exclusion>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-framework</artifactId>
    </exclusion>
  </exclusions>
</dependency>

<!-- Kafka core (Scala 2.11 build) without its ZooKeeper, log4j and
     slf4j-log4j12 transitive artifacts; kafka-clients is excluded here
     and declared explicitly below. -->
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.11</artifactId>
  <version>0.10.0.1</version>
  <exclusions>
    <exclusion>
      <groupId>org.apache.zookeeper</groupId>
      <artifactId>zookeeper</artifactId>
    </exclusion>
    <exclusion>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
    </exclusion>
    <exclusion>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
    </exclusion>
    <exclusion>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
    </exclusion>
  </exclusions>
</dependency>

<!-- Kafka client library, same version as kafka_2.11 above. -->
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>0.10.0.1</version>
</dependency>

2. 实现(Java 代码)

package com.waiting;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.kafka.*;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;

import org.apache.storm.tuple.Tuple;


import java.util.Map;
import java.util.UUID;


/**
 * Demo topology that wires a storm-kafka {@code KafkaSpout} to a printing bolt
 * and submits it to an in-process {@code LocalCluster}.
 */
public class LocalKafKaTopology {

    /** ZooKeeper connect string handed to {@code ZkHosts}. */
    private static final String ZOOKEEPER_HOSTS = "localhost:2181";

    /** Kafka topic the spout reads from. */
    private static final String TOPIC = "test";

    /** Third {@code SpoutConfig} argument (the zkRoot path). */
    private static final String ZK_ROOT = "/test";

    /** Component id of the spout inside the topology. */
    private static final String SPOUT_ID = "KafkaSpout";

    /** Component id of the bolt inside the topology. */
    private static final String BOLT_ID = "CountBolt";

    /** Name under which the topology is submitted. */
    private static final String TOPOLOGY_NAME = "LocalKafKaTopology";

    /**
     * Terminal bolt: prints the first value of each tuple followed by
     * {@code "......."} and acks it. If processing throws, the error is
     * reported on stderr and the tuple is failed. Emits nothing downstream.
     */
    public static class CountBolt extends BaseRichBolt {

        private OutputCollector collector;

        /**
         * Keeps a reference to the collector so {@link #execute(Tuple)} can
         * ack or fail tuples.
         *
         * @param stormConf topology configuration (unused)
         * @param context   topology context (unused)
         * @param collector collector used to ack/fail incoming tuples
         */
        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        /**
         * Prints the tuple's first value and acks the tuple; on any exception
         * the cause is reported and the tuple is failed.
         *
         * @param input tuple received from the upstream component
         */
        @Override
        public void execute(Tuple input) {
            try {
                String word = (String) input.getValue(0);

                System.out.println(word + ".......");

                this.collector.ack(input);
            } catch (Exception e) {
                // Report why the tuple failed instead of discarding the exception;
                // otherwise a failed tuple leaves no trace of its cause.
                System.err.println("CountBolt failed to process tuple " + input + ": " + e);
                this.collector.fail(input);
            }
        }

        /** Declares no output fields: this bolt does not emit tuples. */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // Intentionally empty.
        }
    }

    /**
     * Builds the spout-to-bolt topology and submits it to a local cluster.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        BrokerHosts zk = new ZkHosts(ZOOKEEPER_HOSTS);

        // A fresh random id is generated on every run.
        // NOTE(review): storm-kafka presumably keys stored consumer offsets by
        // this id, so each run would start without earlier offsets - confirm.
        SpoutConfig spoutConfig =
                new SpoutConfig(zk, TOPIC, ZK_ROOT, UUID.randomUUID().toString());
        // Deserialize each Kafka message with StringScheme.
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(SPOUT_ID, kafkaSpout);
        builder.setBolt(BOLT_ID, new CountBolt()).shuffleGrouping(SPOUT_ID);

        // No shutdown is requested here; presumably the topology keeps running
        // until the JVM is terminated.
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(TOPOLOGY_NAME, new Config(), builder.createTopology());
    }
}

http://www.waitingfy.com/archives/4561

4561

One Response to Storm 整合kafka

Leave a Reply

Name and Email Address are required fields.
Your email will not be published or shared with third parties.