RocketMQ.Broker
Broker 负责消息的存储和转发。将消息存储到文件中,根据消费者请求的消息类型,从文件中获取消息然后发送给消费者。
Broker是RocketMQ的核心,大部分“重量级”工作都是由Broker完成的,包括接收Producer发过来的消息、处理Consumer的消费消息请求、消息的持久化存储、消息的HA机制以及服务端过滤功能等。
Broker分为Master和Slave,Master负责读和写,Slave只能读。所有Producer只能和Master连接写消息;Consumer可以和Master或者Slave连接读取消息(Consumer会自动选择从哪个角色读)。
如何达到发送端的高可用性?在创建Topic的时候,把Topic的多个MessageQueue创建在多个Broker组上,这样当一个Broker组的Master不可用后,其他组仍然可用。RocketMQ目前不支持把Slave自动转成Master。需要手动更改配置和停止来实现。
如果一个Broker组有Master和Slave,消息需要从Master复制到Slave。同步复制是等Master和Slave均写成功才返回成功;异步复制只要Master写成功就返回成功。异步复制系统有较低的延迟和较高的吞吐量,但是如果Master出了故障,数据还没写入Slave,数据可能会丢失;同步复制,如果Master出了故障,Slave上有全部数据备份,容易恢复,但是同步复制会增加数据写入延迟,减低系统吞吐量。
一般情况下是把Master和Slave配置成ASYNC_FLUSH的刷盘方式,主从之间配置成功SYNC_MASTER的复制方式,这样即使有一台机器出故障,仍然能保证数据不丢。
服务端通过命令启动Broker,调用BrokerStartup
的main
方法。BrokerStartup
加载默认的配置文件,BrokerConfig、NettyServerConfig、NettyClientConfig和MessageStoreConfig。
- BrokerConfig 保存Broker的基本信息以及NameServer地址等信息
- NettyServerConfig 用于启动Broker的服务端实例监听客户端的请求
- NettyClientConfig 用于启动Netty服务端来监听NameServer地址变化服务
- MessageStoreConfig 用于创建默认的DefaultMessageStore用于存储消息的文件的相关配置信息
什么是Broker
Broker的功能
Broker主要负责消息的存储和收发,主要功能有:
- 向NameServer注册信息,与NameServer保持心跳连接;
- 接收Producer发送来的消息,对消息进行存储;
- 将消息发送到订阅的Consumer(Broker并不会主动推送消息,只负责消息存储。Consumer向Broker发送消费请求,Broker读取消息将消息发送给消费者,消息的消费进度也由Consumer来维护。)。
服务注册
服务注册,由Broker与NameServer保持心跳连接实现的。
消息接收
消息存储
RocketMQ的消息是存储在磁盘文件里的,接收到Producer发来的消息后,有两种刷盘策略:
- 同步刷盘:消息写入到文件后,才返回消息写入成功。
- 异步刷盘:消息写入到内存后,就返回消息写入成功。
消息发送
要先从Consumer的启动来说起,Consumer启动流程,是否有向对应的Broker订阅消息监听?
消息文件
源码分析
总结
Broker and MessageQueue
先从topic的创建说起,topic创建有两种方式,第一种直接使用命令在指定的broker上创建topic,称为手动创建;第二种是在producer发送消息时指定topic,称为自动创建。
自动创建:producer发送topic前,先去nameserver查找topic对应的broker路由信息;如果没有找到,producer会使用默认TBW102的topic去nameserver查找broker路由信息,如果有则将TBW102的broker路由信息返回。producer从broker路由信息中选择一条messageQeueue发现消息。broker接收到消息后,判断topic是否存在,不存在则调用createTopicSendMessageMethod方法创建topic信息。同时将topic路由信息同步到nameserver。
每个Topic在Broker上创建会被划分为多个consumerQueue,分别保存topic消息的索引信息。consumerQueue与messageQueue一对一关系。
BrokerController
通过这四个配置文件创建了BrokerController辅助类,BrokerController启动了一个NettyRemotingServer服务端用于监听客户端的消息请求,启动了一个NettyRemotingClient客户端用于向Name Server注册所有的Broker,以及创建了一个MessageStore用于持久化消息。
package com.alibaba.rocketmq.broker;
public class BrokerController {
...
public boolean initizlize() {
...
//创建消息持久化服务
this.messageStore = new DefaultMessageStore(this.meessageStoreConfig, ...);
//创建了Netty服务端监听消息请求
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, ...);
// 注册事件处理程序
this.registerProcessor();
}
public void start() {
...
//开启消息持久化服务
this.messageStore.start();
//开启消息请求监听
this.remotingServer.start();
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
// 服务注册 定时向Name Server注册所有的Broker
BrokerController.this.registerBrokerAll(true, false);
}
});
}
}
register processor
实例this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMEssageExecutor);
,方法NettyRemotingServer.registerProcessor(int requestCode, NettyRequestProcessor processor, ExecutorService executor);
方法参数说明:requestCode消息类型,消息处理程序(NettyRequestProcessor接口实现类),消息处理线程。registerProcessor
方法使用processorTable一个hashmap保存注册逻辑。将requestcode作为key,processor和executor作为vlue保存起来。请求到来时,根据requestcode从map中获取处理程序,使用对应线程进行处理。
小知识点:processor和executor的映射通过自定义类Pair<T1, T2>
来实现。这种实现方式对于三类对应关系很常用,比如A/B/C三类关联对象,A作为key,将B和C封装成一个对象作为value,这样就可以通过一个map来保存三个映射关系。不需要再额外一个map来保存B和C的关系。
注册了七个处理程序
- SendMessageProcessor
- PullMessageProcessor
- QueryMessageProcessor
- ClientManageProcessor
- ConsumerManageProcessor
- EndTransactionProcessor
- Default
所有的processor都继承自NettyRequestProcessor接口,实现processRequest方法。
/**
* ConsumerManageProcessor
*/
ConsumerManageProcessor consumerManageProcessor = new ConsumerManageProcessor(this);
// Consumer doBalance的时候会发送该请求
this.remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
GET_CONSUMER_LIST_BY_GROUP
获取同一个Group下的Consumer列表。
ConsumerGroupInfo
Broker端保存的consumer信息,包括consumer列表channelInfoTable和subscription订阅列表subscriptionTable。consumer列表根据channel进行绑定,sub根据topic进行绑定。sub的update逻辑是直接put(topic, sub),覆盖旧的订阅逻辑。
问题: 1)Broker会去校验consumer订阅的topic是否存在吗? 答:不会,只是保存在sub集合里,并不校验。
Pull Message
根据topic进行消息拉取,
问题: 1)更新消费进度的时机?
update offset
更新消息进度
Receive Message
NettyRemotingServer的NettyServerHandler处理器接收到新消息请求,新消息的请求类型为SEND_MESSAGE,调用NettyRemotingAbstract的processMessageReceived方法处理消息。NettyRemotingAbstract将新消息请求封装成Task任务,将该任务提交到线程池sendMessageExecutor中处理。线程池调用SendMessageProcessor的processRequest方法处理任务。SendMessageProcess将消息放到DefaultMessageStore中。DefaultMessageStore将消息持久化到文件,并将消息分发给DispatchMessageService。DispatchMessageService将消息放入ConsumeQueue队列中。
package com.alibaba.rocketmq.broker.processor;
public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
...
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
switch (request.getCode()) {
case RequestCode.CONSUMER_SEND_MSG_BACK:
return this.consumerSendMsgBack(ctx, request);
default:
SendMessageRequestHeader requestHeader = parseRequestHeader(request);
if (requestHeader == null) {
return null;
}
mqtraceContext = buildMsgContext(ctx, requestHeader);
this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
//调用sendMessage处理消息
final RemotingCommand response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);
this.executeSendMessageHookAfter(response, mqtraceContext);
return response;
}
}
private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
final RemotingCommand request,
final SendMessageContext mqtraceContext,
final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
...
//将消息放入DefaultMessageStore,持久化消息
PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
...
}
}
package com.alibaba.rocketmq.store;
public class DefaultMessageStore implements MessageStore {
...
public PutMessageResult putMessage(MessageExtBrokerInner msg) {
...
//将消息持久化到CommitLOg
PutMessageResult result = this.commitLog.putMessage(msg);
...
}
}
package com.alibaba.rocketmq.store;
public class CommitLog {
...
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
...
//将消息写入Commit Log
MapedFile mapedFile = this.mapedFileQueue.getLastMapedFileWithLock();
...
//并返回写入结果,包括消息在CommitLog中的偏移量位置offset、消息总字节数bytes、消息MessageId
AppendMessageResult result = mapedFile.appendMessage(msg, this.appendMessageCallback);
...
GroupCommitRequest request = null;
// Synchronization flush 默认是异步刷盘,将消息从内存中写入文件
if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
service.putRequest(request);
boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
...
}
}
}
Store Message
SendMessageProcess将消息放到DefaultMessageStore中。DefaultMessageStore
调用CommitLog
来存储消息。CommitLog
将消息写入MapedFile
,MapedFile
调用默认的添加消息回调方法,将消息添加到缓存BygeBuff中。最后返回结果PUT_OK状态码给生产者。
package com.alibaba.rocketmq.broker;
public class BrokerController {
...
public boolean initialize() {
boolean result = true;
this.messageStore = new DefaultMessageStore(this.messageStoreConfig, ...);
//加载消息文件
result = result && this.messageStore.load();
return result;
}
}
package com.alibaba.rocketmq.store;
public class DefaultMessageStore implements MessageStore {
...
public boolean load() {
boolean result = true;
...
//加载CommitLog,CommitLog保存所有消息
result = result && this.commitLog.load();
//加载ConsumeQueue,ConsumeQueue保存消息在CommitLog文件中的位置
result = result && this.loadConsumeQueue();
return result;
}
private boolean loadConsumeQueue() {
File dirLogic = new File(StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()));
File[] fileTopicList = dirLogic.listFiles();
for(File fileTopic : fileTopicList) {
//ConsumeQueue是根据Topic来分组的,一个Topic保存为一个文件夹
//Topic文件下保存这它所属的消息位置
String topic = fileTopic.getName();
//遍历该Topic下的所有QueueId
File[] fileQueueIdList = fileTopic.listFiles();
for(File fileQueueId : fileQueueList) {
int queueId = Integer.parseInt(fileQueueId.getName());
//创建ConsumeQueue
ConsumeQueue logic = new ConsumeQueue(
topic,
queueId,
...);
this.putConsumeQueue(topic, queueId, logic);
}
}
}
return true;
}
}
package com.alibaba.rocketmq.store;
public class CommitLog {
...
public boolean load() {
//将文件加载到mappedFileQueue队列中
boolean result =this.mappedFileQueue.load();
return result;
}
}
package com.alibaba.rocketmq.store;
public class MappedFileQeueue {
...
public boolean load() {
File dir = new File(this.storePath);
//列出CommitLog下的所有文件
File[] files = dir.listFiles();
if(files != null) {
Arrays.sort(files);
for(File file : files) {
//加载文件
MapedFile mapedFile = new MapedFile(file.getPath);
mapedFile.setWrotePosition(this.mapedFileSize);
mapedFile.setCommittedPosition(this.mappedFileSize);
this.mapedFiles.add(mapedFile);
}
}
return true;
}
}
package com.alibaba.rocketmq.store;
public class MappedFile extends ReferencesResource {
...
public MapedFile(final String fileName, final int fileSize) {
this.fileName = fileName;
this.fileSize = fileSize;
this.file = new File(fileName);
...
this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
//直接将文件拷贝到一份虚拟内存中
this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
TotalMapedVitualMemory.addAndGet(fileSize);
TotalMapedFiles.incrementAndGet();
...
}
}
Send Message
消费者发生PULL_MESSAGE类型的消息请求,Broker接收到该类型的请求时,调用PullMessageProcessor
处理程序来处理该请求。PullMessageProcessor
首先对该请求做一系列的校验,校验通过后从DefaultMessageStore
中获取消息。DefaultMessageStore
根据请求的Topic和QueueId从ConsumeQueue队列中获取该消息,将消息添加到GetMessageResult
缓存中。最后将GetMessageResult
封装到RemotingCommand
中返回给消费者。
Master/Slave
Master负责接收消息,然后把内容同步到Broker。但Master宕机后,Slave提供服务。Broker启动时判断是不是slave,如果是就启动定时器,同步数据。
HAService实现commitLog的内容同步。