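These kafka_2.11-1.0.0 study notes come in two parts: the first covers using Kafka directly from Java, and the second covers integrating Kafka with Spring.

Versions: kafka_2.11-1.0.0, Spring 4.3.13, Java 8, zookeeper-3.4.11

I won't walk through installing ZooKeeper and Kafka, since it is straightforward: download the archives from the official sites, unpack them, adjust the log directories in the configuration, and start them. ZooKeeper also needs an environment variable added.

Source code: https://gitee.com/zhuhongliang/kafkaStudy.git

Here is the producer code: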

package kafkaStudy2;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * @author 朱宏亮
 * @version Created 2017-12-13 14:41:07
 * Uses kafka_2.11-1.0.0 directly, without Spring integration.
 */
public class ProducerStudy {

    public static void main(String[] args) {
        
        // Topic to publish to
        String topicName = "zhlTopic";//args[0].toString();

        // Producer configuration
        Properties props = new Properties();

        // Kafka broker address
        props.put("bootstrap.servers", "localhost:9092");

        // Wait for all in-sync replicas to acknowledge each record
        props.put("acks", "all");

        // 0 disables automatic retries: a failed send is not retried
        props.put("retries", 0);

        // Maximum size in bytes of a batch of records sent to one partition
        props.put("batch.size", 16384);

        // Wait up to 1 ms for more records so they can be batched into fewer requests
        props.put("linger.ms", 1);

        // Total memory in bytes the producer may use to buffer records awaiting send
        props.put("buffer.memory", 33554432);
        
        // Keys are Integers, values are Strings
        props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<Integer, String> producer = new KafkaProducer<Integer, String>(props);

        // send() is asynchronous: it queues the record and returns immediately
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<Integer, String>(topicName, Integer.valueOf(i), Integer.toString(i)));
        }
        System.out.println("Message sent successfully");
        // close() blocks until the queued records have been sent
        producer.close();
    }

}
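
The producer above configures IntegerSerializer for keys and StringSerializer for values. As a rough illustration of what those two serializers put on the wire, here is a minimal sketch using only the JDK. The class SerializerSketch and its method names are hypothetical and not part of the Kafka client; the sketch assumes the documented behavior of the two serializers (a 4-byte big-endian integer, and UTF-8 as the default string encoding).

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical helper, not part of the Kafka client: it reproduces with plain JDK
// calls the byte layout that the two configured serializers are documented to emit.
final class SerializerSketch {

    // IntegerSerializer: 4 bytes, most significant byte first (big-endian).
    static byte[] encodeKey(int key) {
        return ByteBuffer.allocate(4).putInt(key).array();
    }

    // StringSerializer: the string's bytes in UTF-8 (its default encoding).
    static byte[] encodeValue(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Same key/value pairs the producer sends, limited to the first three.
        for (int i = 0; i < 3; i++) {
            System.out.println("key " + i + " -> " + Arrays.toString(encodeKey(i))
                    + ", value \"" + i + "\" -> "
                    + Arrays.toString(encodeValue(Integer.toString(i))));
        }
    }
}
```

For key 1 and value "1" this gives the key bytes [0, 0, 0, 1] and the value bytes [49], which is why the consumer has to configure the matching IntegerDeserializer and StringDeserializer to read the records back.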

 

Here is the consumer code:

package kafkaStudy2; 

import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

/**
 * @author 朱宏亮
 * @version Created 2017-12-13 14:15:26
 * Uses kafka_2.11-1.0.0 directly, without Spring integration.
 */
public class ConsumerGroup {

    @SuppressWarnings("resource")
    public static void main(String[] args) {

        String topic = "zhlTopic";//args[0].toString();
        String group = "GroupZHL01";//args[1].toString();
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("client.id", "ClientZHL001");
        props.put("group.id", group);
        // Commit consumed offsets automatically, once per second
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        // Must match the serializers used by the producer
        props.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<Integer, String> consumer = new KafkaConsumer<Integer, String>(props);

        consumer.subscribe(Arrays.asList(topic));
        System.out.println("Subscribed to topic " + topic);

        // Poll forever; the consumer is never closed, hence the "resource" suppression above
        while (true) {
            // Wait up to 100 ms for new records
            ConsumerRecords<Integer, String> records = consumer.poll(100);
            for (ConsumerRecord<Integer, String> record : records) {
                // Banner text: "ConsumerTest starts consuming"
                System.out.println("=============ConsumerTest开始消费=============");
                String topic1 = record.topic();
                Integer key = record.key();
                String value = record.value();
                long offset = record.offset();
                int partition = record.partition();
                System.out.println("-------------topic:" + topic1);
                System.out.println("-------------value:" + value);
                System.out.println("-------------key:" + key);
                System.out.println("-------------offset:" + offset);
                System.out.println("-------------partition:" + partition);
                // Banner text: "ConsumerTest finished consuming"
                System.out.println("~~~~~~~~~~~~~ConsumerTest消费结束~~~~~~~~~~~~~");
            }
        }
    }

}

 

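Run the two programs separately and the Eclipse console shows the output below. The log4j warnings appear because log4j has not been configured. In the banner lines, "开始消费" means "start consuming" and "消费结束" means "finished consuming".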

log4j:WARN No appenders could be found for logger (org.apache.kafka.clients.producer.ProducerConfig).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Message sent successfully
log4j:WARN No appenders could be found for logger (org.apache.kafka.clients.consumer.ConsumerConfig).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Subscribed to topic zhlTopic
=============ConsumerTest开始消费=============
-------------topic:zhlTopic
-------------value:0
-------------key:0
-------------offset:480
-------------partition:0
~~~~~~~~~~~~~ConsumerTest消费结束~~~~~~~~~~~~~
=============ConsumerTest开始消费=============
-------------topic:zhlTopic
-------------value:1
-------------key:1
-------------offset:481
-------------partition:0
~~~~~~~~~~~~~ConsumerTest消费结束~~~~~~~~~~~~~
=============ConsumerTest开始消费=============
-------------topic:zhlTopic
-------------value:2
-------------key:2
-------------offset:482
-------------partition:0
~~~~~~~~~~~~~ConsumerTest消费结束~~~~~~~~~~~~~
=============ConsumerTest开始消费=============
-------------topic:zhlTopic
-------------value:3
-------------key:3
-------------offset:483
-------------partition:0
~~~~~~~~~~~~~ConsumerTest消费结束~~~~~~~~~~~~~
=============ConsumerTest开始消费=============
-------------topic:zhlTopic
-------------value:4
-------------key:4
-------------offset:484
-------------partition:0
~~~~~~~~~~~~~ConsumerTest消费结束~~~~~~~~~~~~~
=============ConsumerTest开始消费=============
-------------topic:zhlTopic
-------------value:5
-------------key:5
-------------offset:485
-------------partition:0
~~~~~~~~~~~~~ConsumerTest消费结束~~~~~~~~~~~~~
=============ConsumerTest开始消费=============
-------------topic:zhlTopic
-------------value:6
-------------key:6
-------------offset:486
-------------partition:0
~~~~~~~~~~~~~ConsumerTest消费结束~~~~~~~~~~~~~
=============ConsumerTest开始消费=============
-------------topic:zhlTopic
-------------value:7
-------------key:7
-------------offset:487
-------------partition:0
~~~~~~~~~~~~~ConsumerTest消费结束~~~~~~~~~~~~~
=============ConsumerTest开始消费=============
-------------topic:zhlTopic
-------------value:8
-------------key:8
-------------offset:488
-------------partition:0
~~~~~~~~~~~~~ConsumerTest消费结束~~~~~~~~~~~~~
=============ConsumerTest开始消费=============
-------------topic:zhlTopic
-------------value:9
-------------key:9
-------------offset:489
-------------partition:0
~~~~~~~~~~~~~ConsumerTest消费结束~~~~~~~~~~~~~
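
All ten records above report partition 0. One plausible explanation, assuming zhlTopic was created with a single partition (the post does not say), is that any keyed record has only one place to go: for a record with a key, Kafka's default partitioner hashes the serialized key and takes the result modulo the partition count. The sketch below illustrates that idea with the JDK only. PartitionSketch and partitionFor are hypothetical names, and Arrays.hashCode is a stand-in for the murmur2 hash Kafka actually uses, so the partition numbers it prints for more than one partition will not match a real broker.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical illustration of "hash the serialized key, then take it modulo the
// partition count". Arrays.hashCode is a stand-in: Kafka's default partitioner uses
// a murmur2 hash, so the partition numbers here will not match a real broker.
final class PartitionSketch {

    static int partitionFor(int key, int numPartitions) {
        // Serialize the key the same way IntegerSerializer does: 4 bytes, big-endian.
        byte[] keyBytes = ByteBuffer.allocate(4).putInt(key).array();
        int hash = Arrays.hashCode(keyBytes);
        // Clear the sign bit so the modulo result is never negative.
        return (hash & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        for (int key = 0; key < 10; key++) {
            System.out.println("key " + key
                    + " -> partition " + partitionFor(key, 1) + " of 1"
                    + ", partition " + partitionFor(key, 3) + " of 3");
        }
    }
}
```

With one partition the result is always 0, whatever the hash. With three partitions the keys spread out, and records that share a key always land on the same partition, which is what preserves per-key ordering.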

 
