第一篇是ActiveMQ的helloworld级别代码。

第二篇使用ActiveMQ整合Spring,非springmvc。(ActiveMQ整合SpringMVC的代码放在下一篇)

第三篇使用ActiveMQ整合SpringMVC

JDK8+spring4.1.3+ActiveMQ5.15.2

简单说明一下:

ActiveMQ ,queue的消费者不在也会给他保留,topic只有持久化订阅者会保留

(1)使用queue,即队列时,每个消息只有一个消费者,所以,持久化很简单,只要保存到数据库即可。然后,随便一个消费者取走处理即可。某个消费者关掉一阵子,也无所谓。

(2)使用topic,即订阅时,每个消息可以有多个消费者,就麻烦一些。

首先,假设消费者都是普通的消费者,
————————
<1>activemq启动后,发布消息1,可惜,现在没有消费者启动着,也就是没有消费者进行了订阅。那么,这个消息就被抛弃了。

<2>消费者1启动了,连接了activemq,进行了订阅,在等待消息~~

activemq发布消息2,OK,消费者1收到,并进行处理。消息抛弃。

<3>消费者2也启动了,连接了activemq,进行了订阅,在等待消息~~

activemq发布消息3,OK,消费者1,消费者2都收到,并进行处理。消息抛弃。

<4>消费者1关掉了。

activemq发布消息4,OK,消费者2收到,并进行处理。消息抛弃。

<5>消费者1又启动了。

activemq发布消息5,OK,消费者1,消费者2都收到,并进行处理。消息抛弃。
—————————–
总结一下:
activemq只是向当前启动的消费者发送消息。
关掉的消费者,会错过很多消息,并无法再次接收这些消息。

如果发送的消息是重要的用户同步数据,错过了,用户数据就不同步了。

那么,如何让消费者重新启动时,接收到错过的消息呢?

答案是持久订阅。

(3)普通的订阅,不区分消费者,场地里有几个人头,就扔几个馒头。
持久订阅,就要记录消费者的名字了。
A说,我是A,有馒头给我留着,我回来拿。
B说,我是B,有馒头给我留着,我回来拿。
activemq就记下A,B两个名字。

那么,分馒头时,还是一个人头给一个馒头。
分完了,一看张三没说话,说明他不在,给他留一个。
B说话了,那就不用留了。

A回来了,找activemq,一看,这不A吧,快把他的馒头拿来。
可能是一个馒头,也可能是100个馒头,就看A离开这阵子,分了多少次馒头了。

 

activemq区分消费者,是通过clientID和订户名称来区分的。

发布者的clientID可以不指定,不影响消息持久化

订阅者的clientID一定要指定,并且需要指定subscriber的name,才可以实现消息持久化
session.createDurableSubscriber(session.createTopic(“topic”), “c”); //方法 第一个参数是topic的名称,第二个参数为subscriber指定name

最关键的是,一定要在AMQ中创建Durable Topic Subscribers。
Subscribers要在发布消息之前创建,不然只能接收到创建之后的消息。除了在web管理页面手动创建,程序运行时指定subscribers的name时,若不存在也会自动创建。

 

我使用的ActiveMQ版本是5.15.2,官网下载的安装包里有java用的jar包:activemq-all-5.15.2.jar。jdk版本必须是1.8以上才可以使用

 

本篇先放上ActiveMQ单独使用的java代码(Queue+Topic),非整合Spring。(整合Spring的在下一篇)

本篇我们一共创建了四个文件,点对点消息队列的发布者和消费者,一对多订阅的发布者和消费者。

如图:

1.点对点消息列队Queue:

下面是发布点对点消息的代码:

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;


/** 
* @author 朱宏亮
* @version 创建时间:2017年12月1日 下午2:32:58 
* 类说明 
* 未整合spring
* 生产者
*/
public class Publisher {
    
    //默认连接用户名
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    //默认连接密码
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    //默认连接地址
    private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
    //发送的消息数量
    private static final int SENDNUM = 6;
    
