您的位置 首页 java

面试官:熟悉RocketMQ,那聊聊发送普通消息几种方式?3张图拿下

RocketMQ 发送普通消息有三种实现方式: 可靠同步发送 可靠异步发送 单向(Oneway)发送

注意 顺序消息只支持可靠同步发送

一、概念

1、可靠同步发送

原理 :同步发送是指消息发送方发出数据后,会在收到接收方发回响应之后才发下一个数据包的通讯方式。

面试官:熟悉RocketMQ,那聊聊发送普通消息几种方式?3张图拿下

应用场景: 此种方式应用场景非常广泛,例如重要通知邮件、报名短信通知、营销短信系统等。

2、可靠异步发送

原理 :异步发送是指发送方发出数据后,不等接收方发回响应,接着发送下个数据包的通讯方式。 消息队列 RocketMQ 的异步发送,需要用户实现异步发送回调接口(SendCallback)。

面试官:熟悉RocketMQ,那聊聊发送普通消息几种方式?3张图拿下

应用场景: 异步发送一般用于链路耗时较长,对 RT 响应时间较为敏感的业务场景,例如批量发货等操作。

3、单向(Oneway)发送

原理 :单向(Oneway)发送特点为发送方只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答。 此方式发送消息的过程耗时非常短,一般在微秒级别。

面试官:熟悉RocketMQ,那聊聊发送普通消息几种方式?3张图拿下

应用场景: 适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。

4、三种对比

下表概括了三者的特点和主要区别。

发送方式

发送 TPS

发送结果反馈

可靠性

同步发送

不丢失

异步发送

不丢失

单向发送

最快

可能丢失


二、代码示例

1、三种方式代码示例

 @Slf4j
@ Rest Controller
public class Controller {
    /**
     * 生产者组
     */    private static String PRODUCE_RGROUP = "test_producer";
    /**
     * 创建生产者对象
     */    private static DefaultMQProducer producer = null;

    static {
        producer = new DefaultMQProducer(PRODUCE_RGROUP);
        //不开启vip通道 开通口端口会减2
        producer.setVipChannelEnabled(false);
        //绑定name server
        producer.setNamesrvAddr("47.99.03.25:9876");
        try {
            producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }

    }

    @GetMapping("/message")
    public  void  message() throws Exception {
        //1、同步
        sync();
        //2、异步
        async();
        //3、单项发送
        oneWay();
    }
    /**
     * 1、同步发送消息
     */    private  void sync() throws Exception {
        //创建消息
        Message message = new Message("topic_family", ("  同步发送  ").getBytes());
        //同步发送消息
         SEND Result sendResult = producer.send(message);
        log.info("Product-同步发送-Product信息={}", sendResult);
    }
    /**
     * 2、异步发送消息
     */    private  void async() throws Exception {
        //创建消息
        Message message = new Message("topic_family", ("  异步发送  ").getBytes());
        //异步发送消息
        producer.send(message, new  Send Callback() {
            @ Override 
            public void onSuccess(SendResult sendResult) {
                log.info("Product-异步发送-输出信息={}", sendResult);
            }
            @Override
            public void onException(Throwable e) {
                e.printStackTrace();
                //补偿机制,根据业务情况进行使用,看是否进行重试
            }
        });
    }
    /**
     * 3、单项发送消息
     */    private  void oneWay() throws Exception {
        //创建消息
        Message message = new Message("topic_family", (" 单项发送 ").getBytes());
        //同步发送消息
        producer.sendOneway(message);
    }
}
  

2、测试结果

这里消费者代码就不贴出来了。

面试官:熟悉RocketMQ,那聊聊发送普通消息几种方式?3张图拿下

通过这个很明显可以看出三种方式都被 Consumer 消费了。只不过对于 Product 同步和异步发送是有返回信息的,单项发送是没有返回信息的。


三、SendStatus状态

当Product发送消息的时候,会返回SendResult对象,该对象又包含了一个SendStatus对象。

 package org.apache.rocketmq.client.producer;
public enum SendStatus {
    SEND_OK,
    FLUSH_DISK_TIMEOUT,
    FLUSH_SLAVE_TIMEOUT,
    SLAVE_NOT_AVAILABLE,
}
  

下面对这几种状态进行说明

SEND_OK

代表发送成功!但并不保证它是可靠的。要确保不会丢失任何消息,还应启用SYNC_MASTER或SYNC_FLUSH。

SLAVE_NOT_AVAILABLE

如果Broker的角色是SYNC_MASTER(同步复制)(默认为异步),但没有配置Slave Broker,将获得此状态。

FLUSH_DISK_TIMEOUT

如果Broker设置为 SYNC_FLUSH(同步刷盘)(默认为ASYNC_FLUSH),并且Broker的syncFlushTimeout(默认为5秒)内完成刷新磁盘,将获得此状态。

FLUSH_SLAVE_TIMEOUT

如果Broker的角色是SYNC_MASTER(同步复制)(默认为ASYNC_MASTER),并且从属Broker的syncFlushTimeout(默认为5秒)内完成与主服务器的同步,将获得此状态。

文章来源:智云一二三科技

文章标题:面试官:熟悉RocketMQ,那聊聊发送普通消息几种方式?3张图拿下

文章地址:https://www.zhihuclub.com/192117.shtml

关于作者: 智云科技

热门文章

网站地图