Netty

2019/03/10

什么是Netty

Netty是一个异步、事情驱动的网络编程框架,基于JDK NIO实现。对NIO进行了分装和优化,简化的网络编程的复杂度,提高了稳定和更高的性能。

异步

Netty的异步是通过回调Future实现的。

回调:Netty内部使用回调来处理事件,当事件发生时,对应事件Handler的方法实现会被调用。比如ConnectHandler的channelActive方法,当一个新连接被建立时,该方法将会被调用。

Future:首先说明下Future的概念,future表示一个操作完成时通知应用程序的方式。

JDK原生Future

jdk提供的future实现其实是需要应用程序手动去检查或者一直阻塞直到它完成。

Netty提供的Future-ChannelFuture

Netty提供了自己的Future实现ChannelFuture,是基于事件监听来实现的。对JDK的Future进行扩展,增加了观察者模式。

比如客户端连接服务端事件,channel.connect方法返回的是一个channelFuture,表示连接是异步的。

我们通过给这个channelFuture添加事件监听器来监听连接是否成功。当连接成功时该监听器将会被调用。

事件驱动

使用事件来通知状态和操作的改变。

Netty中所有操作的发生都是采用事件的形式来通知的,当一个事件发生时,对应ChannelHandler的方法将会被调用。

IO模型

Netty 基于JDK NIO实现,JDK 1.4 NIO提供了非阻塞IO,基于多路复用的I/O实现。

I/O多路复用通过将多个I/O的阻塞复用到一个select的阻塞上,从而使用系统可以在单线程的情况下同时处理多个客户端请求。I/O多路复用最大的优势在于,系统开销小。

I/O多路复用是通过系统调用实现的,目前支持的系统调用有:select、poll和epoll。

select模式

将需要监听的套接字和事件放到一个集合fd_set上,select方法会一直监听这些套接字,遍历整个集合检查是否有事件发生,如果有事件发生就立即返回对应事件。

select模式的问题:

  1. 每次调用select函数,都需要把fd_set集合从用户态拷贝到内核态;
  2. select函数需要在内核态中遍历整个fd_set集合,线性扫描;
  3. 为了减少数据拷贝带来的性能损坏,内核对被监控的fd_set集合大小做了限制,默认为1024(可以通过修改系统参数进行限制)。

poll模式

对select的优化,集合改用polldf链表结构存储,所以没有了最大文件描述符的限制,但是也要线性扫描整个链表。

epoll模式

目前三者性能最优的一种模式,集合使用红黑树存储,没有最大文件描述符的限制。基于事件驱动实现,不需要扫描整个集合。当有事件发生时通过事件回调,将事件加入就绪队列,直接返回整个队列,队列使用双向链表结构存储。队列的返回即消息事件的通知,为了避免集合的来回拷贝,epoll使用了mmap技术,在用户空间和内核空间做了内存映射。

epoll的改进:

  1. 没有最大文件描述符的限制(仅受限于操作系统的最大文件句柄数);
  2. I/O效率不会随着文件描述符的增加而线性下降;
  3. 使用mmap加速内核与用户空间的消息传递。

线程模型

Netty线程模型,基于Reactor模型,同时支持Reactor单线程模型、多线程模型和主从多线程模型。Netty服务启动创建两个Reactor线程池,一个用于接收客户端的TCP连接,一个用于处理I/O请求。

核心组件

EventLoop

Netty的EventLoop其实就是对JDK Selector的封装,Channel(socket)添加到EventLoop上,注册读/写事件,调用select方法开始轮询监听。EventLoop是一个I/O线程,除了负责I/O的读写外,还负责系统任务和定时任务的处理。

EventLoop的主要工作:

  1. 轮询注册到selector上的所有channel的IO事件;
  2. 处理产生IO事件的channel;
  3. 处理任务队列。

这里先抛几个问题:

  1. 任务是什么时候加进来的?
  2. eventLoop是的是什么线程池?

EventLoopGroup

Netty的线程池实现,一个线程池维护多个EventLoop线程。Netty服务端启动创建两个EventLoopGroup,bossGroup负责Acceptor事件、wrokerGroup负责I/O事件。

EventLoopGroup内部使用ThreadPerTaskExecutor线程池,用于创建指定数量的EventLoop。

EventLoopGroup 负责为每个新创建的 Channel 分配一个 EventLoop。在当前实现中,使用顺序循环(round-robin)的方式进行分配以获取一个均衡的分布,并且相同的 EventLoop 可能会被分配给多个 Channel。

这里要注意,因为会有多个Channel共有一个EventLoop,那就会存在线程局部变量共享的问题。比如ThreadLocal在这种场景下就不好使用了,因为这些channel是共用一个Thread的。

Channel

