您的位置 首页 java

JavaDemo案例演示RocketMQ DLedger宕机故障下的高可用

今天演示RocketMQ 宕机故障下的高可用。

使用demo案例之前的文章已经上传至gitee,自己去clone一份。地址:

RocketMQ的环境是双主双从模式,具体请看文章:

我们先演示双主双从环境下到底能否实现高可用。

我们先检查下环境:

双主双从集群模式

双主双从的配置这里就不展示了,请移步查看文章:

从图中得知,双主双从模式是正常的,我们打开RocketMQ-SpringBoot项目。首先增加namesrv地址,因为之前的测试都是单台单主模式下进行的测试。

配置两台机器的namesrvAddr

我们修改完配置后,模拟创建生产者进行消息发送,如下所示「代码片段」:

 /**
 * 集群测试(双主双从)
 * 增加两个namesrv地址
 * 需要修改配置文件:rocketmq.name-server=10.211.55.11:9876;10.211.55.12:9876
 *
 * @return  java .lang.Object
 * @throws
 * @Date 2021/4/27 8:35 下午
 */
@ Request Mapping("/ CLUSTER ")
public Object cluster() {
    SendResult sendResult = rocketMQTemplate.syncSend(MqConfig.Topic.TopicCluster, "集群消息");
    log.info("《集群生产者》发送结果:{}", sendResult);
    return sendResult;
}  

再创建一个消费者进行消费。

 package com.sunjs.rocketmq.consumer.cluster;

import com.sunjs.rocketmq.common.MqConfig;
import com.sunjs.rocketmq.consumer._BaseConsumer;
import lombok.extern.slf4j.Slf4j;
import org. apache .rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring. annotation .RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
 
/**
 * 测试消费端接收String字符串
 *
 * @Date 2021/4/25 2:48 下午
 */
@Slf4j
@Service
@RocketMQMessageListener(
        topic = MqConfig.Topic.TopicCluster, //topic
        consumerGroup = MqConfig.GROUP_PREFFIX + MqConfig.Topic.TopicCluster //分组规则,Group-"topic命名"
)
public class ClusterConsumer  extends  _BaseConsumer<String> implements RocketMQListener<MessageExt> {
 
    @Override
    public  void  onMessage(MessageExt message) {
        log.info("《集群消费者》接收数据:{}", message);
        if (!isVerify(message, String.class)) {
            return;
        }
        // 执行MQ 消息公共处理
        todo();
    }
 
    /**
     * 具体业务处理
     *
     * @Author: sun
     * @Date: 2021/4/25 3:29 下午
     */
    @Override
    protected void handle() {
        // TOOD 业务处理
    }
}  

都是很普通的生产者和消费者。主要是打印生产和消费的详细日志,我们来分析 broker 切换。

现在我们启动SpringBoot项目,请求URL进行测试,此时我的控制台打印情况如下:

 2021-04-28 20:45:21.060  INFO 79324 --- [nio-8080-exec-1] c.s.r.p.controller.ProducerController    : 《集群生产者》发送结果:SendResult [sendStatus=SEND_OK, msgId=7F00000135DC18B4AAC28F7FDAE20000, offsetMsgId=0AD3370B00002B67000000000016FDE8, messageQueue=MessageQueue [topic=TopicCluster, brokerName=broker-a, queueId=3], queueOffset=0]
2021-04-28 20:45:56.013  INFO 79324 --- [MessageThread_1] c.s.r.consumer.cluster.ClusterConsumer   : 《集群消费者》接收数据:MessageExt [brokerName=broker-a, queueId=3, storeSize=306, queueOffset=0, sysFlag=0, bornTimestamp=1619613921038, bornHost=/10.211.55.2:51921, storeTimestamp=1619613921055, storeHost=/10.211.55.11:11111, msgId=0AD3370B00002B67000000000016FDE8, commitLogOffset=1506792, bodyCRC=1376009287, reconsumeTimes=0, prepared transaction Offset=0,  toString ()=Message{topic='TopicCluster', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, CONSUME_START_TIME=1619613956013, id=75b500d8-e90e-818e-011f-ed9b1129c215, UNIQ_KEY=7F00000135DC18B4AAC28F7FDAE20000, CLUSTER=DefaultCluster, WAIT=false, contentType=text/plain; charset =UTF-8, timestamp=1619613920953}, body=[82, 111, 99, 107, 101, 116, 77, 81, -27, -113, -116, -28, -72, -69, -27, -113, -116, -28, -69, -114, -26, -75, -117, -24, -81, -107], transactionId='null'}]  

