RocketMQ.Remoting
Remoting,RocketMQ的通信系统。RocketMQ使用Netty作为底层通信框架,Netty是个事件驱动的网络编程框架。RocketMQ自定义了NettyRemotingServer和NettyRemotingClient两个客户端实例以及NettyRemotingAbstract公共抽象类。
RocketMQ各个模块间的通信,通过发送统一格式的自定义消息(RemotingCommand)来完成,大部分逻辑都是通过发送、接受并处理Command来完成。
RemotingService
顶层接口,定义了start
和shutdown
方法。接口RemotingServer
和RemotingClient
继承了RemotingService
,RemotingServer
服务端远程通信接口新增了事件处理程序方法:registerProcessor
注册处理程序、registerDefaultProcess
注册默认处理程序和getProcessPair
获取处理程序,和三种通信方法:invokeSync
同步通信、invokeAsync
异步通信和invokeOneWay
单通道通信。RemotingClient
客户端远程通信接口新增了注册时间处理程序方法registerProcessor
和三种通信方法,另外客户端要与服务端进行通信,需要先通过NameServer获取服务端的地址,所以还新增了获取和更新NameServer地址的方法:getNameServerAddressList
和updateNameServerAddressList
。
NettyRemotingServer
和NettyRemotingClient
为远程通信接口的默认实现类,同时还继承了NettyRemotingAbstract
公共抽象类。
NettyRemotingAbstract
NettyRemotingAbstract
实现了公共的方法:三种远程通信方法的实现,事件处理程序回调,处理远程通信请求。
invokeImpl
package com.alibaba.rocketmq.remoting.netty;
public abstract class NettyRemotingAbstract {
...
//默认事件处理程序,使用线程池执行事件处理程序
protected Pair<NettyRequestProcessor/* 事件处理程序 */, ExecutorService/* 线程池 */> defaultRequestProcessor;
//自定义事件处理程序,根据请求码调用不同的处理程序
protected final HashMap<Integer/* 请求码 */, Pair<NettyRequestProcessor, ExecutorService>> processorTable = new HashMap<~>(64);
/**
* 同步通信
* @param channel
* @param request
* @param timeoutMillis
* @return
* @throws InterruptedException
* @throws RemotingSendRequestException
* @throws RemotingTimeoutException
*/
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, ...) {
...
final ResponseFuture responseFuture =
new ResponseFuture(request.getOpaque(), timeoutMillis, null, null);
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
/**
* 通信结束回调方法
* @param f
* @throws Exception
*/
@Override
public void operationComplete(ChannelFuture f) throws Exception {
if(f.isSuccess()) {
responseFuture.setSendRequestOK(true);
return;
}
}
});
//同步等待结果
RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
if(null == responseCommand) {
...
//抛出超时异常或者发送请求异常
}
return responseCommand
}
/**
* 异步通信
* @param channel
* @param request
* @param timeoutMillis
* @param invokeCallback
* @throws InterruptedException
* @throws RemotingTooMuchRequestException
* @throws RemotingTimeoutException
* @throws RemotingSendRequestException
*/
public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, ...) {
...
final ResponseFuture responseFuture =
new ResponseFuture(request.getOpaque(), timeoutMillis, invokeCallback, once);
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
if(f.isSuccess()) {
responseFuture.setSendRequestOK(true);
return;
}
...
}
});
// 异步通信不阻塞等待,但是会做通信 超时异常处理
}
/**
* 单通道通信
* @param channel
* @param request
* @param timeoutMillis
* @throws InterruptedException
* @throws RemotingTooMuchRequestException
* @throws RemotingTimeoutException
* @throws RemotingSendRequestException
*/
public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, ...) {
...
//直接将发送请求
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
...
//不返回结果
}
});
...
}
/**
* 处理消息接收事件
* @param ctx
* @param msg
* @throws Exception
*/
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
final RemotingCommand cmd = msg;
if (cmd != null) {
switch (cmd.getType()) {
case REQUEST_COMMAND:
//处理请求类型的消息
processRequestCommand(ctx, cmd);
break;
case RESPONSE_COMMAND:
//处理响应类型的消息
processResponseCommand(ctx, cmd);
break;
default:
break;
}
}
}
/**
* 处理请求类型的消息事件
* @param ctx
* @param cmd
*/
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
//判断是否有自定义事件处理程序,如果有则使用自定义事件处理程序,如果没有则使用默认事件处理程序
final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
final Pair<NettyRequestProcessor, ExecutorService> pair =
null == matched ? this.defaultRequestProcessor : matched;
if(pair != null) {
//将请求封装成一个任务,提交到线程池中执行
Runnable run = new Runnable() {
@Override
public void run() {
...
//调用事件处理程序的processRequest方法处理事件
pair.getObject1().processRequest(ctx, cmd);
...
}
}
//将任务提交到线程池中执行
pair.getObject2().submit(run);
...
} else {
...
//不支持该请求类型的消息
}
}
/**
* 处理响应类型的事件
* @param ctx
* @param cmd
*/
public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
...
//根据请求的Opaque,处理之前请求的回调事件
}
}
NettyRemotingServer
NettyRemotingServer
实现了RemotingServer
接口同时继承了NettyRemotingAbstract
公共抽象类。
package com.alibaba.rocketmq.remoting.netty;
public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
//原理就是一个Netty的服务端实现
private final ServerBootstrap serverBootstrap;
private final EventLoopGroup eventLoopGroupSelector;
private final EventLoopGroup eventLoopGroupBoss;
//服务端配置
private final NettyServerConfig nettyServerConfig;
//管道监听器
private final ChannelEventListener channelEventListener;
public NettyRemotingServer(final NettyServerConfig, final ChannelEventListener channelEventListener) {
this.serverBootstrap = new ServerBootstrap();
....
}
@Override
public void start() {
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, false)
.childOption(ChannelOption.TCP_NODELAY, true)
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
new NettyEncoder(),
new NettyDecoder(),
new IdleStateHandler(0, 0,nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
new NettyConnetManageHandler(),
new NettyServerHandler()
);
}
});
...
ChannelFuture sync = this.serverBootstrap.bind().sync();
...
}
/**
* 注册默认事件处理程序
* @param processor
* @param executor
*/
@Override
public void registerDefaultProcessor(NettyRequestProcessor processor, ExecutorService executor) {
this.defaultRequestProcessor = new Pair<NettyRequestProcessor, ExecutorService>(processor, executor);
}
/**
* 同步通信
* @param channel
* @param request
* @param timeoutMillis
* @return
* @throws InterruptedException
* @throws RemotingSendRequestException
* @throws RemotingTimeoutException
*/
@Override
public RemotingCommand invokeSync(final Channel channel, final RemotingCommand request, final long timeoutMillis)
throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
return this.invokeSyncImpl(channel, request, timeoutMillis);
}
/**
* 异步通信
* @param channel
* @param request
* @param timeoutMillis
* @param invokeCallback
* @throws InterruptedException
* @throws RemotingTooMuchRequestException
* @throws RemotingTimeoutException
* @throws RemotingSendRequestException
*/
@Override
public void invokeAsync(Channel channel, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback)
throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
this.invokeAsyncImpl(channel, request, timeoutMillis, invokeCallback);
}
/**
* 单通道通信
* @param channel
* @param request
* @param timeoutMillis
* @throws InterruptedException
* @throws RemotingTooMuchRequestException
* @throws RemotingTimeoutException
* @throws RemotingSendRequestException
*/
@Override
public void invokeOneway(Channel channel, RemotingCommand request, long timeoutMillis) throws InterruptedException,
RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
this.invokeOnewayImpl(channel, request, timeoutMillis);
}
/**
* 服务端关闭
*/
@Override
public void shutdown() {
this.eventLoopGroupBoss.shutdownGracefully();
this.eventLoopGroupSelector.shutdownGracefully();
}
}
NettyRemotingClient
NettyRemotingServer
实现了RemotingServer
接口同时继承了NettyRemotingAbstract
公共抽象类。
package com.alibaba.rocketmq.remoting.netty;
public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
//客户端配置
private final NettyClientConfig nettyClientConfig;
//原理就是一个Netty的客户端实现
private final Bootstrap bootstrap = new Bootstrap();
private final EventLoopGroup eventLoopGroupWroker;
//管道监听器
private final ChannelEventListener channelEventListener;
@Override
public void start() {
Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
new NettyEncoder(),
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
new NettyConnetManageHandler(),
new NettyClientHandler());
}
});
}
@Override
public void shutdown() {
...
this.eventLoopGroupWorker.shutdownGracefully();
...
}
...
//三种通信方法
//注册事件处理程序
//获取、更新NameServer地址列表
}
RemotingCommand
自定义通信协议格式,RemotingCommand
。使用NettyEncoder对输出流进行编码,NettyDecoder对输入流进行解码。
package com.alibaba.rocketmq.remoting.protocol;
public class RemotingCommand {
...
public ByteBuffer encodeHeader() {
return encodeHeader(this.body != null ? this.body.length : 0);
}
/**
* 编码头部信息
* @param bodyLength
* @return
*/
public ByteBuffer encodeHeader(final int bodyLength) {
// 1. header length size 头部长度4个字节
int length = 4;
//2. header data length 头部数据字节数
byte[] headerData;
headerData = this.headerEncode();
lenght += headerData.length;
//3. body data length body数据字节数
length += bodyLength;
ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength);
//length 添加长度
result.putInt(length);
// header length 添加头部长度
result.put(markProtocalType(headerData.length, serializeTypeCurrentRPC));
// header data 添加头部数据
result.put(headerData);
result.flip();
return result;
}
/**
* 解码
* @param byteBuffer
* @return
*/
public static RemotingCommand decode(final ByteBuffer byteBuffer) {
int length = byteBuffer.limit();
int oriHeaderLen = byteBuffer.getInt();
int headerLength = getHeaderLength(oriHeaderLen);
byte[] headerData = new byte[headerLength];
byteBuffer.get(headerData);
RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));
int bodyLength = length - 4 - headerLength;
byte[] bodyData = null;
if (bodyLength > 0) {
bodyData = new byte[bodyLength];
byteBuffer.get(bodyData);
}
cmd.body = bodyData;
return cmd;
}
public byte[] getBody() {
return body;
}
}
NettyEncoder
package com.alibaba.rocketmq.remoting.netty;
public class NettyEncoder extends MessageToByteEncoder<RemotingCommand> {
private static final Logger log = LoggerFactory.getLogger(RemotingHelper.RemotingLogName);
@Override
public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out)
throws Exception {
try {
ByteBuffer header = remotingCommand.encodeHeader();
out.writeBytes(header);
byte[] body = remotingCommand.getBody();
if (body != null) {
out.writeBytes(body);
}
} catch (Exception e) {
log.error("encode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
if (remotingCommand != null) {
log.error(remotingCommand.toString());
}
RemotingUtil.closeChannel(ctx.channel());
}
}
}
NettyDecoder
package com.alibaba.rocketmq.remoting.netty;
public class NettyDecoder extends LengthFieldBasedFrameDecoder {
private static final Logger log = LoggerFactory.getLogger(RemotingHelper.RemotingLogName);
private static final int FRAME_MAX_LENGTH = //
Integer.parseInt(System.getProperty("com.rocketmq.remoting.frameMaxLength", "8388608"));
public NettyDecoder() {
super(FRAME_MAX_LENGTH, 0, 4, 0, 4);
}
@Override
public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
ByteBuf frame = null;
try {
frame = (ByteBuf) super.decode(ctx, in);
if (null == frame) {
return null;
}
ByteBuffer byteBuffer = frame.nioBuffer();
return RemotingCommand.decode(byteBuffer);
} catch (Exception e) {
log.error("decode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
RemotingUtil.closeChannel(ctx.channel());
} finally {
if (null != frame) {
frame.release();
}
}
return null;
}
}
MQClientInstance
Producer和Consumer都需要与Broker通信,底层功能都被抽象成同一个类,MQClientInstance。
MQClientInstance首先从NameServer获取各种配置信息并通过ScheduledTask来维护这些信息,同时通过MQClientAPIImpl实现消息的收发,MQClientAPIImpl负责底层消息通信。MQClientInstance可以被多个Consumer和Producer共用,通过工厂模式获取实例对象,也可以创建多个实例。
MQClientInstance会定时进行如下操作:
- 获取NameServer地址;
- 更新TopicRoute信息;
- 清理离线的Broker;
- 保存消费者的Offset。