【Netty】【从原理到项目落地】

管理员
# Netty深度实战:从原理到项目落地 ## 目录 1. [Netty入门:为什么选择Netty](#1-netty入门为什么选择netty) 2. [Netty核心架构总览](#2-netty核心架构总览) 3. [核心组件一:ByteBuf深度解析](#3-核心组件一bytebuf深度解析) 4. [核心组件二:Channel详解](#4-核心组件二channel详解) 5. [核心组件三:ChannelHandler与ChannelPipeline](#5-核心组件三channelhandler与channelpipeline) 6. [核心组件四:EventLoop详解](#6-核心组件四eventloop详解) 7. [核心组件五:Bootstrap启动类](#7-核心组件五bootstrap启动类) 8. [项目实战一:Echo服务器](#8-项目实战一echo服务器) 9. [项目实战二:HTTP服务器](#9-项目实战二http服务器) 10. [项目实战三:RPC框架实现](#10-项目实战三rpc框架实现) 11. [项目实战四:WebSocket聊天室](#11-项目实战四websocket聊天室) 12. [Netty性能优化](#12-netty性能优化) 13. [总结与最佳实践](#13-总结与最佳实践) --- ## 1. Netty入门:为什么选择Netty ### 1.1 传统IO的痛点 ```java /** * 传统BIO(阻塞IO)服务器 * 问题:每个连接都需要一个线程,无法处理高并发 */ public class TraditionalBIOServer { public static void main(String[] args) throws IOException { ServerSocket serverSocket = new ServerSocket(8080); System.out.println("服务器启动,监听端口:8080"); while (true) { // 阻塞等待客户端连接 Socket socket = serverSocket.accept(); System.out.println("客户端连接:" + socket.getRemoteSocketAddress()); // 为每个连接创建一个线程 new Thread(() -> { try { InputStream input = socket.getInputStream(); BufferedReader reader = new BufferedReader(new InputStreamReader(input)); String line; while ((line = reader.readLine()) != null) { System.out.println("收到消息:" + line); socket.getOutputStream().write(("Echo: " + line + "\n").getBytes()); } } catch (IOException e) { e.printStackTrace(); } }).start(); } } } ``` **传统BIO的问题:** - 每个连接需要独立线程,资源消耗大 - 线程切换开销高,性能差 - 无法处理大量并发连接(C10K问题) ### 1.2 NIO的改进 ```java /** * Java NIO服务器 * 改进:单线程处理多个连接 * 问题:API复杂,开发难度大 */ public class TraditionalNIOServer { public static void main(String[] args) throws IOException { Selector selector = Selector.open(); ServerSocketChannel serverChannel = ServerSocketChannel.open(); serverChannel.configureBlocking(false); serverChannel.bind(new InetSocketAddress(8080)); serverChannel.register(selector, SelectionKey.OP_ACCEPT); System.out.println("NIO服务器启动,监听端口:8080"); while (true) { // 阻塞等待事件 selector.select(); Set selectedKeys = selector.selectedKeys(); Iterator iter = selectedKeys.iterator(); while (iter.hasNext()) { SelectionKey key = iter.next(); iter.remove(); if (key.isAcceptable()) { handleAccept(serverChannel, selector); } else if (key.isReadable()) { handleRead(key); } } } } private static void handleAccept(ServerSocketChannel serverChannel, Selector selector) throws IOException { SocketChannel clientChannel = serverChannel.accept(); clientChannel.configureBlocking(false); clientChannel.register(selector, SelectionKey.OP_READ); System.out.println("客户端连接:" + clientChannel.getRemoteSocketAddress()); } private static void handleRead(SelectionKey key) throws IOException { SocketChannel clientChannel = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(1024); int read = clientChannel.read(buffer); if (read > 0) { buffer.flip(); byte[] data = new byte[buffer.remaining()]; buffer.get(data); System.out.println("收到消息:" + new String(data)); clientChannel.write(ByteBuffer.wrap(("Echo: " + new String(data)).getBytes())); } else if (read == -1) { clientChannel.close(); } } } ``` **NIO的问题:** - API复杂,学习成本高 - 需要处理Selector、Channel、Buffer等复杂概念 - Epoll Bug(空轮询导致CPU 100%) - 开发效率低 ### 1.3 Netty的优势 ```java /** * Netty服务器 * 特点:API简洁,性能优越 */ public class NettyServer { public static void main(String[] args) throws InterruptedException { // 创建事件循环组 EventLoopGroup bossGroup = new NioEventLoopGroup(1); // 接收连接 EventLoopGroup workerGroup = new NioEventLoopGroup(); // 处理IO try { // 创建服务器启动器 ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) { ch.pipeline().addLast(new EchoServerHandler()); } }); // 绑定端口并启动 ChannelFuture future = bootstrap.bind(8080).sync(); System.out.println("Netty服务器启动,监听端口:8080"); // 等待服务器关闭 future.channel().closeFuture().sync(); } finally { // 优雅关闭 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } } /** * Echo处理器 */ @Sharable public class EchoServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf buf = (ByteBuf) msg; System.out.println("收到消息:" + buf.toString(CharsetUtil.UTF_8)); // 回显消息 ctx.write(buf); } @Override public void channelReadComplete(ChannelHandlerContext ctx) { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } } ``` **Netty的优势:** - **API简洁**:开发效率高,代码简洁 - **性能优越**:零拷贝、内存池、高效线程模型 - **功能强大**:内置编解码器、HTTP/HTTPS、WebSocket等 - **稳定性好**:解决了NIO的Epoll Bug - **社区活跃**:广泛应用于大型项目(Dubbo、RocketMQ、gRPC等) ### 1.4 Netty应用场景 ```java /** * Netty应用场景 */ public class NettyUseCases { /** * 1. RPC框架 * - Dubbo * - gRPC * - Spring Cloud */ /** * 2. 消息中间件 * - RocketMQ * - ActiveMQ */ /** * 3. 大数据 * - Hadoop * - Spark * - Flink */ /** * 4. 游戏服务器 * - 实时对战 * - 聊天系统 */ /** * 5. 网关服务 * - API网关 * - 负载均衡 */ /** * 6. 即时通讯 * - 微信 * - 钉钉 */ } ``` --- ## 2. Netty核心架构总览 ### 2.1 Netty架构层次 ``` ┌─────────────────────────────────────────────────────┐ │ 应用层 │ │ (HTTP/WebSocket/RPC/自定义协议) │ └─────────────────────────────────────────────────────┘ ↓ ┌─────────────────────────────────────────────────────┐ │ ChannelHandler │ │ (编解码器/业务处理器/心跳检测) │ └─────────────────────────────────────────────────────┘ ↓ ┌─────────────────────────────────────────────────────┐ │ ChannelPipeline │ │ (Handler链式调用容器) │ └─────────────────────────────────────────────────────┘ ↓ ┌─────────────────────────────────────────────────────┐ │ Channel & EventLoop │ │ (网络操作/事件循环) │ └─────────────────────────────────────────────────────┘ ↓ ┌─────────────────────────────────────────────────────┐ │ Transport │ │ (NIO/OIO/Epoll/Kqueue) │ └─────────────────────────────────────────────────────┘ ``` ### 2.2 核心组件关系 ```java /** * Netty核心组件关系图 */ public class NettyComponentRelationship { /* * Bootstrap(启动器) * ↓ * EventLoopGroup(事件循环组) * ↓ * EventLoop(事件循环) * ↓ * Channel(通道) * ↓ * ChannelPipeline(管道) * ↓ * ChannelHandler(处理器) * ↓ * ByteBuf(字节缓冲) */ } ``` ### 2.3 数据流转过程 ``` 入站数据流(Inbound): 网络数据 → Channel → ChannelPipeline → ChannelHandler(入站)→ 业务处理 出站数据流(Outbound): 业务处理 → ChannelHandler(出站)→ ChannelPipeline → Channel → 网络发送 ``` --- ## 3. 核心组件一:ByteBuf深度解析 ### 3.1 ByteBuf vs NIO ByteBuffer ```java /** * NIO ByteBuffer的问题 */ public class NIOByteBufferDemo { public static void main(String[] args) { ByteBuffer buffer = ByteBuffer.allocate(1024); // 写入数据 buffer.put("Hello Netty".getBytes()); // 问题1:需要手动flip切换读写模式 buffer.flip(); // 读取数据 byte[] data = new byte[buffer.remaining()]; buffer.get(data); System.out.println(new String(data)); // 问题2:再次读取需要重新flip buffer.rewind(); // 问题3:固定容量,无法动态扩容 // 问题4:API复杂,使用不便 // 问题5:没有引用计数,无法实现零拷贝 } } ``` ```java /** * Netty ByteBuf的优势 */ public class NettyByteBufDemo { public static void main(String[] args) { ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(1024); // 优势1:自动扩容 buffer.writeBytes("Hello Netty".getBytes()); // 优势2:读写索引分离,无需手动flip System.out.println("读取索引:" + buffer.readerIndex()); System.out.println("写入索引:" + buffer.writerIndex()); // 读取数据 byte[] data = new byte[buffer.readableBytes()]; buffer.readBytes(data); System.out.println(new String(data)); // 优势3:支持引用计数,实现零拷贝 ByteBuf slice = buffer.slice(); // 优势4:API简洁易用 buffer.writeLong(100L); buffer.writeInt(200); // 优势5:支持池化和非池化 ByteBuf pooledBuffer = ByteBufAllocator.DEFAULT.buffer(1024); ByteBuf unpooledBuffer = Unpooled.buffer(1024); // 优势6:多种缓冲区类型 ByteBuf directBuffer = ByteBufAllocator.DEFAULT.directBuffer(1024); // 堆外内存 ByteBuf heapBuffer = ByteBufAllocator.DEFAULT.heapBuffer(1024); // 堆内内存 // 优势7:支持复合缓冲区 ByteBuf compositeBuffer = Unpooled.wrappedBuffer(buffer, slice); } } ``` ### 3.2 ByteBuf核心结构 ```java /** * ByteBuf核心结构 * * ByteBuf由三个索引组成: * readerIndex: 读索引 * writerIndex: 写索引 * capacity: 容量 * * 内存布局: * +-------------------+------------------+------------------+ * | discardable bytes | readable bytes | writable bytes | * | (已读) | (可读) | (可写) | * +-------------------+------------------+------------------+ * 0 readerIndex writerIndex capacity */ public class ByteBufStructure { public static void main(String[] args) { ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16); System.out.println("初始状态:"); System.out.println("capacity = " + buffer.capacity()); System.out.println("readerIndex = " + buffer.readerIndex()); System.out.println("writerIndex = " + buffer.writerIndex()); System.out.println("readableBytes = " + buffer.readableBytes()); System.out.println("writableBytes = " + buffer.writableBytes()); // 写入数据 buffer.writeBytes("Hello".getBytes()); System.out.println("\n写入Hello后:"); System.out.println("readerIndex = " + buffer.readerIndex()); System.out.println("writerIndex = " + buffer.writerIndex()); System.out.println("readableBytes = " + buffer.readableBytes()); System.out.println("writableBytes = " + buffer.writableBytes()); // 读取数据 byte[] data = new byte[2]; buffer.readBytes(data); System.out.println("\n读取2字节后:"); System.out.println("readerIndex = " + buffer.readerIndex()); System.out.println("writerIndex = " + buffer.writerIndex()); System.out.println("readableBytes = " + buffer.readableBytes()); System.out.println("writableBytes = " + buffer.writableBytes()); // 丢弃已读数据 buffer.discardReadBytes(); System.out.println("\n丢弃已读数据后:"); System.out.println("readerIndex = " + buffer.readerIndex()); System.out.println("writerIndex = " + buffer.writerIndex()); System.out.println("readableBytes = " + buffer.readableBytes()); System.out.println("writableBytes = " + buffer.writableBytes()); // 清空缓冲区 buffer.clear(); System.out.println("\n清空缓冲区后:"); System.out.println("readerIndex = " + buffer.readerIndex()); System.out.println("writerIndex = " + buffer.writerIndex()); System.out.println("readableBytes = " + buffer.readableBytes()); System.out.println("writableBytes = " + buffer.writableBytes()); } } ``` ### 3.3 ByteBuf的类型 ```java /** * ByteBuf类型详解 */ public class ByteBufTypes { public static void main(String[] args) { // 1. 堆缓冲区(Heap Buffer) // - 数据存储在JVM堆中 // - 分配和释放速度快 // - 适合小数据量 ByteBuf heapBuffer = ByteBufAllocator.DEFAULT.heapBuffer(1024); System.out.println("堆缓冲区:" + heapBuffer); System.out.println("是否有数组:" + heapBuffer.hasArray()); System.out.println("数组地址:" + heapBuffer.array()); // 2. 直接缓冲区(Direct Buffer) // - 数据存储在堆外内存 // - 零拷贝,减少数据拷贝 // - 适合大数据量 ByteBuf directBuffer = ByteBufAllocator.DEFAULT.directBuffer(1024); System.out.println("\n直接缓冲区:" + directBuffer); System.out.println("是否有数组:" + directBuffer.hasArray()); // 3. 复合缓冲区(Composite Buffer) // - 多个ByteBuf的逻辑组合 // - 实现零拷贝 // - 不实际拷贝数据 ByteBuf buf1 = Unpooled.copiedBuffer("Hello".getBytes()); ByteBuf buf2 = Unpooled.copiedBuffer(" World".getBytes()); ByteBuf compositeBuffer = Unpooled.wrappedBuffer(buf1, buf2); System.out.println("\n复合缓冲区:" + compositeBuffer); System.out.println("内容:" + compositeBuffer.toString(CharsetUtil.UTF_8)); // 4. 只读缓冲区(Read-only Buffer) ByteBuf readOnlyBuffer = buf1.asReadOnly(); // readOnlyBuffer.writeByte(1); // 抛出异常 // 5. 切片缓冲区(Slice Buffer) ByteBuf sliceBuffer = buf1.slice(0, 2); // 截取前两个字节 System.out.println("\n切片缓冲区:" + sliceBuffer); System.out.println("内容:" + sliceBuffer.toString(CharsetUtil.UTF_8)); // 释放缓冲区 ReferenceCountUtil.release(heapBuffer); ReferenceCountUtil.release(directBuffer); ReferenceCountUtil.release(buf1); ReferenceCountUtil.release(buf2); ReferenceCountUtil.release(compositeBuffer); } } ``` ### 3.4 ByteBuf引用计数 ```java /** * ByteBuf引用计数机制 * * 引用计数用于跟踪ByteBuf的引用次数,实现零拷贝和自动内存管理 */ public class ByteBufReferenceCount { public static void main(String[] args) { ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(1024); System.out.println("初始引用计数:" + buffer.refCnt()); // 1 // retain:增加引用计数 buffer.retain(); System.out.println("retain后引用计数:" + buffer.refCnt()); // 2 buffer.retain(); System.out.println("再次retain后引用计数:" + buffer.refCnt()); // 3 // release:减少引用计数 buffer.release(); System.out.println("release后引用计数:" + buffer.refCnt()); // 2 // slice:增加原缓冲区的引用计数 ByteBuf slice = buffer.slice(); System.out.println("slice后原缓冲区引用计数:" + buffer.refCnt()); // 3 // duplicate:增加原缓冲区的引用计数 ByteBuf duplicate = buffer.duplicate(); System.out.println("duplicate后原缓冲区引用计数:" + buffer.refCnt()); // 4 // 释放所有引用 ReferenceCountUtil.release(buffer); ReferenceCountUtil.release(slice); ReferenceCountUtil.release(duplicate); System.out.println("释放所有引用后引用计数:" + buffer.refCnt()); // 0 } } ``` ### 3.5 ByteBuf池化与非池化 ```java /** * ByteBuf池化与非池化 */ public class ByteBufPooling { public static void main(String[] args) { // 池化ByteBuf(默认) // - 优点:重用内存,减少GC压力 // - 缺点:需要手动释放 System.setProperty("io.netty.allocator.type", "pooled"); ByteBuf pooledBuffer = ByteBufAllocator.DEFAULT.buffer(1024); System.out.println("池化缓冲区:" + pooledBuffer); ReferenceCountUtil.release(pooledBuffer); // 非池化ByteBuf // - 优点:使用简单,无需释放 // - 缺点:每次创建新对象,GC压力大 System.setProperty("io.netty.allocator.type", "unpooled"); ByteBuf unpooledBuffer = Unpooled.buffer(1024); System.out.println("非池化缓冲区:" + unpooledBuffer); } } ``` ### 3.6 ByteBuf工具类 ```java /** * ByteBuf工具类使用 */ public class ByteBufUtils { public static void main(String[] args) { // 1. 创建ByteBuf ByteBuf buffer1 = Unpooled.buffer(1024); ByteBuf buffer2 = Unpooled.directBuffer(1024); ByteBuf buffer3 = Unpooled.wrappedBuffer("Hello".getBytes()); // 2. 转换为ByteBuf ByteBuf buffer4 = Unpooled.copiedBuffer("Netty", CharsetUtil.UTF_8); // 3. 检查是否释放 if (ReferenceCountUtil.release(buffer1)) { System.out.println("buffer1已释放"); } // 4. 保留引用 ByteBuf buffer5 = buffer4.retainedSlice(); // 5. 安全释放 ReferenceCountUtil.safeRelease(buffer2); ReferenceCountUtil.safeRelease(buffer3); ReferenceCountUtil.safeRelease(buffer4); ReferenceCountUtil.safeRelease(buffer5); } } ``` --- ## 4. 核心组件二:Channel详解 ### 4.1 Channel是什么 ```java /** * Channel概念 * * Channel是Netty对网络连接的抽象,代表一个网络连接 * * Channel功能: * 1. 读写数据 * 2. 连接管理 * 3. 事件通知 */ public class ChannelConcept { /* * Channel类型: * * NioSocketChannel: NIO客户端Socket连接 * NioServerSocketChannel: NIO服务端Socket连接 * OioSocketChannel: OIO客户端Socket连接 * OioServerSocketChannel: OIO服务端Socket连接 * NioDatagramChannel: NIO UDP连接 * * LocalChannel: 本地传输 * EmbeddedChannel: 嵌入式通道(测试用) */ } ``` ### 4.2 Channel核心方法 ```java /** * Channel核心方法 */ public class ChannelMethods { public static void main(String[] args) { // 获取Channel(示例,实际需要从Context获取) Channel channel = null; // 1. 写数据(异步) if (channel != null) { ChannelFuture future = channel.writeAndFlush("Hello Netty"); // 添加监听器 future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) { if (future.isSuccess()) { System.out.println("数据发送成功"); } else { System.out.println("数据发送失败:" + future.cause()); } } }); // 等待完成(阻塞) try { future.await(); // 可选:等待操作完成 } catch (InterruptedException e) { e.printStackTrace(); } // 2. 获取配置 ChannelConfig config = channel.config(); // 3. 获取属性 AttributeKey key = AttributeKey.valueOf("userId"); String userId = channel.attr(key).get(); // 4. 获取ChannelPipeline ChannelPipeline pipeline = channel.pipeline(); // 5. 获取EventLoop EventLoop eventLoop = channel.eventLoop(); // 6. 获取远程地址 InetSocketAddress remoteAddress = (InetSocketAddress) channel.remoteAddress(); // 7. 获取本地地址 InetSocketAddress localAddress = (InetSocketAddress) channel.localAddress(); // 8. 判断状态 boolean isActive = channel.isActive(); boolean isOpen = channel.isOpen(); boolean isWritable = channel.isWritable(); // 9. 关闭Channel ChannelFuture closeFuture = channel.close(); // 10. 注册EventLoop channel.eventLoop().register(channel); } } } ``` ### 4.3 Channel生命周期 ```java /** * Channel生命周期 */ public class ChannelLifecycle { /* * Channel生命周期事件: * * channelRegistered: Channel注册到EventLoop * channelUnregistered: Channel从EventLoop注销 * channelActive: Channel准备就绪(连接建立) * channelInactive: Channel不再活跃(连接关闭) * channelRead: 读取到数据 * channelReadComplete: 数据读取完成 * channelWritabilityChanged: 可写状态改变 * exceptionCaught: 发生异常 */ } ``` ```java /** * Channel生命周期处理器 */ @Sharable public class ChannelLifecycleHandler extends ChannelInboundHandlerAdapter { @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { System.out.println("Channel注册到EventLoop:" + ctx.channel()); super.channelRegistered(ctx); } @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { System.out.println("Channel从EventLoop注销:" + ctx.channel()); super.channelUnregistered(ctx); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("Channel准备就绪:" + ctx.channel()); // 连接建立后可以发送数据 ctx.writeAndFlush("Welcome to Netty Server\n"); super.channelActive(ctx); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("Channel不再活跃:" + ctx.channel()); super.channelInactive(ctx); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("读取到数据:" + msg); super.channelRead(ctx, msg); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { System.out.println("数据读取完成"); super.channelReadComplete(ctx); } @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); System.out.println("可写状态改变,isWritable = " + channel.isWritable()); super.channelWritabilityChanged(ctx); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("发生异常:" + cause); ctx.close(); } } ``` ### 4.4 Channel配置 ```java /** * Channel配置 */ public class ChannelConfigDemo { public static void main(String[] args) { // 获取ChannelConfig(示例) ChannelConfig config = null; if (config != null) { // 1. 配置TCP参数 config.setOption(ChannelOption.SO_KEEPALIVE, true); // 保持连接 config.setOption(ChannelOption.TCP_NODELAY, true); // 禁用Nagle算法 config.setOption(ChannelOption.SO_BACKLOG, 128); // 连接队列大小 config.setOption(ChannelOption.SO_REUSEADDR, true); // 地址重用 // 2. 配置缓冲区 config.setOption(ChannelOption.SO_RCVBUF, 32 * 1024); // 接收缓冲区 config.setOption(ChannelOption.SO_SNDBUF, 32 * 1024); // 发送缓冲区 // 3. 配置超时 config.setOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000); // 连接超时 // 4. 配置写缓冲区高低水位 config.setWriteBufferHighWaterMark(64 * 1024); // 高水位 config.setWriteBufferLowWaterMark(32 * 1024); // 低水位 // 5. 配置自动读取 config.setAutoRead(true); // 自动读取数据 // 6. 配置最大消息大小 config.setOption(ChannelOption.MAX_MESSAGES_PER_READ, 16); } } } ``` ### 4.5 Channel属性 ```java /** * Channel属性 */ public class ChannelAttributes { // 定义属性Key private static final AttributeKey USER_ID = AttributeKey.valueOf("userId"); private static final AttributeKey CONNECT_TIME = AttributeKey.valueOf("connectTime"); public static void setAttributes(Channel channel) { // 设置属性 channel.attr(USER_ID).set("user123"); channel.attr(CONNECT_TIME).set(System.currentTimeMillis()); // 获取属性 String userId = channel.attr(USER_ID).get(); Long connectTime = channel.attr(CONNECT_TIME).get(); System.out.println("用户ID:" + userId); System.out.println("连接时间:" + new Date(connectTime)); // 移除属性 channel.attr(USER_ID).remove(); } } ``` --- ## 5. 核心组件三:ChannelHandler与ChannelPipeline ### 5.1 ChannelHandler是什么 ```java /** * ChannelHandler概念 * * ChannelHandler是Netty处理IO事件的接口 * * ChannelHandler类型: * 1. ChannelInboundHandler:入站处理器(处理读取到的数据) * 2. ChannelOutboundHandler:出站处理器(处理要发送的数据) * 3. ChannelDuplexHandler:双向处理器(同时处理入站和出站) */ public class ChannelHandlerConcept { /* * 数据流转: * * 入站(Inbound): * 网络数据 → Channel → ChannelPipeline → ChannelInboundHandler → 业务处理 * * 出站(Outbound): * 业务处理 → ChannelOutboundHandler → ChannelPipeline → Channel → 网络发送 */ } ``` ### 5.2 ChannelInboundHandler ```java /** * 入站处理器示例 */ @Sharable public class MyInboundHandler extends ChannelInboundHandlerAdapter { @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { System.out.println("【Inbound】Channel注册"); super.channelRegistered(ctx); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("【Inbound】连接建立"); super.channelActive(ctx); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("【Inbound】读取数据:" + msg); // 处理数据 ByteBuf buf = (ByteBuf) msg; String message = buf.toString(CharsetUtil.UTF_8); // 继续传递给下一个Handler super.channelRead(ctx, message); // 释放资源 ReferenceCountUtil.release(buf); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { System.out.println("【Inbound】读取完成"); super.channelReadComplete(ctx); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("【Inbound】连接关闭"); super.channelInactive(ctx); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("【Inbound】发生异常:" + cause); ctx.close(); } } ``` ### 5.3 ChannelOutboundHandler ```java /** * 出站处理器示例 */ public class MyOutboundHandler extends ChannelOutboundHandlerAdapter { @Override public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception { System.out.println("【Outbound】绑定地址:" + localAddress); super.bind(ctx, localAddress, promise); } @Override public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception { System.out.println("【Outbound】连接远程地址:" + remoteAddress); super.connect(ctx, remoteAddress, localAddress, promise); } @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { System.out.println("【Outbound】写入数据:" + msg); // 转换数据 String str = (String) msg; ByteBuf buf = ctx.alloc().buffer(); buf.writeBytes(str.getBytes(CharsetUtil.UTF_8)); // 继续传递给下一个Handler super.write(ctx, buf, promise); } @Override public void flush(ChannelHandlerContext ctx) throws Exception { System.out.println("【Outbound】刷新缓冲区"); super.flush(ctx); } @Override public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { System.out.println("【Outbound】关闭连接"); super.close(ctx, promise); } } ``` ### 5.4 ChannelDuplexHandler ```java /** * 双向处理器示例 */ @Sharable public class MyDuplexHandler extends ChannelDuplexHandler { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("【Duplex-Inbound】读取数据:" + msg); // 处理入站数据 ByteBuf buf = (ByteBuf) msg; String message = buf.toString(CharsetUtil.UTF_8); // 处理完数据后,回写 ctx.writeAndFlush("Echo: " + message); super.channelRead(ctx, msg); } @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { System.out.println("【Duplex-Outbound】写入数据:" + msg); super.write(ctx, msg, promise); } } ``` ### 5.5 ChannelPipeline详解 ```java /** * ChannelPipeline概念 * * ChannelPipeline是ChannelHandler的容器,管理Handler链 * * Pipeline特点: * 1. 链式结构:Handler按添加顺序组成链 * 2. 双向流动:入站和出站数据流方向相反 * 3. 线程安全:Pipeline是线程安全的 */ public class ChannelPipelineConcept { /* * Pipeline结构: * * ┌─────────────────┐ * │ HeadContext │ * └────────┬────────┘ * ↓ * ┌─────────────────┐ * │ Handler1 │ * └────────┬────────┘ * ↓ * ┌─────────────────┐ * │ Handler2 │ * └────────┬────────┘ * ↓ * ┌─────────────────┐ * │ Handler3 │ * └────────┬────────┘ * ↓ * ┌─────────────────┐ * │ TailContext │ * └─────────────────┘ * * 入站数据流:Head → Handler1 → Handler2 → Handler3 → Tail * 出站数据流:Tail → Handler3 → Handler2 → Handler1 → Head */ } ``` ### 5.6 Pipeline操作 ```java /** * Pipeline操作示例 */ public class ChannelPipelineOperations { public static void addHandlers(ChannelPipeline pipeline) { // 1. 添加Handler到最后 pipeline.addLast("handler1", new MyInboundHandler()); pipeline.addLast("handler2", new MyOutboundHandler()); pipeline.addLast("handler3", new MyDuplexHandler()); // 2. 添加Handler到指定位置 pipeline.addFirst("firstHandler", new MyInboundHandler()); pipeline.addBefore("handler2", "beforeHandler2", new MyInboundHandler()); pipeline.addAfter("handler1", "afterHandler1", new MyOutboundHandler()); // 3. 获取Handler ChannelHandler handler1 = pipeline.get("handler1"); MyInboundHandler inboundHandler = pipeline.get(MyInboundHandler.class); // 4. 替换Handler pipeline.replace("handler1", "newHandler1", new MyInboundHandler()); // 5. 移除Handler pipeline.remove("handler2"); pipeline.remove(MyOutboundHandler.class); // 6. 获取Pipeline中的Handler列表 Map handlers = pipeline.toMap(); System.out.println("Pipeline中的Handler:" + handlers); } } ``` ### 5.7 ChannelHandlerContext ```java /** * ChannelHandlerContext详解 * * ChannelHandlerContext是Handler的上下文,保存Handler的关联信息 */ public class ChannelHandlerContextDemo { /* * ChannelHandlerContext功能: * 1. 关联Channel和Pipeline * 2. 提供操作Channel和Pipeline的方法 * 3. 提供Attribute存储自定义数据 * 4. 提供事件传递机制 */ } ``` ```java /** * 使用ChannelHandlerContext的Handler */ @Sharable public class ContextHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // 1. 获取Channel Channel channel = ctx.channel(); // 2. 获取Pipeline ChannelPipeline pipeline = ctx.pipeline(); // 3. 获取EventLoop EventLoop eventLoop = ctx.executor(); // 4. 获取Allocator ByteBufAllocator allocator = ctx.alloc(); // 5. 存储属性 AttributeKey key = AttributeKey.valueOf("handlerData"); ctx.attr(key).set("data"); // 6. 向后传递事件 ctx.fireChannelRead(msg); // 7. 写数据(出站) ctx.writeAndFlush("Response"); // 8. 执行任务 eventLoop.execute(() -> { System.out.println("在EventLoop中执行任务"); }); } } ``` --- ## 6. 核心组件四:EventLoop详解 ### 6.1 EventLoop是什么 ```java /** * EventLoop概念 * * EventLoop是Netty的核心线程模型,负责处理IO事件和任务 * * EventLoop特点: * 1. 一个EventLoop绑定一个Thread * 2. 一个EventLoop可以处理多个Channel * 3. 一个Channel的生命周期由一个EventLoop负责 * 4. 保证线程安全,无需同步 */ public class EventLoopConcept { /* * EventLoopGroup: * - EventLoopGroup是EventLoop的组 * - BossGroup:负责接收连接 * - WorkerGroup:负责处理IO * * EventLoop线程模型: * ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ * │ EventLoop1 │ │ EventLoop2 │ │ EventLoop3 │ * │ (Thread1) │ │ (Thread2) │ │ (Thread3) │ * └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ * │ │ │ * ┌───┴───┐ ┌───┴───┐ ┌───┴───┐ * │ Ch1 │ │ Ch3 │ │ Ch5 │ * │ Ch2 │ │ Ch4 │ │ Ch6 │ * └───────┘ └───────┘ └───────┘ */ } ``` ### 6.2 EventLoop工作原理 ```java /** * EventLoop工作原理 * * EventLoop无限循环: * 1. 检查是否有IO事件(Select) * 2. 处理IO事件(Process) * 3. 执行任务队列(Run Tasks) */ public class EventLoopWorkflow { /* * EventLoop工作流程: * * while (!threadShutdown) { * // 1. 检查IO事件 * int readyKeys = selector.select(); * * // 2. 处理IO事件 * processSelectedKeys(readyKeys); * * // 3. 执行任务队列 * runAllTasks(); * } */ } ``` ### 6.3 EventLoop任务执行 ```java /** * EventLoop任务执行示例 */ public class EventLoopTaskExecution { public static void executeTasks(Channel channel) { EventLoop eventLoop = channel.eventLoop(); // 1. 立即执行任务 eventLoop.execute(() -> { System.out.println("立即执行任务,线程:" + Thread.currentThread().getName()); }); // 2. 延迟执行任务 ScheduledFuture future1 = eventLoop.schedule(() -> { System.out.println("延迟执行任务,线程:" + Thread.currentThread().getName()); }, 1, TimeUnit.SECONDS); // 3. 定期执行任务 ScheduledFuture future2 = eventLoop.scheduleAtFixedRate(() -> { System.out.println("定期执行任务,线程:" + Thread.currentThread().getName()); }, 0, 1, TimeUnit.SECONDS); // 4. 取消任务 future1.cancel(false); future2.cancel(false); // 5. 判断是否在EventLoop线程中 boolean inEventLoop = eventLoop.inEventLoop(); System.out.println("是否在EventLoop线程中:" + inEventLoop); // 6. 提交任务并等待结果 Future future3 = eventLoop.submit(() -> { return "任务结果"; }); try { String result = future3.get(); System.out.println("任务结果:" + result); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } } ``` ### 6.4 EventLoop线程安全 ```java /** * EventLoop线程安全示例 */ @Sharable public class EventLoopThreadSafeHandler extends ChannelInboundHandlerAdapter { private final AtomicInteger counter = new AtomicInteger(0); @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // EventLoop保证单线程执行,无需同步 int count = counter.incrementAndGet(); System.out.println("处理次数:" + count); // 模拟耗时操作 Thread.sleep(100); // 继续传递 super.channelRead(ctx, msg); } /** * 安全的Channel操作 */ public void safeChannelWrite(Channel channel, String message) { // 判断是否在EventLoop线程中 if (channel.eventLoop().inEventLoop()) { // 在EventLoop线程中,直接执行 channel.writeAndFlush(message); } else { // 不在EventLoop线程中,提交任务 channel.eventLoop().execute(() -> { channel.writeAndFlush(message); }); } } } ``` ### 6.5 EventLoopGroup配置 ```java /** * EventLoopGroup配置 */ public class EventLoopGroupConfig { public static void main(String[] args) { // 1. 默认配置(CPU核心数 * 2) EventLoopGroup group1 = new NioEventLoopGroup(); // 2. 指定线程数 EventLoopGroup group2 = new NioEventLoopGroup(4); // 3. 使用自定义线程工厂 EventLoopGroup group3 = new NioEventLoopGroup(4, new ThreadFactory() { private AtomicInteger counter = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, "netty-eventloop-" + counter.incrementAndGet()); } }); // 4. 使用默认线程工厂 EventLoopGroup group4 = new NioEventLoopGroup(0, new DefaultThreadFactory("netty-worker")); // 5. 关闭EventLoopGroup group1.shutdownGracefully(); group2.shutdownGracefully(); group3.shutdownGracefully(); group4.shutdownGracefully(); } } ``` --- ## 7. 核心组件五:Bootstrap启动类 ### 7.1 Bootstrap是什么 ```java /** * Bootstrap概念 * * Bootstrap是Netty的启动类,用于配置和启动服务器或客户端 * * Bootstrap类型: * 1. ServerBootstrap:启动服务器 * 2. Bootstrap:启动客户端 */ public class BootstrapConcept { /* * Bootstrap配置步骤: * * 1. 配置EventLoopGroup * 2. 配置Channel类型 * 3. 配置ChannelHandler * 4. 配置ChannelOption * 5. 配置ChannelAttr * 6. 绑定端口/连接服务器 */ } ``` ### 7.2 ServerBootstrap ```java /** * ServerBootstrap完整配置 */ public class ServerBootstrapDemo { public static void main(String[] args) throws InterruptedException { // 1. 创建EventLoopGroup EventLoopGroup bossGroup = new NioEventLoopGroup(1); // 接收连接 EventLoopGroup workerGroup = new NioEventLoopGroup(); // 处理IO try { // 2. 创建ServerBootstrap ServerBootstrap bootstrap = new ServerBootstrap(); // 3. 配置Bootstrap bootstrap.group(bossGroup, workerGroup) // 配置Channel类型 .channel(NioServerSocketChannel.class) // 配置Handler .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); // 添加编解码器 pipeline.addLast(new StringDecoder()); pipeline.addLast(new StringEncoder()); // 添加业务Handler pipeline.addLast(new EchoServerHandler()); } }) // 配置ChannelOption .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true) .childOption(ChannelOption.TCP_NODELAY, true) // 配置ChannelAttr .attr(AttributeKey.newInstance("serverName"), "NettyServer"); // 4. 绑定端口并启动 ChannelFuture future = bootstrap.bind(8080).sync(); System.out.println("服务器启动成功,监听端口:8080"); // 5. 等待关闭 future.channel().closeFuture().sync(); } finally { // 6. 优雅关闭 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } } ``` ### 7.3 Bootstrap ```java /** * Bootstrap完整配置 */ public class ClientBootstrapDemo { public static void main(String[] args) throws InterruptedException { // 1. 创建EventLoopGroup EventLoopGroup group = new NioEventLoopGroup(); try { // 2. 创建Bootstrap Bootstrap bootstrap = new Bootstrap(); // 3. 配置Bootstrap bootstrap.group(group) // 配置Channel类型 .channel(NioSocketChannel.class) // 配置Handler .handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); // 添加编解码器 pipeline.addLast(new StringDecoder()); pipeline.addLast(new StringEncoder()); // 添加业务Handler pipeline.addLast(new ClientHandler()); } }) // 配置ChannelOption .option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000); // 4. 连接服务器 ChannelFuture connectFuture = bootstrap.connect("localhost", 8080).sync(); System.out.println("连接服务器成功"); // 5. 发送消息 Channel channel = connectFuture.channel(); channel.writeAndFlush("Hello Server"); // 6. 等待关闭 channel.closeFuture().sync(); } finally { // 7. 优雅关闭 group.shutdownGracefully(); } } } ``` ### 7.4 ChannelFuture ```java /** * ChannelFuture详解 * * ChannelFuture代表异步操作的结果 */ public class ChannelFutureDemo { public static void listenToChannelFuture(Channel channel) { // 1. 写入数据 ChannelFuture writeFuture = channel.writeAndFlush("Hello"); // 2. 添加监听器(异步) writeFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) { if (future.isSuccess()) { System.out.println("数据发送成功"); } else { System.out.println("数据发送失败:" + future.cause()); } } }); // 3. 简写监听器 writeFuture.addListener(ChannelFutureListener.CLOSE_ON_FAILURE); // 4. 等待完成(阻塞) try { writeFuture.await(); if (writeFuture.isSuccess()) { System.out.println("数据发送成功"); } } catch (InterruptedException e) { e.printStackTrace(); } // 5. 获取Channel Channel resultChannel = writeFuture.channel(); } } ``` ### 7.5 Bootstrap最佳实践 ```java /** * Bootstrap最佳实践 */ public class BootstrapBestPractices { /** * 1. 合理配置EventLoopGroup线程数 */ public static void configEventLoopGroup() { // BossGroup:1个线程即可 EventLoopGroup bossGroup = new NioEventLoopGroup(1); // WorkerGroup:根据业务类型配置 // CPU密集型:CPU核心数 // IO密集型:CPU核心数 * 2 int workerThreads = Runtime.getRuntime().availableProcessors() * 2; EventLoopGroup workerGroup = new NioEventLoopGroup(workerThreads); } /** * 2. 合理配置ChannelOption */ public static void configChannelOption(ServerBootstrap bootstrap) { // SO_BACKLOG:连接队列大小 bootstrap.option(ChannelOption.SO_BACKLOG, 128); // SO_KEEPALIVE:保持连接 bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true); // TCP_NODELAY:禁用Nagle算法,提高实时性 bootstrap.childOption(ChannelOption.TCP_NODELAY, true); // SO_RCVBUF/SO_SNDBUF:接收/发送缓冲区 bootstrap.childOption(ChannelOption.SO_RCVBUF, 32 * 1024); bootstrap.childOption(ChannelOption.SO_SNDBUF, 32 * 1024); // WRITE_BUFFER_WATER_MARK:写缓冲区水位 bootstrap.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(32 * 1024, 64 * 1024)); } /** * 3. 合理添加Handler */ public static void addHandlers(ChannelPipeline pipeline) { // 按顺序添加:编解码器 → 协议处理器 → 业务处理器 // 1. 编解码器 pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(1024, 0, 4)); pipeline.addLast("stringDecoder", new StringDecoder()); pipeline.addLast("stringEncoder", new StringEncoder()); // 2. 协议处理器 pipeline.addLast("idleStateHandler", new IdleStateHandler(30, 0, 0)); pipeline.addLast("heartbeatHandler", new HeartbeatHandler()); // 3. 业务处理器 pipeline.addLast("businessHandler", new BusinessHandler()); } /** * 4. 优雅关闭 */ public static void gracefulShutdown(EventLoopGroup group) { // shutdownGracefully()会平滑关闭 group.shutdownGracefully(); // 或者等待关闭完成 try { group.shutdownGracefully().sync(); } catch (InterruptedException e) { e.printStackTrace(); } } } ``` --- ## 8. 项目实战一:Echo服务器 ### 8.1 项目概述 实现一个简单的Echo服务器,客户端发送什么消息,服务器就返回什么消息。 ### 8.2 服务器实现 ```java /** * Echo服务器 */ public class EchoServer { private final int port; public EchoServer(int port) { this.port = port; } public void start() throws InterruptedException { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); // 添加EchoHandler pipeline.addLast(new EchoServerHandler()); } }); ChannelFuture future = bootstrap.bind(port).sync(); System.out.println("Echo服务器启动,监听端口:" + port); future.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) throws InterruptedException { new EchoServer(8080).start(); } } /** * Echo服务器Handler */ @Sharable public class EchoServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; System.out.println("服务器收到:" + buf.toString(CharsetUtil.UTF_8)); // 回显消息 ctx.write(buf); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } } ``` ### 8.3 客户端实现 ```java /** * Echo客户端 */ public class EchoClient { private final String host; private final int port; public EchoClient(String host, int port) { this.host = host; this.port = port; } public void start() throws InterruptedException { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new EchoClientHandler()); } }); Channel channel = bootstrap.connect(host, port).sync().channel(); System.out.println("Echo客户端连接成功"); // 发送消息 for (int i = 0; i < 10; i++) { String message = "Hello " + i; channel.writeAndFlush(Unpooled.copiedBuffer(message, CharsetUtil.UTF_8)); System.out.println("客户端发送:" + message); Thread.sleep(1000); } channel.closeFuture().sync(); } finally { group.shutdownGracefully(); } } public static void main(String[] args) throws InterruptedException { new EchoClient("localhost", 8080).start(); } } /** * Echo客户端Handler */ @Sharable public class EchoClientHandler extends SimpleChannelInboundHandler { @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { System.out.println("客户端收到:" + msg.toString(CharsetUtil.UTF_8)); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } } ``` --- ## 9. 项目实战二:HTTP服务器 ### 9.1 项目概述 实现一个HTTP服务器,处理HTTP请求并返回响应。 ### 9.2 HTTP服务器实现 ```java /** * HTTP服务器 */ public class HttpServer { private final int port; public HttpServer(int port) { this.port = port; } public void start() throws InterruptedException { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); // HTTP编解码器 pipeline.addLast(new HttpServerCodec()); // HTTP对象聚合器 pipeline.addLast(new HttpObjectAggregator(65536)); // HTTP压缩 pipeline.addLast(new HttpContentCompressor()); // HTTP业务处理器 pipeline.addLast(new HttpServerHandler()); } }); ChannelFuture future = bootstrap.bind(port).sync(); System.out.println("HTTP服务器启动,监听端口:" + port); future.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) throws InterruptedException { new HttpServer(8080).start(); } } /** * HTTP服务器Handler */ @Sharable public class HttpServerHandler extends SimpleChannelInboundHandler { @Override protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception { // 获取请求信息 String uri = request.uri(); String method = request.method().name(); HttpHeaders headers = request.headers(); System.out.println("收到请求:" + method + " " + uri); System.out.println("请求头:" + headers); // 构建响应 String content = "Hello Netty HTTP Server\n" + "Method: " + method + "\n" + "URI: " + uri + "\n" + "Time: " + new Date(); FullHttpResponse response = new DefaultFullHttpResponse( HTTP_1_1, OK, Unpooled.copiedBuffer(content, CharsetUtil.UTF_8) ); // 设置响应头 response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8"); response.headers().set(CONTENT_LENGTH, response.content().readableBytes()); response.headers().set(CONNECTION, CLOSE); // 发送响应 ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } } ``` ### 9.3 静态文件服务器 ```java /** * 静态文件服务器 */ @Sharable public class StaticFileServerHandler extends SimpleChannelInboundHandler { private final String basePath; public StaticFileServerHandler(String basePath) { this.basePath = basePath; } @Override protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception { // 只处理GET请求 if (!GET.equals(request.method())) { sendError(ctx, METHOD_NOT_ALLOWED); return; } final String uri = request.uri(); final String path = sanitizeUri(uri); if (path == null) { sendError(ctx, FORBIDDEN); return; } File file = new File(basePath, path); if (file.isHidden() || !file.exists()) { sendError(ctx, NOT_FOUND); return; } if (!file.isFile()) { sendError(ctx, FORBIDDEN); return; } RandomAccessFile raf; try { raf = new RandomAccessFile(file, "r"); } catch (FileNotFoundException ignore) { sendError(ctx, NOT_FOUND); return; } long fileLength = raf.length(); HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK); HttpUtil.setContentLength(response, fileLength); setContentTypeHeader(response, file); if (HttpUtil.isKeepAlive(request)) { response.headers().set(CONNECTION, KEEP_ALIVE); } ctx.write(response); ChannelFuture sendFileFuture; ChannelFuture lastContentFuture; if (ctx.pipeline().get(SslHandler.class) == null) { sendFileFuture = ctx.write(new DefaultFileRegion(raf.getChannel(), 0, fileLength), ctx.newProgressivePromise()); lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); } else { sendFileFuture = null; lastContentFuture = ctx.writeAndFlush(new HttpChunkedInput(new ChunkedFile(raf, fileLength)), ctx.newProgressivePromise()); } sendFileFuture.addListener(new ChannelProgressiveFutureListener() { @Override public void operationProgressed(ChannelProgressiveFuture future, long progress, long total) { if (total < 0) { System.err.println(future.channel() + " Transfer progress: " + progress); } else { System.err.println(future.channel() + " Transfer progress: " + progress + " / " + total); } } @Override public void operationComplete(ChannelProgressiveFuture future) { System.err.println(future.channel() + " Transfer complete."); } }); if (!HttpUtil.isKeepAlive(request)) { lastContentFuture.addListener(ChannelFutureListener.CLOSE); } } private String sanitizeUri(String uri) { try { uri = URLDecoder.decode(uri, "UTF-8"); } catch (UnsupportedEncodingException e) { try { uri = URLDecoder.decode(uri, ISO_8859_1.toString()); } catch (UnsupportedEncodingException e1) { throw new Error(); } } if (uri.isEmpty() || uri.charAt(0) != '/') { return null; } uri = uri.replace('/', File.separatorChar); if (uri.contains(File.separator + '.') || uri.contains('.' + File.separator) || uri.charAt(0) == '.' || uri.charAt(uri.length() - 1) == '.') { return null; } return uri; } private void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) { FullHttpResponse response = new DefaultFullHttpResponse( HTTP_1_1, status, Unpooled.copiedBuffer("Failure: " + status + "\r\n", CharsetUtil.UTF_8)); response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8"); ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); } private void setContentTypeHeader(HttpResponse response, File file) { MimetypesFileTypeMap mimeTypesMap = new MimetypesFileTypeMap(); response.headers().set(CONTENT_TYPE, mimeTypesMap.getContentType(file.getPath())); } } ``` --- ## 10. 项目实战三:RPC框架实现 ### 10.1 RPC框架架构 ``` ┌─────────────┐ ┌─────────────┐ │ 客户端 │ │ 服务端 │ │ │ │ │ │ RPC接口 │ ──── 1.方法调用 ────→ │ RPC接口 │ │ │ │ │ │ 动态代理 │ │ 服务注册 │ │ │ ←──── 2.返回结果 ───── │ │ │ Netty客户端 │ │ Netty服务端 │ └─────────────┘ └─────────────┘ ``` ### 10.2 RPC协议定义 ```java /** * RPC请求 */ @Data public class RpcRequest implements Serializable { private String requestId; // 请求ID private String interfaceName; // 接口名称 private String methodName; // 方法名称 private Class[] paramTypes; // 参数类型 private Object[] params; // 参数值 } /** * RPC响应 */ @Data public class RpcResponse implements Serializable { private String requestId; // 请求ID private Object result; // 结果 private Throwable error; // 异常 } ``` ### 10.3 RPC编解码器 ```java /** * RPC消息编码器 */ public class RpcEncoder extends MessageToByteEncoder { private final Class genericClass; public RpcEncoder(Class genericClass) { this.genericClass = genericClass; } @Override protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception { if (genericClass.isInstance(msg)) { byte[] data = SerializationUtils.serialize(msg); out.writeInt(data.length); out.writeBytes(data); } } } /** * RPC消息解码器 */ public class RpcDecoder extends ByteToMessageDecoder { private final Class genericClass; public RpcDecoder(Class genericClass) { this.genericClass = genericClass; } @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { if (in.readableBytes() < 4) { return; } in.markReaderIndex(); int dataLength = in.readInt(); if (dataLength < 0) { ctx.close(); } if (in.readableBytes() < dataLength) { in.resetReaderIndex(); return; } byte[] data = new byte[dataLength]; in.readBytes(data); Object obj = SerializationUtils.deserialize(data, genericClass); out.add(obj); } } ``` ### 10.4 RPC服务端 ```java /** * RPC服务端 */ public class RpcServer { private final int port; private final Map serviceMap = new ConcurrentHashMap<>(); public RpcServer(int port) { this.port = port; } /** * 注册服务 */ public void registerService(String interfaceName, Object service) { serviceMap.put(interfaceName, service); System.out.println("注册服务:" + interfaceName); } /** * 启动服务 */ public void start() throws InterruptedException { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); // 编解码器 pipeline.addLast(new RpcEncoder(RpcResponse.class)); pipeline.addLast(new RpcDecoder(RpcRequest.class)); // RPC处理器 pipeline.addLast(new RpcServerHandler(serviceMap)); } }); ChannelFuture future = bootstrap.bind(port).sync(); System.out.println("RPC服务端启动,监听端口:" + port); future.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } } /** * RPC服务端Handler */ @Sharable public class RpcServerHandler extends SimpleChannelInboundHandler { private final Map serviceMap; public RpcServerHandler(Map serviceMap) { this.serviceMap = serviceMap; } @Override protected void channelRead0(ChannelHandlerContext ctx, RpcRequest request) throws Exception { RpcResponse response = new RpcResponse(); response.setRequestId(request.getRequestId()); try { // 获取服务 Object service = serviceMap.get(request.getInterfaceName()); if (service == null) { throw new RuntimeException("服务不存在:" + request.getInterfaceName()); } // 反射调用方法 Method method = service.getClass().getMethod(request.getMethodName(), request.getParamTypes()); Object result = method.invoke(service, request.getParams()); response.setResult(result); } catch (Exception e) { response.setError(e); System.err.println("RPC调用失败:" + e); } // 发送响应 ctx.writeAndFlush(response); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } } ``` ### 10.5 RPC客户端 ```java /** * RPC客户端 */ public class RpcClient { private final String host; private final int port; private EventLoopGroup group; private Channel channel; public RpcClient(String host, int port) { this.host = host; this.port = port; } /** * 连接服务端 */ public void connect() throws InterruptedException { group = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); // 编解码器 pipeline.addLast(new RpcEncoder(RpcRequest.class)); pipeline.addLast(new RpcDecoder(RpcResponse.class)); // RPC处理器 pipeline.addLast(new RpcClientHandler()); } }); ChannelFuture future = bootstrap.connect(host, port).sync(); channel = future.channel(); System.out.println("RPC客户端连接成功:" + host + ":" + port); } /** * 发送请求并获取响应 */ public Object send(RpcRequest request) throws Exception { RpcClientHandler handler = (RpcClientHandler) channel.pipeline().last(); return handler.sendRequest(request); } /** * 关闭连接 */ public void close() { if (channel != null) { channel.close(); } if (group != null) { group.shutdownGracefully(); } } } /** * RPC客户端Handler */ @Sharable public class RpcClientHandler extends SimpleChannelInboundHandler { private final ConcurrentHashMap futureMap = new ConcurrentHashMap<>(); @Override protected void channelRead0(ChannelHandlerContext ctx, RpcResponse response) throws Exception { String requestId = response.getRequestId(); RpcFuture future = futureMap.get(requestId); if (future != null) { future.done(response); } } /** * 发送请求 */ public Object sendRequest(RpcRequest request) throws Exception { RpcFuture future = new RpcFuture(); futureMap.put(request.getRequestId(), future); channel.writeAndFlush(request); return future.get(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } } /** * RPC Future */ public class RpcFuture { private volatile Object result; private final CountDownLatch latch = new CountDownLatch(1); public void done(RpcResponse response) { this.result = response.getResult() != null ? response.getResult() : response.getError(); latch.countDown(); } public Object get() throws Exception { latch.await(30, TimeUnit.SECONDS); if (result instanceof Throwable) { throw (Exception) result; } return result; } } ``` ### 10.6 RPC动态代理 ```java /** * RPC动态代理工厂 */ public class RpcProxyFactory { /** * 创建代理对象 */ @SuppressWarnings("unchecked") public static T create(Class interfaceClass, String host, int port) { return (T) Proxy.newProxyInstance( interfaceClass.getClassLoader(), new Class[]{interfaceClass}, new RpcInvocationHandler(host, port) ); } } /** * RPC调用处理器 */ public class RpcInvocationHandler implements InvocationHandler { private final RpcClient client; public RpcInvocationHandler(String host, int port) throws InterruptedException { this.client = new RpcClient(host, port); client.connect(); } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { // 构建请求 RpcRequest request = new RpcRequest(); request.setRequestId(UUID.randomUUID().toString()); request.setInterfaceName(method.getDeclaringClass().getName()); request.setMethodName(method.getName()); request.setParamTypes(method.getParameterTypes()); request.setParams(args); // 发送请求并获取响应 return client.send(request); } } ``` ### 10.7 RPC使用示例 ```java /** * RPC服务接口 */ public interface UserService { String getUsername(Long userId); User getUser(Long userId); } /** * RPC服务实现 */ public class UserServiceImpl implements UserService { @Override public String getUsername(Long userId) { return "User" + userId; } @Override public User getUser(Long userId) { User user = new User(); user.setId(userId); user.setName("User" + userId); user.setAge(20); return user; } } /** * 服务端启动 */ public class RpcServerMain { public static void main(String[] args) throws InterruptedException { RpcServer server = new RpcServer(8080); // 注册服务 server.registerService(UserService.class.getName(), new UserServiceImpl()); // 启动服务 server.start(); } } /** * 客户端调用 */ public class RpcClientMain { public static void main(String[] args) throws Exception { // 创建代理对象 UserService userService = RpcProxyFactory.create(UserService.class, "localhost", 8080); // 调用远程方法 String username = userService.getUsername(1L); System.out.println("用户名:" + username); User user = userService.getUser(1L); System.out.println("用户信息:" + user); } } ``` --- ## 11. 项目实战四:WebSocket聊天室 ### 11.1 聊天室架构 ``` ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ 客户端1 │ │ 客户端2 │ │ 客户端3 │ │ 客户端4 │ └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘ │ │ │ │ └───────────────┼───────────────┼───────────────┘ │ │ ┌───┴───────────────┴───┐ │ WebSocket服务器 │ └───────────────────────┘ ``` ### 11.2 消息协议 ```java /** * 聊天消息 */ @Data public class ChatMessage { private String type; // 消息类型:login/logout/chat private String username; // 用户名 private String content; // 消息内容 private long timestamp; // 时间戳 } ``` ### 11.3 WebSocket服务器 ```java /** * WebSocket聊天服务器 */ public class ChatServer { private final int port; public ChatServer(int port) { this.port = port; } public void start() throws InterruptedException { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); // HTTP编解码器 pipeline.addLast(new HttpServerCodec()); // HTTP对象聚合器 pipeline.addLast(new HttpObjectAggregator(65536)); // WebSocket处理器 pipeline.addLast(new WebSocketServerProtocolHandler("/chat")); // 聊天业务处理器 pipeline.addLast(new ChatServerHandler()); } }); ChannelFuture future = bootstrap.bind(port).sync(); System.out.println("WebSocket聊天服务器启动,监听端口:" + port); future.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) throws InterruptedException { new ChatServer(8080).start(); } } /** * 聊天服务器Handler */ @Sharable public class ChatServerHandler extends SimpleChannelInboundHandler { // 存储所有在线用户的Channel private static final ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { String text = msg.text(); System.out.println("收到消息:" + text); // 解析消息 ChatMessage chatMessage = JSON.parseObject(text, ChatMessage.class); chatMessage.setTimestamp(System.currentTimeMillis()); // 广播消息给所有在线用户 ChannelGroup channelGroup = channels.find(ctx.channel().id()); for (Channel channel : channelGroup) { channel.writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(chatMessage))); } } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); System.out.println("用户加入:" + channel); // 加入到ChannelGroup channels.add(channel); // 发送欢迎消息 ChatMessage welcomeMessage = new ChatMessage(); welcomeMessage.setType("system"); welcomeMessage.setContent("欢迎加入聊天室!"); welcomeMessage.setTimestamp(System.currentTimeMillis()); channel.writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(welcomeMessage))); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); System.out.println("用户离开:" + channel); // 从ChannelGroup移除 channels.remove(channel); // 广播用户离开消息 ChatMessage leaveMessage = new ChatMessage(); leaveMessage.setType("system"); leaveMessage.setContent("用户离开聊天室"); leaveMessage.setTimestamp(System.currentTimeMillis()); channels.writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(leaveMessage))); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } } ``` ### 11.4 HTML客户端 ```html WebSocket聊天室