bossGroup接收新的连接,将连接封装成channel交给workerGroup并注册读事件。

主要的几个I/O操作

  1. Channel.read(),从channel读取数据放入缓冲区(一个I/O操作),然后触发ChannelHandler.channelRead事件(一个I/O事件)。
  2. ChannelFuture.write(Object msg),将数据写入缓冲区。注意,这里的缓冲区是指应用程序缓冲区,数据还在用户空间。flush方法才会将数据写入内核空间。
  3. Channel.flush() 将应用程序缓冲区的数据写入内核缓冲区,即tcp发送缓冲区。

ChannelPipeline

Netty是基于事件驱动的,Channel进行I/O操作时会产生对应的I/O事件,事件会在ChannelPipeline中传播,然后对应的ChannelHandler对事件进行拦截和处理。

Netty中的事件分为I/O操作(网络I/O操作,比如读/写)和I/O事件(操作触发的各种事件)。I/O操作由DefaultChannelPipeline处理,I/O事件由ChannelPipeline处理。

ChannelPipeline基于责任链设计模式实现,结构是一个双向链表结构,链表上的每个节点实际是ChannelHandler处理器。

根据事件的流向,可以将Netty事件分为inbound事件和outbound事件。inbound事件由I/O线程触发,比如TCP连接、读等事件。outbound事件由用户线程触发,比如写事件等。

每个Channel都绑定一条ChannelPipeline,ChannelPipeline是线程安全的。因为一个Channel只会交给一个EventLoop线程处理。但是ChannelPipeline上的ChannelHandler不是线程安全的,因为ChannelHandler可以绑定到多个Pipeline上,会存在多线程并发调用情况。

Unsafe操作

Channel中的很多I/O操作都是交由Unsafe接口完成的。

ByteBuf

Netty中的ByteBuf是对JDK ByteBuffer的封装,提供堆内、堆外内存,内存池优化技术。前面说的数据缓冲区指的就是ByteBuf,Channel.write方法先将数据写入ByteBuf,然后Channel.flush方法再将数据从ByteBuf写入内核发送缓冲区。

ByteBuf提供堆内和堆外两种缓冲实现:

  • 堆内(HeapByteBuf)字节缓冲区:缓冲对象在JVM堆内存分配,速度快。缺点是写操作需要先将数据从堆内拷贝到堆外。
  • 堆外(DirectByteBuf)字节缓冲区:直接在JVM堆外分配,速度慢。优点是减少一次内存拷贝。

ByteBuf的实践推荐,网络I/O的操作比如读/写使用堆外缓冲区,业务线程的数据使用堆内缓冲区。

为了提高ByteBuf的利用率,降低对ByteBuf的频繁分配和回收,提供了基于对象池的ByteBuf,可以重用ByteBuf对象,提升内存利用率。

参数调优

ChannelOptions几个重要参数说明

  1. ChannelOption.SO_BACKLOG 设置 socket函数的listen(int socketfd, int backlog)的backlog值。
  2. ChannelOptions.SO_REUSEADDR 设置 SO_REUSEADDR 的值,允许复用IP地址和端口。
  3. ChannelOptions.SO_KEEPALVE 设置 SO_KEEPALVE 的值,用于tcp连接的空闲检测。
  4. ChannelOptions.SO_SNDBUF/SO_RVCBUF 设置SO_SNDBUF/SO_RVCBUF,接收和发送缓冲区的大小。
  5. ChannelOptions.SO_LINGER 设置 SO_LINGER 的值,调用close方法的处理逻辑。linux默认会继续把缓冲区的数据发完。
  6. ChannelOptions.TCP_NODELAY 设置 TCP_NODELAY 的值,关闭Nagle算法,立即发送数据。

高性能体现

  1. 每个Channel对应一个EventLoop线程,Channel上的所有操作都是串行话,避免了多线程的并发操作,实现了无锁串行执行。
  2. 基于Reactor主/从线程模型。
  3. 内存优化,ByteBuf的堆外内存,ByteBuf的对象池技术。

源码分析

启动流程

首先来看下引导类:

ServerBootstrap引导类

  1. group(bossGroup, workerGroup) 配置两大线程组

    bossGroup 不断接受accept新连接,然后将新连接交给 wrokerGroup 处理

  2. channel(NioServerSocketChannel.class) 配置NIO模型
    • NioServerSocketChannel -> ServerSocket
    • NioSocketChannel -> Socket
  3. handler() 用于服务端启动过程中的一些逻辑处理,比如初始化处理
  4. childHandler() 用于每一个新连接的逻辑处理,比如读写处理
    • NioSocketChannel
  5. bind() 绑定端口,异步绑定
  6. option() 设置服务端连接的一些网络相关配置,比如backlog
    • ChannelOption.SO_BACKLOG 1024 已完成三次握手的请求队列最大长度。
  7. childOption() 设置每条连接的TCP相关属性
    • ChannelOption.SO_KEEPALIVE 是否开启TCP底层心跳(这里好像跟socket里的keepalive不一样)
    • ChannelOption.TCP_NODELAY 是否开启Nagle算法,表示数据是否立马发送

