RocketMQ In Action - Consumer

2016/08/18

RocketMQ.Consumer

消费者,消费消息。生产者将消息发送到Broker,Broker存储消息并将消息转发给消费者,消费者接收消息然后处理消息。消费者也可以订阅指定主题的消息,Broker收到该主题的消息后会主动将消息推送给已订阅的消费者。

Consumerd的功能

消息订阅

消息消费

源码解析

Consumer的启动类

Consumer启动时会向NameServer获取订阅Topic的路由信息,并与这些路由信息建立连接,维护心跳。Consumer端会一直维护这些连接,存放在channelTables中,key为broker地址,值为channel连接。

后台线程定时拉取消息

怎么拉?拉消息的动作都在PullMessageService线程里。PullMessageService轮询任务队列pullRequestQueue,一旦对里有新的PullRequest就取出来执行,最终调用DefaultMQPushConsumerImpl执行拉取操作。

PullRequest是由谁生成的?RebalanceService线程根据消费模式生成拉取请求。

总结

了解规则

Producer Group:发送同一类消息(topic)的生产者,如果某个Producer宕机,同Group下的其他producer会接手完成事务操作。

Consumer Group:接收同一类消息(topic+tag)的消费者,消费进度以Group为粒度,不同group消费进度彼此不影响;同一个Group下的Consumer订阅的topic和tag必须相同。 Topic:标识某一类消息,生产者和消费者都必须指定topic。 Tag:给topic打标签,消费者可以订阅指定tag的topic;一个Message消息只能有一个tag。Broker通过tag在ConsumeQueue中做hashcode过滤,提高查询效率。

MessageQueue:MQ,一个topic对应多个mq。生产者和消费者都是针对mq进行操作。每一条mq均对应一个文件(consumeQueue),这个文件存储了实际消息的索引信息。即使该文件被删除,也可以通过消息文件(commitlog)恢复回来。 Offset:offset标识mq的消费位置,min offset表示现存最小offset,max offset表示最新消息的offset+1。消息存储一段时间后,会被物理删除,min offset会对应增加。所以小于min offset的消息表示已不在broker上,无法被消费。 Consumer Offset: Consumer Group 在一条 MessageQueue 上的消费进度;

Offset保存位置:消费者集群模式下,offset保存在broker。broker主动推送消息,消费者拉取消息的时候需要指定offset

集群消费:

  1. 同一个Group内的Consumer分摊去消费消息,一条消息只会投递到Group下的一个实例。
  2. Consumer平均分摊MessageQueue拉取消息,一条MQ只会给一个Consumer;Producer发送消息轮询所有MessageQueue平均散落不同MQ上。
  3. 消费进度的存储会持久化到Broker。

广播消费:

  1. 同一个Group内的Consumer都会消费一遍消息,一条消息会投递到Group下的每一个实例。
  2. 消费进度的存储会持久化到本地。

顺序消费:

  1. 概念:消费顺序同发送顺序一致。
  2. MQ只绑定一个Consumer,Producer端使用MessageQueueSelector来选择同一个MQ队列,并单线程顺序发送。

消息:CommitLog消息文件和ConsumeQueue消息索引。ConsumeQueue的存储格式:commitLogOffset、size、messageTagHashcode。每个MessageQueue都有一个对应的ConsumeQueue文件。

如何提高消费处理能力:

  1. 添加Group(集群模式)下的Consumer实例。注意Consumer数量不要超过topic下的read queue数量,超过的接收不到消息。
  2. 提高单个Consumer实例的并行处理线程数(修改consumeThreadMin和consumeThreadMax???)。

负载均衡:在RocketMQ中,负载均衡或者消息分配是在consumer端代码完成的,consumer从broker获取全局信息,自己做负载均衡,只处理分给自己的那部分消息。

提高Producer发送速度:

  1. Oneway方式,只发送请求不等待应答。数据写入客户端的Socket缓冲器就返回。在一些对速度要求高,但是可靠性要求不高的场景下使用,比如日志收集类应用。
  2. 增加Producer数量,提高并发量。mq在broker端引入了一个并发窗口,在窗口内消息可并发写入DirectMem中,然后异步一起刷入文件系统。

