Rocketmq系列之NameServer源码解析

前言

一直想做Rocketmq的源码解析系列,但是这块涉及到的组件较多比较庞大一下子不好下手,最近偶然发现NameServer这块的源码比较简单,所以准备以这块做为切入点逐步补完这个系列,当是为2020的开年立个flag吧。话不多说直接进入正题。

NameServer初始化流程

1
2
3
public static void main(String[] args) {
main0(args);
}

Rocketmq源码的工程划分还是挺清晰的,NameServer的代码都在namesrv这个工程中,具体的启动入口为NamesrvStartup类的main方法,直接调用的是main0方法,首先来看一下源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public static NamesrvController main0(String[] args) {

try {
NamesrvController controller = createNamesrvController(args);
start(controller);
String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
log.info(tip);
System.out.printf("%s%n", tip);
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}

return null;
}

这里的主要工作是创建了一个NamesrvController对象,该对象是NameServer 的总控制器,负责所有服务的生命周期管理,可以看到该对象通过createNamesrvController方法创建,我们来看看创建过程中都做了些什么。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
//PackageConflictDetect.detectFastjson();

Options options = ServerUtil.buildCommandlineOptions(new Options());
commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
if (null == commandLine) {
System.exit(-1);
return null;
}

final NamesrvConfig namesrvConfig = new NamesrvConfig();
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
nettyServerConfig.setListenPort(9876);
if (commandLine.hasOption('c')) {
String file = commandLine.getOptionValue('c');
if (file != null) {
InputStream in = new BufferedInputStream(new FileInputStream(file));
properties = new Properties();
properties.load(in);
MixAll.properties2Object(properties, namesrvConfig);
MixAll.properties2Object(properties, nettyServerConfig);

namesrvConfig.setConfigStorePath(file);

System.out.printf("load config properties file OK, %s%n", file);
in.close();
}
}

if (commandLine.hasOption('p')) {
InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
MixAll.printObjectProperties(console, namesrvConfig);
MixAll.printObjectProperties(console, nettyServerConfig);
System.exit(0);
}

MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);

if (null == namesrvConfig.getRocketmqHome()) {
System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
System.exit(-2);
}

LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
JoranConfigurator configurator = new JoranConfigurator();
configurator.setContext(lc);
lc.reset();
configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");

log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

MixAll.printObjectProperties(log, namesrvConfig);
MixAll.printObjectProperties(log, nettyServerConfig);

final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);

// remember all configs to prevent discard
controller.getConfiguration().registerConfig(properties);

return controller;
}

这里可以看到,该方法首先创建了一个NettyServerConfig的对象,该类属于remoting工程,基于netty实现了Rocketmq各个模块间的网络通信能力,包括producer、consumer和broker都有调用该模块,这里Netty相关内容不做展开。顺便从setListenPort(9876)可以看出,NameServer默认本地监控的就是9876端口。

接着是通过commandLine.hasOption('c')commandLine.hasOption('p')处理启动参数,-c参数用于指定配置文件的位置,-p参数用于打印所有配置项的值,仅供调试之用。

最后通过new NamesrvController(namesrvConfig, nettyServerConfig)创建NamesrvController对象,controller的创建过程就完成了,接着通过start(controller)方法来启动该控制器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public static NamesrvController start(final NamesrvController controller) throws Exception {

if (null == controller) {
throw new IllegalArgumentException("NamesrvController is null");
}

boolean initResult = controller.initialize();
if (!initResult) {
controller.shutdown();
System.exit(-3);
}

Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
@Override
public Void call() throws Exception {
controller.shutdown();
return null;
}
}));

controller.start();

return controller;
}

这里对controller的操作主要是三块,调用initialize()方法初始化、调用start()方法开始运行、系统关闭时调用shutdown()方法,下面一一来看一下:

首先是initialize()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public boolean initialize() {

this.kvConfigManager.load();

this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);

this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));

this.registerProcessor();

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override
public void run() {
NamesrvController.this.kvConfigManager.printAllPeriodically();
}
}, 1, 10, TimeUnit.MINUTES);

***省略部分代码***

return true;
}

这里比较重要的是以下几块:

  1. new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService),该方法创建了一个remotingServer对象,里面主要工作是初始化ServerBootstrapEventLoopGroup等元素,了解Netty的话你就知道这些是通过Netty进行网络编程所必须的元素,这里不做展开。
  2. 两个定时任务scheduledExecutorService.scheduleAtFixedRate,一个每10秒执行一次NamesrvController.this.routeInfoManager.scanNotActiveBroker()方法,另一个每10分钟执行一次NamesrvController.this.kvConfigManager.printAllPeriodically()方法。kvConfigManager.printAllPeriodically()用于打印配置信息,不多赘述。而routeInfoManager.scanNotActiveBroker()则需要好好说明一下。

