RocketMQ In Action - Store

2016/08/29

RocketMQ.Store

Store,RocketMQ的存储系统,将消息持久化到文件系统中。使用CommitLog保存消息载体,使用ConsumeQueue保存消息在CommitLog的位置Offset。客户端根据ConsumeQueue获取到位置Offset后,再根据位置Offset到CommitLog中查找消息。

每个Topic下的每个MessageQueue都有一个对应的ConsumeQueue文件。每台Broker上的CommitLog被本机器所有ConsumeQueue共享。在CommitLog中,一个消息的存储长度是不固定的。

消息写入RocketMQ,有两种写磁盘方式:

  • 异步刷盘:消息可能只是写入了内存的PageCache,写操作的返回快,吞吐量大;当内存消息量积累到一定程度时,统一出发写磁盘动作。
  • 同步刷盘:消息写入磁盘才返回成功。消息写入内存PageCache后,立刻通知刷盘线程刷盘,然后等待刷盘完成,返回成功。

DefaultMessageStore

DefaultMessageStore系统默认的消息存储服务,将接收到的消息持久化到CommitLog文件中,然后将消息在CommitLog文件中的偏移量Offset保存到ConsumeQueue队列中。默认map 1G的虚拟内存。

package com.alibaba.rocketmq.store;

public class CommitLog {
	...

	public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
		//设置消息存储时间
		msg.setStoreTimestamp(System.currentTimeMillis());

		AppendMessageResult result = null;

		String topic = msg.getTopic();
		int queueId = msg.getQueueId();
		...
		//获取文件队列中最后一个文件,将新消息添加到队尾。
		//这里使用同步锁,获取到的是最后一个文件。
		MapedFile mapedFile = this.mapedFileQueue.getLastMapedFileWithLock();
 		//使用同步锁添加消息
		synchronized(this) {
			if(null == mapedFile || mapedFile.isFull()) {
				//如果MapedFile已满则在文件队列新建一个MapedFile,重新获取最后一个文件。
				mapedFile = this.mapedFileQueue.getLastMapedFile();
			}
			if(null == mapedFile) {
				//如果新建mapedFile失败,则直接返回错误
				return new PutMessageResult(PutMessageResult.CREATE_MAPEDFILE_FAILED, null);
			}
			//将消息添加到文件中,这里消息还只是暂存在虚拟缓存中,只有调用了刷盘操作,消息才真正写入磁盘中
			result = mapedFile.appendMessage(msg, this.appendMessageCallback);
			...

			//将消息在Commit Log文件中的位置,大小等信息写入Consume Queue
			DispatchRequest dispatchRequest = new DispatchRequest(//
            	topic,// 1 主题
                queueId,// 2 队列ID
                result.getWroteOffset(),// 3 写入位置
                result.getWroteBytes(),// 4 字节数
                tagsCode,// 5 Tag 标识
                msg.getStoreTimestamp(),// 6 存储时间
                result.getLogicsOffset(),// 7 逻辑位置
                msg.getKeys(),// 8 消息KEY
                /...);

            this.defaultMessageStore.putDispatchRequest(dispatchRequest);

			...
		}// 结束同步锁

		//构造消息添加成功返回结果
		PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);

		//消息成功添加到文件中后,调用刷盘操作,将消息从内存写入磁盘文件中。

		GroupCommitRequest request = null;

		// 默认是异步刷屏操作
		if(FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
			// 同步刷屏操作
		} else {
			// 异步刷盘操作
			this.flushCommitLogService.wakeup();
		}

		return putMessageResult;
	}

	abstract class FlushCommitLogService extends ServiceThread {

	}

	class FlushRealTimeService extends FlushCommitLogService {
		...

		public void run() {
			while(!this.isStoped()) {
				boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();

				//定时刷盘时间间隔
				int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
				...
				if(flushCommitLogTimed) {
					Thread.sleep(interval);
				} else {
					this.waitForRunning(interval);
				}
				//写入文件
				CommitLog.this.mapedFileQueue.commit(flushPhysicQueueLeastPages);
				...
			}
		}
	}
}
package com.alibaba.rocketmq.store;

public class MapedFileQueue {
	...

	public boolean commit(final int flushLeastPages) {
		boolean result = true;
		MapedFile mapedFile = this.findMapedFileByOffset(this.committedWhere, true);
		if(mapedFile != null) {
			...
			//写入文件
			mapedFile.commit(flushLeastPages);
			...
		}

		return result;
	}
}
package com.alibaba.rocketmq.store;

public class MapedFile extends ReferenceResource {
	...

