RocketMQ In Action - Broker

2016/08/26

RocketMQ.Broker

Broker 负责消息的存储和转发。将消息存储到文件中,根据消费者请求的消息类型,从文件中获取消息然后发送给消费者。
Broker是RocketMQ的核心,大部分“重量级”工作都是由Broker完成的,包括接收Producer发过来的消息、处理Consumer的消费消息请求、消息的持久化存储、消息的HA机制以及服务端过滤功能等。

Broker分为Master和Slave,Master负责读和写,Slave只能读。所有Producer只能和Master连接写消息;Consumer可以和Master或者Slave连接读取消息(Consumer会自动选择从哪个角色读)。

如何达到发送端的高可用性?在创建Topic的时候,把Topic的多个MessageQueue创建在多个Broker组上,这样当一个Broker组的Master不可用后,其他组仍然可用。RocketMQ目前不支持把Slave自动转成Master。需要手动更改配置和停止来实现。

如果一个Broker组有Master和Slave,消息需要从Master复制到Slave。同步复制是等Master和Slave均写成功才返回成功;异步复制只要Master写成功就返回成功。异步复制系统有较低的延迟和较高的吞吐量,但是如果Master出了故障,数据还没写入Slave,数据可能会丢失;同步复制,如果Master出了故障,Slave上有全部数据备份,容易恢复,但是同步复制会增加数据写入延迟,减低系统吞吐量。

一般情况下是把Master和Slave配置成ASYNC_FLUSH的刷盘方式,主从之间配置成功SYNC_MASTER的复制方式,这样即使有一台机器出故障,仍然能保证数据不丢。

服务端通过命令启动Broker,调用BrokerStartupmain方法。BrokerStartup加载默认的配置文件,BrokerConfig、NettyServerConfig、NettyClientConfig和MessageStoreConfig。

  • BrokerConfig 保存Broker的基本信息以及NameServer地址等信息
  • NettyServerConfig 用于启动Broker的服务端实例监听客户端的请求
  • NettyClientConfig 用于启动Netty服务端来监听NameServer地址变化服务
  • MessageStoreConfig 用于创建默认的DefaultMessageStore用于存储消息的文件的相关配置信息

什么是Broker

Broker的功能

Broker主要负责消息的存储和收发,主要功能有:

  1. 向NameServer注册信息,与NameServer保持心跳连接;
  2. 接收Producer发送来的消息,对消息进行存储;
  3. 将消息发送到订阅的Consumer(Broker并不会主动推送消息,只负责消息存储。Consumer向Broker发送消费请求,Broker读取消息将消息发送给消费者,消息的消费进度也由Consumer来维护。)。

服务注册

服务注册,由Broker与NameServer保持心跳连接实现的。

消息接收

消息存储

RocketMQ的消息是存储在磁盘文件里的,接收到Producer发来的消息后,有两种刷盘策略:

  1. 同步刷盘:消息写入到文件后,才返回消息写入成功。
  2. 异步刷盘:消息写入到内存后,就返回消息写入成功。

消息发送

要先从Consumer的启动来说起,Consumer启动流程,是否有向对应的Broker订阅消息监听?

消息文件

源码分析

总结

Broker and MessageQueue

先从topic的创建说起,topic创建有两种方式,第一种直接使用命令在指定的broker上创建topic,称为手动创建;第二种是在producer发送消息时指定topic,称为自动创建。

自动创建:producer发送topic前,先去nameserver查找topic对应的broker路由信息;如果没有找到,producer会使用默认TBW102的topic去nameserver查找broker路由信息,如果有则将TBW102的broker路由信息返回。producer从broker路由信息中选择一条messageQeueue发现消息。broker接收到消息后,判断topic是否存在,不存在则调用createTopicSendMessageMethod方法创建topic信息。同时将topic路由信息同步到nameserver。

每个Topic在Broker上创建会被划分为多个consumerQueue,分别保存topic消息的索引信息。consumerQueue与messageQueue一对一关系。

BrokerController

通过这四个配置文件创建了BrokerController辅助类,BrokerController启动了一个NettyRemotingServer服务端用于监听客户端的消息请求,启动了一个NettyRemotingClient客户端用于向Name Server注册所有的Broker,以及创建了一个MessageStore用于持久化消息。

package com.alibaba.rocketmq.broker;

public class BrokerController {
	...

