您的位置 首页 java

java微服务实战项目股票项目搭建,RocketMQ安装和使用

RocketMQ安装

RocketMQ用于内部服务之间的消息交互, 相比接口调用方式,能够提升数据吞吐量、处理性能以及数据的可靠性, 还能实现异步、应用 解耦 、流量削峰等功能。

1. RocketMQ简介

RocketMQ是 阿里巴巴 在 2012 年开源的第三代分布式消息中间件,历年双11,RocketMQ 都承载着阿里巴巴生产系统100%的消息流转,以2017年双11为例, RocketMQ 完成了1.2万亿消息精准低延迟投递,交易峰值高达17万笔/秒, 目前有 100 多家公司和科研机构正在使用RocketMQ。

2 . RocketMQ物理架构

RocketMQ集群中包含4个模块:Namesrv, Broker , Producer, Consumer。

  • Namesrv : 存储当前集群所有Brokers信息、Topic跟Broker的对应关系。
  • Broker : 集群最核心模块,主要负责Topic消息存储、消费者的消费位点管理(消费进度)。
  • Producer : 消息生产者,每个生产者都有一个ID(编号),多个生产者实例可以共用同一个ID。同一个ID下所有实例组成一个生产者集群。
  • Consumer : 消息消费者,每个订阅者也有一个ID(编号),多个消费者实例可以共用同一个ID。同一个ID下所有实例组成一个消费者集群。

3 RocketMQ 术语解析

在开发使用之前, 需要先了解RocketMQ 涉及的术语概念。

  1. Producer
  • Producer 消息生产者,生产者的作用就是将消息发送到 MQ (Message Queue)
  • 消息生产者,负责产生消息,一般由业务系统负责产生消息。

2.Producer Group

  • 生产者组,简单来说就是多个发送同一类消息的生产者称之为一个生产者组。

3.Consumer

  • 消息消费者,负责消费消息,消费 MQ 上的消息的应用程序就是消费者,至于消息是否进行逻辑处理,还是直接存储到数据库等取决于业务需要。

4.Consumer Group

  • 消费者组,和生产者类似,消费同一类消息的多个 consumer 实例组成一个消费者组。
  • 通过 Group 机制,让 RocketMQ 天然的支持消息负载均衡。比如某个Topic(主题)有9条消息,其中一个Consumer Group有3个实例(3个进程 OR 3台机器),那么每个实例将均摊3条消息。

5.Topic

  • Topic(主题) 是一种消息的逻辑分类,比如有订单类的消息,也有库存类的消息,那么就需要进行分类,一个是订单 Topic 存放订单相关的消息,一个是库存 Topic 存储库存相关的消息。

6.Message

  • Message 是消息的载体,一个 Message 必须指定 topic,相当于寄信的地址。
  • Message 还有一个可选的 tag 设置,以便消费端可以基于 tag 进行过滤消息。也可以添加额外的键值对,例如你需要一个业务 key 来查找 broker(代理人/中间人) 上的消息,方便在开发过程中诊断问题。

7.Tag

  • Tag (标签)可以被认为是对 Topic 进一步细化。一般在相同业务模块中通过引入标签来标记不同用途的消息。

8.Broker

  • Broker(代理人/中间人) 是 RocketMQ 系统的主要角色,其实就是前面一直说的 MQ。
  • Broker 接收来自生产者的消息,储存以及为消费者拉取消息的请求做好准备。
  • Broker 作为消息中转角色,负责存储消息,转发消息,一般也称为 Server

3. 下载

Rocket MQ 4.4安装包

Rocket Console 控制台

  1. 解压

unzip -qo rocketmq-all-4.4.0-bin-release.zip

2. 修改配置

我们在虚拟机中运行,默认会占用4G内存,比较浪费资源, 作为学习和测试, 可以修改减少其内存占用:vi bin/runserver.sh

  JAVA _OPT="${JAVA_OPT} -server -Xms256m -Xmx1024m -Xmn1024m -XX:MetaspaceSize=64m -XX:MaxMetaspaceSize=160m"  

vi bin/runbroker.sh

 JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx1024m -Xmn1024m"  

