RocketMQ In Action - NameServer

2016/08/15

RocketMQ.NameServer

什么是NameServer?

NameServer和Zookeeper的区别?

NameServer工作原理

1. NameServer功能

1)保存Topic的路由消息

提供给Producer来查询对于Broker地址信息用于发送消息用的。Producer发送消息是会先到NameServer获取Topic的路由信息,从这些路由信息中选择一个Broker进行消息发送。

路由信息是无状态的,默认存储在内存中。也可以通过配置将其持久化到文件中,不过实际应用中很少使用。

2)维护与Broker的心跳连接

Topic的路由消息由NameServer进行维护,NameServer负责与所有的Broker进行心跳维护。

3)维护Topic的订阅列表(NameServer会维护订阅信息吗?还是由Broker自己维护?)

订阅信息是指谁订阅了Topic,当有新的消息来的时候会通知这些订阅者。消息是由Producer直接发给Broker的,NameServer不会接收到新消息通知,所以订阅列表由Broker来维护。

3)NameServer的集群部署

NameServer可以通过部署多套实例来实现高可用场景,不过多实例直接是不互相备份的。所以客户端需要把所有的NameServer服务地址都配置上。

2. 核心概念

路由表

NameServer的主要作用就是维护Topic路由消息,这些路由信息是在NameServer服务启动的时候由每个Broker主动上报的。

路由消息

NameServer维护了5个路由表:

	  //Topic列表信息,保存Topic对应的每个Broker名称,QueueData还包含了mq的读写数量
    private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
    //Broker地址信息,保存Broker名称对应的Broker地址信息(就是一台Broker机器)
    private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
    //Broker集群信息,保存Broker集群对应的所有Broker名称
    private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
    //Broker更新信息,保存每台Broker机器的状态信息,NameServer会定期检查该状态
    private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    //Broker过滤信息,Broker服务端过滤
    private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

这里首先要介绍下Broker的启动流程。

Broker启动后会主动向NameServer进行注册?注册了哪些事件?上报了哪些信息?

Broker的服务注册 注册参数里面包括brokerId、brokerAddr、brokerName,topicConfig(包括了Topic的信息)。

NameServer源码解析

总结

RocketMQ概览

RocketMQ主要由Producer、Concumer、NameServer、Broker四个部分组成。NameServer用于管理Broker,接受Broker的注册并与其进行心跳检测维护。Broker用于消息的存储和分发,可以启动多个Broker来同步备份消息,并将自己注册到NameServer。Producer从NameServer选择一个Broker发送消息。Consumer向NameServer订阅消息。

可以启动多个NameServer(分布在多台机器)和Broker(每台机器启动一个Master和Slave角色的Broker)。

Broker配置文件重要参数介绍:

  • fileReservedTime=48 消息在磁盘上的存储时间,自动删除超时消息
  • deleteWhen=04 什么时候删除
  • brokerRole=SYNC_MASTER broker的消息同步机制:SYNC_MASTER、ASYNC_MASTER、SLAVE。SYNC表示当Slave和Master消息同步完成之后,再返回发送成功的状态。
  • flushDiskType=ASYNC_FLUSH 刷盘策略,分为SYNC_FLUSH和ASYNC_FLUSH两种,代表同步刷盘和异步刷盘。同步刷盘在消息真正写入磁盘后再返回成功状态;异步刷盘在消息写入page_cache后就返回成功状态。
  • listenPort=10911 Broker监听的端口号。
  • storePathRootDir=/home/rocketmq/store-a 消息和配置信息的存储目录

MQAdmin是RocketMQ自带的命令行管理工具,使用mqadmin命令,可以创建、修改Topic,更新Broker配置信息等。

apache/rocketmq-externals图界面管理消息队列。

消费者 消费者分为两种类型,DefaultMQPushConsumer和DefaultMQPullConsumer。DefaultMQPushConsumer由系统主动推送消息(内部也是使用“长轮询”实现的),DefaultMQPullConsumer由消费者主动拉取消息。

