第一篇是ActiveMQ的helloworld代码。

第二篇使用ActiveMQ整合Spring,非springmvc。(ActiveMQ整合SpringMVC的代码放在下一篇)

第三篇使用ActiveMQ整合SpringMVC

JDK8+spring4.1.3+ActiveMQ5.15.2

文件:一共14个文件,其中8个java文件,6个xml配置文件。源代码下载地址在最下面↓↓↓

3.ActiveMQ+Spring,点对点信息发布与接收(Queue)

先看下发布者的代码和xml文件:
ProducerQueue.java

package spring; 
import javax.jms.JMSException;  
import javax.jms.Message;  
import javax.jms.Session;  
import javax.jms.TextMessage;  
  
import org.springframework.context.ApplicationContext;  
import org.springframework.context.support.ClassPathXmlApplicationContext;  
import org.springframework.jms.core.JmsTemplate;  
import org.springframework.jms.core.MessageCreator; 
/** 
* @author 朱宏亮
* @version 创建时间:2017年12月5日 下午4:59:03 
* 类说明 
* 生产
*/
public class ProducerQueue {
 // 负责消息的发送和接收可以理解为MessageProducer 和MessageConsummer的组合。  
  
    public static void main(String[] args) {  
        System.out.println("发送消息 开始!");  
        ApplicationContext ctx = new ClassPathXmlApplicationContext( "spring/ProducerQueue.xml");  
        JmsTemplate jt = null;  
        // 获取JmsTemplate对象  
        jt = (JmsTemplate) ctx.getBean("jmsTemplate");  
        // 调用方法,发送消息  

        jt.send(new MessageCreator() {  
            // 消息的产生,返回消息发送消息  
            public Message createMessage(Session s) throws JMSException {  
                TextMessage msg = s.createTextMessage("Spring 发送消息 ----> 你好 ActiveMQ");  
                return msg;  
            }  
        });  
        
        System.out.println("发送消息 结束!");  
        System.exit(0);//因与ActiveMQ的连接没结束,手动结束程序
    }  
}
 

ProducerQueue.xml

  <?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
    xmlns:mvc="http://www.springframework.org/schema/mvc"
    xmlns:jms="http://www.springframework.org/schema/jms"
    xsi:schemaLocation="http://www.springframework.org/schema/beans   
        http://www.springframework.org/schema/beans/spring-beans.xsd   
        http://www.springframework.org/schema/context   
        http://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/mvc
        http://www.springframework.org/schema/mvc/spring-mvc.xsd
        http://www.springframework.org/schema/jms
        http://www.springframework.org/schema/jms/spring-jms.xsd
        http://activemq.apache.org/schema/core
        http://activemq.apache.org/schema/core/activemq-core.xsd">


    
    <amq:connectionFactory id="amqConnectionFactory" 
        brokerURL="tcp://127.0.0.1:61616" 
        userName="admin" 
        password="admin" />
    
    <!-- 配置JMS连接工厂 -->
    <bean id="connectionFactory"
        class="org.springframework.jms.connection.CachingConnectionFactory">
        <constructor-arg ref="amqConnectionFactory" />
        <property name="sessionCacheSize" value="100" />
 
    </bean>
    
    <!-- 定义消息队列(Queue) -->
    <bean id="QueueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <!-- 设置消息队列的名字 -->
        <constructor-arg>
            <value>queue朱宏亮队列</value>
        </constructor-arg>
    </bean>
    
    <!-- topic目的地配置,其实不管是topic还是queue则他们的底层实现不同但是通过封装api就差不多了,而在spring中更是简单 -->  
    <!-- <bean id="TopicDestination" class="org.apache.activemq.command.ActiveMQTopic">  
        <constructor-arg index="0" value="朱宏亮订阅" />  
    </bean>   -->
    
    <!-- 配置JMS模板(Queue),Spring提供的JMS工具类,它发送、接收消息。 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    
        <property name="connectionFactory" ref="connectionFactory" />
        
        <property name="defaultDestination" ref="QueueDestination" />
        
        <!-- 内部的consumer在receive方法中阻塞的时间。默认为1秒。 -->
        <property name="receiveTimeout" value="10000" />
        <!-- true是topic,false是queue,默认是false,此处显示写出false -->
        <property name="pubSubDomain" value="false" /> 
        
    </bean>
    
    <!-- 在不试用自动监听的时候 下面所有的代码全部注释 -->
        
	<bean id="messageListener" class="spring.ConsumerListener1" />
	
	<!-- and this is the message listener container -->
	<!-- <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
	    <property name="connectionFactory" ref="connectionFactory"/>
	    <property name="destination" ref="QueueDestination"/>
	    <property name="messageListener" ref="messageListener" />
	</bean> -->
