Netty提供了ByteBuf来替代Java NIO的ByteBuffer缓冲区,以操纵内存缓冲区。
与Java NIO的ByteBuffer相比,ByteBuf的优势如下:
·
Pooling(池化,这点减少了内存复制和GC,提升了效率)
·复合缓冲区类型,支持零复制
·
不需要调用flip()方法去切换读/写模式
·扩展性好,例如StringBuffer
·可以自定义缓冲区类型
·读取和写入索引分开
·方法的链式调用
·可以进行引用计数,方便重复使用
ByteBuf是一个字节容器,内部是一个字节数组。从逻辑上来分,字节容器内部可以分为四个部分:
第一个部分是已用字节,表示已经使用完的废弃的无效字节;
第二部分是可读字节,这部分数据是ByteBuf保存的有效数据,从ByteBuf中读取的数据都来自这一部分;
第三部分是可写字节,写入到ByteBuf的数据都会写到这一部分中;
第四部分是可扩容字节,表示的是该ByteBuf最多还能扩容的大小。
ByteBuf的重要属性:
ByteBuf通过三个整型的属性有效地区分可读数据和可写数据,使得读写之间相互没有冲突
·readerIndex(读指针):
指示读取的起始位置。每读取一个字节,readerIndex
自动增加1。一旦readerIndex与writerIndex相等,则表示ByteBuf不可读了。
·writerIndex(写指针):
指示写入的起始位置。每写一个字节,writerIndex自
动增加1。一旦增加到writerIndex与capacity()容量相等,则表示ByteBuf已经不可写
了。capacity()是一个成员方法,不是一个成员属性,它表示ByteBuf中可以写入的容
量。注意,它不是最大容量maxCapacity。
·maxCapacity(最大容量):
表示ByteBuf可以扩容的最大容量。当向ByteBuf写
数据的时候,如果容量不足,可以进行扩容。扩容的最大限度由maxCapacity的值来
设定,超过maxCapacity就会报错。
ByteBuf的重要方法:
第一组:容量系列
·capacity():表示ByteBuf的容量,它的值是以下三部分之和:废弃的字节数、可读字节数和可写字节数。
·maxCapacity():表示ByteBuf最大能够容纳的最大字节数。当向ByteBuf中写数据的时候,如果发现容量不足,则进行扩容,直到扩容到maxCapacity设定的上限。
第二组:写入系列
·isWritable():表示ByteBuf是否可写。如果capacity()容量大于writerIndex指针的位置,则表示可写,否则为不可写。
注意:如果isWritable()返回false,并不代表不
能再往ByteBuf中写数据了。如果Netty发现往ByteBuf中写数据写不进去的话,会自
动扩容ByteBuf。
·writableBytes():取得可写入的字节数,它的值等于容量capacity()减去writerIndex。
·maxWritableBytes():取得最大的可写字节数,它的值等于最大容量maxCapacity减去writerIndex。
·writeBytes(byte[] src):
把src字节数组中的数据全部写到ByteBuf。这是最为常
用的一个方法。
·writeTYPE(TYPE value):写入基础数据类型的数据。TYPE表示基础数据类型,包含了8大基础数据类型。具体如下:writeByte()、writeBoolean()、writeChar()、writeShort()、writeInt()、writeLong()、writeFloat()、writeDouble()。
·setTYPE(TYPE value):基础数据类型的设置,不改变writerIndex指针值,包含了8大基础数据类型的设置。具体如下:setByte()、setBoolean()、setChar()、setShort()、setInt()、setLong()、setFloat()、setDouble()。
setType系列与writeTYPE系
列的不同:setType系列不改变写指针writerIndex的值;writeTYPE系列会改变写指针
writerIndex的值。
·markWriterIndex()与resetWriterIndex():这两个方法一起介绍。前一个方法表示把当前的写指针writerIndex属性的值保存在markedWriterIndex属性中;后一个方法表示把之前保存的markedWriterIndex的值恢复到写指针writerIndex属性中。markedWriterIndex属性相当于一个暂存属性,也定义在AbstractByteBuf抽象基类中。
第三组:读取系列
·isReadable( ):返回ByteBuf是否可读。如果writerIndex指针的值大于readerIndex指针的值,则表示可读,否则为不可读。
·readableBytes( ):返回表示ByteBuf当前可读取的字节数,它的值等于writerIndex减去readerIndex。
·readBytes(byte[] dst):
读取ByteBuf中的数据。将数据从ByteBuf读取到dst字节
数组中,这里dst字节数组的大小,通常等于readableBytes()。这个方法也是最为常
用的一个方法之一。
·readType():读取基础数据类型,可以读取8大基础数据类型。具体如下:readByte()、readBoolean()、readChar()、readShort()、readInt()、readLong()、readFloat()、readDouble()。
·getTYPE(TYPE value):
读取基础数据类型,并且不改变指针值。具体如下:getByte()、getBoolean()、getChar()、getShort()、getInt()、getLong()、getFloat()、getDouble()。getType系列与readTYPE系列的不同:getType系列不会改变读指针readerIndex的值;readTYPE系列会改变读指针readerIndex的值。
·markReaderIndex( )与resetReaderIndex( ):这两个方法一起介绍。前一个方法表示把当前的读指针ReaderIndex保存在markedReaderIndex属性中。后一个方法表示把保存在markedReaderIndex属性的值恢复到读指针ReaderIndex中。markedReaderIndex属性定义在AbstractByteBuf抽象基类中。
这里用了默认的分配器,分配了一个初始容量为9,最大限制为100个字节的缓冲区:
【ByteBuf的引用计数】
Netty采用“计数器”来追踪ByteBuf的生命周期,一是对Pooled ByteBuf的支持,二是能够尽快地“发现”那些可以回收的ByteBuf(非Pooled),以便提升ByteBuf的分配和销毁的效率。
插个题外话:
什么是Pooled(池化)的ByteBuf缓冲区呢?在通信程序的执行过程中,Buffer缓冲区实例会被频繁创建、使用、释放。大家都知道,频繁创建对象、内存分配、释放内存,系统的开销大、性能低,如何提升性能、提高Buffer实例的使用率呢?
从Netty4版本开始,新增了对象池化的机制。即创建一个Buffer对象
池,将没有被引用的Buffer对象,放入对象缓存池中;当需要时,则重新从对象缓
存池中取出,而不需要重新创建。
引用计数的大致规则如下:在
默认情况下,当创建完一个ByteBuf
时,它的引用为1;每次调用retain()方法,它的引用就加1;每次调用release()方法,
就是将引用计数减1;如果引用为0,再次访问这个ByteBuf对象,将会抛出异常;如
果引用为0,表示这个ByteBuf没有哪个进程引用它,它占用的内存需要回收。最后一次retain方法抛出了IllegalReferenceCountException异常。原因是:在此之前,缓冲区buffer的引用计数已经为0,不能再retain了。也就是说:在Netty中,引用计数为0的缓冲区不能再继续使用。
如果retain和release这两个方法,一次都不调用呢?则在缓冲区使用完成后,调用一次release,就是释放一次。
例如在Netty流水线上,中间所有的Handler业务处理
器处理完ByteBuf之后直接传递给下一个,由最后一个Handler负责调用release来释放
缓冲区的内存空间。
当引用计数已经为0,Netty会进行ByteBuf的回收。分为两种情况:(1)Pooled池化的ByteBuf内存,回收方法是:放入可以重新分配的ByteBuf池子,等待下一次分配。(2)Unpooled未池化的ByteBuf缓冲区,回收分为两种情况:如果是堆(Heap)结构缓冲,会被JVM的垃圾回收机制回收;如果是Direct类型,调用本地方法释放外部内存(unsafe.freeMemory)。
【ByteBuf的Allocator分配器】
Netty通过ByteBufAllocator分配器来创建缓冲区和分配内存空间。Netty提供了ByteBufAllocator的两种实现:PoolByteBufAllocator和UnpooledByteBufAllocator。
1.
PoolByteBufAllocator(池化ByteBuf分配器)将ByteBuf实例放入池中,提高了
性能,将内存碎片减少到最小;这个池化分配器采用了jemalloc高效内存分配的策
略,该策略被好几种现代操作系统所采用。
2.
UnpooledByteBufAllocator是普通的未池化ByteBuf分配器,它没有把ByteBuf放
入池中,每次被调用时,返回一个新的ByteBuf实例;通过Java的垃圾回收机制回
收。
为了验证两者的性能,大家可以做一下对比试验,当然结论是使用池化分配器不依赖jvmGC:
(1)使用UnpooledByteBufAllocator的方式分配ByteBuf缓冲区,开启10000个长连接,每秒所有的连接发一条消息,再看看服务器的内存使用量的情况。实验的参考结果:
在短时间内,可以看到占到10GB多的内存空间,但随着系统
的运行,内存空间不断增长,直到整个系统内存被占满而导致内存溢出,最终系统
宕机。
(2)把UnpooledByteBufAllocator换成PooledByteBufAllocator,再进行试验,看看服务器的内存使用量的情况。实验的参考结果:
内存使用量基本能维持在一个连接占用1MB左右的内存空
间,内存使用量保持在10GB左右,经过长时间的运行测试,我们会发现内存使用量
都能维持在这个数量附近,系统不会因为内存被耗尽而崩溃。在Netty中,默认的分配器为ByteBufAllocator.DEFAULT,可以通过Java系统参数(System Property)的选项io.netty.allocator.type进行配置,配置时使用字符串值:"unpooled","pooled"。
【ByteBuf缓冲区的类型】
根据内存的管理方不同,分为堆缓存区和直接缓存区,也就是Heap ByteBuf和Direct ByteBuf。另外,为了方便缓冲区进行组合,提供了一种组合缓存区
上面三种缓冲区的类型,无论哪一种,都可以通过池化(Pooled)、非池化
(Unpooled)两种分配器来创建和分配内存空间。下面对Direct Memory(直接内存)进行一下特别的介绍:
·Direct Memory不属于Java堆内存,所分配的内存其实是调用操作系统malloc()函数来获得的;由Netty的本地内存堆Native堆进行管理。
·Direct Memory容量可通过-XX:MaxDirectMemorySize来指定,如果不指定,则默认与Java堆的最大值(-Xmx指定)一样。注意:并不是强制要求,有的JVM默认Direct Memory与-Xmx无直接关系。
·
Direct Memory的使用避免了Java堆和Native堆之间来回复制数据。在某些应用
场景中提高了性能。
·
在需要频繁创建缓冲区的场合,由于创建和销毁Direct Buffer(直接缓冲区)
的代价比较高昂,因此不宜使用Direct Buffer。也就是说,Direct Buffer尽量在池化
分配器中分配和回收。如果能将Direct Buffer进行复用,在读写频繁的情况下,就可
以大幅度改善性能。
·
对Direct Buffer的读写比Heap Buffer快,但是它的创建和销毁比普通Heap
Buffer慢。·在Java的垃圾回收机制回收Java堆时,Netty框架也会释放不再使用的DirectBuffer缓冲区,因为它的内存为堆外内存,所以清理的工作不会为Java虚拟机(JVM)带来压力。注意一下垃圾回收的应用场景:(1)垃圾回收仅在Java堆被填满,以至于无法为新的堆分配请求提供服务时发生;(2)在Java应用程序中调用System.gc()函数来释放内存。
【三类ByteBuf使用对比】
·创建的方法不同:Heap ByteBuf通过调用分配器的buffer()方法来创建;而Direct ByteBuf的创建,是通过调用分配器的directBuffer()方法。
·Heap ByteBuf缓冲区可以直接通过array()方法读取内部数组;而Direct ByteBuf缓冲区不能读取内部数组。
·可以调用hasArray()方法来判断是否为Heap ByteBuf类型的缓冲区;如果hasArray()返回值为true,则表示是Heap堆缓冲,否则就不是。
·Direct ByteBuf要读取缓冲数据进行业务处理,相对比较麻烦,需要通过getBytes/readBytes等方法先将数据复制到Java的堆内存,然后进行其他的计算。
注意,如果hasArray()返回false,不一定代表缓冲区一定就是Direct ByteBuf直接缓冲区,也有可能是CompositeByteBuf缓冲区。在很多通信编程场景下,需要多个ByteBuf组成一个完整的消息:例如HTTP协议传输时消息总是由Header(消息头)和Body(消息体)组成的。如果传输的内容很长,就会分成多个消息包进行发送,消息中的Header就需要重用,而不是每次发送都创建新的Header。这是就是使用CompositeByteBuf较多:
在以上代码中,使用到了Netty中一个非常方便的类——Unpooled帮助类,用它来创建和使用非池化的缓冲区。另外,还可以在Netty程序之外独立使用Unpooled帮助类。
另外,从Netty 4.1开始ByteBuf的默认类型是Direct ByteBuf直接内存。大家知
道,Java不能直接访问Direct ByteBuf内部的数据,必须先通过getBytes、readBytes等
方法,将数据读入Java数组中,然后才能继续在数组中进行处理。
【ByteBuf浅层复制】
ByteBuf的浅层复制分为两种,有切片(slice)浅层复制和整体(duplicate)浅层复制。首先说明一下,浅层复制是一种非常重要的操作。可以很大程度地避免内存复制。这一点对于大规模消息通信来说是非常重要的。
1.slice切片浅层复制
ByteBuf的slice方法可以获取到一个ByteBuf的一个切片。一个ByteBuf可以进行多次的切片浅层复制;多次切片后的ByteBuf对象可以共享一个存储区域。
2.duplicate整体浅层复制
和slice切片不同,duplicate()返回的是源ByteBuf的整个对象的一个浅层复制,包括如下内容:
·duplicate的读写指针、最大容量值,与源ByteBuf的读写指针相同。
·duplicate()不会改变源ByteBuf的引用计数。
·duplicate()不会复制源ByteBuf的底层数据。
duplicate()和slice()方法都是浅层复制。不同的是,slice()方法是切取一段的浅层复制,而duplicate( )是整体的浅层复制。
浅层复制方法不会实际去复制数据,也不会改变ByteBuf的引用计数,这就会导致一个问题:在源ByteBuf调用release()之后,一旦引用计数为零,就变得不能访问了;在这种场景下,源ByteBuf的所有浅层复制实例也不能进行读写了;如果强行对浅层复制实例进行读写,则会报错。因此,在调用浅层复制实例时,可以通过调用一次retain()方法来增加引用,表示它们对应的底层内存多了一次引用,引用计数为2。在浅层复制实例用完后,需要调用两次release()方法,将引用计数减一,这样就不影响源ByteBuf的内存释放。
【ByteBuf在netty中的使用】
1.netty入站ByteBuf的创建:
在入站处理时,Netty是何时自动创建入站的ByteBuf的呢?
查看Netty源代码,我们可以看到,Netty的Reactor反应器线程会在底层的Java
NIO通道读数据时,也就是AbstractNioByteChannel.NioByteUnsafe.read()处,调用
ByteBufAllocator方法,创建ByteBuf实例,从操作系统缓冲区把数据读取到Bytebuf
实例中,然后调用pipeline.fireChannelRead(byteBuf)方法将读取到的数据包送入到入
站处理流水线中。
2.netty入站ByteBuf的释放:
方式一:TailHandler自动释放
Netty默认会在ChannelPipline通道流水线的最后添加一个TailHandler末尾处理
器,它实现了默认的处理方法,在这些方法中会帮助完成ByteBuf内存释放的工作。
在默认情况下,如果每个InboundHandler入站处理器,把最初的ByteBuf数据包
一路往下传,那么TailHandler末尾处理器会自动释放掉入站的ByteBuf实例。
方式二:SimpleChannelInboundHandler自动释放
如果Handler业务处理器需要截断流水线的处理流程,不将ByteBuf数据包送入
后边的InboundHandler入站处理器,这时,流水线末端的TailHandler末尾处理器自动
释放缓冲区的工作自然就失效了。在这种场景下,Handler业务处理器有两种选择:
·手动释放ByteBuf实例。
·
继承SimpleChannelInboundHandler,利用它的自动释放功能。
SimpleChannelInboundHandler自动释放源代码:
3.netty出站ByteBuf的释放:
出站缓冲区的自动释放方式:
HeadHandler自动释放。在出站处理流程中,申请分配到的ByteBuf主要是通过HeadHandler完成自动释放的。
出站处理用到的Bytebuf缓冲区,一般是要发送的消息,通常由Handler业务处理
器所申请而分配的。例如,在write出站写入通道时,通过调用
ctx.writeAndFlush(Bytebufmsg),Bytebuf缓冲区进入出站处理的流水线。在每一个出
站Handler业务处理器中的处理完成后,最后数据包(或消息)会来到出站的最后一
棒HeadHandler,在数据输出完成后,Bytebuf会被释放一次,如果计数器为零,将
被彻底释放掉。
4.ByteBuf的传递:
channelRead方法的msg参数的形参类型不是ByteBuf,而是Object,为什么呢?实际上,msg的形参类型是由流水线的上一站决定的。
大家知道,入站处理的流程是:Netty读取底层的二进制数据,填充到
msg时,msg是ByteBuf类型,然后经过流水线,传入到第一个入站处理器;每一个
节点处理完后,将自己的处理结果(类型不一定是ByteBuf)作为msg参数,不断向
后传递。因此,msg参数的形参类型,必须是Object类型。不过,可以肯定的是,
第
一个入站处理器的channelRead方法的msg实参类型,绝对是ByteBuf类型,因为它是
Netty读取到的ByteBuf数据包
,所以可以强
制转成ByteBuf类型。
【ByteBuf的入站解码】
什么叫作Netty的解码器呢?首先,它是一个InBound入站处理器,解码器负责处理“入站数据”。Netty内置了这个解码器,叫作
ByteToMessageDecoder,位在Netty的io.netty.handler.codec包中。
强调一下,所有的Netty中的解码器,都是Inbound入站处理器类型,都直接或
者间接地实现了ChannelInboundHandler接口。
ByteToMessageDecoder是一个非常重要的解码器基类,它是一个抽象类,实现了解码的基础逻辑和流程。
ByteToMessageDecoder继承自
ChannelInboundHandlerAdapter适配器,是一个入站处理器,实现了从ByteBuf到Java
POJO对象的解码功能。查看Netty源代码,我们会惊奇地发现:ByteToMessageDecoder仅仅是个抽象类,不能以实例化方式创建对象。也就是说,直接通过ByteToMessageDecoder类,并不能完成Bytebuf字节码到具体Java类型的解码,还得依赖于它的具体实现。ByteToMessageDecoder的解码方法名为decode。通过源代码我们可以发现,decode方法只是提供了一个抽象方法,也就是说,decode方法中的具体解码过程,ByteToMessageDecoder没有具体的实现。换句话说,如何将Bytebuf数据变成Object数据,需要子类去完成,父类不管。总之,作为解码器的父类,ByteToMessageDecoder仅仅提供了一个流程性质的框架:它仅仅将子类的decode方法解码之后的Object结果,放入自己内部的结果列表List<Object>中,最终,父类会负责将List<Object>中的元素,一个一个地传递给下一个站。哦,不对!父类还没有那么“勤快”,而是将子类的Object结果放入父类的List<Object>列表,也是交由子类的decode方法完成的。
实现一个ByteBuf的整数解码器:
但是如何解决分包问题呢:
前面讲到,底层通信协议是分包传输的,一份数据可能分几次达到对端。发送端出去的包在传输过程中会进行多次的拆分和组装。接收端所收到的包和发送端所发送的包不是一模一样的,具体如图7-2所示:在发送端发出4个字符串,Netty NIO接收端可能只是接收到了3个ByteBuf数据缓冲。在Java OIO流式传输中,不会出现这样的问题,因为它的策略是:不读到完整的信息,就一直阻塞程序,不向后执行。但是,在Java的NIO中,由于NIO的非阻塞性,就会出现图7-2这样的问题。怎样保证一次性读取到完整的数据,就成了一个大问题
我们知道,Netty接收到的数据都可以通过解码器进行解码。那么,Netty通过什么样的解码器对图7-2中的3个ByteBuf数据缓冲数据进行解码,而后得到和发送端一模一样的4个字符串,是怎么办到的呢?对于上面这个问题,还是可以使用ReplayingDecoder来解决。前文讲到,在进行数据解析时,如果发现当前ByteBuf中所有可读的数据不够,ReplayingDecoder会结束解析,直到可读数据是足够的。这一切都是在ReplayingDecoder内部进行,它是通过和缓冲区装饰器类ReplayingDecoderBuffer相互配合完成的,根本就不需要用户程序来操心。