复制策略:同步和异步复制,broker,Master和Slave数据复制。

刷盘策略:同步和异步刷盘,先写入PAGECACHE再一起刷盘。

Start

RocketMQ,Producer和Consumer的start方法业务逻辑代码是写在一起的。

ServiceState 状态

CREATE_JUST 服务刚创建还未启动 RUNNING 服务启动 SHUTDOWN_ALREADY 服务关闭 START_FAILED 服务启动失败

实例有四个状态,创建时默认为CREATE_JUST,启动成功后状态为RUNNING。

MessageModel 消费模式

默认为集群,CLUSTERING。集群模式的offsetStore是保存在Broker上,BROADCASTING广播模式保存在Local上。

MessageListener 消息监听器

  1. MessageListenerOrderly
  2. MessageListenerConcurrently

启动分析

start方法启动流程:

ServiceState.CREATE_JUST:

  1. checkConfig()
  2. copySubscription()
  3. getAndCreateMQClientInstance()
  4. offsetStore.load()
  5. messageListener
  6. consumeMessageService.start()
  7. mQClientFactory.registerConsumer()
  8. mQClientFactory.start()
  9. ServiceState.RUNNING

updateTopicSubscribeInfoWhenSubscriptionChanged() this.mQClientFactory.checkClientInBroker(); this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); this.mQClientFactory.rebalanceImmediately();

mQClientFactory.start方法启动流程:

ServiceState.CREATE_JUST:

  1. mQClientAPIImpl.start()
  2. startScheduledTask()
  3. pullMessageService.start()
  4. rebalanceService.start()
  5. defaultMQProducer.start()
  6. ServiceState.RUNNING

定期执行的方法

sendHeartbeatToAllBrokerWithLock

MQClientInstance的sendHeartbeatToAllBrokerWithLock方法

Subscribe

创建一个新的消费者,可以给该消费者订阅某个Topic下的消息,然后为消费者添加消息处理程序,当有新消息到来时,自动调用消息处理程序处理消息。

消费者默认有两种实现,DefaultMQPushConsumerDefaultMQPullConsumerDefaultMQPushConsumer类型消费者采用订阅的方式,订阅某个Topic,当Broker有这类Topic新消息时会主动推送给这类消费者。DefaultMQPullConsumer类型消费者,自己主动轮询监听是否有最新的消息。默认我们使用DefaultMQPushConsumer类型消费者。在RocketMQ里,消费者的Push主动推送的实现原理和Pull主动拉取采用一样的做法,都是长轮询Broker。Push采用自定义的规则订阅消息,而Pull由系统自动订阅所有消息。

消费者订阅Topic消息,DefaultMQPushConsumerImpl将订阅消息封装成SubscriptionData,保存到订阅队列中,最后更新Broker的订阅服务列表。Broker收到消费者的拉取消息请求后,根据请求携带的订阅规则进行校验,校验成功后读取消息,然后将消息发送给订阅者进行消费。

package com.alibaba.rocketmq.client.impl.consumer;

public class DefaultMQPushConsumerImpl implements MQConsumerInner {
	...

  	/**
	 * 订阅Topic
     * @param topic Topic名称
     * @param subExpression 过滤表达式
     * @throws MQClientException
     */
	public void subscribe(String topic, String subExpression) throws MQClientException {

		//将topic封装成订阅信息 SubscriptionData
		SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
			topic, subExpression);
		//将订阅信息保存到订阅服务队列中
		this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
		// Consumer刚启动的时候,mQClientFactory还未实例化,所以这里值为null。
		if(this.mQClientFactory != null) {
			//发现心跳消息以及更新Broker的订阅列表
			this.mQClientfactory.sendHeartbeatToAllBrokerWithLock();
		}
	}
}

subscriptionInner是一个ConcurrentHashMap,key为topic,value为SubscriptionData对象。

package com.alibaba.rocketmq.client.impl.factory;