<!-- 	
	<bean id="messageListener2" class="spring.ConsumerListener2" />
    <jms:listener-container connection-factory="connectionFactory" destination-type="queue"  >
        <jms:listener destination="queue朱宏亮队列"  subscription="订阅者一号" id="bb" ref="messageListener2" method="receiveMessage"/>    
    </jms:listener-container>  -->
    
</beans>  

下面看一下订阅者的代码和xml文件:
ConsumerQueue.java

package spring;

import javax.jms.Destination;
import javax.jms.TextMessage;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;

/** 
* @author 朱宏亮
* @version 创建时间:2017年12月7日 上午10:30:31 
* 类说明 
*/
public class ConsumerQueue {
    public static void main(String[] args) {  
        
        ApplicationContext ctx = new ClassPathXmlApplicationContext( "spring/ConsumerQueue.xml");  

        // 获取JmsTemplate对象  
        JmsTemplate jt = (JmsTemplate) ctx.getBean("jmsTemplate2");  
        
        Destination destination =  (Destination) ctx.getBean("QueueDestination2");  

        
        while(true){
            TextMessage tm = (TextMessage) jt.receive(destination);
            try {
                
                System.out.println("从队列" + destination.toString() + "收到了消息:\t" + tm.getText());
         
            } catch (Exception e) {
                //e.printStackTrace();
                System.out.println("暂无消息");
                break;
            }
        }
        
        System.out.println("end!");  
        System.exit(0);//因与ActiveMQ的连接没结束,手动结束程序
  
    }  
}
 

ConsumerQueue.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
    xmlns:mvc="http://www.springframework.org/schema/mvc"
    xmlns:jms="http://www.springframework.org/schema/jms"
    xsi:schemaLocation="http://www.springframework.org/schema/beans   
        http://www.springframework.org/schema/beans/spring-beans.xsd   
        http://www.springframework.org/schema/context   
        http://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/mvc
        http://www.springframework.org/schema/mvc/spring-mvc.xsd
        http://www.springframework.org/schema/jms
        http://www.springframework.org/schema/jms/spring-jms.xsd
        http://activemq.apache.org/schema/core
        http://activemq.apache.org/schema/core/activemq-core.xsd">
        
             
        <amq:connectionFactory id="amqConnectionFactory2" 
            brokerURL="tcp://127.0.0.1:61616" 
            userName="admin" 
            password="admin" />
            
        <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
        <bean id="connectionFactory2" class="org.springframework.jms.connection.SingleConnectionFactory">
            <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
            <property name="targetConnectionFactory" ref="amqConnectionFactory2"/>
            <property name="clientId" value="c1"/>
            
        </bean>
        
	    <!-- 定义消息队列(Queue) -->
	    <bean id="QueueDestination2" class="org.apache.activemq.command.ActiveMQQueue">
	        <!-- 设置消息队列的名字 -->
	        <constructor-arg>
	            <value>queue朱宏亮队列</value>
	        </constructor-arg>
	    </bean>
	    
	    <!-- 配置JMS模板(Queue),Spring提供的JMS工具类,它发送、接收消息。 -->
	    <bean id="jmsTemplate2" class="org.springframework.jms.core.JmsTemplate">
	    
	        <property name="connectionFactory" ref="connectionFactory2" />
	        
	        <property name="defaultDestination" ref="QueueDestination2" />
	        
	        <!-- 内部的consumer在receive方法中阻塞的时间。默认为1秒。 -->
	        <property name="receiveTimeout" value="10000" />
	        <!-- true是topic,false是queue,默认是false,此处显示写出false -->
	        <property name="pubSubDomain" value="false" /> 
	        
        </bean>

</beans>  

运行,ProducerQueue.java 

