1. KafkaProperties
package com.immooc.spark.kafka; public class KafkaProperties { public static final String ZK = "localhost:2181"; public static final String TOPIC = "test"; public static final String BROKER_LIST = "localhost:9092"; public static final String GROUP_ID = "test_group1"; }
2. KafkaProducer
package com.immooc.spark.kafka; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; import java.util.Properties; public class KafkaProducer extends Thread{ private String topic; private Producer<Integer, String> producer; public KafkaProducer(String topic){ this.topic = topic; Properties properties = new Properties(); properties.put("metadata.broker.list", KafkaProperties.BROKER_LIST); properties.put("serializer.class", "kafka.serializer.StringEncoder"); properties.put("request.required.acks", "1"); producer = new Producer<Integer, String>(new ProducerConfig(properties)); } @Override public void run(){ int messageNo = 1; while (true){ String message = "message_" + messageNo; producer.send(new KeyedMessage<Integer, String>(topic, message)); System.out.println("Send: " + message); messageNo ++; try{ Thread.sleep(2000); }catch (Exception e){ e.printStackTrace(); } } } }
3. KafkaConsumer
package com.immooc.spark.kafka; import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; public class KafkaConsumer extends Thread{ private String topic; public KafkaConsumer(String topic){ this.topic = topic; } private ConsumerConnector createConnector(){ Properties properties = new Properties(); properties.put("zookeeper.connect", KafkaProperties.ZK); properties.put("group.id", KafkaProperties.GROUP_ID); return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties)); } @Override public void run(){ ConsumerConnector consumer = createConnector(); Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(topic, 1); Map<String, List<KafkaStream<byte[], byte[]>>> messageStream = consumer.createMessageStreams(topicCountMap); KafkaStream<byte[], byte[]> stream = messageStream.get(topic).get(0); ConsumerIterator<byte[], byte[]> iterator = stream.iterator(); while (iterator.hasNext()){ String message = new String(iterator.next().message()); System.out.println("rec:" + message); } } }
http://www.waitingfy.com/archives/4202
0.9版本的太老了, 1.1的看这篇文章《kafka 简单 java 生产消费API 1.1》
4202