您的位置 首页 java

消息中间件—RocketMQ的RPC通信(一)

作者:胡宗棠

来源: 匠心独运的博客

文章摘要:借用小厮的一句话“消息队列的本质在于消息的发送、存储和接收”。那么,对于一款消息队列来说,如何做到消息的高效发送与接收是重点和关键。

一、RocketMQ中Remoting通信模块概览

RocketMQ消息队列的整体部署架构如下图所示:

消息中间件—RocketMQ的RPC通信(一)

先来说下RocketMQ消息队列集群中的几个角色:

(1)NameServer:在MQ集群中做的是做命名服务,更新和路由发现 broker服务;

(2)Broker-Master:broker 消息主机服务器;

(3)Broker-Slave:broker 消息从机服务器;

(4)Producer:消息生产者;

(5)Consumer:消息消费者;

其中,RocketMQ集群的一部分通信如下:

(1)Broker启动后需要完成一次将自己注册至NameServer的操作;随后每隔30s时间定期向NameServer上报Topic路由信息;

(2)消息生产者Producer作为客户端发送消息时候,需要根据Msg的Topic从本地缓存的TopicPublishInfoTable获取路由信息。如果没有则更新路由信息会从NameServer上重新拉取;

(3)消息生产者Producer根据(2)中获取的路由信息选择一个队列(MessageQueue)进行消息发送;Broker作为消息的接收者收消息并落盘存储; 从上面(1)~(3)中可以看出在消息生产者, Broker和Name Server 之间都会发生通信(这里只说了MQ的部分通信),因此如何设计一个良好的网络通信模块在MQ中至关重要,它将决定RocketMQ集群整体的消息传输能力与最终的性能。

rocketmq-remoting 模块是 RocketMQ消息队列中负责网络通信的模块,它几乎被其他所有需要网络通信的模块(诸如rocketmq-client、rocketmq-server、rocketmq-namesrv)所依赖和引用。为了实现客户端与服务器之间高效的数据请求与接收,RocketMQ消息队列自定义了通信协议并在 Netty 的基础之上扩展了通信模块。ps:鉴于RocketMQ的通信模块是建立在Netty基础之上的,因此在阅读RocketMQ的源码之前,读者最好先对Netty的多线程模型、JAVA NIO模型均有一定的了解,这样子理解RocketMQ源码会较为快一些。

作者阅读的RocketMQ版本是4.2.0, 依赖的netty版本是4.0.42.Final. RocketMQ的代码结构图如下:

消息中间件—RocketMQ的RPC通信(一)

源码部分主要可以分为rocketmq-broker,rocketmq-client,rocketmq-common,rocketmq-filterSrv,rocketmq-namesrv和rocketmq-remoting等模块,通信框架就封装在rocketmq-remoting模块中。 本文主要从RocketMQ的协议格式,消息编解码,通信方式(同步/异步/单向)和具体的发送/接收消息的通信流程来进行阐述等。

二、RocketMQ中Remoting通信模块的具体实现

  • Remoting通信模块的类结构图
消息中间件—RocketMQ的RPC通信(一)

从类层次结构来看:

(1)RemotingService:为最上层的接口,提供了三个方法:

  1. void
  2. start();
  3. void
  4. shutdown();
  5. void
  6. register RPC Hook(
  7. RPCHook
  8. rpcHook);

