# Redis完全指南:从小白到高手的进阶之路
## 目录
1. [引言:为什么选择Redis](#引言为什么选择redis)
2. [Redis基础入门](#redis基础入门)
3. [Redis数据类型详解](#redis数据类型详解)
4. [Redis持久化机制](#redis持久化机制)
5. [Redis内存管理](#redis内存管理)
6. [Redis集群架构](#redis集群架构)
7. [Redis高级特性](#redis高级特性)
8. [Redis性能优化](#redis性能优化)
9. [Redis实战应用](#redis实战应用)
10. [Redis监控与运维](#redis监控与运维)
11. [常见问题与解决方案](#常见问题与解决方案)
12. [总结](#总结)
---
## 引言:为什么选择Redis
### Redis的核心价值
在开始深入学习之前,我们先理解为什么Redis在现代应用中如此重要。
```java
/**
* Redis的核心优势
*/
public class RedisAdvantages {
/**
* 1. 极高的性能
*
* - 读写速度:10万次/秒
* - 响应时间:毫秒级
* - 原因:基于内存存储
*/
/**
* 2. 丰富的数据类型
*
* - String(字符串)
* - Hash(哈希)
* - List(列表)
* - Set(集合)
* - ZSet(有序集合)
* - Bitmap(位图)
* - HyperLogLog(基数统计)
* - GEO(地理位置)
*/
/**
* 3. 持久化支持
*
* - RDB(快照持久化)
* - AOF(追加日志持久化)
* - 混合持久化
*/
/**
* 4. 高可用性
*
* - 主从复制
* - 哨兵机制
* - 集群模式
*/
/**
* 5. 分布式特性
*
* - 分布式锁
* - 分布式会话
* - 分布式限流
*/
}
```
### Redis vs 其他存储方案
```
┌─────────────┬─────────────┬─────────────┬─────────────┐
│ 特性 │ Redis │ MySQL │ Memcache │
├─────────────┼─────────────┼─────────────┼─────────────┤
│ 存储类型 │ 内存存储 │ 磁盘存储 │ 内存存储 │
│ 数据类型 │ 丰富 │ 关系型 │ 简单 │
│ 持久化 │ 支持 │ 支持 │ 不支持 │
│ 读写速度 │ 极快 │ 一般 │ 极快 ││
│ 数据容量 │ 有限 │ 海量 │ 有限 │
│ 复杂查询 │ 有限 │ 强大 │ 不支持 │
│ 适用场景 │ 缓存/会话 │ 主存储 │ 简单缓存 │
└─────────────┴─────────────┴─────────────┴─────────────┘
```
### Redis的应用场景
```java
/**
* Redis典型应用场景
*/
public class RedisUseCases {
/**
* 1. 缓存系统
*
* - 热点数据缓存
* - 数据库查询缓存
* - API响应缓存
*/
/**
* 2. 会话管理
*
* - 分布式会话存储
* - 用户登录状态
* - 购物车信息
*/
/**
* 3. 排行榜
*
* - 实时排行榜
* - 热门文章排行
* - 游戏排行榜
*/
/**
* 4. 计数器
*
* - 文章阅读量
* - 视频播放量
* - 点赞数统计
*/
/**
* 5. 消息队列
*
* - 简单消息队列
* - 延迟队列
* - 发布订阅
*/
/**
* 6. 分布式锁
*
* - 防止重复操作
* - 限流控制
* - 分布式事务
*/
}
```
---
## Redis基础入门
### 1. Redis安装与配置
#### 1.1 在Linux上安装Redis
```bash
# 使用apt安装(Ubuntu/Debian)
sudo apt update
sudo apt install redis-server
# 使用yum安装(CentOS/RHEL)
sudo yum install epel-release
sudo yum install redis
# 使用dnf安装(Fedora)
sudo dnf install redis
# 启动Redis服务
sudo systemctl start redis
sudo systemctl enable redis
# 检查Redis状态
sudo systemctl status redis
```
#### 1.2 在Docker中运行Redis
```bash
# 拉取Redis镜像
docker pull redis:latest
# 运行Redis容器
docker run -d \
--name redis \
-p 6379:6379 \
-v /data/redis:/data \
redis:latest \
redis-server --appendonly yes
# 查看Redis日志
docker logs -f redis
# 进入Redis容器
docker exec -it redis redis-cli
```
#### 1.3 Redis配置文件详解
```conf
# redis.conf核心配置说明
# 网络配置
bind 0.0.0.0 # 监听所有网卡
protected-mode no # 关闭保护模式
port 6379 # 监听端口
tcp-backlog 511 # TCP连接队列长度
timeout 0 # 客户端空闲超时(0表示不关闭)
# 持久化配置
save 900 1 # 900秒内至少1个key变化则保存
save 300 10 # 300秒内至少10个key变化则保存
save 60 10000 # 60秒内至少10000个key变化则保存
rdbcompression yes # 是否压缩RDB文件
rdbchecksum yes # 是否校验RDB文件
# AOF配置
appendonly no # 是否开启AOF
appendfsync everysec # AOF同步策略
appendfilename "appendonly.aof"
auto-aof-rewrite-percentage 100 # AOF重写百分比
auto-aof-rewrite-min-size 64mb # AOF重写最小大小
# 内存配置
maxmemory 256mb # 最大内存限制
maxmemory-policy allkeys-lru # 内存淘汰策略
# 日志配置
loglevel notice # 日志级别
logfile "" # 日志文件(空表示输出到标准输出)
# 慢查询配置
slowlog-log-slower-than 10000 # 慢查询阈值(微秒)
slowlog-max-len 128 # 慢查询日志最大长度
```
### 2. Redis客户端使用
#### 2.1 使用redis-cli
```bash
# 连接Redis
redis-cli
# 连接远程Redis
redis-cli -h 127.0.0.1 -p 6379
# 连接带密码的Redis
redis-cli -h 127.0.0.1 -p 6379 -a password
# 执行命令
redis-cli SET key value
redis-cli GET key
# 批量执行命令
redis-cli << EOF
SET key1 value1
SET key2 value2
GET key1
GET key2
EOF
# 监控Redis命令
redis-cli MONITOR
# 查看信息
redis-cli INFO
redis-cli INFO server
redis-cli INFO memory
redis-cli INFO stats
# 测试延迟
redis-cli --latency
redis-cli --latency-history
```
#### 2.2 Java客户端使用
```java
/**
* Redis Java客户端示例
*/
public class RedisClientExample {
/**
* 1. Jedis客户端
*/
public static void jedisExample() {
// 创建Jedis连接
Jedis jedis = new Jedis("127.0.0.1", 6379);
// 设置密码
// jedis.auth("password");
try {
// String操作
jedis.set("name", "张三");
String name = jedis.get("name");
System.out.println("name: " + name);
// 设置过期时间
jedis.setex("temp_key", 10, "临时数据");
// 删除key
jedis.del("name");
// Hash操作
jedis.hset("user:1", "name", "李四");
jedis.hset("user:1", "age", "25");
String userName = jedis.hget("user:1", "name");
System.out.println("userName: " + userName);
// List操作
jedis.lpush("mylist", "item1", "item2", "item3");
List list = jedis.lrange("mylist", 0, -1);
System.out.println("list: " + list);
// Set操作
jedis.sadd("myset", "member1", "member2", "member3");
Set set = jedis.smembers("myset");
System.out.println("set: " + set);
// ZSet操作
jedis.zadd("myzset", 90, "score90");
jedis.zadd("myzset", 85, "score85");
jedis.zadd("myzset", 95, "score95");
Set zset = jedis.zrange("myzset", 0, -1);
System.out.println("zset: " + zset);
} finally {
// 关闭连接
jedis.close();
}
}
/**
* 2. Lettuce客户端(推荐)
*/
public static void lettuceExample() {
// 创建Redis客户端
RedisClient redisClient = RedisClient.create("redis://127.0.0.1:6379");
StatefulRedisConnection connection = redisClient.connect();
RedisCommands commands = connection.sync();
try {
// String操作
commands.set("name", "张三");
String name = commands.get("name");
System.out.println("name: " + name);
// 批量操作
commands.multi();
commands.set("key1", "value1");
commands.set("key2", "value2");
commands.exec();
} finally {
// 关闭连接
connection.close();
redisClient.shutdown();
}
}
/**
* 3. Spring Data Redis
*/
@Component
public static class SpringRedisExample {
@Autowired
private RedisTemplate redisTemplate;
public void springRedisOperations() {
// String操作
redisTemplate.opsForValue().set("name", "张三");
String name = (String) redisTemplate.opsForValue().get("name");
System.out.println("name: " + name);
// Hash操作
Map userMap = new HashMap<>();
userMap.put("name", "李四");
userMap.put("age", 25);
redisTemplate.opsForHash().putAll("user:1", userMap);
// List操作
redisTemplate.opsForList().leftPushAll("mylist",
"item1", "item2", "item3");
// Set操作
redisTemplate.opsForSet().add("myset",
"member1", "member2", "member3");
// ZSet操作
redisTemplate.opsForZSet().add("myzset", "score90", 90);
redisTemplate.opsForZSet().add("myzset", "score85", 85);
}
}
}
```
---
## Redis数据类型详解
### 1. String(字符串)
String是Redis最基本的数据类型,可以存储任何类型的数据。
#### 1.1 基本操作
```bash
# 设置值
SET key value
SET name "张三"
SET age 25
SET score 95.5
# 获取值
GET name
GET age
GET score
# 设置值并指定过期时间(秒)
SETEX temp_key 10 "临时数据"
# 设置值并指定过期时间(毫秒)
PSETEX temp_key 10000 "临时数据"
# 如果key不存在则设置
SETNX lock_key "locked"
# 获取并设置值
GETSET key new_value
# 批量设置
MSET key1 value1 key2 value2 key3 value3
# 批量获取
MGET key1 key2 key3
# 追加值
APPEND key "suffix"
# 获取字符串长度
STRLEN key
# 获取指定范围的字符串
GETRANGE key 0 5
# 设置指定范围的字符串
SETRANGE key 0 "new"
```
#### 1.2 数值操作
```bash
# 自增1
INCR counter
# 自增指定值
INCRBY counter 5
# 自增浮点数
INCRBYFLOAT score 3.5
# 自减1
DECR counter
# 自减指定值
DECRBY counter 5
```
#### 1.3 String应用场景
```java
/**
* String类型应用场景
*/
public class StringUseCases {
/**
* 1. 缓存数据
*/
public static void cacheData() {
/*
* 使用场景:缓存数据库查询结果
*
* 实现:
* 1. 查询数据库
* 2. 将结果存入Redis
* 3. 设置合理的过期时间
*/
// 伪代码
String cacheKey = "user:info:" + userId;
String cachedData = redis.get(cacheKey);
if (cachedData == null) {
// 缓存未命中,查询数据库
UserData userData = database.queryUser(userId);
redis.setex(cacheKey, 3600, JSON.toJSONString(userData));
}
}
/**
* 2. 计数器
*/
public static void counter() {
/*
* 使用场景:文章阅读量、视频播放量、点赞数
*
* 实现:
* 1. 使用INCR命令自增
* 2. 使用INCRBY增加指定值
*/
// 伪代码
String articleKey = "article:read:" + articleId;
long readCount = redis.incr(articleKey);
// 也可以批量增加
String likeKey = "article:like:" + articleId;
redis.incrby(likeKey, 1);
}
/**
* 3. 分布式锁
*/
public static void distributedLock() {
/*
* 使用场景:防止重复操作、限流控制
*
* 实现:
* 1. 使用SETNX命令尝试获取锁
* 2. 设置过期时间防止死锁
* 3. 使用DEL命令释放锁
*/
// 伪代码
String lockKey = "lock:order:" + orderId;
boolean locked = redis.setnx(lockKey, "locked") == 1;
if (locked) {
// 设置过期时间
redis.expire(lockKey, 30); // 30秒后自动释放
try {
// 执行业务逻辑
processOrder(orderId);
} finally {
// 释放锁
redis.del(lockKey);
}
}
}
/**
* 4. Session管理
*/
public static void sessionManagement() {
/*
* 使用场景:用户登录状态、购物车信息
*
* 实现:
* 1. 将Session信息序列化为JSON
* 2. 使用用户ID作为key
* 3. 设置合理的过期时间
*/
// 伪代码
String sessionKey = "session:" + userId;
SessionInfo session = new SessionInfo(userId, token, cart);
redis.setex(sessionKey, 1800, JSON.toJSONString(session)); // 30分钟过期
}
/**
* 5. 限流控制
*/
public static void rateLimiting() {
/*
* 使用场景:API访问频率限制
*
* 实现:
* 1. 使用INCR记录访问次数
* 2. 设置过期时间重置计数器
* 3. 超过限制则拒绝访问
*/
// 伪代码
String limitKey = "limit:api:" + apiKey;
long count = redis.incr(limitKey);
if (count == 1) {
// 第一次访问,设置过期时间
redis.expire(limitKey, 60); // 60秒内重置
}
if (count > 100) { // 每分钟最多100次
throw new RateLimitException("访问频率超过限制");
}
}
}
```
### 2. Hash(哈希)
Hash是一个键值对集合,适合存储对象。
#### 2.1 基本操作
```bash
# 设置字段值
HSET key field value
HSET user:1 name "张三"
HSET user:1 age 25
HSET user:1 email "zhangsan@example.com"
# 获取字段值
HGET key field
HGET user:1 name
HGET user:1 age
# 批量设置字段
HMSET user:1 name "李四" age 26 email "lisi@example.com"
# 批量获取字段
HMGET user:1 name age email
# 获取所有字段和值
HGETALL user:1
# 获取所有字段
HKEYS user:1
# 获取所有值
HVALS user:1
# 删除字段
HDEL key field
HDEL user:1 email
# 检查字段是否存在
HEXISTS key field
HEXISTS user:1 name
# 获取字段数量
HLEN key
HLEN user:1
# 字段值自增
HINCRBY key field increment
HINCRBY user:1 age 1
HINCRBY user:1 score 10
# 字段值自增浮点数
HINCRBYFLOAT key field increment
HINCRBYFLOAT user:1 score 5.5
```
#### 2.2 Hash应用场景
```java
/**
* Hash类型应用场景
*/
public class HashUseCases {
/**
* 1. 对象存储
*/
public static void objectStorage() {
/*
* 使用场景:存储用户信息、商品信息等对象
*
* 实现:
* 1. 将对象的每个属性作为field
* 2. 属性值作为value
* 3. 可以单独更新某个属性
*/
// 伪代码
String userKey = "user:" + userId;
// 存储用户信息
redis.hset(userKey, "id", userId);
redis.hset(userKey, "name", "张三");
redis.hset(userKey, "age", 25);
redis.hset(userKey, "email", "zhangsan@example.com");
// 获取用户信息
Map userInfo = redis.hgetall(userKey);
// 更新单个属性
redis.hset(userKey, "age", 26);
// 获取单个属性
String name = redis.hget(userKey, "name");
}
/**
* 2. 购物车
*/
public static void shoppingCart() {
/*
* 使用场景:存储用户购物车信息
*
* 实现:
* 1. 用户ID作为key
* 2. 商品ID作为field
* 3. 商品数量作为value
*/
// 伪代码
String cartKey = "cart:" + userId;
// 添加商品到购物车
redis.hset(cartKey, productId, quantity);
// 更新商品数量
redis.hincrby(cartKey, productId, delta);
// 获取购物车所有商品
Map cartItems = redis.hgetall(cartKey);
// 删除购物车商品
redis.hdel(cartKey, productId);
// 清空购物车
redis.del(cartKey);
}
/**
* 3. 配置管理
*/
public static void configManagement() {
/*
* 使用场景:存储系统配置信息
*
* 实现:
* 1. 配置项名作为field
* 2. 配置项值作为value
* 3. 可以动态更新配置
*/
// 伪代码
String configKey = "config:system";
// 设置配置
redis.hset(configKey, "max_connections", "100");
redis.hset(configKey, "timeout", "30");
redis.hset(configKey, "debug_mode", "false");
// 获取配置
String timeout = redis.hget(configKey, "timeout");
// 更新配置
redis.hset(configKey, "timeout", "60");
// 获取所有配置
Map allConfig = redis.hgetall(configKey);
}
}
```
### 3. List(列表)
List是一个有序的字符串集合,可以重复。
#### 3.1 基本操作
```bash
# 从左侧插入元素
LPUSH key value1 value2 value3
LPUSH mylist "item1" "item2" "item3"
# 从右侧插入元素
RPUSH key value1 value2 value3
RPUSH mylist "item4" "item5"
# 获取列表长度
LLEN key
LLEN mylist
# 获取列表指定范围的元素
LRANGE key 0 -1
LRANGE mylist 0 2
# 从左侧弹出元素
LPOP key
LPOP mylist
# 从右侧弹出元素
RPOP key
RPOP mylist
# 阻塞式弹出
BLPOP key timeout
BRPOP key timeout
# 获取指定索引的元素
LINDEX key index
LINDEX mylist 0
# 设置指定索引的元素
LSET key index value
LSET mylist 0 "new_item"
# 删除指定值的元素
LREM key count value
LREM mylist 2 "item1" # 删除2个item1
# 保留指定范围的元素
LTRIM key start stop
LTRIM mylist 0 2
# 插入元素到指定位置
LINSERT key BEFORE|AFTER pivot value
LINSERT mylist BEFORE "item2" "new_item"
```
#### 3.2 List应用场景
```java
/**
* List类型应用场景
*/
public class ListUseCases {
/**
* 1. 消息队列
*/
public static void messageQueue() {
/*
* 使用场景:简单的生产者-消费者模型
*
* 实现:
* 1. 生产者使用LPUSH推送消息
* 2. 消费者使用RPOP消费消息
* 3. 使用BRPOP阻塞等待消息
*/
// 生产者
String queueKey = "queue:messages";
redis.lpush(queueKey, message1);
redis.lpush(queueKey, message2);
// 消费者(非阻塞)
String message = redis.rpop(queueKey);
// 消费者(阻塞)
while (true) {
List messages = redis.brpop(queueKey, 30); // 阻塞30秒
if (messages != null && !messages.isEmpty()) {
processMessage(messages.get(1));
}
}
}
/**
* 2. 最新消息列表
*/
public static void recentMessages() {
/*
* 使用场景:显示最新消息、最新文章
*
* 实现:
* 1. 使用LPUSH添加新消息
* 2. 使用LRANGE获取最新N条消息
* 3. 使用LTRIM限制列表长度
*/
// 伪代码
String recentKey = "recent:messages";
// 添加新消息
redis.lpush(recentKey, messageId);
// 限制列表长度为100
redis.ltrim(recentKey, 0, 99);
// 获取最新10条消息
List recentMessages = redis.lrange(recentKey, 0, 9);
}
/**
* 3. 用户动态时间线
*/
public static void userTimeline() {
/*
* 使用场景:显示用户动态、朋友圈
*
* 实现:
* 1. 用户ID作为key
* 2. 动态ID作为value
* 3. 按时间顺序排列
*/
// 伪代码
String timelineKey = "timeline:user:" + userId;
// 添加用户动态
redis.lpush(timelineKey, activityId);
// 获取用户动态
List activities = redis.lrange(timelineKey, 0, 19);
// 分页查询
int page = 1;
int pageSize = 10;
int start = (page - 1) * pageSize;
int end = start + pageSize - 1;
List pageActivities = redis.lrange(timelineKey, start, end);
}
/**
* 4. 栈和队列
*/
public static void stackAndQueue() {
/*
* 栈:使用LPUSH + LPOP
* 队列:使用LPUSH + RPOP 或 RPUSH + LPOP
*/
// 栈操作
redis.lpush("stack", "item1");
redis.lpush("stack", "item2");
String item = redis.lpop("stack"); // 弹出item2
// 队列操作
redis.rpush("queue", "item1");
redis.rpush("queue", "item2");
String queuedItem = redis.lpop("queue"); // 弹出item1
}
}
```
### 4. Set(集合)
Set是一个无序的不重复字符串集合。
#### 4.1 基本操作
```bash
# 添加元素
SADD key member1 member2 member3
SADD myset "member1" "member2" "member3"
# 获取所有元素
SMEMBERS key
SMEMBERS myset
# 检查元素是否存在
SISMEMBER key member
SISMEMBER myset "member1"
# 获取集合大小
SCARD key
SCARD myset
# 删除元素
SREM key member
SREM myset "member1"
# 随机获取元素
SRANDMEMBER key count
SRANDMEMBER myset 2
# 随机弹出元素
SPOP key count
SPOP myset
# 集合运算
# 并集
SUNION key1 key2
SUNION set1 set2
# 交集
SINTER key1 key2
SINTER set1 set2
# 差集
SDIFF key1 key2
SDIFF set1 set2
# 将运算结果保存到新集合
SUNIONSTORE destkey key1 key2
SINTERSTORE destkey key1 key2
SDIFFSTORE destkey key1 key2
```
#### 4.2 Set应用场景
```java
/**
* Set类型应用场景
*/
public class SetUseCases {
/**
* 1. 共同好友
*/
public static void commonFriends() {
/*
* 使用场景:查找两个用户的共同好友
*
* 实现:
* 1. 使用SINTER求交集
* 2. 返回共同好友列表
*/
// 伪代码
String user1Key = "friends:" + user1Id;
String user2Key = "friends:" + user2Id;
// 获取共同好友
Set commonFriends = redis.sinter(user1Key, user2Key);
// 获取共同好友数量
long commonCount = redis.sinter(user1Key, user2Key).size();
}
/**
* 2. 推荐好友
*/
public static void recommendFriends() {
/*
* 使用场景:推荐可能认识的人
*
* 实现:
* 1. 获取用户的好友
* 2. 获取好友的好友
* 3. 使用SDIFF求差集(排除已经是好友的)
*/
// 伪代码
String userKey = "friends:" + userId;
// 获取用户好友
Set userFriends = redis.smembers(userKey);
// 获取好友的好友
Set friendOfFriends = new HashSet<>();
for (String friendId : userFriends) {
String friendKey = "friends:" + friendId;
friendOfFriends.addAll(redis.smembers(friendKey));
}
// 排除已经是好友的
friendOfFriends.removeAll(userFriends);
// 排除自己
friendOfFriends.remove(userId.toString());
}
/**
* 3. 标签系统
*/
public static void tagSystem() {
/*
* 使用场景:文章标签、商品标签
*
* 实现:
* 1. 文章ID作为key
* 2. 标签作为member
* 3. 可以按标签查找文章
*/
// 伪代码
String articleKey = "tags:article:" + articleId;
// 添加文章标签
redis.sadd(articleKey, "技术", "Java", "Redis");
// 获取文章标签
Set tags = redis.smembers(articleKey);
// 按标签查找文章
String tagKey = "tag:技术";
redis.sadd(tagKey, articleId1);
redis.sadd(tagKey, articleId2);
Set articles = redis.smembers(tagKey);
// 查找同时包含多个标签的文章
Set articlesWithMultipleTags = redis.sinter(
"tag:Java", "tag:Redis"
);
}
/**
* 4. 去重
*/
public static void deduplication() {
/*
* 使用场景:数据去重
*
* 实现:
* 1. 将数据添加到Set
* 2. Set自动去重
* 3. 获取去重后的数据
*/
// 伪代码
String dedupKey = "dedup:items";
// 添加数据(自动去重)
redis.sadd(dedupKey, item1, item2, item3);
// 获取去重后的数据
Set uniqueItems = redis.smembers(dedupKey);
// 检查数据是否已存在
boolean exists = redis.sismember(dedupKey, newItem);
}
/**
* 5. 在线用户
*/
public static void onlineUsers() {
/*
* 使用场景:实时统计在线用户
*
* 实现:
* 1. 用户登录时添加到在线集合
* 2. 用户退出时从在线集合删除
* 3. 定期清理过期用户
*/
// 伪代码
String onlineKey = "users:online";
// 用户上线
redis.sadd(onlineKey, userId);
// 用户下线
redis.srem(onlineKey, userId);
// 获取在线用户
Set onlineUsers = redis.smembers(onlineKey);
// 统计在线用户数
long onlineCount = redis.scard(onlineKey);
}
}
```
### 5. ZSet(有序集合)
ZSet是一个有序的字符串集合,每个元素关联一个分数。
#### 5.1 基本操作
```bash
# 添加元素
ZADD key score member
ZADD myzset 90 "player1" 85 "player2" 95 "player3"
# 获取指定范围的元素(按分数升序)
ZRANGE key start stop
ZRANGE myzset 0 -1
# 获取指定范围的元素(按分数降序)
ZREVRANGE key start stop
ZREVRANGE myzset 0 2
# 获取指定范围的元素(带分数)
ZRANGE key start stop WITHSCORES
ZREVRANGE key start stop WITHSCORES
# 获取元素的分数
ZSCORE key member
ZSCORE myzset "player1"
# 获取元素排名
ZRANK key member # 按分数升序排名
ZREVRANK key member # 按分数降序排名
# 获取指定分数范围的元素
ZRANGEBYSCORE key min max
ZRANGEBYSCORE myzset 90 100
# 获取指定排名范围的元素
ZRANGE key start stop
# 删除元素
ZREM key member
ZREM myzset "player1"
# 删除指定排名范围的元素
ZREMRANGEBYRANK key start stop
# 删除指定分数范围的元素
ZREMRANGEBYSCORE key min max
# 增加元素分数
ZINCRBY key increment member
ZINCRBY myzset 5 "player1"
# 获取集合大小
ZCARD key
ZCARD myzset
# 获取指定分数范围内的元素数量
ZCOUNT key min max
ZCOUNT myzset 80 100
```
#### 5.2 ZSet应用场景
```java
/**
* ZSet类型应用场景
*/
public class ZSetUseCases {
/**
* 1. 排行榜
*/
public static void leaderboard() {
/*
* 使用场景:游戏排行榜、销售排行榜
*
* 实现:
* 1. 用户ID作为member
* 2. 分数作为score
* 3. 使用ZRANGE获取排行榜
*/
// 伪代码
String leaderboardKey = "leaderboard:game";
// 更新玩家分数
redis.zadd(leaderboardKey, score, playerId);
// 增加玩家分数
redis.zincrby(leaderboardKey, delta, playerId);
// 获取TOP10玩家(按分数降序)
Set top10 = redis.zrevrange(leaderboardKey, 0, 9);
// 获取TOP10玩家及分数
Set top10WithScores = redis.zrevrangeWithScores(
leaderboardKey, 0, 9
);
// 获取玩家排名
long rank = redis.zrevrank(leaderboardKey, playerId);
// 获取玩家分数
Double playerScore = redis.zscore(leaderboardKey, playerId);
// 获取指定分数范围的玩家
Set players = redis.zrangebyscore(leaderboardKey, 80, 100);
}
/**
* 2. 延时队列
*/
public static void delayedQueue() {
/*
* 使用场景:定时任务、延迟消息
*
* 实现:
* 1. 时间戳作为score
* 2. 任务ID作为member
* 3. 定期检查到期的任务
*/
// 伪代码
String queueKey = "delayed:tasks";
// 添加延时任务
long executeTime = System.currentTimeMillis() + 60000; // 60秒后执行
redis.zadd(queueKey, executeTime, taskId);
// 获取到期的任务
long currentTime = System.currentTimeMillis();
Set expiredTasks = redis.zrangebyscore(
queueKey, 0, currentTime
);
// 执行到期任务
for (String taskId : expiredTasks) {
executeTask(taskId);
// 删除已执行的任务
redis.zrem(queueKey, taskId);
}
}
/**
* 3. 优先级队列
*/
public static void priorityQueue() {
/*
* 使用场景:任务调度、消息优先级
*
* 实现:
* 1. 优先级作为score
* 2. 任务ID作为member
* 3. 优先级高的先执行
*/
// 伪代码
String queueKey = "priority:tasks";
// 添加任务(优先级0-9,9最高)
redis.zadd(queueKey, priority, taskId);
// 获取最高优先级的任务
Set highPriorityTasks = redis.zrevrange(queueKey, 0, 0);
// 执行最高优先级任务
for (String taskId : highPriorityTasks) {
executeTask(taskId);
redis.zrem(queueKey, taskId);
}
}
/**
* 4. 搜索结果排序
*/
public static void searchResults() {
/*
* 使用场景:按相关性排序搜索结果
*
* 实现:
* 1. 相关性作为score
* 2. 结果ID作为member
* 3. 按相关性返回结果
*/
// 伪代码
String searchKey = "search:results:" + query;
// 添加搜索结果
for (SearchResult result : searchResults) {
redis.zadd(searchKey, result.getRelevance(), result.getId());
}
// 获取最相关的结果
Set topResults = redis.zrevrange(searchKey, 0, 9);
}
/**
* 5. 限流滑动窗口
*/
public static void slidingWindowRateLimit() {
/*
* 使用场景:精确的限流控制
*
* 实现:
* 1. 时间戳作为score
* 2. 请求ID作为member
* 3. 删除过期的请求
* 4. 统计窗口内的请求数
*/
// 伪代码
String rateLimitKey = "ratelimit:api:" + apiKey;
long windowSize = 60 * 1000; // 60秒窗口
long maxRequests = 100;
long currentTime = System.currentTimeMillis();
// 记录请求
redis.zadd(rateLimitKey, currentTime, requestId);
// 删除过期的请求
redis.zremrangebyscore(
rateLimitKey,
0,
currentTime - windowSize
);
// 统计窗口内的请求数
long requestCount = redis.zcard(rateLimitKey);
if (requestCount > maxRequests) {
throw new RateLimitException("超过限流阈值");
}
}
}
```
---
## Redis持久化机制
### 1. RDB持久化
RDB(Redis Database)是Redis默认的持久化方式,通过生成快照来保存数据。
#### 1.1 RDB工作原理
```
┌─────────────────────────────────────────────────────────┐
│ RDB持久化流程 │
│ │
│ 1. Redis主进程fork子进程 │
│ ↓ │
│ 2. 子进程将内存数据写入临时RDB文件 │
│ ↓ │
│ 3. 子进程用新RDB文件替换旧RDB文件 │
│ ↓ │
│ 4. 子进程退出 │
│ ↓ │
│ 5. 主进程继续处理请求 │
│ │
└─────────────────────────────────────────────────────────┘
```
#### 1.2 RDB配置
```conf
# RDB配置
save 900 1 # 900秒内至少1个key变化则保存
save 300 10 # 300秒内至少10个key变化则保存
save 60 10000 # 60秒内至少10000个key变化则保存
# RDB文件名
dbfilename dump.rdb
# RDB文件目录
dir /var/lib/redis
# 是否压缩RDB文件
rdbcompression yes
# 是否校验RDB文件
rdbchecksum yes
# RDB文件名前缀
rdbprefix "dump"
# RDB文件加密
# masterauth "password"
```
#### 1.3 手动触发RDB
```bash
# 手动保存RDB(阻塞方式)
SAVE
# 后台保存RDB(非阻塞方式)
BGSAVE
# 查看上次保存时间
LASTSAVE
# 查看RDB保存状态
INFO persistence
```
#### 1.4 RDB的优缺点
```java
/**
* RDB持久化优缺点
*/
public class RDBPersistence {
/**
* 优点:
*
* 1. 文件紧凑,适合备份
* 2. 恢复速度快
* 3. 对性能影响小(fork子进程)
* 4. 适合灾难恢复
*/
/**
* 缺点:
*
* 1. 可能丢失数据(fork间隔的数据)
* 2. fork时可能卡顿(数据量大时)
* 3. 不适合实时持久化
*/
}
```
### 2. AOF持久化
AOF(Append Only File)通过记录每个写命令来持久化数据。
#### 2.1 AOF工作原理
```
┌─────────────────────────────────────────────────────────┐
│ AOF持久化流程 │
│ │
│ 1. 客户端发送写命令 │
│ ↓ │
│ 2. Redis执行命令并将命令写入AOF缓冲区 │
│ ↓ │
│ 3. 根据fsync策略将缓冲区写入磁盘 │
│ - always: 每次写入都同步 │
│ - everysec: 每秒同步一次 │
│ - no: 由操作系统决定 │
│ ↓ │
│ 4. AOF文件逐渐增大 │
│ ↓ │
│ 5. 触发AOF重写(压缩AOF文件) │
│ - fork子进程 │
│ - 生成新的AOF文件 │
│ - 替换旧的AOF文件 │
│ │
└─────────────────────────────────────────────────────────┘
```
#### 2.2 AOF配置
```conf
# 是否开启AOF
appendonly yes
# AOF文件名
appendfilename "appendonly.aof"
# AOF同步策略
# - always: 每次写操作都同步
# - everysec: 每秒同步一次(推荐)
# - no: 由操作系统决定
appendfsync everysec
# AOF重写触发条件
auto-aof-rewrite-percentage 100 # AOF文件大小增长100%时重写
auto-aof-rewrite-min-size 64mb # AOF文件大小至少64mb时重写
# 是否在AOF重写时进行追加
no-appendfsync-on-rewrite no
# AOF文件损坏时的恢复策略
aof-load-truncated yes
# AOF文件名前缀
appendfilename "appendonly"
```
#### 2.3 手动触发AOF重写
```bash
# 手动触发AOF重写
BGREWRITEAOF
# 查看AOF重写状态
INFO persistence
```
#### 2.4 AOF的优缺点
```java
/**
* AOF持久化优缺点
*/
public class AOFPersistence {
/**
* 优点:
*
* 1. 数据安全性高(最多丢失1秒数据)
* 2. AOF文件可读(文本格式)
* 3. 重写后文件紧凑
* 4. 可以通过AOF文件恢复任意时间点的数据
*/
/**
* 缺点:
*
* 1. 文件通常比RDB大
* 2. 恢复速度比RDB慢
* 3. 性能影响较大(fsync)
* 4. 重写时可能占用大量CPU
*/
}
```
### 3. 混合持久化
Redis 4.0+支持混合持久化,结合RDB和AOF的优点。
#### 3.1 混合持久化原理
```
┌─────────────────────────────────────────────────────────┐
│ 混合持久化工作流程 │
│ │
│ 1. 第一次重写时:使用RDB格式保存基线数据 │
│ ↓ │
│ 2. 后续重写时: │
│ - 基线数据使用RDB格式 │
│ - 增量数据使用AOF格式 │
│ ↓ │
│ 3. 恢复时:先加载RDB,再重放AOF增量 │
│ │
└─────────────────────────────────────────────────────────┘
```
#### 3.2 混合持久化配置
```conf
# 是否开启混合持久化(Redis 4.0+)
aof-use-rdb-preamble yes
```
#### 3.3 持久化选择建议
```java
/**
* 持久化方案选择
*/
public class PersistenceSelection {
/**
* 方案1:纯RDB
*
* 适用场景:
* - 数据不需要精确持久化
* - 需要快速恢复
* - 内存数据量较大
*/
/**
* 方案2:纯AOF
*
* 适用场景:
* - 数据安全性要求高
* - 需要最小数据丢失
* - 可以接受稍慢的恢复速度
*/
/**
* 方案3:混合持久化(推荐)
*
* 适用场景:
* - 需要平衡安全性和性能
* - 需要快速恢复且数据安全
* - Redis 4.0+版本
*/
/**
* 方案4:同时开启RDB和AOF
*
* 适用场景:
* - 需要双重保障
* - 重要业务数据
* - 有足够的磁盘空间
*/
}
```
---
## Redis内存管理
### 1. 内存淘汰策略
当Redis内存达到maxmemory限制时,会根据配置的淘汰策略删除数据。
#### 1.1 淘汰策略类型
```conf
# 内存淘汰策略配置
maxmemory 256mb
maxmemory-policy allkeys-lru
# 淘汰策略类型:
# 1. 不淘汰
noeviction # 不淘汰,返回错误
# 2. 从所有key中淘汰
allkeys-lru # LRU算法淘汰
allkeys-lfu # LFU算法淘汰
allkeys-random # 随机淘汰
allkeys-volatile-lru # LRU算法淘汰有过期时间的key
allkeys-volatile-lfu # LFU算法淘汰有过期时间的key
allkeys-volatile-random # 随机淘汰有过期时间的key
# 3. 从有过期时间的key中淘汰
volatile-lru # LRU算法淘汰
volatile-lfu # LFU算法淘汰
volatile-random # 随机淘汰
volatile-ttl # 淘汰即将过期的key
```
#### 1.2 LRU vs LFU
```java
/**
* LRU vs LFU
*/
public class LRUvsLFU {
/**
* LRU(Least Recently Used)
*
* 原理:淘汰最久未访问的数据
*
* 优点:
* - 简单有效
* - 适合大多数场景
*
* 缺点:
* - 不能反映访问频率
* - 偶发访问会误判
*/
/**
* LFU(Least Frequently Used)
*
* 原理:淘汰访问频率最低的数据
*
* 优点:
* - 能反映访问频率
* - 适合热点数据场景
*
* 缺点:
* - 计算开销较大
* - 新数据可能被淘汰
*/
}
```
### 2. 内存优化技巧
#### 2.1 选择合适的数据类型
```java
/**
* 数据类型选择优化
*/
public class DataTypeOptimization {
/**
* 1. 使用Hash代替多个String
*
* 原因:Hash可以节省内存
*
* 示例:
* - String: user:1:name, user:1:age, user:1:email
* - Hash: user:1 -> {name, age, email}
*/
/**
* 2. 使用ZSet代替List + Set
*
* 原因:ZSet自带排序和去重功能
*
* 示例:
* - List + Set: 排行榜 + 去重
* - ZSet: 排行榜(自带排序和去重)
*/
/**
* 3. 使用Bitmap代替Set
*
* 原因:Bitmap节省大量内存
*
* 示例:
* - Set: 存储一亿个用户ID(约400MB)
* - Bitmap: 存储一亿个用户位(约12.5MB)
*/
/**
* 4. 使用HyperLogLog代替Set统计
*
* 原因:HyperLogLog极其节省内存
*
* 示例:
* - Set: 存储UV(用户访问量)
* - HyperLogLog: 统计UV(只需12KB内存)
*/
}
```
#### 2.2 合理设置过期时间
```java
/**
* 过期时间设置优化
*/
public class ExpirationOptimization {
/**
* 1. 热点数据设置较短过期时间
*/
public static void hotData() {
redis.setex("hot_data", 300, value); // 5分钟
}
/**
* 2. 冷数据设置较长过期时间
*/
public static void coldData() {
redis.setex("cold_data", 3600, value); // 1小时
}
/**
* 3. 计数器类数据设置过期时间
*/
public static void counterData() {
redis.incr("counter");
redis.expire("counter", 86400); // 24小时
}
/**
* 4. Session数据设置合理过期时间
*/
public static void sessionData() {
redis.setex("session:" + userId, 1800, sessionData); // 30分钟
}
}
```
### 3. 内存监控
```bash
# 查看内存使用情况
INFO memory
# 查看内存使用详情
MEMORY STATS
# 查看某个key的内存使用
MEMORY USAGE key
# 查看所有key的内存使用
MEMORY USAGE *
# 找出内存占用最大的key
redis-cli --bigkeys
# 分析Redis内存
redis-cli --memkeys
```
---
## Redis集群架构
### 1. 主从复制
主从复制是Redis实现高可用和读写分离的基础。
#### 1.1 主从复制原理
```
┌─────────────────────────────────────────────────────────┐
│ 主从复制流程 │
│ │
│ 1. 从服务器连接主服务器 │
│ ↓ │
│ 2. 从服务器发送SYNC命令 │
│ ↓ │
│ 3. 主服务器执行BGSAVE生成RDB文件 │
│ ↓ │
│ 4. 主服务器将RDB文件发送给从服务器 │
│ ↓ │
│ 5. 从服务器加载RDB文件 │
│ ↓ │
│ 6. 主服务器将缓冲区的写命令发送给从服务器 │
│ ↓ │
│ 7. 从服务器执行写命令 │
│ ↓ │
│ 8. 命令传播:主服务器将新写命令发送给从服务器 │
│ │
└─────────────────────────────────────────────────────────┘
```
#### 1.2 配置主从复制
```conf
# 从服务器配置(slaveof)
slaveof master_ip master_port
# 或者使用命令行
redis-cli SLAVEOF 127.0.0.1 6379
# 主服务器密码
masterauth password
# 从服务器只读
slave-read-only yes
```
#### 1.3 主从复制监控
```bash
# 查看复制信息
INFO replication
# 查看主从状态
redis-cli INFO replication
# 查看从服务器延迟
redis-cli --latency-history
# 强制同步
redis-cli SYNC
```
### 2. 哨兵机制
Sentinel是Redis的高可用解决方案,用于监控和自动故障转移。
#### 2.1 哨兵架构
```
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Master │ │ Sentinel │ │ Sentinel │
│ Redis │◄────┤ 1 │◄────┤ 2 │
│ :6379 │ │ :26379 │ │ :26379 │
└──────┬──────┘ └──────┬──────┘ └──────┬──────┘
│ │ │
│ 复制 │ 监控 │ 监控
↓ ↓ ↓
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Slave │ │ Sentinel │ │ Sentinel │
│ Redis │ │ 3 │ │ 4 │
│ :6380 │◄────┤ :26379 │◄────┤ :26379 │
└─────────────┘ └─────────────┘ └─────────────┘
```
#### 2.2 哨兵配置
```conf
# sentinel.conf配置
# 哨兵端口
port 26379
# 监控主服务器
sentinel monitor mymaster 127.0.0.1 6379 2
# 主服务器密码
sentinel auth-pass mymaster password
# 下线判断时间
sentinel down-after-milliseconds mymaster 30000
# 故障转移超时时间
sentinel failover-timeout mymaster 180000
# 并行同步数量
sentinel parallel-syncs mymaster 1
# 故障转移超时时间
sentinel failover-timeout mymaster 180000
```
#### 2.3 哨哨命令
```bash
# 查看哨兵状态
INFO sentinel
# 查看监控的主服务器
SENTINEL masters
# 查看指定主服务器的信息
SENTINEL master mymaster
# 查看从服务器列表
SENTINEL slaves mymaster
# 手动故障转移
SENTINEL failover mymaster
# 检查主服务器状态
SENTINEL ckquorum mymaster
```
### 3. Redis集群
Redis Cluster是Redis的分布式解决方案,提供数据分片和高可用性。
#### 3.1 集群架构
```
┌─────────────────────────────────────────────────────────┐
│ Redis集群架构 │
│ │
│ 分片1(Slots 0-5460) │
│ ┌─────────────┐ ┌─────────────┐ │
│ │ Master A │ │ Slave A' │ │
│ │ :7001 │◄─│ :7004 │ │
│ └─────────────┘ └─────────────┘ │
│ │
│ 分片2(Slots 5461-10922) │
│ ┌─────────────┐ ┌─────────────┐ │
│ │ Master B │ │ Slave B' │ │
│ │ :7002 │◄─│ :7005 │ │
│ └─────────────┘ └─────────────┘ │
│ │
│ 分片3(Slots 10923-16383) │
│ ┌─────────────┐ ┌─────────────┐ │
│ │ Master C │ │ Slave C' │ │
│ │ :7003 │◄─│ :7006 │ │
│ └─────────────┘ └─────────────┘ │
│ │
│ 客户端通过集群协议与任意节点通信 │
│ 自动路由到正确的节点 │
│ │
└─────────────────────────────────────────────────────────┘
```
#### 3.2 集群配置
```conf
# cluster配置
cluster-enabled yes
cluster-config-file nodes.conf
cluster-node-timeout 15000
cluster-require-full-coverage yes
cluster-migration-barrier 1
# 集群总线端口
cluster-port 16379
```
#### 3.3 创建集群
```bash
# 使用redis-cli创建集群
redis-cli --cluster create \
127.0.0.1:7001 \
127.0.0.1:7002 \
127.0.0.1:7003 \
127.0.0.1:7004 \
127.0.0.1:7005 \
127.0.0.1:7006 \
--cluster-replicas 1
# 查看集群信息
redis-cli -c -p 7001 CLUSTER INFO
# 查看集群节点
redis-cli -c -p 7001 CLUSTER NODES
# 查看集群槽位
redis-cli -c -p 7001 CLUSTER SLOTS
```
#### 3.4 集群命令
```bash
# 连接集群
redis-cli -c -p 7001
# 查看集群信息
CLUSTER INFO
# 查看集群节点
CLUSTER NODES
# 查看槽位信息
CLUSTER SLOTS
# 添加节点
CLUSTER MEET ip port
# 删除节点
CLUSTER FORGET node_id
# 故障转移
CLUSTER FAILOVER
# 重新分片
redis-cli --cluster reshard 127.0.0.1:7001
```
---
## Redis高级特性
### 1. 事务
Redis事务通过MULTI、EXEC、DISCARD、WATCH实现。
#### 1.1 事务基本操作
```bash
# 开始事务
MULTI
# 执行命令(不会立即执行)
SET key1 value1
SET key2 value2
GET key1
# 执行事务
EXEC
# 取消事务
DISCARD
# 监控key
WATCH key1 key2
```
#### 1.2 事务示例
```java
/**
* Redis事务示例
*/
public class RedisTransactionExample {
/**
* 1. 原子转账
*/
public static void atomicTransfer() {
/*
* 使用场景:银行转账、账户扣款
*
* 实现:
* 1. 使用WATCH监控账户余额
* 2. 使用MULTI开始事务
* 3. 执行转账操作
* 4. 使用EXEC提交事务
*/
// 伪代码
String fromAccount = "account:" + fromUserId;
String toAccount = "account:" + toUserId;
// 监控账户
redis.watch(fromAccount, toAccount);
// 检查余额
double balance = Double.parseDouble(redis.get(fromAccount));
if (balance < amount) {
redis.unwatch();
throw new InsufficientBalanceException();
}
// 开始事务
Transaction transaction = redis.multi();
try {
// 执行转账
transaction.decrBy(fromAccount, amount);
transaction.incrBy(toAccount, amount);
// 提交事务
transaction.exec();
} catch (Exception e) {
transaction.discard();
throw e;
}
}
/**
* 2. 原子计数
*/
public static void atomicCounter() {
/*
* 使用场景:库存扣减、限量抢购
*/
// 伪代码
String stockKey = "product:stock:" + productId;
// 监控库存
redis.watch(stockKey);
// 检查库存
int stock = Integer.parseInt(redis.get(stockKey));
if (stock < 1) {
redis.unwatch();
throw new OutOfStockException();
}
// 开始事务
Transaction transaction = redis.multi();
try {
// 扣减库存
transaction.decr(stockKey);
// 提交事务
transaction.exec();
} catch (Exception e) {
transaction.discard();
throw e;
}
}
}
```
### 2. 发布订阅
Redis发布订阅是一种消息传递机制。
#### 2.1 基本操作
```bash
# 订阅频道
SUBSCRIBE channel1 channel2 channel3
# 取消订阅
UNSUBSCRIBE channel1 channel2
# 发布消息
PUBLISH channel message
# 查看活跃频道
PUBSUB CHANNELS
# 查看频道订阅数
PUBSUB NUMSUB channel1 channel2
```
#### 2.2 发布订阅示例
```java
/**
* Redis发布订阅示例
*/
public class PubSubExample {
/**
* 消息订阅者
*/
public static class MessageSubscriber extends JedisPubSub {
@Override
public void onMessage(String channel, String message) {
System.out.println("收到消息 - 频道: " + channel + ", 消息: " + message);
}
@Override
public void onSubscribe(String channel, int subscribedChannels) {
System.out.println("订阅频道: " + channel);
}
@Override
public void onUnsubscribe(String channel, int subscribedChannels) {
System.out.println("取消订阅频道: " + channel);
}
}
/**
* 订阅消息
*/
public static void subscribeMessages() {
Jedis jedis = new Jedis("127.0.0.1", 6379);
MessageSubscriber subscriber = new MessageSubscriber();
// 订阅频道
jedis.subscribe(subscriber, "news", "notifications");
// 取消订阅
jedis.unsubscribe(subscriber, "news");
jedis.close();
}
/**
* 发布消息
*/
public static void publishMessage() {
Jedis jedis = new Jedis("127.0.0.1", 6379);
// 发布消息
long count = jedis.publish("news", "这是最新消息");
System.out.println("消息发送给 " + count + " 个订阅者");
jedis.close();
}
}
```
### 3. Lua脚本
Lua脚本可以在Redis服务器端执行,保证原子性。
#### 3.1 基本操作
```bash
# 执行Lua脚本
EVAL script numkeys key [key ...] arg [arg ...]
# 示例
EVAL "return redis.call('SET', KEYS[1], ARGV[1])" 1 mykey myvalue
# 加载脚本(不立即执行)
SCRIPT LOAD script
# 执行已加载的脚本
EVALSHA sha1 numkeys key [key ...] arg [arg ...]
# 脚本调试
SCRIPT DEBUG script
```
#### 3.2 Lua脚本示例
```java
/**
* Redis Lua脚本示例
*/
public class LuaScriptExample {
/**
* 1. 原子限流
*/
public static String rateLimitLuaScript() {
return "local key = KEYS[1]\n" +
"local limit = tonumber(ARGV[1])\n" +
"local expire = tonumber(ARGV[2])\n" +
"local current = redis.call('incr', key)\n" +
"if current == 1 then\n" +
" redis.call('expire', key, expire)\n" +
"end\n" +
"if current > limit then\n" +
" return 0\n" +
"else\n" +
" return 1\n" +
"end";
}
/**
* 2. 原子库存扣减
*/
public static String stockDeductLuaScript() {
return "local key = KEYS[1]\n" +
"local deduct = tonumber(ARGV[1])\n" +
"local stock = tonumber(redis.call('get', key))\n" +
"if stock == nil then\n" +
" redis.call('set', key, 0)\n" +
" stock = 0\n" +
"end\n" +
"if stock < deduct then\n" +
" return 0\n" +
"else\n" +
" redis.call('decrby', key, deduct)\n" +
" return stock - deduct\n" +
"end";
}
/**
* 3. 批量操作
*/
public static String batchOperationLuaScript() {
return "local result = {}\n" +
"for i = 1, #KEYS do\n" +
" result[i] = redis.call('get', KEYS[i])\n" +
"end\n" +
"return result";
}
/**
* 执行Lua脚本
*/
public static void executeLuaScript() {
Jedis jedis = new Jedis("127.0.0.1", 6379);
// 执行限流脚本
String rateLimitScript = rateLimitLuaScript();
Long result = (Long) jedis.eval(
rateLimitScript,
1, // key的数量
"limit:api:123", // key
"100", // 限制次数
"60" // 过期时间(秒)
);
if (result == 1) {
System.out.println("允许访问");
} else {
System.out.println("超过限制");
}
jedis.close();
}
}
```
---
## Redis性能优化
### 1. 性能优化技巧
#### 1.1 网络优化
```java
/**
* 网络优化
*/
public class NetworkOptimization {
/**
* 1. 使用Pipeline批量执行命令
*/
public static void pipelineOptimization() {
Jedis jedis = new Jedis("127.0.0.1", 6379);
Pipeline pipeline = jedis.pipelined();
// 批量设置
for (int i = 0; i < 1000; i++) {
pipeline.set("key:" + i, "value:" + i);
}
// 执行所有命令
pipeline.sync();
jedis.close();
}
/**
* 2. 使用连接池
*/
public static void connectionPool() {
JedisPoolConfig poolConfig = new JedisPoolConfig();
poolConfig.setMaxTotal(100);
poolConfig.setMaxIdle(20);
poolConfig.setMinIdle(5);
poolConfig.setMaxWaitMillis(1000);
JedisPool jedisPool = new JedisPool(poolConfig, "127.0.0.1", 6379);
try (Jedis jedis = jedisPool.getResource()) {
jedis.set("key", "value");
}
jedisPool.close();
}
/**
* 3. 使用Lettuce异步客户端
*/
public static void asyncClient() {
RedisClient redisClient = RedisClient.create("redis://127.0.0.1:6379");
StatefulRedisConnection connection = redisClient.connect();
RedisAsyncCommands asyncCommands = connection.async();
// 异步操作
RedisFuture future = asyncCommands.get("key");
future.thenAccept(value -> {
System.out.println("异步获取: " + value);
});
connection.close();
redisClient.shutdown();
}
}
```
#### 1.2 内存优化
```java
/**
* 内存优化
*/
public class MemoryOptimization {
/**
* 1. 使用Hash代替多个String
*/
public static void hashOptimization() {
// 优化前:使用多个String
redis.set("user:1:id", "1");
redis.set("user:1:name", "张三");
redis.set("user:1:age", "25");
// 优化后:使用Hash
Map user = new HashMap<>();
user.put("id", "1");
user.put("name", "张三");
user.put("age", "25");
redis.hmset("user:1", user);
}
/**
* 2. 使用Bitmap代替Set
*/
public static void bitmapOptimization() {
// 优化前:使用Set
redis.sadd("users:online", "user1", "user2", "user3");
// 优化后:使用Bitmap
redis.setbit("users:online:bitmap", 1, true);
redis.setbit("users:online:bitmap", 2, true);
redis.setbit("users:online:bitmap", 3, true);
}
/**
* 3. 使用HyperLogLog统计UV
*/
public static void hyperLogLogOptimization() {
// 优化前:使用Set
redis.sadd("uv:daily", "user1", "user2", "user3");
long uv = redis.scard("uv:daily");
// 优化后:使用HyperLogLog
redis.pfadd("uv:daily:hll", "user1", "user2", "user3");
long uvHLL = redis.pfcount("uv:daily:hll");
}
}
```
### 2. 性能监控
```bash
# 查看Redis性能指标
INFO stats
# 查看慢查询日志
SLOWLOG GET 10
# 查看命令统计
INFO commandstats
# 监控Redis命令
MONITOR
# 查看客户端连接
CLIENT LIST
# 查看延迟
redis-cli --latency
redis-cli --latency-history
# 查看内存使用
INFO memory
```
---
## Redis实战应用
### 1. 分布式锁
```java
/**
* Redis分布式锁实现
*/
public class RedisDistributedLock {
private Jedis jedis;
private String lockKey;
private String lockValue;
private int lockTimeout;
/**
* 获取锁
*/
public boolean acquireLock(String lockKey, int timeout) {
this.lockKey = lockKey;
this.lockValue = UUID.randomUUID().toString();
this.lockTimeout = timeout;
// 使用SETNX获取锁
String result = jedis.set(lockKey, lockValue, "NX", "EX", timeout);
return "OK".equals(result);
}
/**
* 释放锁(Lua脚本保证原子性)
*/
public boolean releaseLock() {
String luaScript = "if redis.call('get', KEYS[1]) == ARGV[1] then " +
" return redis.call('del', KEYS[1]) " +
"else " +
" return 0 " +
"end";
Object result = jedis.eval(luaScript, 1, lockKey, lockValue);
return result.equals(1L);
}
/**
* 尝试获取锁(带重试)
*/
public boolean tryLock(String lockKey, int timeout, int retryInterval) {
long endTime = System.currentTimeMillis() + timeout * 1000L;
while (System.currentTimeMillis() < endTime) {
if (acquireLock(lockKey, timeout)) {
return true;
}
try {
Thread.sleep(retryInterval);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
}
}
return false;
}
}
```
### 2. 限流器
```java
/**
* Redis限流器实现
*/
public class RedisRateLimiter {
private Jedis jedis;
/**
* 简单限流(滑动窗口)
*/
public boolean isAllowed(String key, int limit, int timeout) {
long currentTime = System.currentTimeMillis();
// 移除窗口外的数据
jedis.zremrangeByScore(key, 0, currentTime - timeout * 1000L);
// 获取窗口内的请求数
long count = jedis.zcard(key);
if (count < limit) {
// 添加当前请求
jedis.zadd(key, currentTime, UUID.randomUUID().toString());
return true;
}
return false;
}
/**
* 令牌桶限流
*/
public boolean isAllowedWithTokenBucket(String key, int capacity, int rate) {
String luaScript = "local key = KEYS[1]\n" +
"local capacity = tonumber(ARGV[1])\n" +
"local rate = tonumber(ARGV[2])\n" +
"local now = tonumber(ARGV[3])\n" +
"local info = redis.call('hmget', key, 'tokens', 'last_time')\n" +
"local tokens = tonumber(info[1])\n" +
"local lastTime = tonumber(info[2])\n" +
"if tokens == nil then\n" +
" tokens = capacity\n" +
" lastTime = now\n" +
"end\n" +
"local delta = math.max(0, now - lastTime)\n" +
"local filledTokens = math.min(capacity, tokens + (delta * rate / 1000))\n" +
"if filledTokens >= 1 then\n" +
" redis.call('hmset', key, 'tokens', filledTokens - 1, 'last_time', now)\n" +
" return 1\n" +
"else\n" +
" redis.call('hmset', key, 'tokens', filledTokens, 'last_time', now)\n" +
" return 0\n" +
"end";
Object result = jedis.eval(luaScript, 1, key,
String.valueOf(capacity),
String.valueOf(rate),
String.valueOf(System.currentTimeMillis()));
return result.equals(1L);
}
}
```
### 3. 缓存实现
```java
/**
* Redis缓存实现
*/
public class RedisCache {
private Jedis jedis;
/**
* 缓存数据
*/
public void cache(String key, Object value, int expireSeconds) {
String json = JSON.toJSONString(value);
jedis.setex(key, expireSeconds, json);
}
/**
* 获取缓存(带穿透保护)
*/
public T get(String key, Class clazz) {
String json = jedis.get(key);
if (json == null) {
// 缓存穿透保护:缓存空值
jedis.setex(key, 60, "");
return null;
}
if ("".equals(json)) {
return null;
}
return JSON.parseObject(json, clazz);
}
/**
* 删除缓存
*/
public void delete(String key) {
jedis.del(key);
}
/**
* 批量删除缓存
*/
public void deleteByPattern(String pattern) {
Set keys = jedis.keys(pattern);
if (keys != null && !keys.isEmpty()) {
jedis.del(keys.toArray(new String[0]));
}
}
}
```
---
## Redis监控与运维
### 1. 监控指标
```java
/**
* Redis监控指标
*/
public class RedisMonitor {
/**
* 1. 性能指标
*/
public static class PerformanceMetrics {
/*
* - 响应时间:延迟历史
* - 吞吐量:OPS(每秒操作数)
* - 命中率:缓存命中率
* - 连接数:客户端连接数
*/
}
/**
* 2. 内存指标
*/
public static class MemoryMetrics {
/*
* - used_memory: 已使用内存
* - max_memory: 最大内存限制
* - mem_fragmentation_ratio: 内存碎片率
* - evicted_keys: 驱逐的key数量
*/
}
/**
* 3. 持久化指标
*/
public static class PersistenceMetrics {
/*
* - rdb_last_save_time: 最后保存时间
* - aof_enabled: AOF是否启用
* - aof_rewrite_in_progress: AOF重写是否进行中
* - aof_last_rewrite_time_sec: AOF重写耗时
*/
}
}
```
### 2. 运维命令
```bash
# 查看Redis信息
INFO
# 查看服务器信息
INFO server
# 查看客户端信息
INFO clients
# 查看内存信息
INFO memory
# 查看持久化信息
INFO persistence
# 查看复制信息
INFO replication
# 查看CPU信息
INFO cpu
# 查看统计信息
INFO stats
# 查看命令统计
INFO commandstats
# 查看集群信息
INFO cluster
# 查看慢查询
SLOWLOG GET 10
# 查看慢查询日志长度
SLOWLOG LEN
# 清空慢查询日志
SLOWLOG RESET
```
---
## 常见问题与解决方案
### 1. 缓存雪崩
**问题**:大量缓存同时失效,导致数据库压力过大。
**解决方案**:
```java
/**
* 缓存雪崩解决方案
*/
public class CacheAvalanche {
/**
* 1. 设置随机过期时间
*/
public static void randomExpiration() {
int baseExpire = 3600; // 基础过期时间1小时
int randomExpire = (int) (Math.random() * 300); // 随机0-5分钟
int totalExpire = baseExpire + randomExpire;
redis.setex(key, totalExpire, value);
}
/**
* 2. 使用互斥锁
*/
public static void mutexLock() {
String lockKey = "lock:" + key;
if (redis.setnx(lockKey, "locked") == 1) {
try {
// 获取锁成功,从数据库加载
Object data = database.get(key);
redis.setex(key, expire, JSON.toJSONString(data));
return data;
} finally {
redis.del(lockKey);
}
}
// 获取锁失败,返回过期缓存或空值
return redis.get(key);
}
/**
* 3. 缓存预热
*/
public static void cacheWarmup() {
// 系统启动时加载热点数据到缓存
List hotKeys = database.getHotKeys();
for (String key : hotKeys) {
Object data = database.get(key);
redis.setex(key, 3600, JSON.toJSONString(data));
}
}
}
```
### 2. 缓存穿透
**问题**:查询不存在的数据,导致每次都查询数据库。
**解决方案**:
```java
/**
* 缓存穿透解决方案
*/
public class CachePenetration {
/**
* 1. 布隆过滤器
*/
public static class BloomFilter {
private BloomFilter filter;
public BloomFilter() {
this.filter = BloomFilter.create(
Funnels.stringFunnel(Charset.defaultCharset()),
1000000, // 预期元素数量
0.01 // 误判率
);
}
public boolean mightContain(String key) {
return filter.mightContain(key);
}
public void put(String key) {
filter.put(key);
}
}
/**
* 2. 缓存空值
*/
public static void cacheNullValue() {
Object data = database.get(key);
if (data == null) {
// 缓存空值,设置较短过期时间
redis.setex(key, 60, "");
} else {
redis.setex(key, 3600, JSON.toJSONString(data));
}
}
}
```
### 3. 缓存击穿
**问题**:热点数据过期时,大量请求同时查询数据库。
**解决方案**:
```java
/**
* 缓存击穿解决方案
*/
public class CacheBreakdown {
/**
* 1. 互斥锁(推荐)
*/
public static Object mutexLock() {
String lockKey = "lock:" + key;
if (redis.setnx(lockKey, "locked") == 1) {
redis.expire(lockKey, 10); // 设置锁过期时间
try {
Object data = database.get(key);
redis.setex(key, expire, JSON.toJSONString(data));
return data;
} finally {
redis.del(lockKey);
}
}
// 获取锁失败,等待并重试
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return getFromCacheOrDatabase(key);
}
/**
* 2. 永不过期(逻辑过期)
*/
public static Object logicalExpiration() {
String cacheKey = key;
String expireKey = key + ":expire";
String data = redis.get(cacheKey);
String expireTime = redis.get(expireKey);
// 如果缓存未过期,直接返回
if (expireTime != null && Long.parseLong(expireTime) > System.currentTimeMillis()) {
return data;
}
// 缓存已过期,使用互斥锁刷新
String lockKey = "lock:" + key;
if (redis.setnx(lockKey, "locked") == 1) {
try {
// 刷新缓存
Object newData = database.get(key);
redis.setex(cacheKey, 3600, JSON.toJSONString(newData));
redis.setex(expireKey, 3600, String.valueOf(System.currentTimeMillis() + 3600000));
return newData;
} finally {
redis.del(lockKey);
}
}
// 返回过期数据
return data;
}
}
```
---
## 总结
### 学习路径总结
通过本文的学习,你已经掌握了Redis的完整知识体系:
#### 1. 基础阶段
- ✅ Redis安装与配置
- ✅ 基本命令使用
- ✅ 五种数据类型及应用场景
- ✅ Java客户端使用
#### 2. 进阶阶段
- ✅ 持久化机制(RDB、AOF)
- ✅ 内存管理与优化
- ✅ 主从复制
- ✅ 哨兵机制
#### 3. 高级阶段
- ✅ Redis集群
- ✅ 事务
- ✅ 发布订阅
- ✅ Lua脚本
#### 4. 实战阶段
- ✅ 分布式锁
- ✅ 限流器
- ✅ 缓存实现
- ✅ 性能优化
#### 5. 专家阶段
- ✅ 缓存问题解决方案
- ✅ 监控与运维
- ✅ 架构设计
- ✅ 最佳实践
### 核心要点回顾
1. **数据类型选择**
- String:缓存、计数器、分布式锁
- Hash:对象存储、购物车
- List:消息队列、列表
- Set:去重、标签系统
- ZSet:排行榜、延时队列
2. **持久化策略**
- RDB:适合备份、快速恢复
- AOF:适合数据安全、最小丢失
- 混合持久化:平衡性能和安全
3. **集群架构**
- 主从复制:读写分离、高可用
- 哨兵:自动故障转移
- 集群:数据分片、水平扩展
4. **性能优化**
- Pipeline批量操作
- 连接池管理
- 合理设置过期时间
- 选择合适的数据类型
### 下一步学习建议
1. **深入源码**
- 阅读Redis源码
- 理解内部实现机制
2. **实践项目**
- 使用Redis构建实际项目
- 解决实际问题
3. **性能调优**
- 监控Redis性能
- 优化Redis配置
4. **架构设计**
- 设计高可用Redis架构
- 解决复杂业务问题
### 推荐资源
- [Redis官方文档](https://redis.io/documentation)
- [Redis设计与实现](http://redisbook.com/)
- [Redis实战](https://redis.io/commands)
掌握Redis,不仅能提升系统性能,还能解决很多复杂的技术问题。希望这篇完全指南能帮助你从Redis小白成长为Redis高手!