初识ActiveMQ

2018-10-22 by Victor Lv Filed under 技术博客

ActiveMQ是一个遵循JMS规范的消息服务系统,开源,纯Java实现,但可以跨语言使用。 ActiveMQ(JMS)的主要有以下几个使用场景:

  1. 系统整合:同构/异构系统整合,分布式环境中
  2. 降低模块间耦合:比如AB两个系统,A性能瓶颈或宕机、系统差不干扰B
  3. 实现异步:1.推消息 2.削峰请求
  4. 数据同步:web应用->缓存,搜索,db

JMS 支持两种消息模型:

Point-to-Point(P2P)点对点模型

一种是 Point-to-Point(P2P)点对点模型,一个生产者对应一个消费者,生产者将消息发送到Queue-队列中,消息消费者从队列中接收消息。P2P 模式好比两人之间的信件传输,生产者A君将信件(Message)寄给信箱,指明它唯一的收件人B君,在B君上来取走之前,信件将一直存在于信箱当中,当作为消费者的B君出现并且取走了信件,信箱里也就没了这个信件了。

Publish/Subscribe 发布/订阅模型

另一种是 Publish/Subscribe 发布/订阅模型,此时,消息的双方不再是一对一的唯一对应关系了,而是一对多(一个消息发送者对应多个消息接收者)的关系。在这种模式下,消息中介叫做Topic,消息的发布者需向Topic发送消息,而消息的订阅者则需要在相应的 Topic 上进行注册,以便接收该 Topic 的消息。与 P2P 消息模型不同,Pub/Sub 模式下,消息发布者的消息将会被自动发送给所有订阅了该 Topic 的消息订阅者。普通订阅(默认)模式下,当消息接收者在某段时间断开了与消息服务器的连接时,在这个时间段内传递过来的消息将会丢失(相当于认为接收者已经取消了订阅); JMS 提供了持久订阅模式,当 Topic 被声明为持久订阅,并且消息接收者也注册为持久订阅者,那么系统仍会为该持久订阅者保留断连这段时间所产生的消息,直至该接收者重新上线并“消费”消息。Pub/Sub 模式,好比所有会员,包括付费会员[持久订阅者]和普通会员[非持久订阅者],大家都在一个微信群(Topic),某天群主(Publisher)在群里发了个红包(Message),群里所有会员(Subcriber)每人一个,对于付费会员(持久订阅者),即时它暂时不在线,系统也会为它保留红包,等它过后重新上线还能认领该红包,而非持久订阅者(普通会员)如果没即时领取,则将会失去该红包。另外,也可以像目前微信红包做的那样,设置一个红包(消息)过期时间,如果超过了设定的时间,付费会员(持久订阅者)仍未领取(消费),则系统会将该红包(消息)回收(丢弃)。

JMS 针对上述两种模式都可以设置消息的过期时间。

Demo

针对 ActiveMQ,本文下载了其 Windows 版搭建了一个单机版的用 Java 实现小 demo,搭建步骤和代码如下:

安装和部署 ActiveMQ Server

首先从 Apache ActiveMQ 官网下载 ActiveMQ Windows 版安装包,5.15.6 Release 版本下载地址:http://activemq.apache.org/activemq-5156-release.html。 下载完成后无需安装直接运行 ActiveMQ Server: 进入...apache-activemq-5.15.6/bin目录,在命令行中执行 start 命令:

./activemq start

执行后没报错的话会输入类似下面的信息: 启动 可以使用浏览器登录 ActiveMQ Server 后台确认 Server 是否启动成功,默认登录地址:http://localhost:8161/admin/

登录成功可见如下页面: 后台页面

好了,ActiveMQ Server 已经启动了,下面我们用 Java 去实现一个客户端调用,包括消息生产者和消费者。

P2P 模式版

上面说到 JMS 有两种消息模型,首先来看更简单些的 P2P 模型的 demo。

引入 jar 包依赖

首先在 Java 工程中引入刚才 ActiveMQ 下载目录下的 Java 依赖包 activemq-all-5.15.6.jar,Maven 工程的话在 pom.xml 中加入下述语句:

    <!-- jar version configuration -->
    <properties>
        <ActiveMQ.version>5.11.4</ActiveMQ.version>
    </properties>

    <dependencies>

        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>${ActiveMQ.version}</version>
        </dependency>
    </dependencies>

创建 Producer

然后创建一个消息生产者:

package ActiveMQ;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQObjectMessage;
import org.apache.activemq.command.ActiveMQTextMessage;

import javax.jms.*;
import java.io.IOException;

