RocketMQ.Consumer
消费者,消费消息。生产者将消息发送到Broker,Broker存储消息并将消息转发给消费者,消费者接收消息然后处理消息。消费者也可以订阅指定主题的消息,Broker收到该主题的消息后会主动将消息推送给已订阅的消费者。
Consumerd的功能
消息订阅
消息消费
源码解析
Consumer的启动类
Consumer启动时会向NameServer获取订阅Topic的路由信息,并与这些路由信息建立连接,维护心跳。Consumer端会一直维护这些连接,存放在channelTables中,key为broker地址,值为channel连接。
后台线程定时拉取消息
怎么拉?拉消息的动作都在PullMessageService线程里。PullMessageService轮询任务队列pullRequestQueue,一旦对里有新的PullRequest就取出来执行,最终调用DefaultMQPushConsumerImpl执行拉取操作。
PullRequest是由谁生成的?RebalanceService线程根据消费模式生成拉取请求。
总结
了解规则
Producer Group:发送同一类消息(topic)的生产者,如果某个Producer宕机,同Group下的其他producer会接手完成事务操作。
Consumer Group:接收同一类消息(topic+tag)的消费者,消费进度以Group为粒度,不同group消费进度彼此不影响;同一个Group下的Consumer订阅的topic和tag必须相同。 Topic:标识某一类消息,生产者和消费者都必须指定topic。 Tag:给topic打标签,消费者可以订阅指定tag的topic;一个Message消息只能有一个tag。Broker通过tag在ConsumeQueue中做hashcode过滤,提高查询效率。
MessageQueue:MQ,一个topic对应多个mq。生产者和消费者都是针对mq进行操作。每一条mq均对应一个文件(consumeQueue),这个文件存储了实际消息的索引信息。即使该文件被删除,也可以通过消息文件(commitlog)恢复回来。 Offset:offset标识mq的消费位置,min offset表示现存最小offset,max offset表示最新消息的offset+1。消息存储一段时间后,会被物理删除,min offset会对应增加。所以小于min offset的消息表示已不在broker上,无法被消费。 Consumer Offset: Consumer Group 在一条 MessageQueue 上的消费进度;
Offset保存位置:消费者集群模式下,offset保存在broker。broker主动推送消息,消费者拉取消息的时候需要指定offset
集群消费:
- 同一个Group内的Consumer分摊去消费消息,一条消息只会投递到Group下的一个实例。
- Consumer平均分摊MessageQueue拉取消息,一条MQ只会给一个Consumer;Producer发送消息轮询所有MessageQueue平均散落不同MQ上。
- 消费进度的存储会持久化到Broker。
广播消费:
- 同一个Group内的Consumer都会消费一遍消息,一条消息会投递到Group下的每一个实例。
- 消费进度的存储会持久化到本地。
顺序消费:
- 概念:消费顺序同发送顺序一致。
- MQ只绑定一个Consumer,Producer端使用MessageQueueSelector来选择同一个MQ队列,并单线程顺序发送。
消息:CommitLog消息文件和ConsumeQueue消息索引。ConsumeQueue的存储格式:commitLogOffset、size、messageTagHashcode。每个MessageQueue都有一个对应的ConsumeQueue文件。
如何提高消费处理能力:
- 添加Group(集群模式)下的Consumer实例。注意Consumer数量不要超过topic下的read queue数量,超过的接收不到消息。
- 提高单个Consumer实例的并行处理线程数(修改consumeThreadMin和consumeThreadMax???)。
负载均衡:在RocketMQ中,负载均衡或者消息分配是在consumer端代码完成的,consumer从broker获取全局信息,自己做负载均衡,只处理分给自己的那部分消息。
提高Producer发送速度:
- Oneway方式,只发送请求不等待应答。数据写入客户端的Socket缓冲器就返回。在一些对速度要求高,但是可靠性要求不高的场景下使用,比如日志收集类应用。
- 增加Producer数量,提高并发量。mq在broker端引入了一个并发窗口,在窗口内消息可并发写入DirectMem中,然后异步一起刷入文件系统。
复制策略:同步和异步复制,broker,Master和Slave数据复制。
刷盘策略:同步和异步刷盘,先写入PAGECACHE再一起刷盘。
Start
RocketMQ,Producer和Consumer的start方法业务逻辑代码是写在一起的。
ServiceState 状态
CREATE_JUST 服务刚创建还未启动 RUNNING 服务启动 SHUTDOWN_ALREADY 服务关闭 START_FAILED 服务启动失败
实例有四个状态,创建时默认为CREATE_JUST,启动成功后状态为RUNNING。
MessageModel 消费模式
默认为集群,CLUSTERING。集群模式的offsetStore是保存在Broker上,BROADCASTING广播模式保存在Local上。
MessageListener 消息监听器
- MessageListenerOrderly
- MessageListenerConcurrently
启动分析
start方法启动流程:
ServiceState.CREATE_JUST:
- checkConfig()
- copySubscription()
- getAndCreateMQClientInstance()
- offsetStore.load()
- messageListener
- consumeMessageService.start()
- mQClientFactory.registerConsumer()
- mQClientFactory.start()
- ServiceState.RUNNING
updateTopicSubscribeInfoWhenSubscriptionChanged() this.mQClientFactory.checkClientInBroker(); this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); this.mQClientFactory.rebalanceImmediately();
mQClientFactory.start方法启动流程:
ServiceState.CREATE_JUST:
- mQClientAPIImpl.start()
- startScheduledTask()
- pullMessageService.start()
- rebalanceService.start()
- defaultMQProducer.start()
- ServiceState.RUNNING
定期执行的方法
sendHeartbeatToAllBrokerWithLock
MQClientInstance的sendHeartbeatToAllBrokerWithLock方法
Subscribe
创建一个新的消费者,可以给该消费者订阅某个Topic下的消息,然后为消费者添加消息处理程序,当有新消息到来时,自动调用消息处理程序处理消息。
消费者默认有两种实现,DefaultMQPushConsumer
和DefaultMQPullConsumer
。DefaultMQPushConsumer
类型消费者采用订阅的方式,订阅某个Topic,当Broker有这类Topic新消息时会主动推送给这类消费者。DefaultMQPullConsumer
类型消费者,自己主动轮询监听是否有最新的消息。默认我们使用DefaultMQPushConsumer
类型消费者。在RocketMQ里,消费者的Push主动推送的实现原理和Pull主动拉取采用一样的做法,都是长轮询Broker。Push采用自定义的规则订阅消息,而Pull由系统自动订阅所有消息。
消费者订阅Topic消息,DefaultMQPushConsumerImpl
将订阅消息封装成SubscriptionData
,保存到订阅队列中,最后更新Broker的订阅服务列表。Broker收到消费者的拉取消息请求后,根据请求携带的订阅规则进行校验,校验成功后读取消息,然后将消息发送给订阅者进行消费。
package com.alibaba.rocketmq.client.impl.consumer;
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
...
/**
* 订阅Topic
* @param topic Topic名称
* @param subExpression 过滤表达式
* @throws MQClientException
*/
public void subscribe(String topic, String subExpression) throws MQClientException {
//将topic封装成订阅信息 SubscriptionData
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
topic, subExpression);
//将订阅信息保存到订阅服务队列中
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
// Consumer刚启动的时候,mQClientFactory还未实例化,所以这里值为null。
if(this.mQClientFactory != null) {
//发现心跳消息以及更新Broker的订阅列表
this.mQClientfactory.sendHeartbeatToAllBrokerWithLock();
}
}
}
subscriptionInner是一个ConcurrentHashMap,key为topic,value为SubscriptionData对象。
package com.alibaba.rocketmq.client.impl.factory;
public class MQClientInstance {
...
// mQClientFactory.start()的时候会触发该方法
public void sendHeartbeatToAllBrokerWithLock() {
...
//发送心跳检查
this.sendHeartbeatToAllBroker();
//更新Broker的订阅服务列表
this.uploadFilterClassSource();
...
}
public void uploadFilterClassSource() {
....
// 获取消费者的订阅信息
Set<SubscriptionData> subscriptions = consumer.subscriptions();
// 遍历一个个上传
for (SubscriptionData sub : subscriptions) {
this.uploadFilterClassSourceToAllFilterServer(
consumerGroup, className, topic, filterClassSource);
}
...
}
public void uploadFilterClassSourceToAllFilterServer(
final String consumerGroup, final String fullClassName, final String topic,
final String filterClassSource) throws UnsupportedEncodingException {
...
this.mQClientAPIImpl.registerMessageFilterClass(fsAddr, consumerGroup, topic, fullClassName, ...);
...
}
}
package com.alibaba.rocketmq.client.impl;
public class MQClientAPIImpl {
...
public void registerMessageFilterClass(final String addr,
final String consumerGroup,
final String topic,
final String className,
...) {
...
//设置请求消息类型为REGISTER_MASTER_FILTER_CLASS
RemotingCommand.createRequestCommand(RequestCode.REGISTER_MASTER_FILTER_CLASS, requestHeader);
//发送消息事件到Broker,更新Broker的订阅服务列表
this.remotingClient.invokeSync(addr, request, timeoutMillis);
...
}
}
Consumer启动后,订阅新的Topic,此时NameServer上是没有这个Topic信息。Producer发送该Topic类型的消息后,NameServer才维护Topic信息。
Topic默认有4个MessageQueue,消息会随机发往其中一个Queue。
Rebalance Service
主要概念:rocketmq中的负载均衡是在consumer端完成的,每个consumer启动后,会触发一个doRebalance动作;在同一个Group里加入新的consumer时,各个consumer都会触发doRebalance动作。
push consumer的六种负载均衡算法,默认AllocateMessageQueueAveragely平均分配。负载均衡与topic中message queue的数量和group中的consumer数量有关。负载均衡分配粒度只到message queue,把topic下的所有message queue分配到不同consumer中。
AllocateMessageQueueAveragely算法说明:将每个message queue平均分配到consuemr上,例如mq3个,consumer2个,则其中一个consumer会分配到2/3个mq,另一个分配1个mq。当consumer数量增加到4个,有一个consumer不会分配mq无法接收消息,其他3个consumer每个分配1个mq。所以consumer数量超过mq数量时,超过的consumer将无法消费消息。
pull consumer的负载均衡算法,consumer分配所有message queue,每个consumer的mq消费进度offset都有consumer自己管理。
消费者采用长轮询的方式向Broker请求消息。消费者启动后,开启RebalanceService
负载均衡服务,定时调用客户端实例MQClientInstance
的doRebalance
方法。doRebalance
方法遍历当前消费者的所有Topic列表,每个Topic调用rebalanceByTopic
方法。rebalanceByTopic
方法获取Topic的所有消息队列集合Set<MessageQueue>
。然后调用updateProcessQueueTableInRebalance
方法更新当前的消息处理队列集合,从消息处理队列集合中移除过期无用的消息队列ProcessQueue,ProcessQueue是MessageQueue的消费快照。将新的消息队列加入到消息处理队列集合中。然后调用dispatchPullRequest
方法将消息请求加入请求队列pullRequestQueue
。
PullMessageService
长轮询pullRequestQueue
,一旦有新的请求加入队列将处理请求。调用NettyRemotingClient
发送PULL_MESSAGE
请求。
package com.alibaba.rocketmq.client.impl.factory;
public class MQClientInstance {
...
//开启平衡服务
this.rebalanceService.start();
...
}
package com.alibaba.rocketmq.client.impl.consumer;
public class RebalanceService extends ServiceThread {
...
@Override
public void run() {
//定期执行doRebalance方法
while(!this.isStoped) {
this.waitForRunning(WaitInterval);
this.mqClientFactory.doRebalance();
}
}
...
}
package com.alibaba.rocketmq.client.impl.consumer;
public abstract class RebalanceImpl {
...
public boolean updateProcessQueueTableInRebalance(final String topic, fianl Set<MessageQueue> mqSet) {
...
//1.移除过期无用的MessageQueue
if(!mqSet.contains(mq)) {
this.removeUnnecessaryMessageQueue(mq, pq);
} else if (pq.isPullExpired) {
this.removeUnnecessaryMessageQueue(mq, pq);
}
//2.将新增的MessageQueue加入ProcessQueueTable消息处理队列集合中
this.processQueueTable.put(mq, pullRequest.getProcessQueue());
//3.将MessageQueue封装成PullRequest请求,然后发起请求
this.dispatchPullRequest(pullRequestList);
}
...
}
package com.alibaba.rocketmq.client.impl.consumer;
public class PullMessageService extends ServiceThread {
...
public void executePullRequestImmediately(fianl PullRequest pullRequest) {
//将请求加入请求队列中
this.pullRequestQueue.put(pullRequest);
}
@Override
public void run() {
while(!this.isStoped) {
//一直轮询监听请求队列
PullRequest pullRequest = this.pullRequestQueue.take();
if(pullRequest != null) {
//当一有新请求加入队列时,就处理该请求
this.pullMessage(pullReuqest);
}
}
}
}
Pull Message
PullAPIWrapper
类实现消息的拉取,核心方法pullKernelImpl
。
问题列表:
- 该方法由谁触发? –> DefaultMQPushConsumerImpl.pullMessage –> PullMessageService.pullMessage PullMessageService 一个单独的线程来负责消息的拉取。在MQClientInstance启动的时候创建了该线程。
- PullMessageService.run的逻辑,并不是定时pull,而是从pullRequestQueue.take阻塞。 pullRequestQueue是一个LinkedBlockingQueue结构,阻塞队列(无容量限制,初始化时建议指定大小,否则注意内存溢出问题。概念:take取数据方法,当队列为空时阻塞;put添加数据,当队列满时阻塞;LBQ内部由单链表实现,是读写分离的,读写操作可以并行执行;通过ReentrantLock可重入锁实现。)当pullRequestQueue有数据时(PullRequest),获取一个PullRequest消息拉取任务,调用pullMessage方法进行消息拉取。
- 这个queue里的数据是谁、何时放进去的?
PullMessageService.executePullRequestImmediately方法put入队 –> DefaultMQPushConsumerImpl.executePullRequestImmediately –> RebalanceImpl
居然最后是Rebalance来放进去的.
两种放进去方式:
- 拉取完一次消息后,又将PullRequest入队;
- RebalanceImpl中创建;
-
PullMessageService线程是谁启动的? 概念:PullMessageService只为PUSH模式服务,
- Rebalance怎么创建PullRequest的? updateProcessQueueTableInRebalance方法内创建 –> rebalanceByTopic –> doRebalance –> DefaultMQPushConsumerImpl.doRebalance –> RebalanceService 线程定期执行
RebalanceService定时线程处理逻辑:根据topic获取所有mq,根据消费模式集群还是广播来计算mq,集群模式使用负载均衡策略计算分配给group下该consumer的mq,最后创建pullRequest拉消息任务,PullMessageService执行拉消息任务。
集群模式下,consumer只会拉取分配给它的messageQueue。
pullKernelImpl
实现逻辑:
1)findBrokerAddressInSubscribe
2) mQClientAPIImpl.pullMessage
PullRequest
PullRequest数据结构:
- consumerGroup
- messageQueue 待拉取消费队列
- processQueue 消息处理队列,从broker拉取到的消息先存入processQueue,然后再提交到消费者线程池消费。
- nextOffset
- lockedFirst
mq和pq是一对一的,使用processQueueTable存储对应关系。
PullMessageService
PullMessageService
默认每次从消息服务端拉取32条消息,按消息的队列偏移顺序存放在ProcessQueue中。消费成功后从ProcessQueue中移除。
Consume Message
生产者成功发送消息请求后,Broker接收到该请求,校验成功后获取消息,然后将消息发送给消费者。消费者接收到消息后,回调已注册的消息处理程序PullCallBack
消费消息。
PullCallBack
接收到消息响应后,将消息响应提交到ConsumeMessageConcurrentlyService
服务类,ConsumeMessageConcurrentlyService
将消息响应封装成一个ConsumeRequest
Task任务提交到线程池consumeExecutor
中执行。ConsumeRequest
回调消费者自定义的消息处理程序,由该程序处理消息。
package com.alibaba.rocketmq.client.impl.consumer;
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
...
/**
* 消费者注册自定义消息处理程序,当接收到新消息时会回调用该处理程序
* 处理消息。
* @param messageListener
*/
public void registerMessageListener(MessageListener messageListener) {
this.messageListenerInner = messageListener;
}
public void pullMessage(final PullRequest pullRequest) {
...
//消息请求回调方法
PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullRequest) {
...
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispathToConsume
);
...
}
}
...
}
}
package com.alibaba.rocketmq.client.impl.consumer;
public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
...
@Override
public void submitConsumeRequest(final List<MessageExt> msgs, ...) {
...
//1.将消息响应封装成一个Task任务
ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
//2.将该任务提交到线程池
this.consumeExecutor.submit(consumeRequest);
...
}
class ConsumeRequest implements Runnable {
...
@Override
public void run() {
//用户自定义的消息处理回调方法
MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
//调用用户自定义的方法
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
}
...
}
}
OffsetStore
消费进度,消费者获取最新消息,消费成功后,通知Broker更新消费进度位置。消费者下次拉取消息时,将从这个消费进度位置开始读取消息,防止消息被重复消费。
package com.alibaba.rocketmq.client.impl.consumer;
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
...
// 消费进度存储
private OffsetStore offsetStore;
public void start() throws MQClientException {
...
if(this.defaultMQPushConsumer.getOffsetStore() != null) {
this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
// 广播消费/集群消费
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
this.offsetStore =
new LocalFileOffsetStore(this.mQClientFactory,
this.defaultMQPushConsumer.getConsumerGroup());
break;
case CLUSTERING:
this.offsetStore =
new RemoteBrokerOffsetStore(this.mQClientFactory,
this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
}
}
// 加载消费进度
this.offsetStore.load();
...
}
}
消费者启动时,先从Broker获取最新的消费进度,更新到本地。启动完成后,从最新的消费进度开始向Broker拉取消息。Broker接收到请求后,检查从这个消费进度开始后面是否有最新的消息,如果有则读取消息并返回,没有,则返回无最新消息。
消费者每次消费成功后,Broker都会更新消费者的消费进度,防止消费者重复消费消息。消费进度持久化到文件,保存到日志文件consumerOffset.json
中。
{
"offsetTable":{
"%RETRY%PushConsumer@PushConsumer":{0:0
},
"%RETRY%PushConsumer-1@PushConsumer-1":{0:0
},
// PushTopic-5 主题 PushConsumer 消费者组别 0,1,2,3 表示消息队列(默认有四个队列)
"PushTopic-5@PushConsumer":{0:2,1:3,2:0,3:0 // {0:2,...} 0 表示第一个消息队列,2表示该消费者消费的进度
},
"PushTopic-5@PushConsumer-1":{0:2,1:1,2:0,3:0
},
"PushTopic@PushConsumer":{0:0,1:0,2:1,3:1
}
}
}
package com.alibaba.rocketmq.broker.processor;
public class ClientManageProcessor implements NettyRequestProcessor {
...
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
throws RemotingCommandException {
switch (request.getCode()) {
case RequestCode.HEART_BEAT: // 心跳请求
return this.heartBeat(ctx, request);
case RequestCode.UNREGISTER_CLIENT: // 注销客户端
return this.unregisterClient(ctx, request);
case RequestCode.GET_CONSUMER_LIST_BY_GROUP: // 获取消费者列表
return this.getConsumerListByGroup(ctx, request);
case RequestCode.UPDATE_CONSUMER_OFFSET: // 更新消费进度
return this.updateConsumerOffset(ctx, request);
case RequestCode.QUERY_CONSUMER_OFFSET: // 查询消费进度
return this.queryConsumerOffset(ctx, request);
default:
break;
}
return null;
}
/**
* 更新消费进度
* @param ctx
* @param request
* @return
* @throws RemotingCommandException
*/
private RemotingCommand updateConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request)
throws RemotingCommandException {
...
// 反序列化请求
final UpdateConsumerOffsetRequestHeader requestHeader =
(UpdateConsumerOffsetRequestHeader) request
.decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class);
// 更新消费进度
this.brokerController.getConsumerOffsetManager().commitOffset(requestHeader.getConsumerGroup(),
requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
...
}
}
package com.alibaba.rocketmq.broker.offset;
/**
* Consumer消费进度管理
*
* @author shijia.wxr<vintage.wang@gmail.com>
* @since 2013-8-11
*/
public class ConsumerOffsetManager extends ConfigManager {
private static final String TOPIC_GROUP_SEPARATOR = "@";
// 消费进度队列
private ConcurrentHashMap<String/* topic@group */, ConcurrentHashMap<Integer, Long>> offsetTable =
new ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>>(512);
/**
* 更新消费进度
* @param group 消费者组别
* @param topic 消息主题
* @param queueId 消息队列ID
* @param offset 消费进度
*/
public void commitOffset(final String group, final String topic, final int queueId, final long offset) {
// topic@group 例如 配置文件中保存的 "PushTopic-5@PushConsumer"
String key = topic + TOPIC_GROUP_SEPARATOR + group;
this.commitOffset(key, queueId, offset);
}
private void commitOffset(final String key, final int queueId, final long offset) {
ConcurrentHashMap<Integer, Long> map = this.offsetTable.get(key);
if (null == map) {
map = new ConcurrentHashMap<Integer, Long>(32);
map.put(queueId, offset);
this.offsetTable.put(key, map); // 保存到队列中
}
else {
map.put(queueId, offset);
}
}
/**
* 查询消费进度
* @param group 消费者组别
* @param topic 消息主题
* @param queueId 消息队列ID
*/
public long queryOffset(final String group, final String topic, final int queueId) {
// topic@group 根据 "PushTopic-5@PushConsumer" 来查询
String key = topic + TOPIC_GROUP_SEPARATOR + group;
ConcurrentHashMap<Integer, Long> map = this.offsetTable.get(key);
if (null != map) {
Long offset = map.get(queueId);
if (offset != null)
return offset;
}
return -1;
}
}