深入浅出:从进程、线程到协程的完整解析
# 深入浅出:从进程、线程到协程的完整解析
## 目录
1. [引言:并发编程的三大基石](#引言并发编程的三大基石)
2. [进程:操作系统资源分配的最小单位](#进程操作系统资源分配的最小单位)
3. [线程:CPU调度的最小单位](#线程cpu调度的最小单位)
4. [协程:用户态的轻量级线程](#协程用户态的轻量级线程)
5. [Java中的线程模型](#java中的线程模型)
6. [协程的实现与原理](#协程的实现与原理)
7. [性能对比与选择策略](#性能对比与选择策略)
8. [实战应用与最佳实践](#实战应用与最佳实践)
9. [总结](#总结)
---
## 引言:并发编程的三大基石
在开始深入之前,我们需要回答几个核心问题:
### 核心问题
1. **进程对应一个Java程序吗?**
- 答案:**是的,但不仅限于此**。一个Java程序通常对应一个JVM进程,但一个Java程序可能启动多个JVM进程。
2. **线程对应什么?**
- 答案:**线程是进程内部的执行流**。一个Java进程可以包含多个线程,每个线程共享进程的资源,但拥有独立的执行栈和程序计数器。
3. **协程又对应什么?**
- 答案:**协程是用户态的轻量级线程**。在Java中,协程对应的是**虚拟线程(Virtual Threads,Project Loom)**或**第三方库实现的协程**。
### 并发编程的三个层次
```
┌─────────────────────────────────────────────────────────┐
│ 应用程序层 │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ 协程层 │ │ 线程层 │ │ 进程层 │ │
│ │ (用户态) │ │ (内核态) │ │ (操作系统) │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
└─────────────────────────────────────────────────────────┘
轻量级 中等 重量级
快速切换 上下文切换 进程切换
无需内核介入 需要内核调度 资源隔离
```
### 为什么需要理解三者的区别?
1. **性能优化**:选择合适的并发模型可以大幅提升系统性能
2. **资源利用**:合理分配系统资源,避免浪费
3. **系统设计**:根据业务需求选择合适的技术方案
4. **问题排查**:理解原理有助于快速定位并发问题
---
## 进程:操作系统资源分配的最小单位
### 什么是进程?
从操作系统的角度来看,进程是**程序的一次执行实例**,是操作系统进行资源分配和调度的基本单位。
### 进程的核心特征
```java
/**
* 进程的特征
*/
public class ProcessCharacteristics {
/**
* 1. 独立性
*
* 每个进程拥有独立的内存空间
* - 代码段(Text Segment)
* - 数据段(Data Segment)
* - 堆(Heap)
* - 栈(Stack)
*/
/**
* 2. 动态性
*
* 进程是动态创建、运行和销毁的
* 进程状态转换:
* 创建态 -> 就绪态 -> 运行态 -> 阻塞态 -> 终止态
*/
/**
* 3. 并发性
*
* 多个进程可以同时存在于系统中
* 通过时间片轮转实现并发执行
*/
/**
* 4. 异步性
*
* 进程的执行速度不可预测
* 受CPU调度、I/O等待等因素影响
*/
/**
* 5. 结构性
*
* 每个进程包含:
* - 进程控制块(PCB)
* - 程序段
* - 数据段
* - 堆栈段
*/
}
```
### 进程控制块(PCB)
进程控制块是操作系统管理进程的核心数据结构:
```java
/**
* 进程控制块(Process Control Block)
*
* PCB是进程存在的唯一标识,包含进程的所有信息
*/
public class ProcessControlBlock {
// 1. 进程标识符
private int pid; // 进程ID
private int parentPid; // 父进程ID
private String userId; // 用户ID
// 2. 处理器状态
private long programCounter; // 程序计数器
private long[] registers; // 通用寄存器
private long stackPointer; // 栈指针
private long basePointer; // 基址指针
private int processorState; // 处理器状态
// 3. 进程调度信息
private ProcessState state; // 进程状态
private int priority; // 优先级
private int schedulingPolicy; // 调度策略
private long waitingTime; // 等待时间
// 4. 进程控制信息
private int codeSegment; // 代码段地址
private int dataSegment; // 数据段地址
private int stackSegment; // 栈段地址
private long memoryLimit; // 内存限制
private int openFiles[]; // 打开的文件列表
private ProcessState[] childProcesses; // 子进程列表
// 5. 账户信息
private int uid; // 实际用户ID
private int gid; // 实际组ID
private int euid; // 有效用户ID
private int egid; // 有效组ID
public enum ProcessState {
NEW, // 新建态
READY, // 就绪态
RUNNING, // 运行态
BLOCKED, // 阻塞态
TERMINATED // 终止态
}
}
```
### 进程的内存布局
```
┌─────────────────────────────────────────────────────┐
│ 高地址 │
│ │
│ ┌─────────────────────────────────────────────┐ │
│ │ 栈(Stack) │ │
│ │ - 局部变量 │ │
│ │ - 方法参数 │ │
│ │ - 返回地址 │ │
│ │ - 向下增长 │ │
│ └─────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────┐ │
│ │ ──────────── │ │
│ │ 未使用的内存 │ │
│ │ ──────────── │ │
│ └─────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────┐ │
│ │ 堆(Heap) │ │
│ │ - 对象实例 │ │
│ │ - 数组 │ │
│ │ - 向上增长 │ │
│ └─────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────┐ │
│ │ 数据段(Data Segment) │ │
│ │ - 全局变量 │ │
│ │ - 静态变量 │ │
│ └─────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────┐ │
│ │ 代码段(Text Segment) │ │
│ │ - 程序代码 │ │
│ │ - 只读 │ │
│ └─────────────────────────────────────────────┘ │
│ │
│ 低地址 │
└─────────────────────────────────────────────────────┘
```
### 进程间通信(IPC)
由于进程间内存隔离,需要特殊的通信机制:
```java
/**
* 进程间通信方式
*/
public class IPCMechanisms {
/**
* 1. 管道(Pipe)
*
* 匿名管道:用于父子进程间通信
* 命名管道:用于无亲缘关系的进程
*
* 特点:
* - 半双工通信
* - 基于字节流
* - 容量有限
*/
/**
* 2. 消息队列(Message Queue)
*
* 特点:
* - 全双工通信
* - 基于消息
* - 可以异步通信
*/
/**
* 3. 共享内存(Shared Memory)
*
* 特点:
* - 最快的IPC方式
* - 需要同步机制
* - 多个进程映射同一块物理内存
*/
/**
* 4. 信号量(Semaphore)
*
* 特点:
* - 用于进程同步
* - 不是真正的数据传输
* - 基于计数器
*/
/**
* 5. 套接字(Socket)
*
* 特点:
* - 可用于网络通信
* - 支持TCP/UDP
* - 跨主机通信
*/
}
```
### Java进程的实现
在Java中,一个JVM实例就是一个进程:
```java
/**
* Java进程示例
*/
public class JavaProcessDemo {
public static void main(String[] args) {
// 获取当前Java进程
ProcessHandle currentProcess = ProcessHandle.current();
System.out.println("当前进程PID: " + currentProcess.pid());
System.out.println("进程名称: " +
currentProcess.info().command().orElse("unknown"));
// 创建新的Java进程
createNewProcess();
// 父子进程通信
parentChildCommunication();
}
/**
* 创建新的Java进程
*/
private static void createNewProcess() {
try {
// 使用ProcessBuilder创建进程
ProcessBuilder pb = new ProcessBuilder(
"java", "-version"
);
// 启动进程
Process process = pb.start();
// 读取进程输出
BufferedReader reader = new BufferedReader(
new InputStreamReader(process.getInputStream())
);
String line;
while ((line = reader.readLine()) != null) {
System.out.println("子进程输出: " + line);
}
// 等待进程结束
int exitCode = process.waitFor();
System.out.println("进程退出码: " + exitCode);
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
}
/**
* 父子进程通信
*/
private static void parentChildCommunication() {
try {
ProcessBuilder pb = new ProcessBuilder(
"java",
"-cp",
".",
"com.example.ChildProcess"
);
// 重定向输入输出
pb.redirectErrorStream(true);
Process process = pb.start();
// 向子进程写入数据
try (PrintWriter writer = new PrintWriter(
process.getOutputStream(), true)) {
writer.println("Hello from parent process!");
writer.println("exit"); // 通知子进程退出
}
// 读取子进程响应
try (BufferedReader reader = new BufferedReader(
new InputStreamReader(process.getInputStream()))) {
String line;
while ((line = reader.readLine()) != null) {
System.out.println("子进程: " + line);
}
}
process.waitFor();
} catch (Exception e) {
e.printStackTrace();
}
}
}
```
### 进程的优缺点
**优点:**
1. **隔离性好**:进程间内存隔离,一个进程崩溃不影响其他进程
2. **安全性高**:防止一个进程恶意访问另一个进程的内存
3. **可移植性强**:进程可以在不同的操作系统上运行
4. **并发执行**:真正实现并行执行(多核CPU)
**缺点:**
1. **开销大**:进程创建、切换开销大
2. **通信复杂**:需要特殊的IPC机制
3. **资源占用多**:每个进程占用独立的内存空间
4. **共享困难**:进程间共享数据需要复杂的同步机制
---
## 线程:CPU调度的最小单位
### 什么是线程?
线程是进程内的一个执行单元,是CPU调度的最小单位。线程共享进程的资源,但拥有独立的执行栈和程序计数器。
### 线程与进程的关系
```
┌─────────────────────────────────────────────────────┐
│ 进程(资源容器) │
│ │
│ ┌──────────────┐ ┌──────────────┐ │
│ │ 线程1 │ │ 线程2 │ │
│ │ - 独立栈 │ │ - 独立栈 │ │
│ │ - 独立PC │ │ - 独立PC │ │
│ │ - 独立状态 │ │ - 独立状态 │ │
│ └──────────────┘ └──────────────┘ │
│ │
│ 共享资源: │
│ - 代码段 │
│ - 数据段 │
│ - 堆 │
│ - 文件描述符 │
│ - 信号处理 │
│ │
└─────────────────────────────────────────────────────┘
```
### 线程控制块(TCB)
```java
/**
* 线程控制块(Thread Control Block)
*/
public class ThreadControlBlock {
// 1. 线程标识符
private long threadId; // 线程ID
private String threadName; // 线程名称
private long processId; // 所属进程ID
// 2. 处理器状态
private long programCounter; // 程序计数器
private long stackPointer; // 栈指针
private long basePointer; // 基址指针
private int[] registers; // 通用寄存器
// 3. 线程调度信息
private ThreadState state; // 线程状态
private int priority; // 优先级
private long waitingTime; // 等待时间
private long executionTime; // 执行时间
// 4. 线程栈信息
private long stackBase; // 栈基址
private long stackLimit; // 栈大小
private long currentStackPointer; // 当前栈指针
// 5. 同步信息
private Object monitor; // 监视器对象
private int lockState; // 锁状态
private Object[] locks; // 持有的锁列表
// 6. 其他信息
private boolean isDaemon; // 是否为守护线程
private boolean isInterrupted; // 是否被中断
private ThreadGroup threadGroup; // 线程组
public enum ThreadState {
NEW, // 新建
RUNNABLE, // 可运行
BLOCKED, // 阻塞
WAITING, // 等待
TIMED_WAITING, // 定时等待
TERMINATED // 终止
}
}
```
### 线程的状态转换
```
┌─────────┐
│ NEW │
└────┬────┘
│ start()
↓
┌─────────┐
│RUNNABLE │◄─────────┐
└────┬────┘ │
│ │
┌───────┴───────┐ │
│ │ │
↓ ↓ │
┌─────────┐ ┌─────────┐ │
│ BLOCKED │ │WAITING │ │
└────┬────┘ └────┬────┘ │
│ │ │
│ │ │
└───────────────┘ │
│ notify() / notifyAll() / timeout
│ │
│ │
└──────────────┘
│
│ interrupt() / 完成
↓
┌──────────┐
│TERMINATED│
└──────────┘
```
### Java线程的创建与使用
```java
/**
* Java线程创建方式
*/
public class JavaThreadCreation {
/**
* 方式1:继承Thread类
*/
static class MyThread extends Thread {
@Override
public void run() {
System.out.println("Thread运行: " + getName());
}
}
/**
* 方式2:实现Runnable接口(推荐)
*/
static class MyRunnable implements Runnable {
@Override
public void run() {
System.out.println("Runnable运行: " +
Thread.currentThread().getName());
}
}
/**
* 方式3:实现Callable接口(可返回结果)
*/
static class MyCallable implements Callable {
@Override
public String call() throws Exception {
Thread.sleep(1000);
return "Callable返回结果";
}
}
/**
* 方式4:使用线程池(推荐)
*/
static class ThreadPoolExample {
private static final ExecutorService executor =
Executors.newFixedThreadPool(10);
public static void execute() {
// 提交Runnable
executor.submit(() -> {
System.out.println("线程池执行任务");
});
// 提交Callable
Future future = executor.submit(() -> {
return "异步任务结果";
});
try {
String result = future.get();
System.out.println(result);
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws Exception {
// 方式1
Thread thread1 = new MyThread();
thread1.start();
// 方式2
Thread thread2 = new Thread(new MyRunnable());
thread2.start();
// 方式3
ExecutorService executor = Executors.newSingleThreadExecutor();
Future future = executor.submit(new MyCallable());
System.out.println(future.get());
executor.shutdown();
// 方式4
ThreadPoolExample.execute();
}
}
```
### 线程同步机制
```java
/**
* 线程同步机制
*/
public class ThreadSynchronization {
/**
* 1. synchronized关键字
*
* 实例方法锁:锁定当前实例
* 静态方法锁:锁定Class对象
* 代码块锁:锁定指定对象
*/
static class SynchronizedExample {
private int count = 0;
private final Object lock = new Object();
// 实例方法锁
public synchronized void increment() {
count++;
}
// 静态方法锁
public static synchronized void staticMethod() {
// 静态方法
}
// 代码块锁
public void blockLock() {
synchronized (lock) {
count++;
}
}
}
/**
* 2. ReentrantLock(可重入锁)
*/
static class ReentrantLockExample {
private final ReentrantLock lock = new ReentrantLock();
private int count = 0;
public void increment() {
lock.lock();
try {
count++;
} finally {
lock.unlock();
}
}
public void tryLock() {
if (lock.tryLock()) {
try {
// 执行操作
} finally {
lock.unlock();
}
} else {
// 获取锁失败
}
}
}
/**
* 3. ReadWriteLock(读写锁)
*/
static class ReadWriteLockExample {
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
private final Lock readLock = rwLock.readLock();
private final Lock writeLock = rwLock.writeLock();
private Map cache = new HashMap<>();
public String get(String key) {
readLock.lock();
try {
return cache.get(key);
} finally {
readLock.unlock();
}
}
public void put(String key, String value) {
writeLock.lock();
try {
cache.put(key, value);
} finally {
writeLock.unlock();
}
}
}
/**
* 4. volatile关键字
*
* 保证可见性,不保证原子性
*/
static class VolatileExample {
private volatile boolean flag = false;
public void setFlag(boolean flag) {
this.flag = flag;
}
public boolean isFlag() {
return flag;
}
}
/**
* 5. AtomicInteger(原子类)
*
* 基于CAS(Compare-And-Swap)实现
*/
static class AtomicExample {
private AtomicInteger count = new AtomicInteger(0);
public void increment() {
count.incrementAndGet();
}
public int get() {
return count.get();
}
}
}
```
### 线程通信机制
```java
/**
* 线程通信机制
*/
public class ThreadCommunication {
/**
* 1. wait/notify/notifyAll
*/
static class WaitNotifyExample {
private final Object lock = new Object();
private boolean condition = false;
public void waitForCondition() throws InterruptedException {
synchronized (lock) {
while (!condition) {
lock.wait();
}
// 条件满足,继续执行
}
}
public void notifyCondition() {
synchronized (lock) {
condition = true;
lock.notifyAll();
}
}
}
/**
* 2. CountDownLatch(倒计时门栓)
*/
static class CountDownLatchExample {
private final CountDownLatch latch = new CountDownLatch(3);
public void worker() {
try {
// 工作线程执行
latch.countDown();
} catch (Exception e) {
e.printStackTrace();
}
}
public void await() throws InterruptedException {
latch.await(); // 等待所有工作线程完成
System.out.println("所有工作线程已完成");
}
}
/**
* 3. CyclicBarrier(循环栅栏)
*/
static class CyclicBarrierExample {
private final CyclicBarrier barrier = new CyclicBarrier(3,
() -> System.out.println("所有线程到达栅栏")
);
public void worker() {
try {
// 执行工作
barrier.await(); // 等待其他线程
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 4. Semaphore(信号量)
*/
static class SemaphoreExample {
private final Semaphore semaphore = new Semaphore(3); // 允许3个线程同时访问
public void access() {
try {
semaphore.acquire(); // 获取许可
try {
// 访问共享资源
} finally {
semaphore.release(); // 释放许可
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 5. BlockingQueue(阻塞队列)
*/
static class BlockingQueueExample {
private final BlockingQueue queue = new LinkedBlockingQueue<>(10);
public void producer() throws InterruptedException {
queue.put("product"); // 如果队列满,会阻塞
}
public String consumer() throws InterruptedException {
return queue.take(); // 如果队列空,会阻塞
}
}
}
```
### 线程的优缺点
**优点:**
1. **轻量级**:线程创建和切换开销比进程小得多
2. **共享资源**:线程间共享进程的资源,通信方便
3. **响应快**:用户界面线程保持响应,后台线程执行耗时操作
4. **并发性**:多个线程可以并发执行,提高系统吞吐量
**缺点:**
1. **稳定性差**:一个线程崩溃可能导致整个进程崩溃
2. **同步复杂**:需要复杂的同步机制来保护共享资源
3. **调试困难**:多线程程序的调试和分析复杂
4. **竞争条件**:容易出现死锁、活锁等问题
---
## 协程:用户态的轻量级线程
### 什么是协程?
协程是一种用户态的轻量级线程,也称为微线程或纤程。协程的调度完全由用户程序控制,不需要操作系统的介入。
### 协程的核心特点
```java
/**
* 协程的特点
*/
public class CoroutineCharacteristics {
/**
* 1. 用户态调度
*
* 协程的切换完全在用户态进行,不需要内核介入
* 切换开销极小,只需要保存/恢复少量寄存器
*/
/**
* 2. 协作式调度
*
* 协程主动让出CPU,而不是被操作系统抢占
* 更好的控制粒度,避免不必要的上下文切换
*/
/**
* 3. 轻量级
*
* 协程栈通常只有几KB,而线程栈通常是1-2MB
* 可以创建数百万个协程,而线程通常只能创建几千个
*/
/**
* 4. 高并发
*
* 适合IO密集型任务,如网络请求、数据库查询
* 一个线程可以运行数万个协程
*/
}
```
### 协程 vs 线程 vs 进程
| 特性 | 进程 | 线程 | 协程 |
|------|------|------|------|
| 调度单位 | 操作系统内核 | 操作系统内核 | 用户程序 |
| 切换开销 | 大(需要保存进程上下文) | 中等(需要保存线程上下文) | 小(只需要保存少量寄存器) |
| 内存占用 | 大(独立的内存空间) | 中(共享进程内存) | 小(几KB栈) |
| 创建数量 | 几十个到几百个 | 几千个 | 数百万个 |
| 通信方式 | IPC | 共享内存 | 直接共享 |
| 同步机制 | 复杂 | 中等 | 简单 |
| 适用场景 | CPU密集型、需要隔离 | 通用场景 | IO密集型、高并发 |
### 协程的实现原理
#### 1. 用户态栈切换
```java
/**
* 协程的栈结构
*/
public class CoroutineStack {
/**
* 每个协程都有自己的栈
*/
static class Coroutine {
private long stackPointer; // 栈指针
private byte[] stack; // 协程栈(通常几KB)
private CoroutineState state; // 协程状态
private Runnable task; // 协程任务
public enum CoroutineState {
NEW, // 新创建
RUNNING, // 运行中
SUSPENDED, // 挂起
TERMINATED // 已终止
}
public Coroutine(Runnable task, int stackSize) {
this.task = task;
this.stack = new byte[stackSize];
this.stackPointer = stackBase(stack);
this.state = CoroutineState.NEW;
}
/**
* 切换到当前协程
*
* 这需要使用汇编语言实现
*/
public native void resume();
/**
* 挂起当前协程
*/
public native void yield();
/**
* 保存当前协程上下文
*/
private native void saveContext();
/**
* 恢复协程上下文
*/
private native void restoreContext();
}
/**
* 协程调度器
*/
static class CoroutineScheduler {
private final Deque readyQueue = new ArrayDeque<>();
private final Deque blockedQueue = new ArrayDeque<>();
/**
* 创建协程
*/
public void spawn(Runnable task) {
Coroutine coroutine = new Coroutine(task, 8192); // 8KB栈
readyQueue.add(coroutine);
}
/**
* 调度协程
*/
public void schedule() {
while (!readyQueue.isEmpty()) {
Coroutine coroutine = readyQueue.pollFirst();
if (coroutine.state == CoroutineState.TERMINATED) {
continue;
}
// 恢复协程执行
coroutine.resume();
if (coroutine.state == CoroutineState.SUSPENDED) {
readyQueue.addLast(coroutine);
}
}
}
/**
* 协程让出CPU
*/
public void yield() {
Coroutine current = getCurrentCoroutine();
current.state = CoroutineState.SUSPENDED;
current.yield(); // 挂起当前协程
}
}
}
```
#### 2. 状态机模拟
```java
/**
* 使用状态机模拟协程
*/
public class StateMachineCoroutine {
/**
* 协程状态
*/
static class Coroutine {
private int state = 0;
private Object value;
public Object next() {
switch (state) {
case 0:
state = 1;
return "Step 1";
case 1:
state = 2;
return "Step 2";
case 2:
state = 3;
return "Step 3";
default:
throw new IllegalStateException("Coroutine terminated");
}
}
}
/**
* 生成器(类似Python的yield)
*/
static class Generator {
private int state = 0;
private T value;
public T yield(T value) {
this.value = value;
// 保存状态并返回
return null;
}
public T next() {
switch (state) {
case 0:
state = 1;
return yield((T) "Hello");
case 1:
state = 2;
return yield((T) "World");
case 2:
state = 3;
return yield((T) "Coroutine");
default:
return null;
}
}
}
}
```
### Java中的协程实现
#### 1. Project Loom(虚拟线程)
```java
/**
* Project Loom:Java虚拟线程
*
* 这是JDK 21正式引入的特性
*/
public class VirtualThreadExample {
/**
* 创建虚拟线程
*/
public static void createVirtualThread() {
// 方式1:使用Thread.ofVirtual()
Thread virtualThread = Thread.ofVirtual()
.name("virtual-thread-1")
.start(() -> {
System.out.println("虚拟线程运行: " +
Thread.currentThread().getName());
});
// 方式2:使用Executors
try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
executor.submit(() -> {
System.out.println("虚拟线程运行");
});
}
}
/**
* 大规模并发示例
*/
public static void largeScaleConcurrency() throws InterruptedException {
// 创建10万个虚拟线程
try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
CountDownLatch latch = new CountDownLatch(100_000);
for (int i = 0; i < 100_000; i++) {
final int taskId = i;
executor.submit(() -> {
try {
// 模拟IO操作
Thread.sleep(100);
System.out.println("任务 " + taskId + " 完成");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
latch.countDown();
}
});
}
latch.await();
System.out.println("所有任务完成");
}
}
/**
* 结构化并发
*/
public static void structuredConcurrency() throws Exception {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
// 提交任务1
Supplier task1 = scope.fork(() -> {
Thread.sleep(1000);
return "任务1结果";
});
// 提交任务2
Supplier task2 = scope.fork(() -> {
Thread.sleep(1000);
return "任务2结果";
});
// 等待所有任务完成
scope.join().throwIfFailed();
// 获取结果
System.out.println("任务1: " + task1.get());
System.out.println("任务2: " + task2.get());
}
}
}
```
#### 2. 第三方协程库
```java
/**
* Quasar协程库示例
*
* 需要依赖:
*
* co.paralleluniverse
* quasar-core
*
*/
public class QuasarCoroutineExample {
/**
* 使用Quasar创建协程
*/
@Suspendable
public static void fiberExample() throws Exception {
// 创建Fiber
Fiber fiber = new Fiber(() -> {
System.out.println("Fiber开始");
// 让出CPU
Fiber.sleep(1000);
System.out.println("Fiber继续执行");
}).start();
// 等待Fiber完成
fiber.join();
}
/**
* Fiber通道通信
*/
public static void channelExample() throws Exception {
Channel channel = new Channel<>(10);
// 生产者Fiber
new Fiber(() -> {
for (int i = 0; i < 10; i++) {
channel.send(i);
System.out.println("发送: " + i);
}
channel.close();
}).start();
// 消费者Fiber
new Fiber(() -> {
Integer value;
while ((value = channel.receive()) != null) {
System.out.println("接收: " + value);
}
}).start().join();
}
}
```
---
## Java中的线程模型
### JVM线程模型
```
┌─────────────────────────────────────────────────────┐
│ Java虚拟机(JVM) │
│ │
│ ┌──────────────┐ ┌──────────────┐ │
│ │ Java线程 │ │ Java线程 │ │
│ │ (Thread) │ │ (Thread) │ │
│ └──────┬───────┘ └──────┬───────┘ │
│ │ │ │
│ └────────┬─────────┘ │
│ ↓ │
│ ┌─────────────────────────────────────────┐ │
│ │ 线程本地存储(TLS) │ │
│ │ JIT编译器 │ │
│ │ 垃圾收集器 │ │
│ └─────────────────────────────────────────┘ │
│ ↓ │
│ ┌─────────────────────────────────────────┐ │
│ │ 操作系统线程 │ │
│ │ (Pthread/LWP) │ │
│ └─────────────────────────────────────────┘ │
│ ↓ │
│ ┌─────────────────────────────────────────┐ │
│ │ 内核调度器 │ │
│ └─────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────┘
```
### 1:1线程模型(JVM默认)
```java
/**
* 1:1线程模型
*
* 每个Java线程对应一个原生操作系统线程
*/
public class OneToOneModel {
/**
* 优缺点分析
*/
static class Analysis {
/**
* 优点:
* 1. 充分利用多核CPU
* 2. 操作系统直接调度,性能好
* 3. 可以真正并行执行
*
* 缺点:
* 1. 线程创建开销大
* 2. 上下文切换开销大
* 3. 无法创建大量线程(通常几千个)
* 4. 阻塞操作会阻塞整个线程
*/
}
/**
* 线程创建过程
*/
public static void threadCreationProcess() {
/*
* 1. JVM创建Thread对象
* 2. 调用start()方法
* 3. JVM调用本地方法start0()
* 4. 本地方法创建操作系统线程
* 5. 操作系统线程执行Java代码
*/
Thread thread = new Thread(() -> {
System.out.println("线程运行");
});
thread.start(); // 触发线程创建
}
}
```
### N:M线程模型(虚拟线程)
```java
/**
* N:M线程模型
*
* 多个虚拟线程(M)映射到少数物理线程(N)
*/
public class NToMModel {
/**
* 优缺点分析
*/
static class Analysis {
/**
* 优点:
* 1. 可以创建大量虚拟线程(数百万)
* 2. 虚拟线程切换开销小(用户态)
* 3. 阻塞操作不会阻塞物理线程
* 4. 适合高并发IO场景
*
* 缺点:
* 1. CPU密集型任务无法利用多核
* 2. 需要特殊的调度器
* 3. 调试和监控更复杂
*/
}
/**
* 虚拟线程调度
*/
public static void virtualThreadScheduling() {
// 创建虚拟线程
Thread vt = Thread.ofVirtual().start(() -> {
System.out.println("虚拟线程运行");
});
// 虚拟线程会被挂载到物理线程上执行
// 遇到阻塞操作时,虚拟线程会卸载
// 物理线程可以执行其他虚拟线程
}
}
```
### 线程池的实现原理
```java
/**
* 线程池实现原理
*/
public class ThreadPoolImplementation {
/**
* 线程池核心组件
*/
static class ThreadPool {
private final BlockingQueue workQueue; // 工作队列
private final Set workers; // 工作线程集合
private volatile boolean isRunning; // 运行状态
private final AtomicInteger poolSize; // 池大小
/**
* 工作线程
*/
class Worker extends Thread {
private Runnable firstTask;
Worker(Runnable firstTask) {
this.firstTask = firstTask;
}
@Override
public void run() {
try {
Runnable task = firstTask;
while (task != null || (task = getTask()) != null) {
task.run();
task = null;
}
} finally {
workers.remove(this);
}
}
/**
* 从队列获取任务
*/
private Runnable getTask() {
try {
return workQueue.take();
} catch (InterruptedException e) {
return null;
}
}
}
/**
* 提交任务
*/
public void execute(Runnable command) {
if (poolSize.get() < workers.size()) {
addWorker(command);
} else {
workQueue.offer(command);
}
}
/**
* 添加工作线程
*/
private void addWorker(Runnable firstTask) {
Worker worker = new Worker(firstTask);
workers.add(worker);
worker.start();
}
}
}
```
---
## 协程的实现与原理
### 协程的底层实现
#### 1. 上下文切换机制
```java
/**
* 协程上下文切换
*/
public class CoroutineContextSwitch {
/**
* 协程上下文
*/
static class CoroutineContext {
// 需要保存的寄存器
private long rbx; // 基址寄存器
private long rsp; // 栈指针
private long rbp; // 基址指针
private long rip; // 指令指针
private long r12; // 通用寄存器
private long r13; // 通用寄存器
private long r14; // 通用寄存器
private long r15; // 通用寄存器
/**
* 保存当前上下文
*/
public native void save();
/**
* 恢复上下文
*/
public native void restore();
/**
* 交换上下文
*
* 这是协程切换的核心
*/
public native void swap(CoroutineContext other);
}
/**
* 协程管理器
*/
static class CoroutineManager {
private final Deque readyQueue = new ArrayDeque<>();
private Coroutine current;
/**
* 创建协程
*/
public Coroutine create(Runnable task) {
Coroutine coroutine = new Coroutine(task);
readyQueue.add(coroutine);
return coroutine;
}
/**
* 切换到下一个协程
*/
public void yield() {
if (readyQueue.isEmpty()) {
return;
}
Coroutine next = readyQueue.pollFirst();
readyQueue.addLast(current);
CoroutineContext currentContext = current.getContext();
CoroutineContext nextContext = next.getContext();
// 切换上下文
currentContext.swap(nextContext);
current = next;
}
}
}
```
#### 2. 栈管理机制
```java
/**
* 协程栈管理
*/
public class CoroutineStackManagement {
/**
* 栈分配策略
*/
static class StackAllocation {
/**
* 1. 固定大小栈
*
* 每个协程分配固定大小的栈
* 简单但可能浪费空间
*/
public static byte[] allocateFixedStack(int size) {
return new byte[size];
}
/**
* 2. 动态增长栈
*
* 栈空间不足时自动扩容
* 需要处理栈溢出检测
*/
public static class DynamicStack {
private byte[] stack;
private int top;
public DynamicStack(int initialSize) {
this.stack = new byte[initialSize];
this.top = initialSize;
}
public void grow(int newSize) {
byte[] newStack = new byte[newSize];
System.arraycopy(stack, 0, newStack, 0, top);
stack = newStack;
}
}
/**
* 3. 分段栈(Go语言的实现)
*
* 栈由多个连续的段组成
* 需要时分配新段
*/
public static class SegmentedStack {
private List segments = new ArrayList<>();
private int currentSegment;
public void allocateNewSegment() {
byte[] newSegment = new byte[8192];
segments.add(newSegment);
currentSegment = segments.size() - 1;
}
}
}
/**
* 栈切换
*/
static class StackSwitch {
/**
* 保存当前栈状态
*/
public native void saveStack();
/**
* 恢复栈状态
*/
public native void restoreStack();
/**
* 检测栈溢出
*/
public native boolean checkStackOverflow();
}
}
```
### 协程调度算法
```java
/**
* 协程调度算法
*/
public class CoroutineScheduling {
/**
* 1. FIFO调度
*
* 先进先出,简单公平
*/
static class FIFOScheduler {
private final Queue readyQueue = new LinkedList<>();
public void schedule() {
while (!readyQueue.isEmpty()) {
Coroutine coroutine = readyQueue.poll();
execute(coroutine);
}
}
private void execute(Coroutine coroutine) {
// 执行协程
}
}
/**
* 2. 优先级调度
*
* 高优先级协程优先执行
*/
static class PriorityScheduler {
private final PriorityQueue readyQueue =
new PriorityQueue<>((a, b) ->
Integer.compare(b.getPriority(), a.getPriority()));
public void schedule() {
while (!readyQueue.isEmpty()) {
Coroutine coroutine = readyQueue.poll();
execute(coroutine);
}
}
}
/**
* 3. 时间片调度
*
* 每个协程执行固定时间片后让出CPU
*/
static class TimeSliceScheduler {
private final Deque readyQueue = new ArrayDeque<>();
private static final int TIME_SLICE = 10_000; // 10ms
public void schedule() {
while (!readyQueue.isEmpty()) {
Coroutine coroutine = readyQueue.pollFirst();
executeWithTimeSlice(coroutine, TIME_SLICE);
if (!coroutine.isFinished()) {
readyQueue.addLast(coroutine);
}
}
}
private void executeWithTimeSlice(Coroutine coroutine, int timeSlice) {
long startTime = System.nanoTime();
while (System.nanoTime() - startTime < timeSlice) {
if (!coroutine.step()) {
break;
}
}
}
}
}
```
---
## 性能对比与选择策略
### 性能基准测试
```java
/**
* 进程、线程、协程性能对比
*/
public class PerformanceComparison {
/**
* 测试场景1:创建和销毁开销
*/
public static void testCreationOverhead() {
int count = 10_000;
// 测试进程创建
long startTime = System.currentTimeMillis();
for (int i = 0; i < count; i++) {
// 创建进程(仅演示,不建议实际运行)
// Runtime.getRuntime().exec("echo test");
}
long processTime = System.currentTimeMillis() - startTime;
// 测试线程创建
startTime = System.currentTimeMillis();
CountDownLatch latch = new CountDownLatch(count);
for (int i = 0; i < count; i++) {
new Thread(latch::countDown).start();
}
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
long threadTime = System.currentTimeMillis() - startTime;
// 测试协程创建(虚拟线程)
startTime = System.currentTimeMillis();
try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
CountDownLatch vtLatch = new CountDownLatch(count);
for (int i = 0; i < count; i++) {
executor.submit(vtLatch::countDown);
}
try {
vtLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
long coroutineTime = System.currentTimeMillis() - startTime;
System.out.println("进程创建时间: " + processTime + "ms");
System.out.println("线程创建时间: " + threadTime + "ms");
System.out.println("协程创建时间: " + coroutineTime + "ms");
}
/**
* 测试场景2:上下文切换开销
*/
public static void testContextSwitchOverhead() {
int iterations = 1_000_000;
// 线程上下文切换
long startTime = System.currentTimeMillis();
CountDownLatch latch = new CountDownLatch(2);
Thread t1 = new Thread(() -> {
for (int i = 0; i < iterations; i++) {
Thread.yield();
}
latch.countDown();
});
Thread t2 = new Thread(() -> {
for (int i = 0; i < iterations; i++) {
Thread.yield();
}
latch.countDown();
});
t1.start();
t2.start();
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
long threadSwitchTime = System.currentTimeMillis() - startTime;
System.out.println("线程上下文切换时间: " + threadSwitchTime + "ms");
// 协程上下文切换(虚拟线程)
startTime = System.currentTimeMillis();
try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
CountDownLatch vtLatch = new CountDownLatch(2);
executor.submit(() -> {
for (int i = 0; i < iterations; i++) {
Thread.yield();
}
vtLatch.countDown();
});
executor.submit(() -> {
for (int i = 0; i < iterations; i++) {
Thread.yield();
}
vtLatch.countDown();
});
try {
vtLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
long coroutineSwitchTime = System.currentTimeMillis() - startTime;
System.out.println("协程上下文切换时间: " + coroutineSwitchTime + "ms");
}
/**
* 测试场景3:IO密集型任务
*/
public static void testIOIntensiveTasks() throws Exception {
int taskCount = 1000;
String url = "http://example.com";
// 使用线程池
ExecutorService threadPool = Executors.newFixedThreadPool(100);
long startTime = System.currentTimeMillis();
List> threadFutures = new ArrayList<>();
for (int i = 0; i < taskCount; i++) {
threadFutures.add(threadPool.submit(() -> {
// 模拟IO操作
Thread.sleep(100);
return "done";
}));
}
for (Future future : threadFutures) {
future.get();
}
long threadTime = System.currentTimeMillis() - startTime;
System.out.println("线程池IO时间: " + threadTime + "ms");
threadPool.shutdown();
// 使用虚拟线程
try (ExecutorService vtExecutor = Executors.newVirtualThreadPerTaskExecutor()) {
startTime = System.currentTimeMillis();
List> vtFutures = new ArrayList<>();
for (int i = 0; i < taskCount; i++) {
vtFutures.add(vtExecutor.submit(() -> {
// 模拟IO操作
Thread.sleep(100);
return "done";
}));
}
for (Future future : vtFutures) {
future.get();
}
long vtTime = System.currentTimeMillis() - startTime;
System.out.println("虚拟线程IO时间: " + vtTime + "ms");
}
}
}
```
### 选择策略
```
┌─────────────────────────────────────────────────────────┐
│ 选择决策树 │
│ │
│ 是否需要资源隔离? │
│ ┌────Yes────┐ │
│ │ 使用进程 │ │
│ └───────────┘ │
│ No │
│ ↓ │
│ 任务类型? │
│ ┌─────────────┬──────────────┐ │
│ │ CPU密集型 │ IO密集型 │ │
│ ↓ ↓ ↓ │
│ 使用线程池 使用线程池 使用协程 │
│ (少量线程) (虚拟线程) │
│ │
└─────────────────────────────────────────────────────────┘
```
```java
/**
* 选择策略
*/
public class SelectionStrategy {
/**
* 场景1:需要资源隔离
*
* 选择:进程
*
* 理由:
* - 进程间内存隔离
* - 一个进程崩溃不影响其他进程
* - 适合运行不可信代码
*
* 示例:浏览器标签页、容器化应用
*/
/**
* 场景2:CPU密集型任务
*
* 选择:线程池(线程数=CPU核心数)
*
* 理由:
* - 充分利用多核CPU
* - 避免过多线程导致上下文切换
* - 操作系统直接调度,效率高
*
* 示例:图像处理、科学计算
*/
/**
* 场景3:IO密集型任务(少量并发)
*
* 选择:线程池(较多线程)
*
* 理由:
* - IO等待时线程可以切换
* - 实现相对简单
* - 调试方便
*
* 示例:文件读写、网络请求(< 1000个并发)
*/
/**
* 场景4:IO密集型任务(大量并发)
*
* 选择:协程(虚拟线程)
*
* 理由:
* - 可以创建数百万个协程
* - 协程切换开销极小
* - 阻塞操作不会阻塞物理线程
*
* 示例:Web服务器、即时通讯(> 10000个并发)
*/
/**
* 场景5:需要精细控制
*
* 选择:协程(状态机)
*
* 理由:
* - 协程的调度完全由用户控制
* - 可以实现复杂的协作逻辑
* - 避免竞态条件
*
* 示例:游戏引擎、异步状态机
*/
}
```
---
## 实战应用与最佳实践
### 实战1:高并发Web服务器
```java
/**
* 使用虚拟线程实现高并发Web服务器
*/
public class HighConcurrencyWebServer {
private final ServerSocket serverSocket;
private final ExecutorService executor;
public HighConcurrencyWebServer(int port) throws IOException {
this.serverSocket = new ServerSocket(port);
this.executor = Executors.newVirtualThreadPerTaskExecutor();
}
/**
* 启动服务器
*/
public void start() {
System.out.println("服务器启动,监听端口: " +
serverSocket.getLocalPort());
while (true) {
try {
Socket clientSocket = serverSocket.accept();
// 为每个连接创建虚拟线程
executor.submit(() -> handleClient(clientSocket));
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* 处理客户端请求
*/
private void handleClient(Socket clientSocket) {
try (InputStream input = clientSocket.getInputStream();
OutputStream output = clientSocket.getOutputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(input));
PrintWriter writer = new PrintWriter(output, true)) {
// 读取请求
String requestLine = reader.readLine();
System.out.println("收到请求: " + requestLine);
// 模拟业务逻辑(可能涉及数据库、缓存等IO操作)
String response = processRequest(requestLine);
// 发送响应
writer.println("HTTP/1.1 200 OK");
writer.println("Content-Type: text/plain");
writer.println();
writer.println(response);
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
clientSocket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* 处理请求
*/
private String processRequest(String request) {
// 模拟IO操作
try {
Thread.sleep(10); // 10ms的延迟
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Hello, World!";
}
public static void main(String[] args) throws IOException {
HighConcurrencyWebServer server = new HighConcurrencyWebServer(8080);
server.start();
}
}
```
### 实战2:异步任务编排
```java
/**
* 使用结构化并发编排异步任务
*/
public class AsyncTaskOrchestration {
/**
* 场景:需要并发执行多个任务,并等待所有任务完成
*/
public static void parallelExecution() throws Exception {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
// 任务1:查询用户信息
Supplier userTask = scope.fork(() -> {
Thread.sleep(1000);
return "用户: 张三";
});
// 任务2:查询订单信息
Supplier orderTask = scope.fork(() -> {
Thread.sleep(1000);
return "订单: 3个商品";
});
// 任务3:查询推荐信息
Supplier recommendTask = scope.fork(() -> {
Thread.sleep(1000);
return "推荐: 5个商品";
});
// 等待所有任务完成
scope.join().throwIfFailed();
// 收集结果
String result = String.join("\n",
userTask.get(),
orderTask.get(),
recommendTask.get()
);
System.out.println(result);
}
}
/**
* 场景:任务间有依赖关系
*/
public static void dependentTasks() throws Exception {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
// 任务1:获取配置
Supplier