    public static void main(String[] args) {
        //连接工厂
        ConnectionFactory connectionFactory;
        //连接
        Connection connection = null;
        //会话 接受或者发送消息的线程
        Session session;
        //消息的目的地
        Destination destination;
        //消息生产者
        MessageProducer messageProducer;
        //实例化连接工厂
        connectionFactory = new ActiveMQConnectionFactory(Publisher.USERNAME, Publisher.PASSWORD, Publisher.BROKEURL);

        try {
            //通过连接工厂获取连接
            connection = connectionFactory.createConnection();
            //启动连接
            connection.start();
            //创建session
            session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            //创建一个名称为朱宏亮队列的点对点消息队列
            destination = session.createQueue("朱宏亮队列");
            //创建消息生产者
            messageProducer = session.createProducer(destination);
            //发送消息
            sendMessage(session, messageProducer);

            session.commit();

        } catch (Exception e) {
            e.printStackTrace();
        }finally{
            if(connection != null){
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }

    }
    
    /**
     * 发送消息
     * @param session
     * @param messageProducer  消息生产者
     * @throws Exception
     */
    public static void sendMessage(Session session,MessageProducer messageProducer) throws Exception{
        for (int i = 0; i < Publisher.SENDNUM; i++) {
            //创建一条文本消息 
            TextMessage message = session.createTextMessage("ActiveMQ 发送点对点消息" +i);
            System.out.println("发送点对点消息:Activemq 发送消息" + i);
            //通过消息生产者发出消息 
            messageProducer.send(message);
        }

    }
    
}

 

运行上面这个代码之后,我们能在eclipse的控制台看到:

INFO | Successfully connected to tcp://localhost:61616
 发送点对点消息:Activemq 发送消息0
 发送点对点消息:Activemq 发送消息1
 发送点对点消息:Activemq 发送消息2
 发送点对点消息:Activemq 发送消息3
 发送点对点消息:Activemq 发送消息4
 发送点对点消息:Activemq 发送消息5

打开ActiveMQ的web控制台,可以看到如图。

在控制台打开第二个选项Queues,可以看到,我创建了一个名为“朱宏亮队列”的点对点消息队列,并且有六条消息。说明上面的代码是正确的。

接下来我们来看一看消费者的代码。

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/** 
* @author 朱宏亮
* @version 创建时间:2017年12月1日 下午2:52:58 
* 类说明 
* 未整合spring
* 点对点消费者
*/
public class Consumer {
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//默认连接用户名
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认连接密码
    private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;//默认连接地址
    
    
    public static void main(String[] args) {
        ConnectionFactory connectionFactory;//连接工厂
        Connection connection = null;//连接

        Session session;//会话 接受或者发送消息的线程
        Destination destination;//消息的目的地

        MessageConsumer messageConsumer;//消息的消费者

        //实例化连接工厂
        connectionFactory = new ActiveMQConnectionFactory(Consumer.USERNAME, Consumer.PASSWORD, Consumer.BROKEURL);

        try {
            //通过连接工厂获取连接
            connection = connectionFactory.createConnection();
            //启动连接
            connection.start();
            //创建session
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //创建一个连接HelloWorld的消息队列
            destination = session.createQueue("朱宏亮队列");
            //创建消息消费者
            messageConsumer = session.createConsumer(destination);

            while (true) {
                //receive的参数是获取消息等待的最大时间
                TextMessage textMessage = (TextMessage) messageConsumer.receive(10000);
                
                if(textMessage != null){
                    System.out.println("点对点队列收到的消息:" + textMessage.getText());
                    
                }else {
                    System.out.println("退出" );
                    //这里需要手动关闭连接,不然即使执行了break之后,连接还会存在,导致程序不结束。
                    session.close();
                    connection.close();
                    break;
                }
            }
            
        } catch (JMSException e) {
            e.printStackTrace();
        }

    }
       
    
}

 

运行这段代码之后,我们能够在控制台看到:

INFO | Successfully connected to tcp://localhost:61616
 点对点队列收到的消息:ActiveMQ 发送点对点消息0
 点对点队列收到的消息:ActiveMQ 发送点对点消息1
 点对点队列收到的消息:ActiveMQ 发送点对点消息2
 点对点队列收到的消息:ActiveMQ 发送点对点消息3
 点对点队列收到的消息:ActiveMQ 发送点对点消息4
 点对点队列收到的消息:ActiveMQ 发送点对点消息5

打开ActiveMQ的web控制台,看看:

在控制台能够看到我们刚刚发布的六条点对点消息已经被消费了。说明代码没有问题。

2.一对多订阅消息Topic(发布/订阅)

下面是生产者的代码:

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/** 
* @author 朱宏亮
* @version 创建时间:2017年12月5日 下午12:21:04 
* 类说明 
* 未整合spring
* 发布者订阅模式
*/
public class PublisherTopic {