public class MQClientInstance {
	...

	// mQClientFactory.start()的时候会触发该方法
	public void sendHeartbeatToAllBrokerWithLock() {
		...
		//发送心跳检查
		this.sendHeartbeatToAllBroker();
		//更新Broker的订阅服务列表
		this.uploadFilterClassSource();
		...
	}

	public void uploadFilterClassSource() {
		....
		// 获取消费者的订阅信息
		Set<SubscriptionData> subscriptions = consumer.subscriptions();
		// 遍历一个个上传
		for (SubscriptionData sub : subscriptions) {
			this.uploadFilterClassSourceToAllFilterServer(
			consumerGroup, className, topic, filterClassSource);
		}
		...
	}

	public void uploadFilterClassSourceToAllFilterServer(
		final String consumerGroup, final String fullClassName, final String topic,
		final String filterClassSource) throws UnsupportedEncodingException {
		...

		this.mQClientAPIImpl.registerMessageFilterClass(fsAddr, consumerGroup, topic, fullClassName, ...);

		...
	}
}
package com.alibaba.rocketmq.client.impl;

public class MQClientAPIImpl {
	...

	public void registerMessageFilterClass(final String addr,
		final String consumerGroup,
		final String topic,
		final String className,
		...) {

		...

		//设置请求消息类型为REGISTER_MASTER_FILTER_CLASS
		RemotingCommand.createRequestCommand(RequestCode.REGISTER_MASTER_FILTER_CLASS, requestHeader);
		//发送消息事件到Broker,更新Broker的订阅服务列表
		this.remotingClient.invokeSync(addr, request, timeoutMillis);

		...
	}
}

Consumer启动后,订阅新的Topic,此时NameServer上是没有这个Topic信息。Producer发送该Topic类型的消息后,NameServer才维护Topic信息。

Topic默认有4个MessageQueue,消息会随机发往其中一个Queue。

Rebalance Service

主要概念:rocketmq中的负载均衡是在consumer端完成的,每个consumer启动后,会触发一个doRebalance动作;在同一个Group里加入新的consumer时,各个consumer都会触发doRebalance动作。

push consumer的六种负载均衡算法,默认AllocateMessageQueueAveragely平均分配。负载均衡与topic中message queue的数量和group中的consumer数量有关。负载均衡分配粒度只到message queue,把topic下的所有message queue分配到不同consumer中。

AllocateMessageQueueAveragely算法说明:将每个message queue平均分配到consuemr上,例如mq3个,consumer2个,则其中一个consumer会分配到2/3个mq,另一个分配1个mq。当consumer数量增加到4个,有一个consumer不会分配mq无法接收消息,其他3个consumer每个分配1个mq。所以consumer数量超过mq数量时,超过的consumer将无法消费消息。

pull consumer的负载均衡算法,consumer分配所有message queue,每个consumer的mq消费进度offset都有consumer自己管理。

消费者采用长轮询的方式向Broker请求消息。消费者启动后,开启RebalanceService负载均衡服务,定时调用客户端实例MQClientInstancedoRebalance方法。doRebalance方法遍历当前消费者的所有Topic列表,每个Topic调用rebalanceByTopic方法。rebalanceByTopic方法获取Topic的所有消息队列集合Set<MessageQueue>。然后调用updateProcessQueueTableInRebalance方法更新当前的消息处理队列集合,从消息处理队列集合中移除过期无用的消息队列ProcessQueue,ProcessQueue是MessageQueue的消费快照。将新的消息队列加入到消息处理队列集合中。然后调用dispatchPullRequest方法将消息请求加入请求队列pullRequestQueue

PullMessageService长轮询pullRequestQueue,一旦有新的请求加入队列将处理请求。调用NettyRemotingClient发送PULL_MESSAGE请求。

package com.alibaba.rocketmq.client.impl.factory;

public class MQClientInstance {
	...

	//开启平衡服务
	this.rebalanceService.start();

	...
}
package com.alibaba.rocketmq.client.impl.consumer;

public class RebalanceService extends ServiceThread {
	...