rocketmq-console控制台查看topic主题信息

我们搭建的是双主双从,有两主,也就是说现在这里却只有一台11机器上的broker-a,这是为什么呢?

其实我们可以分析一下:

因为路由信息在broker上生成,当发送者发送消息的个数小于某个阀值时,RocketMQ根据轮询规则,消息会落在同一个broker上,该broker上生成路由规则上报给NameServer后,再同步给发送者,这样在发送者的内存中就只有一个broker的队列信息了,NameServer也就只有一个broker的路由信息。这里可能大部分人不太明白,没关系,后期我们会详细介绍。

言归正传:

我们通过控制台看到消息打到了11机器上,现在我们将11机器停机,或者停止掉broker。

 # 该命令将会停止11机器上的所有broker
> sh bin/mqshutdown broker  

停止后,此时我们看到控制台输出一堆的日志信息,如下所示:

 2021-04-28 20:47:33.822  INFO 79324 --- [lientSelector_1] RocketmqRemoting                         : closeChannel: close the connection to remote address[] result: true
2021-04-28 20:47:33.929  INFO 79324 --- [lientSelector_1] RocketmqRemoting                         : closeChannel: close the connection to remote address[] result: true
2021-04-28 20:47:34.168  INFO 79324 --- [lientSelector_1] RocketmqRemoting                         : closeChannel: close the connection to remote address[] result: true
2021-04-28 20:47:34.275  INFO 79324 --- [lientSelector_1] RocketmqRemoting                         : closeChannel: close the connection to remote address[] result: true  

这时候我们再生产一条消息看看会发生什么现象

 2021-04-28 20:49:44.259 ERROR 79324 --- [nio-8080-exec-7] o.a.r.spring.core.RocketMQTemplate       : syncSend failed. destination:TopicCluster, message:GenericMessage [payload=RocketMQ双主双从测试, headers={id=cf63a0b2-6a5d-d7f4-5224-742ca1ea5707, timestamp=1619614184256}] 
2021-04-28 20:49:44.261 ERROR 79324 --- [nio-8080-exec-7] o.a.c.c.C.[.[.[/].[dispatcherServlet]    : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw  Exception  [Request processing failed; nested  exception  is org.springframework.messaging.MessagingException: No route info of this topic: TopicCluster
See  for further details.; nested exception is org.apache.rocketmq.client.exception.MQClientException: No route info of this topic: TopicCluster
See  for further details.] with  root  cause
 
org.apache.rocketmq.client.exception.MQClientException: No route info of this topic: TopicCluster
See  for further details.
    at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:685) ~[rocketmq-client-4.8.0.jar:4.8.0]
    at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1343) ~[rocketmq-client-4.8.0.jar:4.8.0]
    at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:344) ~[rocketmq-client-4.8.0.jar:4.8.0]
    at org.apache.rocketmq.spring.core.RocketMQTemplate.syncSend(RocketMQTemplate.java:555) ~[rocketmq-spring-boot-2.2.0.jar:2.2.0]
    at org.apache.rocketmq.spring.core.RocketMQTemplate.syncSend(RocketMQTemplate.java:484) ~[rocketmq-spring-boot-2.2.0.jar:2.2.0]  

控制台抛出了一大堆的异常,找不到主体TopicCluster。

总结:RocketMQ本身并不支持高可用,不能自动控制节点切换,一旦出了问题,需要人为介入。那么怎么人为介入呢,就是为12机器上的 broker 手动创建主题Topic。

我们可以使用rocketmq-console来测试添加一下,界面上给broker-b新增TopicCluster主题(mqadmin指令也可以操作)。

手动添加topic

创建完成后,我们再生产一条信息进行测试一下,结果如下所示,消息正常生产和消息了:

 2021-04-28 20:56:02.738  INFO 79324 --- [nio-8080-exec-3] c.s.r.p.controller.ProducerController    : 《集群生产者》发送结果:SendResult [sendStatus=SEND_OK, msgId=7F00000135DC18B4AAC28F89A5AE0176, offsetMsgId=0AD3370C000056CE0000000000035237, messageQueue=MessageQueue [topic=TopicCluster, brokerName=broker-b, queueId=6], queueOffset=0]