(2)RemotingClient/RemotingSever:两个接口继承了最上层接口—RemotingService,分别各自为Client和Server提供所必需的方法,下面所列的是RemotingServer的方法:

  1. /**
  2. * 同RemotingClient端一样
  3. *
  4. * @param requestCode
  5. * @param processor
  6. * @param executor
  7. */
  8. void
  9. registerProcessor(
  10. final
  11. int
  12. requestCode,
  13. final
  14. NettyRequestProcessor
  15. processor,
  16. final
  17. ExecutorService
  18. executor);
  19. /**
  20. * 注册默认的处理器
  21. *
  22. * @param processor
  23. * @param executor
  24. */
  25. void
  26. registerDefaultProcessor(
  27. final
  28. Netty request Processor
  29. processor,
  30. final
  31. ExecutorService
  32. executor);
  33. int
  34. localListenPort();
  35. /**
  36. * 根据请求code来获取不同的处理Pair
  37. *
  38. * @param requestCode
  39. * @return
  40. */
  41. Pair
  42. <
  43. NettyRequestProcessor
  44. ,
  45. ExecutorService
  46. > getProcessorPair(
  47. final
  48. int
  49. requestCode);
  50. /**
  51. * 同RemotingClient端一样,同步通信,有返回RemotingCommand
  52. * @param channel
  53. * @param request
  54. * @param timeoutMillis
  55. * @return
  56. * @ throws InterruptedException
  57. * @throws RemotingSendRequestException
  58. * @throws RemotingTimeoutException
  59. */
  60. RemotingCommand
  61. invokeSync(
  62. final
  63. Channel
  64. channel,
  65. final
  66. RemotingCommand
  67. request,
  68. final
  69. long
  70. timeoutMillis)
  71. throws
  72. InterruptedException
  73. ,
  74. RemotingSendRequestException
  75. ,
  76. RemotingTimeoutException
  77. ;
  78. /**
  79. * 同RemotingClient端一样,异步通信,无返回RemotingCommand
  80. *
  81. * @param channel
  82. * @param request
  83. * @param timeoutMillis
  84. * @param invokeCallback
  85. * @throws InterruptedException
  86. * @throws RemotingTooMuchRequestException
  87. * @throws RemotingTimeoutException
  88. * @throws RemotingSendRequestException
  89. */
  90. void
  91. invokeAsync(
  92. final
  93. Channel
  94. channel,
  95. final
  96. RemotingCommand
  97. request,
  98. final
  99. long
  100. timeoutMillis,
  101. final
  102. InvokeCallback
  103. invokeCallback)
  104. throws
  105. InterruptedException
  106. ,
  107. RemotingTooMuchRequestException
  108. ,
  109. RemotingTimeoutException
  110. ,
  111. RemotingSendRequestException
  112. ;
  113. /**
  114. * 同RemotingClient端一样,单向通信,诸如心跳包
  115. *
  116. * @param channel
  117. * @param request
  118. * @param timeoutMillis
  119. * @throws InterruptedException
  120. * @throws RemotingTooMuchRequestException
  121. * @throws RemotingTimeoutException
  122. * @throws RemotingSendRequestException
  123. */
  124. void
  125. invokeOneway(
  126. final
  127. Channel
  128. channel,
  129. final
  130. RemotingCommand
  131. request,
  132. final
  133. long
  134. timeoutMillis)
  135. throws
  136. InterruptedException
  137. ,
  138. RemotingTooMuchRequestException
  139. ,
  140. RemotingTimeoutException
  141. ,
  142. RemotingSendRequestException
  143. ;

(3)NettyRemotingAbstract:Netty通信处理的 抽象类 ,定义并封装了Netty处理的公共处理方法;

(4)NettyRemotingClient以及NettyRemotingServer:分别实现了RemotingClient和RemotingServer, 都继承了NettyRemotingAbstract抽象类。RocketMQ中其他的组件(如client、nameServer、broker在进行消息的发送和接收时均使用这两个组件)

  • 消息的协议设计与编码解码

在Client和Server之间完成一次消息发送时,需要对发送的消息进行一个协议约定,因此就有必要自定义RocketMQ的消息协议。同时,为了高效地在网络中传输消息和对收到的消息读取,就需要对消息进行编解码。在RocketMQ中,RemotingCommand这个类在消息传输过程中对所有数据内容的封装,不但包含了所有的数据结构,还包含了编码解码操作。

RemotingCommand类的部分成员变量如下:

Header字段类型Request说明Response说明codeint请求操作码,应答方根据不同的请求码进行不同的业务处理应答响应码。0表示成功,非0则表示各种错误languageLanguageCode请求方实现的语言应答方实现的语言versionint请求方程序的版本应答方程序的版本opaqueint相当于reqeustId,在同一个连接上的不同请求标识码,与响应消息中的相对应应答不做修改直接返回flagint区分是普通RPC还是onewayRPC得标志区分是普通RPC还是onewayRPC得标志remarkString传输自定义文本信息传输自定义文本信息extFieldsHashMap请求自定义扩展信息响应自定义扩展信息