Channel的创建

Channel创建:

bootstrap.bind(port)方法–>1.initAndRegister new Channel、init Channel和register Channel;

new Channel: 通过反射创建ServerBootstrap传入的NioServerSocketChannel对象。

JDK NIO的流程也是先创建Channel,设置非阻塞。然后打开Selector,将channel注册到selector上,并监听accept事件

创建ChannelPipeline headContext<->tailContext

init Channel: 将配置里的handler加入pipeline中(这个handler是哪个handler?)–SimpleServerHandler

把ServerBootstrapAcceptor连接器也加入pipeline中,连接器里面包含了workerGroup、childHandler

register Channel: 将evetloop绑定到Channel;

调用JDK的socket.bind方法绑定端口和地址

如何接收新连接

bossGroup负责accept事件,然后将连接封装成channel交给workerGroup。

bossGroup的工作内容有两个(上面有说过,这里再简单的回顾一下,workerGroup一样):

  1. 轮询selector上的I/O事件,以及处理接收到I/O事件;
  2. 处理提交过来的异步任务;

新连接的处理主要逻辑:

  1. accept到新连接事件;
  2. 获取连接,将连接封装成channel注册到workerGroup上;
  3. 并为新连接在workerGroup上注册read事件,workerGroup开始监听read事件。

上一步已分析,服务端启动会创建一条Server Channel(底层就是ServerSocket),然后将这条channel添加到bossGroup上并注册accpet事件,监听新的连接。

读取到新的连接,这里会触发在bootstrap引导类设置的handler()处理器,会依次执行对应的handler,告诉你有新连接来了。

那是如何获取这个新的连接呢?

Netty是基于JDK NIO实现的,所以也是通过NIO返回的。JDK NIO 返回一个Channel,然后Netty将JDK的这个Channel封装成自己框架定义的NioSocketChannel(假设是基于NIO模式)。NioSocketChannel在其构造函数内会自动注册read读事件(当前这里只是对参数进行了设置,真正注册的地方在后面),所以Netty一旦accept接收到新的连接请求,获取连接后接着会自动注册read读事件。

通过前面引导类我们可以看到,bootstrap有一个childHandler参数,Netty会为每一个新的连接绑定handler。那这个handler什么时候绑定的?也就是在NioSocketChannel创建的时候。Netty的handler的执行时通过Pipeline管道机制实现的,所以每个NioSocketChannel创建的时候都会绑定一个 Pipeline newChannelPipeline()

小结:

  1. boss线程池阻塞select,,有新的连接进入时SelectionKey.OP_ACCEPT事件,读取连接事件消息,调用JDK的accept方法获取新的连接SocketChannel,然后封装成NioSocketChannel。
  2. 封装的同时会注册Read事件SelectionKey.OP_READ到selector上,并设置为非阻塞模式ch.configureBlocking(false);
  3. 为该Channel创建了一个Pipeline。

    head->ChannelInitializer->tail

  4. 将Channel注册到WrokerGroup线程池,通过轮序选择一个Wroker线程NioEventLoop

    将Channel绑定到一个Selector上,一个Selector被一个Worker线程使用,后续的I/O事件轮询、事件处理、异步task执行都由该线程chuli

数据传输过程

  1. Netty编码器原理就是创建一个ByteBuf,将Java对象转换为ByteBuf,然后使用这个ByteBuf进行传递;
  2. write方法将数据写到netty缓冲区(是一个单向链表结构,可以循环使用,是DirectByteBuf类型),flush方法将netty缓冲区数据写入内核socket发送缓冲区(如果socket发送缓冲区满,flush会采用自旋方式确保数据全部写入成功);

数据拆包和粘包

先简单介绍下:

  • 拆包:是一个完整的最小数据包被拆分成多个tcp包发出,所以接收者需要将数据进行合并;
  • 粘包:是一个tcp包里面有多个数据包,接收者需要对这些数据包进行拆分。

如果二进制数据格式是基于长度来识别的,那可以使用LengthFieldBasedFrameDecoder来实现拆包和粘包功能。

Netty提供了两个简单的用于拆包和粘包的实现:LengthFieldBasedFrameDecoderLineBasedFrameDecoder

基于二进制的数据格式,一般都是将数据分为消息头和消息体。消息头包括消息类型、版本号等内容。协议约定固定前几个字节都是用来标记数据的功能作用。比如用1个字节表示这是数据包还是心跳包,用2个字节表示数据的长度等。