Push方式,是服务端接收到消息后,就主动把消息推送给Client端。首先加大了Server端的工作量,进而影响其性能。其次,Client的处理能力各不相同,可能造成Client消息堆积等各种潜在问题。

Pull方式,是客户端循环地从Server端拉取消息,主动权在Client端。但是要处理好循环的间隔策略。

GroupName用于把多个Consumer组织到一起,RocketMQ支持两种消息模式:Clustering和Broadcasting。Clustering模式,同一个ConsumerGroup里的每个Consumer只消费所订阅消息的一部分。Broadcasting模式,同一个ConsumerGroup里的每个Consumer都能消费所订阅Topic的全部消息,也就是一个消息会被多次分发,被多个Consumer消费。

Topic用于标识消息的类型,如果不需要消费Topic下的所有消息,可以指定消息的Tag来过滤消息。

DefaultMQPushConsumer

RocketMQ的服务端“长轮询”模式做了一些优化,并不会一直循环空转: brokerSuspendMaxTimeMillis,Broker在没有消息时的最长阻塞时间,默认15秒,有消息会立刻返回。客户端发送请求到服务端,如果服务器这是没有新的消息,则会挂起请求,定期(每5秒)check一下消息队列,有就直接返回,没有直接最长阻塞时间再返回。“长轮询”的局限性是在挂起请求时会占用资源。

PushConsumer的流量控制,每个MessageQueue都有对应一个ProcessQueue,用于保存这个MessageQueue还未处理的消息。ProcessQueue对象里有一个TreeMap和一个读写锁。TreeMap以消息的Offset作为key,以消息内容作为value。读写锁控制多个线程并发访问。有了ProcessQueue对象后,消费者在pull消息时,会优先检查ProcessQueue内是否还有未处理的消息,如果有且超过一定值(消息个数、总大小、Offset跨度)则延迟pull消息,从而达到流量控制目的。ProcessQueue还可以辅助实现顺序消费的逻辑。

DefaultMQPullConsumer

需要客户端自己保持消息消费的进度Offset。?为什么要保存Offset呢?有点忘记了。因为主动权都设计在客户端,所以客户端可以对消息进行重复消费或者跳跃消费。设计在服务端,就要服务端保存每个客户端的消费进度。

生产者 支持同步发送、异步发送、延迟发送、发送事务消息等。延迟发送可以对消息设置指定时间,broker在收到消息后不会立马处理。可以使用MessageQueueSelector指定MessageQueue队列发送消息。 RocketMQ采用两阶段提交方式实现事务消息: 1)发送方向RocketMQ发送“待确认”消息; 2)RocketMQ收到“待确认”消息持久化成功后,返回发送成功。第一阶段消息发送完成。 3)发送方开始执行本地事件逻辑。 4)事件执行完后,再想RocketMQ发送二次确认(Commit或Rollback),RocketMQ收到Commit将第一阶段消息标记可投递,发送给订阅方;收到rollback则删除第一阶段消息,结束。

