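# Netty in Depth: From Principles to Production
## Table of Contents
1. [Getting Started with Netty: Why Choose Netty](#1-getting-started-with-netty-why-choose-netty)
2. [Netty Core Architecture Overview](#2-netty-core-architecture-overview)
3. [Core Component 1: ByteBuf in Depth](#3-core-component-1-bytebuf-in-depth)
4. [Core Component 2: Channel in Detail](#4-core-component-2-channel-in-detail)
5. [Core Component 3: ChannelHandler and ChannelPipeline](#5-core-component-3-channelhandler-and-channelpipeline)
6. [Core Component 4: EventLoop in Detail](#6-core-component-4-eventloop-in-detail)
7. [Core Component 5: Bootstrap Classes](#7-core-component-5-bootstrap-classes)
8. [Project 1: Echo Server](#8-project-1-echo-server)
9. [Project 2: HTTP Server](#9-project-2-http-server)
10. [Project 3: Implementing an RPC Framework](#10-project-3-implementing-an-rpc-framework)
11. [Project 4: WebSocket Chat Room](#11-project-4-websocket-chat-room)
12. [Netty Performance Tuning](#12-netty-performance-tuning)
13. [Summary and Best Practices](#13-summary-and-best-practices)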
---
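## 1. Getting Started with Netty: Why Choose Netty
### 1.1 Pain Points of Traditional I/O
```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

/**
 * Traditional BIO (blocking I/O) server.
 * Problem: every connection needs its own thread, so it does not scale to high concurrency.
 */
public class TraditionalBIOServer {
    public static void main(String[] args) throws IOException {
        ServerSocket serverSocket = new ServerSocket(8080);
        System.out.println("Server started, listening on port 8080");
        while (true) {
            // Block until a client connects
            Socket socket = serverSocket.accept();
            System.out.println("Client connected: " + socket.getRemoteSocketAddress());
            // One thread per connection
            new Thread(() -> {
                // try-with-resources closes the socket once the client disconnects
                try (Socket client = socket;
                     BufferedReader reader = new BufferedReader(
                             new InputStreamReader(client.getInputStream(), StandardCharsets.UTF_8))) {
                    OutputStream output = client.getOutputStream();
                    String line;
                    while ((line = reader.readLine()) != null) {
                        System.out.println("Received: " + line);
                        output.write(("Echo: " + line + "\n").getBytes(StandardCharsets.UTF_8));
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}
```
**Problems with traditional BIO:**
- Every connection needs a dedicated thread, which is expensive in memory and scheduling
- Frequent context switches between threads hurt throughput
- It cannot handle a large number of concurrent connections (the C10K problem)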
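### 1.2 What NIO Improves
```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Set;

/**
 * Java NIO server.
 * Improvement: a single thread serves many connections.
 * Problem: the API is low-level and hard to use correctly.
 */
public class TraditionalNIOServer {
    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);
        serverChannel.bind(new InetSocketAddress(8080));
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
        System.out.println("NIO server started, listening on port 8080");
        while (true) {
            // Block until at least one channel is ready
            selector.select();
            Set<SelectionKey> selectedKeys = selector.selectedKeys();
            Iterator<SelectionKey> iter = selectedKeys.iterator();
            while (iter.hasNext()) {
                SelectionKey key = iter.next();
                iter.remove();
                if (key.isAcceptable()) {
                    handleAccept(serverChannel, selector);
                } else if (key.isReadable()) {
                    handleRead(key);
                }
            }
        }
    }

    private static void handleAccept(ServerSocketChannel serverChannel, Selector selector)
            throws IOException {
        SocketChannel clientChannel = serverChannel.accept();
        clientChannel.configureBlocking(false);
        clientChannel.register(selector, SelectionKey.OP_READ);
        System.out.println("Client connected: " + clientChannel.getRemoteSocketAddress());
    }

    private static void handleRead(SelectionKey key) throws IOException {
        SocketChannel clientChannel = (SocketChannel) key.channel();
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        int read = clientChannel.read(buffer);
        if (read > 0) {
            buffer.flip();
            byte[] data = new byte[buffer.remaining()];
            buffer.get(data);
            String text = new String(data, StandardCharsets.UTF_8);
            System.out.println("Received: " + text);
            // Simplification: a non-blocking write may send only part of the buffer;
            // production code must keep the remainder and register OP_WRITE.
            clientChannel.write(ByteBuffer.wrap(("Echo: " + text).getBytes(StandardCharsets.UTF_8)));
        } else if (read == -1) {
            // Peer closed the connection; close() also cancels the selection key
            clientChannel.close();
        }
    }
}
```
**Problems with raw NIO:**
- The API is complex and has a steep learning curve
- You have to manage Selector, Channel, and Buffer state yourself
- The JDK epoll bug: `select()` can return immediately with no ready keys (empty polling), spinning a CPU core at 100%
- Development is slow, because framing, partial reads and writes, and reconnection are all left to you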
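### 1.3 What Netty Offers
```java
// NettyServer.java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * Netty server.
 * Concise API, strong performance.
 */
public class NettyServer {
    public static void main(String[] args) throws InterruptedException {
        // Create the event loop groups
        EventLoopGroup bossGroup = new NioEventLoopGroup(1); // accepts connections
        EventLoopGroup workerGroup = new NioEventLoopGroup(); // handles I/O
        try {
            // Create the server bootstrap
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ch.pipeline().addLast(new EchoServerHandler());
                        }
                    });
            // Bind the port and start
            ChannelFuture future = bootstrap.bind(8080).sync();
            System.out.println("Netty server started, listening on port 8080");
            // Wait until the server channel is closed
            future.channel().closeFuture().sync();
        } finally {
            // Graceful shutdown
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

// EchoServerHandler.java
// (imports: io.netty.buffer.ByteBuf, io.netty.channel.ChannelHandler,
//  io.netty.channel.ChannelHandlerContext, io.netty.channel.ChannelInboundHandlerAdapter,
//  io.netty.util.CharsetUtil)
/**
 * Echo handler
 */
@ChannelHandler.Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("Received: " + buf.toString(CharsetUtil.UTF_8));
        // Echo the message; write() takes ownership of buf and releases it once written
        ctx.write(buf);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
```
**Netty's advantages:**
- **Concise API**: less boilerplate and faster development
- **High performance**: zero-copy techniques, pooled buffers, and an efficient threading model
- **Rich feature set**: built-in codecs plus HTTP/HTTPS, WebSocket, and other protocol support
- **Stability**: works around the JDK NIO epoll empty-polling bug instead of leaving it to the application
- **Active community**: widely used in large projects (Dubbo, RocketMQ, grpc-java, and others)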
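The empty-polling workaround can be pictured as a counter: when `select()` keeps returning early with no ready keys, count the occurrences and rebuild the `Selector` once a threshold is crossed. The sketch below is a simplified plain-JDK model of that idea, not Netty's source; the class name `SelectorSpinDetector` and its method are hypothetical. To my knowledge Netty's own threshold defaults to 512 and is configurable through the `io.netty.selectorAutoRebuildThreshold` system property.

```java
import java.util.concurrent.TimeUnit;

/** Simplified model of an empty-poll detector (hypothetical class, not Netty code). */
final class SelectorSpinDetector {
    private final int rebuildThreshold;
    private int prematureReturns;

    SelectorSpinDetector(int rebuildThreshold) {
        this.rebuildThreshold = rebuildThreshold;
    }

    /**
     * Records the outcome of one select() call.
     *
     * @return true when the caller should rebuild its Selector
     */
    boolean record(int readyKeys, long requestedTimeoutNanos, long elapsedNanos) {
        // A healthy return either found ready keys or waited for the full timeout
        if (readyKeys > 0 || elapsedNanos >= requestedTimeoutNanos) {
            prematureReturns = 0;
            return false;
        }
        // Premature return with nothing to do: possible empty-poll spin
        if (++prematureReturns >= rebuildThreshold) {
            prematureReturns = 0;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        SelectorSpinDetector detector = new SelectorSpinDetector(3);
        long timeout = TimeUnit.SECONDS.toNanos(1);
        for (int i = 1; i <= 3; i++) {
            boolean rebuild = detector.record(0, timeout, 1_000);
            System.out.println("premature return #" + i + ", rebuild = " + rebuild);
        }
    }
}
```

With a threshold of 3, the third consecutive premature return is the one that signals a rebuild; any healthy `select()` in between resets the counter.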
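### 1.4 Where Netty Is Used
```java
/**
 * Typical Netty use cases
 */
public class NettyUseCases {
    /**
     * 1. RPC and service frameworks
     * - Dubbo
     * - gRPC (grpc-java)
     * - Spring Cloud (e.g. Spring Cloud Gateway, via Reactor Netty)
     */
    /**
     * 2. Message middleware
     * - RocketMQ
     * - ActiveMQ Artemis
     */
    /**
     * 3. Big data
     * - Hadoop
     * - Spark
     * - Flink
     */
    /**
     * 4. Game servers
     * - Real-time multiplayer
     * - In-game chat
     */
    /**
     * 5. Gateways
     * - API gateways
     * - Load balancers
     */
    /**
     * 6. Instant messaging
     * - Chat services in the style of WeChat or DingTalk
     */
}
```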
---
## 2. Netty Core Architecture Overview
### 2.1 Architectural Layers
```
┌─────────────────────────────────────────────────────┐
│                  Application layer                  │
│     (HTTP / WebSocket / RPC / custom protocols)     │
└─────────────────────────────────────────────────────┘
                           ↓
┌─────────────────────────────────────────────────────┐
│                   ChannelHandler                    │
│ (codecs / business handlers / heartbeat detection)  │
└─────────────────────────────────────────────────────┘
                           ↓
┌─────────────────────────────────────────────────────┐
│                   ChannelPipeline                   │
│          (container that chains handlers)           │
└─────────────────────────────────────────────────────┘
                           ↓
┌─────────────────────────────────────────────────────┐
│                 Channel & EventLoop                 │
│          (network operations / event loop)          │
└─────────────────────────────────────────────────────┘
                           ↓
┌─────────────────────────────────────────────────────┐
│                      Transport                      │
│            (NIO / OIO / Epoll / KQueue)             │
└─────────────────────────────────────────────────────┘
```
### 2.2 How the Core Components Relate
```java
/**
 * Relationship between Netty's core components
 */
public class NettyComponentRelationship {
    /*
     * Bootstrap (startup helper)
     *   ↓
     * EventLoopGroup (group of event loops)
     *   ↓
     * EventLoop (event loop)
     *   ↓
     * Channel (connection)
     *   ↓
     * ChannelPipeline (handler chain)
     *   ↓
     * ChannelHandler (handler)
     *   ↓
     * ByteBuf (byte buffer)
     */
}
```
### 2.3 Data Flow
```
Inbound:
network data → Channel → ChannelPipeline → ChannelHandler (inbound) → business logic
Outbound:
business logic → ChannelHandler (outbound) → ChannelPipeline → Channel → network
```
---
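## 3. Core Component 1: ByteBuf in Depth
### 3.1 ByteBuf vs NIO ByteBuffer
```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/**
 * Shortcomings of NIO ByteBuffer
 */
public class NIOByteBufferDemo {
    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        // Write data
        buffer.put("Hello Netty".getBytes(StandardCharsets.UTF_8));
        // Problem 1: you must call flip() to switch from writing to reading
        buffer.flip();
        // Read data
        byte[] data = new byte[buffer.remaining()];
        buffer.get(data);
        System.out.println(new String(data, StandardCharsets.UTF_8));
        // Problem 2: reading the same data again requires rewind(),
        // because reads and writes share a single position
        buffer.rewind();
        // Problem 3: fixed capacity, no automatic growth
        // Problem 4: the API is awkward and error-prone
        // Problem 5: no reference counting or pooling; buffer lifetime is left to the GC
    }
}
```
```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import java.nio.charset.StandardCharsets;

/**
 * Advantages of Netty's ByteBuf
 */
public class NettyByteBufDemo {
    public static void main(String[] args) {
        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(1024);
        // Advantage 1: capacity grows on demand (up to maxCapacity)
        buffer.writeBytes("Hello Netty".getBytes(StandardCharsets.UTF_8));
        // Advantage 2: separate reader and writer indices, no flip()
        System.out.println("readerIndex: " + buffer.readerIndex());
        System.out.println("writerIndex: " + buffer.writerIndex());
        // Read data
        byte[] data = new byte[buffer.readableBytes()];
        buffer.readBytes(data);
        System.out.println(new String(data, StandardCharsets.UTF_8));
        // Advantage 3: reference counting plus zero-copy views.
        // slice() shares memory and the reference count with `buffer`
        // (everything was just read, so this particular slice is empty).
        ByteBuf slice = buffer.slice();
        // Advantage 4: typed read/write helpers
        buffer.writeLong(100L);
        buffer.writeInt(200);
        // Advantage 5: pooled and unpooled allocation
        ByteBuf pooledBuffer = ByteBufAllocator.DEFAULT.buffer(1024); // pooled by default on most platforms
        ByteBuf unpooledBuffer = Unpooled.buffer(1024);
        // Advantage 6: several memory types
        ByteBuf directBuffer = ByteBufAllocator.DEFAULT.directBuffer(1024); // off-heap memory
        ByteBuf heapBuffer = ByteBufAllocator.DEFAULT.heapBuffer(1024); // JVM heap memory
        // Advantage 7: composite buffers. wrappedBuffer() takes ownership of its arguments,
        // so pass retained views and release the composite rather than the parts.
        ByteBuf compositeBuffer = Unpooled.wrappedBuffer(
                buffer.retainedSlice(), buffer.retainedDuplicate());
        // Release everything that was allocated or retained
        compositeBuffer.release(); // also releases the two retained views
        pooledBuffer.release();
        unpooledBuffer.release();
        directBuffer.release();
        heapBuffer.release();
        buffer.release(); // `slice` shares this count and must not be released separately
    }
}
```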
### 3.2 ByteBuf Core Structure
```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import java.nio.charset.StandardCharsets;

/**
 * ByteBuf core structure
 *
 * A ByteBuf tracks two indices and a capacity:
 * readerIndex: where the next read starts
 * writerIndex: where the next write starts
 * capacity:    current size of the underlying storage
 *
 * Memory layout:
 * +-------------------+------------------+------------------+
 * | discardable bytes |  readable bytes  |  writable bytes  |
 * |  (already read)   |     (unread)     |   (free space)   |
 * +-------------------+------------------+------------------+
 * |                   |                  |                  |
 * 0      <=      readerIndex   <=   writerIndex    <=    capacity
 */
public class ByteBufStructure {
    public static void main(String[] args) {
        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16);
        System.out.println("Initial state:");
        System.out.println("capacity = " + buffer.capacity());
        printState(buffer);
        // Write data
        buffer.writeBytes("Hello".getBytes(StandardCharsets.UTF_8));
        System.out.println("\nAfter writing Hello:");
        printState(buffer);
        // Read data
        byte[] data = new byte[2];
        buffer.readBytes(data);
        System.out.println("\nAfter reading 2 bytes:");
        printState(buffer);
        // Discard the bytes that were already read (moves unread bytes to index 0)
        buffer.discardReadBytes();
        System.out.println("\nAfter discardReadBytes():");
        printState(buffer);
        // clear() only resets both indices to 0; it does not erase the content
        buffer.clear();
        System.out.println("\nAfter clear():");
        printState(buffer);
        // Release the buffer when done
        buffer.release();
    }

    private static void printState(ByteBuf buffer) {
        System.out.println("readerIndex = " + buffer.readerIndex());
        System.out.println("writerIndex = " + buffer.writerIndex());
        System.out.println("readableBytes = " + buffer.readableBytes());
        System.out.println("writableBytes = " + buffer.writableBytes());
    }
}
```
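The index arithmetic is easy to check without Netty on the classpath. The following is a toy model written for illustration only: `TwoIndexBuffer` is a hypothetical class, it has a fixed capacity, and it is not how `ByteBuf` is implemented. It only reproduces how `readerIndex` and `writerIndex` move on write, read, and `discardReadBytes()`.

```java
import java.nio.charset.StandardCharsets;

/** Toy buffer with separate read and write indices (hypothetical, not Netty's ByteBuf). */
final class TwoIndexBuffer {
    private final byte[] data;
    private int readerIndex;
    private int writerIndex;

    TwoIndexBuffer(int capacity) {
        data = new byte[capacity];
    }

    int readerIndex() { return readerIndex; }
    int writerIndex() { return writerIndex; }
    int readableBytes() { return writerIndex - readerIndex; }
    int writableBytes() { return data.length - writerIndex; }

    void write(byte[] src) {
        if (src.length > writableBytes()) {
            throw new IndexOutOfBoundsException("not enough writable bytes");
        }
        System.arraycopy(src, 0, data, writerIndex, src.length);
        writerIndex += src.length; // only the writer index moves
    }

    byte[] read(int length) {
        if (length > readableBytes()) {
            throw new IndexOutOfBoundsException("not enough readable bytes");
        }
        byte[] out = new byte[length];
        System.arraycopy(data, readerIndex, out, 0, length);
        readerIndex += length; // only the reader index moves
        return out;
    }

    /** Moves unread bytes to the front, reclaiming the discardable region. */
    void discardReadBytes() {
        int readable = readableBytes();
        System.arraycopy(data, readerIndex, data, 0, readable);
        readerIndex = 0;
        writerIndex = readable;
    }

    public static void main(String[] args) {
        TwoIndexBuffer buf = new TwoIndexBuffer(16);
        buf.write("Hello".getBytes(StandardCharsets.US_ASCII));
        buf.read(2);
        System.out.println("reader=" + buf.readerIndex() + " writer=" + buf.writerIndex()
                + " writable=" + buf.writableBytes()); // reader=2 writer=5 writable=11
        buf.discardReadBytes();
        System.out.println("reader=" + buf.readerIndex() + " writer=" + buf.writerIndex()
                + " writable=" + buf.writableBytes()); // reader=0 writer=3 writable=13
    }
}
```

Because reads never touch `writerIndex`, no `flip()` is needed; the cost is that the discardable region stays occupied until it is explicitly reclaimed.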
### 3.3 ByteBuf Types
```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.util.CharsetUtil;

/**
 * ByteBuf types in detail
 */
public class ByteBufTypes {
    public static void main(String[] args) {
        // 1. Heap buffer
        // - Data lives on the JVM heap
        // - Cheap to allocate and free
        // - Good fit for small payloads handled in application code
        ByteBuf heapBuffer = ByteBufAllocator.DEFAULT.heapBuffer(1024);
        System.out.println("Heap buffer: " + heapBuffer);
        System.out.println("hasArray: " + heapBuffer.hasArray());
        System.out.println("Backing array: " + heapBuffer.array());
        // 2. Direct buffer
        // - Data lives in off-heap memory
        // - Avoids the extra heap-to-native copy the JDK performs for socket I/O
        // - Good fit for large payloads that go straight to the network
        ByteBuf directBuffer = ByteBufAllocator.DEFAULT.directBuffer(1024);
        System.out.println("\nDirect buffer: " + directBuffer);
        System.out.println("hasArray: " + directBuffer.hasArray());
        // 3. Composite buffer
        // - A logical view over several ByteBufs
        // - Zero-copy: the component data is not copied
        // - wrappedBuffer() takes ownership of buf1 and buf2
        ByteBuf buf1 = Unpooled.copiedBuffer("Hello".getBytes(CharsetUtil.UTF_8));
        ByteBuf buf2 = Unpooled.copiedBuffer(" World".getBytes(CharsetUtil.UTF_8));
        ByteBuf compositeBuffer = Unpooled.wrappedBuffer(buf1, buf2);
        System.out.println("\nComposite buffer: " + compositeBuffer);
        System.out.println("Content: " + compositeBuffer.toString(CharsetUtil.UTF_8));
        // 4. Read-only buffer
        ByteBuf readOnlyBuffer = buf1.asReadOnly();
        // readOnlyBuffer.writeByte(1); // throws ReadOnlyBufferException
        // 5. Slice
        ByteBuf sliceBuffer = buf1.slice(0, 2); // view over the first two bytes
        System.out.println("\nSlice: " + sliceBuffer);
        System.out.println("Content: " + sliceBuffer.toString(CharsetUtil.UTF_8));
        // Release the buffers. Releasing the composite also releases buf1 and buf2;
        // releasing them again (or their read-only/slice views) would throw
        // IllegalReferenceCountException.
        heapBuffer.release();
        directBuffer.release();
        compositeBuffer.release();
    }
}
```
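### 3.4 ByteBuf Reference Counting
```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;

/**
 * ByteBuf reference counting
 *
 * The reference count tracks how many owners a ByteBuf has. Memory is freed
 * (or returned to the pool) when the count reaches 0. It is explicit, not automatic:
 * every retain() must be paired with a release().
 */
public class ByteBufReferenceCount {
    public static void main(String[] args) {
        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(1024);
        System.out.println("Initial refCnt: " + buffer.refCnt()); // 1
        // retain(): increments the count
        buffer.retain();
        System.out.println("After retain: " + buffer.refCnt()); // 2
        buffer.retain();
        System.out.println("After second retain: " + buffer.refCnt()); // 3
        // release(): decrements the count
        buffer.release();
        System.out.println("After release: " + buffer.refCnt()); // 2
        // slice() and duplicate() create views that SHARE the parent's count;
        // they do not increment it
        ByteBuf slice = buffer.slice();
        System.out.println("After slice: " + buffer.refCnt()); // 2
        ByteBuf duplicate = buffer.duplicate();
        System.out.println("After duplicate: " + buffer.refCnt()); // 2
        // retainedSlice() / retainedDuplicate() create a view AND increment the count
        ByteBuf retained = buffer.retainedSlice();
        System.out.println("After retainedSlice: " + buffer.refCnt()); // 3
        // Release exactly once per reference held
        retained.release(); // 2
        buffer.release();   // 1
        boolean freed = buffer.release(); // 0, memory is deallocated
        System.out.println("Deallocated: " + freed + ", refCnt: " + buffer.refCnt()); // true, 0
        // `slice` and `duplicate` are now invalid too; releasing them here
        // would throw IllegalReferenceCountException.
    }
}
```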
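The retain/release contract itself is small enough to model in plain Java. The sketch below is an illustration under stated assumptions: `CountedResource` is a hypothetical class, it throws `IllegalStateException` where Netty throws `IllegalReferenceCountException`, and the `deallocated` flag stands in for returning memory to an allocator.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Minimal reference-counted resource (hypothetical model, not Netty's implementation). */
final class CountedResource {
    private final AtomicInteger refCnt = new AtomicInteger(1); // the creator holds one reference
    private volatile boolean deallocated;

    int refCnt() { return refCnt.get(); }
    boolean isDeallocated() { return deallocated; }

    /** Adds one owner; fails if the resource was already freed. */
    CountedResource retain() {
        for (;;) {
            int current = refCnt.get();
            if (current == 0) {
                throw new IllegalStateException("already deallocated");
            }
            if (refCnt.compareAndSet(current, current + 1)) {
                return this;
            }
        }
    }

    /** Drops one owner; returns true only for the call that frees the resource. */
    boolean release() {
        for (;;) {
            int current = refCnt.get();
            if (current == 0) {
                throw new IllegalStateException("released more often than retained");
            }
            if (refCnt.compareAndSet(current, current - 1)) {
                if (current == 1) {
                    deallocated = true; // stand-in for freeing or recycling the memory
                    return true;
                }
                return false;
            }
        }
    }

    public static void main(String[] args) {
        CountedResource r = new CountedResource();
        r.retain();
        System.out.println(r.release()); // false, one owner left
        System.out.println(r.release()); // true, last owner gone
        System.out.println(r.refCnt());  // 0
    }
}
```

The compare-and-set loop is what makes it safe for several threads to retain and release the same resource concurrently.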
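### 3.5 Pooled vs Unpooled ByteBuf
```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;

/**
 * Pooled vs unpooled ByteBuf
 */
public class ByteBufPooling {
    public static void main(String[] args) {
        // ByteBufAllocator.DEFAULT is chosen once, when Netty's buffer classes are first
        // loaded, from -Dio.netty.allocator.type=pooled|unpooled. Calling
        // System.setProperty afterwards has no effect, so this example picks the
        // allocators explicitly.

        // Pooled ByteBuf (the default on most platforms)
        // - Pro: memory is reused, which lowers GC pressure
        // - Con: a forgotten release() leaks pool memory
        ByteBuf pooledBuffer = PooledByteBufAllocator.DEFAULT.buffer(1024);
        System.out.println("Pooled buffer: " + pooledBuffer);
        pooledBuffer.release(); // returns the memory to the pool

        // Unpooled ByteBuf
        // - Pro: no pool bookkeeping
        // - Con: every allocation creates new memory, which raises GC pressure
        // Still release it: direct memory is only freed promptly through release()
        ByteBuf unpooledBuffer = UnpooledByteBufAllocator.DEFAULT.buffer(1024);
        System.out.println("Unpooled buffer: " + unpooledBuffer);
        unpooledBuffer.release();
    }
}
```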
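### 3.6 ByteBuf Utility Classes
```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;

/**
 * Using the ByteBuf utility classes
 */
public class ByteBufUtils {
    public static void main(String[] args) {
        // 1. Create ByteBufs
        ByteBuf buffer1 = Unpooled.buffer(1024);
        ByteBuf buffer2 = Unpooled.directBuffer(1024);
        ByteBuf buffer3 = Unpooled.wrappedBuffer("Hello".getBytes(CharsetUtil.UTF_8));
        // 2. Copy a String into a ByteBuf
        ByteBuf buffer4 = Unpooled.copiedBuffer("Netty", CharsetUtil.UTF_8);
        // 3. release() returns true when the count reached 0 and the buffer was deallocated
        if (ReferenceCountUtil.release(buffer1)) {
            System.out.println("buffer1 was deallocated");
        }
        // 4. Create a view that holds its own reference (buffer4's count becomes 2)
        ByteBuf buffer5 = buffer4.retainedSlice();
        // 5. safeRelease() logs instead of throwing if the release fails
        ReferenceCountUtil.safeRelease(buffer2);
        ReferenceCountUtil.safeRelease(buffer3);
        ReferenceCountUtil.safeRelease(buffer4);
        ReferenceCountUtil.safeRelease(buffer5);
    }
}
```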
---
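## 4. Core Component 2: Channel in Detail
### 4.1 What a Channel Is
```java
/**
 * The Channel concept
 *
 * A Channel is Netty's abstraction of a network connection (or a listening socket).
 *
 * What a Channel does:
 * 1. Reads and writes data
 * 2. Manages the connection
 * 3. Emits events
 */
public class ChannelConcept {
    /*
     * Channel types:
     *
     * NioSocketChannel:       NIO TCP connection (client side, or accepted by a server)
     * NioServerSocketChannel: NIO listening socket that accepts connections
     * OioSocketChannel:       blocking-I/O TCP connection (deprecated in Netty 4.1)
     * OioServerSocketChannel: blocking-I/O listening socket (deprecated in Netty 4.1)
     * NioDatagramChannel:     NIO UDP channel
     *
     * LocalChannel:           in-JVM transport
     * EmbeddedChannel:        embedded channel (for testing handlers)
     */
}
```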
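### 4.2 Core Channel Methods
```java
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoop;
import io.netty.util.AttributeKey;
import java.net.InetSocketAddress;

/**
 * Core Channel methods
 */
public class ChannelMethods {
    /** In real code the Channel comes from a ChannelHandlerContext or a bind/connect future. */
    public static void demo(Channel channel) {
        // 1. Write data (asynchronous). Writing a String assumes a StringEncoder in the pipeline.
        ChannelFuture future = channel.writeAndFlush("Hello Netty");
        // Add a listener
        future.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                if (future.isSuccess()) {
                    System.out.println("Write succeeded");
                } else {
                    System.out.println("Write failed: " + future.cause());
                }
            }
        });
        // Optionally block until the write completes.
        // Never do this on an EventLoop thread: it would deadlock.
        try {
            future.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // 2. Get the configuration
        ChannelConfig config = channel.config();
        // 3. Read an attribute
        AttributeKey<String> key = AttributeKey.valueOf("userId");
        String userId = channel.attr(key).get();
        // 4. Get the ChannelPipeline
        ChannelPipeline pipeline = channel.pipeline();
        // 5. Get the EventLoop the channel is registered with
        EventLoop eventLoop = channel.eventLoop();
        // 6. Get the remote address
        InetSocketAddress remoteAddress = (InetSocketAddress) channel.remoteAddress();
        // 7. Get the local address
        InetSocketAddress localAddress = (InetSocketAddress) channel.localAddress();
        // 8. Check state
        boolean isActive = channel.isActive();
        boolean isOpen = channel.isOpen();
        boolean isWritable = channel.isWritable();
        // 9. Close the Channel
        ChannelFuture closeFuture = channel.close();
        // 10. Registration with an EventLoop is done by Bootstrap/ServerBootstrap.
        // A channel that is already registered or closed cannot be registered again,
        // so application code normally never calls eventLoop.register(channel).
    }
}
```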
### 4.3 Channel Lifecycle
```java
/**
 * Channel lifecycle
 */
public class ChannelLifecycle {
    /*
     * Channel lifecycle events:
     *
     * channelRegistered:         the Channel was registered with an EventLoop
     * channelUnregistered:       the Channel was deregistered from its EventLoop
     * channelActive:             the Channel is ready (connection established)
     * channelInactive:           the Channel is no longer active (connection closed)
     * channelRead:               a message was read
     * channelReadComplete:       the current read batch is finished
     * channelWritabilityChanged: the writable state changed
     * exceptionCaught:           an exception occurred
     */
}
```
```java
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * Handler that logs every lifecycle event
 */
@ChannelHandler.Sharable
public class ChannelLifecycleHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Channel registered with EventLoop: " + ctx.channel());
        super.channelRegistered(ctx);
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Channel deregistered from EventLoop: " + ctx.channel());
        super.channelUnregistered(ctx);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Channel active: " + ctx.channel());
        // Data can be sent once the connection is established.
        // Writing a String assumes a StringEncoder earlier in the pipeline.
        ctx.writeAndFlush("Welcome to Netty Server\n");
        super.channelActive(ctx);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Channel inactive: " + ctx.channel());
        super.channelInactive(ctx);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("Read message: " + msg);
        super.channelRead(ctx, msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Read complete");
        super.channelReadComplete(ctx);
    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        System.out.println("Writability changed, isWritable = " + channel.isWritable());
        super.channelWritabilityChanged(ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("Exception caught: " + cause);
        ctx.close();
    }
}
```
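### 4.4 Channel Configuration
```java
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelOption;
import io.netty.channel.WriteBufferWaterMark;

/**
 * Channel configuration
 */
public class ChannelConfigDemo {
    /** In practice most of these options are set through Bootstrap.option()/childOption(). */
    public static void configure(Channel channel) {
        ChannelConfig config = channel.config();
        // 1. TCP options
        config.setOption(ChannelOption.SO_KEEPALIVE, true); // TCP keep-alive probes
        config.setOption(ChannelOption.TCP_NODELAY, true);  // disable Nagle's algorithm
        config.setOption(ChannelOption.SO_BACKLOG, 128);    // accept queue size; server channels only
        config.setOption(ChannelOption.SO_REUSEADDR, true); // allow address reuse
        // 2. Socket buffers
        config.setOption(ChannelOption.SO_RCVBUF, 32 * 1024); // receive buffer
        config.setOption(ChannelOption.SO_SNDBUF, 32 * 1024); // send buffer
        // 3. Timeouts
        config.setOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000); // connect timeout
        // 4. Write buffer water marks (low, high)
        config.setWriteBufferWaterMark(new WriteBufferWaterMark(32 * 1024, 64 * 1024));
        // 5. Auto-read
        config.setAutoRead(true); // read automatically whenever data is available
        // 6. Maximum number of messages read per read loop
        // (a message count, not a size limit; deprecated in favor of the RecvByteBufAllocator)
        config.setOption(ChannelOption.MAX_MESSAGES_PER_READ, 16);
    }
}
```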
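The two water marks form a hysteresis band: the channel turns unwritable once pending outbound bytes rise above the high mark and becomes writable again only after they fall below the low mark. The following plain-Java sketch models just that rule; `WritabilityTracker` and its method names are hypothetical and greatly simplified compared with Netty's outbound buffer.

```java
/** Toy model of high/low water mark writability (hypothetical, not Netty code). */
final class WritabilityTracker {
    private final long lowWaterMark;
    private final long highWaterMark;
    private long pendingBytes;
    private boolean writable = true;

    WritabilityTracker(long lowWaterMark, long highWaterMark) {
        if (lowWaterMark < 0 || highWaterMark < lowWaterMark) {
            throw new IllegalArgumentException("require 0 <= low <= high");
        }
        this.lowWaterMark = lowWaterMark;
        this.highWaterMark = highWaterMark;
    }

    /** Bytes were queued for writing. */
    void onQueued(long bytes) {
        pendingBytes += bytes;
        if (writable && pendingBytes > highWaterMark) {
            writable = false; // a real channel would fire channelWritabilityChanged here
        }
    }

    /** Bytes were flushed to the socket. */
    void onFlushed(long bytes) {
        pendingBytes -= bytes;
        if (!writable && pendingBytes < lowWaterMark) {
            writable = true; // and here
        }
    }

    boolean isWritable() { return writable; }

    public static void main(String[] args) {
        WritabilityTracker tracker = new WritabilityTracker(32 * 1024, 64 * 1024);
        tracker.onQueued(70 * 1024);
        System.out.println(tracker.isWritable()); // false: above the high mark
        tracker.onFlushed(30 * 1024);
        System.out.println(tracker.isWritable()); // false: 40 KiB is still above the low mark
        tracker.onFlushed(10 * 1024);
        System.out.println(tracker.isWritable()); // true: 30 KiB is below the low mark
    }
}
```

The gap between the marks keeps the writable flag from flapping on every small write, which is why producers should check `isWritable()` and pause while it is false.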
### 4.5 Channel Attributes
```java
import io.netty.channel.Channel;
import io.netty.util.AttributeKey;
import java.util.Date;

/**
 * Channel attributes
 */
public class ChannelAttributes {
    // Define typed attribute keys
    private static final AttributeKey<String> USER_ID = AttributeKey.valueOf("userId");
    private static final AttributeKey<Long> CONNECT_TIME = AttributeKey.valueOf("connectTime");

    public static void setAttributes(Channel channel) {
        // Set attributes
        channel.attr(USER_ID).set("user123");
        channel.attr(CONNECT_TIME).set(System.currentTimeMillis());
        // Get attributes
        String userId = channel.attr(USER_ID).get();
        Long connectTime = channel.attr(CONNECT_TIME).get();
        System.out.println("User ID: " + userId);
        System.out.println("Connected at: " + new Date(connectTime));
        // Clear an attribute (Attribute.remove() is deprecated in Netty 4.1)
        channel.attr(USER_ID).set(null);
    }
}
```
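Typed keys are what make attribute access cast-free at the call site. A minimal sketch of the pattern in plain Java follows. The classes `TypedAttributeMap` and `TypedKey` are hypothetical; unlike Netty's `AttributeKey`, this model carries a `Class<T>` token so it can verify the type at runtime, which is a design choice of the sketch rather than something Netty does.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Type-safe attribute storage (hypothetical model of the typed-key pattern). */
final class TypedAttributeMap {
    private final Map<TypedKey<?>, Object> values = new ConcurrentHashMap<>();

    /** Stores a non-null value under the key. */
    <T> void set(TypedKey<T> key, T value) {
        values.put(key, value);
    }

    /** Returns the stored value, or null if the key is absent. */
    <T> T get(TypedKey<T> key) {
        return key.cast(values.get(key));
    }

    /** Removes and returns the stored value, or null if the key is absent. */
    <T> T remove(TypedKey<T> key) {
        return key.cast(values.remove(key));
    }

    public static void main(String[] args) {
        TypedKey<String> userId = new TypedKey<>("userId", String.class);
        TypedKey<Long> connectTime = new TypedKey<>("connectTime", Long.class);
        TypedAttributeMap attrs = new TypedAttributeMap();
        attrs.set(userId, "user123");
        attrs.set(connectTime, 42L);
        String id = attrs.get(userId);      // no cast needed
        long time = attrs.get(connectTime); // no cast needed
        System.out.println(id + " @ " + time); // user123 @ 42
    }
}

/** Key that remembers the value type it stands for; compared by identity. */
final class TypedKey<T> {
    private final String name;
    private final Class<T> type;

    TypedKey(String name, Class<T> type) {
        this.name = name;
        this.type = type;
    }

    T cast(Object value) {
        return type.cast(value); // Class.cast(null) returns null
    }

    @Override
    public String toString() {
        return name;
    }
}
```

Declaring keys once as constants, as the original `USER_ID` and `CONNECT_TIME` fields do, lets the compiler reject a `Long` stored under a `String` key.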
---
## 5. Core Component 3: ChannelHandler and ChannelPipeline
### 5.1 What a ChannelHandler Is
```java
/**
 * The ChannelHandler concept
 *
 * ChannelHandler is the interface through which Netty processes I/O events.
 *
 * Handler kinds:
 * 1. ChannelInboundHandler:  inbound handler (processes data that was read)
 * 2. ChannelOutboundHandler: outbound handler (processes data to be sent)
 * 3. ChannelDuplexHandler:   duplex handler (handles both directions)
 */
public class ChannelHandlerConcept {
    /*
     * Data flow:
     *
     * Inbound:
     * network data → Channel → ChannelPipeline → ChannelInboundHandler → business logic
     *
     * Outbound:
     * business logic → ChannelOutboundHandler → ChannelPipeline → Channel → network
     */
}
```
### 5.2 ChannelInboundHandler
```java
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

/**
 * Inbound handler example
 */
@ChannelHandler.Sharable
public class MyInboundHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("[Inbound] channel registered");
        super.channelRegistered(ctx);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("[Inbound] connection established");
        super.channelActive(ctx);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("[Inbound] read: " + msg);
        // Assumes this handler sits directly after the transport, so msg is a ByteBuf
        ByteBuf buf = (ByteBuf) msg;
        try {
            String message = buf.toString(CharsetUtil.UTF_8);
            // Pass the decoded String to the next inbound handler
            super.channelRead(ctx, message);
        } finally {
            // This handler consumed the ByteBuf, so it must release it
            buf.release();
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("[Inbound] read complete");
        super.channelReadComplete(ctx);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("[Inbound] connection closed");
        super.channelInactive(ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("[Inbound] exception: " + cause);
        ctx.close();
    }
}
```
### 5.3 ChannelOutboundHandler
```java
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.util.CharsetUtil;
import java.net.SocketAddress;

/**
 * Outbound handler example
 */
public class MyOutboundHandler extends ChannelOutboundHandlerAdapter {
    @Override
    public void bind(ChannelHandlerContext ctx, SocketAddress localAddress,
                     ChannelPromise promise) throws Exception {
        System.out.println("[Outbound] bind: " + localAddress);
        super.bind(ctx, localAddress, promise);
    }

    @Override
    public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
                        SocketAddress localAddress, ChannelPromise promise) throws Exception {
        System.out.println("[Outbound] connect to: " + remoteAddress);
        super.connect(ctx, remoteAddress, localAddress, promise);
    }

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        System.out.println("[Outbound] write: " + msg);
        // Convert the String to a ByteBuf (assumes upstream handlers only write Strings)
        String str = (String) msg;
        ByteBuf buf = ctx.alloc().buffer();
        buf.writeBytes(str.getBytes(CharsetUtil.UTF_8));
        // Pass the ByteBuf to the next outbound handler
        super.write(ctx, buf, promise);
    }

    @Override
    public void flush(ChannelHandlerContext ctx) throws Exception {
        System.out.println("[Outbound] flush");
        super.flush(ctx);
    }

    @Override
    public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        System.out.println("[Outbound] close");
        super.close(ctx, promise);
    }
}
```
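### 5.4 ChannelDuplexHandler
```java
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.util.CharsetUtil;

/**
 * Duplex handler example
 */
@ChannelHandler.Sharable
public class MyDuplexHandler extends ChannelDuplexHandler {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("[Duplex-Inbound] read: " + msg);
        // Process inbound data
        ByteBuf buf = (ByteBuf) msg;
        String message = buf.toString(CharsetUtil.UTF_8);
        // Write a reply. ctx.writeAndFlush() starts at the previous outbound handler,
        // so this handler's own write() is skipped; a handler closer to the head
        // (for example a StringEncoder) must turn the String into bytes.
        ctx.writeAndFlush("Echo: " + message);
        // Pass the original ByteBuf on; the next handler (or the tail) releases it
        super.channelRead(ctx, msg);
    }

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        System.out.println("[Duplex-Outbound] write: " + msg);
        super.write(ctx, msg, promise);
    }
}
```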
### 5.5 ChannelPipeline in Detail
```java
/**
 * The ChannelPipeline concept
 *
 * A ChannelPipeline is the container that holds and manages a chain of ChannelHandlers.
 *
 * Characteristics:
 * 1. Chain structure: handlers are linked in the order they are added
 * 2. Bidirectional: inbound and outbound events travel in opposite directions
 * 3. Thread-safe: handlers can be added or removed from any thread
 */
public class ChannelPipelineConcept {
    /*
     * Pipeline structure:
     *
     *   ┌─────────────────┐
     *   │   HeadContext   │
     *   └────────┬────────┘
     *            ↓
     *   ┌─────────────────┐
     *   │    Handler1     │
     *   └────────┬────────┘
     *            ↓
     *   ┌─────────────────┐
     *   │    Handler2     │
     *   └────────┬────────┘
     *            ↓
     *   ┌─────────────────┐
     *   │    Handler3     │
     *   └────────┬────────┘
     *            ↓
     *   ┌─────────────────┐
     *   │   TailContext   │
     *   └─────────────────┘
     *
     * Inbound events:  Head → Handler1 → Handler2 → Handler3 → Tail
     * Outbound events: Tail → Handler3 → Handler2 → Handler1 → Head
     * (each event only visits the handlers of its own direction)
     */
}
```
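The opposite traversal orders are the part that most often surprises newcomers, so here is a toy model that makes them visible. `ToyPipeline` and its `Handler` interface are hypothetical and far simpler than Netty's pipeline (no contexts, no events other than a message); the sketch only shows that one handler list is walked head-to-tail for inbound data and tail-to-head for outbound data.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy handler chain (hypothetical model, not Netty's ChannelPipeline). */
final class ToyPipeline {
    /** A handler may transform messages in either direction; the defaults pass them through. */
    interface Handler {
        default String onInbound(String msg) { return msg; }
        default String onOutbound(String msg) { return msg; }
    }

    private final List<Handler> handlers = new ArrayList<>();

    ToyPipeline addLast(Handler handler) {
        handlers.add(handler);
        return this;
    }

    /** Inbound: first added handler runs first (head to tail). */
    String fireInbound(String msg) {
        for (Handler handler : handlers) {
            msg = handler.onInbound(msg);
        }
        return msg;
    }

    /** Outbound: last added handler runs first (tail to head). */
    String write(String msg) {
        for (int i = handlers.size() - 1; i >= 0; i--) {
            msg = handlers.get(i).onOutbound(msg);
        }
        return msg;
    }

    /** Handler that appends its name in both directions, to record the visiting order. */
    static Handler tagging(String name) {
        return new Handler() {
            @Override public String onInbound(String msg) { return msg + "[" + name + "]"; }
            @Override public String onOutbound(String msg) { return msg + "[" + name + "]"; }
        };
    }

    public static void main(String[] args) {
        ToyPipeline pipeline = new ToyPipeline()
                .addLast(tagging("h1"))
                .addLast(tagging("h2"))
                .addLast(tagging("h3"));
        System.out.println(pipeline.fireInbound("in")); // in[h1][h2][h3]
        System.out.println(pipeline.write("out"));      // out[h3][h2][h1]
    }
}
```

This is why codecs are usually added before business handlers: a decoder near the head sees inbound bytes first, and an encoder near the head sees outbound messages last.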
### 5.6 Pipeline Operations
```java
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelPipeline;
import java.util.Map;

/**
 * Pipeline operations
 */
public class ChannelPipelineOperations {
    public static void addHandlers(ChannelPipeline pipeline) {
        // 1. Append handlers
        pipeline.addLast("handler1", new MyInboundHandler());
        pipeline.addLast("handler2", new MyOutboundHandler());
        pipeline.addLast("handler3", new MyDuplexHandler());
        // 2. Insert handlers at a specific position
        pipeline.addFirst("firstHandler", new MyInboundHandler());
        pipeline.addBefore("handler2", "beforeHandler2", new MyInboundHandler());
        pipeline.addAfter("handler1", "afterHandler1", new MyOutboundHandler());
        // 3. Look up handlers by name or by type
        ChannelHandler handler1 = pipeline.get("handler1");
        MyInboundHandler inboundHandler = pipeline.get(MyInboundHandler.class);
        // 4. Replace a handler
        pipeline.replace("handler1", "newHandler1", new MyInboundHandler());
        // 5. Remove handlers by name or by type (the first match is removed)
        pipeline.remove("handler2");
        pipeline.remove(MyOutboundHandler.class);
        // 6. List the handlers in the pipeline, keyed by name
        Map<String, ChannelHandler> handlers = pipeline.toMap();
        System.out.println("Handlers in pipeline: " + handlers);
    }
}
```
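### 5.7 ChannelHandlerContext
```java
/**
 * ChannelHandlerContext in detail
 *
 * A ChannelHandlerContext is the per-handler context that links a handler to its pipeline.
 */
public class ChannelHandlerContextDemo {
    /*
     * What a ChannelHandlerContext provides:
     * 1. Access to the Channel and the Pipeline
     * 2. Methods to operate on the Channel and the Pipeline
     * 3. Access to attributes (in Netty 4.1 these are stored on the Channel)
     * 4. Event propagation to the next handler
     */
}
```
```java
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.EventExecutor;

/**
 * Handler that uses ChannelHandlerContext
 */
@ChannelHandler.Sharable
public class ContextHandler extends ChannelInboundHandlerAdapter {
    private static final AttributeKey<String> HANDLER_DATA = AttributeKey.valueOf("handlerData");

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 1. Get the Channel
        Channel channel = ctx.channel();
        // 2. Get the Pipeline
        ChannelPipeline pipeline = ctx.pipeline();
        // 3. Get the executor. ctx.executor() returns an EventExecutor
        // (by default the channel's EventLoop), not an EventLoop.
        EventExecutor executor = ctx.executor();
        // 4. Get the allocator
        ByteBufAllocator allocator = ctx.alloc();
        // 5. Store an attribute (ctx.attr() is deprecated in 4.1; use the Channel)
        channel.attr(HANDLER_DATA).set("data");
        // 6. Propagate the event to the next inbound handler
        ctx.fireChannelRead(msg);
        // 7. Write data (outbound); a String assumes a StringEncoder in the pipeline
        ctx.writeAndFlush("Response");
        // 8. Run a task on the handler's executor
        executor.execute(() -> {
            System.out.println("Task running on the event loop");
        });
    }
}
```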
---
## 6. Core Component 4: EventLoop in Detail
### 6.1 What an EventLoop Is
```java
/**
 * The EventLoop concept
 *
 * The EventLoop is the heart of Netty's threading model; it processes I/O events and tasks.
 *
 * Characteristics:
 * 1. One EventLoop is bound to exactly one Thread
 * 2. One EventLoop can serve many Channels
 * 3. A Channel is handled by the same EventLoop for its whole lifetime
 * 4. Per-channel handler state therefore needs no synchronization
 *    (state shared across channels still does)
 */
public class EventLoopConcept {
    /*
     * EventLoopGroup:
     * - An EventLoopGroup is a group of EventLoops
     * - Boss group: accepts connections
     * - Worker group: handles I/O
     *
     * EventLoop threading model:
     * ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
     * │ EventLoop1  │ │ EventLoop2  │ │ EventLoop3  │
     * │  (Thread1)  │ │  (Thread2)  │ │  (Thread3)  │
     * └──────┬──────┘ └──────┬──────┘ └──────┬──────┘
     *        │               │               │
     *    ┌───┴───┐       ┌───┴───┐       ┌───┴───┐
     *    │  Ch1  │       │  Ch3  │       │  Ch5  │
     *    │  Ch2  │       │  Ch4  │       │  Ch6  │
     *    └───────┘       └───────┘       └───────┘
     */
}
```
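How channels end up on loops can be sketched as a round-robin choice made once at registration time. The model below is an assumption-laden illustration: `LoopGroupModel`, its string loop names, and its `register` method are hypothetical, and Netty's real chooser works on executor objects rather than names. As far as I know the default strategy is round-robin, so new channels spread evenly and interleave across loops instead of filling one loop first.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Toy model of channel-to-loop assignment (hypothetical, not Netty code). */
final class LoopGroupModel {
    private final String[] loops;
    private final AtomicInteger nextIndex = new AtomicInteger();
    private final Map<String, String> channelToLoop = new ConcurrentHashMap<>();

    LoopGroupModel(int loopCount) {
        if (loopCount <= 0) {
            throw new IllegalArgumentException("loopCount must be positive");
        }
        loops = new String[loopCount];
        for (int i = 0; i < loopCount; i++) {
            loops[i] = "loop-" + i;
        }
    }

    /**
     * Assigns a loop to the channel on first registration (round-robin) and
     * returns the same loop on every later call: the channel stays pinned.
     */
    String register(String channelId) {
        return channelToLoop.computeIfAbsent(channelId,
                id -> loops[Math.floorMod(nextIndex.getAndIncrement(), loops.length)]);
    }

    public static void main(String[] args) {
        LoopGroupModel group = new LoopGroupModel(3);
        for (int i = 1; i <= 6; i++) {
            System.out.println("ch" + i + " -> " + group.register("ch" + i));
        }
        // ch1 -> loop-0, ch2 -> loop-1, ch3 -> loop-2, ch4 -> loop-0, ...
    }
}
```

Pinning is what lets handlers keep per-channel state without locks: all events of one channel are processed by one thread.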
### 6.2 How an EventLoop Works
```java
/**
 * How an EventLoop works
 *
 * An EventLoop runs an endless loop:
 * 1. Wait for I/O events (select)
 * 2. Process the I/O events
 * 3. Run queued tasks
 */
public class EventLoopWorkflow {
    /*
     * Simplified EventLoop loop (pseudocode):
     *
     * while (!threadShutdown) {
     *     // 1. Wait for I/O events
     *     int readyKeys = selector.select();
     *
     *     // 2. Process I/O events
     *     processSelectedKeys(readyKeys);
     *
     *     // 3. Run the task queue
     *     runAllTasks();
     * }
     */
}
```
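### 6.3 Running Tasks on an EventLoop
```java
import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.ScheduledFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

/**
 * Running tasks on an EventLoop
 */
public class EventLoopTaskExecution {
    /** Call this from a thread that is NOT the channel's EventLoop (see step 6). */
    public static void executeTasks(Channel channel) {
        EventLoop eventLoop = channel.eventLoop();
        // 1. Run a task as soon as possible
        eventLoop.execute(() -> {
            System.out.println("Immediate task, thread: " + Thread.currentThread().getName());
        });
        // 2. Run a task after a delay
        ScheduledFuture<?> future1 = eventLoop.schedule(() -> {
            System.out.println("Delayed task, thread: " + Thread.currentThread().getName());
        }, 1, TimeUnit.SECONDS);
        // 3. Run a task periodically
        ScheduledFuture<?> future2 = eventLoop.scheduleAtFixedRate(() -> {
            System.out.println("Periodic task, thread: " + Thread.currentThread().getName());
        }, 0, 1, TimeUnit.SECONDS);
        // 4. Cancel tasks
        future1.cancel(false);
        future2.cancel(false);
        // 5. Check whether the current thread is the EventLoop thread
        boolean inEventLoop = eventLoop.inEventLoop();
        System.out.println("In EventLoop thread: " + inEventLoop);
        // 6. Submit a task and wait for its result.
        // Blocking with get() on the EventLoop thread itself would deadlock.
        Future<String> future3 = eventLoop.submit(() -> {
            return "task result";
        });
        try {
            String result = future3.get();
            System.out.println("Task result: " + result);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}
```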
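### 6.4 EventLoop Thread Safety
```java
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * EventLoop thread-safety example
 */
@ChannelHandler.Sharable
public class EventLoopThreadSafeHandler extends ChannelInboundHandlerAdapter {
    // A @Sharable handler serves channels on different EventLoops,
    // so state shared across channels must be thread-safe.
    private final AtomicInteger counter = new AtomicInteger(0);

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // Events of ONE channel are always delivered by the same thread,
        // so per-channel state would need no synchronization here.
        int count = counter.incrementAndGet();
        System.out.println("Messages handled: " + count);
        // Do not block here (no Thread.sleep, no blocking I/O): it stalls every
        // channel on this EventLoop. Hand slow work to a separate executor instead.
        // Pass the message on
        super.channelRead(ctx, msg);
    }

    /**
     * Hand-off pattern for code that must run on the channel's EventLoop.
     * For plain writes this is optional: Channel.writeAndFlush() is thread-safe
     * and performs the same hand-off internally.
     */
    public void safeChannelWrite(Channel channel, String message) {
        // Check whether the current thread is the EventLoop thread
        if (channel.eventLoop().inEventLoop()) {
            // Already on the EventLoop thread: run directly
            channel.writeAndFlush(message);
        } else {
            // Another thread: submit a task to the EventLoop
            channel.eventLoop().execute(() -> {
                channel.writeAndFlush(message);
            });
        }
    }
}
```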
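The reason single-threaded execution removes the need for locks can be demonstrated with a plain JDK executor standing in for an event loop. This is a sketch under that substitution: `ConfinedCounter` is a hypothetical class and `Executors.newSingleThreadExecutor()` is only an analogy for an `EventLoop`. Many threads submit increments, yet the counter is an ordinary `int`, because only the loop thread ever touches it.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Counter confined to a single executor thread (hypothetical thread-confinement demo). */
final class ConfinedCounter {
    private final ExecutorService loop = Executors.newSingleThreadExecutor();
    private int count; // plain field: read and written only on the loop thread

    /** Safe to call from any thread; the increment itself runs on the loop thread. */
    void increment() {
        loop.execute(() -> count++);
    }

    /** Reads the counter on the loop thread, after all previously queued increments. */
    int awaitCount() {
        try {
            return loop.submit(() -> count).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    void shutdown() {
        loop.shutdown();
    }

    public static void main(String[] args) throws InterruptedException {
        ConfinedCounter counter = new ConfinedCounter();
        Thread[] producers = new Thread[4];
        for (int i = 0; i < producers.length; i++) {
            producers[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    counter.increment();
                }
            });
            producers[i].start();
        }
        for (Thread producer : producers) {
            producer.join();
        }
        System.out.println(counter.awaitCount()); // 4000
        counter.shutdown();
    }
}
```

The same reasoning explains why a slow task is harmful: everything queued behind it on that one thread waits, so blocking work belongs on a separate executor.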
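### 6.5 Configuring an EventLoopGroup
```java
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * EventLoopGroup configuration
 */
public class EventLoopGroupConfig {
    public static void main(String[] args) {
        // 1. Default thread count (available processors * 2)
        EventLoopGroup group1 = new NioEventLoopGroup();
        // 2. Explicit thread count
        EventLoopGroup group2 = new NioEventLoopGroup(4);
        // 3. Custom thread factory
        EventLoopGroup group3 = new NioEventLoopGroup(4, new ThreadFactory() {
            private final AtomicInteger counter = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "netty-eventloop-" + counter.incrementAndGet());
            }
        });
        // 4. Netty's DefaultThreadFactory with a name prefix (0 = default thread count)
        EventLoopGroup group4 = new NioEventLoopGroup(0, new DefaultThreadFactory("netty-worker"));
        // 5. Shut the groups down
        group1.shutdownGracefully();
        group2.shutdownGracefully();
        group3.shutdownGracefully();
        group4.shutdownGracefully();
    }
}
```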
---
## 7. Core Component 5: Bootstrap Classes
### 7.1 What Bootstrap Is
```java
/**
 * The Bootstrap concept
 *
 * Bootstrap classes configure and start a Netty server or client.
 *
 * Kinds:
 * 1. ServerBootstrap: starts a server
 * 2. Bootstrap: starts a client
 */
public class BootstrapConcept {
    /*
     * Configuration steps:
     *
     * 1. Set the EventLoopGroup(s)
     * 2. Set the Channel type
     * 3. Set the ChannelHandler(s)
     * 4. Set ChannelOptions
     * 5. Set channel attributes
     * 6. Bind a port (server) or connect to a server (client)
     */
}
```
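### 7.2 ServerBootstrap
```java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.AttributeKey;

/**
 * Complete ServerBootstrap configuration
 */
public class ServerBootstrapDemo {
    public static void main(String[] args) throws InterruptedException {
        // 1. Create the EventLoopGroups
        EventLoopGroup bossGroup = new NioEventLoopGroup(1); // accepts connections
        EventLoopGroup workerGroup = new NioEventLoopGroup(); // handles I/O
        try {
            // 2. Create the ServerBootstrap
            ServerBootstrap bootstrap = new ServerBootstrap();
            // 3. Configure it
            bootstrap.group(bossGroup, workerGroup)
                    // Channel type
                    .channel(NioServerSocketChannel.class)
                    // Handler for the server channel itself
                    .handler(new LoggingHandler(LogLevel.INFO))
                    // Handlers for each accepted connection
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ChannelPipeline pipeline = ch.pipeline();
                            // Codecs
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new StringEncoder());
                            // Business handler. StringDecoder delivers Strings, so the
                            // ByteBuf-based EchoServerHandler from section 1.3 would fail
                            // with a ClassCastException here; echo Strings instead.
                            pipeline.addLast(new SimpleChannelInboundHandler<String>() {
                                @Override
                                protected void channelRead0(ChannelHandlerContext ctx, String msg) {
                                    ctx.writeAndFlush(msg);
                                }
                            });
                        }
                    })
                    // Channel options
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    // Channel attribute (valueOf is safe to call repeatedly; newInstance
                    // throws if the name already exists)
                    .attr(AttributeKey.<String>valueOf("serverName"), "NettyServer");
            // 4. Bind the port and start
            ChannelFuture future = bootstrap.bind(8080).sync();
            System.out.println("Server started, listening on port 8080");
            // 5. Wait until the server channel is closed
            future.channel().closeFuture().sync();
        } finally {
            // 6. Graceful shutdown
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
```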
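### 7.3 Bootstrap
```java
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

/**
 * Complete client Bootstrap configuration
 */
public class ClientBootstrapDemo {
    public static void main(String[] args) throws InterruptedException {
        // 1. Create the EventLoopGroup
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            // 2. Create the Bootstrap
            Bootstrap bootstrap = new Bootstrap();
            // 3. Configure it
            bootstrap.group(group)
                    // Channel type
                    .channel(NioSocketChannel.class)
                    // Handlers
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ChannelPipeline pipeline = ch.pipeline();
                            // Codecs
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new StringEncoder());
                            // Business handler: an inline stand-in for the
                            // application's own ClientHandler class
                            pipeline.addLast(new SimpleChannelInboundHandler<String>() {
                                @Override
                                protected void channelRead0(ChannelHandlerContext ctx, String msg) {
                                    System.out.println("Server replied: " + msg);
                                }
                            });
                        }
                    })
                    // Channel options
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000);
            // 4. Connect to the server
            ChannelFuture connectFuture = bootstrap.connect("localhost", 8080).sync();
            System.out.println("Connected to server");
            // 5. Send a message
            Channel channel = connectFuture.channel();
            channel.writeAndFlush("Hello Server");
            // 6. Wait until the channel is closed
            channel.closeFuture().sync();
        } finally {
            // 7. Graceful shutdown
            group.shutdownGracefully();
        }
    }
}
```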
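### 7.4 ChannelFuture
```java
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;

/**
 * ChannelFuture in detail
 *
 * A ChannelFuture represents the result of an asynchronous channel operation.
 */
public class ChannelFutureDemo {
    public static void listenToChannelFuture(Channel channel) {
        // 1. Write data
        ChannelFuture writeFuture = channel.writeAndFlush("Hello");
        // 2. Add a listener (asynchronous, preferred)
        writeFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                if (future.isSuccess()) {
                    System.out.println("Write succeeded");
                } else {
                    System.out.println("Write failed: " + future.cause());
                }
            }
        });
        // 3. Predefined listener: close the channel if the operation fails
        writeFuture.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
        // 4. Wait for completion (blocking; never call this on an EventLoop thread)
        try {
            writeFuture.await();
            if (writeFuture.isSuccess()) {
                System.out.println("Write succeeded");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // 5. Get the Channel the operation belongs to
        Channel resultChannel = writeFuture.channel();
    }
}
```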
### 7.5 Bootstrap Best Practices
```java
/**
 * Bootstrap best practices
 */
public class BootstrapBestPractices {
    /**
     * 1. Size the EventLoopGroups sensibly
     */
    public static void configEventLoopGroup() {
        // Boss group: one thread is enough when binding a single port
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        // Worker group: size it according to the workload
        // CPU-bound: number of CPU cores
        // I/O-bound: number of CPU cores * 2 (also Netty's default when no count is given)
        int workerThreads = Runtime.getRuntime().availableProcessors() * 2;
        EventLoopGroup workerGroup = new NioEventLoopGroup(workerThreads);
    }

    /**
     * 2. Configure ChannelOptions sensibly
     */
    public static void configChannelOption(ServerBootstrap bootstrap) {
        // SO_BACKLOG: size of the pending-connection queue
        bootstrap.option(ChannelOption.SO_BACKLOG, 128);
        // SO_KEEPALIVE: enable TCP keep-alive probes
        bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
        // TCP_NODELAY: disable Nagle's algorithm to reduce latency
        bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
        // SO_RCVBUF / SO_SNDBUF: socket receive / send buffer sizes
        bootstrap.childOption(ChannelOption.SO_RCVBUF, 32 * 1024);
        bootstrap.childOption(ChannelOption.SO_SNDBUF, 32 * 1024);
        // WRITE_BUFFER_WATER_MARK: low / high water marks of the outbound buffer
        bootstrap.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
                new WriteBufferWaterMark(32 * 1024, 64 * 1024));
    }

    /**
     * 3. Add handlers in a sensible order
     */
    public static void addHandlers(ChannelPipeline pipeline) {
        // Order: codecs -> protocol handlers -> business handlers
        // 1. Codecs. The frame decoder reads a 4-byte length field at offset 0 and
        //    strips those 4 bytes, so StringDecoder only sees the payload.
        pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));
        pipeline.addLast("stringDecoder", new StringDecoder());
        pipeline.addLast("stringEncoder", new StringEncoder());
        // 2. Protocol handlers (30 s reader-idle timeout drives the heartbeat)
        pipeline.addLast("idleStateHandler", new IdleStateHandler(30, 0, 0));
        pipeline.addLast("heartbeatHandler", new HeartbeatHandler());
        // 3. Business handler
        pipeline.addLast("businessHandler", new BusinessHandler());
    }

    /**
     * 4. Shut down gracefully
     */
    public static void gracefulShutdown(EventLoopGroup group) {
        // shutdownGracefully() is asynchronous and returns a Future immediately.
        // Call sync() on it only if you need to block until shutdown has finished.
        try {
            group.shutdownGracefully().sync();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```
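
The `frameDecoder` entry in the pipeline above splits the TCP byte stream into messages by reading a 4-byte length prefix. To make that idea concrete without Netty on the classpath, here is a minimal JDK-only sketch. The class `LengthPrefixFraming` and its method names are hypothetical and exist only for this illustration; it is not how Netty implements the decoder internally. The point it shows is that bytes of an incomplete frame must stay in the buffer until the rest arrives.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration class; not part of Netty.
final class LengthPrefixFraming {

    // Encodes one message as [4-byte big-endian length][UTF-8 payload].
    static byte[] encode(String message) {
        byte[] payload = message.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + payload.length)
                .putInt(payload.length)
                .put(payload)
                .array();
    }

    // Extracts every complete frame from a buffer that is in read mode.
    // Bytes belonging to an incomplete trailing frame are left unread.
    static List<String> decode(ByteBuffer in) {
        List<String> frames = new ArrayList<>();
        while (in.remaining() >= 4) {
            in.mark();
            int length = in.getInt();
            if (length < 0) {
                throw new IllegalArgumentException("Negative frame length: " + length);
            }
            if (in.remaining() < length) {
                in.reset(); // payload not fully received yet; rewind to the length field
                break;
            }
            byte[] payload = new byte[length];
            in.get(payload);
            frames.add(new String(payload, StandardCharsets.UTF_8));
        }
        return frames;
    }

    public static void main(String[] args) {
        byte[] first = encode("Hello");
        byte[] second = encode("Netty");

        // Simulate TCP delivering the first frame plus only 3 bytes of the second.
        ByteBuffer in = ByteBuffer.allocate(64);
        in.put(first).put(second, 0, 3);
        in.flip();
        System.out.println(decode(in)); // only the first frame is complete

        // The rest of the second frame arrives later.
        in.compact(); // keep the 3 leftover bytes and switch back to write mode
        in.put(second, 3, second.length - 3);
        in.flip();
        System.out.println(decode(in)); // now the second frame can be decoded
    }
}
```

Without a frame decoder in front of it, a handler may see half a message or several messages merged into one read, which is why the codecs come first in the pipeline order.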
---
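## 8. Hands-On Project 1: Echo Server
### 8.1 Project Overview
Implement a simple Echo server: whatever bytes the client sends, the server sends back unchanged.
### 8.2 Server Implementation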
```java
/**
 * Echo server
 */
public class EchoServer {
    private final int port;

    public EchoServer(int port) {
        this.port = port;
    }

    public void start() throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ChannelPipeline pipeline = ch.pipeline();
                            // Add the echo handler
                            pipeline.addLast(new EchoServerHandler());
                        }
                    });
            ChannelFuture future = bootstrap.bind(port).sync();
            System.out.println("Echo server started, listening on port " + port);
            future.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new EchoServer(8080).start();
    }
}

/**
 * Echo server handler
 */
@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("Server received: " + buf.toString(CharsetUtil.UTF_8));
        // Echo the message back; write() takes over the buffer and releases it once written
        ctx.write(buf);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        // Flush everything written during this read batch
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
```
### 8.3 Client Implementation
```java
/**
 * Echo client
 */
public class EchoClient {
    private final String host;
    private final int port;

    public EchoClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public void start() throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new EchoClientHandler());
                        }
                    });
            Channel channel = bootstrap.connect(host, port).sync().channel();
            System.out.println("Echo client connected");
            // Send ten messages, one per second
            for (int i = 0; i < 10; i++) {
                String message = "Hello " + i;
                channel.writeAndFlush(Unpooled.copiedBuffer(message, CharsetUtil.UTF_8));
                System.out.println("Client sent: " + message);
                Thread.sleep(1000);
            }
            // Close the connection ourselves; waiting on closeFuture() alone would
            // block forever because the echo server never closes it.
            channel.close().sync();
        } finally {
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new EchoClient("localhost", 8080).start();
    }
}

/**
 * Echo client handler
 */
@Sharable
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        // SimpleChannelInboundHandler releases msg after this method returns
        System.out.println("Client received: " + msg.toString(CharsetUtil.UTF_8));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
```
---
## 9. Hands-On Project 2: HTTP Server
### 9.1 Project Overview
Implement an HTTP server that handles HTTP requests and returns responses.
### 9.2 HTTP Server Implementation
```java
/**
 * HTTP server
 */
public class HttpServer {
    private final int port;

    public HttpServer(int port) {
        this.port = port;
    }

    public void start() throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ChannelPipeline pipeline = ch.pipeline();
                            // HTTP request decoder / response encoder
                            pipeline.addLast(new HttpServerCodec());
                            // Aggregate HTTP parts into a FullHttpRequest (max 64 KiB body)
                            pipeline.addLast(new HttpObjectAggregator(65536));
                            // Response compression
                            pipeline.addLast(new HttpContentCompressor());
                            // Business handler
                            pipeline.addLast(new HttpServerHandler());
                        }
                    });
            ChannelFuture future = bootstrap.bind(port).sync();
            System.out.println("HTTP server started, listening on port " + port);
            future.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new HttpServer(8080).start();
    }
}

/**
 * HTTP server handler.
 * HTTP_1_1, OK, CONTENT_TYPE, CONTENT_LENGTH, CONNECTION and CLOSE are static imports
 * from HttpVersion, HttpResponseStatus, HttpHeaderNames and HttpHeaderValues.
 */
@Sharable
public class HttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
        // Read the request details
        String uri = request.uri();
        String method = request.method().name();
        HttpHeaders headers = request.headers();
        System.out.println("Request received: " + method + " " + uri);
        System.out.println("Request headers: " + headers);
        // Build the response body
        String content = "Hello Netty HTTP Server\n" +
                "Method: " + method + "\n" +
                "URI: " + uri + "\n" +
                "Time: " + new Date();
        FullHttpResponse response = new DefaultFullHttpResponse(
                HTTP_1_1,
                OK,
                Unpooled.copiedBuffer(content, CharsetUtil.UTF_8)
        );
        // Set the response headers (Content-Length is the byte count, not the character count)
        response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8");
        response.headers().set(CONTENT_LENGTH, response.content().readableBytes());
        response.headers().set(CONNECTION, CLOSE);
        // Send the response, then close the connection
        ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
```
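
The handler above always answers with `Connection: close`, which is simple but gives up connection reuse. Whether a connection may stay open follows a small rule from HTTP itself: HTTP/1.1 connections are persistent unless the peer sends `Connection: close`, while HTTP/1.0 connections close unless the peer sends `Connection: keep-alive`. Netty's `HttpUtil.isKeepAlive` applies this kind of check to a real message. The sketch below restates the rule with JDK types only; `KeepAliveRule` is a hypothetical name, and it is simplified in that it treats the header as a single token rather than a comma-separated list.

```java
import java.util.Locale;

// Hypothetical illustration class; a simplified restatement of the HTTP keep-alive rule.
final class KeepAliveRule {

    // httpVersion is e.g. "HTTP/1.1"; connectionHeader may be null when the header is absent.
    static boolean isKeepAlive(String httpVersion, String connectionHeader) {
        String value = connectionHeader == null
                ? ""
                : connectionHeader.trim().toLowerCase(Locale.ROOT);
        if (value.equals("close")) {
            return false; // an explicit close always wins
        }
        boolean keepAliveByDefault = "HTTP/1.1".equals(httpVersion);
        return keepAliveByDefault || value.equals("keep-alive");
    }

    public static void main(String[] args) {
        System.out.println(isKeepAlive("HTTP/1.1", null));         // true
        System.out.println(isKeepAlive("HTTP/1.1", "close"));      // false
        System.out.println(isKeepAlive("HTTP/1.0", null));         // false
        System.out.println(isKeepAlive("HTTP/1.0", "Keep-Alive")); // true
    }
}
```

A handler that honors this rule would only add the `CLOSE` listener when the check returns false, and would otherwise leave the channel open for the next request.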
### 9.3 Static File Server
```java
/**
 * Static file server handler.
 * Expects HttpServerCodec and HttpObjectAggregator earlier in the pipeline, plus
 * ChunkedWriteHandler for the ChunkedFile branch used when SSL is enabled.
 */
@Sharable
public class StaticFileServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
    private final String basePath;

    public StaticFileServerHandler(String basePath) {
        this.basePath = basePath;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
        // Only GET requests are supported
        if (!GET.equals(request.method())) {
            sendError(ctx, METHOD_NOT_ALLOWED);
            return;
        }
        final String uri = request.uri();
        final String path = sanitizeUri(uri);
        if (path == null) {
            sendError(ctx, FORBIDDEN);
            return;
        }
        File file = new File(basePath, path);
        if (file.isHidden() || !file.exists()) {
            sendError(ctx, NOT_FOUND);
            return;
        }
        if (!file.isFile()) {
            sendError(ctx, FORBIDDEN);
            return;
        }
        RandomAccessFile raf;
        try {
            raf = new RandomAccessFile(file, "r");
        } catch (FileNotFoundException ignore) {
            sendError(ctx, NOT_FOUND);
            return;
        }
        long fileLength = raf.length();
        // Write the response head first; the body follows separately
        HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
        HttpUtil.setContentLength(response, fileLength);
        setContentTypeHeader(response, file);
        if (HttpUtil.isKeepAlive(request)) {
            response.headers().set(CONNECTION, KEEP_ALIVE);
        }
        ctx.write(response);
        ChannelFuture sendFileFuture;
        ChannelFuture lastContentFuture;
        if (ctx.pipeline().get(SslHandler.class) == null) {
            // No SSL: use zero-copy file transfer
            sendFileFuture = ctx.write(new DefaultFileRegion(raf.getChannel(), 0, fileLength),
                    ctx.newProgressivePromise());
            lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
        } else {
            // SSL: zero-copy is not possible, so stream the file in 8 KiB chunks.
            // HttpChunkedInput writes the terminating LastHttpContent itself.
            sendFileFuture = ctx.writeAndFlush(
                    new HttpChunkedInput(new ChunkedFile(raf, 0, fileLength, 8192)),
                    ctx.newProgressivePromise());
            lastContentFuture = sendFileFuture;
        }
        sendFileFuture.addListener(new ChannelProgressiveFutureListener() {
            @Override
            public void operationProgressed(ChannelProgressiveFuture future, long progress, long total) {
                if (total < 0) {
                    System.err.println(future.channel() + " Transfer progress: " + progress);
                } else {
                    System.err.println(future.channel() + " Transfer progress: " + progress + " / " + total);
                }
            }

            @Override
            public void operationComplete(ChannelProgressiveFuture future) {
                System.err.println(future.channel() + " Transfer complete.");
            }
        });
        if (!HttpUtil.isKeepAlive(request)) {
            lastContentFuture.addListener(ChannelFutureListener.CLOSE);
        }
    }

    /** Decodes the URI and rejects anything that looks like a path traversal attempt. */
    private String sanitizeUri(String uri) {
        try {
            uri = URLDecoder.decode(uri, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            try {
                uri = URLDecoder.decode(uri, ISO_8859_1.toString());
            } catch (UnsupportedEncodingException e1) {
                throw new Error(e1);
            }
        }
        if (uri.isEmpty() || uri.charAt(0) != '/') {
            return null;
        }
        uri = uri.replace('/', File.separatorChar);
        if (uri.contains(File.separator + '.') ||
                uri.contains('.' + File.separator) ||
                uri.charAt(0) == '.' || uri.charAt(uri.length() - 1) == '.') {
            return null;
        }
        return uri;
    }

    private void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {
        FullHttpResponse response = new DefaultFullHttpResponse(
                HTTP_1_1, status, Unpooled.copiedBuffer("Failure: " + status + "\r\n", CharsetUtil.UTF_8));
        response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8");
        ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
    }

    private void setContentTypeHeader(HttpResponse response, File file) {
        MimetypesFileTypeMap mimeTypesMap = new MimetypesFileTypeMap();
        response.headers().set(CONTENT_TYPE, mimeTypesMap.getContentType(file.getPath()));
    }
}
```
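
The `sanitizeUri` method above rejects suspicious paths by string matching. A complementary check, sketched below with JDK types only, resolves the request path against the base directory, normalizes it, and then verifies that the result is still inside that directory. `SafePathResolver` is a hypothetical helper name; the sketch assumes the request path has already been URL-decoded, and it does not follow symbolic links (that would need `Path.toRealPath()` on an existing file).

```java
import java.nio.file.InvalidPathException;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper for illustration; not part of Netty.
final class SafePathResolver {

    // Returns the resolved path, or null if the request would escape baseDir.
    static Path resolve(String baseDir, String requestPath) {
        try {
            Path base = Paths.get(baseDir).toAbsolutePath().normalize();
            // Drop leading slashes so the request is treated as relative to base.
            String relative = requestPath.replaceFirst("^/+", "");
            // normalize() collapses "." and ".." segments before the containment check.
            Path resolved = base.resolve(relative).normalize();
            return resolved.startsWith(base) ? resolved : null;
        } catch (InvalidPathException e) {
            return null; // characters the file system cannot represent
        }
    }

    public static void main(String[] args) {
        System.out.println(resolve("/var/www", "/index.html"));    // inside the base directory
        System.out.println(resolve("/var/www", "/a/../b.txt"));    // still inside after normalizing
        System.out.println(resolve("/var/www", "/../etc/passwd")); // null: escapes the base directory
    }
}
```

Checking containment after normalization catches traversal attempts regardless of how the `..` segments are arranged, which string matching alone can miss.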
---
## 10. Hands-On Project 3: Building an RPC Framework
### 10.1 RPC Framework Architecture
```
┌──────────────────┐                             ┌──────────────────┐
│      Client      │                             │      Server      │
│                  │                             │                  │
│ RPC interface    │ ───── 1. method call ─────→ │ RPC interface    │
│                  │                             │                  │
│ Dynamic proxy    │                             │ Service registry │
│                  │ ←──── 2. return result ──── │                  │
│ Netty client     │                             │ Netty server     │
└──────────────────┘                             └──────────────────┘
```
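
In the diagram, the client-side dynamic proxy is what turns an ordinary interface call into a request carrying the interface name, method name, parameter types, and arguments. The sketch below shows that mechanism with `java.lang.reflect.Proxy`. To keep it runnable on its own, the network hop is replaced by a direct call into an in-memory registry; `RpcProxySketch`, `GreetingService`, `REGISTRY`, and `dispatch` are hypothetical names chosen for this illustration, not the framework's actual classes.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical names throughout; the network hop is replaced by a direct method call.
final class RpcProxySketch {

    // Example service contract shared by client and server.
    interface GreetingService {
        String greet(String name);
    }

    // Server side: interface name -> implementation (the "service registry" box).
    static final Map<String, Object> REGISTRY = new ConcurrentHashMap<>();

    // Server side: find the implementation and invoke the requested method reflectively.
    static Object dispatch(String interfaceName, String methodName,
                           Class<?>[] paramTypes, Object[] params) throws Throwable {
        Object service = REGISTRY.get(interfaceName);
        if (service == null) {
            throw new IllegalStateException("No service registered for " + interfaceName);
        }
        Method method = Class.forName(interfaceName).getMethod(methodName, paramTypes);
        try {
            return method.invoke(service, params);
        } catch (InvocationTargetException e) {
            throw e.getCause(); // surface the service's own exception
        }
    }

    // Client side: every interface call becomes (interfaceName, methodName, paramTypes, params).
    @SuppressWarnings("unchecked")
    static <T> T createProxy(Class<T> iface) {
        InvocationHandler handler = (proxy, method, args) -> {
            if (method.getDeclaringClass() == Object.class) {
                // toString/hashCode/equals are answered locally, never sent remotely
                switch (method.getName()) {
                    case "toString": return "RpcProxy(" + iface.getName() + ")";
                    case "hashCode": return System.identityHashCode(proxy);
                    default: return proxy == args[0]; // equals
                }
            }
            // A real client would serialize these four values and send them over Netty here.
            return dispatch(iface.getName(), method.getName(), method.getParameterTypes(), args);
        };
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[] {iface}, handler);
    }

    public static void main(String[] args) {
        REGISTRY.put(GreetingService.class.getName(), (GreetingService) name -> "Hello, " + name);
        GreetingService client = createProxy(GreetingService.class);
        System.out.println(client.greet("Netty")); // prints "Hello, Netty"
    }
}
```

The caller only ever sees `GreetingService`; swapping the direct `dispatch` call for a network round trip does not change the calling code, which is the main reason RPC frameworks use dynamic proxies.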
### 10.2 RPC Protocol Definition
```java
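/**
 * RPC request (@Data is Lombok; it generates getters, setters, equals, hashCode and toString)
 */
@Data
public class RpcRequest implements Serializable {
    private String requestId;       // Request ID, used to match the response
    private String interfaceName;   // Interface name
    private String methodName;      // Method name
    private Class<?>[] paramTypes;  // Parameter types
    private Object[] params;        // Parameter values
}

/**
 * RPC response
 */
@Data
public class RpcResponse implements Serializable {
    private String requestId;  // ID of the request this response answers
    private Object result;     // Return value
    private Throwable error;   // Exception, if the call failed
}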
```
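
The `requestId` field appears in both classes so that a client with several calls in flight on one connection can match each response to the request that produced it, even when responses arrive out of order. A minimal sketch of that bookkeeping follows. `PendingRequests` is a hypothetical class for this illustration, and the response is modeled as plain arguments rather than `RpcResponse` so the block stays self-contained.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical illustration class: tracks calls that are waiting for a response.
final class PendingRequests {
    private final Map<String, CompletableFuture<Object>> pending = new ConcurrentHashMap<>();

    // Called by the sender before the request is written to the channel.
    CompletableFuture<Object> register(String requestId) {
        CompletableFuture<Object> future = new CompletableFuture<>();
        pending.put(requestId, future);
        return future;
    }

    // Called by the inbound handler when a response arrives.
    // Returns false when no caller is waiting for this ID (unknown or already completed).
    boolean complete(String requestId, Object result, Throwable error) {
        CompletableFuture<Object> future = pending.remove(requestId);
        if (future == null) {
            return false;
        }
        if (error != null) {
            future.completeExceptionally(error);
        } else {
            future.complete(result);
        }
        return true;
    }

    // Number of requests still waiting for a response.
    int inFlight() {
        return pending.size();
    }

    public static void main(String[] args) {
        PendingRequests requests = new PendingRequests();
        CompletableFuture<Object> first = requests.register("req-1");
        CompletableFuture<Object> second = requests.register("req-2");

        // Responses may arrive in any order; the ID routes each one to its caller.
        requests.complete("req-2", "second result", null);
        requests.complete("req-1", "first result", null);

        System.out.println(first.join());  // prints "first result"
        System.out.println(second.join()); // prints "second result"
    }
}
```

Removing the entry on completion keeps the map from growing without bound; a production client would also need a timeout that fails and removes entries whose response never arrives.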
### 10.3 RPC Codec
```java
/**
* RPC message encoder
*/
public class RpcEncoder extends MessageToByteEncoder<Object> {
private final Class<?> genericClass;
public RpcEncoder(Class<?> genericClass) {
this.genericClass = genericClass;
}
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
if (genericClass.isInstance(msg)) {
// Frame layout: 4-byte length prefix followed by the serialized body
byte[] data = SerializationUtils.serialize(msg);
out.writeInt(data.length);
out.writeBytes(data);
}
}
}
/**
* RPC message decoder
*/
public class RpcDecoder extends ByteToMessageDecoder {
private final Class<?> genericClass;
public RpcDecoder(Class<?> genericClass) {
this.genericClass = genericClass;
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List