	@Override
	public void run() {

		//定期执行doRebalance方法
		while(!this.isStoped) {
			this.waitForRunning(WaitInterval);
			this.mqClientFactory.doRebalance();
		}
	}

	...
}
package com.alibaba.rocketmq.client.impl.consumer;

public abstract class RebalanceImpl {
	...

	public  boolean updateProcessQueueTableInRebalance(final String topic, fianl Set<MessageQueue> mqSet) {
		...

		//1.移除过期无用的MessageQueue
		if(!mqSet.contains(mq)) {
			this.removeUnnecessaryMessageQueue(mq, pq);
		} else if (pq.isPullExpired) {
			this.removeUnnecessaryMessageQueue(mq, pq);
		}
		//2.将新增的MessageQueue加入ProcessQueueTable消息处理队列集合中
		this.processQueueTable.put(mq, pullRequest.getProcessQueue());
		//3.将MessageQueue封装成PullRequest请求,然后发起请求
		this.dispatchPullRequest(pullRequestList);
	}

	...
}
package com.alibaba.rocketmq.client.impl.consumer;

public class PullMessageService extends ServiceThread {
	...

	public void executePullRequestImmediately(fianl PullRequest pullRequest) {
		//将请求加入请求队列中
		this.pullRequestQueue.put(pullRequest);
	}

	@Override
	public void run() {

		while(!this.isStoped) {
			//一直轮询监听请求队列
			PullRequest pullRequest = this.pullRequestQueue.take();
			if(pullRequest != null) {
				//当一有新请求加入队列时,就处理该请求
				this.pullMessage(pullReuqest);
			}
		}
	}
}

Pull Message

PullAPIWrapper类实现消息的拉取,核心方法pullKernelImpl

问题列表:

  1. 该方法由谁触发? –> DefaultMQPushConsumerImpl.pullMessage –> PullMessageService.pullMessage PullMessageService 一个单独的线程来负责消息的拉取。在MQClientInstance启动的时候创建了该线程。
  2. PullMessageService.run的逻辑,并不是定时pull,而是从pullRequestQueue.take阻塞。 pullRequestQueue是一个LinkedBlockingQueue结构,阻塞队列(无容量限制,初始化时建议指定大小,否则注意内存溢出问题。概念:take取数据方法,当队列为空时阻塞;put添加数据,当队列满时阻塞;LBQ内部由单链表实现,是读写分离的,读写操作可以并行执行;通过ReentrantLock可重入锁实现。)当pullRequestQueue有数据时(PullRequest),获取一个PullRequest消息拉取任务,调用pullMessage方法进行消息拉取。
  3. 这个queue里的数据是谁、何时放进去的? PullMessageService.executePullRequestImmediately方法put入队 –> DefaultMQPushConsumerImpl.executePullRequestImmediately –> RebalanceImpl 居然最后是Rebalance来放进去的. 两种放进去方式:
    1. 拉取完一次消息后,又将PullRequest入队;
    2. RebalanceImpl中创建;
  4. PullMessageService线程是谁启动的? 概念:PullMessageService只为PUSH模式服务,

  5. Rebalance怎么创建PullRequest的? updateProcessQueueTableInRebalance方法内创建 –> rebalanceByTopic –> doRebalance –> DefaultMQPushConsumerImpl.doRebalance –> RebalanceService 线程定期执行

RebalanceService定时线程处理逻辑:根据topic获取所有mq,根据消费模式集群还是广播来计算mq,集群模式使用负载均衡策略计算分配给group下该consumer的mq,最后创建pullRequest拉消息任务,PullMessageService执行拉消息任务。

集群模式下,consumer只会拉取分配给它的messageQueue。

pullKernelImpl实现逻辑: 1)findBrokerAddressInSubscribe 2) mQClientAPIImpl.pullMessage

PullRequest

PullRequest数据结构:

  • consumerGroup
  • messageQueue 待拉取消费队列
  • processQueue 消息处理队列,从broker拉取到的消息先存入processQueue,然后再提交到消费者线程池消费。
  • nextOffset
  • lockedFirst

