【TCP】【UDP】非常重要
# TCP/UDP深度解析:从原理到封装实现
## 目录
1. [传输层协议概述](#1-传输层协议概述)
2. [TCP协议深度解析](#2-tcp协议深度解析)
3. [UDP协议深度解析](#3-udp协议深度解析)
4. [TCP vs UDP详细对比](#4-tcp-vs-udp详细对比)
5. [TCP协议封装实现](#5-tcp协议封装实现)
6. [UDP协议封装实现](#6-udp协议封装实现)
7. [应用实战:文件传输](#7-应用实战文件传输)
8. [应用实战:即时通讯](#8-应用实战即时通讯)
9. [应用实战:视频流传输](#9-应用实战视频流传输)
10. [性能优化与调优](#10-性能优化与调优)
11. [总结与最佳实践](#11-总结与最佳实践)
---
## 1. 传输层协议概述
### 1.1 网络分层模型
```java
/**
* 网络分层模型
*/
public class NetworkLayers {
/*
* OSI七层模型:
* ┌─────────────────────────────────┐
* │ 7. 应用层 (Application) │ HTTP, FTP, SMTP
* │ 6. 表示层 (Presentation) │ SSL/TLS, JPEG
* │ 5. 会话层 (Session) │ RPC, NetBIOS
* │ 4. 传输层 (Transport) │ TCP, UDP ← 我们关注的层次
* │ 3. 网络层 (Network) │ IP, ICMP
* │ 2. 数据链路层 (Data Link) │ Ethernet, MAC
* │ 1. 物理层 (Physical) │ 光纤, 电线
* └─────────────────────────────────┘
*
* TCP/IP四层模型:
* ┌─────────────────────────────────┐
* │ 应用层 │ HTTP, FTP, SMTP
* │ 传输层 │ TCP, UDP ← 我们关注的层次
* │ 网络层 │ IP
* │ 网络接口层 │ Ethernet
* └─────────────────────────────────┘
*/
}
```
### 1.2 传输层的作用
```java
/**
* 传输层的作用
*/
public class TransportLayer {
/**
* 传输层的核心功能:
* 1. 进程到进程的通信(端到端通信)
* 2. 多路复用和多路分解(端口管理)
* 3. 可靠数据传输(TCP)
* 4. 流量控制
* 5. 拥塞控制
* 6. 差错控制
*/
public static void main(String[] args) {
System.out.println("传输层的作用:");
System.out.println("1. 端口管理:区分同一台主机上的不同进程");
System.out.println("2. 数据分段:将应用层数据分割成合适的段");
System.out.println("3. 可靠传输:保证数据完整性和顺序性(TCP)");
System.out.println("4. 流量控制:防止发送方发送过快淹没接收方");
System.out.println("5. 拥塞控制:防止网络过载导致丢包");
}
}
```
### 1.3 端口详解
```java
/**
* 端口详解
*/
public class PortDetails {
/*
* 端口分类:
*
* ┌─────────────────────────────────────────────────┐
* │ 知名端口 (0-1023) │
* │ 需要root权限,由IANA分配 │
* │ HTTP: 80, HTTPS: 443, SSH: 22, FTP: 21 │
* └─────────────────────────────────────────────────┘
*
* ┌─────────────────────────────────────────────────┐
* │ 注册端口 (1024-49151) │
* │ 由应用程序注册使用 │
* │ MySQL: 3306, Redis: 6379, MongoDB: 27017 │
* └─────────────────────────────────────────────────┘
*
* ┌─────────────────────────────────────────────────┐
* │ 动态/私有端口 (49152-65535) │
* │ 客户端临时使用 │
* └─────────────────────────────────────────────────┘
*/
public static void main(String[] args) {
System.out.println("常见端口示例:");
System.out.println("HTTP: 80");
System.out.println("HTTPS: 443");
System.out.println("SSH: 22");
System.out.println("FTP: 21 (控制), 20 (数据)");
System.out.println("MySQL: 3306");
System.out.println("Redis: 6379");
System.out.println("MongoDB: 27017");
System.out.println("RabbitMQ: 5672");
System.out.println("Elasticsearch: 9200");
System.out.println("ZooKeeper: 2181");
}
}
```
---
## 2. TCP协议深度解析
### 2.1 TCP协议概述
```java
/**
* TCP协议概述
*/
public class TCPOverview {
/**
* TCP(Transmission Control Protocol,传输控制协议)
*
* 特点:
* 1. 面向连接:通信前需要建立连接
* 2. 可靠传输:保证数据完整性和顺序性
* 3. 面向字节流:以字节流的方式传输数据
* 4. 全双工通信:支持双向同时传输
* 5. 点对点通信:一对一通信
*
* 应用场景:
* - HTTP/HTTPS
* - FTP
* - SMTP
* - SSH
* - 数据库连接
*/
}
```
### 2.2 TCP报文格式
```java
/**
* TCP报文格式
*/
public class TCPHeader {
/*
* TCP报文头部(20-60字节):
*
* ┌────────────────────────────────────────────────────────────┐
* │ 源端口 (16 bits) │ 目标端口 (16 bits) │
* ├────────────────────────────────────────────────────────────┤
* │ 序列号 (32 bits) │
* ├────────────────────────────────────────────────────────────┤
* │ 确认号 (32 bits) │
* ├──────────┬────────────────────────────────────────────────┤
* │ 首部长度 │ 保留 │ U A P R S F │ 窗口大小 (16 bits) │
* │ (4 bits) │ (6) │ R C S S Y I │ │
* │ │ │ G K H T N N │ │
* ├──────────┴────────┴─────────────┴────────────────────────┤
* │ 校验和 (16 bits) │ 紧急指针 (16 bits) │
* ├────────────────────────────────────────────────────────────┤
* │ 选项和填充 (可选) │
* └────────────────────────────────────────────────────────────┘
*
* 标志位说明:
* URG (Urgent): 紧急指针有效
* ACK (Acknowledgment): 确认号有效
* PSH (Push): 接收方应尽快将数据交给应用层
* RST (Reset): 重置连接
* SYN (Synchronize): 同步序列号,用于建立连接
* FIN (Finish): 发送方完成数据发送
*/
}
```
### 2.3 TCP三次握手
```java
/**
* TCP三次握手
*/
public class TCPThreeWayHandshake {
/*
* 三次握手过程:
*
* 客户端 服务端
* ────────────────────────────────────────────────────────
* (1) SYN=1, seq=x
* ───────────────→
* SYN_RCVD状态
* (2) SYN=1, ACK=1
* seq=y, ack=x+1
* ←───────────────
* ESTABLISHED状态
* (3) ACK=1
* seq=x+1, ack=y+1
* ───────────────→
* ESTABLISHED状态
*
* 为什么需要三次握手?
* 1. 防止已失效的连接请求突然又传送到服务端
* 2. 确认双方的接收和发送能力
* 3. 同步双方的初始序列号
*/
}
```
```java
/**
* 三次握手详细过程
*/
public class HandshakeDetails {
public static void main(String[] args) {
System.out.println("【第一次握手】客户端 → 服务端");
System.out.println(" - SYN=1:请求建立连接");
System.out.println(" - seq=x:客户端的初始序列号");
System.out.println(" - 客户端进入SYN_SENT状态");
System.out.println();
System.out.println("【第二次握手】服务端 → 客户端");
System.out.println(" - SYN=1, ACK=1:同意建立连接,确认收到客户端请求");
System.out.println(" - seq=y:服务端的初始序列号");
System.out.println(" - ack=x+1:确认已收到客户端的序列号x");
System.out.println(" - 服务端进入SYN_RCVD状态");
System.out.println();
System.out.println("【第三次握手】客户端 → 服务端");
System.out.println(" - ACK=1:确认收到服务端的连接请求");
System.out.println(" - seq=x+1:客户端的序列号");
System.out.println(" - ack=y+1:确认已收到服务端的序列号y");
System.out.println(" - 双方进入ESTABLISHED状态,连接建立成功");
}
}
```
### 2.4 TCP四次挥手
```java
/**
* TCP四次挥手
*/
public class TCPFourWayWave {
/*
* 四次挥手过程:
*
* 客户端 服务端
* ────────────────────────────────────────────────────────
* (1) FIN=1, seq=u
* ───────────────→
* FIN_WAIT_1状态
* (2) ACK=1
* seq=v, ack=u+1
* ←───────────────
* CLOSE_WAIT状态 FIN_WAIT_2状态
* (3) FIN=1, ACK=1
* seq=w, ack=u+1
* ←───────────────
* TIME_WAIT状态 LAST_ACK状态
* (4) ACK=1
* seq=u+1, ack=w+1
* ───────────────→
* CLOSED状态
* 2MSL后CLOSED状态
*
* 为什么需要四次挥手?
* 1. TCP是全双工通信,每个方向都需要单独关闭
* 2. 服务端可能还有数据需要发送,所以需要等待
* 3. 确保双方都能正常关闭连接
*/
}
```
### 2.5 TCP滑动窗口
```java
/**
* TCP滑动窗口
*/
public class TCPSlidingWindow {
/*
* 滑动窗口机制:
*
* 发送方窗口:
* ┌─────────────────────────────────────────────────────┐
* │已发送并确认│已发送未确认│未发送但可用│不可发送 │
* │ (1-3) │ (4-5) │ (6-7) │ (8-...) │
* └─────────────────────────────────────────────────────┘
* ↑ ↑ ↑
* P1 P2 P3
*
* P1: 窗口左边界(第一个未确认的字节)
* P2: 窗口右边界(下一个可发送的字节)
* P3: 发送窗口大小
*
* 接收方窗口:
* 接收方通告窗口大小(rwnd),控制发送方的发送速率
*
* 滑动窗口的作用:
* 1. 流量控制:防止发送方发送过快
* 2. 提高效率:允许连续发送多个报文
* 3. 可靠传输:通过ACK确认机制
*/
}
```
### 2.6 TCP拥塞控制
```java
/**
* TCP拥塞控制
*/
public class TCPCongestionControl {
/*
* 拥塞控制算法:
*
* 1. 慢启动 (Slow Start)
* - 初始cwnd = 1 MSS
* - 每收到一个ACK,cwnd += 1 MSS(指数增长)
* - 当cwnd >= ssthresh时,进入拥塞避免
*
* 2. 拥塞避免 (Congestion Avoidance)
* - 每经过一个RTT,cwnd += 1 MSS(线性增长)
* - 当检测到拥塞时,进入快重传或快恢复
*
* 3. 快重传 (Fast Retransmit)
* - 收到3个重复ACK,立即重传丢失的报文
* - ssthresh = cwnd / 2
* - cwnd = ssthresh + 3 MSS
*
* 4. 快恢复 (Fast Recovery)
* - 收到新的ACK后,cwnd = ssthresh
* - 进入拥塞避免阶段
*
* 拥塞窗口变化:
* ┌──────────────────────────────────────────────┐
* │ │
* │ cwnd │ ╱╲ ╱──╲ │
* │ │ ╱ ╲ ╱ ╲ │
* │ │ ╱ ╲ ╱ ╲ ╱──╲ │
* │ │ ╱ ╲╱ ╲ ╱ ╲ │
* │ │ ╱ ╲╱ ╲ │
* │ │╱ ╲ │
* │ └─────────────────────────────────────┤
* │ 慢启动 拥塞避免 快重传 快恢复 │
* └──────────────────────────────────────────────┘
*/
}
```
### 2.7 TCP状态机
```java
/**
* TCP状态机
*/
public class TCPStateMachine {
/*
* TCP状态转换:
*
* ┌─────────────────────────────────────────────────────────┐
* │ │
* │ CLOSED LISTEN │
* │ │ │ │
* │ │ application │ │
* │ │ open/close │ │
* │ ↓ │ │
* │ SYN_SENT ────────────────────────── SYN_RCVD │
* │ │ SYN/ACK │ send SYN │
* │ │ ←───────────────┤ receive SYN │
* │ │ │ │
* │ │ ACK │ │
* │ ↓ ←───────────────┤ │
* │ ESTABLISHED │ ACK │
* │ │ ↓ │
* │ │ LISTEN │
* │ │ FIN │ │
* │ │ ←───────────────┤ │
* │ ↓ │ │
* │ FIN_WAIT_1 │ │
* │ │ ACK │ │
* │ │ ←───────────────┤ │
* │ ↓ │ │
* │ FIN_WAIT_2 ────────────────────────── CLOSE_WAIT │
* │ │ FIN/ACK │ receive FIN │
* │ │ ←───────────────┤ │
* │ ↓ │ │
* │ TIME_WAIT │ FIN │
* │ │ ACK │ ←──────────────────────── │
* │ ↓ ────────────────┤ │
* │ CLOSED │ │
* │ ↓ │
* │ CLOSED │
* │ │
* └─────────────────────────────────────────────────────────┘
*/
}
```
---
## 3. UDP协议深度解析
### 3.1 UDP协议概述
```java
/**
* UDP协议概述
*/
public class UDPOverview {
/**
* UDP(User Datagram Protocol,用户数据报协议)
*
* 特点:
* 1. 无连接:通信前不需要建立连接
* 2. 不可靠传输:不保证数据完整性和顺序性
* 3. 面向报文:以报文的方式传输数据
* 4. 无拥塞控制:发送速率不受网络状态影响
* 5. 支持一对一、一对多、多对多通信
*
* 应用场景:
* - DNS查询
* - 视频流
* - 在线游戏
* - 实时通讯
* - 广播和多播
*/
}
```
### 3.2 UDP报文格式
```java
/**
* UDP报文格式
*/
public class UDPHeader {
/*
* UDP报文头部(8字节):
*
* ┌────────────────────────────────────────────────────┐
* │ 源端口 (16 bits) │ 目标端口 (16 bits) │
* ├────────────────────────────────────────────────────┤
* │ 长度 (16 bits) │ 校验和 (16 bits) │
* ├────────────────────────────────────────────────────┤
* │ 数据 │
* └────────────────────────────────────────────────────┘
*
* 字段说明:
* - 源端口:发送方端口号,可选(0表示不使用)
* - 目标端口:接收方端口号
* - 长度:UDP数据报总长度(头部+数据),最小8字节
* - 校验和:可选,用于检测错误
*/
}
```
### 3.3 UDP与TCP的本质区别
```java
/**
* UDP与TCP的本质区别
*/
public class TCPVsUDP {
public static void main(String[] args) {
System.out.println("【面向连接 vs 无连接】");
System.out.println("TCP:通信前需要建立连接,通信后需要释放连接");
System.out.println("UDP:无需建立连接,直接发送数据");
System.out.println();
System.out.println("【可靠传输 vs 不可靠传输】");
System.out.println("TCP:保证数据完整性、顺序性,有重传机制");
System.out.println("UDP:不保证数据完整性,可能丢包、乱序");
System.out.println();
System.out.println("【面向字节流 vs 面向报文】");
System.out.println("TCP:以字节流方式传输,应用层可以任意拆分和合并");
System.out.println("UDP:以报文方式传输,应用层需要自己处理分片");
System.out.println();
System.out.println("【流量控制和拥塞控制】");
System.out.println("TCP:有流量控制和拥塞控制机制");
System.out.println("UDP:无流量控制和拥塞控制");
System.out.println();
System.out.println("【通信方式】");
System.out.println("TCP:只能一对一通信");
System.out.println("UDP:支持一对一、一对多、多对多通信");
System.out.println();
System.out.println("【首部开销】");
System.out.println("TCP:20-60字节");
System.out.println("UDP:8字节");
System.out.println();
System.out.println("【传输效率】");
System.out.println("TCP:较低,有确认、重传等开销");
System.out.println("UDP:较高,无需确认和重传");
}
}
```
---
## 4. TCP vs UDP详细对比
### 4.1 详细对比表
```java
/**
* TCP vs UDP详细对比
*/
public class TCPVsUDPComparison {
public static void main(String[] args) {
System.out.println("┌─────────────────────────────────────────────────────────────────┐");
System.out.println("│ 对比项 │ TCP │ UDP │");
System.out.println("├─────────────────────────────────────────────────────────────────┤");
System.out.println("│ 连接方式 │ 面向连接 │ 无连接 │");
System.out.println("├─────────────────────────────────────────────────────────────────┤");
System.out.println("│ 传输可靠性 │ 可靠 │ 不可靠 │");
System.out.println("├─────────────────────────────────────────────────────────────────┤");
System.out.println("│ 数据顺序 │ 保证 │ 不保证 │");
System.out.println("├─────────────────────────────────────────────────────────────────┤");
System.out.println("│ 流量控制 │ 有(滑动窗口) │ 无 │");
System.out.println("├─────────────────────────────────────────────────────────────────┤");
System.out.println("│ 拥塞控制 │ 有(慢启动、拥塞避免) │ 无 │");
System.out.println("├─────────────────────────────────────────────────────────────────┤");
System.out.println("│ 传输方式 │ 面向字节流 │ 面向报文 │");
System.out.println("├─────────────────────────────────────────────────────────────────┤");
System.out.println("│ 首部开销 │ 20-60字节 │ 8字节 │");
System.out.println("├─────────────────────────────────────────────────────────────────┤");
System.out.println("│ 传输效率 │ 较低 │ 较高 │");
System.out.println("├─────────────────────────────────────────────────────────────────┤");
System.out.println("│ 通信模式 │ 一对一 │ 一对一、多播 │");
System.out.println("├─────────────────────────────────────────────────────────────────┤");
System.out.println("│ 应用场景 │ HTTP、FTP、SMTP、SSH │ DNS、视频流 │");
System.out.println("├─────────────────────────────────────────────────────────────────┤");
System.out.println("│ 数据边界 │ 无 │ 有 │");
System.out.println("├─────────────────────────────────────────────────────────────────┤");
System.out.println("│ 重传机制 │ 有 │ 无 │");
System.out.println("├─────────────────────────────────────────────────────────────────┤");
System.out.println("│ 首部大小 │ 20字节最小 │ 8字节 │");
System.out.println("├─────────────────────────────────────────────────────────────────┤");
System.out.println("│ 建立连接 │ 需要三次握手 │ 不需要 │");
System.out.println("├─────────────────────────────────────────────────────────────────┤");
System.out.println("│ 释放连接 │ 需要四次挥手 │ 不需要 │");
System.out.println("├─────────────────────────────────────────────────────────────────┤");
System.out.println("│ 传输速度 │ 较慢 │ 较快 │");
System.out.println("├─────────────────────────────────────────────────────────────────┤");
System.out.println("│ 资源消耗 │ 较多 │ 较少 │");
System.out.println("├─────────────────────────────────────────────────────────────────┤");
System.out.println("│ 适用场景 │ 可靠性要求高 │ 实时性要求高 │");
System.out.println("└─────────────────────────────────────────────────────────────────┘");
}
}
```
### 4.2 选择TCP还是UDP
```java
/**
* 选择TCP还是UDP
*/
public class ProtocolSelection {
/**
* 选择TCP的场景:
* 1. 需要可靠传输(文件传输、邮件)
* 2. 需要保证顺序(HTTP、FTP)
* 3. 需要流量控制(防止接收方被淹没)
* 4. 需要拥塞控制(防止网络过载)
* 5. 数据量小但重要(HTTP请求)
*
* 选择UDP的场景:
* 1. 需要实时性(视频流、在线游戏)
* 2. 允许少量丢包(视频、音频)
* 3. 需要多播/广播(视频会议)
* 4. 数据量小但频繁(DNS查询)
* 5. 简单请求响应(DHCP、TFTP)
*/
public static String selectProtocol(boolean requireReliability,
boolean requireRealtime) {
if (requireReliability) {
return "TCP";
} else if (requireRealtime) {
return "UDP";
} else {
return "根据具体需求选择";
}
}
}
```
---
## 5. TCP协议封装实现
### 5.1 TCP服务器封装
```java
/**
* TCP服务器封装
*/
public class TCPServer {
private int port;
private ServerSocket serverSocket;
private volatile boolean running;
private ExecutorService executorService;
private TCPMessageHandler messageHandler;
public TCPServer(int port) {
this.port = port;
this.executorService = Executors.newCachedThreadPool();
}
/**
* 设置消息处理器
*/
public void setMessageHandler(TCPMessageHandler messageHandler) {
this.messageHandler = messageHandler;
}
/**
* 启动服务器
*/
public void start() throws IOException {
serverSocket = new ServerSocket(port);
running = true;
System.out.println("TCP服务器启动,监听端口:" + port);
new Thread(this::acceptLoop).start();
}
/**
* 接受连接循环
*/
private void acceptLoop() {
while (running) {
try {
Socket clientSocket = serverSocket.accept();
String clientAddress = clientSocket.getInetAddress().getHostAddress()
+ ":" + clientSocket.getPort();
System.out.println("客户端连接:" + clientAddress);
// 为每个客户端创建独立的线程处理
executorService.submit(() -> handleClient(clientSocket));
} catch (IOException e) {
if (running) {
System.err.println("接受连接失败:" + e.getMessage());
}
}
}
}
/**
* 处理客户端
*/
private void handleClient(Socket clientSocket) {
String clientAddress = clientSocket.getInetAddress().getHostAddress()
+ ":" + clientSocket.getPort();
try (InputStream input = clientSocket.getInputStream();
OutputStream output = clientSocket.getOutputStream()) {
// 读取数据
BufferedReader reader = new BufferedReader(new InputStreamReader(input));
String message;
while ((message = reader.readLine()) != null) {
System.out.println("收到来自 " + clientAddress + " 的消息:" + message);
// 处理消息
String response = null;
if (messageHandler != null) {
response = messageHandler.handle(message, clientSocket);
} else {
response = "Echo: " + message;
}
// 发送响应
if (response != null) {
output.write((response + "\n").getBytes());
output.flush();
}
}
} catch (IOException e) {
System.err.println("处理客户端异常:" + clientAddress + "," + e.getMessage());
} finally {
try {
clientSocket.close();
System.out.println("客户端断开:" + clientAddress);
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* 停止服务器
*/
public void stop() {
running = false;
if (serverSocket != null && !serverSocket.isClosed()) {
try {
serverSocket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (executorService != null) {
executorService.shutdown();
}
System.out.println("TCP服务器已停止");
}
/**
* TCP消息处理器接口
*/
public interface TCPMessageHandler {
String handle(String message, Socket clientSocket);
}
}
```
### 5.2 TCP客户端封装
```java
/**
* TCP客户端封装
*/
public class TCPClient {
private String host;
private int port;
private Socket socket;
private InputStream input;
private OutputStream output;
private BufferedReader reader;
public TCPClient(String host, int port) {
this.host = host;
this.port = port;
}
/**
* 连接服务器
*/
public void connect() throws IOException {
socket = new Socket(host, port);
input = socket.getInputStream();
output = socket.getOutputStream();
reader = new BufferedReader(new InputStreamReader(input));
System.out.println("已连接到服务器:" + host + ":" + port);
}
/**
* 发送消息
*/
public String send(String message) throws IOException {
if (socket == null || socket.isClosed()) {
throw new IOException("未连接到服务器");
}
// 发送消息
output.write((message + "\n").getBytes());
output.flush();
System.out.println("发送消息:" + message);
// 读取响应
String response = reader.readLine();
System.out.println("收到响应:" + response);
return response;
}
/**
* 发送消息并等待响应(超时)
*/
public String sendWithTimeout(String message, int timeoutMillis) throws IOException {
if (socket == null || socket.isClosed()) {
throw new IOException("未连接到服务器");
}
// 设置读取超时
socket.setSoTimeout(timeoutMillis);
// 发送消息
output.write((message + "\n").getBytes());
output.flush();
System.out.println("发送消息:" + message);
// 读取响应
String response = null;
try {
response = reader.readLine();
System.out.println("收到响应:" + response);
} catch (SocketTimeoutException e) {
System.out.println("读取响应超时");
}
return response;
}
/**
* 批量发送消息
*/
public List sendBatch(List messages) throws IOException {
if (socket == null || socket.isClosed()) {
throw new IOException("未连接到服务器");
}
List responses = new ArrayList<>();
for (String message : messages) {
String response = send(message);
if (response != null) {
responses.add(response);
}
}
return responses;
}
/**
* 断开连接
*/
public void disconnect() {
try {
if (reader != null) {
reader.close();
}
if (input != null) {
input.close();
}
if (output != null) {
output.close();
}
if (socket != null && !socket.isClosed()) {
socket.close();
}
System.out.println("已断开与服务器的连接");
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 检查是否已连接
*/
public boolean isConnected() {
return socket != null && socket.isConnected() && !socket.isClosed();
}
}
```
### 5.3 TCP使用示例
```java
/**
* TCP使用示例
*/
public class TCPExample {
public static void main(String[] args) {
// 启动服务器
TCPServer server = new TCPServer(8080);
server.setMessageHandler((message, clientSocket) -> {
// 业务处理
if ("ping".equals(message)) {
return "pong";
} else if (message.startsWith("echo ")) {
return message.substring(5);
} else {
return "Received: " + message;
}
});
new Thread(() -> {
try {
server.start();
} catch (IOException e) {
e.printStackTrace();
}
}).start();
// 等待服务器启动
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 创建客户端
TCPClient client = new TCPClient("localhost", 8080);
try {
// 连接服务器
client.connect();
// 发送消息
String response1 = client.send("ping");
System.out.println("响应1:" + response1);
String response2 = client.send("echo Hello World");
System.out.println("响应2:" + response2);
// 批量发送
List messages = Arrays.asList("msg1", "msg2", "msg3");
List responses = client.sendBatch(messages);
System.out.println("批量响应:" + responses);
} catch (IOException e) {
e.printStackTrace();
} finally {
// 断开连接
client.disconnect();
// 停止服务器
server.stop();
}
}
}
```
---
## 6. UDP协议封装实现
### 6.1 UDP服务器封装
```java
/**
* UDP服务器封装
*/
public class UDPServer {
private int port;
private DatagramSocket socket;
private volatile boolean running;
private ExecutorService executorService;
private UDPMessageHandler messageHandler;
public UDPServer(int port) {
this.port = port;
this.executorService = Executors.newCachedThreadPool();
}
/**
* 设置消息处理器
*/
public void setMessageHandler(UDPMessageHandler messageHandler) {
this.messageHandler = messageHandler;
}
/**
* 启动服务器
*/
public void start() throws SocketException {
socket = new DatagramSocket(port);
running = true;
System.out.println("UDP服务器启动,监听端口:" + port);
new Thread(this::receiveLoop).start();
}
/**
* 接收数据循环
*/
private void receiveLoop() {
byte[] buffer = new byte[1024];
while (running) {
try {
DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
socket.receive(packet);
// 异步处理消息
executorService.submit(() -> handleMessage(packet));
} catch (IOException e) {
if (running) {
System.err.println("接收数据失败:" + e.getMessage());
}
}
}
}
/**
* 处理消息
*/
private void handleMessage(DatagramPacket packet) {
String clientAddress = packet.getAddress().getHostAddress()
+ ":" + packet.getPort();
String message = new String(packet.getData(), 0, packet.getLength());
System.out.println("收到来自 " + clientAddress + " 的消息:" + message);
// 处理消息
String response = null;
if (messageHandler != null) {
response = messageHandler.handle(message, packet);
} else {
response = "Echo: " + message;
}
// 发送响应
if (response != null) {
try {
byte[] data = response.getBytes();
DatagramPacket responsePacket = new DatagramPacket(
data,
data.length,
packet.getAddress(),
packet.getPort()
);
socket.send(responsePacket);
System.out.println("发送响应到 " + clientAddress + ":" + response);
} catch (IOException e) {
System.err.println("发送响应失败:" + e.getMessage());
}
}
}
/**
* 广播消息
*/
public void broadcast(String message, int broadcastPort) throws IOException {
byte[] data = message.getBytes();
InetAddress broadcastAddress = InetAddress.getByName("255.255.255.255");
DatagramPacket packet = new DatagramPacket(data, data.length, broadcastAddress, broadcastPort);
socket.send(packet);
System.out.println("广播消息:" + message);
}
/**
* 停止服务器
*/
public void stop() {
running = false;
if (socket != null && !socket.isClosed()) {
socket.close();
}
if (executorService != null) {
executorService.shutdown();
}
System.out.println("UDP服务器已停止");
}
/**
* UDP消息处理器接口
*/
public interface UDPMessageHandler {
String handle(String message, DatagramPacket packet);
}
}
```
### 6.2 UDP客户端封装
```java
/**
* UDP客户端封装
*/
public class UDPClient {
private DatagramSocket socket;
private String host;
private int port;
public UDPClient() throws SocketException {
this.socket = new DatagramSocket();
}
/**
* 发送消息
*/
public String send(String message, String host, int port) throws IOException {
this.host = host;
this.port = port;
// 发送消息
byte[] data = message.getBytes();
InetAddress address = InetAddress.getByName(host);
DatagramPacket packet = new DatagramPacket(data, data.length, address, port);
socket.send(packet);
System.out.println("发送消息到 " + host + ":" + port + ":" + message);
// 接收响应(可选)
byte[] buffer = new byte[1024];
DatagramPacket responsePacket = new DatagramPacket(buffer, buffer.length);
try {
socket.setSoTimeout(5000); // 5秒超时
socket.receive(responsePacket);
String response = new String(responsePacket.getData(),
0, responsePacket.getLength());
System.out.println("收到响应:" + response);
return response;
} catch (SocketTimeoutException e) {
System.out.println("接收响应超时");
return null;
}
}
/**
* 发送消息(不等待响应)
*/
public void sendWithoutResponse(String message, String host, int port) throws IOException {
byte[] data = message.getBytes();
InetAddress address = InetAddress.getByName(host);
DatagramPacket packet = new DatagramPacket(data, data.length, address, port);
socket.send(packet);
System.out.println("发送消息到 " + host + ":" + port + ":" + message);
}
/**
* 批量发送消息
*/
public void sendBatch(List messages, String host, int port) throws IOException {
for (String message : messages) {
sendWithoutResponse(message, host, port);
}
}
/**
* 接收消息
*/
public String receive() throws IOException {
byte[] buffer = new byte[1024];
DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
socket.receive(packet);
String message = new String(packet.getData(), 0, packet.getLength());
System.out.println("收到来自 " + packet.getAddress().getHostAddress()
+ ":" + packet.getPort() + " 的消息:" + message);
return message;
}
/**
* 接收消息(阻塞,直到收到消息)
*/
public String receiveBlocking() throws IOException {
return receive();
}
/**
* 接收消息(非阻塞)
*/
public String receiveNonBlocking() throws IOException {
socket.setSoTimeout(100); // 100毫秒超时
return receive();
}
/**
* 关闭客户端
*/
public void close() {
if (socket != null && !socket.isClosed()) {
socket.close();
System.out.println("UDP客户端已关闭");
}
}
}
```
### 6.3 UDP使用示例
```java
/**
* UDP使用示例
*/
public class UDPExample {
public static void main(String[] args) {
// 启动服务器
UDPServer server = new UDPServer(8080);
server.setMessageHandler((message, packet) -> {
// 业务处理
if ("ping".equals(message)) {
return "pong";
} else if (message.startsWith("echo ")) {
return message.substring(5);
} else {
return "Received: " + message;
}
});
new Thread(() -> {
try {
server.start();
} catch (SocketException e) {
e.printStackTrace();
}
}).start();
// 等待服务器启动
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 创建客户端
UDPClient client = null;
try {
client = new UDPClient();
// 发送消息
String response1 = client.send("ping", "localhost", 8080);
System.out.println("响应1:" + response1);
String response2 = client.send("echo Hello World", "localhost", 8080);
System.out.println("响应2:" + response2);
// 发送消息不等待响应
client.sendWithoutResponse("Hello", "localhost", 8080);
// 批量发送
List messages = Arrays.asList("msg1", "msg2", "msg3");
client.sendBatch(messages, "localhost", 8080);
} catch (IOException e) {
e.printStackTrace();
} finally {
// 关闭客户端
if (client != null) {
client.close();
}
// 停止服务器
server.stop();
}
}
}
```
---
## 7. 应用实战:文件传输
### 7.1 TCP文件传输服务器
```java
/**
* TCP文件传输服务器
*/
public class TCPFileServer {
private int port;
private ServerSocket serverSocket;
private volatile boolean running;
private String savePath;
public TCPFileServer(int port, String savePath) {
this.port = port;
this.savePath = savePath;
}
/**
* 启动服务器
*/
public void start() throws IOException {
serverSocket = new ServerSocket(port);
running = true;
// 创建保存目录
File dir = new File(savePath);
if (!dir.exists()) {
dir.mkdirs();
}
System.out.println("TCP文件传输服务器启动,监听端口:" + port);
System.out.println("文件保存路径:" + savePath);
new Thread(this::acceptLoop).start();
}
/**
* 接受连接循环
*/
private void acceptLoop() {
while (running) {
try {
Socket clientSocket = serverSocket.accept();
String clientAddress = clientSocket.getInetAddress().getHostAddress()
+ ":" + clientSocket.getPort();
System.out.println("客户端连接:" + clientAddress);
// 处理文件上传
handleFileUpload(clientSocket);
} catch (IOException e) {
if (running) {
System.err.println("接受连接失败:" + e.getMessage());
}
}
}
}
/**
* 处理文件上传
*/
private void handleFileUpload(Socket clientSocket) {
try (InputStream input = clientSocket.getInputStream();
DataInputStream dataInput = new DataInputStream(input);
OutputStream output = clientSocket.getOutputStream()) {
// 读取文件名
String fileName = dataInput.readUTF();
System.out.println("接收文件:" + fileName);
// 读取文件大小
long fileSize = dataInput.readLong();
System.out.println("文件大小:" + fileSize + " 字节");
// 创建文件输出流
String filePath = savePath + File.separator + fileName;
try (FileOutputStream fileOutput = new FileOutputStream(filePath)) {
// 读取文件数据
byte[] buffer = new byte[8192];
long totalRead = 0;
int bytesRead;
while ((bytesRead = dataInput.read(buffer)) != -1) {
fileOutput.write(buffer, 0, bytesRead);
totalRead += bytesRead;
// 显示进度
double progress = (double) totalRead / fileSize * 100;
System.out.printf("接收进度: %.2f%%\r", progress);
}
System.out.println("\n文件接收完成:" + filePath);
}
// 发送确认
output.write("OK".getBytes());
output.flush();
} catch (IOException e) {
System.err.println("处理文件上传失败:" + e.getMessage());
} finally {
try {
clientSocket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* 停止服务器
*/
public void stop() {
running = false;
if (serverSocket != null && !serverSocket.isClosed()) {
try {
serverSocket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
System.out.println("TCP文件传输服务器已停止");
}
}
```
### 7.2 TCP文件传输客户端
```java
/**
* TCP文件传输客户端
*/
public class TCPFileClient {
/**
* 上传文件
*/
public static void uploadFile(String host, int port, String filePath) throws IOException {
File file = new File(filePath);
if (!file.exists()) {
throw new IOException("文件不存在:" + filePath);
}
System.out.println("开始上传文件:" + filePath);
System.out.println("文件大小:" + file.length() + " 字节");
try (Socket socket = new Socket(host, port);
InputStream input = new FileInputStream(file);
DataInputStream dataInput = new DataInputStream(input);
OutputStream output = socket.getOutputStream();
DataOutputStream dataOutput = new DataOutputStream(output);
InputStream responseInput = socket.getInputStream()) {
// 发送文件名
dataOutput.writeUTF(file.getName());
// 发送文件大小
dataOutput.writeLong(file.length());
dataOutput.flush();
// 发送文件数据
byte[] buffer = new byte[8192];
long totalSent = 0;
int bytesRead;
while ((bytesRead = dataInput.read(buffer)) != -1) {
dataOutput.write(buffer, 0, bytesRead);
totalSent += bytesRead;
// 显示进度
double progress = (double) totalSent / file.length() * 100;
System.out.printf("发送进度: %.2f%%\r", progress);
}
dataOutput.flush();
System.out.println("\n文件发送完成");
// 接收确认
byte[] response = new byte[2];
responseInput.read(response);
System.out.println("服务器确认:" + new String(response));
} catch (IOException e) {
System.err.println("上传文件失败:" + e.getMessage());
throw e;
}
}
}
```
### 7.3 文件传输示例
```java
/**
* 文件传输示例
*/
public class FileTransferExample {
public static void main(String[] args) {
// 启动文件服务器
TCPFileServer server = new TCPFileServer(8080, "./uploads");
new Thread(() -> {
try {
server.start();
} catch (IOException e) {
e.printStackTrace();
}
}).start();
// 等待服务器启动
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 上传文件
try {
// 创建测试文件
String testFile = "./test.txt";
try (FileWriter writer = new FileWriter(testFile)) {
writer.write("这是一个测试文件\n");
writer.write("用于测试TCP文件传输\n");
writer.write("Hello World!");
}
// 上传文件
TCPFileClient.uploadFile("localhost", 8080, testFile);
} catch (IOException e) {
e.printStackTrace();
} finally {
// 停止服务器
server.stop();
}
}
}
```
---
## 8. 应用实战:即时通讯
### 8.1 TCP即时通讯服务器
```java
/**
* TCP即时通讯服务器
*/
public class TCPChatServer {
private int port;
private ServerSocket serverSocket;
private volatile boolean running;
private Map clients = new ConcurrentHashMap<>();
private Map userNames = new ConcurrentHashMap<>();
public TCPChatServer(int port) {
this.port = port;
}
/**
* 启动服务器
*/
public void start() throws IOException {
serverSocket = new ServerSocket(port);
running = true;
System.out.println("TCP即时通讯服务器启动,监听端口:" + port);
new Thread(this::acceptLoop).start();
}
/**
* 接受连接循环
*/
private void acceptLoop() {
while (running) {
try {
Socket clientSocket = serverSocket.accept();
String clientId = generateClientId();
clients.put(clientId, clientSocket);
System.out.println("客户端连接:" + clientId);
// 处理客户端消息
new Thread(() -> handleClient(clientId, clientSocket)).start();
} catch (IOException e) {
if (running) {
System.err.println("接受连接失败:" + e.getMessage());
}
}
}
}
/**
* 处理客户端
*/
private void handleClient(String clientId, Socket clientSocket) {
try (InputStream input = clientSocket.getInputStream();
OutputStream output = clientSocket.getOutputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(input))) {
String message;
while ((message = reader.readLine()) != null) {
System.out.println("收到来自 " + clientId + " 的消息:" + message);
// 解析消息
String[] parts = message.split(":", 2);
if (parts.length >= 2) {
String command = parts[0];
String content = parts[1];
switch (command) {
case "LOGIN":
userNames.put(clientId, content);
broadcastMessage("SYSTEM:" + content + " 加入聊天室");
break;
case "CHAT":
String userName = userNames.getOrDefault(clientId, "匿名");
broadcastMessage("CHAT:" + userName + ":" + content);
break;
case "PRIVATE":
// 私聊消息格式: PRIVATE:目标用户:消息内容
String[] privateParts = content.split(":", 2);
if (privateParts.length >= 2) {
sendPrivateMessage(clientId, privateParts[0], privateParts[1]);
}
break;
case "LOGOUT":
String logoutUser = userNames.get(clientId);
userNames.remove(clientId);
clients.remove(clientId);
broadcastMessage("SYSTEM:" + logoutUser + " 离开聊天室");
return;
}
}
}
} catch (IOException e) {
System.err.println("处理客户端异常:" + e.getMessage());
} finally {
clients.remove(clientId);
userNames.remove(clientId);
try {
clientSocket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* 广播消息
*/
private void broadcastMessage(String message) {
for (Socket socket : clients.values()) {
try {
OutputStream output = socket.getOutputStream();
output.write((message + "\n").getBytes());
output.flush();
} catch (IOException e) {
System.err.println("广播消息失败:" + e.getMessage());
}
}
}
/**
* 发送私聊消息
*/
private void sendPrivateMessage(String fromClientId, String targetUser, String message) {
String fromUser = userNames.getOrDefault(fromClientId, "匿名");
for (Map.Entry entry : userNames.entrySet()) {
if (entry.getValue().equals(targetUser)) {
Socket targetSocket = clients.get(entry.getKey());
if (targetSocket != null) {
try {
OutputStream output = targetSocket.getOutputStream();
output.write(("PRIVATE:" + fromUser + ":" + message + "\n").getBytes());
output.flush();
} catch (IOException e) {
System.err.println("发送私聊消息失败:" + e.getMessage());
}
}
break;
}
}
}
/**
* 生成客户端ID
*/
private String generateClientId() {
return UUID.randomUUID().toString().substring(0, 8);
}
/**
* 停止服务器
*/
public void stop() {
running = false;
if (serverSocket != null && !serverSocket.isClosed()) {
try {
serverSocket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
System.out.println("TCP即时通讯服务器已停止");
}
}
```
### 8.2 TCP即时通讯客户端
```java
/**
* TCP即时通讯客户端
*/
public class TCPChatClient {
private String host;
private int port;
private Socket socket;
private BufferedReader reader;
private PrintWriter writer;
private volatile boolean running;
private String userName;
public TCPChatClient(String host, int port, String userName) {
this.host = host;
this.port = port;
this.userName = userName;
}
/**
* 连接服务器
*/
public void connect() throws IOException {
socket = new Socket(host, port);
reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
writer = new PrintWriter(socket.getOutputStream(), true);
// 登录
writer.println("LOGIN:" + userName);
System.out.println("已连接到聊天服务器:" + host + ":" + port);
System.out.println("欢迎 " + userName + " 加入聊天室!");
}
/**
* 发送消息
*/
public void sendMessage(String message) {
if (writer != null) {
writer.println("CHAT:" + message);
}
}
/**
* 发送私聊消息
*/
public void sendPrivateMessage(String targetUser, String message) {
if (writer != null) {
writer.println("PRIVATE:" + targetUser + ":" + message);
}
}
/**
* 开始接收消息
*/
public void startReceive() {
running = true;
new Thread(() -> {
try {
String message;
while (running && (message = reader.readLine()) != null) {
String[] parts = message.split(":", 2);
if (parts.length >= 2) {
String command = parts[0];
String content = parts[1];
switch (command) {
case "SYSTEM":
System.out.println("[系统] " + content);
break;
case "CHAT":
String[] chatParts = content.split(":", 2);
if (chatParts.length >= 2) {
System.out.println("[" + chatParts[0] + "] " + chatParts[1]);
}
break;
case "PRIVATE":
String[] privateParts = content.split(":", 2);
if (privateParts.length >= 2) {
System.out.println("[私聊] " + privateParts[0] + " 说: " + privateParts[1]);
}
break;
}
}
}
} catch (IOException e) {
if (running) {
System.err.println("接收消息失败:" + e.getMessage());
}
}
}).start();
}
/**
* 启动控制台输入
*/
public void startConsoleInput() {
Scanner scanner = new Scanner(System.in);
System.out.println("输入消息(格式:msg 或 私聊 用户名 msg,输入 exit 退出):");
while (running) {
String input = scanner.nextLine();
if ("exit".equalsIgnoreCase(input)) {
logout();
break;
} else if (input.startsWith("私聊 ")) {
String[] parts = input.substring(3).split(" ", 2);
if (parts.length >= 2) {
sendPrivateMessage(parts[0], parts[1]);
}
} else {
sendMessage(input);
}
}
scanner.close();
}
/**
* 登出
*/
public void logout() {
running = false;
if (writer != null) {
writer.println("LOGOUT:");
}
disconnect();
}
/**
* 断开连接
*/
public void disconnect() {
running = false;
try {
if (reader != null) {
reader.close();
}
if (writer != null) {
writer.close();
}
if (socket != null && !socket.isClosed()) {
socket.close();
}
System.out.println("已断开与服务器的连接");
} catch (IOException e) {
e.printStackTrace();
}
}
}
```
### 8.3 即时通讯示例
```java
/**
* 即时通讯示例
*/
public class ChatExample {
public static void main(String[] args) {
// 启动服务器
TCPChatServer server = new TCPChatServer(8080);
new Thread(() -> {
try {
server.start();
} catch (IOException e) {
e.printStackTrace();
}
}).start();
// 等待服务器启动
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 创建客户端
TCPChatClient client1 = new TCPChatClient("localhost", 8080, "张三");
TCPChatClient client2 = new TCPChatClient("localhost", 8080, "李四");
try {
// 连接服务器
client1.connect();
client2.connect();
// 开始接收消息
client1.startReceive();
client2.startReceive();
// 发送消息
Thread.sleep(500);
client1.sendMessage("大家好!");
Thread.sleep(500);
client2.sendMessage("你好张三!");
Thread.sleep(500);
client1.sendPrivateMessage("李四", "你好李四,我是张三");
} catch (IOException | InterruptedException e) {
e.printStackTrace();
} finally {
// 断开连接
client1.disconnect();
client2.disconnect();
// 停止服务器
server.stop();
}
}
}
```
---
## 9. 应用实战:视频流传输
### 9.1 UDP视频流服务器
```java
/**
* UDP视频流服务器
*/
public class UDPVideoStreamServer {
private int port;
private DatagramSocket socket;
private volatile boolean running;
private List clients = new ArrayList<>();
private int frameRate = 30; // 帧率
private int frameSize = 1024; // 帧大小
public UDPVideoStreamServer(int port) {
this.port = port;
}
/**
* 启动服务器
*/
public void start() throws SocketException {
socket = new DatagramSocket(port);
running = true;
System.out.println("UDP视频流服务器启动,监听端口:" + port);
// 启动接收线程
new Thread(this::receiveLoop).start();
// 启动发送线程
new Thread(this::sendLoop).start();
}
/**
* 接收循环
*/
private void receiveLoop() {
byte[] buffer = new byte[1024];
while (running) {
try {
DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
socket.receive(packet);
String message = new String(packet.getData(), 0, packet.getLength());
// 处理客户端连接
if ("CONNECT".equals(message)) {
InetSocketAddress clientAddress = new InetSocketAddress(
packet.getAddress(), packet.getPort());
if (!clients.contains(clientAddress)) {
clients.add(clientAddress);
System.out.println("客户端连接:" + clientAddress);
}
}
} catch (IOException e) {
if (running) {
System.err.println("接收数据失败:" + e.getMessage());
}
}
}
}
/**
* 发送循环
*/
private void sendLoop() {
int frameNumber = 0;
long frameInterval = 1000 / frameRate; // 帧间隔(毫秒)
while (running) {
if (clients.isEmpty()) {
try {
Thread.sleep(100);
continue;
} catch (InterruptedException e) {
break;
}
}
try {
// 模拟生成视频帧
byte[] frameData = generateFrame(frameNumber);
// 发送给所有客户端
for (InetSocketAddress client : clients) {
DatagramPacket packet = new DatagramPacket(
frameData, frameData.length, client.getAddress(), client.getPort());
socket.send(packet);
}
frameNumber++;
// 控制帧率
Thread.sleep(frameInterval);
} catch (IOException | InterruptedException e) {
if (running) {
System.err.println("发送视频帧失败:" + e.getMessage());
}
}
}
}
/**
* 生成视频帧(模拟)
*/
private byte[] generateFrame(int frameNumber) {
// 实际应用中,这里应该从摄像头或视频文件读取数据
String frameInfo = String.format("FRAME:%08d:DATA", frameNumber);
return frameInfo.getBytes();
}
/**
* 设置帧率
*/
public void setFrameRate(int frameRate) {
this.frameRate = frameRate;
}
/**
* 停止服务器
*/
public void stop() {
running = false;
if (socket != null && !socket.isClosed()) {
socket.close();
}
System.out.println("UDP视频流服务器已停止");
}
}
```
### 9.2 UDP视频流客户端
```java
/**
* UDP视频流客户端
*/
public class UDPVideoStreamClient {
private String host;
private int port;
private DatagramSocket socket;
private volatile boolean running;
private VideoFrameHandler frameHandler;
public UDPVideoStreamClient(String host, int port) {
this.host = host;
this.port = port;
}
/**
* 设置帧处理器
*/
public void setFrameHandler(VideoFrameHandler frameHandler) {
this.frameHandler = frameHandler;
}
/**
* 连接服务器
*/
public void connect() throws SocketException {
socket = new DatagramSocket();
running = true;
System.out.println("连接到视频流服务器:" + host + ":" + port);
// 发送连接请求
try {
byte[] data = "CONNECT".getBytes();
InetAddress address = InetAddress.getByName(host);
DatagramPacket packet = new DatagramPacket(data, data.length, address, port);
socket.send(packet);
// 启动接收线程
new Thread(this::receiveLoop).start();
} catch (IOException e) {
System.err.println("连接服务器失败:" + e.getMessage());
throw new SocketException(e.getMessage());
}
}
/**
* 接收循环
*/
private void receiveLoop() {
byte[] buffer = new byte[1024];
int frameCount = 0;
while (running) {
try {
DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
socket.setSoTimeout(5000); // 5秒超时
socket.receive(packet);
String frameData = new String(packet.getData(), 0, packet.getLength());
frameCount++;
// 处理视频帧
if (frameHandler != null) {
frameHandler.handleFrame(frameCount, frameData);
}
} catch (SocketTimeoutException e) {
// 超时,继续
} catch (IOException e) {
if (running) {
System.err.println("接收视频帧失败:" + e.getMessage());
}
}
}
}
/**
* 停止客户端
*/
public void stop() {
running = false;
if (socket != null && !socket.isClosed()) {
socket.close();
}
System.out.println("UDP视频流客户端已停止");
}
/**
* 视频帧处理器接口
*/
public interface VideoFrameHandler {
void handleFrame(int frameNumber, String frameData);
}
}
```
### 9.3 视频流传输示例
```java
/**
* 视频流传输示例
*/
public class VideoStreamExample {
public static void main(String[] args) {
// 启动视频流服务器
UDPVideoStreamServer server = new UDPVideoStreamServer(8080);
server.setFrameRate(30); // 30帧/秒
new Thread(() -> {
try {
server.start();
} catch (SocketException e) {
e.printStackTrace();
}
}).start();
// 等待服务器启动
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 创建视频流客户端
UDPVideoStreamClient client = new UDPVideoStreamClient("localhost", 8080);
client.setFrameHandler((frameNumber, frameData) -> {
System.out.printf("收到第%d帧: %s\n", frameNumber, frameData);
});
try {
// 连接服务器
client.connect();
// 接收10秒视频流
Thread.sleep(10000);
} catch (SocketException | InterruptedException e) {
e.printStackTrace();
} finally {
// 停止客户端
client.stop();
// 停止服务器
server.stop();
}
}
}
```
---
## 10. 性能优化与调优
### 10.1 TCP性能优化
```java
/**
* TCP性能优化
*/
public class TCPPerformanceOptimization {
/**
* 优化Socket参数
*/
public static void optimizeSocket(Socket socket) throws SocketException {
// 1. 禁用Nagle算法(提高实时性)
socket.setTcpNoDelay(true);
// 2. 设置接收缓冲区大小
socket.setReceiveBufferSize(64 * 1024); // 64KB
// 3. 设置发送缓冲区大小
socket.setSendBufferSize(64 * 1024); // 64KB
// 4. 设置保持连接
socket.setKeepAlive(true);
// 5. 设置超时时间
socket.setSoTimeout(5000); // 5秒
}
/**
* 优化ServerSocket参数
*/
public static void optimizeServerSocket(ServerSocket serverSocket) throws SocketException {
// 1. 设置接收队列大小
serverSocket.setReceiveBufferSize(64 * 1024);
// 2. 设置重用地址
serverSocket.setReuseAddress(true);
// 3. 设置超时(accept超时)
serverSocket.setSoTimeout(5000);
}
/**
* 使用NIO提高性能
*/
public static void useNIO() throws IOException {
// 使用NIO提高并发性能
Selector selector = Selector.open();
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
serverChannel.bind(new InetSocketAddress(8080));
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("NIO TCP服务器启动,监听端口:8080");
while (true) {
selector.select();
Set selectedKeys = selector.selectedKeys();
Iterator iter = selectedKeys.iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if (key.isAcceptable()) {
SocketChannel clientChannel = serverChannel.accept();
clientChannel.configureBlocking(false);
clientChannel.register(selector, SelectionKey.OP_READ);
} else if (key.isReadable()) {
SocketChannel clientChannel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
int read = clientChannel.read(buffer);
if (read > 0) {
buffer.flip();
byte[] data = new byte[buffer.remaining()];
buffer.get(data);
System.out.println("收到消息:" + new String(data));
} else if (read == -1) {
clientChannel.close();
}
}
}
}
}
}
```
### 10.2 UDP性能优化
```java
/**
* UDP性能优化
*/
public class UDPPerformanceOptimization {
/**
* 优化DatagramSocket参数
*/
public static void optimizeDatagramSocket(DatagramSocket socket) throws SocketException {
// 1. 设置接收缓冲区大小
socket.setReceiveBufferSize(64 * 1024); // 64KB
// 2. 设置发送缓冲区大小
socket.setSendBufferSize(64 * 1024); // 64KB
// 3. 设置超时时间
socket.setSoTimeout(5000); // 5秒
// 4. 设置重用地址
socket.setReuseAddress(true);
}
/**
* 使用多播
*/
public static void useMulticast() throws IOException {
// 创建多播Socket
MulticastSocket multicastSocket = new MulticastSocket(8080);
// 加入多播组
InetAddress groupAddress = InetAddress.getByName("239.255.255.250");
multicastSocket.joinGroup(groupAddress);
System.out.println("加入多播组:" + groupAddress);
// 接收多播消息
byte[] buffer = new byte[1024];
DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
multicastSocket.receive(packet);
String message = new String(packet.getData(), 0, packet.getLength());
System.out.println("收到多播消息:" + message);
// 离开多播组
multicastSocket.leaveGroup(groupAddress);
multicastSocket.close();
}
/**
* 批量发送
*/
public static void batchSend(DatagramSocket socket, List messages,
String host, int port) throws IOException {
InetAddress address = InetAddress.getByName(host);
for (String message : messages) {
byte[] data = message.getBytes();
DatagramPacket packet = new DatagramPacket(data, data.length, address, port);
socket.send(packet);
}
System.out.println("批量发送完成,共" + messages.size() + "条消息");
}
}
```
### 10.3 性能对比
```java
/**
* TCP vs UDP性能对比
*/
public class PerformanceComparison {
public static void main(String[] args) throws IOException, InterruptedException {
int messageCount = 10000;
int messageSize = 1024; // 1KB
System.out.println("开始性能测试...");
System.out.println("消息数量:" + messageCount);
System.out.println("消息大小:" + messageSize + " 字节");
System.out.println();
// TCP性能测试
long tcpStartTime = System.currentTimeMillis();
testTCPPerformance(messageCount, messageSize);
long tcpEndTime = System.currentTimeMillis();
System.out.println("TCP耗时:" + (tcpEndTime - tcpStartTime) + " 毫秒");
System.out.println();
// UDP性能测试
long udpStartTime = System.currentTimeMillis();
testUDPPerformance(messageCount, messageSize);
long udpEndTime = System.currentTimeMillis();
System.out.println("UDP耗时:" + (udpEndTime - udpStartTime) + " 毫秒");
}
private static void testTCPPerformance(int messageCount, int messageSize)
throws IOException, InterruptedException {
// 启动TCP服务器
TCPServer server = new TCPServer(8080);
server.setMessageHandler((message, clientSocket) -> "OK");
new Thread(() -> {
try {
server.start();
} catch (IOException e) {
e.printStackTrace();
}
}).start();
Thread.sleep(100);
// 创建TCP客户端
TCPClient client = new TCPClient("localhost", 8080);
client.connect();
// 发送消息
String message = "A".repeat(messageSize);
for (int i = 0; i < messageCount; i++) {
client.send(message);
}
client.disconnect();
server.stop();
}
private static void testUDPPerformance(int messageCount, int messageSize)
throws IOException, InterruptedException {
// 启动UDP服务器
UDPServer server = new UDPServer(8081);
server.setMessageHandler((msg, packet) -> "OK");
new Thread(() -> {
try {
server.start();
} catch (SocketException e) {
e.printStackTrace();
}
}).start();
Thread.sleep(100);
// 创建UDP客户端
UDPClient client = new UDPClient();
// 发送消息
String message = "A".repeat(messageSize);
for (int i = 0; i < messageCount; i++) {
client.sendWithoutResponse(message, "localhost", 8081);
}
client.close();
server.stop();
}
}
```
---
## 11. 总结与最佳实践
### 11.1 核心要点总结
#### TCP核心要点
1. **面向连接**:三次握手建立连接,四次挥手释放连接
2. **可靠传输**:确认机制、重传机制、序号机制
3. **流量控制**:滑动窗口机制
4. **拥塞控制**:慢启动、拥塞避免、快重传、快恢复
5. **全双工**:支持双向同时传输
6. **应用场景**:HTTP、FTP、SMTP、SSH、数据库连接
#### UDP核心要点
1. **无连接**:无需建立和释放连接
2. **不可靠传输**:不保证数据完整性和顺序性
3. **高效传输**:头部开销小,传输效率高
4. **支持多播**:支持一对多、多对多通信
5. **应用场景**:DNS、视频流、在线游戏、实时通讯
### 11.2 选择建议
```java
/**
* 选择建议
*/
public class SelectionGuide {
/**
* 选择TCP的场景:
* 1. 需要可靠传输(文件传输、邮件)
* 2. 需要保证顺序(HTTP、FTP)
* 3. 需要流量控制(防止接收方被淹没)
* 4. 需要拥塞控制(防止网络过载)
* 5. 数据量小但重要(HTTP请求)
*
* 选择UDP的场景:
* 1. 需要实时性(视频流、在线游戏)
* 2. 允许少量丢包(视频、音频)
* 3. 需要多播/广播(视频会议)
* 4. 数据量小但频繁(DNS查询)
* 5. 简单请求响应(DHCP、TFTP)
*/
}
```
### 11.3 最佳实践
#### TCP最佳实践
1. **连接管理**
- 使用连接池提高性能
- 及时释放连接
- 设置合理的超时时间
2. **参数优化**
- 调整缓冲区大小
- 启用TCP_NODELAY提高实时性
- 启用SO_KEEPALIVE保持连接
3. **错误处理**
- 完善的异常处理
- 连接重试机制
- 超时处理
4. **性能优化**
- 使用NIO提高并发性能
- 使用异步IO
- 批量处理
#### UDP最佳实践
1. **可靠性**
- 应用层实现确认机制
- 应用层实现重传机制
- 使用序列号保证顺序
2. **性能优化**
- 调整缓冲区大小
- 批量发送
- 使用多播
3. **安全考虑**
- 使用UDP over TLS(DTLS)
- 数据加密
- 身份认证
### 11.4 常见问题
```java
/**
* 常见问题与解决方案
*/
public class CommonProblems {
/**
* 问题1:粘包/拆包
* 解决方案:
* - 固定长度
* - 分隔符
* - 长度字段
*/
/**
* 问题2:连接超时
* 解决方案:
* - 设置合理的超时时间
- 实现重试机制
- 使用心跳检测
*/
/**
* 问题3:性能瓶颈
* 解决方案:
* - 使用NIO
* - 使用连接池
* - 优化缓冲区大小
*/
/**
* 问题4:UDP丢包
* 解决方案:
* - 应用层实现确认重传
- 增加缓冲区大小
- 控制发送速率
*/
}
```
---
## 结语
通过本文的学习,你应该已经掌握了:
1. **TCP协议原理**:三次握手、四次挥手、滑动窗口、拥塞控制
2. **UDP协议原理**:报文格式、特点、应用场景
3. **详细对比**:TCP vs UDP的全面对比
4. **封装实现**:TCP/UDP的完整封装
5. **实战应用**:文件传输、即时通讯、视频流传输
6. **性能优化**:TCP/UDP的性能优化策略
TCP和UDP是传输层的两大核心协议,理解它们的工作原理和特性,对于网络编程至关重要。根据不同的应用场景选择合适的协议,才能构建出高性能、高可用的网络应用!
建议:
1. 深入学习TCP/IP协议栈
2. 实践更多网络编程项目
3. 学习NIO和Netty
4. 关注网络安全
5. 学习性能调优技巧