两阶段提交可能出现的问题: 1)发送二次确认时,RocketMQ未收到或者超时。RocketMQ会在经过固定时间段后主动发送回查请求。发送方收到回查请求后,根据执行结果发送二次确认消息。 2)因为RocketMQ是顺序存消息的,二次确认需要update消息状态,所以需要查找之前的消息。这样会造成磁盘Catch的脏页过多,降低系统的性能。 RocketMQ事务消息回查设计方案[https://blog.csdn.net/qq_27529917/article/details/79802406]

存储队列位置信息 RocketMQ中,一种类型的消息会放到一个Topic里,为了能够并行,一般一个Topic会有多个MessageQueue,Offset是指某个Topic下的一条消息在某个MessageQueue里的位置。通过Offset可以定位到这条信息,或者从这条消息向后继续处理。MessageQueue其实只是一个对象,保存了topic名称、broker名称。

RouteInfoManager类用于保存各角色的状态信息,创建Topic命令会直接发往对应的Broker。Broker创建成功后,会向NameServer发送注册信息,NameSever更新后其他客户端才能发现新增的Topic。为什么不用Zookeeper,保持轻量级是首要目标。NameServer总共才8个类文件,非常好维护。

Name Server

Name Server 主要负责管理集群的路由信息,包括Topic队列信息和Broker地址信息。客户端可以通过Name Server获取当前topic信息,通过topic获取broker信息,通过broker获取broker地址信息等等。
NameServer本身是无状态的,不会持久化Broker、Topic等状态信息,都是由个角色定时上报并存储到内中的。

NamesrvStartup为NameServer的启动类,NamesrvStartup通过加装默认的配置文件,实例化NamesrvController控制器类来执行Name Server操作。NamesrvController根据配置文件创建Netty服务端ServerBootstrap用于监听客户端的请求,Netty服务端调用DefaultRequestProcessor处理程序来处理客户端的请求,DefaultRequestProcessor根据消息类型做出相应的操作更新RouteInfoManager路由信息。

Name Server 类图

NamesrvStartup

Name Server的启动,首先通过调用NamesrvStartup类的main方法进行执行。该类首先加载系统默认配置文件,NamesrvConfig和NettyServerConfig,顾名思义NamesrcConfig就是Name Server相关的配置信息,例如rocketmq的home目录等,NettyServerConfig就是启动Netty服务端时的相关配置信息,例如监听端口、工作线程池数、服务端发送接收缓存池的大小等。RocketMQ是使用Netty作为底层RPC通信框架的,所以Name Server实际是启动了一个Netty服务端ServerBootstrap来监听客户端的请求,同时通过向Netty注册监听处理程序来处理客户端的请求。

package com.alibaba.rocketmq.namesrv;

public class NamesrvStartup {
  
  /**
   * 程序或命令行启动,调用的入口
   * @param args
   */
  public static void main(String[] args) {
    main0(args);
  }

  public static NamesrcController main0(String[] args) {
    //...
    
    //创建NamesrvConfig配置文件
    final NamesrvConfig namesrvConfig = new NamesrvConfig();
    //创建NettyServerConfig配置文件
    final NettyServerConfig nettyServerConfig = new NettyServerConfig();
    nettyServerConfig.setListenPort(9876);//设置Netty服务端监听端口,默认9876
    
    //创建NamesrvController,传入配置文件构造controller
    final NamesrvController controller = new NamesrvController(namesrcConfig, nettyServerConfig);
    controller.initialize();//初始化controller
    
    //...
  }
  
}

NameServer启动:

  1. 创建3个线程,一个工作线程池,两个定时器线程。定时器线程一个用于扫描失效的broker,另一个用于打印配置信息。
  2. 启动通信服务端remoteServer,用于监听接收broker、client等发送的请求。

NameServer的核心业务处理逻辑就是在接收到请求后,对请求的处理。逻辑主题是个switch语句,根据RequestCode调用不同函数处理。

NamesrvController

NamesrvController是NamesrvStartup的控制器类,它创建了用于接收客户端请求的Netty服务端,并向Netty注册了请求处理程序,Netty服务端接收到客户端的请求后,调用请求处理程序来处理客户端的请求。

NamesrcStartup创建了NamesrvConfig和NettyServerConfig配置文件后,通过这两个配置文件实例化NamesrvController控制器类,然后调用NamesrvController.initialize方法进行初始化。

NamesrvController在创建时,实例化了RouteInfoManager和BrokerHouseKeepingService两个对象。Name Server中最重要的就是RouteInfoManager类。Name Server所有的Topic和Borker信息都保存在RouteInfoManager中,RouteInfoManager保存所有的路由信息。BrokerHouseKeepingService用于处理Broker状态事件,当Broker失效、异常或者关闭,则将Broker从RouteInfoManager中移除。

NamesrvController初始化时,根据NettyServerConfig创建了Netty服务端,并向Netty服务端注册了请求处理程序,同时还创建了一个用于处理请求的线程池remotingExecutor。Netty服务端接收到客户端的请求后,将请求封装成一个Task任务,然后把该任务提交到线程池remotingExecutor进行处理。

package com.alibaba.rocketmq.namesrv;

public class NamesrvController {
  //Name Server 配置文件
  private final NamesrvCofnig namesrvConfig;
  //Netty 服务端配置文件
  private final NettyServerConfig nettyServerConfig;
  //Netty 服务端
  private RemotingServer remotingServer;
  //Broker状态监听处理程序
  private BrokerHouseKeepingServer brokerHouseKeepingService;
  //线程池 用于处理消息请求DefaultRequestProcessor
  private ExecutorService remotingService;
  
  //...
  
  /**
     * 实例化NamesrvController
     * @param namesrvConfig Name Server 配置文件
     * @param nettyServerConfig Netty 服务端配置文件
     */
  public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
    this.namesrvConfig = namesrvConfig;
    this.nettyServerConfig = nettyServerConfig;
    //...
    //保存所有的路由信息
    this.routeInfoManager = new RouteInfoManager();
    //Broker状态事件处理程序,处理Broker的异常、关闭等状态
    this.brokerHousekeepingService = new BrokerHousekeepingService(this);
  }
  
  /**
   * 初始化
   * @return
   */
  public boolean initialize() {
    //...
    
    //创建Netty服务端,nettyServerConfig为Netty服务端相关配置文件,例如前面在NamesrvStartup中配置了监听端口9876
    //brokerHouseKeepingServer broker状态处理程序
    this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
    //创建一个固定大小的线程池,根据nettyServerConfig配置文件提供的默认工作线程数,默认值为8
    this.remotingExecutor = Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
    //注册请求处理程序,当Netty服务端接收到客户端的请求时,会调用该处理程序
    this.registerProcessor();
    
    //...
  }

  /**
  	* 给Netty服务端注册请求处理程序,使用线程池来处理请求
    */
  private void registerProcessor() {
  	//...

  	this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingServer);
  }
}