mq和pq是一对一的,使用processQueueTable存储对应关系。

PullMessageService

PullMessageService默认每次从消息服务端拉取32条消息,按消息的队列偏移顺序存放在ProcessQueue中。消费成功后从ProcessQueue中移除。

Consume Message

生产者成功发送消息请求后,Broker接收到该请求,校验成功后获取消息,然后将消息发送给消费者。消费者接收到消息后,回调已注册的消息处理程序PullCallBack消费消息。

PullCallBack接收到消息响应后,将消息响应提交到ConsumeMessageConcurrentlyService服务类,ConsumeMessageConcurrentlyService将消息响应封装成一个ConsumeRequestTask任务提交到线程池consumeExecutor中执行。ConsumeRequest回调消费者自定义的消息处理程序,由该程序处理消息。

package com.alibaba.rocketmq.client.impl.consumer;

public class DefaultMQPushConsumerImpl implements MQConsumerInner {
	...

	/**
     * 消费者注册自定义消息处理程序,当接收到新消息时会回调用该处理程序
     * 处理消息。
     * @param messageListener
     */
    public void registerMessageListener(MessageListener messageListener) {
        this.messageListenerInner = messageListener;
    }

    public void pullMessage(final PullRequest pullRequest) {

    	...

    	//消息请求回调方法
    	PullCallback pullCallback = new PullCallback() {

    		@Override
    		public void onSuccess(PullResult pullRequest) {
    			...

    			DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
    				pullResult.getMsgFoundList(),
    				processQueue,
    				pullRequest.getMessageQueue(),
    				dispathToConsume
    				);

    			...
    		}

    	}

    	...
    }
}
package com.alibaba.rocketmq.client.impl.consumer;

public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
	...

	@Override
	public void submitConsumeRequest(final List<MessageExt> msgs, ...) {
		...
		//1.将消息响应封装成一个Task任务
		ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
		//2.将该任务提交到线程池
		this.consumeExecutor.submit(consumeRequest);
		...
	}

	class ConsumeRequest implements Runnable {
		...

		@Override
		public void run() {
			//用户自定义的消息处理回调方法
			MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
			//调用用户自定义的方法
			status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
		}

		...

	}
}

OffsetStore

消费进度,消费者获取最新消息,消费成功后,通知Broker更新消费进度位置。消费者下次拉取消息时,将从这个消费进度位置开始读取消息,防止消息被重复消费。

package com.alibaba.rocketmq.client.impl.consumer;

public class DefaultMQPushConsumerImpl implements MQConsumerInner {
	...

	// 消费进度存储
	private OffsetStore offsetStore;

	public void start() throws MQClientException {
		...

		if(this.defaultMQPushConsumer.getOffsetStore() != null) {
			this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
		} else {
			// 广播消费/集群消费
            switch (this.defaultMQPushConsumer.getMessageModel()) {
            case BROADCASTING:
                this.offsetStore =
                        new LocalFileOffsetStore(this.mQClientFactory,
                            this.defaultMQPushConsumer.getConsumerGroup());
                break;
            case CLUSTERING:
                this.offsetStore =
                        new RemoteBrokerOffsetStore(this.mQClientFactory,
                            this.defaultMQPushConsumer.getConsumerGroup());
                break;
            default:
                break;
            }
		}

		// 加载消费进度
        this.offsetStore.load();

        ...
	}
}

消费者启动时,先从Broker获取最新的消费进度,更新到本地。启动完成后,从最新的消费进度开始向Broker拉取消息。Broker接收到请求后,检查从这个消费进度开始后面是否有最新的消息,如果有则读取消息并返回,没有,则返回无最新消息。

消费者每次消费成功后,Broker都会更新消费者的消费进度,防止消费者重复消费消息。消费进度持久化到文件,保存到日志文件consumerOffset.json中。