RouteInfoManager是一个非常核心的对象,负责保存和管理集群路由信息,其定义中包括如下的属性,这5个map是NameServer数据存储的核心,具体的说明可以参见注释。

1
2
3
4
5
6
7
8
9
10
11
12
//保存的是主题和队列信息,其中每个队列信息对应的类 QueueData 中,还保存了 brokerName。
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
//保存了集群中每个 brokerName 对应 Broker 信息,每个 Broker 信息用一个 BrokerData 对象表示,
//`BrokerData`中保存了集群名称cluster,brokerName 和一个保存Broker物理地址的 Map:brokerAddrs,
//它的Key是BrokerID,Value就是这个BrokerID对应的Broker的物理地址。
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
//保存的是集群名称与 BrokerName 的对应关系
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
//保存了每个 Broker 当前的动态信息,包括心跳更新时间,路由数据版本等等
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
//保存了每个 Broker 对应的消息过滤服务的地址,用于服务端消息过滤
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

RouteInfoManagerscanNotActiveBroker()方法又做了什么呢,来看一看

1
2
3
4
5
6
7
8
9
10
11
12
13
public void scanNotActiveBroker() {
Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, BrokerLiveInfo> next = it.next();
long last = next.getValue().getLastUpdateTimestamp();
if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
RemotingUtil.closeChannel(next.getValue().getChannel());
it.remove();
log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
}
}
}

根据上面对brokerLiveTable的说明,我们知道了该方法主要是遍历该map,对最后心跳时间超过阈值BROKER_CHANNEL_EXPIRED_TIME的broker节点进行移除,调用RemotingUtil.closeChannel方法关闭连接,并且调用onChannelDestroy方法对其它4个map中相关的数据进行清除,需要说明的是由于这些map都不是并发容器,因此在onChannelDestroy中做清除操作时使用了ReentrantReadWriteLock来做并发控制。至于为什么不一开始就使用并发容器进行数据存储,猜想可能是因为清理操作更为低频,而平时非并发操作map情况更频繁,这样相对更节省开销吧。

  1. 顺带提一下this.registerProcessor()方法,内部调用如下
1
2
3
4
5
6
7
8
9
10
private void registerProcessor() {
if (namesrvConfig.isClusterTest()) {

this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()),
this.remotingExecutor);
} else {

this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);
}
}

这里调用了remotingServer对象的registerDefaultProcessor方法

1
2
3
4
@Override
public void registerDefaultProcessor(NettyRequestProcessor processor, ExecutorService executor) {
this.defaultRequestProcessor = new Pair<NettyRequestProcessor, ExecutorService>(processor, executor);
}

其实质是为defaultRequestProcessor属性创建了一个Pair类型的对象,后续的业务处理都会使用到这个对象。

到此initialize()方法分析完了,接着来看start()方法

1
2
3
4
5
6
7
public void start() throws Exception {
this.remotingServer.start();

if (this.fileWatchService != null) {
this.fileWatchService.start();
}
}

还是调用了之前初始化好的remotingServerstart()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
@Override
public void start() {
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
nettyServerConfig.getServerWorkerThreads(),
new ThreadFactory() {

private AtomicInteger threadIndex = new AtomicInteger(0);

@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
}
});

ServerBootstrap childHandler =
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
new HandshakeHandler(TlsSystemConfig.tlsMode))
.addLast(defaultEventExecutorGroup,
new NettyEncoder(),
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
new NettyConnectManageHandler(),
new NettyServerHandler()
);
}
});

if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
}

try {
ChannelFuture sync = this.serverBootstrap.bind().sync();
InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
this.port = addr.getPort();
} catch (InterruptedException e1) {
throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
}

if (this.channelEventListener != null) {
this.nettyEventExecutor.start();
}

this.timer.scheduleAtFixedRate(new TimerTask() {

@Override
public void run() {
try {
NettyRemotingServer.this.scanResponseTable();
} catch (Throwable e) {
log.error("scanResponseTable exception", e);
}
}
}, 1000 * 3, 1000);
}

熟悉Netty的话看到这里就很清楚了,如此长的一段代码几乎就是标准的Server端编码模板,通过serverBootstrap启动对本地端口的监听,而最重要的业务包的处理就会交由处理链上的NettyConnectManageHandlerNettyServerHandler类处理,关于这两个类后续再展开分析。