可以在eclipse的控制台看到

发送消息 开始!
发送消息 结束!


web控制台里可以看到有一条消息已经到底服务器,还没有被消费,消费者数量为0.

然后再运行 ConsumerQueue.java

在eclipse控制台看到:

从队列queue://queue朱宏亮队列收到了消息: Spring 发送消息 ----> 你好 ActiveMQ
暂无消息
end!

再看看web控制台,刚刚发送到服务器的数据已经被消费了。

是不是很简单。

下面再看看让服务自动监听消息,当有消息队列信息发送到服务器的时候,消费者自动获取信息。
ConsumerListener1.java

package spring;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/** 
* @author 朱宏亮
* @version 创建时间:2017年12月7日 下午1:44:21 
* 类说明 
*/
public class ConsumerListener1 implements MessageListener {  
    
    public void onMessage(Message arg0) {  
        // TODO Auto-generated method stub  
        try {  
            String message = ((TextMessage) arg0).getText();  
            System.out.println("Listener1 消费方接收消息:" + message);  
        } catch (JMSException e) {  
            // TODO Auto-generated catch block  
            e.printStackTrace();  
        }  
    }  

}

然后取消ProducerQueue.xml文件下面的注释

        <bean id="messageListener" class="spring.ConsumerListener1" />
	
	<!-- and this is the message listener container -->
	<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
	    <property name="connectionFactory" ref="connectionFactory"/>
	    <property name="destination" ref="QueueDestination"/>
	    <property name="messageListener" ref="messageListener" />
	</bean>

接着直接运行ProducerQueue.java文件,可以在eclipse控制台看到:

发送消息 开始!
Listener1 消费方接收消息:Spring 发送消息 ----> 你好 ActiveMQ
发送消息 结束!

除了上面这种写法之外,listener还有另外一种,还是打开ProducerQueue.xml文件,将刚刚上面的代码注释之后,取消下面这段代码的注释:

    <bean id="messageListener2" class="spring.ConsumerListener2" />
    <jms:listener-container connection-factory="connectionFactory" destination-type="queue"  >
        <jms:listener destination="queue朱宏亮队列"  subscription="订阅者一号" id="bb" ref="messageListener2" method="receiveMessage"/>    
    </jms:listener-container>

ConsumerListener2.java

package spring; 
/** 
* @author 朱宏亮
* @version 创建时间:2017年12月7日 下午1:49:41 
* 类说明 
*/
public class ConsumerListener2 {
    
    public void receiveMessage(String message) {
        System.out.println("Listener2 消费方接收消息:"+message);
    }
    
}
 

接着直接运行ProducerQueue.java文件,可以在eclipse控制台看到:

 发送消息 开始!
Listener2 消费方接收消息:Spring 发送消息 ----> 你好 ActiveMQ
发送消息 结束!

Queue消息队列只允许有一个消费者存在,所以在不使用的时候将他注释,或者删了代码。根据自己的业务需求来指定代码。

4.ActiveMQ+Spring,一对多发布/订阅(Topic)

在这里我使用了一个发布者和两个订阅者
还是先看一看发布者的java代码和xml代码
ProducerTopic.java

package spring;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

/** 
* @author 朱宏亮
* @version 创建时间:2017年12月7日 下午2:10:02 
* 类说明 
*/
public class ProducerTopic {

    
    public static void main(String[] args) {  
        System.out.println("发送消息 开始!");  
        ApplicationContext ctx = new ClassPathXmlApplicationContext( "spring/ProducerTopic.xml");  
        JmsTemplate jt = null;  
        // 获取JmsTemplate对象  
        jt = (JmsTemplate) ctx.getBean("jmsTemplate3");  
        // 调用方法,发送消息  

        jt.send(new MessageCreator() {  
            // 消息的产生,返回消息发送消息  
            public Message createMessage(Session s) throws JMSException {  
                TextMessage msg = s.createTextMessage("Spring 发送Topic消息 ----> 你好 ActiveMQ");  
                return msg;  
            }  
        });  
        
        System.out.println("发送消息 结束!");  
        System.exit(0);//因与ActiveMQ的连接没结束,手动结束程序
    }  
}