{
	"offsetTable":{
		"%RETRY%PushConsumer@PushConsumer":{0:0
		},
		"%RETRY%PushConsumer-1@PushConsumer-1":{0:0
		},
		// PushTopic-5 主题 PushConsumer 消费者组别 0,1,2,3 表示消息队列(默认有四个队列)
		"PushTopic-5@PushConsumer":{0:2,1:3,2:0,3:0 // {0:2,...} 0 表示第一个消息队列,2表示该消费者消费的进度
		},
		"PushTopic-5@PushConsumer-1":{0:2,1:1,2:0,3:0
		},
		"PushTopic@PushConsumer":{0:0,1:0,2:1,3:1
		}
	}
}
package com.alibaba.rocketmq.broker.processor;

public class ClientManageProcessor implements NettyRequestProcessor {
	...

	@Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
            throws RemotingCommandException {
        switch (request.getCode()) {
        case RequestCode.HEART_BEAT: // 心跳请求
            return this.heartBeat(ctx, request);
        case RequestCode.UNREGISTER_CLIENT: // 注销客户端
            return this.unregisterClient(ctx, request);
        case RequestCode.GET_CONSUMER_LIST_BY_GROUP: // 获取消费者列表
            return this.getConsumerListByGroup(ctx, request);
        case RequestCode.UPDATE_CONSUMER_OFFSET: // 更新消费进度
            return this.updateConsumerOffset(ctx, request);
        case RequestCode.QUERY_CONSUMER_OFFSET: // 查询消费进度
            return this.queryConsumerOffset(ctx, request);
        default:
            break;  
        }
        return null;
    }

	/**
	 * 更新消费进度
	 * @param ctx
	 * @param request
	 * @return
	 * @throws RemotingCommandException
	 */
 	private RemotingCommand updateConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request)
            throws RemotingCommandException {
        ...

        // 反序列化请求
        final UpdateConsumerOffsetRequestHeader requestHeader =
                (UpdateConsumerOffsetRequestHeader) request
                    .decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class);

        // 更新消费进度
        this.brokerController.getConsumerOffsetManager().commitOffset(requestHeader.getConsumerGroup(),
            requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());

        ...
    }
}
package com.alibaba.rocketmq.broker.offset;

/**
 * Consumer消费进度管理
 * 
 * @author shijia.wxr<vintage.wang@gmail.com>
 * @since 2013-8-11
 */
public class ConsumerOffsetManager extends ConfigManager {

	private static final String TOPIC_GROUP_SEPARATOR = "@";

	// 消费进度队列
	private ConcurrentHashMap<String/* topic@group */, ConcurrentHashMap<Integer, Long>> offsetTable =
            new ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>>(512);

	/**
	 * 更新消费进度
	 * @param group 消费者组别
	 * @param topic 消息主题
	 * @param queueId 消息队列ID
	 * @param offset 消费进度
	 */
    public void commitOffset(final String group, final String topic, final int queueId, final long offset) {
        // topic@group 例如 配置文件中保存的 "PushTopic-5@PushConsumer"
        String key = topic + TOPIC_GROUP_SEPARATOR + group;
        this.commitOffset(key, queueId, offset);
    }

    private void commitOffset(final String key, final int queueId, final long offset) {
        ConcurrentHashMap<Integer, Long> map = this.offsetTable.get(key);
        if (null == map) {
            map = new ConcurrentHashMap<Integer, Long>(32);
            map.put(queueId, offset);
            this.offsetTable.put(key, map); // 保存到队列中
        }
        else {
            map.put(queueId, offset);
        }
    }

	/**
	 * 查询消费进度
	 * @param group 消费者组别
	 * @param topic 消息主题
	 * @param queueId 消息队列ID
	 */
    public long queryOffset(final String group, final String topic, final int queueId) {
        // topic@group 根据 "PushTopic-5@PushConsumer" 来查询
        String key = topic + TOPIC_GROUP_SEPARATOR + group;
        ConcurrentHashMap<Integer, Long> map = this.offsetTable.get(key);
        if (null != map) {
            Long offset = map.get(queueId);
            if (offset != null)
                return offset;
        }

        return -1;
    }
}

Post Directory