RocketMQ In Action - Remoting

2016/08/28

RocketMQ.Remoting

Remoting,RocketMQ的通信系统。RocketMQ使用Netty作为底层通信框架,Netty是个事件驱动的网络编程框架。RocketMQ自定义了NettyRemotingServer和NettyRemotingClient两个客户端实例以及NettyRemotingAbstract公共抽象类。

RocketMQ各个模块间的通信,通过发送统一格式的自定义消息(RemotingCommand)来完成,大部分逻辑都是通过发送、接受并处理Command来完成。

RemotingService顶层接口,定义了startshutdown方法。接口RemotingServerRemotingClient继承了RemotingServiceRemotingServer服务端远程通信接口新增了事件处理程序方法:registerProcessor注册处理程序、registerDefaultProcess注册默认处理程序和getProcessPair获取处理程序,和三种通信方法:invokeSync同步通信、invokeAsync异步通信和invokeOneWay单通道通信。RemotingClient客户端远程通信接口新增了注册时间处理程序方法registerProcessor和三种通信方法,另外客户端要与服务端进行通信,需要先通过NameServer获取服务端的地址,所以还新增了获取和更新NameServer地址的方法:getNameServerAddressListupdateNameServerAddressList

NettyRemotingServerNettyRemotingClient为远程通信接口的默认实现类,同时还继承了NettyRemotingAbstract公共抽象类。

NettyRemotingAbstract

NettyRemotingAbstract实现了公共的方法:三种远程通信方法的实现,事件处理程序回调,处理远程通信请求。

invokeImpl

package com.alibaba.rocketmq.remoting.netty;

public abstract class NettyRemotingAbstract {
	...
	//默认事件处理程序,使用线程池执行事件处理程序
	protected Pair<NettyRequestProcessor/* 事件处理程序 */, ExecutorService/* 线程池 */> defaultRequestProcessor;

	//自定义事件处理程序,根据请求码调用不同的处理程序
	protected final HashMap<Integer/* 请求码 */, Pair<NettyRequestProcessor, ExecutorService>> processorTable = new HashMap<~>(64);

	/**
     * 同步通信
     * @param channel
     * @param request
     * @param timeoutMillis
     * @return
     * @throws InterruptedException
     * @throws RemotingSendRequestException
     * @throws RemotingTimeoutException
     */
	public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, ...) {
		...
		final ResponseFuture responseFuture =
                    new ResponseFuture(request.getOpaque(), timeoutMillis, null, null);
		channel.writeAndFlush(request).addListener(new ChannelFutureListener() {

			/**
             * 通信结束回调方法
             * @param f
             * @throws Exception
             */	
			@Override
			public void operationComplete(ChannelFuture f) throws Exception {

				if(f.isSuccess()) {
					responseFuture.setSendRequestOK(true);
					return;
				} 
			}

			});

			//同步等待结果
			RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
			if(null == responseCommand) {
				...
				//抛出超时异常或者发送请求异常
			}

			return responseCommand
	}

	/**
     * 异步通信
     * @param channel
     * @param request
     * @param timeoutMillis
     * @param invokeCallback
     * @throws InterruptedException
     * @throws RemotingTooMuchRequestException
     * @throws RemotingTimeoutException
     * @throws RemotingSendRequestException
     */
	public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, ...) {
		...
		final ResponseFuture responseFuture =
                    new ResponseFuture(request.getOpaque(), timeoutMillis, invokeCallback, once);
        channel.writeAndFlush(request).addListener(new ChannelFutureListener() {

        	@Override
        	public void operationComplete(ChannelFuture f) throws Exception {
        		if(f.isSuccess()) {
        			responseFuture.setSendRequestOK(true);
        			return;
        		}
        		...
        	}
        	});

        // 异步通信不阻塞等待,但是会做通信 超时异常处理
	}

	/**
     * 单通道通信
     * @param channel
     * @param request
     * @param timeoutMillis
     * @throws InterruptedException
     * @throws RemotingTooMuchRequestException
     * @throws RemotingTimeoutException
     * @throws RemotingSendRequestException
     */
     public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, ...) {
     	...
     	//直接将发送请求
     	channel.writeAndFlush(request).addListener(new ChannelFutureListener() {

     		@Override
     		public void operationComplete(ChannelFuture f) throws Exception {
     			...
     			//不返回结果
     		}
     		});

     	...
     }

    /**
     * 处理消息接收事件
     * @param ctx
     * @param msg
     * @throws Exception
     */
     public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
     	final RemotingCommand cmd = msg;
        if (cmd != null) {
            switch (cmd.getType()) {
            case REQUEST_COMMAND:
            	//处理请求类型的消息
                processRequestCommand(ctx, cmd);
                break;
            case RESPONSE_COMMAND:
            	//处理响应类型的消息
                processResponseCommand(ctx, cmd);
                break;
            default:
                break;
            }
        }
     }

	/**
     * 处理请求类型的消息事件
     * @param ctx
     * @param cmd
     */
     public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
     	//判断是否有自定义事件处理程序,如果有则使用自定义事件处理程序,如果没有则使用默认事件处理程序
     	final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
        final Pair<NettyRequestProcessor, ExecutorService> pair =
                null == matched ? this.defaultRequestProcessor : matched;

        if(pair != null) {
        	//将请求封装成一个任务,提交到线程池中执行
        	Runnable run = new Runnable() {
        		@Override
        		public void run() {
        			...
        			//调用事件处理程序的processRequest方法处理事件
        			pair.getObject1().processRequest(ctx, cmd);
        			...
        		}
        	}

        	//将任务提交到线程池中执行
        	pair.getObject2().submit(run);
        	...
    	} else {
    		...
    		//不支持该请求类型的消息
    	}
     }

    /**
     * 处理响应类型的事件
     * @param ctx
     * @param cmd
     */
    public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
    	...
    	//根据请求的Opaque,处理之前请求的回调事件
    }
}

