1 、简介
同类产品: RabbitMQ 、 Kafka、Redis(List)
1.1 对比RabbitMQ
最接近的同类型产品,经常拿来比较,性能伯仲之间,基本上可以互相替代。最主要区别是二者的协议不同RabbitMQ的协议是 AMQP (Advanced Message Queueing Protoco),而 ActiveMQ 使用的是 JMS ( java Messaging Service )协议。顾名思义JMS是针对 JAVA 体系的传输协议,队列两端必须有 JVM ,所以如果开发环境都是java的话推荐使用 activeMQ ,可以用Java的一些对象进行传递比如Map、BLob、Stream等。而AMQP通用行较强,非java环境经常使用,传输内容就是标准字符串。
另外一点就是 RabbitMQ用Erlang开发 ,安装前要装Erlang环境,比较麻烦。ActiveMQ解压即可用不用任何安装。
1.2 对比 Kafka
Kafka性能超过ActiveMQ等传统MQ工具,集群扩展性好。
弊端是:
在传输过程中可能会出现消息重复的情况,
不保证发送顺序
一些传统 MQ 的功能没有,比如消息的事务功能。
所以通常用Kafka处理大数据日志。
1.3 对比Redis
其实Redis本身利用List可以实现消息队列的功能,但是功能很少,而且队列体积较大时性能会急剧下降。对于数据量不大、业务简单的场景可以使用。
2 安装 ActiveMQ
拷贝apache-activemq-5.14.4-bin.tar.gz到Linux服务器的 /opt 下
解压缩 tar -zxvf apache-activemq-5.14.4-bin.tar.gz
重命名 mv apache-activemq-5.14.4 activemq
vim /opt/activemq/bin/activemq
查看java环境:vim /etc/profile 或者 echo $JAVA_HOME
增加两行
JAVA_HOME=”/opt/jdk1.8.0_152″ JAVA_CMD=”/opt/jdk1.8.0_152/bin” |
注册服务
ln -s /opt/activemq/bin/activemq /etc/init.d/activemq(软连接必须使用绝对路径) chkconfig –add activemq # 禁止使用 # cp /opt/ activemq /bin/activemq /etc/init.d/activemq |
启动服务
service activemq start
关闭服务
service activemq stop
通过 net stat 查看端口
# netstat -tlnp
t:表示 tcp
l:表示监听
n: 将ip和端口转换成域名和服务名
p:显示的程序名
activemq两个重要的端口,一个是提供消息队列的默认端口:61616
另一个是控制台端口8161
通过控制台测试
启动消费端
进入网页控制台
账号/密码默认: admin/admin
点击Queues
观察客户端
3 在Java中使用消息队列
3.1 在gmall-service-util中导入依赖坐标
< dependency >
< dependency >
|
3.2 在payment项目中添加producer端
public class ProducerTest { MessageProducer producer = session.createProducer(queue); |
注意:如果有事务需要先提交事务session.commit();
Number Of Pending Messages 等待消费的消息 这个是当前未出队列的数量。可以理解为总接收数-总出队列数
Number Of Consumers 消费者 这个是消费者端的消费者数量
Messages Enqueued 进入队列的消息 进入队列的总数量,包括出队列的。 这个数量只增不减
Messages Dequeued 出了队列的消息 可以理解为是消费者消费掉的数量
总结:
当有一个消息进入这个队列时,等待消费的消息是1,进入队列的消息是1。
当消息消费后,等待消费的消息是0,进入队列的消息是1,出队列的消息是1.
在来一条消息时,等待消费的消息是1,进入队列的消息就是2.
3.3 在payment项目中添加consumer端
public class Consumer Test {
|
3.4 关于事务控制
producer提交时的事务 | 事务开启 true | 只执行send并不会提交到队列中,只有当执行session.commit()时,消息才被真正的提交到队列中。 |
事务不开启 false | 只要执行 send ,就进入到队列中。 | |
consumer 接收时的事务 | 事务开启,签收必须写 Session. SESSION_TRANSACTED
| 收到消息后,消息并没有真正的被消费。消息只是被锁住。一旦出现该 线程 死掉、抛异常,或者程序执行了session.rollback()那么消息会释放,重新回到队列中被别的消费端再次消费。 |
事务不开启,签收方式选择 Session.AUTO_ACKNOWLEDGE | 只要调用comsumer.receive方法 ,自动确认。 | |
事务不开启,签收方式选择 Session.CLIENT_ACKNOWLEDGE | 需要客户端执行 message.acknowledge(),否则视为未提交状态,线程结束后,其他线程还可以接收到。 这种方式跟事务模式很像,区别是不能手动回滚,而且可以单独确认某个消息。 手动签收 | |
事务不开启,签收方式选择 Session.DUPS_OK_ACKNOWLEDGE | 在Topic模式下做批量签收时用的,可以提高性能。但是某些情况消息可能会被重复提交,使用这种模式的consumer要可以处理重复提交的问题。
|
3.5 持久化与非持久化
通过 producer.setDeliveryMode(DeliveryMode. PERSISTENT ) 进行设置
持久化的好处就是当activemq宕机的话,消息队列中的消息不会丢失。非持久化会丢失。但是会消耗一定的性能。
持久化:当服务器宕机,消息依然存在。
非持久化:当服务器宕机,消息不存在。
在 zookeeper 中,有 持久化 -非持久化。