1. KafkaProperties
package com.immooc.spark.kafka;

/**
 * Shared connection settings for the Kafka producer/consumer examples.
 *
 * <p>All values point at a single-node setup on {@code localhost}.
 */
public class KafkaProperties {

    /** Kafka broker address list, used as the clients' bootstrap servers. */
    public static final String BROKER_LIST = "localhost:9092";

    /** Presumably the ZooKeeper connect string (host:port) - confirm against callers. */
    public static final String ZK = "localhost:2181";

    /** Name of the example topic. */
    public static final String TOPIC = "test";

    /** Consumer group id used by the example consumer. */
    public static final String GROUP_ID = "test_group1";
}
2. KafkaProducerTest
package com.immooc.spark.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

import static org.apache.kafka.clients.producer.ProducerConfig.*;

/**
 * Example producer thread: sends {@code "message_<n>"} to a topic every two
 * seconds until the thread is interrupted.
 *
 * <p>Records are sent without a key. The producer is closed when {@link #run()}
 * exits, whether by interruption or by an exception.
 */
public class KafkaProducerTest extends Thread {

    private final String topic;

    // Key type matches the configured StringSerializer (keys are never set here).
    private final KafkaProducer<String, String> producer;

    /**
     * Creates the producer against {@link KafkaProperties#BROKER_LIST}.
     *
     * @param topic name of the topic every message is sent to
     */
    public KafkaProducerTest(String topic) {
        this.topic = topic;

        Properties properties = new Properties();
        properties.put(BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.BROKER_LIST);
        properties.put(KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ACKS_CONFIG, "1");
        // Linger and batch size are both zero, so records are not held back for batching.
        properties.put(LINGER_MS_CONFIG, 0);
        properties.put(BATCH_SIZE_CONFIG, 0);

        producer = new KafkaProducer<String, String>(properties);
    }

    /**
     * Sends one numbered message, logs it to stdout, sleeps two seconds, and
     * repeats. Interrupting the thread ends the loop; the producer is always
     * closed on the way out.
     */
    @Override
    public void run() {
        int messageNo = 1;
        try {
            while (!Thread.currentThread().isInterrupted()) {
                String message = "message_" + messageNo;
                producer.send(new ProducerRecord<String, String>(topic, message));
                System.out.println("Send: " + message);
                messageNo++;
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    // Restore the flag so the loop condition sees it and exits.
                    Thread.currentThread().interrupt();
                }
            }
        } finally {
            producer.close();
        }
    }
}
3. KafkaConsumerTest
package com.immooc.spark.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

import static org.apache.kafka.clients.consumer.ConsumerConfig.*;

/**
 * Example consumer thread: subscribes to a topic and prints the offset, key
 * and value of every record it receives.
 */
public class KafkaConsumerTest extends Thread {

    private final String topic;

    /**
     * @param topic name of the topic to subscribe to
     */
    public KafkaConsumerTest(String topic) {
        this.topic = topic;
    }

    /**
     * Builds a String/String consumer against {@link KafkaProperties#BROKER_LIST}
     * in group {@link KafkaProperties#GROUP_ID}, with auto-commit enabled at a
     * 1000 ms interval.
     *
     * @return a new, not yet subscribed consumer
     */
    private KafkaConsumer<String, String> createConsumer() {
        Properties properties = new Properties();
        properties.put(BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.BROKER_LIST);
        properties.put(GROUP_ID_CONFIG, KafkaProperties.GROUP_ID);
        properties.put(ENABLE_AUTO_COMMIT_CONFIG, "true");
        properties.put(AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        properties.put(KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        return new KafkaConsumer<String, String>(properties);
    }

    /**
     * Polls the topic forever and prints each record to stdout. The loop ends
     * only if a consumer call throws; the consumer is closed in that case.
     */
    @Override
    public void run() {
        KafkaConsumer<String, String> consumer = createConsumer();
        try {
            consumer.subscribe(Arrays.asList(this.topic));
            while (true) {
                // poll(long) is kept on purpose: it is the API of the 0.9 client this example targets.
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n",
                            record.offset(), record.key(), record.value());
                }
            }
        } finally {
            consumer.close();
        }
    }
}
[…] 0.9版本的太老了, 1.1的看这篇文章《kafka 简单 java 生产消费API 1.1》 […]