RouteInfoManager

RouteInfoManager保存所有的路由信息,包括topic和broker信息。Netty服务端接收到请求后,回调请求处理程序DefaultRequestProcessor,defaultRequestProcessor根据请求类型RequestCode,例如注册Broker或者新建Topic请求,来更新RouteInfoManager路由信息。

package com.alibaba.rocketmq.namesrv.routeinfo;

public class RouteInfoManager {

    // Broker上报注册时会更新一下5个集合

    // 使用读写锁控制broker注册并发
    private final ReadWriteLock lock = new ReentrantReadWriteLock();

	  //Topic列表信息,保存Topic对应的每个Broker名称,QueueData还包含了mq的读写数量
    private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
    //Broker地址信息,保存Broker名称对应的Broker地址信息(就是一台Broker机器)
    private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
    //Broker集群信息,保存Broker集群对应的所有Broker名称
    private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
    //Broker更新信息,保存每台Broker机器的状态信息,NameServer会定期检查该状态
    private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    //Broker过滤信息,Broker服务端过滤
    private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
    
    //...
    
    /**
      * 注册新的Broker,将新的Broker添加到brokderAddrTable等信息中
      */
    public RegisterBrokerResult registerBroker(...) {
      //...
      //更新Broker集群信息
      this.clusterAddrTable.put(clusterName, brokerNames);
      //更新Broker地址信息
      this.brokerAddrTable.put(brokerName, brokerData);
      //更新Broker状态信息
      this.brokerLiveTable.put(...);
      //更新Broker过滤信息
      this.filterServerTable.put(brokerAddr, filterServerList);
      // 更新Topic信息
      this.topicQueueTable.put(topicConfig.getTopicName(), queueDataList);
      //...
    }
}