WebSocket聊天室

``` --- ## 12. Netty性能优化 ### 12.1 内存优化 ```java /** * 内存优化配置 */ public class MemoryOptimization { /** * 1. 使用池化ByteBuf */ public static void pooledByteBuf() { System.setProperty("io.netty.allocator.type", "pooled"); ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(1024); ReferenceCountUtil.release(buffer); } /** * 2. 使用直接缓冲区 */ public static void directBuffer() { ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer(1024); ReferenceCountUtil.release(buffer); } /** * 3. 合理设置缓冲区大小 */ public static void bufferSize() { // 根据业务场景设置 int bufferSize = 16 * 1024; // 16KB ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(bufferSize); ReferenceCountUtil.release(buffer); } /** * 4. 及时释放ByteBuf */ public static void releaseByteBuf() { ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(1024); try { // 使用buffer } finally { ReferenceCountUtil.release(buffer); } } } ``` ### 12.2 线程优化 ```java /** * 线程优化配置 */ public class ThreadOptimization { /** * 1. 合理配置EventLoopGroup线程数 */ public static void configEventLoopGroup() { // BossGroup:1个线程即可 EventLoopGroup bossGroup = new NioEventLoopGroup(1); // WorkerGroup:根据业务类型配置 int cpuCores = Runtime.getRuntime().availableProcessors(); EventLoopGroup workerGroup = new NioEventLoopGroup(cpuCores * 2); } /** * 2. 使用Epoll(Linux) */ public static void useEpoll() { if (Epoll.isAvailable()) { EventLoopGroup bossGroup = new EpollEventLoopGroup(1); EventLoopGroup workerGroup = new EpollEventLoopGroup(); // Epoll性能优于NIO } } /** * 3. 避免阻塞EventLoop线程 */ public static void avoidBlocking() { // 不要在Handler中进行阻塞操作 // 使用单独的线程池处理耗时任务 EventExecutorGroup businessGroup = new DefaultEventExecutorGroup(16); // 在Pipeline中使用 pipeline.addLast(businessGroup, "businessHandler", new BusinessHandler()); } } ``` ### 12.3 网络优化 ```java /** * 网络优化配置 */ public class NetworkOptimization { /** * 1. 配置TCP参数 */ public static void configTcpParams(ServerBootstrap bootstrap) { // 禁用Nagle算法,提高实时性 bootstrap.childOption(ChannelOption.TCP_NODELAY, true); // 保持连接 bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true); // 地址重用 bootstrap.option(ChannelOption.SO_REUSEADDR, true); // 连接队列大小 bootstrap.option(ChannelOption.SO_BACKLOG, 1024); } /** * 2. 配置缓冲区 */ public static void configBuffer(ServerBootstrap bootstrap) { // 接收缓冲区 bootstrap.childOption(ChannelOption.SO_RCVBUF, 32 * 1024); // 发送缓冲区 bootstrap.childOption(ChannelOption.SO_SNDBUF, 32 * 1024); // 写缓冲区水位 bootstrap.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(32 * 1024, 64 * 1024)); } /** * 3. 配置心跳 */ public static void configHeartbeat(ChannelPipeline pipeline) { // 30秒读空闲,0秒写空闲,0秒读写空闲 pipeline.addLast("idleStateHandler", new IdleStateHandler(30, 0, 0)); } } ``` ### 12.4 编解码优化 ```java /** * 编解码优化 */ public class CodecOptimization { /** * 1. 使用高效的编解码器 */ public static void efficientCodec() { // 使用LengthFieldBasedFrameDecoder处理粘包拆包 pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(1024, 0, 4)); // 使用StringDecoder简化字符串处理 pipeline.addLast("stringDecoder", new StringDecoder(CharsetUtil.UTF_8)); pipeline.addLast("stringEncoder", new StringEncoder(CharsetUtil.UTF_8)); } /** * 2. 使用压缩 */ public static void useCompression(ChannelPipeline pipeline) { // HTTP压缩 pipeline.addLast("httpCompressor", new HttpContentCompressor()); pipeline.addLast("httpDecompressor", new HttpContentDecompressor()); } /** * 3. 批量处理 */ public static void batchProcessing() { // 使用FixedLengthFrameDecoder批量处理 pipeline.addLast("fixedLengthFrameDecoder", new FixedLengthFrameDecoder(1024)); } } ``` --- ## 13. 总结与最佳实践 ### 13.1 核心要点总结 1. **ByteBuf** - 读写索引分离,无需手动flip - 支持引用计数,实现零拷贝 - 支持池化和非池化 - 合理选择堆缓冲区和直接缓冲区 2. **Channel** - 代表网络连接 - 异步IO操作 - 生命周期事件 - 线程安全 3. **ChannelHandler** - 入站处理器:ChannelInboundHandler - 出站处理器:ChannelOutboundHandler - 双向处理器:ChannelDuplexHandler - 线程安全 4. **ChannelPipeline** - Handler链式容器 - 双向数据流 - 动态添加/删除Handler 5. **EventLoop** - 单线程事件循环 - 一个EventLoop处理多个Channel - 任务执行机制 - 线程安全保证 6. **Bootstrap** - 服务器启动:ServerBootstrap - 客户端启动:Bootstrap - 简化配置 - 优雅关闭 ### 13.2 常见问题与解决方案 ```java /** * 常见问题与解决方案 */ public class CommonProblems { /** * 问题1:内存泄漏 * 解决方案: * - 及时释放ByteBuf * - 使用引用计数 * - 使用工具检测泄漏 */ /** * 问题2:线程阻塞 * 解决方案: * - 避免在Handler中阻塞 * - 使用独立线程池 * - 异步处理耗时任务 */ /** * 问题3:粘包拆包 * 解决方案: * - 使用LengthFieldBasedFrameDecoder * - 使用DelimiterBasedFrameDecoder * - 使用FixedLengthFrameDecoder */ /** * 问题4:连接数限制 * 解决方案: * - 调整系统参数 * - 使用EventLoopGroup * - 合理配置SO_BACKLOG */ /** * 问题5:性能瓶颈 * 解决方案: * - 使用Epoll(Linux) * - 使用零拷贝 * - 使用直接缓冲区 * - 批量处理 */ } ``` ### 13.3 最佳实践 1. **资源管理** - 及时释放ByteBuf - 使用try-finally保证资源释放 - 使用ReferenceCountUtil 2. **异常处理** - 在exceptionCaught中处理异常 - 记录日志 - 关闭Channel 3. **性能优化** - 使用池化ByteBuf - 使用直接缓冲区 - 避免阻塞EventLoop - 使用Epoll(Linux) 4. **编码规范** - Handler标注@Sharable - 合理命名Handler - 添加日志 - 单元测试 5. **监控告警** - 监控连接数 - 监控流量 - 监控异常 - 监控性能指标 --- ## 结语 通过本文的学习,你应该已经掌握了: 1. **Netty核心原理**:ByteBuf、Channel、ChannelHandler、ChannelPipeline、EventLoop、Bootstrap 2. **实战项目**:Echo服务器、HTTP服务器、RPC框架、WebSocket聊天室 3. **性能优化**:内存优化、线程优化、网络优化、编解码优化 4. **最佳实践**:资源管理、异常处理、性能优化、编码规范、监控告警 Netty是一个强大的网络框架,广泛应用于各种网络应用。继续深入学习,你会发现更多精彩的应用场景! 建议: 1. 深入学习Netty源码 2. 实践更多项目 3. 学习相关技术(NIO、TCP/IP) 4. 关注Netty社区动态 ``` --- ## 12. Netty性能优化 ### 12.1 内存优化 ```java /** * 内存优化配置 */ public class MemoryOptimization { /** * 1. 使用池化ByteBuf */ public static void pooledByteBuf() { System.setProperty("io.netty.allocator.type", "pooled"); ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(1024); ReferenceCountUtil.release(buffer); } /** * 2. 使用直接缓冲区 */ public static void directBuffer() { ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer(1024); ReferenceCountUtil.release(buffer); } /** * 3. 合理设置缓冲区大小 */ public static void bufferSize() { // 根据业务场景设置 int bufferSize = 16 * 1024; // 16KB ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(bufferSize); ReferenceCountUtil.release(buffer); } /** * 4. 及时释放ByteBuf */ public static void releaseByteBuf() { ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(1024); try { // 使用buffer } finally { ReferenceCountUtil.release(buffer); } } } ``` ### 12.2 线程优化 ```java /** * 线程优化配置 */ public class ThreadOptimization { /** * 1. 合理配置EventLoopGroup线程数 */ public static void configEventLoopGroup() { // BossGroup:1个线程即可 EventLoopGroup bossGroup = new NioEventLoopGroup(1); // WorkerGroup:根据业务类型配置 int cpuCores = Runtime.getRuntime().availableProcessors(); EventLoopGroup workerGroup = new NioEventLoopGroup(cpuCores * 2); } /** * 2. 使用Epoll(Linux) */ public static void useEpoll() { if (Epoll.isAvailable()) { EventLoopGroup bossGroup = new EpollEventLoopGroup(1); EventLoopGroup workerGroup = new EpollEventLoopGroup(); // Epoll性能优于NIO } } /** * 3. 避免阻塞EventLoop线程 */ public static void avoidBlocking() { // 不要在Handler中进行阻塞操作 // 使用单独的线程池处理耗时任务 EventExecutorGroup businessGroup = new DefaultEventExecutorGroup(16); // 在Pipeline中使用 pipeline.addLast(businessGroup, "businessHandler", new BusinessHandler()); } } ``` ### 12.3 网络优化 ```java /** * 网络优化配置 */ public class NetworkOptimization { /** * 1. 配置TCP参数 */ public static void configTcpParams(ServerBootstrap bootstrap) { // 禁用Nagle算法,提高实时性 bootstrap.childOption(ChannelOption.TCP_NODELAY, true); // 保持连接 bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true); // 地址重用 bootstrap.option(ChannelOption.SO_REUSEADDR, true); // 连接队列大小 bootstrap.option(ChannelOption.SO_BACKLOG, 1024); } /** * 2. 配置缓冲区 */ public static void configBuffer(ServerBootstrap bootstrap) { // 接收缓冲区 bootstrap.childOption(ChannelOption.SO_RCVBUF, 32 * 1024); // 发送缓冲区 bootstrap.childOption(ChannelOption.SO_SNDBUF, 32 * 1024); // 写缓冲区水位 bootstrap.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(32 * 1024, 64 * 1024)); } /** * 3. 配置心跳 */ public static void configHeartbeat(ChannelPipeline pipeline) { // 30秒读空闲,0秒写空闲,0秒读写空闲 pipeline.addLast("idleStateHandler", new IdleStateHandler(30, 0, 0)); } } ``` ### 12.4 编解码优化 ```java /** * 编解码优化 */ public class CodecOptimization { /** * 1. 使用高效的编解码器 */ public static void efficientCodec() { // 使用LengthFieldBasedFrameDecoder处理粘包拆包 pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(1024, 0, 4)); // 使用StringDecoder简化字符串处理 pipeline.addLast("stringDecoder", new StringDecoder(CharsetUtil.UTF_8)); pipeline.addLast("stringEncoder", new StringEncoder(CharsetUtil.UTF_8)); } /** * 2. 使用压缩 */ public static void useCompression(ChannelPipeline pipeline) { // HTTP压缩 pipeline.addLast("httpCompressor", new HttpContentCompressor()); pipeline.addLast("httpDecompressor", new HttpContentDecompressor()); } /** * 3. 批量处理 */ public static void batchProcessing() { // 使用FixedLengthFrameDecoder批量处理 pipeline.addLast("fixedLengthFrameDecoder", new FixedLengthFrameDecoder(1024)); } } ``` --- ## 13. 总结与最佳实践 ### 13.1 核心要点总结 1. **ByteBuf** - 读写索引分离,无需手动flip - 支持引用计数,实现零拷贝 - 支持池化和非池化 - 合理选择堆缓冲区和直接缓冲区 2. **Channel** - 代表网络连接 - 异步IO操作 - 生命周期事件 - 线程安全 3. **ChannelHandler** - 入站处理器:ChannelInboundHandler - 出站处理器:ChannelOutboundHandler - 双向处理器:ChannelDuplexHandler - 线程安全 4. **ChannelPipeline** - Handler链式容器 - 双向数据流 - 动态添加/删除Handler 5. **EventLoop** - 单线程事件循环 - 一个EventLoop处理多个Channel - 任务执行机制 - 线程安全保证 6. **Bootstrap** - 服务器启动:ServerBootstrap - 客户端启动:Bootstrap - 简化配置 - 优雅关闭 ### 13.2 常见问题与解决方案 ```java /** * 常见问题与解决方案 */ public class CommonProblems { /** * 问题1:内存泄漏 * 解决方案: * - 及时释放ByteBuf * - 使用引用计数 * - 使用工具检测泄漏 */ /** * 问题2:线程阻塞 * 解决方案: * - 避免在Handler中阻塞 * - 使用独立线程池 * - 异步处理耗时任务 */ /** * 问题3:粘包拆包 * 解决方案: * - 使用LengthFieldBasedFrameDecoder * - 使用DelimiterBasedFrameDecoder * - 使用FixedLengthFrameDecoder */ /** * 问题4:连接数限制 * 解决方案: * - 调整系统参数 * - 使用EventLoopGroup * - 合理配置SO_BACKLOG */ /** * 问题5:性能瓶颈 * 解决方案: * - 使用Epoll(Linux) * - 使用零拷贝 * - 使用直接缓冲区 * - 批量处理 */ } ``` ### 13.3 最佳实践 1. **资源管理** - 及时释放ByteBuf - 使用try-finally保证资源释放 - 使用ReferenceCountUtil 2. **异常处理** - 在exceptionCaught中处理异常 - 记录日志 - 关闭Channel 3. **性能优化** - 使用池化ByteBuf - 使用直接缓冲区 - 避免阻塞EventLoop - 使用Epoll(Linux) 4. **编码规范** - Handler标注@Sharable - 合理命名Handler - 添加日志 - 单元测试 5. **监控告警** - 监控连接数 - 监控流量 - 监控异常 - 监控性能指标 --- ## 结语 通过本文的学习,你应该已经掌握了: 1. **Netty核心原理**:ByteBuf、Channel、ChannelHandler、ChannelPipeline、EventLoop、Bootstrap 2. **实战项目**:Echo服务器、HTTP服务器、RPC框架、WebSocket聊天室 3. **性能优化**:内存优化、线程优化、网络优化、编解码优化 4. **最佳实践**:资源管理、异常处理、性能优化、编码规范、监控告警 Netty是一个强大的网络框架,广泛应用于各种网络应用。继续深入学习,你会发现更多精彩的应用场景! 建议: 1. 深入学习Netty源码 2. 实践更多项目 3. 学习相关技术(NIO、TCP/IP) 4. 关注Netty社区动态
评论 0

发表评论 取消回复

Shift+Enter 换行  ·  Enter 发送
还没有评论,来发表第一条吧