开发问题解决方案
时间和时间戳的应用场景
Netty的ByteBuf基础知识
JVM生产环境调优工具
i18n注意事项
导致缓存击穿的代码示例
CPU高的问题排查方法
服务器性能监控
复费率读写接口标准
本文档使用 MrDoc 发布
-
+
首页
Netty的ByteBuf基础知识
# Netty的ByteBuf基础知识 # 1. ByteBuf简介 网络上数据的基本单位是字节,Java NIO提供的ByteBuffer作为字节容器并不好用,所以Netty提供了替代品ByteBuf类,具有很多优秀的特性: - 容量可以按需动态扩展,类似于 StringBuffer; - 读写采用了不同的指针,读写模式可以随意切换,不需要调用 flip 方法; - 通过内置的复合缓冲类型可以实现零拷贝; - 支持引用计数; - 支持缓存池。 ## 1.1 基本结构 ByteBuf除了保存字节内容外,还会维护2个索引——读索引readerIndex和写索引writerIndex,当从其中读数据时,读索引readerIndex会递增,当写入ByteBuf时,写索引writerIndex会递增。 另外,ByteBuf还维护了废弃字节和可扩容字节这两个属性。ByteBuf默认容量限制为Integer.MAX_VALUE。 ## 1.2 分类维度 ByteBuf都可以划分归属到三个不同的维度中(看不明白可以光看加黑部分,有个概念即可): - **Heap/Direct**:**堆内和堆外内存**。Heap 指的是在 JVM 堆内分配,底层依赖的是字节数据;Direct 则是堆外内存,不受 JVM 限制,分配方式依赖 JDK 底层的 ByteBuffer; - **Pooled/Unpooled**:**池化和非池化内存**。Pooled 是从预先分配好的内存中取出,使用完可以放回 ByteBuf 内存池,等待下一次分配。而 Unpooled 是直接调用系统 API 去申请内存,确保能够被 JVM GC 管理回收; - **Unsafe/非Unsafe**:**操作方式是否安全**。Unsafe 表示每次调用 JDK 的 Unsafe 对象操作物理内存,依赖 offset + index 的方式操作数据。非 Unsafe 则不需要依赖 JDK 的 Unsafe 对象,直接通过数组下标的方式。 Netty通过ByteBufAllocator分配器来创建ByteBuf,分配器有两种实现: - PoolByteBufAllocator:池化ByteBuf分配器,将ByteBuf实例放入内存池中,提高了性能,将内存碎片减少到最小。它底层采用了**jemalloc**高效内存分配的策略,该策略被好几种现代操作系统所采用; - UnpooledByteBufAllocator:普通未池化ByteBuf分配器,它没有把ByteBuf放入内存池中,每次被调用时,返回一个新的ByteBuf实例,通过Java的垃圾回收机制回收。 ## 1.3 池化和非池化 为什么Netty要使用池化技术,内容实在太多,有兴趣的可以看这一篇: [Java中看内存分配—Netty内存池](https://www.notion.so/Java-Netty-15466b04f0fd45d88dd6141696b8bc21?pvs=21) 简单来说,池化的意义在于如下2方面: - 对于DirectByteBuffer的分配和释放是比较低效的,使用池化技术能快速分配内存; - 池化技术使对象可以复用,从而降低GC频率。 # 2. ByteBuf用法 ## 2.1 堆缓冲区 堆缓冲区是在JVM的堆内存中创建对象,由JVM负责分配内存和GC。 堆缓冲区申请和释放资源会相对简单,但发送数据的速度较慢,因为发送之前JVM内部需要将数据复制到一个直接缓冲区中,然后从直接缓冲区中发送。 ```java private static void createHeapBuf(){ // 创建缓冲区 ByteBuf heapBuf = Unpooled.buffer(16); // 向缓冲区中写入数据 heapBuf.writeByte(1); heapBuf.writeByte(2); } ``` 现在我们使用最多的就是堆缓冲区,这种用法通过牺牲性能换取代码安全,声明ByteBuf对象就如同写其他Java对象一样,无需考虑何时进行垃圾回收,一切交给JVM自动完成。 ## 2.2 直接缓冲区 直接缓冲区是在JVM之外直接操作内存,它对于网络数据传输是最理想的选择。 使用直接缓冲区的话,可以直接发送数据(Netty的零拷贝技术),但是直接缓冲区相对来说分配和释放会更昂贵。 另外,使用直接缓冲区的话,不受JVM垃圾回收的管理,所以需要手工释放,否则会出现内存泄漏问题。 ```java private static void createDirectBuf(){ ByteBuf directBuf = Unpooled.directBuffer(16); directBuf.writeByte(1); directBuf.writeByte(2); } ``` ## 2.3 复合缓冲区 复合缓冲区为多个缓冲区提供了一个聚合视图,我们可以根据需要添加或者移除缓冲区。复合缓冲区适用于需要发送多个不同消息的时候,为了避免多余的复制(创建多个相同的消息头),我们可以选择使用复合缓冲区。 ```java private static void compositeBuf(){ // 创建复合缓冲区 CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer(); // 创建堆缓冲区 ByteBuf heapBuf = Unpooled.buffer(16); heapBuf.writeByte(1); heapBuf.writeByte(2); heapBuf.writeByte(3); // 创建堆外缓冲区 ByteBuf directBuf = Unpooled.directBuffer(16); directBuf.writeByte(4); directBuf.writeByte(5); directBuf.writeByte(6); // 将堆缓冲区和堆外缓冲区追加到复合缓冲区 compositeByteBuf.addComponents(heapBuf,directBuf); } ``` # 3. ByteBuf释放 ## 3.1 引用计数 首先我们来看这样一句话: **ByteBuf是基于引用计数进行设计的,当一个ByteBuf被声明时,它的引用计数为1,当其引用计数大于0时,表示正在使用,当其引用计数为0时,则会被释放。** 这句话怎么理解呢,ByteBuf是不是都需要手工释放? 从上面的用法可以知道,ByteBuf在实例化时,有可能在JVM堆内存或堆外内存中分配内存。 - **JVM堆内存** 大家都知道,堆内存是由JVM管理,进行内存的分配和回收,例如: ```java User user = new User(); ``` 当实例化一个对象时,简单来说,user变量是存在栈内存中,new User()是在堆内存中划了一块内存,通过赋值符号=进行引用,使user指向堆内存中的对应区域。 所以当`ByteBuf byteBuf = Unpooled.buffer()`实例化一个ByteBuf对象时,实际上是由JVM分配了一块堆内存,例如: ```java public static void handle() { for (int i = 0; i < 3; i++) { ByteBuf byteBuf = Unpooled.buffer(); ... } } ``` 上面代码中,会循环创建3个ByteBuf对象,当某次循环结束时,byteBuf变量会消失,此时ByteBuf对象在堆内存中就已经不被引用了,那么JVM就会按照GC策略,对这个ByteBuf对象占用的内存进行回收。 这就说明,堆内存中创建的ByteBuf对象,是不会根据其本身的引用计数来判断是否释放的,只要失去变量的引用,就会被回收,因为JVM并不会考虑ByteBuf中的引用计数机制。 - **堆外内存** 堆外内存最大的特点是不受到JVM的管理,无法通过JVM的GC策略进行释放。 Netty框架为了提高性能才利用堆外内存进行池化或者非池化的对象创建,这部分内存是由Netty框架进行分配和释放的。Netty在发现某个对象的引用计数为0时,就会将其对应的内存空间释放。例如: ```java public static void handle() { for (int i = 0; i < 3; i++) { ByteBuf byteBuf = Unpooled.directBuffer(); ... } } ``` 上面代码中,每次循环都会在堆外内存中创建一个ByteBuf对象,当某次循环结束时,byteBuf变量消失,但此时ByteBuf对象仍旧在堆外内存中,并且**它的引用计数为1**,所以Netty不会将其释放,并且因为byteBuf变量消失,这块内存不再能被访问到,此时Netty就会报内存泄漏的异常。 所以,我们必须在使用完这个ByteBuf对象后,将其释放。 ```java public static void handle() { for (int i = 0; i < 3; i++) { ByteBuf byteBuf = Unpooled.directBuffer(); ... ReferenceCountUtil.release(byteBuf); } } ``` 但是,仅仅在最后对ByteBuf释放也许不够,比如在中间出现了retain(): ```java public static void handle() { for (int i = 0; i < 3; i++) { ByteBuf byteBuf = Unpooled.directBuffer(); // 此时引用计数为1 byteBuf.retain(); // 此时引用计数为2 ... // 中间未释放过 ReferenceCountUtil.release(byteBuf); // 此时引用计数为1 } } ``` 在上面的代码中,最后经过一次释放,引用计数最终为1,此时仍旧会存在内存泄漏的问题,所以retain()和release()必须成对出现,如下: ```java public static void handle() { for (int i = 0; i < 3; i++) { ByteBuf byteBuf = Unpooled.directBuffer(); // 此时引用计数为1 byteBuf.retain(); // 此时引用计数为2 ... // 中间未释放过 ReferenceCountUtil.release(byteBuf); // 此时引用计数为1 ReferenceCountUtil.release(byteBuf); // 此时引用计数为0 } } ``` **最终结论——最开始那句话指的只是堆外内存的情况。** - **如果ByteBuf创建在堆内存中,是无所谓计数的,只要ByteBuf对象不再被引用,就会在后续GC时被回收掉;当然,释放了也没问题,因为继承的接口和在堆外内存是一样的;** - **如果ByteBuf创建在堆外内存中,那么就由Netty负责回收,按照Netty的规则,必须计数为0才可被回收,必须在Handler链中被释放(下面会介绍手工释放或`SimpleChannelInboundHandler`自动释放),如果ByteBuf对象未被释放,并且失去引用的话,会导致内存泄漏。** ## 3.2 谁负责释放 在Netty里,因为Handler链的存在,ByteBuf经常要传递到下一个Hanlder去而不复还,所以,谁是最后使用者,谁负责释放。 另外,更要注意的是各种异常情况,ByteBuf没有成功传递到下一个Hanlder,还在自己地界里的话,一定要进行释放。 ### 3.2.1 InBound Message 根据上面的谁最后谁负责原则,每个Handler对消息可能有三种处理方式 - 对原消息不做处理,调用 ctx.fireChannelRead(msg)把原消息往下传,那不用做什么释放。 - 将原消息转化为新的消息并调用 ctx.fireChannelRead(newMsg)往下传,那必须把原消息release掉。 - 如果已经不再调用ctx.fireChannelRead(msg)传递任何消息,那更要把原消息release掉。 ### 3.2.2 OutBound Message 要发送的消息由应用所创建,并调用 ctx.writeAndFlush(msg) 进入Handler链。在每个Handler中的处理类似InBound Message,最后消息会来到HeadHandler,再经过一轮复杂的调用,**在flush完成后终将被release掉**。 ### 3.2.3 异常发生时的释放 多层的异常处理机制,有些异常处理的地方不一定准确知道ByteBuf之前释放了没有,可以在释放前加上引用计数大于0的判断避免释放失败;有时候不清楚ByteBuf被引用了多少次,但又必须在此进行彻底的释放,可以循环调用release()直到返回true。 ```java boolean flag = false; while (!flag) { flag = ReferenceCountUtil.release(msg); } ``` # 4. 常见问题 ## 4.1 隐式声明 从下面的代码中可以看到,存在隐式声明ByteBuf对象`(ByteBuf) msg`,此处会使msg引用计数`+1`,但最后并没有释放,而`ChannelInboundHandlerAdapter`并不会自己释放引用,如果后续没有其他Handler进行释放,就会出现内存泄漏问题。 ```java public class SampleServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { String sn = ""; try { byte[] buffer = ByteBufUtil.decodeHexDump(ByteBufUtil.hexDump((ByteBuf) msg).replace("7d7b7b", "7b7b") + "7d7d"); ... } catch (Exception e) { } } } ``` 尽可能避免隐式声明ByteBuf。 ```java public class SampleServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { String sn = ""; try { // 下面2行代码都需要引用msg的堆外内存,所以必须在这两行执行完后释放 ByteBuf byteBuf = (ByteBuf) msg; // 此时msg的引用计数为1 // 注意,这里不能释放,因为下面hexDump要用到,如果释放,下面会报异常 // io.netty.util.IllegalReferenceCountException: refCnt: 0 String str = ByteBufUtil.hexDump(byteBuf); // 要注意,此时msg的引用计数仍旧为1,因为hexDump没有增加引用计数 // 手工释放byteBuf引用 ReferenceCountUtil.release(byteBuf); str = str.replace("7d7b7b", "7b7b") + "7d7d"; byte[] buffer = ByteBufUtil.decodeHexDump(str); ... } catch (Exception e) { } finally { // 最后尝试手工释放 ReferenceCountUtil.release(msg); } } } ``` ## 4.2 变量重新赋值 看个错误案例: ```java @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf byteBuf = null; try { byteBuf = (ByteBuf) msg; // 此时msg的引用计数为1 byteBuf = Unpooled.buffer(); // 此时msg的引用计数为1 } catch (Exception e) { } finally { ReferenceCountUtil.release(byteBuf); // 此时,msg未被释放,引用计数为1,发生内存泄漏 } } ``` 从上面的代码可以看到,byteBuf变量开始指向了msg,后来又指向了一个堆内存中创建的对象,最后释放的是堆内存中创建的对象,而不是msg,所以就会出问题。 这里修复的方式是释放msg: ```java ReferenceCountUtil.release(msg); ``` 所以,需要记住一个问题,必须对着那个对象进行释放,而不是对着那个变量。 当然,msg本身其实也是一个变量,也有可能被修改,一旦msg被重新赋值,就没有任何变量指向这一块堆外内存了,也就会出现内存泄漏。而且这种问题在开发和编译时可能很难发现,所以我们比较安全的做法是注意以下2点: - 尽量避免对msg进行重新赋值,如果有额外的参数需要传递,可以选择一些并发缓存进行存储,或者记得把msg释放掉; - 在对msg进行处理时,声明一个final变量,然后前置所有解析逻辑,并且用完即释放,如下: ```java @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { // final修饰 final ByteBuf byteBuf = (ByteBuf) msg; // 此时msg的引用计数为1 // 所有解析逻辑几种到此处,可以声明一些变量存储解析结果,例如报文命令字 // 释放引用 ReferenceCountUtil.release(byteBuf); // 此时msg的引用计数为0 // 此时如果执行byteBuf = Unpooled.buff(),会因为final关键字直接报错 } catch (Exception e) { } finally { // 如果是pipeline最后一个Handler,则释放msg boolean flag = false; while (!flag) { flag = ReferenceCountUtil.release(msg); } } } ``` ## 4.3 错误的释放 虽然堆内创建的对象由JVM管理,无需手工释放,但是当进行手工释放后,其中的内容会被清空,如果后续继续使用该对象,就会报异常,如下: ```java public static void main(String[] args) { try { ByteBuf byteBuf = Unpooled.buffer(); byteBuf.writeBytes("123".getBytes()); ReferenceCountUtil.release(byteBuf); String str = ByteBufUtil.hexDump(byteBuf); } catch (Exception e) { e.printStackTrace(); } } ``` 会抛出如下异常: ```java io.netty.util.IllegalReferenceCountException: refCnt: 0 at io.netty.buffer.AbstractByteBuf.ensureAccessible(AbstractByteBuf.java:1454) at io.netty.buffer.AbstractByteBuf.checkIndex(AbstractByteBuf.java:1383) ... ``` 可以监控下byteBuf变量,释放前:  释放后,array为空数组:  所以,对于堆内ByteBuf对象,一般就建议不作释放,避免在参数传递过程中出错。 ## 4.4 ChannelInboundHandlerAdapter **Netty默认设置中,所有IO线程都是使用池化的堆外内存读写数据。** > Netty设置堆外内存和池化,详见 **5.附录** > 在默认配置下,ChannelInboundHandlerAdapter中传入的`Object msg`是在堆外内存池中创建的,务必要在整个pipeline最后的Handler中尝试进行释放,否则可能导致内存溢出。 ```java public class SafeServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { ... } catch (Exception e) { } finally { ReferenceCountUtil.release(msg); } } } ``` 如果担心忘记释放msg的引用计数,在声明Handler时,可以继承SimpleChannelInboundHandler而不是ChannelInboundHandlerAdapter。 从下面SimpleChannelInboundHandler的源码看到,finally中调用了释放的逻辑,但是有一点需要注意,如果在整个链路上,其他Handler中的retain()和release()必须成对出现,否则SimpleChannelInboundHandler中的也无法完全释放msg。 ```java public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { boolean release = true; try { if (this.acceptInboundMessage(msg)) { this.channelRead0(ctx, msg); } else { release = false; ctx.fireChannelRead(msg); } } finally { if (this.autoRelease && release) { ReferenceCountUtil.release(msg); } } } ``` ## 4.5 内存泄漏排查 ### 4.5.1 内存泄漏监测 当应用日志中出现了内存泄漏问题时,关键字:LEAK ```bash 2022-09-24 20:11:59.079 [nioEventLoopGroup-2-8] ERROR io.netty.util.ResourceLeakDetector [171] - LEAK: ByteBuf.release() was not called before it's garbage-collected. See http://netty.io/wiki/reference-counted-objects.html for more information. ``` 一种方式是在启动时添加参数进行设置: ```bash -Dio.netty.leakDetection.level=paranoid ``` 另一种方式可以在代码中添加内存泄漏检测功能进行测试。 ```java ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.ADVANCED); ``` 添加到如下位置: ```java public class SafeServer extends NettyBaseServer { @Autowired public NettyConfig nettyConfig; @PostConstruct public void start() throws InterruptedException { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .childOption(ChannelOption.SO_KEEPALIVE, true) .childOption(ChannelOption.TCP_NODELAY, true) .childHandler(new SafeChannelInitializer()); ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.ADVANCED); ChannelFuture future = serverBootstrap.bind(nettyConfig.prepaymentSafePort).sync(); future.addListener(new NettyServerListener(nettyConfig.prepaymentSafePort)); channel = future.channel(); if (future.isSuccess()) { log.info("启动 SafeServer"); } } } ``` 其中`ResourceLeakDetector.Level`共有4个级别: - `DISABLED` - 完全禁用; - `SIMPLE` - 抽样 1% 的缓冲区是否有泄漏。默认; - `ADVANCED` - 抽样 1% 的缓冲区是否泄漏,以及能定位到缓冲区泄漏的代码位置; - `PARANOID` - 全量检测的缓冲区是否泄漏,会导致性能严重下降,所以只能用于测试,可以定位到缓冲区泄漏的代码位置。 在启用`ADVANCED`或`PARANOID`检测后,可以看到如下日志,并根据日志定位具体内存泄漏位置。 这里可以看出是com.acrel.adapter.netty.master.prepayment.safeprotocol.SafeServerHandler中的ByteBufUtil.hexDump()中出现了内存泄漏。 ```java 2023-09-14 15:50:00.372 [ERROR][lettuce-nioEventLoop-29-1][io.netty.util.ResourceLeakDetector:319] LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information. Recent access records: #1: io.netty.buffer.AdvancedLeakAwareByteBuf.getUnsignedByte(AdvancedLeakAwareByteBuf.java:160) io.netty.buffer.ByteBufUtil$HexUtil.hexDump(ByteBufUtil.java:1373) io.netty.buffer.ByteBufUtil$HexUtil.access$000(ByteBufUtil.java:1297) io.netty.buffer.ByteBufUtil.hexDump(ByteBufUtil.java:143) io.netty.buffer.ByteBufUtil.hexDump(ByteBufUtil.java:135) com.acrel.adapter.netty.master.prepayment.safeprotocol.SafeServerHandler.channelRead(SafeServerHandler.java:135) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ... ``` 检查代码,发现存在隐式声明的引用,修改后问题解决。 ```java public class SafeServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { String sn = ""; try { // 此处出现了隐式声明 ByteBufUtil.hexDump((ByteBuf) msg) byte[] buffer = ByteBufUtil.decodeHexDump(ByteBufUtil.hexDump((ByteBuf) msg).replace("7d7b7b", "7b7b") + "7d7d"); ... } catch (Exception e) { } finally { ReferenceCountUtil.release(msg); } } } ``` ### 4.5.2 堆外内存大小 如果需要读取堆外内存的具体大小,可以通过如下代码获取: ```java long directMemory = PlatformDependent.usedDirectMemory(); ``` ### 4.5.3 Btrace Btrace 是一款通过字节码检测 Java 程序的排障神器,它可以获取程序在运行过程中的一切信息,与 AOP 的使用方式类似。我们可以通过如下方式追踪 DirectByteBuffer 的堆外内存申请的源头: ```java @BTrace public class TraceDirectAlloc { @OnMethod(clazz = "java.nio.Bits", method = "reserveMemory") public static void printThreadStack() { jstack(); } } ``` # 5. 附录 ## 5.1 Netty设置堆外内存和池化 - 方法1:启动参数设置 堆外内存 ```bash # 禁用堆外内存 -Dio.netty.noPreferDirect=true; ``` 池化 ```bash # 池化,默认配置 -Dio.netty.allocator.type=pooled # 禁用池化 -Dio.netty.allocator.type=unpooled ``` - 方法2:代码 堆外内存,UnpooledByteBufAllocator构造参数传入false ```java // 禁用池化 ServerBootstrap serverBootStrap = new ServerBootstrap(); UnpooledByteBufAllocator unpooledByteBufAllocator = new UnpooledByteBufAllocator(false); serverBootStrap.childOption(ChannelOption.ALLOCATOR, unpooledByteBufAllocator) ``` 池化 ```java ServerBootstrap serverBootStrap = new ServerBootstrap(); // 池化,默认配置 serverBootStrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) // 禁用池化 serverBootStrap.childOption(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT) ```
admin
2024年2月22日 08:59
转发文档
收藏文档
上一篇
下一篇
手机扫码
复制链接
手机扫一扫转发分享
复制链接
Markdown文件
分享
链接
类型
密码
更新密码