ProducerTopic.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
    xmlns:mvc="http://www.springframework.org/schema/mvc"
    xmlns:jms="http://www.springframework.org/schema/jms"
    xsi:schemaLocation="http://www.springframework.org/schema/beans   
        http://www.springframework.org/schema/beans/spring-beans.xsd   
        http://www.springframework.org/schema/context   
        http://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/mvc
        http://www.springframework.org/schema/mvc/spring-mvc.xsd
        http://www.springframework.org/schema/jms
        http://www.springframework.org/schema/jms/spring-jms.xsd
        http://activemq.apache.org/schema/core
        http://activemq.apache.org/schema/core/activemq-core.xsd">


    
    <amq:connectionFactory id="amqConnectionFactory3" 
        brokerURL="tcp://127.0.0.1:61616" 
        userName="admin" 
        password="admin" />
    
    <!-- 配置JMS连接工厂 -->
    <bean id="connectionFactory3"
        class="org.springframework.jms.connection.CachingConnectionFactory">
        <constructor-arg ref="amqConnectionFactory3" />
        <property name="sessionCacheSize" value="100" />
      
    </bean>
    

    <!-- topic目的地配置,其实不管是topic还是queue则他们的底层实现不同但是通过封装api就差不多了,而在spring中更是简单 -->  
    <bean id="TopicDestination" class="org.apache.activemq.command.ActiveMQTopic">  
        <constructor-arg index="0" value="topic朱宏亮订阅" />  
    </bean>
    
        
    <!-- 配置JMS模板(Topic),Spring提供的JMS工具类,它发送、接收消息。 -->
    <bean id="jmsTemplate3" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="connectionFactory3" />
        <property name="defaultDestination" ref="TopicDestination"></property>  
        <!-- 进行持久化 -->  
        <property name="deliveryMode" value="2" />  
        <!-- 开启订阅模式 -->  
        <property name="pubSubDomain" value="true" />  
    </bean>
   
</beans>  

 

ConsumerTopic1.java

package spring;

import javax.jms.Destination;
import javax.jms.TextMessage;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;

/** 
* @author 朱宏亮
* @version 创建时间:2017年12月7日 下午2:15:20 
* 类说明 
*/
public class ConsumerTopic1 {
    public static void main(String[] args) {  
        ApplicationContext ctx = new ClassPathXmlApplicationContext( "spring/ConsumerTopic1.xml");  

        // 获取JmsTemplate对象  
        JmsTemplate jt = (JmsTemplate) ctx.getBean("jmsTemplate4");  
        
        Destination destination =  (Destination) ctx.getBean("TopicDestination4");  
        
        
        while(true){
            TextMessage tm = (TextMessage) jt.receive(destination);
            try {
                
                System.out.println("订阅者1::" + destination.toString() + "收到了消息:\t" + tm.getText());
         
            } catch (Exception e) {
                //e.printStackTrace();
                System.out.println("暂无消息");
               
            }
        }
    }
}
 

ConsumerTopic1.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
    xmlns:mvc="http://www.springframework.org/schema/mvc"
    xmlns:jms="http://www.springframework.org/schema/jms"
    xsi:schemaLocation="http://www.springframework.org/schema/beans   
        http://www.springframework.org/schema/beans/spring-beans.xsd   
        http://www.springframework.org/schema/context   
        http://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/mvc
        http://www.springframework.org/schema/mvc/spring-mvc.xsd
        http://www.springframework.org/schema/jms
        http://www.springframework.org/schema/jms/spring-jms.xsd
        http://activemq.apache.org/schema/core
        http://activemq.apache.org/schema/core/activemq-core.xsd">
        
             
        <amq:connectionFactory id="amqConnectionFactory4" 
            brokerURL="tcp://127.0.0.1:61616" 
            userName="admin" 
            password="admin" />
            
        <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
        <bean id="connectionFactory4" class="org.springframework.jms.connection.SingleConnectionFactory">
            <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
            <property name="targetConnectionFactory" ref="amqConnectionFactory4"/>
            <property name="clientId" value="clientId21"/>
          
        </bean>
        
	    <!-- topic目的地配置,其实不管是topic还是queue则他们的底层实现不同但是通过封装api就差不多了,而在spring中更是简单 -->  
	    <bean id="TopicDestination4" class="org.apache.activemq.command.ActiveMQTopic">  
	        <constructor-arg index="0" value="topic朱宏亮订阅" />  
	    </bean>

        <!-- 配置JMS模板(Topic)1,Spring提供的JMS工具类,它发送、接收消息。 -->
	    <bean id="jmsTemplate4" class="org.springframework.jms.core.JmsTemplate">
	        <property name="connectionFactory" ref="connectionFactory4" />
	        <property name="defaultDestination" ref="TopicDestination4"></property>  
	        <!-- 进行持久化 -->  
	        <property name="deliveryMode" value="2" />  
	        <!-- 开启订阅模式 -->  
	        <property name="pubSubDomain" value="true" />    
	    </bean>
	    
    
