第一篇是ActiveMQ的helloworld代码。
第二篇使用ActiveMQ整合Spring,非springmvc。(ActiveMQ整合SpringMVC的代码放在下一篇)
第三篇使用ActiveMQ整合SpringMVC。
JDK8+spring4.1.3+ActiveMQ5.15.2
文件:一共14个文件,其中8个java文件,6个xml配置文件。源代码下载地址在最下面↓↓↓
3.ActiveMQ+Spring,点对点信息发布与接收(Queue)
先看下发布者的代码和xml文件:
ProducerQueue.java
package spring; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import javax.jms.TextMessage; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; /** * @author 朱宏亮 * @version 创建时间:2017年12月5日 下午4:59:03 * 类说明 * 生产 */ public class ProducerQueue { // 负责消息的发送和接收可以理解为MessageProducer 和MessageConsummer的组合。 public static void main(String[] args) { System.out.println("发送消息 开始!"); ApplicationContext ctx = new ClassPathXmlApplicationContext( "spring/ProducerQueue.xml"); JmsTemplate jt = null; // 获取JmsTemplate对象 jt = (JmsTemplate) ctx.getBean("jmsTemplate"); // 调用方法,发送消息 jt.send(new MessageCreator() { // 消息的产生,返回消息发送消息 public Message createMessage(Session s) throws JMSException { TextMessage msg = s.createTextMessage("Spring 发送消息 ----> 你好 ActiveMQ"); return msg; } }); System.out.println("发送消息 结束!"); System.exit(0);//因与ActiveMQ的连接没结束,手动结束程序 } }
ProducerQueue.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:mvc="http://www.springframework.org/schema/mvc" xmlns:jms="http://www.springframework.org/schema/jms" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd"> <amq:connectionFactory id="amqConnectionFactory" brokerURL="tcp://127.0.0.1:61616" userName="admin" password="admin" /> <!-- 配置JMS连接工厂 --> <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <constructor-arg ref="amqConnectionFactory" /> <property name="sessionCacheSize" value="100" /> </bean> <!-- 定义消息队列(Queue) --> <bean id="QueueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <!-- 设置消息队列的名字 --> <constructor-arg> <value>queue朱宏亮队列</value> </constructor-arg> </bean> <!-- topic目的地配置,其实不管是topic还是queue则他们的底层实现不同但是通过封装api就差不多了,而在spring中更是简单 --> <!-- <bean id="TopicDestination" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg index="0" value="朱宏亮订阅" /> </bean> --> <!-- 配置JMS模板(Queue),Spring提供的JMS工具类,它发送、接收消息。 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactory" /> <property name="defaultDestination" ref="QueueDestination" /> <!-- 内部的consumer在receive方法中阻塞的时间。默认为1秒。 --> <property name="receiveTimeout" value="10000" /> <!-- true是topic,false是queue,默认是false,此处显示写出false --> <property name="pubSubDomain" value="false" /> </bean> <!-- 在不试用自动监听的时候 下面所有的代码全部注释 --> <bean id="messageListener" class="spring.ConsumerListener1" /> <!-- and this is the message listener container --> <!-- <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory"/> <property name="destination" ref="QueueDestination"/> <property name="messageListener" ref="messageListener" /> </bean> --> <!-- <bean id="messageListener2" class="spring.ConsumerListener2" /> <jms:listener-container connection-factory="connectionFactory" destination-type="queue" > <jms:listener destination="queue朱宏亮队列" subscription="订阅者一号" id="bb" ref="messageListener2" method="receiveMessage"/> </jms:listener-container> --> </beans>
下面看一下订阅者的代码和xml文件:
ConsumerQueue.java
package spring; import javax.jms.Destination; import javax.jms.TextMessage; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.jms.core.JmsTemplate; /** * @author 朱宏亮 * @version 创建时间:2017年12月7日 上午10:30:31 * 类说明 */ public class ConsumerQueue { public static void main(String[] args) { ApplicationContext ctx = new ClassPathXmlApplicationContext( "spring/ConsumerQueue.xml"); // 获取JmsTemplate对象 JmsTemplate jt = (JmsTemplate) ctx.getBean("jmsTemplate2"); Destination destination = (Destination) ctx.getBean("QueueDestination2"); while(true){ TextMessage tm = (TextMessage) jt.receive(destination); try { System.out.println("从队列" + destination.toString() + "收到了消息:\t" + tm.getText()); } catch (Exception e) { //e.printStackTrace(); System.out.println("暂无消息"); break; } } System.out.println("end!"); System.exit(0);//因与ActiveMQ的连接没结束,手动结束程序 } }
ConsumerQueue.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:mvc="http://www.springframework.org/schema/mvc" xmlns:jms="http://www.springframework.org/schema/jms" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd"> <amq:connectionFactory id="amqConnectionFactory2" brokerURL="tcp://127.0.0.1:61616" userName="admin" password="admin" /> <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory --> <bean id="connectionFactory2" class="org.springframework.jms.connection.SingleConnectionFactory"> <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory --> <property name="targetConnectionFactory" ref="amqConnectionFactory2"/> <property name="clientId" value="c1"/> </bean> <!-- 定义消息队列(Queue) --> <bean id="QueueDestination2" class="org.apache.activemq.command.ActiveMQQueue"> <!-- 设置消息队列的名字 --> <constructor-arg> <value>queue朱宏亮队列</value> </constructor-arg> </bean> <!-- 配置JMS模板(Queue),Spring提供的JMS工具类,它发送、接收消息。 --> <bean id="jmsTemplate2" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactory2" /> <property name="defaultDestination" ref="QueueDestination2" /> <!-- 内部的consumer在receive方法中阻塞的时间。默认为1秒。 --> <property name="receiveTimeout" value="10000" /> <!-- true是topic,false是queue,默认是false,此处显示写出false --> <property name="pubSubDomain" value="false" /> </bean> </beans>
运行,ProducerQueue.java
可以在eclipse的控制台看到
发送消息 开始! 发送消息 结束!
web控制台里可以看到有一条消息已经到底服务器,还没有被消费,消费者数量为0.
然后再运行 ConsumerQueue.java
在eclipse控制台看到:
从队列queue://queue朱宏亮队列收到了消息: Spring 发送消息 ----> 你好 ActiveMQ 暂无消息 end!
再看看web控制台,刚刚发送到服务器的数据已经被消费了。
是不是很简单。
下面再看看让服务自动监听消息,当有消息队列信息发送到服务器的时候,消费者自动获取信息。
ConsumerListener1.java
package spring; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; /** * @author 朱宏亮 * @version 创建时间:2017年12月7日 下午1:44:21 * 类说明 */ public class ConsumerListener1 implements MessageListener { public void onMessage(Message arg0) { // TODO Auto-generated method stub try { String message = ((TextMessage) arg0).getText(); System.out.println("Listener1 消费方接收消息:" + message); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
然后取消ProducerQueue.xml文件下面的注释
<bean id="messageListener" class="spring.ConsumerListener1" /> <!-- and this is the message listener container --> <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory"/> <property name="destination" ref="QueueDestination"/> <property name="messageListener" ref="messageListener" /> </bean>
接着直接运行ProducerQueue.java文件,可以在eclipse控制台看到:
发送消息 开始! Listener1 消费方接收消息:Spring 发送消息 ----> 你好 ActiveMQ 发送消息 结束!
除了上面这种写法之外,listener还有另外一种,还是打开ProducerQueue.xml文件,将刚刚上面的代码注释之后,取消下面这段代码的注释:
<bean id="messageListener2" class="spring.ConsumerListener2" /> <jms:listener-container connection-factory="connectionFactory" destination-type="queue" > <jms:listener destination="queue朱宏亮队列" subscription="订阅者一号" id="bb" ref="messageListener2" method="receiveMessage"/> </jms:listener-container>
ConsumerListener2.java
package spring; /** * @author 朱宏亮 * @version 创建时间:2017年12月7日 下午1:49:41 * 类说明 */ public class ConsumerListener2 { public void receiveMessage(String message) { System.out.println("Listener2 消费方接收消息:"+message); } }
接着直接运行ProducerQueue.java文件,可以在eclipse控制台看到:
发送消息 开始! Listener2 消费方接收消息:Spring 发送消息 ----> 你好 ActiveMQ 发送消息 结束!
Queue消息队列只允许有一个消费者存在,所以在不使用的时候将他注释,或者删了代码。根据自己的业务需求来指定代码。
4.ActiveMQ+Spring,一对多发布/订阅(Topic)
在这里我使用了一个发布者和两个订阅者
还是先看一看发布者的java代码和xml代码
ProducerTopic.java
package spring; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import javax.jms.TextMessage; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; /** * @author 朱宏亮 * @version 创建时间:2017年12月7日 下午2:10:02 * 类说明 */ public class ProducerTopic { public static void main(String[] args) { System.out.println("发送消息 开始!"); ApplicationContext ctx = new ClassPathXmlApplicationContext( "spring/ProducerTopic.xml"); JmsTemplate jt = null; // 获取JmsTemplate对象 jt = (JmsTemplate) ctx.getBean("jmsTemplate3"); // 调用方法,发送消息 jt.send(new MessageCreator() { // 消息的产生,返回消息发送消息 public Message createMessage(Session s) throws JMSException { TextMessage msg = s.createTextMessage("Spring 发送Topic消息 ----> 你好 ActiveMQ"); return msg; } }); System.out.println("发送消息 结束!"); System.exit(0);//因与ActiveMQ的连接没结束,手动结束程序 } }
ProducerTopic.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:mvc="http://www.springframework.org/schema/mvc" xmlns:jms="http://www.springframework.org/schema/jms" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd"> <amq:connectionFactory id="amqConnectionFactory3" brokerURL="tcp://127.0.0.1:61616" userName="admin" password="admin" /> <!-- 配置JMS连接工厂 --> <bean id="connectionFactory3" class="org.springframework.jms.connection.CachingConnectionFactory"> <constructor-arg ref="amqConnectionFactory3" /> <property name="sessionCacheSize" value="100" /> </bean> <!-- topic目的地配置,其实不管是topic还是queue则他们的底层实现不同但是通过封装api就差不多了,而在spring中更是简单 --> <bean id="TopicDestination" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg index="0" value="topic朱宏亮订阅" /> </bean> <!-- 配置JMS模板(Topic),Spring提供的JMS工具类,它发送、接收消息。 --> <bean id="jmsTemplate3" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactory3" /> <property name="defaultDestination" ref="TopicDestination"></property> <!-- 进行持久化 --> <property name="deliveryMode" value="2" /> <!-- 开启订阅模式 --> <property name="pubSubDomain" value="true" /> </bean> </beans>
ConsumerTopic1.java
package spring; import javax.jms.Destination; import javax.jms.TextMessage; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.jms.core.JmsTemplate; /** * @author 朱宏亮 * @version 创建时间:2017年12月7日 下午2:15:20 * 类说明 */ public class ConsumerTopic1 { public static void main(String[] args) { ApplicationContext ctx = new ClassPathXmlApplicationContext( "spring/ConsumerTopic1.xml"); // 获取JmsTemplate对象 JmsTemplate jt = (JmsTemplate) ctx.getBean("jmsTemplate4"); Destination destination = (Destination) ctx.getBean("TopicDestination4"); while(true){ TextMessage tm = (TextMessage) jt.receive(destination); try { System.out.println("订阅者1::" + destination.toString() + "收到了消息:\t" + tm.getText()); } catch (Exception e) { //e.printStackTrace(); System.out.println("暂无消息"); } } } }
ConsumerTopic1.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:mvc="http://www.springframework.org/schema/mvc" xmlns:jms="http://www.springframework.org/schema/jms" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd"> <amq:connectionFactory id="amqConnectionFactory4" brokerURL="tcp://127.0.0.1:61616" userName="admin" password="admin" /> <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory --> <bean id="connectionFactory4" class="org.springframework.jms.connection.SingleConnectionFactory"> <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory --> <property name="targetConnectionFactory" ref="amqConnectionFactory4"/> <property name="clientId" value="clientId21"/> </bean> <!-- topic目的地配置,其实不管是topic还是queue则他们的底层实现不同但是通过封装api就差不多了,而在spring中更是简单 --> <bean id="TopicDestination4" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg index="0" value="topic朱宏亮订阅" /> </bean> <!-- 配置JMS模板(Topic)1,Spring提供的JMS工具类,它发送、接收消息。 --> <bean id="jmsTemplate4" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactory4" /> <property name="defaultDestination" ref="TopicDestination4"></property> <!-- 进行持久化 --> <property name="deliveryMode" value="2" /> <!-- 开启订阅模式 --> <property name="pubSubDomain" value="true" /> </bean> </beans>
ConsumerTopic2.java
package spring; import javax.jms.Destination; import javax.jms.TextMessage; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.jms.core.JmsTemplate; /** * @author 朱宏亮 * @version 创建时间:2017年12月7日 下午2:15:20 * 类说明 */ public class ConsumerTopic2 { public static void main(String[] args) { ApplicationContext ctx = new ClassPathXmlApplicationContext( "spring/ConsumerTopic2.xml"); // 获取JmsTemplate对象 JmsTemplate jt = (JmsTemplate) ctx.getBean("jmsTemplate5"); Destination destination = (Destination) ctx.getBean("TopicDestination5"); while(true){ TextMessage tm = (TextMessage) jt.receive(destination); try { System.out.println("订阅者2::" + destination.toString() + "收到了消息:\t" + tm.getText()); } catch (Exception e) { //e.printStackTrace(); System.out.println("暂无消息"); } } } }
ConsumerTopic2.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:mvc="http://www.springframework.org/schema/mvc" xmlns:jms="http://www.springframework.org/schema/jms" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd"> <amq:connectionFactory id="amqConnectionFactory5" brokerURL="tcp://127.0.0.1:61616" userName="admin" password="admin" /> <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory --> <bean id="connectionFactory5" class="org.springframework.jms.connection.SingleConnectionFactory"> <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory --> <property name="targetConnectionFactory" ref="amqConnectionFactory5"/> <property name="clientId" value="clientId22"/> </bean> <!-- topic目的地配置,其实不管是topic还是queue则他们的底层实现不同但是通过封装api就差不多了,而在spring中更是简单 --> <bean id="TopicDestination5" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg index="0" value="topic朱宏亮订阅" /> </bean> <!-- 配置JMS模板(Topic)2,Spring提供的JMS工具类,它发送、接收消息。 --> <bean id="jmsTemplate5" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactory5" /> <property name="defaultDestination" ref="TopicDestination5"></property> <!-- 进行持久化 --> <property name="deliveryMode" value="2" /> <!-- 开启订阅模式 --> <property name="pubSubDomain" value="true" /> </bean> </beans>
上面六个文件,一个发布者java和xml,两个订阅者java和xml。
我们先运行两个订阅者的java文件:ConsumerTopic1.java 、 ConsumerTopic2.java
这两个程序执行之后会一直保持运行,直到有消息发布才会获取到消息并输出在eclipse的控制台
然后我们执行ProducerTopic.java文件,然后再eclipse控制台可以看到:
发送消息 开始! 发送消息 结束!
订阅者1::topic://topic朱宏亮订阅收到了消息: Spring 发送Topic消息 ----> 你好 ActiveMQ
订阅者2::topic://topic朱宏亮订阅收到了消息: Spring 发送Topic消息 ----> 你好 ActiveMQ
发布一条消息之后,同时被两个订阅者接收到消息,非常完美。
ActiveMQ web控制台的界面:
能看到有两个消费者,一条消息,被读取了两次。点击右边的按钮(Active Subscribers):
在上图中能看到两个订阅者客户端的Subscription Name为空,如果这个值为空的话是不能实现消息持久化的。在下面的代码里我会在xml文件中配置Subscription Name,实现消息持久化。
Topic模式我们也可以做Listener,看代码
ConsumerListener3.java
package spring; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; /** * @author 朱宏亮 * @version 创建时间:2017年12月7日 下午3:53:45 * 类说明 */ public class ConsumerListener3 { public static void main(String[] args) { ApplicationContext ctx = new ClassPathXmlApplicationContext( "spring/ConsumerListener3.xml"); while(true){ } } public void receiveMessage1(String message) { System.out.println("Listener1 订阅者接收消息:"+message); } public void receiveMessage2(String message) { System.out.println("Listener2 订阅者接收消息:"+message); } }
ConsumerListener3.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:mvc="http://www.springframework.org/schema/mvc" xmlns:jms="http://www.springframework.org/schema/jms" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd"> <amq:connectionFactory id="amqConnectionFactory6" brokerURL="tcp://127.0.0.1:61616" userName="admin" password="admin" /> <!-- 配置JMS连接工厂 --> <bean id="connectionFactory6" class="org.springframework.jms.connection.CachingConnectionFactory"> <constructor-arg ref="amqConnectionFactory6" /> <property name="sessionCacheSize" value="100" /> <property name="clientId" value="clientId23"/> </bean> <!-- 在不试用自动监听的时候 下面所有的代码全部注释 --> <bean id="messageListener1" class="spring.ConsumerListener3" /> <!-- and this is the message listener container --> <jms:listener-container connection-factory="connectionFactory6" destination-type="durableTopic" > <jms:listener destination="topic朱宏亮订阅" subscription="订阅者一号" id="b1" ref="messageListener1" method="receiveMessage1"/> </jms:listener-container> <jms:listener-container connection-factory="connectionFactory6" destination-type="durableTopic" > <jms:listener destination="topic朱宏亮订阅" subscription="订阅者二号" id="b2" ref="messageListener1" method="receiveMessage2"/> </jms:listener-container> </beans>
关闭刚刚已经执行的几个程序,
首先运行ConsumerListener3.java,程序启动后会保持监听,直到有消息发布会自动打印在eclipse控制台里,
接下来运行ProducerTopic.java文件,然后可以在eclipse控制台看到:
发送消息 开始! 发送消息 结束!
Listener1 订阅者接收消息:Spring 发送Topic消息 ----> 你好 ActiveMQ Listener2 订阅者接收消息:Spring 发送Topic消息 ----> 你好 ActiveMQ
程序执行的非常完美。
在上图中能看到,我的两个订阅者使用的是同一个Client,但是我为每个订阅者配置了不同的Subscription Name。这样就可以实现消息持久化,即使在客户端没有开启的情况下,也不用担心漏掉消息,在下一次客户端启动的时候,就可以接收到离线时的信息。
源代码下载地址:https://gitee.com/zhuhongliang/ActiveMQStudy