这里展示下Broker向NameServer发送一次心跳注册的报文:

  1. [
  2. code=
  3. 103
  4. ,
  5. //这里的103对应的code就是broker向nameserver注册自己的消息
  6. language=JAVA,
  7. version=
  8. 137
  9. ,
  10. opaque=
  11. 58
  12. ,
  13. //这个就是requestId
  14. flag(B)=
  15. 0
  16. ,
  17. remark=
  18. null
  19. ,
  20. extFields={
  21. brokerId=
  22. 0
  23. ,
  24. clusterName=
  25. DefaultCluster
  26. ,
  27. brokerAddr=ip1:
  28. 10911
  29. ,
  30. haServerAddr=ip1:
  31. 10912
  32. ,
  33. brokerName=LAPTOP-SMF2CKDN
  34. },
  35. serializeTypeCurrentRPC=JSON

下面来看下RocketMQ通信协议的格式:

消息中间件—RocketMQ的RPC通信(一)

可以看到传输内容主要可以分为以下4部分:

(1)消息长度: 总长度,四个字节存储,占用一个int类型;

(2)序列化类型&消息头长度: 同样占用一个int类型,第一个字节表示序列化类型,后面三个字节表示消息头长度;

(3)消息头数据: 经过序列化后的消息头数据;

(4)消息主体数据: 消息主体的二进制字节数据内容;

消息的编码和解码分别在RemotingCommand类的encode和decode方法中完成,下面是消息编码encode方法的具体实现:

  1. public
  2. byte Buffer
  3. encode() {
  4. // 1> header length size
  5. int
  6. length =
  7. 4
  8. ;
  9. //消息总长度
  10. // 2> header data length
  11. //将消息头编码成byte[]
  12. byte
  13. [] headerData =
  14. this
  15. .headerEncode();
  16. //计算头部长度
  17. length += headerData.length;
  18. // 3> body data length
  19. if
  20. (
  21. this
  22. .body !=
  23. null
  24. ) {
  25. //消息主体长度
  26. length += body.length;
  27. }
  28. //分配ByteBuffer, 这边加了4,
  29. //这是因为在消息总长度的计算中没有将存储头部长度的4个字节计算在内
  30. ByteBuffer
  31. result =
  32. ByteBuffer
  33. .allocate(
  34. 4
  35. + length);
  36. // length
  37. //将消息总长度放入ByteBuffer
  38. result.putInt(length);
  39. // header length
  40. //将消息头长度放入ByteBuffer
  41. result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));
  42. // header data
  43. //将消息头数据放入ByteBuffer
  44. result.put(headerData);
  45. // body data;
  46. if
  47. (
  48. this
  49. .body !=
  50. null
  51. ) {
  52. //将消息主体放入ByteBuffer
  53. result.put(
  54. this
  55. .body);
  56. }
  57. //重置ByteBuffer的position位置
  58. result.flip();
  59. return
  60. result;
  61. }
  62. /**
  63. * markProtocolType方法是将RPC类型和headerData长度编码放到一个byte[4]数组中
  64. *
  65. * @param source
  66. * @param type
  67. * @return
  68. */
  69. public
  70. static
  71. byte
  72. [] markProtocolType(
  73. int
  74. source,
  75. SerializeType
  76. type) {
  77. byte
  78. [] result =
  79. new
  80. byte
  81. [
  82. 4
  83. ];
  84. result[
  85. 0
  86. ] = type.getCode();
  87. //右移16位后再和255与->“16-24位”
  88. result[
  89. 1
  90. ] = (
  91. byte
  92. ) ((source >>
  93. 16
  94. ) &
  95. 0xFF
  96. );
  97. //右移8位后再和255与->“8-16位”
  98. result[
  99. 2
  100. ] = (
  101. byte
  102. ) ((source >>
  103. 8
  104. ) &
  105. 0xFF
  106. );
  107. //右移0位后再和255与->“8-0位”
  108. result[
  109. 3
  110. ] = (
  111. byte
  112. ) (source &
  113. 0xFF
  114. );
  115. return
  116. result;
  117. }

消息解码decode方法是编码的逆向过程,其具体实现如下:

  1. public
  2. static
  3. RemotingCommand
  4. decode(
  5. final
  6. ByteBuffer
  7. byteBuffer) {
  8. //获取byteBuffer的总长度
  9. int
  10. length = byteBuffer.limit();
  11. //获取前4个字节,组装int类型,该长度为总长度
  12. int
  13. oriHeaderLen = byteBuffer.getInt();
  14. //获取消息头的长度,这里和0xFFFFFF做与运算,编码时候的长度即为24位
  15. int
  16. headerLength = getHeaderLength(oriHeaderLen);
  17. byte
  18. [] headerData =
  19. new
  20. byte
  21. [headerLength];
  22. byteBuffer.
  23. get
  24. (headerData);
  25. RemotingCommand
  26. cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));
  27. int
  28. bodyLength = length –
  29. 4
  30. – headerLength;
  31. byte
  32. [] bodyData =
  33. null
  34. ;
  35. if
  36. (bodyLength >
  37. 0
  38. ) {
  39. bodyData =
  40. new
  41. byte
  42. [bodyLength];
  43. byteBuffer.
  44. get
  45. (bodyData);
  46. }
  47. cmd.body = bodyData;
  48. return
  49. cmd;
  50. }
  • 消息的通信方式和通信流程