BrokerHouseKeepingService

BrokerHouseKeepingService专门处理broker是否存活。如果broker失效、异常或者关闭,则将broker从RouteInfoManager路由信息中移除,同时将与该broker相关的topic信息也一起删除。Netty服务端专门启动了一个线程用于监听管道的失效、异常或者关闭等的事件队列,当事件队列里面有新事件时,则取出事件并判断事件的类型,然后调用BrokerHouseKeepingService对应的方法来处理该事件。

package com.alibaba.rocketmq.remoting.netty;

/**
 * Netty服务端公共抽象类
 */
public abstract class NettyRemotingAbstract {

  //...

  /**
   * 新建线程监听事件队列
   */
  class NettyEventExecuter extends ServiceThread {
  
    //事件队列,这里使用LinkedBlockingQueue队列,基于链表的阻塞队列,是线程安全的队列。新事件自动加入到队尾,
    private final LinkedBlockingQueue<NettyEvent> eventQueue = new LinkedBlockingQueue<NettyEvent>();
    //...
    
    /**
     * 添加新事件
     * @param event
     */
    public void putNettyEvent(final NettyEvent event) {
      this.eventQueue.add(event);
    }
    
    @Override
    public void run() {
      //请求事件处理程序 BrokerHouseKeepingService
      final ChannelEventListener listener = NettyRemotingAbstract.this.getChannelEventListener();
      
      while(!this.isStoped()) {
        //每3秒访问一次事件队列
        NettyEvent event = this.eventQueue.poll(3000, TimeUnit.MILLISECONDS);
        if (event != null && listener != null) {
          //判断事件类型
          switch (event.getType()) {
          case IDLE://失效
            //调用 BrokerHouseKeepingService 的 onChannelIdle
            listener.onChannelIdle(event.getRemoteAddr(), event.getChannel());
            break;
          case CLOSE://关闭
            //调用 BrokerHouseKeepingService 的 onChannelClose
            listener.onChannelClose(event.getRemoteAddr(), event.getChannel());
            break;
          case CONNECT://连接
            //调用 BrokerHouseKeepingService 的 onChannelConnect
            listener.onChannelConnect(event.getRemoteAddr(), event.getChannel());
            break;
          case EXCEPTION://异常
            //调用 BrokerHouseKeepingService 的 onChannelException
            listener.onChannelException(event.getRemoteAddr(), event.getChannel());
            break;
          default:
              break;
          }
        }
      }
    }
  }
  
}
  package com.alibaba.rocketmq.namesrv.routeinfo;

  /**
   * Broker状态事件处理程序,处理Broker状态的服务,实现了ChannelEventListener接口。
   * 当Netty有新事件时,将调用ChannelEventListener接口处理事件。
   */
  public class BrokerHouseKeepingService implements ChannelEventListener {
  
    //...
    
    @Override
    public void onChannelClose(String remoteAddr, Channel channel) {
      //管道关闭时,将broker从RouteInfoManager中移除
      this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
    }
    
    @Override
    public void onChannelException(String remoteAddr, Channel channel) {
      //管道异常时,将broker从RouteInfoManager中移除
      this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
    }

    @Override
    public void onChannelIdle(String remoteAddr, Channel channel) {
      //管道失效时,将broker从RouteInfoManager中移除
      this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
    }
    
    //...
  }
package com.alibaba.rocketmq.namesrv.routeinfo;

public class RouteInfoManager {

	//...
    