/**
 * @ClassName: QueueProducer
 * @Description: TODO
 * @Author: Victor Lv (http://langlv.me)
 * @Date: 2018/10/19 9:46
 * @Version: 1.0
 */

public class QueueProducer {

    public static void createQueueProducer() throws JMSException, IOException {
        //1. 建立连接工厂,这里我的 ActiveMQ server 起在默认url
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,
                ActiveMQConnection.DEFAULT_BROKER_URL);

        MessageProducer producer = null;
        Connection connection = null;
        Session session = null;
        boolean transacted = Boolean.FALSE; //消息是否使用事务
        int acknowledgeMode = Session.AUTO_ACKNOWLEDGE; //消息确认模式,AUTO_ACKNOWLEDGE--系统自动发送签收通知
        String queueName = "Queue1"; //队列名
        try {
            //2. 从连接工厂获取一个连接
            connection = connectionFactory.createConnection();
            connection.start(); //进行连接

            //3. 从本次连接中建立一个 session 会话
            session = connection.createSession(transacted, acknowledgeMode);

            //4. 创建/选择 一个消息队列
            Destination destination = session.createQueue(queueName);

            //5. 创建生产者
            producer = session.createProducer(destination);

            //6. 传送消息

            //传送 String文本消息
            TextMessage message = session.createTextMessage("Hello World!");
            producer.send(message);

        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            //7. 关闭连接,回收资源
            producer.close();
            session.close();
            connection.close();
        }

    }

    public static void main(String[] args) throws JMSException, IOException {
        createQueueProducer();
    }
}

消息生产者的创建要经过几个步骤,前三个步骤跟 Mybatis/Hibernate 框架的处理差不多: 1. 创建 ConnectionFactory :也是类似 Mybatis/Hibernate 的连接工厂,实际项目中一般只会在项目初始化的时候创建一个连接工厂,后续每创建一个 Producer 只需要从工厂中直接获取连接即可; 2. 通过 ConnectionFactory 获取一个 Connection,并与 ActiveMQ Server 进行连接; 3. 通过 Connection 获取一个 Session会话; 4. 创建/选取一个消息队列: 根据 QueueName 在 ActiveMQ Server 中创建一个消息队列,如果该名称的队列在 Server 中已经存在,则系统会直接沿用并选中该队列,与该消息队列绑定之后,就可以生成一个 Session会话了; 5. 创建消息 Producer:目的地(Destination)和履带(Connecttion)都有了,就可以把生产机器(Producer)拿上来了。

因为我刚才启用 ActiveMQ Server 都是用的默认配置,所以在连接工厂处使用默认的 url、 username 和 userPassword 即可。 上面发的是一个 String 文本消息对象,也可以发送一个更复杂的消息对象到队列中:

//传送 Object 对象消息
WorkMessage workMessage = new WorkMessage(1, "Do something");
/**
 * 这里也可以采用 JMS 原生的 ObjectMessage;
 * 使用 ActiveMQ 的 ObjectMessage 有诸如压缩消息等更多功能,但带入了侵入式编程
 */

ActiveMQObjectMessage objectMessage =
        (ActiveMQObjectMessage) session.createObjectMessage(workMessage);
objectMessage.compress(); //压缩下message
producer.send(objectMessage);

上面代码中的 WorkMessage 构造如下:

package ActiveMQ;

import java.io.Serializable;

/**
 * @ClassName: WorkMessage
 * @Description: TODO
 * @Author: Victor Lv (http://langlv.me)
 * @Date: 2018/10/19 10:32
 * @Version: 1.0
 */

public class WorkMessage implements Serializable{

    int id;
    String command;