在RocketMQ消息队列中支持通信的方式主要有以下三种:

(1)同步(sync)

(2)异步(async)

(3)单向(oneway)

其中“同步”通信模式相对简单,一般用在发送心跳包场景下,无需关注其Response。本文将主要介绍RocketMQ的异步通信流程(限于篇幅,读者可以按照同样的模式进行分析同步通信流程)。下面先给出了RocketMQ异步通信的整体流程图:

消息中间件—RocketMQ的RPC通信(一)

下面两小节内容主要介绍了Client端发送请求消息、Server端接收消息的具体实现并简要分析的Client端的回调。

1)Client发送请求消息的具体实现

当客户端调用异步通信接口—invokeAsync时候,先由RemotingClient的实现类—NettyRemotingClient根据addr获取相应的channel(如果本地缓存中没有则创建),随后调用invokeAsyncImpl方法,将数据流转给抽象类NettyRemotingAbstract处理(真正做完发送请求动作的是在NettyRemotingAbstract抽象类的invokeAsyncImpl方法里面)。

具体发送请求消息的源代码如下所示:

  1. /**
  2. * invokeAsync(异步调用)
  3. *
  4. */
  5. public
  6. void
  7. invokeAsyncImpl(
  8. final
  9. Channel
  10. channel,
  11. final
  12. RemotingCommand
  13. request,
  14. final
  15. long
  16. timeoutMillis,
  17. final
  18. InvokeCallback
  19. invokeCallback)
  20. throws
  21. InterruptedException
  22. ,
  23. RemotingTooMuchRequestException
  24. ,
  25. RemotingTimeoutException
  26. ,
  27. RemotingSendRequestException
  28. {
  29. //相当于request ID, RemotingCommand会为每一个request产生一个request ID, 从0开始, 每次加1
  30. final
  31. int
  32. opaque = request.getOpaque();
  33. boolean
  34. acquired =
  35. this
  36. .semaphoreAsync.tryAcquire(timeoutMillis,
  37. TimeUnit
  38. .MILLISECONDS);
  39. if
  40. (acquired) {
  41. final
  42. SemaphoreReleaseOnlyOnce
  43. once =
  44. new
  45. SemaphoreReleaseOnlyOnce
  46. (
  47. this
  48. .semaphoreAsync);
  49. //根据request ID构建ResponseFuture
  50. final
  51. ResponseFuture
  52. responseFuture =
  53. new
  54. ResponseFuture
  55. (opaque, timeoutMillis, invokeCallback, once);
  56. //将ResponseFuture放入responseTable
  57. this
  58. .responseTable.put(opaque, responseFuture);
  59. try
  60. {
  61. //使用Netty的channel发送请求数据
  62. channel.writeAndFlush(request).addListener(
  63. new
  64. ChannelFutureListener
  65. () {
  66. //消息发送后执行
  67. @Override
  68. public
  69. void
  70. operationComplete(
  71. ChannelFuture
  72. f)
  73. throws
  74. Exception
  75. {
  76. if
  77. (f.isSuccess()) {
  78. //如果发送消息成功给Server,那么这里直接Set后return
  79. responseFuture.setSendRequestOK(
  80. true
  81. );
  82. return
  83. ;
  84. }
  85. else
  86. {
  87. responseFuture.setSendRequestOK(
  88. false
  89. );
  90. }
  91. responseFuture.putResponse(
  92. null
  93. );
  94. responseTable.remove(opaque);
  95. try
  96. {
  97. //执行回调
  98. executeInvokeCallback(responseFuture);
  99. }
  100. catch
  101. (
  102. Throwable
  103. e) {
  104. log .warn(
  105. “excute callback in writeAndFlush addListener, and callback throw”
  106. , e);
  107. }
  108. finally
  109. {
  110. //释放信号量
  111. responseFuture.release();
  112. }
  113. log.warn(
  114. “send a request command to channel <{}> failed.”
  115. ,
  116. RemotingHelper
  117. .parseChannelRemoteAddr(channel));
  118. }
  119. });
  120. }
  121. catch
  122. (
  123. Exception
  124. e) {
  125. //异常处理
  126. responseFuture.release();
  127. log.warn(
  128. “send a request command to channel <”
  129. +
  130. RemotingHelper
  131. .parseChannelRemoteAddr(channel) +
  132. “> Exception”
  133. , e);
  134. throw
  135. new
  136. RemotingSendRequestException
  137. (
  138. RemotingHelper
  139. .parseChannelRemoteAddr(channel), e);
  140. }
  141. }
  142. else
  143. {
  144. if
  145. (timeoutMillis <=
  146. 0
  147. ) {
  148. throw
  149. new
  150. RemotingTooMuchRequestException
  151. (
  152. “invokeAsyncImpl invoke too fast”
  153. );
  154. }
  155. else
  156. {
  157. String
  158. info =
  159. String
  160. .format(
  161. “invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d”
  162. ,
  163. timeoutMillis,
  164. this
  165. .semaphoreAsync.getQueueLength(),
  166. this
  167. .semaphoreAsync.availablePermits()
  168. );
  169. log.warn(info);
  170. throw
  171. new
  172. RemotingTimeoutException
  173. (info);
  174. }
  175. }
  176. }