vi bin/tools.sh

 JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx512m -Xmn256m -XX:PermSize=128m -XX:MaxPermSize=128m"  

3. 服务配置

vi /usr/local/rocketmq_4.4/conf/broker.conf, 示例:

 namesrvAddr=10.10.250.251:9876
brokerIP1=10.10.250.251
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
aclEnable=false  

4.启动服务

1.启动 Name Server

 nohup /usr/local/rocketmq_4.4/bin/mqnamesrv >/dev/null 2>&1 &   

日志显示,代表成功

 The Name Server boot success. serializeType=JSON  

2. 启动 Broker

上面启动的Name Server默认端口为9876, 这里需要指定

 nohup /usr/local/rocketmq_4.4/bin/mqbroker -n  127.0.0.1 :9876 > /dev/null 2>&1 &   

提示代表启动成功

 The broker[localhost.localdomain, 10.10.250.251:10911] boot success. serializeType=JSON and name server is 127.0.0.1:9876  

自定义脚本

启动脚本

 vi startup.sh
## 脚本内容
#!/bin/sh
echo "=============startup name server =================="
nohup /usr/local/rocketmq_4.4/bin/mqnamesrv >/dev/null 2>&1 &
 echo  "==============startup broker============"
nohup /usr/local/rocketmq_4.4/bin/mqbroker -n 127.0.0.1:9876 > /dev/null 2>&1 &  

停止脚本:

 vi  shutdown .sh
## 脚本内容
#!/bin/sh
echo "=============shutdown broker =================="
/usr/local/rocketmq_4.4/bin/mqshutdown broker
echo "==============shutdown name server============"
/usr/local/rocketmq_4.4/bin/mqshutdown namesrv  

启动时指定配置文件

 sh bin/mqbroker -n 127.0.0.1:9876 autoCreateTopicEnable=true -c /usr/local/rocketmq_4.4/conf/broker.conf &  

4. 安装Rocket Console控制台

1.解压

 unzip rocketmq-console-1.0.0.zip  

2. maven 打包

如果没有maven环境, 通过 yum 安装

 yum -y install maven  

3. 进入安装目录, 执行打包命令:

 mvn clean package -Dmaven.test.skip=true  

4.打包成功后, target目录下会生成rocketmq-console-ng-1.0.0.jar文件

5. 新建application.properties配置文件与jar包放到同一目录, 配置信息:

 server.port=9800
rocketmq.config.namesrvAddr=127.0.0.1:9876  

6.启动控制台

 nohup java -jar rocketmq-console-ng-1.0.0.jar > /dev/null 2>&1 &  

7.访问地址, #/ 测试使用#/

java微服务实战项目股票项目搭建,RocketMQ安装和使用

注意事项

mq安全组端口配置

java微服务实战项目股票项目搭建,RocketMQ安装和使用

项目模块划分

新建stock-quote行情服务工程
新建stock-p roxy代理服务

  • 行情代理服务与客户端对接, 建立长连接传输数据, 行情代理服务无状态, 负责行情数据的分发, 支持多个节点水平扩展,提高行情分发效率。
  • 行情源服务订阅/拉取第三方行情源服务提供的数据, 由于正式行情需要收取巨额费用, 这里我们是爬取的第三方数据,行情源服务核心是提供行情加工处理的服务, 可以支撑多个不同第三方行情源的接入, 比如不同的交易所( 上交所 、深交所、港交所等),不同的产品行情报价( 外汇 、贵金属、 期货 股票 等), 即便第三方行情中断, 也能够基于已获取的行情数据继续提供服务。
  • 从设计上, 行情源服务是核心, 以集群方式运作, 为保证处理性能, 加入缓存, 行情数据是需要落地至数据库, 保障数据的稳定性, 以及其他服务对行情数据的要求; 行情代理服务通过RocketMQ异步队列与行情源服务建立连接, RocketMQ是强一致性队列, 有较高的数据安全与可靠性, 能够保障行情数据的快速稳定分发。

结束

文章来源:智云一二三科技

文章标题:java微服务实战项目股票项目搭建,RocketMQ安装和使用

文章地址:https://www.zhihuclub.com/193172.shtml

关于作者: 智云科技

热门文章

网站地图