    public WorkMessage(int id, String command){
        this.id = id;
        this.command = command;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getCommand() {
        return command;
    }

    public void setCommand(String command) {
        this.command = command;
    }

    public String toString() {
        return "ID: " + id +", Command: " + command;
    }
}

好了,运行上面的 Producer 程序,即可告诉 Server 创建一个名为Queue1的消息队列,并发送第一个消息Hello World!

可以去 ActiveMQ Server 的后台查看下是否已添加了该 Queue: Queue

可以发现成功创建了一个 Name 是 Queue1 的队列,并且已经有一个消息入队了(Messages Enqueued == 1),目前该消息还没被消费(Messages Dequeued == 0)。

创建 Consumer

消费者的创建跟生产者的差不多,代码如下:

package ActiveMQ;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @ClassName: QueueConsumer
 * @Description: TODO
 * @Author: Victor Lv (http://langlv.me)
 * @Date: 2018/10/19 9:41
 * @Version: 1.0
 */

public class QueueConsumer {

    //消费者--P2P模式
    public static void QueueConsumer() throws JMSException {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,
                ActiveMQConnection.DEFAULT_BROKER_URL);

        MessageProducer producer = null;
        Connection connection = null;
        Session session = null;
        MessageConsumer consumer = null;
        boolean transacted = Boolean.FALSE;
        int acknowledgeMode = Session.AUTO_ACKNOWLEDGE;
        String queueName = "Queue1"; //队列名
        String clientID = "QueueConsumer1"; //消费者ID
        try {
            connection = connectionFactory.createConnection();
            connection.setClientID(clientID);
            connection.start();
            session = connection.createSession(transacted, acknowledgeMode);
            Destination destination = session.createQueue(queueName);

            //创建队列消费者
            consumer = session.createConsumer(destination);

            //启用消息监听
            consumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    //接收String文本
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println(textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });

        } catch (JMSException e) {
        }
        //不close连接,持续监听
    }

    public static void main(String[] args) throws JMSException {
        QueueConsumer();
    }
}

运行上述程序,将会创建一个 ID 为QueueConsumer1 的消费者,并启用消息监听,进程将会持续监听来自Queue1队列的消息,因为刚才我们已经在队列里塞了一个Hello World!的消息,所以在标准输出流会输出Hello World!字眼。 然后刷新下 Server 后台,可以发现Queue1队列多了一个消费者(Number Of Consumers == 1),并且消息出列数变为了1(Messages Dequeued == 1)也就是有一个消息被消费了。 Consumer

上面代码中的启用监听部分,我们可以把它独立出来单独实现一个 Listener 来处理消息:

//创建队列消费者
consumer = session.createConsumer(destination);
//启用自定义的 QueueListener 监听者
consumer.setMessageListener(new QueueListener());

自定义的 QueueListener如下:

package ActiveMQ;

import org.apache.activemq.command.ActiveMQObjectMessage;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;

/**
 * @ClassName: QueueListener
 * @Description: TODO
 * @Author: Victor Lv (http://langlv.me)
 * @Date: 2018/10/19 11:19
 * @Version: 1.0
 */

public class QueueListener implements MessageListener {

