您的位置 首页 java

RocketMQ篇2:发送和接收消息

不同类型的消费者

1、DefaultMQPushConsumer

  1. 由系统控制读取操作,收到消息后自动调用传入的处理方法来处理
  2. 设置好各种参数和传入处理消息的函数。系统收到后自动调用处理函数来处理消息,自动保存Offset,并加入新的消费者之后自动做负载均衡

  1. Consumer的GroupName:用于把多个Consumer一起,提高并发处理能力,两种消费模式
    1. Clustering:顺序消费模式,同一个ConsumerGroup里的每个Consumer只消费所订阅消息的一部分内容。同一个ConsumerGroup里所有的Consumer消费的内容合起来才是订阅Topic内容的全部,达到负载均衡的目的。
    2. Broadcasting:广播模式,同一个ConsumerGroup里的每个Consumer都能消费到所订阅Topic的全部消息,就是一个消息会被多次分发,被多个Consumer消费。
  1. NameServer的地址和端口号,可以填写多个,用分号分割。
  2. Topic名称用来标识消息类型,需要提前创建。如果不需要消费某个Topic下的所有消息,可以通过指定消息的Tag进行消息过滤。

2、DefaultMQPushConsumer的处理流程

  1. Push方式优点:实时性高
  2. Push方式缺点:加大 Server 端 工作量,进而影响Server端性能;各个Client性能不同,处理发送过来的任务时可能出现各种潜在问题;
  3. Pull方式优点:有主动权,Client自己去拉取数据
  4. Pull方式缺点:间隔不好确定;循环拉取消息的间隔不好设定,间隔小,容易”忙等”,浪费资源;间隔大,不能及时处理
  5. 通过长轮询的方式达到Push效果的方法,长轮询方式既有Pull的优点,又兼具Push方式的实时性。设置brokerSuspendMaxTimeMillis,设置Broker最长阻塞时间,默认设置15s。

  1. 长轮询的核心是,Broker端HOLD住客户端过来的请求一小段时间,在这个时间内有新消息到达,就利用现有的连接立刻返回消息给Consumer。
  2. 长轮询方式的局限性,是在HOLD住Consumer请求的时候需要占用资源,它适合用在消息队列客户端连接数可控的场景中。

3、DefaultMQPushConsumer的流量控制

  1. PushConsumer的核心还是Pull方式,PushConsumer有个线程池,消息处理逻辑在各个线程里同时执行。

  1. RocketMQ定义了一个快照类ProcessQueue来处理多线程的监控和控制。每个Message Queue都有一个对应的ProcessQueue对象,保存这个Message Queue消息处理状态的快照。
  2. ProcessQueue对象主要的内容是一个TreeMap和一个读写锁。TreeMap以Message Queue的Offset作为Key,以消息内容的引用作为Value,保存了所有从MessageQueue获取到,但是还未被处理的消息;读写锁控制着多个线程堆TreeMap对象的并发访问。

  1. PushConsumer会判断获取但还未处理的消息个数、消息总大小,Offset的跨度,任何一个值超过设定的大小就隔一段时间在拉取消息,从而达到流浪控制的目的。

4、DefaultMQPullConsumer

  1. 读取操作中大部分功能由使用者自主控制

  1. 获取Message Queue并遍历
  2. 维护Offsetstore
  3. 根据不同的消息状态做不同的处理;FOUND表示获取到消息;NO_NEW_MSG表示没有新的消息。

5、Consumer的启动、关闭流程

  1. PushConsumer在启动的时候,会做各种配置检查,然后连接NameServer获取Topic信息,启动时如果遇到异常,比如无法连接NameServer,程序仍然可以正常启动不报错。
  2. 为什么不直接报错退出?因为RocketMQ集群可以有多个NamerServer、Broker,某个机器出异常后整体服务依然可用,所以DefaultMQPushConsumer被设计成当发现某个连接异常时不立即退出,而是不断尝试重连。

不同类型的生产者