另外,该方法还调用了this.nettyEventExecutor.start(),这里的nettyEventExecutor是一个NettyEventExecutor类型的对象,该类是NettyRemotingAbstract类的内部类,实现了Runnable接口,其run方法实现为

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
    while (!this.isStopped()) {
try {
NettyEvent event = this.eventQueue.poll(3000, TimeUnit.MILLISECONDS);
if (event != null && listener != null) {
switch (event.getType()) {
case IDLE:
listener.onChannelIdle(event.getRemoteAddr(), event.getChannel());
break;
case CLOSE:
listener.onChannelClose(event.getRemoteAddr(), event.getChannel());
break;
case CONNECT:
listener.onChannelConnect(event.getRemoteAddr(), event.getChannel());
break;
case EXCEPTION:
listener.onChannelException(event.getRemoteAddr(), event.getChannel());
break;
default:
break;

}
}
} catch (Exception e) {
log.warn(this.getServiceName() + " service has exception. ", e);
}
}

log.info(this.getServiceName() + " service end");
}

这里通过一个循环不停从eventQueue队列中取event,并根据event的类型调用监听器listener的不同方法,这里的listener溯源回去其实就是NettyRemotingServer初始化时传入的brokerHousekeepingService对象,其类定义为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class BrokerHousekeepingService implements ChannelEventListener {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
private final NamesrvController namesrvController;

public BrokerHousekeepingService(NamesrvController namesrvController) {
this.namesrvController = namesrvController;
}

@Override
public void onChannelConnect(String remoteAddr, Channel channel) {
}

@Override
public void onChannelClose(String remoteAddr, Channel channel) {
this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
}

@Override
public void onChannelException(String remoteAddr, Channel channel) {
this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
}

@Override
public void onChannelIdle(String remoteAddr, Channel channel) {
this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
}
}

可以看到其实不管是IDLECLOSE还是EXCEPTION事件最后调用的都是之前遇到过的RouteInfoManageronChannelDestroy方法。

也就是说其实this.nettyEventExecutor.start()起了一个新线程一样是在定期做RouteInfoManager的清理工作。

分析完了start()方法最后来看一下controller注册的shutdown()方法:

1
2
3
4
5
6
7
8
9
public void shutdown() {
this.remotingServer.shutdown();
this.remotingExecutor.shutdown();
this.scheduledExecutorService.shutdown();

if (this.fileWatchService != null) {
this.fileWatchService.shutdown();
}
}

可以看到执行的都是之前使用的各种线程池的关闭,这样做非常优雅,避免了进程退出时各种处理中信息的丢失,很值得学习。

NameServer与其他组件的交互

讲完了NameServer的初始化,但没有讲Broker和NameServer,以及Producer、Consumer和NameServer之间是如何交互的,其实NameServer和各个组件都是基于Netty进行通信的,因此无论是和哪个组件进行交互基于Netty的基础框架部分都是一致的,正常有区别的其实是对不同组件发来请求的业务处理相关部分。

回到之前的remotingServerstart()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@Override
public void start() {

***省略部分代码***

ServerBootstrap childHandler =
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
new HandshakeHandler(TlsSystemConfig.tlsMode))
.addLast(defaultEventExecutorGroup,
new NettyEncoder(),
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
new NettyConnectManageHandler(),
new NettyServerHandler()
);
}
});

***省略部分代码***
}

了解Netty的话就会知道,本地端口监听到的包都会由Handler链处理,除了NettyEncoderNettyDecoderIdleStateHandler这几个Netty自带的包以外,我们需要关注的就是NettyConnectManageHandlerNettyServerHandler这两个自定义的Handler。

首先看一下NettyConnectManageHandler类,该类继承了Netty的ChannelDuplexHandler类,实现了其中的channelRegisteredchannelUnregisteredchannelActivechannelInactiveuserEventTriggeredexceptionCaught方法,用于处理channel的注册、连接和断开等事件,具体源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
class NettyConnectManageHandler extends ChannelDuplexHandler {
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.info("NETTY SERVER PIPELINE: channelRegistered {}", remoteAddress);
super.channelRegistered(ctx);
}

@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.info("NETTY SERVER PIPELINE: channelUnregistered, the channel[{}]", remoteAddress);
super.channelUnregistered(ctx);
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.info("NETTY SERVER PIPELINE: channelActive, the channel[{}]", remoteAddress);
super.channelActive(ctx);

