您的位置 首页 java

Java,ActiveMQ,发布订阅模式,自动应答模式案例

JMS(Java Messaging Service)

JSM是Java平台上有关面向消息中间件的技术规范,它便于消息系统中的 Java 应用程序进行消息交换,并且通过提供标准的生产、发送、接收消息的接口简化企业应用的开发。

JMS本身只定义了一系列的接口规范,是一种与厂商无关的API,用来访问消息收发系统。

消息中间件一般有两种传递模式:发布-订阅模式(Pub/Sub)和点对点模式(P2P)

Publish/Subscribe(Pub/Sub)发布/订阅模式(Topic主题模型)

包含三个角色,主题(Topic),发布者(Publisher),订阅者(Subscriber),多个发布者将消息发送到topic,系统将这些消息投递到订阅此topic的订阅者;发布者发送到topic的消息,只有订阅了topic的订阅者才会收到消息。topic实现了发布和订阅,当发布一个消息,所有的订阅这个topic的服务都能得到这个消息,所以从1到N个订阅者都能得到这个消息的拷贝;

1、每个消息可以有多个消费者;

2、发布者和订阅者之间有时间上的依赖性(先订阅主题,再来发送消息);

3、订阅者必须保持运行的状态,才能接受发布者发布的消息;

P2P(Point to Point)点对点模型(Queue队列模型):

即生成者和消费者的消息往来;每个消息都被发送到特定的消息队列,接收者从队列中获取消息,队列保证这消息,直到被消费或超时;

1、每个消息只有一个消费者(Consumer),一旦被消费,消息就不在消息队列中了;

2、发送者和接收者之间在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有正在运行,它不会影响到消息被发送到队列;

3、接收者在成功接收消息之后需向队列应答成功。

JMS协议

JMS协议组成结构:

1、JMS Provider,消息中间件/消息服务器;

2、JMS Producer,消息生产者;

3、JMS Consumer,消息消费者;

4、JMS Message,消息; 三部分组成:消息头、消息体、消息属性;

消息头(Header)属性:

1、JMSDestination,消息发送的目的地,是一个Topic或Queue;设置者:send;

2、JMSDeliveryMode,消息的发送模式,分为NON_PERSISTENT和PERSISTENT,即持久化的和非持久化的;设置者:send;

3、JMSMessageID,消息ID,需要以ID:开头;设置者:send;

4、JMSTimestamp,消息发送时的时间,也可以理解为调用send()方法时的时间,而不是该消息发送完成的时间;设置者:send;

5、JMSCorrelationID,关联的消息ID,这个通常用在需要回传消息的时候;设置者:send;

6、JMSReplyTo,消息回复的目的地,其值为一个Topic或Queue, 这个由发送者设置,但是接收者可以决定是否响应;设置者:send;

7、JMSRedelivered,消息是否重复发送过,如果该消息之前发送过,那么这个属性的值需要被设置为true, 客户端可以根据这个属性的值来确认这个消息是否重复发送过,以避免重复处理;设置者:Provider;

8、JMSType,由消息发送者设置的个消息类型,代表消息的结构,有的消息中间件可能会用到这个,但这个并不是是批消息的种类,比如TextMessage之类的;设置者:client;

9、JMSExpiration,消息的过期时间,以毫秒为单位,根据定义,它应该是timeToLive的值再加上发送时的GMT时间,也就是说这个指的是过期时间,而不是有效期;设置者:client;

10、JMSPriority,消息的优先级,0-4为普通的优化级,而5-9为高优先级,通常情况下,高优化级的消息需要优先发送;设置者:client;

Message消息体(Properties):

1、Text message (文本消息);一个字符串使用;

2、MapMessage(键值对消息);一套名称/值使用;

3、ObjectMessage(对象消息);序列化Java对象;

4、BytesMessage(bytes消息);一个字节的数据流;

5、StreamMessage(流消息);Java原始值的数据流;

Message消息属性:

1、JMSXUserID:发送消息的用户识别,发送时提供商设置;

2、JMSXappID:发送消息的应用标识,发送时提供商设置;

3、JMSXdeliveryCount:转发消息重试次数,从1开始,发送方提供商设置;