	public int commit(final int flushLeastPages) {
		...
		//将ByteBuffer写入磁盘
		this.mappedByteBuffer.force();
		...

		return this.getCommittedPosition();
	}
}

CommitLog

CommitLog文件保存了消息的所有信息,包括消息内容和消息元数据。CommitLog首先将消息文件映射到虚拟内存,通过直接操作虚拟内存提高读写速度,然后更过定期刷盘操作,将虚拟内存的数据写入磁盘文件,持久化消息数据。

ConsumeQueue

ConsumeQueue保存了消息在CommitLog中的位置信息。客户端读取消息首先通过ConsumeQueue获取消息的位置信息,然后通过位置信息从CommitLog中读取消息。

ConsumeQueue按Topic进行分类存储。

package com.alibaba.rocketmq.store;

public class DefaultMessageStore implements MessageStore {
	...

	/**
     * 分发消息服务,将消息位置写入Consume Queue
     */
	class DispathMessageService extends ServiceThread {
		private volatile List<DispatchRequest> requestsWrite;
        private volatile List<DispatchRequest> requestsRead;

        ...

        public void putRequest(fianl DispatchRequest dispatchRequest) {
        	...
        	synchronized(this) {
        		//加入写请求队列
        		this.requestsWrite.add(dispatchRequest);
        		...
        	}
        	...
        }

        private void swapRequests() {
        	//交换读写请求队列,写请求队列负责添加新的请求,读请求队列负责将处理请求
        	List<DispatchRequest> tmp = this.requestsWrite;
        	this.requestsWrite = this.requestsRead;
        	this.requestsRead = tmp;
        }

        private void doDispatch() {
        	if(!this.requestsRead.isEmpty()) {
        		//处理可读队列的请求
        		for(DispatchRequest req : this.requestsRead) {
        			...
        			//将消息添加到ConsumeQueue
        			DefaultMessageStore.this.putMessagePositionInfo(
        				req.getTopic(), req.getQueueId, ...);
        			...
        		}
        		...
        		this.requestsRead.clear();
        	}
        }

        public void run() {
        	while(!this.isStoped()) {
        		this.waitForRunning(0);
        		this.doDispatch();//轮询执行分发请求,将消息写入ConsumeQueue
        	}
        }
	}

	/*
	 * 将消息写入ConsuemeQueue文件
	 *
	 */
	public void putMessagePositionInfo(String topic, int queueId, long offset, int size, ...) {
		//1.查找该Topic的ConsumeQueue是否存在,不存在则新建
		ConsumeQueue cq = this.findConsumeQueue(topic, queueId);
		//2.将ConsumeQueue写入文件
		cq.putMessagePositionInfoWrapper(offset, size, tagsCode, storeTimestamp, logicOffset);
	}
}
package com.alibaba.rocketmq.store;

/**
 * 消费队列
 */
public class ConsumeQueue {
	...

	public void putMessagePositionInfoWrapper(long offset, int size, long tagsCode, ...) {
		final int MaxRetries = 5;//ConsumeQueue如果写入失败,则重新尝试次数
		boolean canWrite = this.defaultMessageStore.getRunningFlags().isWriteable();//判断当前存储层是否可写
		//最多尝试5次写文件
		for(int i = 0; i < MaxRetries && canWrite; i++) {
			//写入文件
			boolean result = this.putMessagePositionInfo(offset, size, tagsCode, logicOffset);
			if(result) {
				//写入成功
				...
				return;
			} else {
				//写入失败

				try {
					Thread.sleep(1000);//1秒后重试
				} catch(InterruptedException e) {
					log.warn("", e);
				}
			}
		}

		//如果5次都写入失败,则停止存储层对外写服务,但仍然还可以读。
		this.defaultMessageStore.getRunningsFlags().makeLogicsQueueError();
	}

	/**
     * 将ConsumeQueue写入文件,文件存储一个20字节的信息
     */
	private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode, final long cqOffset) {
		...

		this.byteBufferIndex.flip();
		this.byteBufferIndex.putLong(offset);
		this.byteBufferIndex.putInt(size);
		this.byteBufferIndex.putLong(tagsCode);

		//获取最后一个MapedFile
		MapedFile mapedFile = this.mapedFileQueue.getLastMapedFile(expectLogicOffset);
		if(mapedFile != null) {
			...
			//写入文件
			return mapedFile.appendMessage(this.byteBufferIndex.array());
		}

		return false;
	}
}

MapedFile

ReocketMQ通过直接将消息文件映射成虚拟内存,减少由于读写消息文件造成的用户空间和内核空间的切换,提高读写效率。

Post Directory