在Client端发送请求消息时有个比较重要的数据结构需要注意下:

(1)responseTable—保存请求码与响应关联映射

  1. protected
  2. final
  3. ConcurrentHashMap
  4. <
  5. Integer
  6. /* opaque */
  7. ,
  8. ResponseFuture
  9. > responseTable

opaque表示请求发起方在同个连接上不同的请求标识代码,每次发送一个消息的时候,可以选择同步阻塞/异步非阻塞的方式。无论是哪种通信方式,都会保存请求操作码至ResponseFuture的Map映射—responseTable中。

(2)ResponseFuture—保存返回响应(包括回调执行方法和信号量)

  1. public
  2. ResponseFuture
  3. (
  4. int
  5. opaque,
  6. long
  7. timeoutMillis,
  8. InvokeCallback
  9. invokeCallback,
  10. SemaphoreReleaseOnlyOnce
  11. once) {
  12. this
  13. .opaque = opaque;
  14. this
  15. .timeoutMillis = timeoutMillis;
  16. this
  17. .invokeCallback = invokeCallback;
  18. this
  19. .once = once;
  20. }

对于同步通信来说,第三、四个参数为null;而对于异步通信来说,invokeCallback是在收到消息响应的时候能够根据responseTable找到请求码对应的回调执行方法,semaphore参数用作流控,当多个线程同时往一个连接写数据时可以通过信号量控制permit同时写许可的数量。

(3)异常发送流程处理—定时扫描responseTable本地缓存

在发送消息时候,如果遇到异常情况(比如服务端没有response返回给客户端或者response因网络而丢失),上面所述的responseTable的本地缓存Map将会出现堆积情况。这个时候需要一个定时任务来专门做responseTable的清理回收。在RocketMQ的客户端/服务端启动时候会产生一个频率为1s调用一次来的定时任务检查所有的responseTable缓存中的responseFuture变量,判断是否已经得到返回, 并进行相应的处理。

  1. public
  2. void
  3. scanResponseTable() {
  4. final
  5. List
  6. <
  7. ResponseFuture
  8. > rfList =
  9. new
  10. LinkedList
  11. <
  12. ResponseFuture
  13. >();
  14. Iterator
  15. <
  16. Entry
  17. <
  18. Integer
  19. ,
  20. ResponseFuture
  21. >> it =
  22. this
  23. .responseTable.entrySet().iterator();
  24. while
  25. (it.hasNext()) {
  26. Entry
  27. <
  28. Integer
  29. ,
  30. ResponseFuture
  31. >
  32. next
  33. = it.
  34. next
  35. ();
  36. ResponseFuture
  37. rep =
  38. next
  39. .getValue();
  40. if
  41. ((rep.getBeginTimestamp() + rep.getTimeoutMillis() +
  42. 1000
  43. ) <=
  44. System
  45. .currentTimeMillis()) {
  46. rep.release();
  47. it.remove();
  48. rfList.add(rep);
  49. log.warn(
  50. “remove timeout request, ”
  51. + rep);
  52. }
  53. }
  54. for
  55. (
  56. ResponseFuture
  57. rf : rfList) {
  58. try
  59. {
  60. executeInvokeCallback(rf);
  61. }
  62. catch
  63. (
  64. Throwable
  65. e) {
  66. log.warn(
  67. “scanResponseTable, operationComplete Exception”
  68. , e);
  69. }
  70. }
  71. }

