kafka_2.1.1-1.0.0的学习笔记共两篇,第一篇为java直接使用kafka的代码,第二篇为kafka整合spring的代码内容。
版本:kafka_2.11-1.0.0、spring4.3.13、java8、zookeeper-3.4.11
zookeeper和kafka安装的那些过程,我就不写了,挺简单的,去官网下载文件解压改一些日志目录运行就行了。zookeeper需要添加一下环境变量。
源代码下载地址:https://gitee.com/zhuhongliang/kafkaStudy.git
kafka_2.11-1.0.0整合spring4.3.13,我这里一共是一个发布者,两个消费者。
其中一个是用spring配置了DefaultKafkaConsumerFactory,使用它在java代码里创建一个Consumer,实现消费者。还有一个消费者是直接在spring里直接配置KafkaMessageListenerContainer,在java代码里继承MessageListener,实现消息自动监听。
看下文件目录:
先看下发布者的java代码KafakaProducerTest.java
package kafkaStudy; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; /** * @author 朱宏亮 * @version 创建时间:2017年12月17日 下午2:15:26 * 类说明 kafka_2.11-1.0.0整合spring4.3.13 发布者 */ @Component("KafkaProducerTest") public class KafkaProducerTest { @Autowired private KafkaTemplate<Integer, String> kafkaTemplate; public void testTemplateSend(Integer i){ kafkaTemplate.sendDefault(i, "你好,小宝贝:"+i); System.out.println("执行"+i); } @SuppressWarnings("resource") public static void main(String[] args){ ApplicationContext factory = new ClassPathXmlApplicationContext("classpath:applicationContext.xml"); KafkaProducerTest producer = (KafkaProducerTest) factory.getBean("KafkaProducerTest"); for(int i =0; i<10; i++){ producer.testTemplateSend(i); System.out.println("send:"+i); } System.out.println("send end!"); } }
发布者的xml文件kafkaProducer.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd"> <!-- 定义producer的参数 --> <bean id="producerProperties" class="java.util.HashMap"> <constructor-arg> <map> <entry key="bootstrap.servers" value="localhost:9092"/> <entry key="group.id" value="GroupZHL02"/> <entry key="retries" value="10"/> <entry key="batch.size" value="16384"/> <entry key="linger.ms" value="1"/> <entry key="buffer.memory" value="33554432"/> <entry key="key.serializer" value="org.apache.kafka.common.serialization.IntegerSerializer"/> <!-- org.apache.kafka.common.serialization.IntegerSerializer --> <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/> </map> </constructor-arg> </bean> <!-- 创建kafkatemplate需要使用的producerfactory bean --> <bean id="producerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory"> <constructor-arg> <ref bean="producerProperties"/> </constructor-arg> </bean> <!-- 创建kafkatemplate bean,使用的时候,只需要注入这个bean,即可使用template的send消息方法 --> <bean id="KafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate"> <constructor-arg ref="producerFactory"/> <constructor-arg name="autoFlush" value="true"/> <property name="defaultTopic" value="zhlTopic"/> </bean> </beans>
自动监听订阅的java代码KafkaConListen.java
package kafkaStudy; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.kafka.listener.MessageListener; /** * @author 朱宏亮 * @version 创建时间:2017年12月17日 下午2:15:26 * 类说明 kafka_2.11-1.0.0整合spring4.3.13 自动监听 */ public class KafkaConListen implements MessageListener<Integer, String>{ public void onMessage(ConsumerRecord<Integer, String> record) { System.out.println("=============Listener开始消费============="); String topic = record.topic(); Integer key = record.key(); String value = record.value(); long offset = record.offset(); int partition = record.partition(); System.out.println("-------------topic:"+topic); System.out.println("-------------value:"+value); System.out.println("-------------key:"+key); System.out.println("-------------offset:"+offset); System.out.println("-------------partition:"+partition); System.out.println("~~~~~~~~~~~~~Listener消费结束~~~~~~~~~~~~~"); } /*@SuppressWarnings({ "resource", "unused" }) public static void main(String[] args){ ApplicationContext factory = new ClassPathXmlApplicationContext("classpath:kafkaConsumerListen.xml"); }*/ }
自动监听的xml配置kafkaConsumerListen.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd"> <!-- 定义consumer的参数 --> <bean id="consumerProperties" class="java.util.HashMap"> <constructor-arg> <map> <entry key="bootstrap.servers" value="localhost:9092"/> <entry key="client.id" value="ClientZHL1"/> <entry key="group.id" value="GroupZHL1"/> <entry key="enable.auto.commit" value="true"/> <entry key="auto.commit.interval.ms" value="1000"/> <entry key="session.timeout.ms" value="15000"/> <entry key="key.deserializer" value="org.apache.kafka.common.serialization.IntegerDeserializer"/> <!-- org.apache.kafka.common.serialization.IntegerDeserializer --> <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/> </map> </constructor-arg> </bean> <!-- 创建consumerFactory bean --> <bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory"> <constructor-arg> <ref bean="consumerProperties"/> </constructor-arg> </bean> <!-- 实际执行消息消费的类 --> <bean id="messageListernerConsumerService" class="kafkaStudy.KafkaConListen"/> <!-- 消费者容器配置信息 --> <bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties"> <constructor-arg value="zhlTopic"/> <property name="messageListener" ref="messageListernerConsumerService"/> </bean> <!-- 创建kafkatemplate bean,使用的时候,只需要注入这个bean,即可使用template的send消息方法 --> <bean id="messageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer" init-method="doStart"> <constructor-arg ref="consumerFactory"/> <constructor-arg ref="containerProperties"/> </bean> </beans>
spring的启动文件applicationContext.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:p="http://www.springframework.org/schema/aop" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:jee="http://www.springframework.org/schema/jee" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd" > <context:component-scan base-package="kafkaStudy"/> <import resource="classpath:kafkaProducer.xml" /> <import resource="classpath:kafkaConsumerListen.xml" /> </beans>
有了上面这个文件之后,直接运行KafkaProducerTest.java文件,就可以在eclipse的控制台上看到结果了:
log4j:WARN No appenders could be found for logger (org.springframework.core.env.StandardEnvironment). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. 执行0 send:0 执行1 send:1 执行2 send:2 执行3 send:3 执行4 send:4 执行5 send:5 执行6 send:6 执行7 send:7 执行8 send:8 执行9 send:9 send end! =============Listener开始消费============= -------------topic:zhlTopic -------------value:你好,小宝贝:0 -------------key:0 -------------offset:510 -------------partition:0 ~~~~~~~~~~~~~Listener消费结束~~~~~~~~~~~~~ =============Listener开始消费============= -------------topic:zhlTopic -------------value:你好,小宝贝:1 -------------key:1 -------------offset:511 -------------partition:0 ~~~~~~~~~~~~~Listener消费结束~~~~~~~~~~~~~ =============Listener开始消费============= -------------topic:zhlTopic -------------value:你好,小宝贝:2 -------------key:2 -------------offset:512 -------------partition:0 ~~~~~~~~~~~~~Listener消费结束~~~~~~~~~~~~~ =============Listener开始消费============= -------------topic:zhlTopic -------------value:你好,小宝贝:3 -------------key:3 -------------offset:513 -------------partition:0 ~~~~~~~~~~~~~Listener消费结束~~~~~~~~~~~~~ =============Listener开始消费============= -------------topic:zhlTopic -------------value:你好,小宝贝:4 -------------key:4 -------------offset:514 -------------partition:0 ~~~~~~~~~~~~~Listener消费结束~~~~~~~~~~~~~ =============Listener开始消费============= -------------topic:zhlTopic -------------value:你好,小宝贝:5 -------------key:5 -------------offset:515 -------------partition:0 ~~~~~~~~~~~~~Listener消费结束~~~~~~~~~~~~~ =============Listener开始消费============= -------------topic:zhlTopic -------------value:你好,小宝贝:6 -------------key:6 -------------offset:516 -------------partition:0 ~~~~~~~~~~~~~Listener消费结束~~~~~~~~~~~~~ =============Listener开始消费============= -------------topic:zhlTopic -------------value:你好,小宝贝:7 -------------key:7 -------------offset:517 -------------partition:0 ~~~~~~~~~~~~~Listener消费结束~~~~~~~~~~~~~ =============Listener开始消费============= -------------topic:zhlTopic -------------value:你好,小宝贝:8 -------------key:8 -------------offset:518 -------------partition:0 ~~~~~~~~~~~~~Listener消费结束~~~~~~~~~~~~~ =============Listener开始消费============= -------------topic:zhlTopic -------------value:你好,小宝贝:9 -------------key:9 -------------offset:519 -------------partition:0 ~~~~~~~~~~~~~Listener消费结束~~~~~~~~~~~~~
或者如果不想使用MessageListener接口的话,可以消费者可以手动创建
KafkaConsumer.java
package kafkaStudy; import java.util.Arrays; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; /** * @author 朱宏亮 * @version 创建时间:2017年12月17日 下午2:15:26 * 类说明 kafka_2.11-1.0.0整合spring4.3.13 订阅者 */ public class KafkaConsumer { @SuppressWarnings({ "unchecked", "resource", "rawtypes" }) public static void main(String[] args){ ApplicationContext factory = new ClassPathXmlApplicationContext("classpath:kafkaConsumer.xml"); DefaultKafkaConsumerFactory ConFactory = (DefaultKafkaConsumerFactory) factory.getBean("consumerFactory"); Consumer consumer = ConFactory.createConsumer(); consumer.subscribe(Arrays.asList("zhlTopic")); while (true) { ConsumerRecords<Integer, String> records = consumer.poll(100); for (ConsumerRecord<Integer, String> record : records){ System.out.println("=============Consumer开始消费============="); String topic = record.topic(); Integer key = record.key(); String value = record.value(); long offset = record.offset(); int partition = record.partition(); System.out.println("-------------topic:"+topic); System.out.println("-------------value:"+value); System.out.println("-------------key:"+key); System.out.println("-------------offset:"+offset); System.out.println("-------------partition:"+partition); System.out.println("~~~~~~~~~~~~~Consumer消费结束~~~~~~~~~~~~~"); } } } }
消费者的xml配置,kafkaConsumer.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd"> <!-- 定义consumer的参数 --> <bean id="consumerProperties" class="java.util.HashMap"> <constructor-arg> <map> <entry key="bootstrap.servers" value="localhost:9092"/> <entry key="client.id" value="ClientZHL2"/> <entry key="group.id" value="GroupZHL2"/> <entry key="enable.auto.commit" value="true"/> <entry key="auto.commit.interval.ms" value="1000"/> <entry key="session.timeout.ms" value="15000"/> <entry key="key.deserializer" value="org.apache.kafka.common.serialization.IntegerDeserializer"/> <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/> <!-- org.apache.kafka.common.serialization.IntegerDeserializer --> </map> </constructor-arg> </bean> <!-- 创建consumerFactory bean --> <bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory"> <constructor-arg> <ref bean="consumerProperties"/> </constructor-arg> </bean> </beans>
然后直接运行KafkaConsumer.java也可以实现循环消费队列消息。
这里有一个坑!
如果你遇到如下报错:
Exception in thread "main" org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition zhlTopic-0 at offset 180. If needed, please seek past the record to continue consumption. Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4
那一定是发布和订阅的代码的数据类型配错了。
props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<Integer, String> producer = new KafkaProducer<Integer, String>(props); <entry key="key.deserializer" value="org.apache.kafka.common.serialization.IntegerDeserializer"/> <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
比如说,在发布信息时key的数据类型设置为String类型,那么订阅者的key的数据类型也需要设置为String类型,不然会在消费信息时抛出一个反序列化的异常。发布者和订阅者的key和value的数据类型一定要一致。
源代码下载地址:https://gitee.com/zhuhongliang/kafkaStudy.git