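These study notes on kafka_2.11-1.0.0 come in two parts: the first covers using Kafka directly from Java, and the second covers integrating Kafka with Spring.
Versions: kafka_2.11-1.0.0, Spring 4.3.13, Java 8, zookeeper-3.4.11
I won't go through installing ZooKeeper and Kafka, since it is simple: download the archives from the official sites, unpack them, change the log directories in the configuration, and start the services. ZooKeeper additionally needs an environment variable to be set.
Source code download: https://gitee.com/zhuhongliang/kafkaStudy.git
Here is the producer code: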
package kafkaStudy2;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * @author 朱宏亮
 * @version Created: 2017-12-13 14:41:07
 * Uses kafka_2.11-1.0.0 directly, without Spring integration.
 */
public class ProducerStudy {

    public static void main(String[] args) {
        // Topic to publish to (could also be taken from args[0]).
        String topicName = "zhlTopic";

        // Producer configuration.
        Properties props = new Properties();
        // Address of the Kafka broker.
        props.put("bootstrap.servers", "localhost:9092");
        // Wait for all in-sync replicas to acknowledge each record.
        props.put("acks", "all");
        // Number of retries for a failed send; 0 disables automatic retries.
        props.put("retries", 0);
        // Maximum size in bytes of a batch of records for one partition.
        props.put("batch.size", 16384);
        // Wait up to 1 ms for more records so they can be batched into fewer requests.
        props.put("linger.ms", 1);
        // Total memory in bytes the producer may use to buffer unsent records.
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<Integer, String> producer = new KafkaProducer<Integer, String>(props);

        // send() is asynchronous: it only hands the record to the producer's buffer.
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<Integer, String>(topicName, Integer.valueOf(i), Integer.toString(i)));
        }
        System.out.println("Message sent successfully");

        // close() blocks until the buffered records have been sent.
        producer.close();
    }
}
Here is the consumer code:
package kafkaStudy2;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/**
 * @author 朱宏亮
 * @version Created: 2017-12-13 14:15:26
 * Uses kafka_2.11-1.0.0 directly, without Spring integration.
 */
public class ConsumerGroup {

    // The consumer is never closed because the poll loop runs until the process is stopped.
    @SuppressWarnings("resource")
    public static void main(String[] args) {
        // Topic and consumer group (could also be taken from args[0] and args[1]).
        String topic = "zhlTopic";
        String group = "GroupZHL01";

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("client.id", "ClientZHL001");
        props.put("group.id", group);
        // Commit offsets automatically, once per second.
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<Integer, String> consumer = new KafkaConsumer<Integer, String>(props);
        consumer.subscribe(Arrays.asList(topic));
        System.out.println("Subscribed to topic " + topic);

        while (true) {
            // Wait up to 100 ms for new records.
            ConsumerRecords<Integer, String> records = consumer.poll(100);
            for (ConsumerRecord<Integer, String> record : records) {
                // "开始消费" = consumption started
                System.out.println("=============ConsumerTest开始消费=============");
                String topic1 = record.topic();
                Integer key = record.key();
                String value = record.value();
                long offset = record.offset();
                int partition = record.partition();
                System.out.println("-------------topic:" + topic1);
                System.out.println("-------------value:" + value);
                System.out.println("-------------key:" + key);
                System.out.println("-------------offset:" + offset);
                System.out.println("-------------partition:" + partition);
                // "消费结束" = consumption finished
                System.out.println("~~~~~~~~~~~~~ConsumerTest消费结束~~~~~~~~~~~~~");
            }
        }
    }
}
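Comparing the two configurations side by side makes it easier to see which settings belong to which client. The following is only a sketch of my own, not part of the repository: the class name KafkaConfigSketch and its two helper methods are hypothetical, and it uses nothing but java.util.Properties. It assumes that numeric settings can be passed as strings, which the Kafka clients parse when they read the configuration. Note that group.id only appears on the consumer side, since consumer groups are a consumer concept.

```java
import java.util.Properties;

// Hypothetical helper (not in the original repository) that keeps the
// producer and consumer settings from this post in one place.
public class KafkaConfigSketch {

    // Settings for the producer; deliberately contains no group.id.
    public static Properties producerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("acks", "all");
        props.setProperty("retries", "0");
        props.setProperty("batch.size", "16384");
        props.setProperty("linger.ms", "1");
        props.setProperty("buffer.memory", "33554432");
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.IntegerSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    // Settings for the consumer; group.id identifies the consumer group.
    public static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("client.id", "ClientZHL001");
        props.setProperty("group.id", groupId);
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "1000");
        props.setProperty("session.timeout.ms", "30000");
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        Properties producer = producerProps("localhost:9092");
        Properties consumer = consumerProps("localhost:9092", "GroupZHL01");
        // Show which side carries the group.id setting.
        System.out.println("producer has group.id: " + producer.containsKey("group.id"));
        System.out.println("consumer group.id: " + consumer.getProperty("group.id"));
    }
}
```

With a helper like this, the producer could be created as new KafkaProducer<Integer, String>(KafkaConfigSketch.producerProps("localhost:9092")), and the consumer in the same way from consumerProps.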
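Run the two programs separately and the Eclipse console shows the following output (the warnings at the top appear only because log4j has not been configured):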
log4j:WARN No appenders could be found for logger (org.apache.kafka.clients.producer.ProducerConfig).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Message sent successfully

log4j:WARN No appenders could be found for logger (org.apache.kafka.clients.consumer.ConsumerConfig).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Subscribed to topic zhlTopic
=============ConsumerTest开始消费=============
-------------topic:zhlTopic
-------------value:0
-------------key:0
-------------offset:480
-------------partition:0
~~~~~~~~~~~~~ConsumerTest消费结束~~~~~~~~~~~~~
=============ConsumerTest开始消费=============
-------------topic:zhlTopic
-------------value:1
-------------key:1
-------------offset:481
-------------partition:0
~~~~~~~~~~~~~ConsumerTest消费结束~~~~~~~~~~~~~
=============ConsumerTest开始消费=============
-------------topic:zhlTopic
-------------value:2
-------------key:2
-------------offset:482
-------------partition:0
~~~~~~~~~~~~~ConsumerTest消费结束~~~~~~~~~~~~~
=============ConsumerTest开始消费=============
-------------topic:zhlTopic
-------------value:3
-------------key:3
-------------offset:483
-------------partition:0
~~~~~~~~~~~~~ConsumerTest消费结束~~~~~~~~~~~~~
=============ConsumerTest开始消费=============
-------------topic:zhlTopic
-------------value:4
-------------key:4
-------------offset:484
-------------partition:0
~~~~~~~~~~~~~ConsumerTest消费结束~~~~~~~~~~~~~
=============ConsumerTest开始消费=============
-------------topic:zhlTopic
-------------value:5
-------------key:5
-------------offset:485
-------------partition:0
~~~~~~~~~~~~~ConsumerTest消费结束~~~~~~~~~~~~~
=============ConsumerTest开始消费=============
-------------topic:zhlTopic
-------------value:6
-------------key:6
-------------offset:486
-------------partition:0
~~~~~~~~~~~~~ConsumerTest消费结束~~~~~~~~~~~~~
=============ConsumerTest开始消费=============
-------------topic:zhlTopic
-------------value:7
-------------key:7
-------------offset:487
-------------partition:0
~~~~~~~~~~~~~ConsumerTest消费结束~~~~~~~~~~~~~
=============ConsumerTest开始消费=============
-------------topic:zhlTopic
-------------value:8
-------------key:8
-------------offset:488
-------------partition:0
~~~~~~~~~~~~~ConsumerTest消费结束~~~~~~~~~~~~~
=============ConsumerTest开始消费=============
-------------topic:zhlTopic
-------------value:9
-------------key:9
-------------offset:489
-------------partition:0
~~~~~~~~~~~~~ConsumerTest消费结束~~~~~~~~~~~~~