	public boolean initizlize() {
		...
		//创建消息持久化服务
		this.messageStore = new DefaultMessageStore(this.meessageStoreConfig, ...);
		//创建了Netty服务端监听消息请求
		this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, ...);
		// 注册事件处理程序
		this.registerProcessor();
	}

	public void start() {
		...
		//开启消息持久化服务
		this.messageStore.start();
		//开启消息请求监听
		this.remotingServer.start();

		this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

			@Override
			public void run() {
				// 服务注册 定时向Name Server注册所有的Broker
				BrokerController.this.registerBrokerAll(true, false);
			}
		});
	}
}

register processor

实例this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMEssageExecutor);,方法NettyRemotingServer.registerProcessor(int requestCode, NettyRequestProcessor processor, ExecutorService executor); 方法参数说明:requestCode消息类型,消息处理程序(NettyRequestProcessor接口实现类),消息处理线程。registerProcessor方法使用processorTable一个hashmap保存注册逻辑。将requestcode作为key,processor和executor作为vlue保存起来。请求到来时,根据requestcode从map中获取处理程序,使用对应线程进行处理。

小知识点:processor和executor的映射通过自定义类Pair<T1, T2>来实现。这种实现方式对于三类对应关系很常用,比如A/B/C三类关联对象,A作为key,将B和C封装成一个对象作为value,这样就可以通过一个map来保存三个映射关系。不需要再额外一个map来保存B和C的关系。

注册了七个处理程序

  • SendMessageProcessor
  • PullMessageProcessor
  • QueryMessageProcessor
  • ClientManageProcessor
  • ConsumerManageProcessor
  • EndTransactionProcessor
  • Default

所有的processor都继承自NettyRequestProcessor接口,实现processRequest方法。

	/**
	 * ConsumerManageProcessor
	 */
	ConsumerManageProcessor consumerManageProcessor = new ConsumerManageProcessor(this);
	// Consumer doBalance的时候会发送该请求
	this.remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);

GET_CONSUMER_LIST_BY_GROUP获取同一个Group下的Consumer列表。

ConsumerGroupInfo

Broker端保存的consumer信息,包括consumer列表channelInfoTable和subscription订阅列表subscriptionTable。consumer列表根据channel进行绑定,sub根据topic进行绑定。sub的update逻辑是直接put(topic, sub),覆盖旧的订阅逻辑。

问题: 1)Broker会去校验consumer订阅的topic是否存在吗? 答:不会,只是保存在sub集合里,并不校验。

Pull Message

根据topic进行消息拉取,

问题: 1)更新消费进度的时机?

update offset

更新消息进度

Receive Message

NettyRemotingServer的NettyServerHandler处理器接收到新消息请求,新消息的请求类型为SEND_MESSAGE,调用NettyRemotingAbstract的processMessageReceived方法处理消息。NettyRemotingAbstract将新消息请求封装成Task任务,将该任务提交到线程池sendMessageExecutor中处理。线程池调用SendMessageProcessor的processRequest方法处理任务。SendMessageProcess将消息放到DefaultMessageStore中。DefaultMessageStore将消息持久化到文件,并将消息分发给DispatchMessageService。DispatchMessageService将消息放入ConsumeQueue队列中。

package com.alibaba.rocketmq.broker.processor;

public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
	...

	@Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {

		switch (request.getCode()) {
		case RequestCode.CONSUMER_SEND_MSG_BACK:
            return this.consumerSendMsgBack(ctx, request);
        default:
        	SendMessageRequestHeader requestHeader = parseRequestHeader(request);
        	if (requestHeader == null) {
                return null;
            }
            mqtraceContext = buildMsgContext(ctx, requestHeader);
            this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
            //调用sendMessage处理消息
            final RemotingCommand response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);
            this.executeSendMessageHookAfter(response, mqtraceContext);
            return response;
		}
	}

	private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
            final RemotingCommand request,
            final SendMessageContext mqtraceContext,
            final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
        ...
        //将消息放入DefaultMessageStore,持久化消息
        PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);

        ...
    }
}
package com.alibaba.rocketmq.store;

public class DefaultMessageStore implements MessageStore {
	...

	public PutMessageResult putMessage(MessageExtBrokerInner msg) {
		...

		//将消息持久化到CommitLOg
		PutMessageResult result = this.commitLog.putMessage(msg);

		...
	}
}
package com.alibaba.rocketmq.store;

public class CommitLog {
	...