2021-04-28 20:56:02.741  INFO 79324 --- [essageThread_13] c.s.r.consumer.cluster.ClusterConsumer   : 《集群消费者》接收数据:MessageExt [brokerName=broker-b, queueId=6, storeSize=306, queueOffset=0, sysFlag=0, bornTimestamp=1619614562734, bornHost=/10.211.55.2:52513, storeTimestamp=1619614562733, storeHost=/10.211.55.12:22222, msgId=0AD3370C000056CE0000000000035237, commitLogOffset=217655, bodyCRC=1376009287, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicCluster', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, CONSUME_START_TIME=1619614562741, id=3773aa8a-3ec7-7790-d241-a3e42f6e6ef7, UNIQ_KEY=7F00000135DC18B4AAC28F89A5AE0176, CLUSTER=DefaultCluster, WAIT=false, contentType=text/plain;charset=UTF-8, timestamp=1619614562734}, body=[82, 111, 99, 107, 101, 116, 77, 81, -27, -113, -116, -28, -72, -69, -27, -113, -116, -28, -69, -114, -26, -75, -117, -24, -81, -107], transactionId='null'}]  

既然RocketMQ本身不支持高可用,那使用什么来支持真正意义上的高可用呢,比如:master服务宕机,slave服务转换成master。

我们可以使用DLedger,DLedger 这里就不做介绍了,后期会单独介绍这个。今天我们主要是演示RocketMQ容灾高可用。

那么DLedger如何配置呢,如果RocketMQ安装版本是按照我之前的文章来操作的,4.8版本是支持的,并且下载的是编译好的binary zip包。那么官方也给出了配置案例

JavaDemo案例演示RocketMQ DLedger宕机故障下的高可用

我们进入rocketmq conf目录下观察一下。/usr/local/rocketmq-4.8.0/conf

JavaDemo案例演示RocketMQ DLedger宕机故障下的高可用

JavaDemo案例演示RocketMQ DLedger宕机故障下的高可用

官方附带的dledger配置案例,有三个配置文件,分别为:broker-n0、broker-n1、broker-n2。

可以打开这三个文件挨个看一下,对比一下,这里就不详解了,直接上手测试吧。

现在我们无需修改任何东西,现在要做的就是停止掉11机器和12机器上的所有broker和namesrv服务。

11机器:

 # 先启动11机器上的namesrv服务
> nohup sh bin/mqnamesrv &
# 启动dledger服务
> nohup sh bin/dledger/fast-try.sh start &  

都启动好后,我们打开rocketmq-console控制台看一下集群

dledger高可用

我们可以看到启动后,有一主两从。生产环境搭建也建议使用3台机器。 现在我们看下那三个配置文件,看看哪个配置文件监听的端口是30931这个端口,配置项:listenPort。

我这里演示的环境经过查询broker-n2.conf这个文件监听的端口是30931,那我现在kill掉这个服务。

 # 查看broker-n2的线程,然后挨个kill掉
> ps -ef|grep broker-n2
> kill -9 21736 21755 21767  

停止后,然后我们再看一下rocketma-console控制台

slave 升级至 master

我们发现30911从slave自动转换到了master,从而实现了真正意义上的高可用。

现在我们使用java demo 案例测试一下。使用 sh bin/mqshutdown broker 先停止掉所有的broker,再重启一次,让三台broker都启动起来。

dledger 高可用

我们先修改java demo的配置文件,将namesrvAddr配置项去掉12这台机器,因为我们现在使用的是一台机器启动了三个broker。

发送消息后控制台输出如下:

 2021-04-28 21:33:53.720  INFO 80521 --- [nio-8080-exec-1] c.s.r.p.controller.ProducerController    : 《集群生产者》发送结果:SendResult [sendStatus=SEND_OK, msgId=7F0000013A8918B4AAC28FAC4B690000, offsetMsgId=0AD3370B000078BF0000000000000030, messageQueue=MessageQueue [topic=TopicCluster, brokerName=RaftNode00, queueId=3], queueOffset=0]
2021-04-28 21:34:30.231  INFO 80521 --- [MessageThread_1] c.s.r.consumer.cluster.ClusterConsumer   : 《集群消费者》接收数据:MessageExt [brokerName=RaftNode00, queueId=3, storeSize=303, queueOffset=0, sysFlag=0, bornTimestamp=1619616833399, bornHost=/10.211.55.2:50006, storeTimestamp=1619616833471, storeHost=/10.211.55.11:30911, msgId=0AD3370B000078BF0000000000000030, commitLogOffset=48, bodyCRC=1376009287, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicCluster', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, CONSUME_START_TIME=1619616870231, id=266f6a06-5363-e267-885b-404e4851ff8c, UNIQ_KEY=7F0000013A8918B4AAC28FAC4B690000, CLUSTER=RaftCluster, WAIT=false, contentType=text/plain;charset=UTF-8, timestamp=1619616833251}, body=[82, 111, 99, 107, 101, 116, 77, 81, -27, -113, -116, -28, -72, -69, -27, -113, -116, -28, -69, -114, -26, -75, -117, -24, -81, -107], transactionId='null'}]  

消息发往了30911这台master broker。现在我们使用上边的方式只停止掉这个master服务,找到线程 kill 掉。

slave 升级 master

然后30921从slave转换成了master,我们再发送消息试试看。

 2021-04-28 21:37:23.031  INFO 80521 --- [nio-8080-exec-4] c.s.r.p.controller.ProducerController    : 《集群生产者》发送结果:SendResult [sendStatus=SEND_OK, msgId=7F0000013A8918B4AAC28FAF7E42000C, offsetMsgId=0AD3370B000078C90000000000000AC8, messageQueue=MessageQueue [topic=TopicCluster, brokerName=RaftNode00, queueId=0], queueOffset=0]
2021-04-28 21:37:23.032  INFO 80521 --- [MessageThread_3] c.s.r.consumer.cluster.ClusterConsumer   : 《集群消费者》接收数据:MessageExt [brokerName=RaftNode00, queueId=0, storeSize=303, queueOffset=0, sysFlag=0, bornTimestamp=1619617043010, bornHost=/10.211.55.2:50453, storeTimestamp=1619617043014, storeHost=/10.211.55.11:30921, msgId=0AD3370B000078C90000000000000AC8, commitLogOffset=2760, bodyCRC=1376009287, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicCluster', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, CONSUME_START_TIME=1619617043032, id=0bf4f7e5-b8ad-030e-6561-93aeb53dbd3a, UNIQ_KEY=7F0000013A8918B4AAC28FAF7E42000C, CLUSTER=RaftCluster, WAIT=false, contentType=text/plain;charset=UTF-8, timestamp=1619617043010}, body=[82, 111, 99, 107, 101, 116, 77, 81, -27, -113, -116, -28, -72, -69, -27, -113, -116, -28, -69, -114, -26, -75, -117, -24, -81, -107], transactionId='null'}]  

消息又全部打到了30921这个broker上。

我们经过一系列的测试,使用Dledger达到了高可用架构。

我这里附带一份三台机器的测试配置方式吧。

环境:三台虚拟机,ip为:110.211.55.11、10.211.55.12、10.211.55.13

在三台机器的conf目录下新增配置文件,我这里就粘贴一份,根据机器不同进行修改吧。

三台机器上的 listenPort 配置项 依次为:10001、10002、10003

三台机器上的 dLegerSelfId 配置项 依次为:n0、n1、n2

 # dledger_broker.conf
 
#集群名称
brokerClusterName = RaftCluster
 
#broker集群名称
brokerName=RaftNode00
 
#监听端口
listenPort=10001
 
#namesrv地址列表
namesrvAddr=10.211.55.11:9876;10.211.55.12:9876;10.211.55.13:9876
 
#主题不存在时是否自动创建主题
autoCreateTopicEnable=true
 
#订阅组不存在时是否自动创建订阅组
autoCreateSubscriptionGroup=true
 
#数据存储根路径
storePathRootDir=/usr/local/rocketmq-4.8.0/store
 
#commitlog数据存储根路径
#storePathCommitLog=/tmp/rmqstore/node02/commitlog
 
#是否启用DLeger集群模式
enableDLegerCommitLog=true
 
#与brokerName保持一致就好
dLegerGroup=RaftNode00
 
#dLeger集群下的节点配置
dLegerPeers=n0-10.211.55.11:11111;n1-10.211.55.12:22222;n2-10.211.55.13:33333
 
## must be unique
#当前节点在dLeger集群下的标识
dLegerSelfId=n0
 
#服务端处理消息发送线程池数量
sendMessageThreadPoolNums=16  

然后分别启动服务

 # 挨个启动namesrv服务
nohup sh bin/mqnamesrv &
 
# 挨个启动broker服务
nohup sh bin/mqbroker -c conf/dledger_broker.conf &  

文章来源:智云一二三科技

文章标题:JavaDemo案例演示RocketMQ DLedger宕机故障下的高可用

文章地址:https://www.zhihuclub.com/186697.shtml

关于作者: 智云科技

热门文章

网站地图