NettyRemotingServer

NettyRemotingServer实现了RemotingServer接口同时继承了NettyRemotingAbstract公共抽象类。

package com.alibaba.rocketmq.remoting.netty;

public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {

	//原理就是一个Netty的服务端实现
	private final ServerBootstrap serverBootstrap;
	private final EventLoopGroup eventLoopGroupSelector;
	private final EventLoopGroup eventLoopGroupBoss;
	//服务端配置
	private final NettyServerConfig nettyServerConfig;
	//管道监听器
	private final ChannelEventListener channelEventListener;

	public NettyRemotingServer(final NettyServerConfig, final ChannelEventListener channelEventListener) {
		this.serverBootstrap = new ServerBootstrap();
		....
	}

	@Override
	public void start() {
		this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
			.channel(NioServerSocketChannel.class)
			.option(ChannelOption.SO_KEEPALIVE, false)
			.childOption(ChannelOption.TCP_NODELAY, true)
			.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
			.childHandler(new ChannelInitializer<SocketChannel>() {
				@Override
				public void initChannel(SocketChannel ch) throws Exception {

					ch.pipeline().addLast(
						new NettyEncoder(),
						new NettyDecoder(),
						new IdleStateHandler(0, 0,nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
						new NettyConnetManageHandler(),
						new NettyServerHandler()
						);
				}
				});

		...
		ChannelFuture sync = this.serverBootstrap.bind().sync();
		...
	}

	/**
     * 注册默认事件处理程序
     * @param processor
     * @param executor
     */
    @Override
    public void registerDefaultProcessor(NettyRequestProcessor processor, ExecutorService executor) {
        this.defaultRequestProcessor = new Pair<NettyRequestProcessor, ExecutorService>(processor, executor);
    }

    /**
     * 同步通信
     * @param channel
     * @param request
     * @param timeoutMillis
     * @return
     * @throws InterruptedException
     * @throws RemotingSendRequestException
     * @throws RemotingTimeoutException
     */
    @Override
    public RemotingCommand invokeSync(final Channel channel, final RemotingCommand request, final long timeoutMillis)
            throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
        return this.invokeSyncImpl(channel, request, timeoutMillis);
    }

    /**
     * 异步通信
     * @param channel
     * @param request
     * @param timeoutMillis
     * @param invokeCallback
     * @throws InterruptedException
     * @throws RemotingTooMuchRequestException
     * @throws RemotingTimeoutException
     * @throws RemotingSendRequestException
     */
    @Override
    public void invokeAsync(Channel channel, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback)
            throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
        this.invokeAsyncImpl(channel, request, timeoutMillis, invokeCallback);
    }

    /**
     * 单通道通信
     * @param channel
     * @param request
     * @param timeoutMillis
     * @throws InterruptedException
     * @throws RemotingTooMuchRequestException
     * @throws RemotingTimeoutException
     * @throws RemotingSendRequestException
     */
    @Override
    public void invokeOneway(Channel channel, RemotingCommand request, long timeoutMillis) throws InterruptedException,
            RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
        this.invokeOnewayImpl(channel, request, timeoutMillis);
    }

    /**
     * 服务端关闭
     */
    @Override
    public void shutdown() {

    	this.eventLoopGroupBoss.shutdownGracefully();
        this.eventLoopGroupSelector.shutdownGracefully();
    }
}

NettyRemotingClient

NettyRemotingServer实现了RemotingServer接口同时继承了NettyRemotingAbstract公共抽象类。

package com.alibaba.rocketmq.remoting.netty;