	public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
		...
		//将消息写入Commit Log
		MapedFile mapedFile = this.mapedFileQueue.getLastMapedFileWithLock();
		...
		//并返回写入结果,包括消息在CommitLog中的偏移量位置offset、消息总字节数bytes、消息MessageId
		AppendMessageResult result = mapedFile.appendMessage(msg, this.appendMessageCallback);
		...

		GroupCommitRequest request = null;

		// Synchronization flush 默认是异步刷盘,将消息从内存中写入文件
        if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {

        	GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
        	request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
        	service.putRequest(request);

        	boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());

        	...
        }
	}
}

Store Message

SendMessageProcess将消息放到DefaultMessageStore中。DefaultMessageStore调用CommitLog来存储消息。CommitLog将消息写入MapedFileMapedFile调用默认的添加消息回调方法,将消息添加到缓存BygeBuff中。最后返回结果PUT_OK状态码给生产者。

package com.alibaba.rocketmq.broker;

public class BrokerController {
	...

	public boolean initialize() {
		boolean result = true;

		this.messageStore = new DefaultMessageStore(this.messageStoreConfig, ...);
		//加载消息文件
		result = result && this.messageStore.load();

		return result;
	}

}
package com.alibaba.rocketmq.store;

public class DefaultMessageStore implements MessageStore {
	...

	public boolean load() {
		boolean result = true;
		...
		//加载CommitLog,CommitLog保存所有消息
		result = result && this.commitLog.load();
		//加载ConsumeQueue,ConsumeQueue保存消息在CommitLog文件中的位置
		result = result && this.loadConsumeQueue();

		return result;
	}

	private boolean loadConsumeQueue() {
		File dirLogic = new File(StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()));
		File[] fileTopicList = dirLogic.listFiles();
			for(File fileTopic : fileTopicList) {
				//ConsumeQueue是根据Topic来分组的,一个Topic保存为一个文件夹
				//Topic文件下保存这它所属的消息位置
				String topic = fileTopic.getName();
				//遍历该Topic下的所有QueueId
				File[] fileQueueIdList = fileTopic.listFiles();
				for(File fileQueueId : fileQueueList) {
					int queueId = Integer.parseInt(fileQueueId.getName());
					//创建ConsumeQueue
					ConsumeQueue logic = new ConsumeQueue(
						topic,
						queueId,
						...);
					this.putConsumeQueue(topic, queueId, logic);
				}
			}
		}
		return true;
	}
}
package com.alibaba.rocketmq.store;

public class CommitLog {
	...

	public boolean load() {
		//将文件加载到mappedFileQueue队列中
		boolean result =this.mappedFileQueue.load();
		return result;
	}
}
package com.alibaba.rocketmq.store;

public class MappedFileQeueue {
	...

	public boolean load() {
		File dir = new File(this.storePath);
		//列出CommitLog下的所有文件
		File[] files = dir.listFiles();
		if(files != null) {
			Arrays.sort(files);
			for(File file : files) {
				//加载文件
				MapedFile mapedFile = new MapedFile(file.getPath);

				mapedFile.setWrotePosition(this.mapedFileSize);
				mapedFile.setCommittedPosition(this.mappedFileSize);
				this.mapedFiles.add(mapedFile);
			}
		}

		return true;
	}
}
package com.alibaba.rocketmq.store;

public class MappedFile extends ReferencesResource {
	...

	public MapedFile(final String fileName, final int fileSize) {
		this.fileName = fileName;
		this.fileSize = fileSize;
		this.file = new File(fileName);
		...
		this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
		//直接将文件拷贝到一份虚拟内存中
		this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
		TotalMapedVitualMemory.addAndGet(fileSize);
		TotalMapedFiles.incrementAndGet();
		...
	}
}

Send Message

消费者发生PULL_MESSAGE类型的消息请求,Broker接收到该类型的请求时,调用PullMessageProcessor处理程序来处理该请求。PullMessageProcessor首先对该请求做一系列的校验,校验通过后从DefaultMessageStore中获取消息。DefaultMessageStore根据请求的Topic和QueueId从ConsumeQueue队列中获取该消息,将消息添加到GetMessageResult缓存中。最后将GetMessageResult封装到RemotingCommand中返回给消费者。

Master/Slave

Master负责接收消息,然后把内容同步到Broker。但Master宕机后,Slave提供服务。Broker启动时判断是不是slave,如果是就启动定时器,同步数据。

HAService实现commitLog的内容同步。

Post Directory