    private static final Logger log =  LoggerFactory.getLogger(PublisherTopic.class);

      //连接账号
      private final static String userName = "admin";
      //连接密码
      private final static String password = "admin";
      //连接地址
      private final static String brokerURL = "tcp://localhost:61616";
      //connection的工厂
      private ConnectionFactory factory;
      //连接对象
      private Connection connection;
      //一个操作会话
      private Session session;
      //目的地,其实就是连接到哪个队列,如果是点对点,那么它的实现是Queue,如果是订阅模式,那它的实现是Topic
      private Destination destination;
      //生产者,就是产生数据的对象
      private MessageProducer producer;
      
      public static void main(String[] args) {
          
          PublisherTopic ser = new PublisherTopic();
      
          ser.send();
      }
      
      private void send(){
          try {
              
              //根据用户名,密码,url创建一个连接工厂
              factory = new ActiveMQConnectionFactory(userName, password, brokerURL);
              
              //从工厂中获取一个连接
              connection = factory.createConnection();
              //connection.setClientID("Client-zhl"); // 为Connection指定ClientID,可以不设置
              connection.start();
              
              //创建一个session
              //第一个参数:是否支持事务,如果为true,则会忽略第二个参数,被jms服务器设置为SESSION_TRANSACTED
              //第二个参数为false时,paramB的值可为Session.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE其中一个。
              //Session.AUTO_ACKNOWLEDGE为自动确认,客户端发送和接收消息不需要做额外的工作。哪怕是接收端发生异常,也会被当作正常发送成功。
              //Session.CLIENT_ACKNOWLEDGE为客户端确认。客户端接收到消息后,必须调用javax.jms.Message的acknowledge方法。jms服务器才会当作发送成功,并删除消息。
              //DUPS_OK_ACKNOWLEDGE允许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收;而且允许重复确认。
              session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
              
              //创建一个到达的目的地
              destination = session.createTopic("topic朱宏亮"); //---------与点对点不同之处
              
              //从session中,获取一个消息生产者
              producer = session.createProducer(destination);
              
              //设置生产者的模式,有两种可选
              //DeliveryMode.PERSISTENT 当activemq关闭的时候,队列数据将会被保存,持久化
              //DeliveryMode.NON_PERSISTENT 当activemq关闭的时候,队列里面的数据将会被清空
              producer.setDeliveryMode(DeliveryMode.PERSISTENT);//
              
              //创建一条消息,当然,消息的类型有很多,如文字,字节,对象等,可以通过session.create..方法来创建出来
              TextMessage textMsg = session.createTextMessage("hello ActiveMQ");
            
              for(int i = 0 ; i < 10 ; i ++){
                  //发送一条消息
                  producer.send(textMsg);
                  log.info("发送消息成功:{},序号:{}",textMsg.getText(),i);
              }
              
              producer.close();
              session.close();
              connection.close();
          } catch (JMSException e) {
              log.error("method send,error msg:{}",e.getMessage());
          }
      }
}

运行这段代码之后看到eclipse的控制台:有十条topic的消息已经发送出去。

INFO | 发送消息成功:hello ActiveMQ,序号:0
 INFO | 发送消息成功:hello ActiveMQ,序号:1
 INFO | 发送消息成功:hello ActiveMQ,序号:2
 INFO | 发送消息成功:hello ActiveMQ,序号:3
 INFO | 发送消息成功:hello ActiveMQ,序号:4
 INFO | 发送消息成功:hello ActiveMQ,序号:5
 INFO | 发送消息成功:hello ActiveMQ,序号:6
 INFO | 发送消息成功:hello ActiveMQ,序号:7
 INFO | 发送消息成功:hello ActiveMQ,序号:8
 INFO | 发送消息成功:hello ActiveMQ,序号:9

看一下web控制台:

这里注意一下,topic的name不要以中文开头,否则会出现意想不到的意外,你在使用订阅者读取消息的时候可能接受不到信息,我这里使用中文开头的时候时好时坏,我没有做更具体的研究,实际开发的时候应该用英文吧。

从图上可以看到name为“topic朱宏亮”的topic产生了十条消息。单注意一点,在图中,10左边的那个数据是0,这是订阅者的数量,所以,这十条消息,不会被任何订阅者获取,即便我的订阅者没有任何bug。

所以正确的顺序应该为,先在ActiveMQ服务器上创建一个Topic,然后再添加订阅者(Subscriber),然后再向这个Topic添加消息数据,所有的消息只会被已存在的订阅者接受,即便这个订阅者不在线也可以在上线后接收到(但是要保证添加的消息为持久化的)。

Topic的创建可以在web控制台创建,或者再代码运行时会自动创建一个Topic。

Subscriber订阅者的创建也可以在控制台,或者第一次运行代码时自动创建。

下面看一下订阅者的代码:

 

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** 
* @author 朱宏亮
* @version 创建时间:2017年12月5日 下午12:30:08 
* 类说明 
* 未整合spring
* 消费者订阅模式
*/
public class ConsumerTopic {

