什么是Netty
Netty是一个异步、事情驱动的网络编程框架,基于JDK NIO实现。对NIO进行了分装和优化,简化的网络编程的复杂度,提高了稳定和更高的性能。
异步
Netty的异步是通过回调和Future实现的。
回调:Netty内部使用回调来处理事件,当事件发生时,对应事件Handler的方法实现会被调用。比如ConnectHandler的channelActive方法,当一个新连接被建立时,该方法将会被调用。
Future:首先说明下Future的概念,future表示一个操作完成时通知应用程序的方式。
JDK原生Future
jdk提供的future实现其实是需要应用程序手动去检查或者一直阻塞直到它完成。
Netty提供的Future-ChannelFuture
Netty提供了自己的Future实现ChannelFuture,是基于事件监听来实现的。对JDK的Future进行扩展,增加了观察者模式。
比如客户端连接服务端事件,channel.connect方法返回的是一个channelFuture,表示连接是异步的。
我们通过给这个channelFuture添加事件监听器来监听连接是否成功。当连接成功时该监听器将会被调用。
事件驱动
使用事件来通知状态和操作的改变。
Netty中所有操作的发生都是采用事件的形式来通知的,当一个事件发生时,对应ChannelHandler的方法将会被调用。
IO模型
Netty 基于JDK NIO实现,JDK 1.4 NIO提供了非阻塞IO,基于多路复用的I/O实现。
I/O多路复用通过将多个I/O的阻塞复用到一个select的阻塞上,从而使用系统可以在单线程的情况下同时处理多个客户端请求。I/O多路复用最大的优势在于,系统开销小。
I/O多路复用是通过系统调用实现的,目前支持的系统调用有:select、poll和epoll。
select模式
将需要监听的套接字和事件放到一个集合fd_set上,select方法会一直监听这些套接字,遍历整个集合检查是否有事件发生,如果有事件发生就立即返回对应事件。
select模式的问题:
- 每次调用select函数,都需要把fd_set集合从用户态拷贝到内核态;
- select函数需要在内核态中遍历整个fd_set集合,线性扫描;
- 为了减少数据拷贝带来的性能损坏,内核对被监控的fd_set集合大小做了限制,默认为1024(可以通过修改系统参数进行限制)。
poll模式
对select的优化,集合改用polldf链表结构存储,所以没有了最大文件描述符的限制,但是也要线性扫描整个链表。
epoll模式
目前三者性能最优的一种模式,集合使用红黑树存储,没有最大文件描述符的限制。基于事件驱动实现,不需要扫描整个集合。当有事件发生时通过事件回调,将事件加入就绪队列,直接返回整个队列,队列使用双向链表结构存储。队列的返回即消息事件的通知,为了避免集合的来回拷贝,epoll使用了mmap技术,在用户空间和内核空间做了内存映射。
epoll的改进:
- 没有最大文件描述符的限制(仅受限于操作系统的最大文件句柄数);
- I/O效率不会随着文件描述符的增加而线性下降;
- 使用mmap加速内核与用户空间的消息传递。
线程模型
Netty线程模型,基于Reactor模型,同时支持Reactor单线程模型、多线程模型和主从多线程模型。Netty服务启动创建两个Reactor线程池,一个用于接收客户端的TCP连接,一个用于处理I/O请求。
核心组件
EventLoop
Netty的EventLoop其实就是对JDK Selector的封装,Channel(socket)添加到EventLoop上,注册读/写事件,调用select方法开始轮询监听。EventLoop是一个I/O线程,除了负责I/O的读写外,还负责系统任务和定时任务的处理。
EventLoop的主要工作:
- 轮询注册到selector上的所有channel的IO事件;
- 处理产生IO事件的channel;
- 处理任务队列。
这里先抛几个问题:
- 任务是什么时候加进来的?
- eventLoop是的是什么线程池?
EventLoopGroup
Netty的线程池实现,一个线程池维护多个EventLoop线程。Netty服务端启动创建两个EventLoopGroup,bossGroup负责Acceptor事件、wrokerGroup负责I/O事件。
EventLoopGroup内部使用ThreadPerTaskExecutor线程池,用于创建指定数量的EventLoop。
EventLoopGroup 负责为每个新创建的 Channel 分配一个 EventLoop。在当前实现中,使用顺序循环(round-robin)的方式进行分配以获取一个均衡的分布,并且相同的 EventLoop 可能会被分配给多个 Channel。
这里要注意,因为会有多个Channel共有一个EventLoop,那就会存在线程局部变量共享的问题。比如ThreadLocal在这种场景下就不好使用了,因为这些channel是共用一个Thread的。
Channel
bossGroup接收新的连接,将连接封装成channel交给workerGroup并注册读事件。
主要的几个I/O操作
- Channel.read(),从channel读取数据放入缓冲区(一个I/O操作),然后触发ChannelHandler.channelRead事件(一个I/O事件)。
- ChannelFuture.write(Object msg),将数据写入缓冲区。注意,这里的缓冲区是指应用程序缓冲区,数据还在用户空间。flush方法才会将数据写入内核空间。
- Channel.flush() 将应用程序缓冲区的数据写入内核缓冲区,即tcp发送缓冲区。
ChannelPipeline
Netty是基于事件驱动的,Channel进行I/O操作时会产生对应的I/O事件,事件会在ChannelPipeline中传播,然后对应的ChannelHandler对事件进行拦截和处理。
Netty中的事件分为I/O操作(网络I/O操作,比如读/写)和I/O事件(操作触发的各种事件)。I/O操作由DefaultChannelPipeline处理,I/O事件由ChannelPipeline处理。
ChannelPipeline基于责任链设计模式实现,结构是一个双向链表结构,链表上的每个节点实际是ChannelHandler处理器。
根据事件的流向,可以将Netty事件分为inbound事件和outbound事件。inbound事件由I/O线程触发,比如TCP连接、读等事件。outbound事件由用户线程触发,比如写事件等。
每个Channel都绑定一条ChannelPipeline,ChannelPipeline是线程安全的。因为一个Channel只会交给一个EventLoop线程处理。但是ChannelPipeline上的ChannelHandler不是线程安全的,因为ChannelHandler可以绑定到多个Pipeline上,会存在多线程并发调用情况。
Unsafe操作
Channel中的很多I/O操作都是交由Unsafe接口完成的。
ByteBuf
Netty中的ByteBuf是对JDK ByteBuffer的封装,提供堆内、堆外内存,内存池优化技术。前面说的数据缓冲区指的就是ByteBuf,Channel.write方法先将数据写入ByteBuf,然后Channel.flush方法再将数据从ByteBuf写入内核发送缓冲区。
ByteBuf提供堆内和堆外两种缓冲实现:
- 堆内(HeapByteBuf)字节缓冲区:缓冲对象在JVM堆内存分配,速度快。缺点是写操作需要先将数据从堆内拷贝到堆外。
- 堆外(DirectByteBuf)字节缓冲区:直接在JVM堆外分配,速度慢。优点是减少一次内存拷贝。
ByteBuf的实践推荐,网络I/O的操作比如读/写使用堆外缓冲区,业务线程的数据使用堆内缓冲区。
为了提高ByteBuf的利用率,降低对ByteBuf的频繁分配和回收,提供了基于对象池的ByteBuf,可以重用ByteBuf对象,提升内存利用率。
参数调优
ChannelOptions几个重要参数说明
- ChannelOption.SO_BACKLOG 设置 socket函数的listen(int socketfd, int backlog)的backlog值。
- ChannelOptions.SO_REUSEADDR 设置 SO_REUSEADDR 的值,允许复用IP地址和端口。
- ChannelOptions.SO_KEEPALVE 设置 SO_KEEPALVE 的值,用于tcp连接的空闲检测。
- ChannelOptions.SO_SNDBUF/SO_RVCBUF 设置SO_SNDBUF/SO_RVCBUF,接收和发送缓冲区的大小。
- ChannelOptions.SO_LINGER 设置 SO_LINGER 的值,调用close方法的处理逻辑。linux默认会继续把缓冲区的数据发完。
- ChannelOptions.TCP_NODELAY 设置 TCP_NODELAY 的值,关闭Nagle算法,立即发送数据。
高性能体现
- 每个Channel对应一个EventLoop线程,Channel上的所有操作都是串行话,避免了多线程的并发操作,实现了无锁串行执行。
- 基于Reactor主/从线程模型。
- 内存优化,ByteBuf的堆外内存,ByteBuf的对象池技术。
源码分析
启动流程
首先来看下引导类:
ServerBootstrap引导类
- group(bossGroup, workerGroup) 配置两大线程组
bossGroup 不断接受accept新连接,然后将新连接交给 wrokerGroup 处理
- channel(NioServerSocketChannel.class) 配置NIO模型
- NioServerSocketChannel -> ServerSocket
- NioSocketChannel -> Socket
- handler() 用于服务端启动过程中的一些逻辑处理,比如初始化处理
- childHandler() 用于每一个新连接的逻辑处理,比如读写处理
- NioSocketChannel
- bind() 绑定端口,异步绑定
- option() 设置服务端连接的一些网络相关配置,比如backlog
- ChannelOption.SO_BACKLOG 1024 已完成三次握手的请求队列最大长度。
- childOption() 设置每条连接的TCP相关属性
- ChannelOption.SO_KEEPALIVE 是否开启TCP底层心跳(这里好像跟socket里的keepalive不一样)
- ChannelOption.TCP_NODELAY 是否开启Nagle算法,表示数据是否立马发送
Channel的创建
Channel创建:
bootstrap.bind(port)方法–>1.initAndRegister new Channel、init Channel和register Channel;
new Channel: 通过反射创建ServerBootstrap传入的NioServerSocketChannel对象。
JDK NIO的流程也是先创建Channel,设置非阻塞。然后打开Selector,将channel注册到selector上,并监听accept事件。
创建ChannelPipeline headContext<->tailContext
init Channel: 将配置里的handler加入pipeline中(这个handler是哪个handler?)–SimpleServerHandler
把ServerBootstrapAcceptor连接器也加入pipeline中,连接器里面包含了workerGroup、childHandler
register Channel: 将evetloop绑定到Channel;
调用JDK的socket.bind方法绑定端口和地址
如何接收新连接
bossGroup负责accept事件,然后将连接封装成channel交给workerGroup。
bossGroup的工作内容有两个(上面有说过,这里再简单的回顾一下,workerGroup一样):
- 轮询selector上的I/O事件,以及处理接收到I/O事件;
- 处理提交过来的异步任务;
新连接的处理主要逻辑:
- accept到新连接事件;
- 获取连接,将连接封装成channel注册到workerGroup上;
- 并为新连接在workerGroup上注册read事件,workerGroup开始监听read事件。
上一步已分析,服务端启动会创建一条Server Channel(底层就是ServerSocket),然后将这条channel添加到bossGroup上并注册accpet事件,监听新的连接。
读取到新的连接,这里会触发在bootstrap引导类设置的handler()处理器,会依次执行对应的handler,告诉你有新连接来了。
那是如何获取这个新的连接呢?
Netty是基于JDK NIO实现的,所以也是通过NIO返回的。JDK NIO 返回一个Channel,然后Netty将JDK的这个Channel封装成自己框架定义的NioSocketChannel(假设是基于NIO模式)。NioSocketChannel在其构造函数内会自动注册read读事件(当前这里只是对参数进行了设置,真正注册的地方在后面),所以Netty一旦accept接收到新的连接请求,获取连接后接着会自动注册read读事件。
通过前面引导类我们可以看到,bootstrap有一个childHandler参数,Netty会为每一个新的连接绑定handler。那这个handler什么时候绑定的?也就是在NioSocketChannel创建的时候。Netty的handler的执行时通过Pipeline管道机制实现的,所以每个NioSocketChannel创建的时候都会绑定一个 Pipeline newChannelPipeline()
。
小结:
- boss线程池阻塞select,,有新的连接进入时SelectionKey.OP_ACCEPT事件,读取连接事件消息,调用JDK的accept方法获取新的连接SocketChannel,然后封装成NioSocketChannel。
- 封装的同时会注册Read事件SelectionKey.OP_READ到selector上,并设置为非阻塞模式ch.configureBlocking(false);
- 为该Channel创建了一个Pipeline。
head->ChannelInitializer->tail
- 将Channel注册到WrokerGroup线程池,通过轮序选择一个Wroker线程NioEventLoop
将Channel绑定到一个Selector上,一个Selector被一个Worker线程使用,后续的I/O事件轮询、事件处理、异步task执行都由该线程chuli
数据传输过程
- Netty编码器原理就是创建一个ByteBuf,将Java对象转换为ByteBuf,然后使用这个ByteBuf进行传递;
- write方法将数据写到netty缓冲区(是一个单向链表结构,可以循环使用,是DirectByteBuf类型),flush方法将netty缓冲区数据写入内核socket发送缓冲区(如果socket发送缓冲区满,flush会采用自旋方式确保数据全部写入成功);
数据拆包和粘包
先简单介绍下:
- 拆包:是一个完整的最小数据包被拆分成多个tcp包发出,所以接收者需要将数据进行合并;
- 粘包:是一个tcp包里面有多个数据包,接收者需要对这些数据包进行拆分。
如果二进制数据格式是基于长度来识别的,那可以使用LengthFieldBasedFrameDecoder
来实现拆包和粘包功能。
Netty提供了两个简单的用于拆包和粘包的实现:
LengthFieldBasedFrameDecoder
和LineBasedFrameDecoder
。
基于二进制的数据格式,一般都是将数据分为消息头和消息体。消息头包括消息类型、版本号等内容。协议约定固定前几个字节都是用来标记数据的功能作用。比如用1个字节表示这是数据包还是心跳包,用2个字节表示数据的长度等。
我们来看看Dubbo是如何实现的。
Dubbo就是自定义了一个叫dubbo的协议,根据dubbo协议进行数据的编码和解码,双方数据传输需要按照dubbo协议对数据进行处理才能传输。
先说结论:Dubbo是使用buffer缓存来实现的,如果数据不完整会先将数据放入buffer中,然后等下次数据到来再一起合并成完整数据包再解码。
可以从InternalDecoder
源码入手来看Dubbo是怎么实现的。
小结一番:
- dubbo接收到数据后,会将数据与buffer中的数据一起合并了再进行decoder解码处理;
- 解码是关键,解码会判断这次的数据包是否是一个完整的dubbo数据包,至少数据长度要大于等于一个dubbo协议数据
- 拆包情况:比如,解码时发现这次接收到的数据还没有一个消息头长,说明是一个不完整包;或者通过解析消息头,获取消息体长度,然后发现剩余的数据长度小于消息体长度,说明也是一个不完整包。那就把这次接收到的数据直接放入buffer中,等下一次数据到来再一次解码;
- 粘包情况:这个简单,每次解码一个数据包,会循环解析,如果最后刚好到末尾就不用处理,如果还是部分数据不足一个数据包那就放入buffer中。
EventLoop
EventLoop的核心方法就是一个run
,run方法的本质就是一个 for(;;)
不断轮询 selector
。
EventLoop的顶层接口是Executor,并继承了AbstractExecutorService,所以EventLoop就是一个Netty自定义的线程池(准确点应该是EventLoopGroup)。
但是,前面说了EventLoop除了selector监听事件,还会处理定时任务,所以这个select肯定不能永久阻塞,当有任务要执行时是需要被唤醒的。netty使用的是带有超时等待时间的select(timeoutMillis)
方法。
run方法的主要逻辑:
- 如果定时任务时间快到了,就break跳出循环去执行定时任务;
- 如果有新任务加入,也break去执行新任务;
- 都没有任务要执行,调用select(timeoutMillis)超时阻塞等待I/O事件;
- 隐藏bug,记录空事件次数和时间,超过一定次数就重建一个selector。
所以netty的select什么情况下会被唤醒?
- 本质的有I/O事件来就被唤醒;
- 队列里有任务了;
- 第一个定时任务要被执行了;
- 还有一个,添加任务的时候,外部线程也会唤醒select。
处理I/O事件
EventLoop监听到I/O事件后,是如何处理的呢?
I/O事件触发后,Netty有两种处理方式(这里先省略,有一大堆代码和逻辑),反正就是优化和处理。JDK selector 默认返回的是HashSet集合,Netty将其转换成数组,方便遍历。
先简单小结:
- 对于bossGroup的NioEventLoop来说,轮询到的是基本上就是accept连接事件,后续的事情就通过它的Pipeline将连接扔给一个workerGroup NioEventLoop处理
- 对于workerGroup的NioEventLoop来说,轮询到的基本上都是read/write IO读写事件,后续的事情就是通过它的Pipeline将读取到的字节流传递给每个channelHandler来处理
处理task
netty居然还有任务要处理,那这个任务是哪来的呢?
netty有两种类型的任务(其实都是用户提交的):
- 用户(用户线程)自己提交的(通过当前channel获取到绑定的eventloop,然后通过eventloop.execute方法提交任务)
- 用户提交的定时任务(eventloop.schedule)。
那任务提交到的队列,这个队列是什么结构?
- 定时任务存先放在优先级队列里,到时了再放入普通队列里。
- 普通任务存放在正常的队列里
那Netty又是如何去检测定时任务可以执行了?
EventLoopGroup
Netty实现的线程池,用于创建EventLoop线程。那线程池里默认的线程数是多少?Netty会根据系统的CPU核心进行参数设置。
回顾下JDK的线程池,有几个参数,我们先说几个,看Netty是如何实现的
- 线程工厂ThreadFactory
Netty里的线程工厂实现是ThreadPerTaskExecutor类,用于创建新的线程。每个EventLoop关联一个Thread。
Future
Netty的Future设计非常美妙。
JDK的Future:对于一个异步操作,可以暂时返回一个Future对象,然后去做别的事情,最后通过get方法拿到结果。如果get时异步操作还没有完成,则进行阻塞状态。
Netty对JDK Future类进行了扩展,增加了观察者模式。这样当异步操作执行完时,Netty就可以主动通知(回调)监听者(这种实现方式牛逼)。
但是请注意,这里的回调是由I/O线程发起的,即Reactor线程。所以传入的回调方法里不要执行耗时的任务,如果任务比较耗时最好再提交到业务线程里去执行。