您的位置 首页 java

消息中间件—RocketMQ的RPC通信(二

作者:胡宗棠

来源:匠心独运的博客

在(一)篇中主要介绍了RocketMQ的协议格式,消息编解码,通信方式(同步/异步/单向)、消息发送/接收以及异步回调的主要通信流程。 而本篇将主要对RocketMQ消息队列RPC通信部分的Netty 多线程 模型进行重点介绍。

为何要使用Netty作为高性能的通信库?

在看RocketMQ的RPC通信部分时候,可能有不少同学有这样子的疑问,RocketMQ为何要选择Netty而不直接使用JDK的NIO进行网络编程呢?这里有必要先来简要介绍下Netty。 Netty是一个封装了JDK的NIO库的高性能网络通信开源框架。它提供异步的、 事件驱动 的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。

下面主要列举了下一般系统的RPC通信模块会选择Netty作为底层通信库的理由(作者认为RocketMQ的RPC同样也是基于此选择了Netty):

(1)Netty的编程API使用简单,开发门槛低,无需编程者去关注和了解太多的NIO编程模型和概念;

(2)对于编程者来说,可根据业务的要求进行定制化地开发,通过Netty的ChannelHandler对通信框架进行灵活的定制化扩展;

(3)Netty框架本身支持拆包/解包,异常检测等机制,让编程者可以从JAVA NIO的繁琐细节中解脱,而只需要关注业务处理逻辑;

(4)Netty解决了(准确地说应该是采用了另一种方式完美规避了)JDK NIO的Bug(Epoll bug,会导致Selector空轮询,最终导致CPU 100%);

(5)Netty框架内部对 线程 ,selector做了一些细节的优化,精心设计的reactor多线程模型,可以实现非常高效地并发处理;

(6)Netty已经在多个开源项目(Hadoop的RPC框架avro使用Netty作为通信框架)中都得到了充分验证,健壮性/可靠性比较好。

RocketMQ中RPC通信的Netty多线程模型

RocketMQ的RPC通信部分采用了”1+N+M1+M2″的Reactor多线程模式,对网络通信部分进行了一定的扩展与优化,这一节主要让我们来看下这一部分的具体设计与实现内容。

  • Netty的Reactor多线程模型设计概念与简述

这里有必要先来简要介绍下Netty的Reactor多线程模型。Reactor多线程模型的设计思想是分而治之+事件驱动。

(1)分而治之

一般来说,一个网络请求连接的完整处理过程可以分为接受(accept)、数据读取(read)、解码/编码(decode/encode)、业务处理(process)、发送响应(send)这几步骤。Reactor模型将每个步骤都映射成为一个任务,服务端线程执行的最小逻辑单元不再是一次完整的网络请求,而是这个任务,且采用以非阻塞方式执行。

(2)事件驱动

每个任务对应特定网络事件。当任务准备就绪时,Reactor收到对应的网络事件通知,并将任务分发给绑定了对应网络事件的 Handler 执行。

  • RocketMQ中RPC通信的1+N+M1+M2的Reactor多线程设计与实现

(1)RocketMQ中RPC通信的Reactor多线程设计与流程

RocketMQ的RPC通信采用Netty组件作为底层通信库,同样也遵循了Reactor多线程模型,同时又在这之上做了一些扩展和优化。下面先给出一张RocketMQ的RPC通信层的Netty多线程模型框架图,让大家对RocketMQ的RPC通信中的多线程分离设计有一个大致的了解。

从上面的框图中可以大致了解RocketMQ中NettyRemotingServer的Reactor 多线程模型。一个 Reactor 主线程(eventLoopGroupBoss,即为上面的1)负责监听 TCP网络连接请求,建立好连接后丢给Reactor 线程池(eventLoopGroupSelector,即为上面的“N”,源码中默认设置为3),它负责将建立好连接的 socket 注册到 selector上去(RocketMQ的源码中会自动根据OS的类型选择NIO和Epoll,也可以通过参数配置),然后监听真正的网络数据。拿到网络数据后,再丢给Worker线程池(defaultEventExecutorGroup,即为上面的“M1”,源码中默认设置为8)。

为了更为高效的处理RPC的网络请求,这里的Worker线程池是专门用于处理Netty网络通信相关的(包括编码/解码、空闲链接管理、网络连接管理以及网络请求处理)。而处理业务操作放在业务线程池中执行(这个内容在“RocketMQ的RPC通信(一)篇”中也有提到),根据 RomotingCommand 的业务请求码code去processorTable这个本地缓存变量中找到对应的 processor,然后封装成task任务后,提交给对应的业务processor处理线程池来执行(sendMessageExecutor,以发送消息为例,即为上面的 “M2”)。

下面以表格的方式列举了下上面所述的“1+N+M1+M2”Reactor多线程模型

(2)RocketMQ中RPC通信的Reactor多线程的代码具体实现

说完了Reactor多线程整体的设计与流程,大家应该就对RocketMQ的RPC通信的Netty部分有了一个比较全面的理解了,那接下来就从源码上来看下一些细节部分(在看该部分代码时候需要读者对JAVA NIO和Netty的相关概念与技术点有所了解)。在NettyRemotingServer的实例初始化时,会初始化各个相关的变量包括serverBootstrap、nettyServerConfig参数、channelEventListener监听器并同时初始化eventLoopGroupBoss和eventLoopGroupSelector两个Netty的EventLoopGroup线程池(这里需要注意的是,如果是Linux平台,并且开启了native epoll,就用EpollEventLoopGroup,这个也就是用JNI,调的c写的epoll;否则,就用Java NIO的NioEventLoopGroup。)。

具体代码如下:

  1. public
  2. NettyRemotingServer
  3. (
  4. final
  5. NettyServerConfig
  6. nettyServerConfig,
  7. final
  8. ChannelEventListener
  9. channelEventListener) {
  10. super
  11. (nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
  12. this
  13. .serverBootstrap =
  14. new
  15. ServerBootstrap
  16. ();
  17. this
  18. .nettyServerConfig = nettyServerConfig;
  19. this
  20. .channelEventListener = channelEventListener;
  21. //省略部分代码
  22. //初始化时候nThreads设置为1,说明RemotingServer端的Disptacher链接管理和分发请求的线程为1,用于接收客户端的TCP连接
  23. this
  24. .eventLoopGroupBoss =
  25. new
  26. NioEventLoopGroup
  27. (
  28. 1
  29. ,
  30. new
  31. ThreadFactory
  32. () {
  33. private
  34. AtomicInteger
  35. threadIndex =
  36. new
  37. AtomicInteger
  38. (
  39. 0
  40. );
  41. @ Override
  42. public
  43. Thread
  44. newThread(
  45. Runnable
  46. r) {
  47. return
  48. new
  49. Thread
  50. (r,
  51. String
  52. .format(
  53. “NettyBoss_%d”
  54. ,
  55. this
  56. .threadIndex.incrementAndGet()));
  57. }
  58. });
  59. /**
  60. * 根据配置设置NIO还是Epoll来作为Selector线程池
  61. * 如果是Linux平台,并且开启了native epoll,就用EpollEventLoopGroup,这个也就是用JNI,调的c写的epoll;否则,就用Java NIO的NioEventLoopGroup。
  62. *
  63. */
  64. if
  65. (useEpoll()) {
  66. this
  67. .eventLoopGroupSelector =
  68. new
  69. EpollEventLoopGroup
  70. (nettyServerConfig.getServerSelectorThreads(),
  71. new
  72. ThreadFactory
  73. () {
  74. private
  75. AtomicInteger
  76. threadIndex =
  77. new
  78. AtomicInteger
  79. (
  80. 0
  81. );
  82. private
  83. int
  84. threadTotal = nettyServerConfig.getServerSelectorThreads();
  85. @Override
  86. public
  87. Thread
  88. newThread(
  89. Runnable
  90. r) {
  91. return
  92. new
  93. Thread
  94. (r,
  95. String
  96. .format(
  97. “NettyServerEPOLLSelector_%d_%d”
  98. , threadTotal,
  99. this
  100. .threadIndex.incrementAndGet()));
  101. }
  102. });
  103. }
  104. else
  105. {
  106. this
  107. .eventLoopGroupSelector =
  108. new
  109. NioEventLoopGroup
  110. (nettyServerConfig.getServerSelectorThreads(),
  111. new
  112. ThreadFactory
  113. () {
  114. private
  115. AtomicInteger
  116. threadIndex =
  117. new
  118. AtomicInteger
  119. (
  120. 0
  121. );
  122. private
  123. int
  124. threadTotal = nettyServerConfig.getServerSelectorThreads();
  125. @Override
  126. public
  127. Thread
  128. newThread(
  129. Runnable
  130. r) {
  131. return
  132. new
  133. Thread
  134. (r,
  135. String
  136. .format(
  137. “NettyServerNIOSelector_%d_%d”
  138. , threadTotal,
  139. this
  140. .threadIndex.incrementAndGet()));
  141. }
  142. });
  143. }
  144. //省略部分代码

在NettyRemotingServer实例初始化完成后,就会将其启动。Server端在启动阶段会将之前实例化好的1个acceptor线程(eventLoopGroupBoss),N个IO线程(eventLoopGroupSelector),M1个worker 线程(defaultEventExecutorGroup)绑定上去。前面部分也已经介绍过各个线程池的作用了。

这里需要说明的是,Worker线程拿到网络数据后,就交给Netty的ChannelPipeline(其采用责任链设计模式),从Head到 Tail 的一个个Handler执行下去,这些 Handler是在创建NettyRemotingServer实例时候指定的。NettyEncoder和NettyDecoder 负责网络传输数据和 RemotingCommand 之间的编解码。NettyServerHandler 拿到解码得到的 RemotingCommand 后,根据 RemotingCommand.type 来判断是 request 还是 response来进行相应处理,根据业务请求码封装成不同的task任务后,提交给对应的业务processor处理线程池处理。

  1. @Override
  2. public
  3. void
  4. start() {
  5. //默认的处理线程池组,使用默认的处理线程池组用于处理后面的多个Netty Handler的逻辑操作
  6. this
  7. .defaultEventExecutorGroup =
  8. new
  9. DefaultEventExecutorGroup
  10. (
  11. nettyServerConfig.getServerWorkerThreads(),
  12. new
  13. ThreadFactory
  14. () {
  15. private
  16. AtomicInteger
  17. threadIndex =
  18. new
  19. AtomicInteger
  20. (
  21. 0
  22. );
  23. @Override
  24. public
  25. Thread
  26. newThread(
  27. Runnable
  28. r) {
  29. return
  30. new
  31. Thread
  32. (r,
  33. “NettyServerCodecThread_”
  34. +
  35. this
  36. .threadIndex.incrementAndGet());
  37. }
  38. });
  39. /**
  40. * 首先来看下 RocketMQ NettyServer 的 Reactor 线程模型,
  41. * 一个 Reactor 主线程负责监听 TCP 连接请求;
  42. * 建立好连接后丢给 Reactor 线程池,它负责将建立好连接的 socket 注册到 selector
  43. * 上去(这里有两种方式,NIO和Epoll,可配置),然后监听真正的网络数据;
  44. * 拿到网络数据后,再丢给 Worker 线程池;
  45. *
  46. */
  47. //RocketMQ-> Java NIO的1+N+M模型:1个acceptor线程,N个IO线程,M1个worker 线程。
  48. ServerBootstrap
  49. childHandler =
  50. this
  51. .serverBootstrap.
  52. group
  53. (
  54. this
  55. .eventLoopGroupBoss,
  56. this
  57. .eventLoopGroupSelector)
  58. .channel(useEpoll() ?
  59. EpollServerSocketChannel
  60. .
  61. class
  62. :
  63. NioServerSocketChannel
  64. .
  65. class
  66. )
  67. .option(
  68. ChannelOption
  69. .SO_BACKLOG,
  70. 1024
  71. )
  72. //服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接,多个客户端来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理,backlog参数指定了队列的大小
  73. .option(
  74. ChannelOption
  75. .SO_REUSEADDR,
  76. true
  77. )
  78. //这个参数表示允许重复使用本地地址和端口
  79. .option(
  80. ChannelOption
  81. .SO_KEEPALIVE,
  82. false
  83. )
  84. //当设置该选项以后,如果在两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文。
  85. .childOption(
  86. ChannelOption
  87. .TCP_NODELAY,
  88. true
  89. )
  90. //该参数的作用就是禁止使用Nagle算法,使用于小数据即时传输
  91. .childOption(
  92. ChannelOption
  93. .SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
  94. //这两个参数用于操作接收缓冲区和发送缓冲区
  95. .childOption(
  96. ChannelOption
  97. .SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
  98. .localAddress(
  99. new
  100. InetSocketAddress
  101. (
  102. this
  103. .nettyServerConfig.getListenPort()))
  104. .childHandler(
  105. new
  106. ChannelInitializer
  107. <
  108. SocketChannel
  109. >() {
  110. @Override
  111. public
  112. void
  113. initChannel(
  114. SocketChannel
  115. ch)
  116. throws
  117. Exception
  118. {
  119. ch.pipeline()
  120. .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
  121. new
  122. HandshakeHandler
  123. (
  124. TlsSystemConfig
  125. .tlsMode))
  126. .addLast(defaultEventExecutorGroup,
  127. new
  128. NettyEncoder
  129. (),
  130. //rocketmq解码器,他们分别覆盖了父类的encode和decode方法
  131. new
  132. NettyDecoder
  133. (),
  134. //rocketmq编码器
  135. new
  136. IdleStateHandler
  137. (
  138. 0
  139. ,
  140. 0
  141. , nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
  142. //Netty自带的心跳管理器
  143. new
  144. NettyConnectManageHandler
  145. (),
  146. //连接管理器,他负责捕获新连接、连接断开、异常等事件,然后统一调度到NettyEventExecuter处理器处理。
  147. new
  148. NettyServerHandler
  149. ()
  150. //当一个消息经过前面的解码等步骤后,然后调度到channelRead0方法,然后根据消息类型进行分发
  151. );
  152. }
  153. });
  154. if
  155. (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
  156. childHandler.childOption(
  157. ChannelOption
  158. .ALLOCATOR,
  159. PooledByteBufAllocator
  160. .DEFAULT);
  161. }
  162. try
  163. {
  164. ChannelFuture
  165. sync =
  166. this
  167. .serverBootstrap.bind().sync();
  168. InetSocketAddress
  169. addr = (
  170. InetSocketAddress
  171. ) sync.channel().localAddress();
  172. this
  173. .port = addr.getPort();
  174. }
  175. catch
  176. (
  177. InterruptedException
  178. e1) {
  179. throw
  180. new
  181. RuntimeException
  182. (
  183. “this.serverBootstrap.bind().sync() InterruptedException”
  184. , e1);
  185. }
  186. if
  187. (
  188. this
  189. .channelEventListener !=
  190. null
  191. ) {
  192. this
  193. .nettyEventExecutor.start();
  194. }
  195. //定时扫描responseTable,获取返回结果,并且处理超时
  196. this
  197. .timer.scheduleAtFixedRate(
  198. new
  199. TimerTask
  200. () {
  201. @Override
  202. public
  203. void
  204. run() {
  205. try
  206. {
  207. NettyRemotingServer
  208. .
  209. this
  210. .scanResponseTable();
  211. }
  212. catch
  213. (
  214. Throwable
  215. e) {
  216. log.error(
  217. “scanResponseTable exception”
  218. , e);
  219. }
  220. }
  221. },
  222. 1000
  223. *
  224. 3
  225. ,
  226. 1000
  227. );
  228. }

从上面的描述中可以概括得出RocketMQ的RPC通信部分的Reactor线程池模型框图。

整体可以看出RocketMQ的RPC通信借助Netty的多线程模型,其服务端监听线程和IO线程分离,同时将RPC通信层的业务逻辑与处理具体业务的线程进一步相分离。时间可控的简单业务都直接放在RPC通信部分来完成,复杂和时间不可控的业务提交至后端业务线程池中处理,这样提高了通信效率和MQ整体的性能。(ps:其中抽象出NioEventLoop来表示一个不断循环执行处理任务的线程,每个NioEventLoop有一个selector,用于监听绑定在其上的socket链路。)

总结

仔细阅读RocketMQ的过程中收获了很多关于网络通信设计技术和知识点。对于刚接触开源版的RocketMQ的童鞋来说,想要自己掌握RPC通信部分的各个技术知识点,还需要不断地使用本地环境进行debug调试和阅读源码反复思考。限于笔者的才疏学浅,对本文内容可能还有理解不到位的地方,如有阐述不合理之处还望留言一起探讨。

文章来源:智云一二三科技

文章标题:消息中间件—RocketMQ的RPC通信(二

文章地址:https://www.zhihuclub.com/194878.shtml

关于作者: 智云科技

热门文章

网站地图