2)Server端接收消息并进行处理的具体实现

Server端接收消息的处理入口在NettyServerHandler类的channelRead0方法中,其中调用了processMessageReceived方法(这里省略了Netty服务端消息流转的大部分流程和逻辑)。

其中服务端最为重要的处理请求方法实现如下:

  1. public
  2. void
  3. processRequestCommand(
  4. final
  5. ChannelHandlerContext
  6. ctx,
  7. final
  8. RemotingCommand
  9. cmd) {
  10. //根据RemotingCommand中的code获取processor和ExecutorService
  11. final
  12. Pair
  13. <
  14. NettyRequestProcessor
  15. ,
  16. ExecutorService
  17. > matched =
  18. this
  19. .processorTable.
  20. get
  21. (cmd.getCode());
  22. final
  23. Pair
  24. <
  25. NettyRequestProcessor
  26. ,
  27. ExecutorService
  28. > pair =
  29. null
  30. == matched ?
  31. this
  32. .defaultRequestProcessor : matched;
  33. final
  34. int
  35. opaque = cmd.getOpaque();
  36. if
  37. (pair !=
  38. null
  39. ) {
  40. Runnable
  41. run =
  42. new
  43. Runnable
  44. () {
  45. @Override
  46. public
  47. void
  48. run() {
  49. try
  50. {
  51. //rpc hook
  52. RPCHook
  53. rpcHook =
  54. NettyRemotingAbstract
  55. .
  56. this
  57. .getRPCHook();
  58. if
  59. (rpcHook !=
  60. null
  61. ) {
  62. rpcHook.doBeforeRequest(
  63. RemotingHelper
  64. .parseChannelRemoteAddr(ctx.channel()), cmd);
  65. }
  66. //processor处理请求
  67. final
  68. RemotingCommand
  69. response = pair.getObject1().processRequest(ctx, cmd);
  70. //rpc hook
  71. if
  72. (rpcHook !=
  73. null
  74. ) {
  75. rpcHook.doAfterResponse(
  76. RemotingHelper
  77. .parseChannelRemoteAddr(ctx.channel()), cmd, response);
  78. }
  79. if
  80. (!cmd.isOnewayRPC()) {
  81. if
  82. (response !=
  83. null
  84. ) {
  85. response.setOpaque(opaque);
  86. response.markResponseType();
  87. try
  88. {
  89. ctx.writeAndFlush(response);
  90. }
  91. catch
  92. (
  93. Throwable
  94. e) {
  95. PLOG.error(
  96. “process request over, but response failed”
  97. , e);
  98. PLOG.error(cmd. toString ());
  99. PLOG.error(response.toString());
  100. }
  101. }
  102. else
  103. {
  104. }
  105. }
  106. }
  107. catch
  108. (
  109. Throwable
  110. e) {
  111. if
  112. (!
  113. “com.aliyun.openservices.ons.api.impl.authority.exception.AuthenticationException”
  114. .equals(e.getClass().getCanonicalName())) {
  115. PLOG.error(
  116. “process request exception”
  117. , e);
  118. PLOG.error(cmd.toString());
  119. }
  120. if
  121. (!cmd.isOnewayRPC()) {
  122. final
  123. RemotingCommand
  124. response =
  125. RemotingCommand
  126. .createResponseCommand(
  127. RemotingSysResponseCode
  128. .SYSTEM_ERROR,
  129. //
  130. RemotingHelper
  131. .exceptionSimpleDesc(e));
  132. response.setOpaque(opaque);
  133. ctx.writeAndFlush(response);
  134. }
  135. }
  136. }
  137. };
  138. if
  139. (pair.getObject1().rejectRequest()) {
  140. final
  141. RemotingCommand
  142. response =
  143. RemotingCommand
  144. .createResponseCommand(
  145. RemotingSysResponseCode
  146. .SYSTEM_BUSY,
  147. “[REJECTREQUEST]system busy, start flow control for a while”
  148. );
  149. response.setOpaque(opaque);
  150. ctx.writeAndFlush(response);
  151. return
  152. ;
  153. }
  154. try
  155. {
  156. //封装requestTask
  157. final
  158. RequestTask
  159. requestTask =
  160. new
  161. RequestTask
  162. (run, ctx.channel(), cmd);
  163. //想线程池提交requestTask
  164. pair.getObject2().submit(requestTask);
  165. }
  166. catch
  167. (
  168. RejectedExecutionException
  169. e) {
  170. if
  171. ((
  172. System
  173. .currentTimeMillis() %
  174. 10000
  175. ) ==
  176. 0
  177. ) {
  178. PLOG.warn(
  179. RemotingHelper
  180. .parseChannelRemoteAddr(ctx.channel())
  181. //
  182. +
  183. “, too many requests and system thread pool busy, RejectedExecutionException ”
  184. //
  185. + pair.getObject2().toString()
  186. //
  187. +
  188. ” request code: ”
  189. + cmd.getCode());
  190. }
  191. if
  192. (!cmd.isOnewayRPC()) {
  193. final
  194. RemotingCommand
  195. response =
  196. RemotingCommand
  197. .createResponseCommand(
  198. RemotingSysResponseCode
  199. .SYSTEM_BUSY,
  200. “[OVERLOAD]system busy, start flow control for a while”
  201. );
  202. response.setOpaque(opaque);
  203. ctx.writeAndFlush(response);
  204. }
  205. }
  206. }
  207. else
  208. {
  209. String
  210. error =
  211. ” request type ”
  212. + cmd.getCode() +
  213. ” not supported”
  214. ;
  215. //构建response
  216. final
  217. RemotingCommand
  218. response =
  219. RemotingCommand
  220. .createResponseCommand(
  221. RemotingSysResponseCode
  222. .REQUEST_CODE_NOT_SUPPORTED, error);
  223. response.setOpaque(opaque);
  224. ctx.writeAndFlush(response);
  225. PLOG.error(
  226. RemotingHelper
  227. .parseChannelRemoteAddr(ctx.channel()) + error);
  228. }
  229. }

