RocketMQ.Store
Store,RocketMQ的存储系统,将消息持久化到文件系统中。使用CommitLog保存消息载体,使用ConsumeQueue保存消息在CommitLog的位置Offset。客户端根据ConsumeQueue获取到位置Offset后,再根据位置Offset到CommitLog中查找消息。
每个Topic下的每个MessageQueue都有一个对应的ConsumeQueue文件。每台Broker上的CommitLog被本机器所有ConsumeQueue共享。在CommitLog中,一个消息的存储长度是不固定的。
消息写入RocketMQ,有两种写磁盘方式:
- 异步刷盘:消息可能只是写入了内存的PageCache,写操作的返回快,吞吐量大;当内存消息量积累到一定程度时,统一出发写磁盘动作。
- 同步刷盘:消息写入磁盘才返回成功。消息写入内存PageCache后,立刻通知刷盘线程刷盘,然后等待刷盘完成,返回成功。
DefaultMessageStore
DefaultMessageStore
系统默认的消息存储服务,将接收到的消息持久化到CommitLog文件中,然后将消息在CommitLog文件中的偏移量Offset保存到ConsumeQueue队列中。默认map 1G的虚拟内存。
package com.alibaba.rocketmq.store;
public class CommitLog {
...
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
//设置消息存储时间
msg.setStoreTimestamp(System.currentTimeMillis());
AppendMessageResult result = null;
String topic = msg.getTopic();
int queueId = msg.getQueueId();
...
//获取文件队列中最后一个文件,将新消息添加到队尾。
//这里使用同步锁,获取到的是最后一个文件。
MapedFile mapedFile = this.mapedFileQueue.getLastMapedFileWithLock();
//使用同步锁添加消息
synchronized(this) {
if(null == mapedFile || mapedFile.isFull()) {
//如果MapedFile已满则在文件队列新建一个MapedFile,重新获取最后一个文件。
mapedFile = this.mapedFileQueue.getLastMapedFile();
}
if(null == mapedFile) {
//如果新建mapedFile失败,则直接返回错误
return new PutMessageResult(PutMessageResult.CREATE_MAPEDFILE_FAILED, null);
}
//将消息添加到文件中,这里消息还只是暂存在虚拟缓存中,只有调用了刷盘操作,消息才真正写入磁盘中
result = mapedFile.appendMessage(msg, this.appendMessageCallback);
...
//将消息在Commit Log文件中的位置,大小等信息写入Consume Queue
DispatchRequest dispatchRequest = new DispatchRequest(//
topic,// 1 主题
queueId,// 2 队列ID
result.getWroteOffset(),// 3 写入位置
result.getWroteBytes(),// 4 字节数
tagsCode,// 5 Tag 标识
msg.getStoreTimestamp(),// 6 存储时间
result.getLogicsOffset(),// 7 逻辑位置
msg.getKeys(),// 8 消息KEY
/...);
this.defaultMessageStore.putDispatchRequest(dispatchRequest);
...
}// 结束同步锁
//构造消息添加成功返回结果
PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
//消息成功添加到文件中后,调用刷盘操作,将消息从内存写入磁盘文件中。
GroupCommitRequest request = null;
// 默认是异步刷屏操作
if(FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
// 同步刷屏操作
} else {
// 异步刷盘操作
this.flushCommitLogService.wakeup();
}
return putMessageResult;
}
abstract class FlushCommitLogService extends ServiceThread {
}
class FlushRealTimeService extends FlushCommitLogService {
...
public void run() {
while(!this.isStoped()) {
boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();
//定时刷盘时间间隔
int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
...
if(flushCommitLogTimed) {
Thread.sleep(interval);
} else {
this.waitForRunning(interval);
}
//写入文件
CommitLog.this.mapedFileQueue.commit(flushPhysicQueueLeastPages);
...
}
}
}
}
package com.alibaba.rocketmq.store;
public class MapedFileQueue {
...
public boolean commit(final int flushLeastPages) {
boolean result = true;
MapedFile mapedFile = this.findMapedFileByOffset(this.committedWhere, true);
if(mapedFile != null) {
...
//写入文件
mapedFile.commit(flushLeastPages);
...
}
return result;
}
}
package com.alibaba.rocketmq.store;
public class MapedFile extends ReferenceResource {
...
public int commit(final int flushLeastPages) {
...
//将ByteBuffer写入磁盘
this.mappedByteBuffer.force();
...
return this.getCommittedPosition();
}
}
CommitLog
CommitLog文件保存了消息的所有信息,包括消息内容和消息元数据。CommitLog首先将消息文件映射到虚拟内存,通过直接操作虚拟内存提高读写速度,然后更过定期刷盘操作,将虚拟内存的数据写入磁盘文件,持久化消息数据。
ConsumeQueue
ConsumeQueue保存了消息在CommitLog中的位置信息。客户端读取消息首先通过ConsumeQueue获取消息的位置信息,然后通过位置信息从CommitLog中读取消息。
ConsumeQueue按Topic进行分类存储。
package com.alibaba.rocketmq.store;
public class DefaultMessageStore implements MessageStore {
...
/**
* 分发消息服务,将消息位置写入Consume Queue
*/
class DispathMessageService extends ServiceThread {
private volatile List<DispatchRequest> requestsWrite;
private volatile List<DispatchRequest> requestsRead;
...
public void putRequest(fianl DispatchRequest dispatchRequest) {
...
synchronized(this) {
//加入写请求队列
this.requestsWrite.add(dispatchRequest);
...
}
...
}
private void swapRequests() {
//交换读写请求队列,写请求队列负责添加新的请求,读请求队列负责将处理请求
List<DispatchRequest> tmp = this.requestsWrite;
this.requestsWrite = this.requestsRead;
this.requestsRead = tmp;
}
private void doDispatch() {
if(!this.requestsRead.isEmpty()) {
//处理可读队列的请求
for(DispatchRequest req : this.requestsRead) {
...
//将消息添加到ConsumeQueue
DefaultMessageStore.this.putMessagePositionInfo(
req.getTopic(), req.getQueueId, ...);
...
}
...
this.requestsRead.clear();
}
}
public void run() {
while(!this.isStoped()) {
this.waitForRunning(0);
this.doDispatch();//轮询执行分发请求,将消息写入ConsumeQueue
}
}
}
/*
* 将消息写入ConsuemeQueue文件
*
*/
public void putMessagePositionInfo(String topic, int queueId, long offset, int size, ...) {
//1.查找该Topic的ConsumeQueue是否存在,不存在则新建
ConsumeQueue cq = this.findConsumeQueue(topic, queueId);
//2.将ConsumeQueue写入文件
cq.putMessagePositionInfoWrapper(offset, size, tagsCode, storeTimestamp, logicOffset);
}
}
package com.alibaba.rocketmq.store;
/**
* 消费队列
*/
public class ConsumeQueue {
...
public void putMessagePositionInfoWrapper(long offset, int size, long tagsCode, ...) {
final int MaxRetries = 5;//ConsumeQueue如果写入失败,则重新尝试次数
boolean canWrite = this.defaultMessageStore.getRunningFlags().isWriteable();//判断当前存储层是否可写
//最多尝试5次写文件
for(int i = 0; i < MaxRetries && canWrite; i++) {
//写入文件
boolean result = this.putMessagePositionInfo(offset, size, tagsCode, logicOffset);
if(result) {
//写入成功
...
return;
} else {
//写入失败
try {
Thread.sleep(1000);//1秒后重试
} catch(InterruptedException e) {
log.warn("", e);
}
}
}
//如果5次都写入失败,则停止存储层对外写服务,但仍然还可以读。
this.defaultMessageStore.getRunningsFlags().makeLogicsQueueError();
}
/**
* 将ConsumeQueue写入文件,文件存储一个20字节的信息
*/
private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode, final long cqOffset) {
...
this.byteBufferIndex.flip();
this.byteBufferIndex.putLong(offset);
this.byteBufferIndex.putInt(size);
this.byteBufferIndex.putLong(tagsCode);
//获取最后一个MapedFile
MapedFile mapedFile = this.mapedFileQueue.getLastMapedFile(expectLogicOffset);
if(mapedFile != null) {
...
//写入文件
return mapedFile.appendMessage(this.byteBufferIndex.array());
}
return false;
}
}
MapedFile
ReocketMQ通过直接将消息文件映射成虚拟内存,减少由于读写消息文件造成的用户空间和内核空间的切换,提高读写效率。