RocketMQ安装
RocketMQ用于内部服务之间的消息交互, 相比接口调用方式,能够提升数据吞吐量、处理性能以及数据的可靠性, 还能实现异步、应用 解耦 、流量削峰等功能。
1. RocketMQ简介
RocketMQ是 阿里巴巴 在 2012 年开源的第三代分布式消息中间件,历年双11,RocketMQ 都承载着阿里巴巴生产系统100%的消息流转,以2017年双11为例, RocketMQ 完成了1.2万亿消息精准低延迟投递,交易峰值高达17万笔/秒, 目前有 100 多家公司和科研机构正在使用RocketMQ。
2 . RocketMQ物理架构
RocketMQ集群中包含4个模块:Namesrv, Broker , Producer, Consumer。
- Namesrv : 存储当前集群所有Brokers信息、Topic跟Broker的对应关系。
- Broker : 集群最核心模块,主要负责Topic消息存储、消费者的消费位点管理(消费进度)。
- Producer : 消息生产者,每个生产者都有一个ID(编号),多个生产者实例可以共用同一个ID。同一个ID下所有实例组成一个生产者集群。
- Consumer : 消息消费者,每个订阅者也有一个ID(编号),多个消费者实例可以共用同一个ID。同一个ID下所有实例组成一个消费者集群。
3 RocketMQ 术语解析
在开发使用之前, 需要先了解RocketMQ 涉及的术语概念。
- Producer
- Producer 消息生产者,生产者的作用就是将消息发送到 MQ (Message Queue)
- 消息生产者,负责产生消息,一般由业务系统负责产生消息。
2.Producer Group
- 生产者组,简单来说就是多个发送同一类消息的生产者称之为一个生产者组。
3.Consumer
- 消息消费者,负责消费消息,消费 MQ 上的消息的应用程序就是消费者,至于消息是否进行逻辑处理,还是直接存储到数据库等取决于业务需要。
4.Consumer Group
- 消费者组,和生产者类似,消费同一类消息的多个 consumer 实例组成一个消费者组。
- 通过 Group 机制,让 RocketMQ 天然的支持消息负载均衡。比如某个Topic(主题)有9条消息,其中一个Consumer Group有3个实例(3个进程 OR 3台机器),那么每个实例将均摊3条消息。
5.Topic
- Topic(主题) 是一种消息的逻辑分类,比如有订单类的消息,也有库存类的消息,那么就需要进行分类,一个是订单 Topic 存放订单相关的消息,一个是库存 Topic 存储库存相关的消息。
6.Message
- Message 是消息的载体,一个 Message 必须指定 topic,相当于寄信的地址。
- Message 还有一个可选的 tag 设置,以便消费端可以基于 tag 进行过滤消息。也可以添加额外的键值对,例如你需要一个业务 key 来查找 broker(代理人/中间人) 上的消息,方便在开发过程中诊断问题。
7.Tag
- Tag (标签)可以被认为是对 Topic 进一步细化。一般在相同业务模块中通过引入标签来标记不同用途的消息。
8.Broker
- Broker(代理人/中间人) 是 RocketMQ 系统的主要角色,其实就是前面一直说的 MQ。
- Broker 接收来自生产者的消息,储存以及为消费者拉取消息的请求做好准备。
- Broker 作为消息中转角色,负责存储消息,转发消息,一般也称为 Server
3. 下载
Rocket MQ 4.4安装包
Rocket Console 控制台
- 解压
unzip -qo rocketmq-all-4.4.0-bin-release.zip
2. 修改配置
我们在虚拟机中运行,默认会占用4G内存,比较浪费资源, 作为学习和测试, 可以修改减少其内存占用:vi bin/runserver.sh
JAVA _OPT="${JAVA_OPT} -server -Xms256m -Xmx1024m -Xmn1024m -XX:MetaspaceSize=64m -XX:MaxMetaspaceSize=160m"
vi bin/runbroker.sh
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx1024m -Xmn1024m"
vi bin/tools.sh
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx512m -Xmn256m -XX:PermSize=128m -XX:MaxPermSize=128m"
3. 服务配置
vi /usr/local/rocketmq_4.4/conf/broker.conf, 示例:
namesrvAddr=10.10.250.251:9876
brokerIP1=10.10.250.251
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
aclEnable=false
4.启动服务
1.启动 Name Server
nohup /usr/local/rocketmq_4.4/bin/mqnamesrv >/dev/null 2>&1 &
日志显示,代表成功
The Name Server boot success. serializeType=JSON
2. 启动 Broker
上面启动的Name Server默认端口为9876, 这里需要指定
nohup /usr/local/rocketmq_4.4/bin/mqbroker -n 127.0.0.1 :9876 > /dev/null 2>&1 &
提示代表启动成功
The broker[localhost.localdomain, 10.10.250.251:10911] boot success. serializeType=JSON and name server is 127.0.0.1:9876
自定义脚本
启动脚本
vi startup.sh
## 脚本内容
#!/bin/sh
echo "=============startup name server =================="
nohup /usr/local/rocketmq_4.4/bin/mqnamesrv >/dev/null 2>&1 &
echo "==============startup broker============"
nohup /usr/local/rocketmq_4.4/bin/mqbroker -n 127.0.0.1:9876 > /dev/null 2>&1 &
停止脚本:
vi shutdown .sh
## 脚本内容
#!/bin/sh
echo "=============shutdown broker =================="
/usr/local/rocketmq_4.4/bin/mqshutdown broker
echo "==============shutdown name server============"
/usr/local/rocketmq_4.4/bin/mqshutdown namesrv
启动时指定配置文件
sh bin/mqbroker -n 127.0.0.1:9876 autoCreateTopicEnable=true -c /usr/local/rocketmq_4.4/conf/broker.conf &
4. 安装Rocket Console控制台
1.解压
unzip rocketmq-console-1.0.0.zip
2. maven 打包
如果没有maven环境, 通过 yum 安装
yum -y install maven
3. 进入安装目录, 执行打包命令:
mvn clean package -Dmaven.test.skip=true
4.打包成功后, target目录下会生成rocketmq-console-ng-1.0.0.jar文件
5. 新建application.properties配置文件与jar包放到同一目录, 配置信息:
server.port=9800
rocketmq.config.namesrvAddr=127.0.0.1:9876
6.启动控制台
nohup java -jar rocketmq-console-ng-1.0.0.jar > /dev/null 2>&1 &
7.访问地址, #/ 测试使用#/
注意事项
mq安全组端口配置
项目模块划分
新建stock-quote行情服务工程
新建stock-p roxy代理服务
- 行情代理服务与客户端对接, 建立长连接传输数据, 行情代理服务无状态, 负责行情数据的分发, 支持多个节点水平扩展,提高行情分发效率。
- 行情源服务订阅/拉取第三方行情源服务提供的数据, 由于正式行情需要收取巨额费用, 这里我们是爬取的第三方数据,行情源服务核心是提供行情加工处理的服务, 可以支撑多个不同第三方行情源的接入, 比如不同的交易所( 上交所 、深交所、港交所等),不同的产品行情报价( 外汇 、贵金属、 期货 、 股票 等), 即便第三方行情中断, 也能够基于已获取的行情数据继续提供服务。
- 从设计上, 行情源服务是核心, 以集群方式运作, 为保证处理性能, 加入缓存, 行情数据是需要落地至数据库, 保障数据的稳定性, 以及其他服务对行情数据的要求; 行情代理服务通过RocketMQ异步队列与行情源服务建立连接, RocketMQ是强一致性队列, 有较高的数据安全与可靠性, 能够保障行情数据的快速稳定分发。