    /**
     * 销毁管道
     * @param remoteAddr 客户端地址信息
     * @param channel 客户端管道
     */
    public void onChannelDestroy(String remoteAddr, Channel channel) {
    
      //1.根据channel客户端管道找到brokerAddr对应的broker地址信息
      //2.根据brokerAddr地址将broker从brokerLiveTable和filterServerTable移除
      this.brokerLiveTable.remove(brokerAddrFound);
      this.filterServerTable.remove(brokerAddrFound);
      //3.根据brokerAddr遍历brokerAddrTable,将broker从brokerAddrTable中移除
      this.brokerAddrTable.remove();
      //4.根据brokerAddr遍历clusterAddrTable,将broker从clusterAddrTable中移除
      this.clusterAddrTable.remove();
      //4.根据brokerAddr遍历topicQueueTable,将broker从topicQueueTable中移除
      this.topicQueueTable.remove();
      
    }
}

NettyEventExecuter线程会每隔3秒定时轮询事件队列eventQueue,当有新事件时,则从队列中取出事件,判断事件类型然后调用BrokerHouseKeepingService处理事件。NettyEventExecuter还提供了putNettyEvent方法用于添加事件到队列中。

NettyRemotingServer

NettyRemotingServer就是Netty服务端,用于监听客户端的请求,然后更新RouteInfoManager路由表信息。
NamesrvController创建了Netty服务端NettyRemotingServer,根据NamesrvStartup对象提供的nettyServerConfig配置文件,以及将BrokerHouseKeepingService处理程序传入NettyRemotingServer的构造函数中,然后调用NettyRemotingServer的start方法,启动Netty。
Netty启动后,开始监听客户端的请求,当有新的请求到来时,将该请求封装成一个Task任务,然后调用线程池处理该Task。

package com.alibaba.rocketmq.remoting.netty;

/**
 * Netty 服务端
 */
public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
  
  /**
   * 通过Netty配置文件和管道事件监听器实例化Netty服务端
   * @param nettyServerConfig Netty配置文件
   * @param channelEventListener 管道事件监听器 BrokerHouseKeepingService实现了该接口
   */
  public NettyRemotingServer(final NettyServerConfig nettyServerConfig, final ChannelEventListener channelEventListener) {
    //...
    //Netty 服务端启动辅助类
    this.serverBootstrap = new ServerBootstrap();
    //Netty 配置文件
    this.nettyServerConfig = nettyServerConfig;
    //管道事件监听器 BrokerHouseKeepingService实现了该接口
    this.channelEventListener = channelEventListener;
    //...
  }
  
  @Override
  public void start() {
    //正常的启动Netty
    this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector).channel(NioServerSocketChannel.class)
      .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))//监听配置文件设置的9876端口
      .childHandler(new ChannelInitializer<SocketChannel>() {
        @Override
        public void initChannel(SocketChannel ch) throws Exception {
          ch.pipeline().addLast(
            new NettyEncoder(),//自定义Netty编码类
            new NettyDecoder(),//自定义Netty解码类
            new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),//心跳空闲检查时间
            new NettyConnetManageHandler(),//处理管道连接、端口、异常等事件
            new NettyServerHandler());//处理管道消息事件
        }
      });
      
    this.nettyEventExecutor.start();//启动一个线程,监听事件队列。就是上面提到的管道断开、异常或关闭时的事件队列
  }
  
}

Netty启动时,向pipeline管道添加了两个自定义的ChannelHandler,一个是NettyConnnetManageHandler,一个是NettyServerHandler

上文有提到Netty专门启动了一个线程NettyEventExecuter,用于监听(管道断开、异常或关闭时触发的事件)事件队列的。NettyEventExecuter线程还提供了一个putNettyEvent方法用于添加事件到队列中。那么这个方法由谁来调用呢?就是NettyConnetManageHanlder处理程序来调用的。

NettyConnetManageHandler继承于ChannelDuplexHandler,并且实现了channelInactive、userEventTriggered、exceptionCaught等方法。当有管道失效时,会自动触发channelInactive方法,然后在这个方法里面调用nettyEventExecuter.putNettyEvent(event)方法,将事件添加到事件队列中。

package com.alibaba.rocketmq.remoting.netty;

/**
 * Netty 服务端
 */