if (NettyRemotingServer.this.channelEventListener != null) {
NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CONNECT, remoteAddress, ctx.channel()));
}
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.info("NETTY SERVER PIPELINE: channelInactive, the channel[{}]", remoteAddress);
super.channelInactive(ctx);

if (NettyRemotingServer.this.channelEventListener != null) {
NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));
}
}

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state().equals(IdleState.ALL_IDLE)) {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.warn("NETTY SERVER PIPELINE: IDLE exception [{}]", remoteAddress);
RemotingUtil.closeChannel(ctx.channel());
if (NettyRemotingServer.this.channelEventListener != null) {
NettyRemotingServer.this
.putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress, ctx.channel()));
}
}
}

ctx.fireUserEventTriggered(evt);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.warn("NETTY SERVER PIPELINE: exceptionCaught {}", remoteAddress);
log.warn("NETTY SERVER PIPELINE: exceptionCaught exception.", cause);

if (NettyRemotingServer.this.channelEventListener != null) {
NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.EXCEPTION, remoteAddress, ctx.channel()));
}

RemotingUtil.closeChannel(ctx.channel());
}
}

这里以channelActive方法进行分析,当broker和namesrv建立起channel时,会触发该方法的调用,该方法中除了常规的调用super.channelActive(ctx)将消息继续向Handler链的下游传递,也就是NettyServerHandler类,另外该方法还调用了NettyRemotingServer.this.putNettyEvent方法,我们来看下该方法:

1
2
3
public void putNettyEvent(final NettyEvent event) {
this.nettyEventExecutor.putNettyEvent(event);
}

又调用了nettyEventExecutor对象的putNettyEvent方法,接着来看一下:

1
2
3
4
5
6
7
public void putNettyEvent(final NettyEvent event) {
if (this.eventQueue.size() <= maxSize) {
this.eventQueue.add(event);
} else {
log.warn("event queue size[{}] enough, so drop this event {}", this.eventQueue.size(), event.toString());
}
}

可以看到主要做的事情就是往eventQueue这个阻塞队列里放入event对象,也就是一个new NettyEvent(NettyEventType.CONNECT, remoteAddress, ctx.channel())类型的对象。之前已经分析过,NameServer启动后会有一个NettyEventExecutor类型的对象循环不停从eventQueue队列中取event,并根据event的类型调用监听器listener的不同方法。

接着来看NettyServerHandler类,该类继承了Netty的SimpleChannelInboundHandler<I>类,但仅仅实现了channelRead0一个方法,源码如下:

1
2
3
4
5
6
7
class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {

@Override
protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
processMessageReceived(ctx, msg);
}
}

该方法调用了processMessageReceived(ctx, msg)方法,来看一下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
final RemotingCommand cmd = msg;
if (cmd != null) {
switch (cmd.getType()) {
case REQUEST_COMMAND:
processRequestCommand(ctx, cmd);
break;
case RESPONSE_COMMAND:
processResponseCommand(ctx, cmd);
break;
default:
break;
}
}
}

这里主要处理msg的类型为REQUEST_COMMANDRESPONSE_COMMAND的两类消息,我们以REQUEST_COMMAND类型也就是请求消息为例进行分析,该类消息调用了processRequestCommand(ctx, cmd)方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
final int opaque = cmd.getOpaque();

if (pair != null) {
Runnable run = new Runnable() {
@Override
public void run() {
try {
doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);

if (!cmd.isOnewayRPC()) {
if (response != null) {
response.setOpaque(opaque);
response.markResponseType();
try {
ctx.writeAndFlush(response);
} catch (Throwable e) {
log.error("process request over, but response failed", e);
log.error(cmd.toString());
log.error(response.toString());
}
} else {

}
}
} catch (Throwable e) {
log.error("process request exception", e);
log.error(cmd.toString());

if (!cmd.isOnewayRPC()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
RemotingHelper.exceptionSimpleDesc(e));
response.setOpaque(opaque);
ctx.writeAndFlush(response);
}
}
}
};

if (pair.getObject1().rejectRequest()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
"[REJECTREQUEST]system busy, start flow control for a while");
response.setOpaque(opaque);
ctx.writeAndFlush(response);
return;
}

try {
final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
pair.getObject2().submit(requestTask);
} catch (RejectedExecutionException e) {
if ((System.currentTimeMillis() % 10000) == 0) {
log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())
+ ", too many requests and system thread pool busy, RejectedExecutionException "
+ pair.getObject2().toString()
+ " request code: " + cmd.getCode());
}