    private static final Logger log = LoggerFactory.getLogger(ConsumerTopic.class);

      //连接账号
      private static final  String userName = "admin";
      //连接密码
      private static final String password = "admin";
      //连接地址
      private String brokerURL = "tcp://localhost:61616";
      //connection的工厂
      private ConnectionFactory factory;
      //连接对象
      private Connection connection;
      //一个操作会话
      private Session session;
      //目的地,其实就是连接到哪个队列,如果是点对点,那么它的实现是Queue,如果是订阅模式,那它的实现是Topic
      private Destination destination;
      //消费者,就是接收数据的对象
      private MessageConsumer consumer;
      private MessageConsumer consumer2;
      public static void main(String[] args) {
          ConsumerTopic cl = new ConsumerTopic();
          cl.getMsg();
      }
      
      public void getMsg(){
          try {
              //根据用户名,密码,url创建一个连接工厂
              factory = new ActiveMQConnectionFactory(userName, password, brokerURL);
              //从工厂中获取一个连接
              connection = factory.createConnection();
              connection.setClientID("ClientID连接1"); // 为Connection指定ClientID
              connection.start();
              //创建一个session
              //第一个参数:是否支持事务,如果为true,则会忽略第二个参数,被jms服务器设置为SESSION_TRANSACTED
              //第二个参数为false时,paramB的值可为Session.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE其中一个。
              //Session.AUTO_ACKNOWLEDGE为自动确认,客户端发送和接收消息不需要做额外的工作。哪怕是接收端发生异常,也会被当作正常发送成功。
              //Session.CLIENT_ACKNOWLEDGE为客户端确认。客户端接收到消息后,必须调用javax.jms.Message的acknowledge方法。jms服务器才会当作发送成功,并删除消息。
              //DUPS_OK_ACKNOWLEDGE允许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收;而且允许重复确认。
              session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
             
              /**
               * 使用这2行代码创建的consumer不能接收到持久化消息
               * 
               * 创建一个到达的目的地
               * destination = session.createTopic("topic");// ---------与点对点不同之处
               * 根据session,创建一个接收者对象
               * consumer = session.createConsumer(destination);
               */
              
              /**
               * 要实现接受持久化消息必须为subscriber指定name
               */
              //第一个消费者
              consumer = session.createDurableSubscriber(session.createTopic("topic朱宏亮"), "消费者一号"); // 为subscriber指定name
              //实现一个消息的监听器
              //实现这个监听器后,以后只要有消息,就会通过这个监听器接收到
              consumer.setMessageListener(new MessageListener() {
                  @Override
                  public void onMessage(Message message) {
                      try {
                          //获取到接收的数据
                          String text = ((TextMessage)message).getText();
                          log.info("消费者一号消息接收成功:{}",text);
                      } catch (JMSException e) {
                          log.error("消费者一号消息接收失败:{}",e.getMessage());
                      }
                  }
              });
              
              //第二个消费者
              consumer2 = session.createDurableSubscriber(session.createTopic("topic朱宏亮"), "消费者二号"); 
              consumer2.setMessageListener(new MessageListener() {
                  @Override
                  public void onMessage(Message message) {
                      try {
                          //获取到接收的数据
                          String text = ((TextMessage)message).getText();
                          log.info("消费者二号消息接收成功:{}",text);
                      } catch (JMSException e) {
                          log.error("消费者二号消息接收失败:{}",e.getMessage());
                      }
                  }
              });
              

          //消费方不要关闭,让它一直开着。
          } catch (JMSException e) {
              log.error("method getMsg,error msg:{}",e.getMessage());
          }
      }
}
 

运行之后,eclipse的控制台里并没有任何反应,原因我在上面已经说了,因为订阅者出现的太晚,之前的消息已经被抛弃。看看web控制台的界面:

第一条数据2,表示我们现在有了两个订阅者者(上面运行发布者时为0),第二条数据是10,是上面发布者发布的10条消息,第三条数据是0代表没有任何消息被订阅者消费。

下面我们再次运行发布者的代码,向Topic再次发布10条消息。(这时我们的订阅者程序并没有关闭,还在运行中)

eclipse控制台输出:

 

INFO | 发送消息成功:hello ActiveMQ,序号:0
 INFO | 发送消息成功:hello ActiveMQ,序号:1
 INFO | 发送消息成功:hello ActiveMQ,序号:2
 INFO | 发送消息成功:hello ActiveMQ,序号:3
 INFO | 发送消息成功:hello ActiveMQ,序号:4
 INFO | 发送消息成功:hello ActiveMQ,序号:5
 INFO | 发送消息成功:hello ActiveMQ,序号:6
 INFO | 发送消息成功:hello ActiveMQ,序号:7
 INFO | 发送消息成功:hello ActiveMQ,序号:8
 INFO | 发送消息成功:hello ActiveMQ,序号:9

 

INFO | 消费者一号消息接收成功:hello ActiveMQ
 INFO | 消费者二号消息接收成功:hello ActiveMQ
 INFO | 消费者一号消息接收成功:hello ActiveMQ
 INFO | 消费者二号消息接收成功:hello ActiveMQ
 INFO | 消费者一号消息接收成功:hello ActiveMQ
 INFO | 消费者二号消息接收成功:hello ActiveMQ
 INFO | 消费者一号消息接收成功:hello ActiveMQ
 INFO | 消费者二号消息接收成功:hello ActiveMQ
 INFO | 消费者一号消息接收成功:hello ActiveMQ
 INFO | 消费者二号消息接收成功:hello ActiveMQ
 INFO | 消费者一号消息接收成功:hello ActiveMQ
 INFO | 消费者二号消息接收成功:hello ActiveMQ
 INFO | 消费者一号消息接收成功:hello ActiveMQ
 INFO | 消费者二号消息接收成功:hello ActiveMQ
 INFO | 消费者一号消息接收成功:hello ActiveMQ
 INFO | 消费者二号消息接收成功:hello ActiveMQ
 INFO | 消费者一号消息接收成功:hello ActiveMQ
 INFO | 消费者二号消息接收成功:hello ActiveMQ
 INFO | 消费者一号消息接收成功:hello ActiveMQ
 INFO | 消费者二号消息接收成功:hello ActiveMQ

程序运行没有bug,很完美。再看看web控制台:

订阅者依然是2,消息增加了10条变为了20,第三个数据由0变为了20,这二十条消息和第一次发布的十条消息没有任何关系。他是我们第二次发布的十条消息分别被两个订阅者获取了一次,所以是二十,如果我们有3个订阅者,那么这里就应该是30,以此类推。

在web控制台的第四个选项Subscribers中我们能够看到订阅者的详情:

本篇结束,下一篇开始ActiveMQ整合Spring。

源代码下载地址:https://gitee.com/zhuhongliang/ActiveMQStudy

 

发表评论