</beans>  

ConsumerTopic2.java

package spring;

import javax.jms.Destination;
import javax.jms.TextMessage;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;

/** 
* @author 朱宏亮
* @version 创建时间:2017年12月7日 下午2:15:20 
* 类说明 
*/
public class ConsumerTopic2 {
    public static void main(String[] args) {  
        ApplicationContext ctx = new ClassPathXmlApplicationContext( "spring/ConsumerTopic2.xml");  

        // 获取JmsTemplate对象  
        JmsTemplate jt = (JmsTemplate) ctx.getBean("jmsTemplate5");  
        
        Destination destination =  (Destination) ctx.getBean("TopicDestination5");  

        while(true){
            TextMessage tm = (TextMessage) jt.receive(destination);
            try {
                
                System.out.println("订阅者2::" + destination.toString() + "收到了消息:\t" + tm.getText());
         
            } catch (Exception e) {
                //e.printStackTrace();
                System.out.println("暂无消息");
               
            }
        }
    }
}
 

ConsumerTopic2.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
    xmlns:mvc="http://www.springframework.org/schema/mvc"
    xmlns:jms="http://www.springframework.org/schema/jms"
    xsi:schemaLocation="http://www.springframework.org/schema/beans   
        http://www.springframework.org/schema/beans/spring-beans.xsd   
        http://www.springframework.org/schema/context   
        http://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/mvc
        http://www.springframework.org/schema/mvc/spring-mvc.xsd
        http://www.springframework.org/schema/jms
        http://www.springframework.org/schema/jms/spring-jms.xsd
        http://activemq.apache.org/schema/core
        http://activemq.apache.org/schema/core/activemq-core.xsd">
        
             
        <amq:connectionFactory id="amqConnectionFactory5" 
            brokerURL="tcp://127.0.0.1:61616" 
            userName="admin" 
            password="admin" />
            
        <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
        <bean id="connectionFactory5" class="org.springframework.jms.connection.SingleConnectionFactory">
            <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
            <property name="targetConnectionFactory" ref="amqConnectionFactory5"/>
            <property name="clientId" value="clientId22"/>
      
        </bean>
        
        <!-- topic目的地配置,其实不管是topic还是queue则他们的底层实现不同但是通过封装api就差不多了,而在spring中更是简单 -->  
        <bean id="TopicDestination5" class="org.apache.activemq.command.ActiveMQTopic">  
            <constructor-arg index="0" value="topic朱宏亮订阅" />  
        </bean>

        
        <!-- 配置JMS模板(Topic)2,Spring提供的JMS工具类,它发送、接收消息。 -->
        <bean id="jmsTemplate5" class="org.springframework.jms.core.JmsTemplate">
            <property name="connectionFactory" ref="connectionFactory5" />
            <property name="defaultDestination" ref="TopicDestination5"></property>  
            <!-- 进行持久化 -->  
            <property name="deliveryMode" value="2" />  
            <!-- 开启订阅模式 -->  
            <property name="pubSubDomain" value="true" />    
        </bean>
        

</beans>  

上面六个文件,一个发布者java和xml,两个订阅者java和xml。

我们先运行两个订阅者的java文件:ConsumerTopic1.java 、 ConsumerTopic2.java

这两个程序执行之后会一直保持运行,直到有消息发布才会获取到消息并输出在eclipse的控制台

然后我们执行ProducerTopic.java文件,然后再eclipse控制台可以看到:

发送消息 开始!
发送消息 结束!
订阅者1::topic://topic朱宏亮订阅收到了消息: Spring 发送Topic消息 ----> 你好 ActiveMQ
订阅者2::topic://topic朱宏亮订阅收到了消息: Spring 发送Topic消息 ----> 你好 ActiveMQ

发布一条消息之后,同时被两个订阅者接收到消息,非常完美。

ActiveMQ web控制台的界面:

能看到有两个消费者,一条消息,被读取了两次。点击右边的按钮(Active Subscribers):

在上图中能看到两个订阅者客户端的Subscription Name为空,如果这个值为空的话是不能实现消息持久化的。在下面的代码里我会在xml文件中配置Subscription Name,实现消息持久化。

 

Topic模式我们也可以做Listener,看代码
ConsumerListener3.java

package spring;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/** 
* @author 朱宏亮
* @version 创建时间:2017年12月7日 下午3:53:45 
* 类说明 
*/
public class ConsumerListener3 {

    public static void main(String[] args) {  
        ApplicationContext ctx = new ClassPathXmlApplicationContext( "spring/ConsumerListener3.xml");  
        while(true){
            
        }
    }
        

    
    public void receiveMessage1(String message) {
        System.out.println("Listener1 订阅者接收消息:"+message);
    }
    
    
    public void receiveMessage2(String message) {
        System.out.println("Listener2 订阅者接收消息:"+message);
    }
    
}
 

ConsumerListener3.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
    xmlns:mvc="http://www.springframework.org/schema/mvc"
    xmlns:jms="http://www.springframework.org/schema/jms"
    xsi:schemaLocation="http://www.springframework.org/schema/beans   
        http://www.springframework.org/schema/beans/spring-beans.xsd   
        http://www.springframework.org/schema/context   
        http://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/mvc
        http://www.springframework.org/schema/mvc/spring-mvc.xsd
        http://www.springframework.org/schema/jms
        http://www.springframework.org/schema/jms/spring-jms.xsd
        http://activemq.apache.org/schema/core
        http://activemq.apache.org/schema/core/activemq-core.xsd">


    
    <amq:connectionFactory id="amqConnectionFactory6" 
        brokerURL="tcp://127.0.0.1:61616" 
        userName="admin" 
        password="admin" />
    
    <!-- 配置JMS连接工厂 -->
    <bean id="connectionFactory6"
        class="org.springframework.jms.connection.CachingConnectionFactory">
        <constructor-arg ref="amqConnectionFactory6" />
        <property name="sessionCacheSize" value="100" />
        <property name="clientId" value="clientId23"/>
    </bean>
    

    
    <!-- 在不试用自动监听的时候 下面所有的代码全部注释 -->
    
    <bean id="messageListener1" class="spring.ConsumerListener3" />
    <!-- and this is the message listener container -->
    <jms:listener-container connection-factory="connectionFactory6" destination-type="durableTopic"  >
        <jms:listener destination="topic朱宏亮订阅"  subscription="订阅者一号" id="b1" ref="messageListener1" method="receiveMessage1"/>    
    </jms:listener-container> 
 
    <jms:listener-container connection-factory="connectionFactory6" destination-type="durableTopic"  >
        <jms:listener destination="topic朱宏亮订阅"  subscription="订阅者二号" id="b2" ref="messageListener1" method="receiveMessage2"/>    
    </jms:listener-container> 

        
        
</beans>  

关闭刚刚已经执行的几个程序,

首先运行ConsumerListener3.java,程序启动后会保持监听,直到有消息发布会自动打印在eclipse控制台里,

接下来运行ProducerTopic.java文件,然后可以在eclipse控制台看到:

发送消息 开始!
发送消息 结束!
Listener1 订阅者接收消息:Spring 发送Topic消息 ----> 你好 ActiveMQ
Listener2 订阅者接收消息:Spring 发送Topic消息 ----> 你好 ActiveMQ

程序执行的非常完美。

在上图中能看到,我的两个订阅者使用的是同一个Client,但是我为每个订阅者配置了不同的Subscription Name。这样就可以实现消息持久化,即使在客户端没有开启的情况下,也不用担心漏掉消息,在下一次客户端启动的时候,就可以接收到离线时的信息。

 

源代码下载地址:https://gitee.com/zhuhongliang/ActiveMQStudy

 

 

发表评论