前言
一直想做Rocketmq的源码解析系列,但是这块涉及到的组件较多比较庞大一下子不好下手,最近偶然发现NameServer这块的源码比较简单,所以准备以这块做为切入点逐步补完这个系列,当是为2020的开年立个flag吧。话不多说直接进入正题。
NameServer初始化流程
1 | public static void main(String[] args) { |
Rocketmq源码的工程划分还是挺清晰的,NameServer的代码都在namesrv这个工程中,具体的启动入口为NamesrvStartup类的main方法,直接调用的是main0方法,首先来看一下源码:
1 | public static NamesrvController main0(String[] args) { |
这里的主要工作是创建了一个NamesrvController对象,该对象是NameServer 的总控制器,负责所有服务的生命周期管理,可以看到该对象通过createNamesrvController方法创建,我们来看看创建过程中都做了些什么。
1 | public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException { |
这里可以看到,该方法首先创建了一个NettyServerConfig的对象,该类属于remoting工程,基于netty实现了Rocketmq各个模块间的网络通信能力,包括producer、consumer和broker都有调用该模块,这里Netty相关内容不做展开。顺便从setListenPort(9876)可以看出,NameServer默认本地监控的就是9876端口。
接着是通过commandLine.hasOption('c')和commandLine.hasOption('p')处理启动参数,-c参数用于指定配置文件的位置,-p参数用于打印所有配置项的值,仅供调试之用。
最后通过new NamesrvController(namesrvConfig, nettyServerConfig)创建NamesrvController对象,controller的创建过程就完成了,接着通过start(controller)方法来启动该控制器。
1 | public static NamesrvController start(final NamesrvController controller) throws Exception { |
这里对controller的操作主要是三块,调用initialize()方法初始化、调用start()方法开始运行、系统关闭时调用shutdown()方法,下面一一来看一下:
首先是initialize()方法
1 | public boolean initialize() { |
这里比较重要的是以下几块:
new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService),该方法创建了一个remotingServer对象,里面主要工作是初始化ServerBootstrap、EventLoopGroup等元素,了解Netty的话你就知道这些是通过Netty进行网络编程所必须的元素,这里不做展开。- 两个定时任务
scheduledExecutorService.scheduleAtFixedRate,一个每10秒执行一次NamesrvController.this.routeInfoManager.scanNotActiveBroker()方法,另一个每10分钟执行一次NamesrvController.this.kvConfigManager.printAllPeriodically()方法。kvConfigManager.printAllPeriodically()用于打印配置信息,不多赘述。而routeInfoManager.scanNotActiveBroker()则需要好好说明一下。
RouteInfoManager是一个非常核心的对象,负责保存和管理集群路由信息,其定义中包括如下的属性,这5个map是NameServer数据存储的核心,具体的说明可以参见注释。
1 | //保存的是主题和队列信息,其中每个队列信息对应的类 QueueData 中,还保存了 brokerName。 |
那RouteInfoManager的scanNotActiveBroker()方法又做了什么呢,来看一看
1 | public void scanNotActiveBroker() { |
根据上面对brokerLiveTable的说明,我们知道了该方法主要是遍历该map,对最后心跳时间超过阈值BROKER_CHANNEL_EXPIRED_TIME的broker节点进行移除,调用RemotingUtil.closeChannel方法关闭连接,并且调用onChannelDestroy方法对其它4个map中相关的数据进行清除,需要说明的是由于这些map都不是并发容器,因此在onChannelDestroy中做清除操作时使用了ReentrantReadWriteLock来做并发控制。至于为什么不一开始就使用并发容器进行数据存储,猜想可能是因为清理操作更为低频,而平时非并发操作map情况更频繁,这样相对更节省开销吧。
- 顺带提一下
this.registerProcessor()方法,内部调用如下
1 | private void registerProcessor() { |
这里调用了remotingServer对象的registerDefaultProcessor方法
1 |
|
其实质是为defaultRequestProcessor属性创建了一个Pair类型的对象,后续的业务处理都会使用到这个对象。
到此initialize()方法分析完了,接着来看start()方法
1 | public void start() throws Exception { |
还是调用了之前初始化好的remotingServer的start()方法
1 |
|
熟悉Netty的话看到这里就很清楚了,如此长的一段代码几乎就是标准的Server端编码模板,通过serverBootstrap启动对本地端口的监听,而最重要的业务包的处理就会交由处理链上的NettyConnectManageHandler和NettyServerHandler类处理,关于这两个类后续再展开分析。
另外,该方法还调用了this.nettyEventExecutor.start(),这里的nettyEventExecutor是一个NettyEventExecutor类型的对象,该类是NettyRemotingAbstract类的内部类,实现了Runnable接口,其run方法实现为
1 | while (!this.isStopped()) { |
这里通过一个循环不停从eventQueue队列中取event,并根据event的类型调用监听器listener的不同方法,这里的listener溯源回去其实就是NettyRemotingServer初始化时传入的brokerHousekeepingService对象,其类定义为:
1 | public class BrokerHousekeepingService implements ChannelEventListener { |
可以看到其实不管是IDLE、CLOSE还是EXCEPTION事件最后调用的都是之前遇到过的RouteInfoManager的onChannelDestroy方法。
也就是说其实this.nettyEventExecutor.start()起了一个新线程一样是在定期做RouteInfoManager的清理工作。
分析完了start()方法最后来看一下controller注册的shutdown()方法:
1 | public void shutdown() { |
可以看到执行的都是之前使用的各种线程池的关闭,这样做非常优雅,避免了进程退出时各种处理中信息的丢失,很值得学习。
NameServer与其他组件的交互
讲完了NameServer的初始化,但没有讲Broker和NameServer,以及Producer、Consumer和NameServer之间是如何交互的,其实NameServer和各个组件都是基于Netty进行通信的,因此无论是和哪个组件进行交互基于Netty的基础框架部分都是一致的,正常有区别的其实是对不同组件发来请求的业务处理相关部分。
回到之前的remotingServer的start()方法
1 | @Override |
了解Netty的话就会知道,本地端口监听到的包都会由Handler链处理,除了NettyEncoder、NettyDecoder和IdleStateHandler这几个Netty自带的包以外,我们需要关注的就是NettyConnectManageHandler和NettyServerHandler这两个自定义的Handler。
首先看一下NettyConnectManageHandler类,该类继承了Netty的ChannelDuplexHandler类,实现了其中的channelRegistered、channelUnregistered、channelActive、channelInactive、userEventTriggered和exceptionCaught方法,用于处理channel的注册、连接和断开等事件,具体源码如下:
1 | class NettyConnectManageHandler extends ChannelDuplexHandler { |
这里以channelActive方法进行分析,当broker和namesrv建立起channel时,会触发该方法的调用,该方法中除了常规的调用super.channelActive(ctx)将消息继续向Handler链的下游传递,也就是NettyServerHandler类,另外该方法还调用了NettyRemotingServer.this.putNettyEvent方法,我们来看下该方法:
1 | public void putNettyEvent(final NettyEvent event) { |
又调用了nettyEventExecutor对象的putNettyEvent方法,接着来看一下:
1 | public void putNettyEvent(final NettyEvent event) { |
可以看到主要做的事情就是往eventQueue这个阻塞队列里放入event对象,也就是一个new NettyEvent(NettyEventType.CONNECT, remoteAddress, ctx.channel())类型的对象。之前已经分析过,NameServer启动后会有一个NettyEventExecutor类型的对象循环不停从eventQueue队列中取event,并根据event的类型调用监听器listener的不同方法。
接着来看NettyServerHandler类,该类继承了Netty的SimpleChannelInboundHandler<I>类,但仅仅实现了channelRead0一个方法,源码如下:
1 | class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> { |
该方法调用了processMessageReceived(ctx, msg)方法,来看一下
1 | public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception { |
这里主要处理msg的类型为REQUEST_COMMAND和RESPONSE_COMMAND的两类消息,我们以REQUEST_COMMAND类型也就是请求消息为例进行分析,该类消息调用了processRequestCommand(ctx, cmd)方法
1 | public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) { |
这个方法由一个大的if-else块组成,因为processorTable在Broker中用于缓存请求,而NameServer中并没有使用,因此得到的pair对象就是前面提到过的初始化了的defaultRequestProcessor对象。这里if分支主要做的事情是创建一个任务,并通过线程池执行它,我们先来看一下该任务做了些什么,其中的doBeforeRpcHooks和doAfterRpcHooks方法主要是在处理请求的前后做一些前后置工作,实际处理请求的是pair.getObject1().processRequest(ctx, cmd)方法,来看看这个方法做了什么。
1 |
|
这是一个非常典型的处理Request的路由分发过程,根据request.getCode() 来分发请求到对应的处理器中。例如Broker发给NameServer注册请求的Code为 REGISTER_BROKER,客户端获取路由信息的Code为GET_ROUTEINTO_BY_TOPIC。
看完了processRequest方法,接着看以下代码段
1 | if (!cmd.isOnewayRPC()) { |
可以看到,如果收到的请求不是oneway类型的,那么就需要用之前处理后得到的response对象去做回复。到此整个NameServer的业务流程大致就讲完了。
总结
本文通过源码分析了NameServer的初始化流程和请求处理这两块,了解了NameServer工作的大致流程,在了解namesrv工程的同时也顺带了解了一部分remoting工程,应该会为后续继续分析其他组件起到帮助。顺便说一下,这次源码其实看得挺快的,有一个重要原因是对Netty比较熟悉,所以多看一些底层的东西真的会很有帮助,越到后面越会体现出来。另外,这次是真的下决心要把Rocketmq系列做完的,这绝对是个大坑,希望我能坚持住吧。