上面的请求处理方法中根据RemotingCommand的请求业务码来匹配到相应的业务处理器;然后生成一个新的线程提交至对应的业务线程池进行异步处理。

(1)processorTable—请求业务码与业务处理、业务线程池的映射变量

  1. protected
  2. final
  3. HashMap
  4. <
  5. Integer
  6. /* request code */
  7. ,
  8. Pair
  9. <
  10. NettyRequestProcessor
  11. ,
  12. ExecutorService
  13. >> processorTable =
  14. new
  15. HashMap
  16. <
  17. Integer
  18. ,
  19. Pair
  20. <
  21. NettyRequestProcessor
  22. ,
  23. ExecutorService
  24. >>(
  25. 64
  26. );

个人觉得RocketMQ这种做法是为了给不同类型的请求业务码指定不同的处理器Processor处理,同时消息实际的处理并不是在当前线程,而是被封装成task放到业务处理器Processor对应的线程池中完成异步执行。(在RocketMQ中能看到很多地方都是这样的处理,这样的设计能够最大程度的保证异步,保证每个线程都专注处理自己负责的东西)

3)Client端异步回调执行的实现分析

看到这里可能有一些同学会疑问Client端的异步回调究竟在哪里执行的?从上面“RocketMQ异步通信的整体时序图”来看,回调执行处理的流程的确是放在了Client端来完成,而rocketmq-remoting通信模块中只是给异步回调处理提供了接口。

