RocketMQ.Producer
消息的生产者,负责产生消息。通过Name Server获取到Topic信息,从Topic中选择一个消息队列进行消息发送。根据选择的消息队列,获取该消息队列的Broker地址,然后生产者与该Broker建立连接,将消息发送到Broker上。
消息发送分为同步发送、异步发送、延迟发送和事务发送消息。可以设置发送失败重试次数,这里就可能导致重复发送。
Producer的功能
Producer消息发送
先介绍下路由信息结构
- Producer的路由信息结构TopicPublishInfo
- NameServer返回的路由信息结构TopicRouteData
Producer的消息发送逻辑为以下三步骤:
1)获取路由信息
先从本地路由缓存中获取路由信息,如果没找到,则去NameServer上获取路由信息,如果NameServer上也没找到,NameServer则会根据配置判断是否自动创建路由信息,并将新创建的路由信息返回给Producer。
2)按负载均衡方式,选择路由
路由信息的数据结构为1个Topic对应多个路由信息,所以Producer在发送消息时需要根据策略从这些路由信息队列中选择一条路由进行发送。
路由选择策略是根据路由信息队列长度取模轮询顺序选择一条。路由选择好后,Producer会将路由信息分装成MessageQueue。
3)根据路由,发送消息到Broker
将消息进行分装,通过Netty将消息发送到对应的Broker上。
路由信息有哪些内容?
Producer是根据Topic去NameServer找对应路由信息,NameServer返回对应Topic的路由信息。告知Producer有哪些Broker提供了Topic存储服务。
消息的发送方式
Producer有三种消息发送方式:
- 同步发送:消息发送后,一直阻塞直到消息发送成功。
- 异步发送:消息发送后,立即返回,可以通过事件回调监听消息发送结果。
- 单向发送:消息发送后,立即返回。
Producer源码分析
总结
DefaultMQProducer
一类消息划分为一个Topic,一个Topic包含多条消息队列。生产者在发送消息时,需要指定消息所属的Topic类型,然后根据Topic选择其中一条消息队列进行消息发送。顶级接口MQAdmin
定义了创建Topic方法。MQProducer
接口继承MQAdmin
并定义了发送消息方法。DefaultMQProducer
实现了`MQProducer接口,作为生产者的默认实现类。
Producer调用start方法启动,DefaultMQProducer调用DefaultMQProducerImpl的start方法,DefaultMQProducerImpl为该生产者创建了一个消息队列客户端实例MQClientInstance,MQClientInstance分别创建了用于客户端远程通信辅助类MQClientAPIImpl、拉取消息服务类PullMessageService(DefaultMQPushConsumerImpl才使用)以及RebalanceService负载均衡服务类(消费者才使用到)。
Send Message
发送消息需要指定消息所属的Topic,生产者首先根据Topic在本地Topic路由信息表查找,如果本地没有再去Name Server获取Topic的队列信息,如果Topic在Name Server中不存在,Name Server会使用系统默认的Topic消息队列返回给生产者。生产者根据Name Server返回回来的消息队列更新本地的Topic信息表。然后从Topic信息表中选择一条消息队列进行通信。生产者根据消息队列的Broker去Name Server查找该Broker的地址信息。最后与该Broker建立连接,将消息发送给该Broker。
问题:
1)topic的broker路由信息包括哪些内容?
TopicRouteData
路由信息结构,主要有以下几个属性:
List<QueueData> queueDatas
-> brokerName, readQueueNums, writeQueueNums,该Topic下有多少个BrokerName。List<BrokerData> brokerDatas
-> BrokerName 1-* BrokerAddr 一个BrokerName可以包含多个Broker实例,包括一个Master和多个Slave。
换一种思路,看broker是怎么注册到nameserver的? RouteInfoManager.registerBroker方法注册broker,方式使用ReentrantReadWriteLock读写锁。
2)消息写到messageQueue后,是只写在其中一个broker上?如果没有配置集群,那是否就单例?
package com.alibaba.rocketmq.client.impl.producer;
public class DefaultMQProducerImpl implements MQProducerInner {
...
private SendResult sendDefaultImpl(Message, msg, //...) {
...
//1.根据消息设定的Topic查找Topic信息
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic);
...
//2.从Topic中选择一条消息队列
MessageQueue tmpmq = topicPublishInfo.selectOneMessageQueue(lastBrokerName);
//3.调用消息发送方法
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, timeout);
...
}
private SendResult sendKernelImpl(final Message msg, final MessageQueue mq, //...) {
//4.根据Broker查找Broker远程地址信息
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
//5.调用消息发送方法
SendResult sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr, // Broker地址
mq.getBrokerName(), // Broker名称
msg, // 消息
...
);
...
}
}
package com.alibaba.rocketmq.client.impl;
public class MQClientAPIImpl {
...
public SendResult sendMessage(
final String addr, final String brokerName, final Message msg, ...) {
...
//默认采用同步方式发送消息
return this.sendMessageSync(addr, brokerName, msg, ...);
...
}
private SendResult sendMessageSync(
final String addr, final String brokerName, final Message msg, ...) {
//调用Netty客户端往指定addr地址发送消息请求
RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
...
}
}
package com.alibaba.rocketmq.remoting.netty;
public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient {
...
@Override
public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis) {
//根据addr地址获取channel管道,如果不存在则新建channel
final Channel channel = this.getAndCreateChannel(addr);
...
//实际发送消息的地方
this.invokeSyncImpl(channel, request, timeoutMillis);
...
}
}
package com.alibaba.rocketmq.remoting.netty;
public abstract class NettyRemotingAbstract {
...
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
final long timeoutMillis) {
...
//往管道写入消息并刷新,消息发送完成
channel.writeAndFlush(request);
...
}
}
TopicPublishInfo
消息必须属于某个Topic,一个Topic包含多个消息队列,消息队列保存着对应Broker的名称信息。
pakcage com.alibaba.rocketmq.client.impl.producer;
/**
* Topic信息
*/
public class TopicPublishInfo {
//保存一个消息队列
private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
}
pakcage com.alibaba.rocketmq.common.message;
/**
* 消息队列信息
*/
public class MessageQueue implements Comparable<MessageQueue>, Serializable {
//保存Topic名称
private String topic;
//保存Broker名称
private String brokerName;
}
Producer
- 启动了几个线程?如何与Broker/NameServer通信?如何发送消息?绑定了哪些Handler?
Producer启动会创建一个MQClientInstance(Consumer也是创建这个实例),MQClientInstance创建了MQClientAPIImpl用于与服务端通信(这里服务端是指NameServer或Broker)。
MQClientAPIImpl提供了与服务端交互的所有方法:start方法就是启动Netty客户端(NettyRemotingClient);send方法消息就是调用NettyRemoteClient的invoke方法。
NettyRemoteClient启动1个Worker线程
- eventLoopGroupWorker bossWorker
- ExecutorService publicExecutor CPU核数对应线程数 ClientCallbackExecutorThreads(服务端才使用到)
- DefaultEventExecutorGroup defaultEventExecutorGroup (clientWorkerThreads = 4,使用这个线程池来处理handler)
- NettyEventExecutor nettyEventExecutor (父类NettyRemotingAbstract线程)
bootstrap pipeline绑定的handler:
- NettyConnectManageHandler 应该是响应连接的,与服务端连接成功后会做什么?这个服务端是指谁?当连接成功后,会被封装成事件然后添加到defaultEventExecutorGroup线程池处理事件?
- NettyClientHandler 处理消息的channelRead0
createChannel的时候才向服务端发起connect请求。
channelTables保存请求IP地址和Channel对应关系。使用ChannelWrapper静态类来封装Channel,Channel的状态通过定义方法直接获取。(这个对Client来说没用,Client的channelEventListener为null,不做任何处理)
-
RocketMQ的消息类型为RemotingCommand
-
三种发送方式
invokeAsyncImpl -> 构建ResponseFuture,添加到集合responseTable(缓存所有正在发送的请求消息),然后channel.writeAndFlush(request).addListener -> 发送成功将 responseFuture标记为OK。
回调函数什么时候被调用呢?channelRead0到消息后,判断消息类型,如果是Response就是服务端返回的响应,然后根据消息里的ID opaque到responseTable集合找到对应的ReponseFuture,获取绑定的回调函数,提交到
inveokeSyncImpl -> 发送消息后,调用responseFuture.waitResponse(timeoutMills)阻塞等待指定时间。
- Producer发送完消息就关闭Channel?Channel是什么时候创建、什么时候关闭的?是有维护Channel的心跳?
sync消息发送时间是哪里指定的?配置文件吗?
回调的putResposne是谁来执行的? 由NettyClientHandler来执行
异步回调也是在NettyClientHandler中执行,判断ReponseFuture是否有设置回调函数,如果有就调用回调函数。
为什么不使用线程池?而不是一个addr共用一个连接Channel,而且oneway和async方式要用信号量来控制并发读(因为这两种方式都是只要写就可以,防止过快写导致消息错乱。不是,看了源码,说是为了控制发送数量的,防止内存爆掉,这些都是异步写。),sync却不用?
有点想不明白,这样的模式如何保证高性能的发送消息?
Producer发送消息的时候会根据发送的addr从map中取出对应的channel,如果channel可用就使用该channel发送消息,根据选择的发送方式分为同步发送、异步发送、ONEWAY模式;如果channel不可用或者为null,就重新创建一个新的channel,(那旧的channel呢?谁来维护?继续看源码发现直接简单的map.remove掉。)
- NettyRemotingServer是如何处理接收到的消息的?好像是根据code都注册了处理程序和对应的执行线程池?
NettyRemotingServer.registerProcessor code 就是RequestCode定义的代码,这里的服务端主要是Broker,所以Broker启动后会注册所有的请求类型和处理事件以及线程池。
- 客户端(Producer或Consumer)启动服务的时候会做哪些事情?
- channelTables维护每个addr对应的channel, 对table的操作使用reentranlock锁来控制并发访问问题(目前代码里面没看到心跳,看到ClientRemotingProcessor代码,是不是服务端主动与客户端心跳?)。
- responseTable保存所有正在发送消息但还未返回结果的消息回调函数列表ResponseFuture。
- 给每条连接都添加NettyClientHandler处理器,客户端主要处理response消息,收到消息后从responseTable中取出对应消息回调函数,将返回的消息添加到ResponseFuture中,并调用绑定在Future上的回调事件,提交到callbackExecutor线程池执行。
- callbackExecutor线程池用来处理所有的回调函数。
- processorTable 注册requestcode,设置对应的处理函数ClientRemotingProcessor。
额外补充: MQClientInstance是客户端启动的实例,每个客户端都会启动一个这样的实例。MQClientInstance使用MQClientAPIImpl类来与远程进行通信,MQClientAPIImpl类封装了所有通信操作的API接口(内部其实是调用NettyRemotingClient来进行交互)。
因为Producer和Consumer都是使用MQClientInstance类,所以要把里面哪些属于Producer的,哪些属于Consumer的要梳理出来,逻辑才不会乱。
- 现在先来看下NameSrv,看看它是不是一个服务端,以及如何与客户端交互做了哪些事情。
- 一打开代码,RemotingServer就映入眼帘了,所以NameSrv就是一个服务端,使用了NettyRemoteServer。
- BrokerHousekeepingService 看名字就知道是用来与Broker保存心跳的服务,这个就是NettyRemotingServer里面的channelEventListener!(channelEventListener还不知道是做什么的)
- 创建了些线程池: 1)Executors.newFixedThreadPool(serverWorkerThreads 8 ) 创建了一个固定的线程池8,用来处理客户端发来的请求的。 2)scheduleAtFixedRate,routeInfoManager.scanNotActiveBroker 扫描不存活的Broker,看来是用来检测Broker是否存活的 3)scheduleAtFixedRate,kvConfigManager.printAllPeriodically 应该是打印一些统计信息的
- registerProcessor 注册可以处理的消息类型,将接收到的消息提交给前面创建的那个fixedThreadPool
现在看来NameSrv代码好简单啊,就是启动一个NettyServer监听对应消息类型的请求,然后处理,现在看看是怎么处理的,是只是转发还是真的有处理。内部应该还是有维护一些节点信息才对。都是基于内存的集合操作。
RouteInfoManager就是NameSrv维护的所有信息: 1)topicQueueTable topic和broker的对应关系,哪些broker可以接收topic类型的消息 2)brokerAddrTable broker对应的addr信息 3)brokerLiveTable 存活的broker节点 4)filterServerTable 服务端过滤,不知道是不是在namesrc就进行过滤了???
NameSrc使用了读写锁ReentrantReadWriteLock来更新这些信息?好像不是,稍等看看。
这些信息是如何维护的呢?就是在Processor里面回调处理的,接收到对应客户的发来的消息后,来更新这些信息。
registerBroker: 比如broker节点信息,目前有哪些broker可用?Broker启动后会发送registerBroker消息来向NameSrv注册broker节点。register的时候使用了writeLock写锁,不允许其他broker同时写入。clusterName是哪里来的?