4、JMSXGroupID:消息所在消息组的用户标识,由客户端设置;

5、JMSXGroupSeq:组内消息的序号,从1开始.由客户端设置;

6、JMSXProducerTEID:产生消息的事务的事务表示,发送方提供商设置;

7、JMSConsumerTXID:消费消息的事务的事务表示,接收方提供设置;

8、JMSXRevTimestamp:JMS转发消息到消费者的事件,接收方提供设置;

9、JMState:假设有个消息仓库,它存储每个消息的单独拷贝,从原始消息被发送时开始,状态有1(等待)、2(准备)、3(到期)、4(保留),由于状态和生产者和消费者无关,所以它不是由他们提供,它只和仓库查找消息相关,因此JMS没有提供这中API,由提供商设置;

自动应答模式

Session session = connection.createSession( false , Session.AUTO_ACKNOWLEDGE);

第1个参数:是否支持事务,如果为true,则会忽略第2个参数(消息确认模式),自动被JMS服务器设置为SESSION_TRANSACTED,第2个参数:消息确认模式。

案例代码:

生产者:

 import org. apache .activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class ActiveMQProducerTopic {

    //activemq的服务地址  这里使用的是 tcp 协议,源码里可以看到
    public static final String ACTIVE_URL = "failover://(tcp://192.168.1.16:61616,tcp://192.168.1.17:61616,tcp://192.168.1.17:61616)";
    public static final String TOPIC_NAME = "topic01";

    public static void main(String[] args) throws JMSException {
        //创建连接工厂 ,按照定的url地址给定默认的用户名和密码
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
        //通过连接工厂获取connection连接,并启动访问
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        //创建会话session  需要两个参数,第一个事务,第二个签收
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //创建目的地(选择是队列还是主题)
        Topic topic = session.createTopic(TOPIC_NAME);
        //创建消息的生产者
        MessageProducer messageProducer = session.createProducer(topic);
        //通过使用消息生产者messageProducer生产3条消息发送到队列中
        for (int i = 1; i <= 3; i++) {
            //创建消息   一个字符串消息
            TextMessage textMessage = session.createTextMessage("topic---->" + i);
            //通过messageProducer 发布消息
            messageProducer.send(textMessage);
        }
        //关闭资源
        messageProducer. close ();
        session.close();
        connection.close();
        System.out.println("topic消息发送到MQ成功");
    }
}  

消费者:

 import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;
import java.io.IOException;


public class ActiveMQConsumerTopic {

    public static final String ACTIVE_URL = "failover://(tcp://192.168.1.16:61616,tcp://192.168.1.17:61616,tcp://192.168.1.17:61616)";
    public static final String TOPIC_NAME = "topic01";

    public static void main(String[] args) throws JMSException, IOException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
        //通过连接工厂获取connection连接 并启动访问
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        //创建会话session,需要两个参数,第一个事务,第二个签收
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //创建目的地(选择是队列还是主题)
        Topic topic = session.createTopic(TOPIC_NAME);
        //创建消息的消费者
        MessageConsumer messageConsumer = session.createConsumer(topic);
        //通过监听的机制消费消息
        messageConsumer.setMessageListener((message) -> {
            if (message != null && message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("消费者接受到消息topic---->" + textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        // 不关闭控制台,如果不加这句话,在下面可能在连接的时候直接关闭了,造成无法消费的问题
        System.in.read();
        messageConsumer.close();
        session.close();
        connection.close();
    }
}  

接收消息,必须在线才能接收到消息。

签收方式:Session.AUTO_ACKNOWLEDGE(自动应答),当客户端从receiver()或onMessage()成功返回时,Session自动签收客户端的这条消息的收条。

签收,客户端成功接收一条消息的标志是这条消息被签收,成功接收一条消息一般包括3个阶段:1、客户端接收消息;2、客户端处理消息;3、消息被签收。

控制台查看:

文章来源:智云一二三科技

文章标题:Java,ActiveMQ,发布订阅模式,自动应答模式案例

文章地址:https://www.zhihuclub.com/187143.shtml

关于作者: 智云科技

热门文章

网站地图