这里可以看下rocketmq-client模块异步发送消息的部分代码(限于篇幅也只是列举了异步回调执行的部分代码):

  1. private
  2. void
  3. sendMessageAsync(
  4. final
  5. String
  6. addr,
  7. final
  8. String
  9. brokerName,
  10. final
  11. Message
  12. msg,
  13. final
  14. long
  15. timeoutMillis,
  16. final
  17. RemotingCommand
  18. request,
  19. final
  20. SendCallback
  21. sendCallback,
  22. final
  23. TopicPublishInfo
  24. topicPublishInfo,
  25. final
  26. MQClientInstance
  27. instance,
  28. final
  29. int
  30. retryTimesWhenSendFailed,
  31. final
  32. AtomicInteger
  33. times,
  34. final
  35. SendMessageContext
  36. context,
  37. final
  38. DefaultMQProducerImpl
  39. producer
  40. )
  41. throws
  42. InterruptedException
  43. ,
  44. RemotingException
  45. {
  46. this
  47. .remotingClient.invokeAsync(addr, request, timeoutMillis,
  48. new
  49. InvokeCallback
  50. () {
  51. @Override
  52. public
  53. void
  54. operationComplete(
  55. ResponseFuture
  56. responseFuture) {
  57. //先从Server端返回的responseFuture变量中获取RemotingCommand的值
  58. RemotingCommand
  59. response = responseFuture.getResponseCommand();
  60. if
  61. (
  62. null
  63. == sendCallback && response !=
  64. null
  65. ) {
  66. try
  67. {
  68. //Client端处理发送消息的Reponse返回(包括对消息返回体的头部进行解码,
  69. //取得“topic”、“BrokerName”、“QueueId”等值)
  70. //随后构建sendResult对象并设置Context上下文中
  71. SendResult
  72. sendResult =
  73. MQClientAPIImpl
  74. .
  75. this
  76. .processSendResponse(brokerName, msg, response);
  77. if
  78. (context !=
  79. null
  80. && sendResult !=
  81. null
  82. ) {
  83. context.setSendResult(sendResult);
  84. context.getProducer().executeSendMessageHookAfter(context);
  85. }
  86. }
  87. catch
  88. (
  89. Throwable
  90. e) {
  91. }
  92. producer.updateFaultItem(brokerName,
  93. System
  94. .currentTimeMillis() – responseFuture.getBeginTimestamp(),
  95. false
  96. );
  97. return
  98. ;
  99. }
  100. //省略其他部分代码
  101. //……
  102. }

这里需要结合3.1节的内容和NettyRemotingAbstract抽象类的processResponseCommand方法,便可以明白Client端实现异步回调的大致流程了。在Client端发送异步消息时候(rocketmq-client模块最终调用sendMessageAsync方法时),会将InvokeCallback的接口注入,而在Server端的异步线程由上面所讲的业务线程池真正执行后,返回response给Client端时候才会去触发执行。

NettyRemotingAbstract抽象类的processResponseCommand方法的具体代码如下:

  1. public
  2. void
  3. processResponseCommand(
  4. ChannelHandlerContext
  5. ctx,
  6. RemotingCommand
  7. cmd) {
  8. //从RemotingCommand中获取opaque值
  9. final
  10. int
  11. opaque = cmd.getOpaque();‘
  12. //从本地缓存的responseTable这个Map中取出本次异步通信连接对应的ResponseFuture变量
  13. final
  14. ResponseFuture
  15. responseFuture = responseTable.
  16. get
  17. (opaque);
  18. if
  19. (responseFuture !=
  20. null
  21. ) {
  22. responseFuture.setResponseCommand(cmd);
  23. responseTable.remove(opaque);
  24. if
  25. (responseFuture.getInvokeCallback() !=
  26. null
  27. ) {
  28. //在这里真正去执行Client注入进来的异步回调方法
  29. executeInvokeCallback(responseFuture);
  30. }
  31. else
  32. {
  33. //否则释放responseFuture变量
  34. responseFuture.putResponse(cmd);
  35. responseFuture.release();
  36. }
  37. }
  38. else
  39. {
  40. log.warn(
  41. “receive response, but not matched any request, ”
  42. +
  43. RemotingHelper
  44. .parseChannelRemoteAddr(ctx.channel()));
  45. log.warn(cmd.toString());
  46. }
  47. }

三、总结

刚开始看RocketMQ源码—RPC通信模块可能觉得略微有点复杂,但是只要能够抓住Client端发送请求消息、Server端接收消息并处理的流程以及回调过程来分析和梳理,那么整体来说并不复杂。RPC通信部分也是RocketMQ源码中最重要的部分之一,想要对其中的全过程和细节有更为深刻的理解,还需要多在本地环境Debug和分析对应的日志。同时,鉴于篇幅所限,本篇还没有来得及对RocketMQ的Netty多线程模型进行介绍,将在消息中间件—RocketMQ的RPC通信(二)篇中来做详细地介绍。

文章来源:智云一二三科技

文章标题:消息中间件—RocketMQ的RPC通信(一)

文章地址:https://www.zhihuclub.com/196608.shtml

关于作者: 智云科技

热门文章

网站地图