These study notes on kafka_2.11-1.0.0 come in two parts: the first covers using Kafka directly from Java, and the second covers integrating Kafka with Spring.

Versions: kafka_2.11-1.0.0, Spring 4.3.13, Java 8, zookeeper-3.4.11

I won't walk through installing ZooKeeper and Kafka; it's straightforward: download the archives from the official sites, extract them, adjust a few log directories, and run. ZooKeeper also needs an environment variable added.
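For reference, a minimal sketch of the start-up commands (run each from its own extracted directory; the topic name matches the configs used below — adjust paths to your install):

```shell
# Start ZooKeeper (from the zookeeper-3.4.11 directory)
bin/zkServer.sh start

# Start the Kafka broker (from the kafka_2.11-1.0.0 directory)
bin/kafka-server-start.sh config/server.properties

# Create the topic used in this article
# (Kafka 1.0.0 still creates topics via ZooKeeper)
bin/kafka-topics.sh --create --zookeeper localhost:2181 \
    --replication-factor 1 --partitions 1 --topic zhlTopic
```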

Source code download: https://gitee.com/zhuhongliang/kafkaStudy.git

This example integrates kafka_2.11-1.0.0 with Spring 4.3.13 using one producer and two consumers.

One consumer configures a DefaultKafkaConsumerFactory in Spring and uses it to create a Consumer in Java code. The other configures a KafkaMessageListenerContainer directly in Spring, with a Java class implementing MessageListener so that messages are consumed automatically as they arrive.

The project contains three Java classes in package kafkaStudy — KafkaProducerTest.java (the producer), KafkaConListen.java (the auto-listening consumer), and KafkaConsumer.java (the manually created consumer) — plus four Spring config files on the classpath: applicationContext.xml, kafkaProducer.xml, kafkaConsumerListen.xml, and kafkaConsumer.xml.

First, the producer's Java code, KafkaProducerTest.java:

package kafkaStudy;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.kafka.core.KafkaTemplate;  

import org.springframework.stereotype.Component;  

/** 
* @author 朱宏亮
* @version created 2017-12-17 14:15:26
* kafka_2.11-1.0.0 + Spring 4.3.13 integration: producer
*/

@Component("KafkaProducerTest")
public class KafkaProducerTest {
	
	 @Autowired  
	 private KafkaTemplate<Integer, String> kafkaTemplate;  
	 
	 
	 public void testTemplateSend(Integer i){  
	 	kafkaTemplate.sendDefault(i, "你好,小宝贝:"+i);  
	 	System.out.println("执行"+i);
	 }
	 
	 @SuppressWarnings("resource")
	 public static void main(String[] args){
		ApplicationContext factory = new ClassPathXmlApplicationContext("classpath:applicationContext.xml");
		KafkaProducerTest producer = (KafkaProducerTest) factory.getBean("KafkaProducerTest");
		for(int i =0; i<10; i++){
			producer.testTemplateSend(i);
			System.out.println("send:"+i);
		}
		System.out.println("send end!");
	 }
}

The producer's Spring configuration, kafkaProducer.xml:

<?xml version="1.0" encoding="UTF-8"?>  
<beans xmlns="http://www.springframework.org/schema/beans"  
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
     xmlns:context="http://www.springframework.org/schema/context"  
     xsi:schemaLocation="http://www.springframework.org/schema/beans  
         http://www.springframework.org/schema/beans/spring-beans.xsd  
         http://www.springframework.org/schema/context  
         http://www.springframework.org/schema/context/spring-context.xsd">  
      
      
     <!-- Producer configuration properties -->  
     <bean id="producerProperties" class="java.util.HashMap">  
        <constructor-arg>  
            <map>  
                <entry key="bootstrap.servers" value="localhost:9092"/>  
                <entry key="group.id" value="GroupZHL02"/> <!-- ignored: group.id is a consumer property -->  
                <entry key="retries" value="10"/>  
                <entry key="batch.size" value="16384"/>  
                <entry key="linger.ms" value="1"/>  
                <entry key="buffer.memory" value="33554432"/>  
                <entry key="key.serializer" value="org.apache.kafka.common.serialization.IntegerSerializer"/>  
                <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>  
            </map>  
        </constructor-arg>  
     </bean>  
       
     <!-- ProducerFactory bean used to create the KafkaTemplate -->  
     <bean id="producerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory">  
        <constructor-arg>  
            <ref bean="producerProperties"/>  
        </constructor-arg>  
     </bean>  
       
     <!-- KafkaTemplate bean; inject it wherever you need to send messages -->  
     <bean id="KafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">  
        <constructor-arg ref="producerFactory"/>  
        <constructor-arg name="autoFlush" value="true"/>  
        <property name="defaultTopic" value="zhlTopic"/>  
     </bean>  
     
 
 
</beans>

The auto-listening consumer's Java code, KafkaConListen.java:

package kafkaStudy;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.kafka.listener.MessageListener;

/** 
* @author 朱宏亮
* @version created 2017-12-17 14:15:26
* kafka_2.11-1.0.0 + Spring 4.3.13 integration: auto-listening consumer
*/
public class KafkaConListen implements MessageListener<Integer, String>{

	@Override
	public void onMessage(ConsumerRecord<Integer, String> record) {
		
		System.out.println("=============Listener开始消费=============");
        String topic = record.topic();
        Integer key = record.key();
        String value = record.value();
        long offset = record.offset();
        int partition = record.partition();
        System.out.println("-------------topic:"+topic);
        System.out.println("-------------value:"+value);
        System.out.println("-------------key:"+key);
        System.out.println("-------------offset:"+offset);
        System.out.println("-------------partition:"+partition);
        System.out.println("~~~~~~~~~~~~~Listener消费结束~~~~~~~~~~~~~");
	}
	
	/** Optional entry point: start only this listener by loading its own context. */
	@SuppressWarnings({ "resource", "unused" })
	public static void main(String[] args){
		ApplicationContext factory = new ClassPathXmlApplicationContext("classpath:kafkaConsumerListen.xml");
	}
	
	
}

The listener's Spring configuration, kafkaConsumerListen.xml:

<?xml version="1.0" encoding="UTF-8"?>  
<beans xmlns="http://www.springframework.org/schema/beans"  
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
     xmlns:context="http://www.springframework.org/schema/context"  
     xsi:schemaLocation="http://www.springframework.org/schema/beans  
         http://www.springframework.org/schema/beans/spring-beans.xsd  
         http://www.springframework.org/schema/context  
         http://www.springframework.org/schema/context/spring-context.xsd">  
      

    <!-- Consumer configuration properties -->  
     <bean id="consumerProperties" class="java.util.HashMap">  
        <constructor-arg>  
            <map>  
                <entry key="bootstrap.servers" value="localhost:9092"/>  
                <entry key="client.id" value="ClientZHL1"/>  
                <entry key="group.id" value="GroupZHL1"/>  
                <entry key="enable.auto.commit" value="true"/>  
                <entry key="auto.commit.interval.ms" value="1000"/>  
                <entry key="session.timeout.ms" value="15000"/>  
                <entry key="key.deserializer" value="org.apache.kafka.common.serialization.IntegerDeserializer"/>  
                <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>  
            </map>  
        </constructor-arg>  
     </bean>  
       
     <!-- consumerFactory bean -->  
     <bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">  
        <constructor-arg>  
            <ref bean="consumerProperties"/>  
        </constructor-arg>  
     </bean>  
       
     <!-- The class that actually consumes the messages -->  
     <bean id="messageListenerConsumerService" class="kafkaStudy.KafkaConListen"/>  
       
     <!-- Listener container configuration -->  
     <bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">  
        <constructor-arg value="zhlTopic"/>  
        <property name="messageListener" ref="messageListenerConsumerService"/>  
     </bean>
       
     <!-- The listener container itself; init-method="doStart" begins polling as soon as the context loads -->
    <bean id="messageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer" init-method="doStart">  
        <constructor-arg ref="consumerFactory"/>  
        <constructor-arg ref="containerProperties"/>
	</bean>
  
</beans>

The Spring bootstrap file, applicationContext.xml:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:aop="http://www.springframework.org/schema/aop" xmlns:p="http://www.springframework.org/schema/p"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:jee="http://www.springframework.org/schema/jee"
	xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
	 http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd 
	 http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee.xsd 
	 http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
	 http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd"
	>
	

	<context:component-scan base-package="kafkaStudy"/>
	<import resource="classpath:kafkaProducer.xml" />
	<import resource="classpath:kafkaConsumerListen.xml" />
	
</beans>

With all the files above in place, just run KafkaProducerTest.java and you'll see the results in the Eclipse console:

log4j:WARN No appenders could be found for logger (org.springframework.core.env.StandardEnvironment).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
执行0
send:0
执行1
send:1
执行2
send:2
执行3
send:3
执行4
send:4
执行5
send:5
执行6
send:6
执行7
send:7
执行8
send:8
执行9
send:9
send end!
=============Listener开始消费=============
-------------topic:zhlTopic
-------------value:你好,小宝贝:0
-------------key:0
-------------offset:510
-------------partition:0
~~~~~~~~~~~~~Listener消费结束~~~~~~~~~~~~~
=============Listener开始消费=============
-------------topic:zhlTopic
-------------value:你好,小宝贝:1
-------------key:1
-------------offset:511
-------------partition:0
~~~~~~~~~~~~~Listener消费结束~~~~~~~~~~~~~
=============Listener开始消费=============
-------------topic:zhlTopic
-------------value:你好,小宝贝:2
-------------key:2
-------------offset:512
-------------partition:0
~~~~~~~~~~~~~Listener消费结束~~~~~~~~~~~~~
=============Listener开始消费=============
-------------topic:zhlTopic
-------------value:你好,小宝贝:3
-------------key:3
-------------offset:513
-------------partition:0
~~~~~~~~~~~~~Listener消费结束~~~~~~~~~~~~~
=============Listener开始消费=============
-------------topic:zhlTopic
-------------value:你好,小宝贝:4
-------------key:4
-------------offset:514
-------------partition:0
~~~~~~~~~~~~~Listener消费结束~~~~~~~~~~~~~
=============Listener开始消费=============
-------------topic:zhlTopic
-------------value:你好,小宝贝:5
-------------key:5
-------------offset:515
-------------partition:0
~~~~~~~~~~~~~Listener消费结束~~~~~~~~~~~~~
=============Listener开始消费=============
-------------topic:zhlTopic
-------------value:你好,小宝贝:6
-------------key:6
-------------offset:516
-------------partition:0
~~~~~~~~~~~~~Listener消费结束~~~~~~~~~~~~~
=============Listener开始消费=============
-------------topic:zhlTopic
-------------value:你好,小宝贝:7
-------------key:7
-------------offset:517
-------------partition:0
~~~~~~~~~~~~~Listener消费结束~~~~~~~~~~~~~
=============Listener开始消费=============
-------------topic:zhlTopic
-------------value:你好,小宝贝:8
-------------key:8
-------------offset:518
-------------partition:0
~~~~~~~~~~~~~Listener消费结束~~~~~~~~~~~~~
=============Listener开始消费=============
-------------topic:zhlTopic
-------------value:你好,小宝贝:9
-------------key:9
-------------offset:519
-------------partition:0
~~~~~~~~~~~~~Listener消费结束~~~~~~~~~~~~~

Alternatively, if you'd rather not implement the MessageListener interface, you can create the consumer by hand.

KafkaConsumer.java

package kafkaStudy;

import java.util.Arrays;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

/** 
* @author 朱宏亮
* @version created 2017-12-17 14:15:26
* kafka_2.11-1.0.0 + Spring 4.3.13 integration: manually created consumer
*/
public class KafkaConsumer {

	@SuppressWarnings({ "unchecked", "resource", "rawtypes" })
	public static void main(String[] args){
			
			ApplicationContext factory = new ClassPathXmlApplicationContext("classpath:kafkaConsumer.xml");
			DefaultKafkaConsumerFactory conFactory = (DefaultKafkaConsumerFactory) factory.getBean("consumerFactory");
			Consumer consumer = conFactory.createConsumer();
			consumer.subscribe(Arrays.asList("zhlTopic"));
			while (true) {
	            ConsumerRecords<Integer, String> records = consumer.poll(100);
	               for (ConsumerRecord<Integer, String> record : records){
	            	   System.out.println("=============Consumer开始消费=============");
	                   String topic = record.topic();
	                   Integer key = record.key();
	                   String value = record.value();
	                   long offset = record.offset();
	                   int partition = record.partition();
	                   System.out.println("-------------topic:"+topic);
	                   System.out.println("-------------value:"+value);
	                   System.out.println("-------------key:"+key);
	                   System.out.println("-------------offset:"+offset);
	                   System.out.println("-------------partition:"+partition);
	                   System.out.println("~~~~~~~~~~~~~Consumer消费结束~~~~~~~~~~~~~");
	               }
	                  
	         }
		 }
}

The manual consumer's Spring configuration, kafkaConsumer.xml:

<?xml version="1.0" encoding="UTF-8"?>  
<beans xmlns="http://www.springframework.org/schema/beans"  
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
     xmlns:context="http://www.springframework.org/schema/context"  
     xsi:schemaLocation="http://www.springframework.org/schema/beans  
         http://www.springframework.org/schema/beans/spring-beans.xsd  
         http://www.springframework.org/schema/context  
         http://www.springframework.org/schema/context/spring-context.xsd">  
      

    <!-- Consumer configuration properties -->  
     <bean id="consumerProperties" class="java.util.HashMap">  
        <constructor-arg>  
            <map>  
                <entry key="bootstrap.servers" value="localhost:9092"/>  
                <entry key="client.id" value="ClientZHL2"/>  
                <entry key="group.id" value="GroupZHL2"/>  
                <entry key="enable.auto.commit" value="true"/>  
                <entry key="auto.commit.interval.ms" value="1000"/>  
                <entry key="session.timeout.ms" value="15000"/>  
                <entry key="key.deserializer" value="org.apache.kafka.common.serialization.IntegerDeserializer"/>  
                <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>  
            </map>  
        </constructor-arg>  
     </bean>  
       
     <!-- consumerFactory bean -->  
     <bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">  
        <constructor-arg>  
            <ref bean="consumerProperties"/>   
        </constructor-arg>  
       
     </bean>  
  
</beans>

Then simply run KafkaConsumer.java and it will loop, consuming messages from the topic.
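Independently of the Java consumers, you can sanity-check what actually landed on the topic with the console consumer that ships with Kafka (run from the kafka_2.11-1.0.0 directory):

```shell
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic zhlTopic --from-beginning
```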

There's one gotcha here!

If you see an error like this:

Exception in thread "main" org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition zhlTopic-0 at offset 180. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4

it almost certainly means the producer's and consumer's key/value data types don't match.

 
Producer side (Java properties):

    props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    Producer<Integer, String> producer = new KafkaProducer<Integer, String>(props);

Consumer side (Spring XML):

    <entry key="key.deserializer" value="org.apache.kafka.common.serialization.IntegerDeserializer"/>  
    <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>

For example, if the producer serializes keys as String, the consumer must also deserialize keys as String; otherwise consumption fails with a deserialization exception. The producer's and consumer's key and value types must always match.
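To see why the error message complains about size, here is a stdlib-only sketch of what the Integer serializer/deserializer pair does internally (the class and method names below are mine, not Kafka's): an Integer key is always encoded as exactly 4 big-endian bytes, so a key written by a StringSerializer — for example the single byte "7" — can never pass the 4-byte check on the consumer side.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for Kafka's IntegerSerializer/IntegerDeserializer,
// illustrating why a String-serialized key breaks an Integer deserializer.
public class SerializerMismatchDemo {

    // Like IntegerSerializer: an int key becomes exactly 4 big-endian bytes.
    static byte[] serializeInt(int value) {
        return ByteBuffer.allocate(4).putInt(value).array();
    }

    // Like IntegerDeserializer: rejects any payload that is not exactly 4 bytes --
    // the source of "Size of data received by IntegerDeserializer is not 4".
    static int deserializeInt(byte[] data) {
        if (data.length != 4) {
            throw new IllegalArgumentException(
                "Size of data received by IntegerDeserializer is not 4");
        }
        return ByteBuffer.wrap(data).getInt();
    }

    public static void main(String[] args) {
        byte[] goodKey = serializeInt(7);                      // 4 bytes: types match
        System.out.println("matched types -> " + deserializeInt(goodKey));

        byte[] badKey = "7".getBytes(StandardCharsets.UTF_8);  // 1 byte: what StringSerializer produces
        try {
            deserializeInt(badKey);
        } catch (IllegalArgumentException e) {
            System.out.println("mismatched types -> " + e.getMessage());
        }
    }
}
```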

Source code download: https://gitee.com/zhuhongliang/kafkaStudy.git