1、DefaultMQProducer

  1. 设置Producer的GroupName。
  2. 设置InstanceName,当一个JVM需要启动多个Producer的时候,通过设置不同的InstanceName来区分。
  3. 设置发送失败重试次数。
  4. 设置NameServer地址
  5. 组装消息并发送
  6. 异步发送返回状态:
    1. FLUSH_DISK_TIMEOUT:表示没有在规定时间内完成刷盘,该错误在Broker的刷盘策略被设置为SYNC_FLUSH。
    2. FLUSH_SLAVE_TIEMOUT:表示在主备方式下,并且Broker被设置成SYNC_MASTER方式,没有在规定时间内完成主从同步。
    3. SLAVE_NOT_AVAILABLE:表示在主备方式下,并且Broker被设置为SYNC_MASTER,当时没有找到配置成Slave的Broker。
    4. SEND_OK:发送成功。

2、发送延迟消息

RocketMQ支持发送延迟消息,在创建Message对象时,调用setDelayTimeLevel(int level)方法设置延迟时间。

3、自定义消息发送规则

  1. 把消息发送到指定的Message Queue里。

  1. 发送时,将MessageQueueSelector的对象作为参数,使用public SendResult send(Message msg, MessageQueueSelector selector, Object arg)函数发送即可。

4、对事务的支持

  1. 事务具体流程
    1. 发送方向RocketMQ发送“待确认”消息。
    2. RocketMQ将收到的“待确认”消息持久化成功后,向发送方回复消息已经发送成功。第一阶段消息发送完成。
    3. 发送方开始执行本地事件逻辑。
    4. 发送发根据本地事件执行结果向RocketMQ发送二次确认( Commit 或Rollback)消息,RocketMQ收到Commit状态则将第一阶段消息标记为可投递,订阅方将能收到该消息;收到Rollback状态则删除第一阶段的消息,订阅方接受不到消息。
    5. 如果出现异常情况,步骤d)提交的二次确认最终未达到RocketMQ,服务器在经过固定时间段后将对“待确认”消息发起回查请求。
    6. 发送方收到消息回查请求后(如果发送一阶段消息的Producer不能工作,回查请求将被发送到和Producer在同一个Group里的其他Producer),通过检查对应消息的本地事件执行结果返回Commit或Rollback状态。
    7. RocketMQ收到回查请求后,按照步骤d)的逻辑继续处理。
  1. 客户端有三个类支持用户实现事务消息
    1. LocalTransactionExecuter,用来处理步骤c)的逻辑,根据情况返回LocalTransactionState.ROLLBACK_MESSAGE或者LocalTransactionState.COMMIT_MESSAGE状态。
    2. TransactionMQProducer,用法与DefaultMQProducer类似,要通过它启动一个Producer并发消息,但是比DefaultMQProducer多设置本地事务处理函数和回查状态函数。
    3. TransactionCheckListener,用来处理步骤e)中MQ服务器的回查请求,返回LocalTransactionState.ROLLBACK_MESSAGE或者LocalTransactionState.COMMIT_MESSAGE。

如何存储队列位置信息

  1. OffsetStore的类结构

  1. Offset的类结构,主要分为本地文件类型和Broker代存类型
  2. 对于DefaultMQPushConsumer CLUSTERING模式,顺序消费模式,由Broker端存储和控制Offset的值,使用RemoteBrokerOffsetStore结构
  3. 对于DefaultMQPushConsumer BROADCASTING模式,广播模式,每个Consumer都能收到这个Topic的全部消息,各个Consumer间互相没有干扰,使用LocalOffsetStore,把Offset存在本地。
  4. 如果使用PullConsumer,就要自己处理好OffsetStore了。因为默认的情况会存储在内存中,如果程序异常或重启会导致Offset丢失,消费异常,自定义持久存储OffsetStore如下:

自定义日志输出

  1. 默认Log存储位置:${user.home}/Logs/rocketmqLogs
  2. 通过Rocketmq.Client.LogLevel或System. setProperty (“rockemtmq.Client.LogLevel”)设置log level。
  3. 如果通过lockback设置的话,需要把Rocketmq.Client.Log.loadconfig或System.setProperty(“rockemtmq.Client.Log.loadconfig”)设置为false,然后配置相应的logback.xml。

文章来源:智云一二三科技

文章标题:RocketMQ篇2:发送和接收消息

文章地址:https://www.zhihuclub.com/196252.shtml

关于作者: 智云科技

热门文章

网站地图