if (!cmd.isOnewayRPC()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
"[OVERLOAD]system busy, start flow control for a while");
response.setOpaque(opaque);
ctx.writeAndFlush(response);
}
}
} else {
String error = " request type " + cmd.getCode() + " not supported";
final RemotingCommand response =
RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
response.setOpaque(opaque);
ctx.writeAndFlush(response);
log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
}
}

这个方法由一个大的if-else块组成,因为processorTable在Broker中用于缓存请求,而NameServer中并没有使用,因此得到的pair对象就是前面提到过的初始化了的defaultRequestProcessor对象。这里if分支主要做的事情是创建一个任务,并通过线程池执行它,我们先来看一下该任务做了些什么,其中的doBeforeRpcHooksdoAfterRpcHooks方法主要是在处理请求的前后做一些前后置工作,实际处理请求的是pair.getObject1().processRequest(ctx, cmd)方法,来看看这个方法做了什么。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {

if (ctx != null) {
log.debug("receive request, {} {} {}",
request.getCode(),
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
request);
}


switch (request.getCode()) {
case RequestCode.PUT_KV_CONFIG:
return this.putKVConfig(ctx, request);
case RequestCode.GET_KV_CONFIG:
return this.getKVConfig(ctx, request);
case RequestCode.DELETE_KV_CONFIG:
return this.deleteKVConfig(ctx, request);
case RequestCode.QUERY_DATA_VERSION:
return queryBrokerTopicConfig(ctx, request);
case RequestCode.REGISTER_BROKER:
Version brokerVersion = MQVersion.value2Version(request.getVersion());
if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
return this.registerBrokerWithFilterServer(ctx, request);
} else {
return this.registerBroker(ctx, request);
}
case RequestCode.UNREGISTER_BROKER:
return this.unregisterBroker(ctx, request);
case RequestCode.GET_ROUTEINTO_BY_TOPIC:
return this.getRouteInfoByTopic(ctx, request);
case RequestCode.GET_BROKER_CLUSTER_INFO:
return this.getBrokerClusterInfo(ctx, request);
case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
return this.wipeWritePermOfBroker(ctx, request);
case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
return getAllTopicListFromNameserver(ctx, request);
case RequestCode.DELETE_TOPIC_IN_NAMESRV:
return deleteTopicInNamesrv(ctx, request);
case RequestCode.GET_KVLIST_BY_NAMESPACE:
return this.getKVListByNamespace(ctx, request);
case RequestCode.GET_TOPICS_BY_CLUSTER:
return this.getTopicsByCluster(ctx, request);
case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
return this.getSystemTopicListFromNs(ctx, request);
case RequestCode.GET_UNIT_TOPIC_LIST:
return this.getUnitTopicList(ctx, request);
case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
return this.getHasUnitSubTopicList(ctx, request);
case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
return this.getHasUnitSubUnUnitTopicList(ctx, request);
case RequestCode.UPDATE_NAMESRV_CONFIG:
return this.updateConfig(ctx, request);
case RequestCode.GET_NAMESRV_CONFIG:
return this.getConfig(ctx, request);
default:
break;
}
return null;
}

这是一个非常典型的处理Request的路由分发过程,根据request.getCode() 来分发请求到对应的处理器中。例如Broker发给NameServer注册请求的Code为 REGISTER_BROKER,客户端获取路由信息的Code为GET_ROUTEINTO_BY_TOPIC

看完了processRequest方法,接着看以下代码段

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
if (!cmd.isOnewayRPC()) {
if (response != null) {
response.setOpaque(opaque);
response.markResponseType();
try {
ctx.writeAndFlush(response);
} catch (Throwable e) {
log.error("process request over, but response failed", e);
log.error(cmd.toString());
log.error(response.toString());
}
} else {

}
}

可以看到,如果收到的请求不是oneway类型的,那么就需要用之前处理后得到的response对象去做回复。到此整个NameServer的业务流程大致就讲完了。

总结

本文通过源码分析了NameServer的初始化流程和请求处理这两块,了解了NameServer工作的大致流程,在了解namesrv工程的同时也顺带了解了一部分remoting工程,应该会为后续继续分析其他组件起到帮助。顺便说一下,这次源码其实看得挺快的,有一个重要原因是对Netty比较熟悉,所以多看一些底层的东西真的会很有帮助,越到后面越会体现出来。另外,这次是真的下决心要把Rocketmq系列做完的,这绝对是个大坑,希望我能坚持住吧。