RocketMQ.NameServer
什么是NameServer?
NameServer和Zookeeper的区别?
NameServer工作原理
1. NameServer功能
1)保存Topic的路由消息
提供给Producer来查询对于Broker地址信息用于发送消息用的。Producer发送消息是会先到NameServer获取Topic的路由信息,从这些路由信息中选择一个Broker进行消息发送。
路由信息是无状态的,默认存储在内存中。也可以通过配置将其持久化到文件中,不过实际应用中很少使用。
2)维护与Broker的心跳连接
Topic的路由消息由NameServer进行维护,NameServer负责与所有的Broker进行心跳维护。
3)维护Topic的订阅列表(NameServer会维护订阅信息吗?还是由Broker自己维护?)
订阅信息是指谁订阅了Topic,当有新的消息来的时候会通知这些订阅者。消息是由Producer直接发给Broker的,NameServer不会接收到新消息通知,所以订阅列表由Broker来维护。
3)NameServer的集群部署
NameServer可以通过部署多套实例来实现高可用场景,不过多实例直接是不互相备份的。所以客户端需要把所有的NameServer服务地址都配置上。
2. 核心概念
路由表
NameServer的主要作用就是维护Topic路由消息,这些路由信息是在NameServer服务启动的时候由每个Broker主动上报的。
路由消息
NameServer维护了5个路由表:
//Topic列表信息,保存Topic对应的每个Broker名称,QueueData还包含了mq的读写数量
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
//Broker地址信息,保存Broker名称对应的Broker地址信息(就是一台Broker机器)
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
//Broker集群信息,保存Broker集群对应的所有Broker名称
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
//Broker更新信息,保存每台Broker机器的状态信息,NameServer会定期检查该状态
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
//Broker过滤信息,Broker服务端过滤
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
这里首先要介绍下Broker的启动流程。
Broker启动后会主动向NameServer进行注册?注册了哪些事件?上报了哪些信息?
Broker的服务注册 注册参数里面包括brokerId、brokerAddr、brokerName,topicConfig(包括了Topic的信息)。
NameServer源码解析
总结
RocketMQ概览
RocketMQ主要由Producer、Concumer、NameServer、Broker四个部分组成。NameServer用于管理Broker,接受Broker的注册并与其进行心跳检测维护。Broker用于消息的存储和分发,可以启动多个Broker来同步备份消息,并将自己注册到NameServer。Producer从NameServer选择一个Broker发送消息。Consumer向NameServer订阅消息。
可以启动多个NameServer(分布在多台机器)和Broker(每台机器启动一个Master和Slave角色的Broker)。
Broker配置文件重要参数介绍:
- fileReservedTime=48 消息在磁盘上的存储时间,自动删除超时消息
- deleteWhen=04 什么时候删除
- brokerRole=SYNC_MASTER broker的消息同步机制:SYNC_MASTER、ASYNC_MASTER、SLAVE。SYNC表示当Slave和Master消息同步完成之后,再返回发送成功的状态。
- flushDiskType=ASYNC_FLUSH 刷盘策略,分为SYNC_FLUSH和ASYNC_FLUSH两种,代表同步刷盘和异步刷盘。同步刷盘在消息真正写入磁盘后再返回成功状态;异步刷盘在消息写入page_cache后就返回成功状态。
- listenPort=10911 Broker监听的端口号。
- storePathRootDir=/home/rocketmq/store-a 消息和配置信息的存储目录
MQAdmin是RocketMQ自带的命令行管理工具,使用mqadmin命令,可以创建、修改Topic,更新Broker配置信息等。
apache/rocketmq-externals图界面管理消息队列。
消费者 消费者分为两种类型,DefaultMQPushConsumer和DefaultMQPullConsumer。DefaultMQPushConsumer由系统主动推送消息(内部也是使用“长轮询”实现的),DefaultMQPullConsumer由消费者主动拉取消息。
Push方式,是服务端接收到消息后,就主动把消息推送给Client端。首先加大了Server端的工作量,进而影响其性能。其次,Client的处理能力各不相同,可能造成Client消息堆积等各种潜在问题。
Pull方式,是客户端循环地从Server端拉取消息,主动权在Client端。但是要处理好循环的间隔策略。
GroupName用于把多个Consumer组织到一起,RocketMQ支持两种消息模式:Clustering和Broadcasting。Clustering模式,同一个ConsumerGroup里的每个Consumer只消费所订阅消息的一部分。Broadcasting模式,同一个ConsumerGroup里的每个Consumer都能消费所订阅Topic的全部消息,也就是一个消息会被多次分发,被多个Consumer消费。
Topic用于标识消息的类型,如果不需要消费Topic下的所有消息,可以指定消息的Tag来过滤消息。
DefaultMQPushConsumer
RocketMQ的服务端“长轮询”模式做了一些优化,并不会一直循环空转: brokerSuspendMaxTimeMillis,Broker在没有消息时的最长阻塞时间,默认15秒,有消息会立刻返回。客户端发送请求到服务端,如果服务器这是没有新的消息,则会挂起请求,定期(每5秒)check一下消息队列,有就直接返回,没有直接最长阻塞时间再返回。“长轮询”的局限性是在挂起请求时会占用资源。
PushConsumer的流量控制,每个MessageQueue都有对应一个ProcessQueue,用于保存这个MessageQueue还未处理的消息。ProcessQueue对象里有一个TreeMap和一个读写锁。TreeMap以消息的Offset作为key,以消息内容作为value。读写锁控制多个线程并发访问。有了ProcessQueue对象后,消费者在pull消息时,会优先检查ProcessQueue内是否还有未处理的消息,如果有且超过一定值(消息个数、总大小、Offset跨度)则延迟pull消息,从而达到流量控制目的。ProcessQueue还可以辅助实现顺序消费的逻辑。
DefaultMQPullConsumer
需要客户端自己保持消息消费的进度Offset。?为什么要保存Offset呢?有点忘记了。因为主动权都设计在客户端,所以客户端可以对消息进行重复消费或者跳跃消费。设计在服务端,就要服务端保存每个客户端的消费进度。
生产者 支持同步发送、异步发送、延迟发送、发送事务消息等。延迟发送可以对消息设置指定时间,broker在收到消息后不会立马处理。可以使用MessageQueueSelector指定MessageQueue队列发送消息。 RocketMQ采用两阶段提交方式实现事务消息: 1)发送方向RocketMQ发送“待确认”消息; 2)RocketMQ收到“待确认”消息持久化成功后,返回发送成功。第一阶段消息发送完成。 3)发送方开始执行本地事件逻辑。 4)事件执行完后,再想RocketMQ发送二次确认(Commit或Rollback),RocketMQ收到Commit将第一阶段消息标记可投递,发送给订阅方;收到rollback则删除第一阶段消息,结束。
两阶段提交可能出现的问题: 1)发送二次确认时,RocketMQ未收到或者超时。RocketMQ会在经过固定时间段后主动发送回查请求。发送方收到回查请求后,根据执行结果发送二次确认消息。 2)因为RocketMQ是顺序存消息的,二次确认需要update消息状态,所以需要查找之前的消息。这样会造成磁盘Catch的脏页过多,降低系统的性能。 RocketMQ事务消息回查设计方案[https://blog.csdn.net/qq_27529917/article/details/79802406]
存储队列位置信息 RocketMQ中,一种类型的消息会放到一个Topic里,为了能够并行,一般一个Topic会有多个MessageQueue,Offset是指某个Topic下的一条消息在某个MessageQueue里的位置。通过Offset可以定位到这条信息,或者从这条消息向后继续处理。MessageQueue其实只是一个对象,保存了topic名称、broker名称。
RouteInfoManager类用于保存各角色的状态信息,创建Topic命令会直接发往对应的Broker。Broker创建成功后,会向NameServer发送注册信息,NameSever更新后其他客户端才能发现新增的Topic。为什么不用Zookeeper,保持轻量级是首要目标。NameServer总共才8个类文件,非常好维护。
Name Server
Name Server 主要负责管理集群的路由信息,包括Topic队列信息和Broker地址信息。客户端可以通过Name Server获取当前topic信息,通过topic获取broker信息,通过broker获取broker地址信息等等。
NameServer本身是无状态的,不会持久化Broker、Topic等状态信息,都是由个角色定时上报并存储到内中的。
NamesrvStartup为NameServer的启动类,NamesrvStartup通过加装默认的配置文件,实例化NamesrvController控制器类来执行Name Server操作。NamesrvController根据配置文件创建Netty服务端ServerBootstrap用于监听客户端的请求,Netty服务端调用DefaultRequestProcessor处理程序来处理客户端的请求,DefaultRequestProcessor根据消息类型做出相应的操作更新RouteInfoManager路由信息。
Name Server 类图
NamesrvStartup
Name Server的启动,首先通过调用NamesrvStartup类的main方法进行执行。该类首先加载系统默认配置文件,NamesrvConfig和NettyServerConfig,顾名思义NamesrcConfig就是Name Server相关的配置信息,例如rocketmq的home目录等,NettyServerConfig就是启动Netty服务端时的相关配置信息,例如监听端口、工作线程池数、服务端发送接收缓存池的大小等。RocketMQ是使用Netty作为底层RPC通信框架的,所以Name Server实际是启动了一个Netty服务端ServerBootstrap来监听客户端的请求,同时通过向Netty注册监听处理程序来处理客户端的请求。
package com.alibaba.rocketmq.namesrv;
public class NamesrvStartup {
/**
* 程序或命令行启动,调用的入口
* @param args
*/
public static void main(String[] args) {
main0(args);
}
public static NamesrcController main0(String[] args) {
//...
//创建NamesrvConfig配置文件
final NamesrvConfig namesrvConfig = new NamesrvConfig();
//创建NettyServerConfig配置文件
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
nettyServerConfig.setListenPort(9876);//设置Netty服务端监听端口,默认9876
//创建NamesrvController,传入配置文件构造controller
final NamesrvController controller = new NamesrvController(namesrcConfig, nettyServerConfig);
controller.initialize();//初始化controller
//...
}
}
NameServer启动:
- 创建3个线程,一个工作线程池,两个定时器线程。定时器线程一个用于扫描失效的broker,另一个用于打印配置信息。
- 启动通信服务端remoteServer,用于监听接收broker、client等发送的请求。
NameServer的核心业务处理逻辑就是在接收到请求后,对请求的处理。逻辑主题是个switch语句,根据RequestCode调用不同函数处理。
NamesrvController
NamesrvController是NamesrvStartup的控制器类,它创建了用于接收客户端请求的Netty服务端,并向Netty注册了请求处理程序,Netty服务端接收到客户端的请求后,调用请求处理程序来处理客户端的请求。
NamesrcStartup创建了NamesrvConfig和NettyServerConfig配置文件后,通过这两个配置文件实例化NamesrvController控制器类,然后调用NamesrvController.initialize方法进行初始化。
NamesrvController在创建时,实例化了RouteInfoManager和BrokerHouseKeepingService两个对象。Name Server中最重要的就是RouteInfoManager类。Name Server所有的Topic和Borker信息都保存在RouteInfoManager中,RouteInfoManager保存所有的路由信息。BrokerHouseKeepingService用于处理Broker状态事件,当Broker失效、异常或者关闭,则将Broker从RouteInfoManager中移除。
NamesrvController初始化时,根据NettyServerConfig创建了Netty服务端,并向Netty服务端注册了请求处理程序,同时还创建了一个用于处理请求的线程池remotingExecutor。Netty服务端接收到客户端的请求后,将请求封装成一个Task任务,然后把该任务提交到线程池remotingExecutor进行处理。
package com.alibaba.rocketmq.namesrv;
public class NamesrvController {
//Name Server 配置文件
private final NamesrvCofnig namesrvConfig;
//Netty 服务端配置文件
private final NettyServerConfig nettyServerConfig;
//Netty 服务端
private RemotingServer remotingServer;
//Broker状态监听处理程序
private BrokerHouseKeepingServer brokerHouseKeepingService;
//线程池 用于处理消息请求DefaultRequestProcessor
private ExecutorService remotingService;
//...
/**
* 实例化NamesrvController
* @param namesrvConfig Name Server 配置文件
* @param nettyServerConfig Netty 服务端配置文件
*/
public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
this.namesrvConfig = namesrvConfig;
this.nettyServerConfig = nettyServerConfig;
//...
//保存所有的路由信息
this.routeInfoManager = new RouteInfoManager();
//Broker状态事件处理程序,处理Broker的异常、关闭等状态
this.brokerHousekeepingService = new BrokerHousekeepingService(this);
}
/**
* 初始化
* @return
*/
public boolean initialize() {
//...
//创建Netty服务端,nettyServerConfig为Netty服务端相关配置文件,例如前面在NamesrvStartup中配置了监听端口9876
//brokerHouseKeepingServer broker状态处理程序
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
//创建一个固定大小的线程池,根据nettyServerConfig配置文件提供的默认工作线程数,默认值为8
this.remotingExecutor = Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
//注册请求处理程序,当Netty服务端接收到客户端的请求时,会调用该处理程序
this.registerProcessor();
//...
}
/**
* 给Netty服务端注册请求处理程序,使用线程池来处理请求
*/
private void registerProcessor() {
//...
this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingServer);
}
}
RouteInfoManager
RouteInfoManager保存所有的路由信息,包括topic和broker信息。Netty服务端接收到请求后,回调请求处理程序DefaultRequestProcessor,defaultRequestProcessor根据请求类型RequestCode,例如注册Broker或者新建Topic请求,来更新RouteInfoManager路由信息。
package com.alibaba.rocketmq.namesrv.routeinfo;
public class RouteInfoManager {
// Broker上报注册时会更新一下5个集合
// 使用读写锁控制broker注册并发
private final ReadWriteLock lock = new ReentrantReadWriteLock();
//Topic列表信息,保存Topic对应的每个Broker名称,QueueData还包含了mq的读写数量
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
//Broker地址信息,保存Broker名称对应的Broker地址信息(就是一台Broker机器)
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
//Broker集群信息,保存Broker集群对应的所有Broker名称
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
//Broker更新信息,保存每台Broker机器的状态信息,NameServer会定期检查该状态
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
//Broker过滤信息,Broker服务端过滤
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
//...
/**
* 注册新的Broker,将新的Broker添加到brokderAddrTable等信息中
*/
public RegisterBrokerResult registerBroker(...) {
//...
//更新Broker集群信息
this.clusterAddrTable.put(clusterName, brokerNames);
//更新Broker地址信息
this.brokerAddrTable.put(brokerName, brokerData);
//更新Broker状态信息
this.brokerLiveTable.put(...);
//更新Broker过滤信息
this.filterServerTable.put(brokerAddr, filterServerList);
// 更新Topic信息
this.topicQueueTable.put(topicConfig.getTopicName(), queueDataList);
//...
}
}
BrokerHouseKeepingService
BrokerHouseKeepingService专门处理broker是否存活。如果broker失效、异常或者关闭,则将broker从RouteInfoManager路由信息中移除,同时将与该broker相关的topic信息也一起删除。Netty服务端专门启动了一个线程用于监听管道的失效、异常或者关闭等的事件队列,当事件队列里面有新事件时,则取出事件并判断事件的类型,然后调用BrokerHouseKeepingService对应的方法来处理该事件。
package com.alibaba.rocketmq.remoting.netty;
/**
* Netty服务端公共抽象类
*/
public abstract class NettyRemotingAbstract {
//...
/**
* 新建线程监听事件队列
*/
class NettyEventExecuter extends ServiceThread {
//事件队列,这里使用LinkedBlockingQueue队列,基于链表的阻塞队列,是线程安全的队列。新事件自动加入到队尾,
private final LinkedBlockingQueue<NettyEvent> eventQueue = new LinkedBlockingQueue<NettyEvent>();
//...
/**
* 添加新事件
* @param event
*/
public void putNettyEvent(final NettyEvent event) {
this.eventQueue.add(event);
}
@Override
public void run() {
//请求事件处理程序 BrokerHouseKeepingService
final ChannelEventListener listener = NettyRemotingAbstract.this.getChannelEventListener();
while(!this.isStoped()) {
//每3秒访问一次事件队列
NettyEvent event = this.eventQueue.poll(3000, TimeUnit.MILLISECONDS);
if (event != null && listener != null) {
//判断事件类型
switch (event.getType()) {
case IDLE://失效
//调用 BrokerHouseKeepingService 的 onChannelIdle
listener.onChannelIdle(event.getRemoteAddr(), event.getChannel());
break;
case CLOSE://关闭
//调用 BrokerHouseKeepingService 的 onChannelClose
listener.onChannelClose(event.getRemoteAddr(), event.getChannel());
break;
case CONNECT://连接
//调用 BrokerHouseKeepingService 的 onChannelConnect
listener.onChannelConnect(event.getRemoteAddr(), event.getChannel());
break;
case EXCEPTION://异常
//调用 BrokerHouseKeepingService 的 onChannelException
listener.onChannelException(event.getRemoteAddr(), event.getChannel());
break;
default:
break;
}
}
}
}
}
}
package com.alibaba.rocketmq.namesrv.routeinfo;
/**
* Broker状态事件处理程序,处理Broker状态的服务,实现了ChannelEventListener接口。
* 当Netty有新事件时,将调用ChannelEventListener接口处理事件。
*/
public class BrokerHouseKeepingService implements ChannelEventListener {
//...
@Override
public void onChannelClose(String remoteAddr, Channel channel) {
//管道关闭时,将broker从RouteInfoManager中移除
this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
}
@Override
public void onChannelException(String remoteAddr, Channel channel) {
//管道异常时,将broker从RouteInfoManager中移除
this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
}
@Override
public void onChannelIdle(String remoteAddr, Channel channel) {
//管道失效时,将broker从RouteInfoManager中移除
this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
}
//...
}
package com.alibaba.rocketmq.namesrv.routeinfo;
public class RouteInfoManager {
//...
/**
* 销毁管道
* @param remoteAddr 客户端地址信息
* @param channel 客户端管道
*/
public void onChannelDestroy(String remoteAddr, Channel channel) {
//1.根据channel客户端管道找到brokerAddr对应的broker地址信息
//2.根据brokerAddr地址将broker从brokerLiveTable和filterServerTable移除
this.brokerLiveTable.remove(brokerAddrFound);
this.filterServerTable.remove(brokerAddrFound);
//3.根据brokerAddr遍历brokerAddrTable,将broker从brokerAddrTable中移除
this.brokerAddrTable.remove();
//4.根据brokerAddr遍历clusterAddrTable,将broker从clusterAddrTable中移除
this.clusterAddrTable.remove();
//4.根据brokerAddr遍历topicQueueTable,将broker从topicQueueTable中移除
this.topicQueueTable.remove();
}
}
NettyEventExecuter线程会每隔3秒定时轮询事件队列eventQueue,当有新事件时,则从队列中取出事件,判断事件类型然后调用BrokerHouseKeepingService处理事件。NettyEventExecuter还提供了putNettyEvent方法用于添加事件到队列中。
NettyRemotingServer
NettyRemotingServer就是Netty服务端,用于监听客户端的请求,然后更新RouteInfoManager路由表信息。
NamesrvController创建了Netty服务端NettyRemotingServer,根据NamesrvStartup对象提供的nettyServerConfig配置文件,以及将BrokerHouseKeepingService处理程序传入NettyRemotingServer的构造函数中,然后调用NettyRemotingServer的start方法,启动Netty。
Netty启动后,开始监听客户端的请求,当有新的请求到来时,将该请求封装成一个Task任务,然后调用线程池处理该Task。
package com.alibaba.rocketmq.remoting.netty;
/**
* Netty 服务端
*/
public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
/**
* 通过Netty配置文件和管道事件监听器实例化Netty服务端
* @param nettyServerConfig Netty配置文件
* @param channelEventListener 管道事件监听器 BrokerHouseKeepingService实现了该接口
*/
public NettyRemotingServer(final NettyServerConfig nettyServerConfig, final ChannelEventListener channelEventListener) {
//...
//Netty 服务端启动辅助类
this.serverBootstrap = new ServerBootstrap();
//Netty 配置文件
this.nettyServerConfig = nettyServerConfig;
//管道事件监听器 BrokerHouseKeepingService实现了该接口
this.channelEventListener = channelEventListener;
//...
}
@Override
public void start() {
//正常的启动Netty
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector).channel(NioServerSocketChannel.class)
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))//监听配置文件设置的9876端口
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
new NettyEncoder(),//自定义Netty编码类
new NettyDecoder(),//自定义Netty解码类
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),//心跳空闲检查时间
new NettyConnetManageHandler(),//处理管道连接、端口、异常等事件
new NettyServerHandler());//处理管道消息事件
}
});
this.nettyEventExecutor.start();//启动一个线程,监听事件队列。就是上面提到的管道断开、异常或关闭时的事件队列
}
}
Netty启动时,向pipeline管道添加了两个自定义的ChannelHandler,一个是NettyConnnetManageHandler
,一个是NettyServerHandler
。
上文有提到Netty专门启动了一个线程NettyEventExecuter,用于监听(管道断开、异常或关闭时触发的事件)事件队列的。NettyEventExecuter线程还提供了一个putNettyEvent方法用于添加事件到队列中。那么这个方法由谁来调用呢?就是NettyConnetManageHanlder处理程序来调用的。
NettyConnetManageHandler
继承于ChannelDuplexHandler,并且实现了channelInactive、userEventTriggered、exceptionCaught等方法。当有管道失效时,会自动触发channelInactive方法,然后在这个方法里面调用nettyEventExecuter.putNettyEvent(event)方法,将事件添加到事件队列中。
package com.alibaba.rocketmq.remoting.netty;
/**
* Netty 服务端
*/
public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
//...
class NettyConnetManageHandler extends ChannelDuplexHandler {
//...
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
//获取客户端远程地址
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
//创建一个NettyEvent事件,标记该事件类型为CLOSE,然后将该事件添加到nettyEventExecuter的eventQueue队列中
NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress.toString(), ctx.channel()));
}
}
NettyServerHandler
用于处理请求信息,继承于SimpleChannelInboundHandler,并且实现了channelRead0方法。
package com.alibaba.rocketmq.remoting.netty;
/**
* Netty 服务端
*/
public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
//...
class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
//调用NettyRemoteAbstract的processMessageReceived方法来处理请求
processMessageReceived(ctx, msg);
}
}
NettyRemotingAbstract
根据请求类型,调用相应的处理程序。NamesrvController在创建NettyRemotingServer时,向NettyRemotingServer注册了默认的处理程序,并且传入了线程池用于处理该程序。NettyRemotingAbstract
将该请求封装成一个Task,然后使用该线程池执行该Task。
package com.alibaba.rocketmq.remoting.netty;
/**
* Netty服务端公共抽象类
* @author shijia.wxr
*/
public abstract class NettyRemotingAbstract {
//NamesrvController注册的默认处理程序,以及用于执行该处理程序的线程池
protected Pair<NettyRequestProcessor, ExecutorService> defaultRequestProcessor;
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
final RemotingCommand cmd = msg;
//判断事件类型
switch (cmd.getType()) {
case REQUEST_COMMAND://请求事件
processRequestCommand(ctx, cmd);
break;
case RESPONSE_COMMAND://响应事件
processResponseCommand(ctx, cmd);
break;
default:
break;
}
}
/**
* 请求事件处理程序
* @param ctx
* @param cmd
*/
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
//使用默认的请求处理程序
final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
//把请求封装成一个Task
Runnable run = new Runnable() {
@Override
public void run() {
//...
//调用默认的请求处理程序处理请求 DefaultRequestProcessor的processRequest方法
final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
//...
}
}
//将Task任务提交到线程池中执行
pair.getObject2().submit(run);
}
}
DefaultRequestProcessor
DefaultRequestProcessor
为默认的请求处理程序,Netty服务端接收到所有请求,都会交由其处理。DefaultRequestProcessor
在线程池中执行。
NamesrvController为NettyRemotingServer注册了请求处理程序DefaultRequestProcessor,当Netty服务端接收到消息请求时,调用remotingExecutor线程池执行DefaultRequestProcessor处理程序,DefaultRequestProcessor根据消息类型来做出相应的处理。
这种事件处理程序实现方法和broker的一样。
package com.alibaba.rocketmq.namesrv.processor;
/**
* 默认请求处理程序
*/
public class DefaultRequestProcessor implements NettyRequestProcessor {
/**
* 处理请求
* @param ctx 管道
* @param request 请求消息
* @return 响应消息
* @throws RemotingCommandException
*/
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
//...
//根据请求消息类型
switch (request.getCode()) {
case RequestCode.REGISTER_BROKER://注册新Broker
return this.registerBroker(ctx, request);
case RequestCode.UNREGISTER_BROKER://移除Broker
return this.unregisterBroker(ctx, request);
//...
default:
break;
}
return null;
}
/**
* 注册新Broker
* @param ctx
* @param request
* @return
* @throws RemotingCommandException
*/
public RemotingCommand registerBroker(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
//...
//通过调用RouteInfoManager的registerBroker方法来注册新的Broker
this.namesrvController.getRouteInfoManager().registerBroker(...);
//...
}
}