public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
	
	//客户端配置
	private final NettyClientConfig nettyClientConfig;
	//原理就是一个Netty的客户端实现
	private final Bootstrap bootstrap = new Bootstrap();
	private final EventLoopGroup eventLoopGroupWroker;
	//管道监听器
	private final ChannelEventListener channelEventListener;

	@Override
	public void start() {

		Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
			.option(ChannelOption.TCP_NODELAY, true)
			.option(ChannelOption.SO_KEEPALIVE, false)
			.handler(new ChannelInitializer<SocketChannel>() {
				@Override
				public void initChannel(SocketChannel ch) throws Exception {
					ch.pipeline().addLast(
						new NettyEncoder(),
						new NettyDecoder(),
						new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
						new NettyConnetManageHandler(),
						new NettyClientHandler());
				}
			});
			
	}

	@Override
	public void shutdown() {
		...
		this.eventLoopGroupWorker.shutdownGracefully();
		...
	}

	...
	//三种通信方法

	//注册事件处理程序

	//获取、更新NameServer地址列表
}

RemotingCommand

自定义通信协议格式,RemotingCommand。使用NettyEncoder对输出流进行编码,NettyDecoder对输入流进行解码。

package com.alibaba.rocketmq.remoting.protocol;

public class RemotingCommand {
	...

	public ByteBuffer encodeHeader() {
		return encodeHeader(this.body != null ? this.body.length : 0);
	}

	/**
     * 编码头部信息
     * @param bodyLength
     * @return
     */
	public ByteBuffer encodeHeader(final int bodyLength) {
		// 1. header length size 头部长度4个字节
		int length = 4;
		//2. header data length 头部数据字节数
		byte[] headerData;
		headerData = this.headerEncode();

		lenght += headerData.length;

		//3. body data length body数据字节数
		length += bodyLength;

		ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength);

		//length 添加长度
		result.putInt(length);

		// header length 添加头部长度
		result.put(markProtocalType(headerData.length, serializeTypeCurrentRPC));

		// header data 添加头部数据
		result.put(headerData);

		result.flip();

		return result;
	}

	/**
     * 解码
     * @param byteBuffer
     * @return
     */
	public static RemotingCommand decode(final ByteBuffer byteBuffer) {
        int length = byteBuffer.limit();
        int oriHeaderLen = byteBuffer.getInt();
        int headerLength = getHeaderLength(oriHeaderLen);

        byte[] headerData = new byte[headerLength];
        byteBuffer.get(headerData);

        RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));

        int bodyLength = length - 4 - headerLength;
        byte[] bodyData = null;
        if (bodyLength > 0) {
            bodyData = new byte[bodyLength];
            byteBuffer.get(bodyData);
        }
        cmd.body = bodyData;

        return cmd;
    }

	public byte[] getBody() {
		return body;
	}
}

NettyEncoder

package com.alibaba.rocketmq.remoting.netty;

public class NettyEncoder extends MessageToByteEncoder<RemotingCommand> {
    private static final Logger log = LoggerFactory.getLogger(RemotingHelper.RemotingLogName);

    @Override
    public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out)
            throws Exception {
        try {
            ByteBuffer header = remotingCommand.encodeHeader();
            out.writeBytes(header);
            byte[] body = remotingCommand.getBody();
            if (body != null) {
                out.writeBytes(body);
            }
        } catch (Exception e) {
            log.error("encode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
            if (remotingCommand != null) {
                log.error(remotingCommand.toString());
            }
            RemotingUtil.closeChannel(ctx.channel());
        }
    }
}

NettyDecoder

package com.alibaba.rocketmq.remoting.netty;

public class NettyDecoder extends LengthFieldBasedFrameDecoder {
    private static final Logger log = LoggerFactory.getLogger(RemotingHelper.RemotingLogName);
    private static final int FRAME_MAX_LENGTH = //
            Integer.parseInt(System.getProperty("com.rocketmq.remoting.frameMaxLength", "8388608"));


    public NettyDecoder() {
        super(FRAME_MAX_LENGTH, 0, 4, 0, 4);
    }


    @Override
    public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        ByteBuf frame = null;
        try {
            frame = (ByteBuf) super.decode(ctx, in);
            if (null == frame) {
                return null;
            }

            ByteBuffer byteBuffer = frame.nioBuffer();

            return RemotingCommand.decode(byteBuffer);
        } catch (Exception e) {
            log.error("decode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
            RemotingUtil.closeChannel(ctx.channel());
        } finally {
            if (null != frame) {
                frame.release();
            }
        }

        return null;
    }
}

MQClientInstance

Producer和Consumer都需要与Broker通信,底层功能都被抽象成同一个类,MQClientInstance。

MQClientInstance首先从NameServer获取各种配置信息并通过ScheduledTask来维护这些信息,同时通过MQClientAPIImpl实现消息的收发,MQClientAPIImpl负责底层消息通信。MQClientInstance可以被多个Consumer和Producer共用,通过工厂模式获取实例对象,也可以创建多个实例。

MQClientInstance会定时进行如下操作:

  1. 获取NameServer地址;
  2. 更新TopicRoute信息;
  3. 清理离线的Broker;
  4. 保存消费者的Offset。

Post Directory