我们来看看Dubbo是如何实现的。

Dubbo就是自定义了一个叫dubbo的协议,根据dubbo协议进行数据的编码和解码,双方数据传输需要按照dubbo协议对数据进行处理才能传输。

先说结论:Dubbo是使用buffer缓存来实现的,如果数据不完整会先将数据放入buffer中,然后等下次数据到来再一起合并成完整数据包再解码。

可以从InternalDecoder源码入手来看Dubbo是怎么实现的。

小结一番:

  1. dubbo接收到数据后,会将数据与buffer中的数据一起合并了再进行decoder解码处理;
  2. 解码是关键,解码会判断这次的数据包是否是一个完整的dubbo数据包,至少数据长度要大于等于一个dubbo协议数据
    • 拆包情况:比如,解码时发现这次接收到的数据还没有一个消息头长,说明是一个不完整包;或者通过解析消息头,获取消息体长度,然后发现剩余的数据长度小于消息体长度,说明也是一个不完整包。那就把这次接收到的数据直接放入buffer中,等下一次数据到来再一次解码;
    • 粘包情况:这个简单,每次解码一个数据包,会循环解析,如果最后刚好到末尾就不用处理,如果还是部分数据不足一个数据包那就放入buffer中。

EventLoop

EventLoop的核心方法就是一个run,run方法的本质就是一个 for(;;) 不断轮询 selector

EventLoop的顶层接口是Executor,并继承了AbstractExecutorService,所以EventLoop就是一个Netty自定义的线程池(准确点应该是EventLoopGroup)。

但是,前面说了EventLoop除了selector监听事件,还会处理定时任务,所以这个select肯定不能永久阻塞,当有任务要执行时是需要被唤醒的。netty使用的是带有超时等待时间的select(timeoutMillis)方法。

run方法的主要逻辑:

  1. 如果定时任务时间快到了,就break跳出循环去执行定时任务;
  2. 如果有新任务加入,也break去执行新任务;
  3. 都没有任务要执行,调用select(timeoutMillis)超时阻塞等待I/O事件;
  4. 隐藏bug,记录空事件次数和时间,超过一定次数就重建一个selector。

所以netty的select什么情况下会被唤醒?

  1. 本质的有I/O事件来就被唤醒;
  2. 队列里有任务了;
  3. 第一个定时任务要被执行了;
  4. 还有一个,添加任务的时候,外部线程也会唤醒select。

处理I/O事件

EventLoop监听到I/O事件后,是如何处理的呢?

I/O事件触发后,Netty有两种处理方式(这里先省略,有一大堆代码和逻辑),反正就是优化和处理。JDK selector 默认返回的是HashSet集合,Netty将其转换成数组,方便遍历。

先简单小结:

  1. 对于bossGroup的NioEventLoop来说,轮询到的是基本上就是accept连接事件,后续的事情就通过它的Pipeline将连接扔给一个workerGroup NioEventLoop处理
  2. 对于workerGroup的NioEventLoop来说,轮询到的基本上都是read/write IO读写事件,后续的事情就是通过它的Pipeline将读取到的字节流传递给每个channelHandler来处理

处理task

netty居然还有任务要处理,那这个任务是哪来的呢?

netty有两种类型的任务(其实都是用户提交的):

  1. 用户(用户线程)自己提交的(通过当前channel获取到绑定的eventloop,然后通过eventloop.execute方法提交任务)
  2. 用户提交的定时任务(eventloop.schedule)。

那任务提交到的队列,这个队列是什么结构?

  • 定时任务存先放在优先级队列里,到时了再放入普通队列里。
  • 普通任务存放在正常的队列里

那Netty又是如何去检测定时任务可以执行了?

EventLoopGroup

Netty实现的线程池,用于创建EventLoop线程。那线程池里默认的线程数是多少?Netty会根据系统的CPU核心进行参数设置。

回顾下JDK的线程池,有几个参数,我们先说几个,看Netty是如何实现的

  1. 线程工厂ThreadFactory

Netty里的线程工厂实现是ThreadPerTaskExecutor类,用于创建新的线程。每个EventLoop关联一个Thread。

Future

Netty的Future设计非常美妙。

JDK的Future:对于一个异步操作,可以暂时返回一个Future对象,然后去做别的事情,最后通过get方法拿到结果。如果get时异步操作还没有完成,则进行阻塞状态。

Netty对JDK Future类进行了扩展,增加了观察者模式。这样当异步操作执行完时,Netty就可以主动通知(回调)监听者(这种实现方式牛逼)。

但是请注意,这里的回调是由I/O线程发起的,即Reactor线程。所以传入的回调方法里不要执行耗时的任务,如果任务比较耗时最好再提交到业务线程里去执行。

Post Directory