public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
  //...
  
  class NettyConnetManageHandler extends ChannelDuplexHandler {
    //...
    
   @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
      //获取客户端远程地址
      final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
      //创建一个NettyEvent事件,标记该事件类型为CLOSE,然后将该事件添加到nettyEventExecuter的eventQueue队列中
      NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress.toString(), ctx.channel()));
  }
}

NettyServerHandler用于处理请求信息,继承于SimpleChannelInboundHandler,并且实现了channelRead0方法。

package com.alibaba.rocketmq.remoting.netty;

/**
 * Netty 服务端
 */
public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
  //...
  
  class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
    
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
      //调用NettyRemoteAbstract的processMessageReceived方法来处理请求
      processMessageReceived(ctx, msg);
    }
}

NettyRemotingAbstract根据请求类型,调用相应的处理程序。NamesrvController在创建NettyRemotingServer时,向NettyRemotingServer注册了默认的处理程序,并且传入了线程池用于处理该程序。NettyRemotingAbstract将该请求封装成一个Task,然后使用该线程池执行该Task。

package com.alibaba.rocketmq.remoting.netty;

/**
 * Netty服务端公共抽象类
 * @author shijia.wxr
 */
public abstract class NettyRemotingAbstract {
  //NamesrvController注册的默认处理程序,以及用于执行该处理程序的线程池
  protected Pair<NettyRequestProcessor, ExecutorService> defaultRequestProcessor;
  
  public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
    final RemotingCommand cmd = msg;
    //判断事件类型
    switch (cmd.getType()) {
      case REQUEST_COMMAND://请求事件
          processRequestCommand(ctx, cmd);
          break;
      case RESPONSE_COMMAND://响应事件
          processResponseCommand(ctx, cmd);
          break;
      default:
          break;
    }
  }
  
  /**
   * 请求事件处理程序
   * @param ctx
   * @param cmd
   */
  public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
    final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
    //使用默认的请求处理程序
    final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
    
    //把请求封装成一个Task
    Runnable run = new Runnable() {
      @Override
      public void run() {
        //...
        
        //调用默认的请求处理程序处理请求 DefaultRequestProcessor的processRequest方法
        final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
        
        //...
      }
    }
    //将Task任务提交到线程池中执行
    pair.getObject2().submit(run);
  }
}

DefaultRequestProcessor

DefaultRequestProcessor为默认的请求处理程序,Netty服务端接收到所有请求,都会交由其处理。DefaultRequestProcessor在线程池中执行。

NamesrvController为NettyRemotingServer注册了请求处理程序DefaultRequestProcessor,当Netty服务端接收到消息请求时,调用remotingExecutor线程池执行DefaultRequestProcessor处理程序,DefaultRequestProcessor根据消息类型来做出相应的处理。

这种事件处理程序实现方法和broker的一样。

package com.alibaba.rocketmq.namesrv.processor;

/**
  * 默认请求处理程序
  */
public class DefaultRequestProcessor implements NettyRequestProcessor {
  
  /**
   * 处理请求
   * @param ctx 管道
   * @param request 请求消息
   * @return 响应消息
   * @throws RemotingCommandException
   */
  @Override
  public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
    //...
    //根据请求消息类型
    switch (request.getCode()) {
    case RequestCode.REGISTER_BROKER://注册新Broker
      return this.registerBroker(ctx, request);
    case RequestCode.UNREGISTER_BROKER://移除Broker
      return this.unregisterBroker(ctx, request);
    //...
    default:
      break;
    }
    return null;
  }
  
  /**
   * 注册新Broker
   * @param ctx
   * @param request
   * @return
   * @throws RemotingCommandException
   */
  public RemotingCommand registerBroker(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
    //...
    //通过调用RouteInfoManager的registerBroker方法来注册新的Broker
    this.namesrvController.getRouteInfoManager().registerBroker(...);
    //...
  }
}

Post Directory