    public void onMessage(Message message) {
//            //接收String文本
//            TextMessage textMessage = (TextMessage) message;

        //接收对象
        ActiveMQObjectMessage response = (ActiveMQObjectMessage) message;
        ActiveMQObjectMessage activeMQObjectMessage = (ActiveMQObjectMessage) response.copy(); //copy一个复本出来使用

        try {
//                System.out.println(textMessage.getText());
            WorkMessage workMessage = (WorkMessage) activeMQObjectMessage.getObject();
            System.out.println(workMessage.toString());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

Publish / Subscribe 模式版

Pub / Sub 模式下消息生产者和消费者的创建跟 P2P 模式的大体相似,不同于 P2P 模式的创建一个 Queue Destination ,Pub / Sub 模式创建的是 Topic

Topic 消息发布者

package ActiveMQ;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @ClassName: TopicPublisher
 * @Description: TODO
 * @Author: Victor Lv (http://langlv.me)
 * @Date: 2018/10/18 14:34
 * @Version: 1.0
 */

public class TopicPublisher {

    public static void createTopicProducer() throws JMSException {

        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,
                ActiveMQConnection.DEFAULT_BROKER_URL);

        MessageProducer producer = null;
        Connection connection = null;
        Session session = null;
        String topicName = "MyTopic";
        boolean transacted = Boolean.FALSE; //消息是否使用事务
        int acknowledgeMode = Session.AUTO_ACKNOWLEDGE; //消息确认模式,AUTO_ACKNOWLEDGE--系统自动发送签收通知
        //*设置是否持久订阅: PERSISTENT--持久订阅模式; NON_PERSISTENT--普通订阅模式
        int deliveryMode = DeliveryMode.PERSISTENT;
        try {
            connection = connectionFactory.createConnection();
            connection.start();

            session = connection.createSession(transacted, acknowledgeMode);

            //根据 topicName 创建/选择一个 Topic
            Topic topic = session.createTopic(topicName);

            //生产者
            producer = session.createProducer(topic);

            producer.setDeliveryMode(deliveryMode);

            TextMessage message = session.createTextMessage("This is message.");
            producer.send(message);

        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            producer.close();
            session.close();
            connection.close();
        }

    }

    public static void main(String[] args) throws JMSException {
        createTopicProducer();
    }
}

上面的代码创建了一个名为Topic1的Topic。 在创建 Topic 时,可以指定消息的分发模式--DeliveryMode:PERSISTENT--持久订阅模式; NON_PERSISTENT--普通订阅模式,具体两种模式的作用如篇首所述。

Non Persistent Subscriber

普通订阅 / 非持久订阅的 Subscriber 创建没啥特殊的,实现如下:

package ActiveMQ;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @ClassName: NonPersistentSubscriber
 * @Description: TODO
 * @Author: Victor Lv (http://langlv.me)
 * @Date: 2018/10/19 9:36
 * @Version: 1.0
 */

public class NonPersistentSubscriber {

    //普通订阅消费者
    public static void nonPersistent() throws JMSException {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,
                ActiveMQConnection.DEFAULT_BROKER_URL);

        MessageProducer producer = null;
        Connection connection = null;
        Session session = null;
        MessageConsumer consumer = null;
        String clientID = "nonPersistent_Consumer";
        boolean transacted = Boolean.FALSE;
        int acknowledgeMode = Session.AUTO_ACKNOWLEDGE;
        String topicName = "MyTopic";
        try {
            connection = connectionFactory.createConnection();
            connection.setClientID(clientID);
            connection.start();
            session = connection.createSession(transacted, acknowledgeMode);
            Topic topic = session.createTopic(topicName);

            //创建普通订阅的消费者
            consumer = session.createConsumer(topic);

            consumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println(textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
        } catch (JMSException e) {
        }
        //不close连接,持续监听
    }

    public static void main(String[] args) throws JMSException {
        nonPersistent();
    }
}

Persistent Subscriber

持久订阅的 Subscriber 创建有些不同,先看代码:

package ActiveMQ;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @ClassName: PersistentSubscriber
 * @Description: TODO
 * @Author: Victor Lv (http://langlv.me)
 * @Date: 2018/10/18 11:23
 * @Version: 1.0
 */

public class PersistentSubscriber {

    //持久订阅消费者1
    public static void persistentTest1() throws JMSException {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,
                ActiveMQConnection.DEFAULT_BROKER_URL);

        MessageProducer producer = null;
        Connection connection = null;
        Session session = null;
        MessageConsumer consumer = null;
        String clientID = "persistent_Consumer";
        String topicName = "MyTopic";
        String subscribeName = "Consumer";
        boolean transacted = Boolean.FALSE;
        int acknowledgeMode = Session.AUTO_ACKNOWLEDGE;
        try {
            connection = connectionFactory.createConnection();
            connection.setClientID(clientID); //持久订阅必须设置ClientID
            connection.start();
            session = connection.createSession(transacted, acknowledgeMode);
            Topic topic = session.createTopic(topicName);

            //创建持久订阅模式的消费者
            consumer = session.createDurableSubscriber(topic, subscribeName); //订阅名(类似笔名),与ClientID组合唯一确定一个消费者

            consumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println(textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
        } catch (JMSException e) {
        }
        //不close连接,持续监听
    }

    public static void main(String[] args) throws JMSException {
        persistentTest1();
    }
}

持久订阅的 Subscriber 创建需要使用 session.createDurableSubscriber()来创建(创建时需指定 Subscription Name),并且需要给它指定一个 ClientID

把上述NonPersistentSubscriber.javaPersistentSubscriber.java分别运行下,在 Server 后台能看到已经注册了这两个订阅者: Subscribers

运行下TopicPublisher.java发布一条消息,能看到正在监听的NonPersistentSubscriberPersistentSubscriber 都会打印消息:This is message.

然后把上面两个Subscriber都关闭,再次查看 Server 后台: Subscribers2 可以发现普通订阅者NonPersistentSubscriber已经不见了,只有一个ActiveNetwork状态都为falsePersistentSubscriber,这是因为非持久订阅者必须保持在线,一旦它跟 Server 失去了连接,那系统就会认为它取消了订阅;而持久化订阅者即使暂时不在线,也会作为已订阅者继续存在于系统当中。

Subscriber都离线的状态下,再次运行下TopicPublisher.java发布一条消息(把消息内容改为"This is message2."),运行完毕后分别启动NonPersistentSubscriberPersistentSubscriber。这时会发现持久订阅者PersistentSubscriber能够打印消息"This is message2.",说明系统为它保留了消息,而作为非持久订阅的NonPersistentSubscriber则没有消息打印。印证了上面关于持久订阅普通订阅的功能差异。


本文作者为 Victor Lv ,原出处为Victor Lv's Blog(http://langlv.me),转载请保留此句。


标签 Message Queue
Fork me on GitHub