【Redis】【深度实战】【Java】

管理员
# Redis深度实战:从源码原理到生产级应用 ## 目录 1. [Redis核心架构与数据结构](#1-redis核心架构与数据结构) 2. [设计模式在Redis中的应用](#2-设计模式在redis中的应用) 3. [Java客户端反射机制深度应用](#3-java客户端反射机制深度应用) 4. [自定义注解实现缓存策略](#4-自定义注解实现缓存策略) 5. [Redis性能优化与JVM调优](#5-redis性能优化与jvm调优) 6. [生产级Redis工具类完整实现](#6-生产级redis工具类完整实现) 7. [分布式锁实战](#7-分布式锁实战) 8. [缓存穿透/击穿/雪崩解决方案](#8-缓存穿透击穿雪崩解决方案) --- ## 1. Redis核心架构与数据结构 ### 1.1 Redis对象系统(基于单例模式和工厂模式) Redis使用单例模式来管理数据库实例,每个Redis服务器进程维护一个全局唯一的`redisServer`对象。 ```c /* redis.h - Redis服务器对象定义 */ struct redisServer { // 单例模式:全局唯一的服务器实例 int port; /* TCP listening port */ int fd; /* TCP file descriptor */ redisDb *db; /* 多数据库支持 */ int dbnum; /* Total number of initialized DBs */ // 工厂模式:创建不同类型的对象 dict *commands; /* Command table */ // 时间事件处理器 aeEventLoop *el; /* Event loop */ }; ``` ### 1.2 五大基础数据结构实现 ```java /** * Redis五大基础数据结构Java封装 * 基于工厂模式创建不同类型的Redis对象 */ public class RedisDataStructure { /** * 数据结构类型枚举 */ public enum DataType { STRING, LIST, HASH, SET, ZSET } /** * 工厂方法:根据类型创建对应的Redis数据结构 */ public static RedisObject create(DataType type) { switch (type) { case STRING: return new RedisString(); case LIST: return new RedisList(); case HASH: return new RedisHash(); case SET: return new RedisSet(); case ZSET: return new RedisSortedSet(); default: throw new IllegalArgumentException("不支持的Redis数据类型: " + type); } } /** * Redis对象基类(模板方法模式) */ public static abstract class RedisObject { protected String key; // 模板方法:定义操作流程 public final boolean save(String key) { if (!validateKey(key)) { return false; } this.key = key; return doSave(); } protected abstract boolean doSave(); protected abstract boolean validateKey(String key); } /** * String类型实现(SDS - Simple Dynamic String) */ public static class RedisString extends RedisObject { private byte[] value; @Override protected boolean doSave() { // 模拟Redis SDS实现 // SDS结构:len + free + buf byte[] sds = new byte[value.length + 1]; // len字段 System.arraycopy(value, 0, sds, 1, value.length); sds[0] = (byte) value.length; return true; } @Override protected boolean validateKey(String key) { return key != null && !key.isEmpty(); } public void set(byte[] value) { this.value = value; } public byte[] get() { return value; } } /** * List类型实现(双向链表或压缩列表) */ public static class RedisList extends RedisObject { private List list = new LinkedList<>(); @Override protected boolean doSave() { // 当元素数量小于512且每个元素小于64字节时使用压缩列表 // 否则使用双向链表 return true; } @Override protected boolean validateKey(String key) { return key != null; } public void lpush(byte[] value) { list.add(0, value); } public byte[] rpop() { return list.isEmpty() ? null : list.remove(list.size() - 1); } public long llen() { return list.size(); } } /** * Hash类型实现(字典/哈希表) */ public static class RedisHash extends RedisObject { private Map map = new HashMap<>(); @Override protected boolean doSave() { // Redis哈希表使用渐进式rehash return true; } @Override protected boolean validateKey(String key) { return key != null; } public void hset(String field, byte[] value) { map.put(field, value); } public byte[] hget(String field) { return map.get(field); } } /** * Set类型实现(整数集合或哈希表) */ public static class RedisSet extends RedisObject { private Set set = new HashSet<>(); @Override protected boolean doSave() { return true; } @Override protected boolean validateKey(String key) { return key != null; } public void sadd(byte[] member) { set.add(member); } public boolean sismember(byte[] member) { return set.contains(member); } } /** * Sorted Set类型实现(跳表+字典) */ public static class RedisSortedSet extends RedisObject { // 字典:member -> score private Map dict = new HashMap<>(); // 跳表:按score排序 private TreeMap> zset = new TreeMap<>(); @Override protected boolean doSave() { return true; } @Override protected boolean validateKey(String key) { return key != null; } public void zadd(byte[] member, double score) { dict.put(member, score); zset.computeIfAbsent(score, k -> new HashSet<>()).add(member); } public Double zscore(byte[] member) { return dict.get(member); } } } ``` --- ## 2. 设计模式在Redis中的应用 ### 2.1 单例模式 - Redis连接池管理 ```java /** * Redis连接池 - 单例模式 * 确保全局只有一个连接池实例,避免资源浪费 */ public class RedisConnectionPool { // 饿汉式单例 private static final RedisConnectionPool INSTANCE = new RedisConnectionPool(); // 连接池配置 private JedisPool jedisPool; private final JedisPoolConfig poolConfig; /** * 私有构造函数 */ private RedisConnectionPool() { this.poolConfig = new JedisPoolConfig(); initializePoolConfig(); } /** * 获取单例实例 */ public static RedisConnectionPool getInstance() { return INSTANCE; } /** * 初始化连接池配置 */ private void initializePoolConfig() { // 最大连接数 poolConfig.setMaxTotal(200); // 最大空闲连接数 poolConfig.setMaxIdle(50); // 最小空闲连接数 poolConfig.setMinIdle(10); // 获取连接时的最大等待时间 poolConfig.setMaxWaitMillis(3000); // 连接有效性检测 poolConfig.setTestOnBorrow(true); poolConfig.setTestOnReturn(false); poolConfig.setTestWhileIdle(true); // 空闲连接检测周期 poolConfig.setTimeBetweenEvictionRunsMillis(60000); // 空闲连接最小空闲时间 poolConfig.setMinEvictableIdleTimeMillis(300000); // 连接超时时间 poolConfig.setSoTimeout(3000); } /** * 初始化连接池 */ public synchronized void init(String host, int port, String password) { if (jedisPool != null && !jedisPool.isClosed()) { return; } jedisPool = new JedisPool(poolConfig, host, port, 2000, password); System.out.println("Redis连接池初始化成功: " + host + ":" + port); } /** * 获取Redis连接 */ public Jedis getResource() { if (jedisPool == null) { throw new IllegalStateException("Redis连接池未初始化"); } return jedisPool.getResource(); } /** * 关闭连接池 */ public synchronized void close() { if (jedisPool != null && !jedisPool.isClosed()) { jedisPool.close(); System.out.println("Redis连接池已关闭"); } } /** * 获取连接池状态 */ public Map getPoolStats() { Map stats = new HashMap<>(); stats.put("numActive", jedisPool.getNumActive()); stats.put("numIdle", jedisPool.getNumIdle()); stats.put("numWaiters", jedisPool.getNumWaiters()); return stats; } } ``` ### 2.2 工厂模式 - 序列化器工厂 ```java /** * 序列化器工厂 - 工厂模式 * 根据不同类型创建对应的序列化器 */ public class SerializerFactory { /** * 序列化器类型枚举 */ public enum SerializerType { JDK_SERIALIZABLE, // Java原生序列化 JSON, // JSON序列化 PROTOSTUFF, // Protostuff序列化 KRYO, // Kryo序列化 HESSIAN // Hessian序列化 } /** * 缓存已创建的序列化器实例 */ private static final Map SERIALIZER_CACHE = new ConcurrentHashMap<>(); /** * 工厂方法:创建序列化器 */ public static RedisSerializer create(SerializerType type) { return SERIALIZER_CACHE.computeIfAbsent(type, t -> { switch (t) { case JDK_SERIALIZABLE: return new JdkSerializer(); case JSON: return new JsonSerializer(); case PROTOSTUFF: return new ProtostuffSerializer(); case KRYO: return new KryoSerializer(); case HESSIAN: return new HessianSerializer(); default: throw new IllegalArgumentException("不支持的序列化类型: " + type); } }); } /** * 序列化器接口 */ public interface RedisSerializer { byte[] serialize(T obj) throws SerializationException; T deserialize(byte[] data) throws SerializationException; } /** * JDK原生序列化实现 */ public static class JdkSerializer implements RedisSerializer { @Override public byte[] serialize(Object obj) throws SerializationException { if (obj == null) { return new byte[0]; } try (ByteArrayOutputStream bos = new ByteArrayOutputStream(); ObjectOutputStream oos = new ObjectOutputStream(bos)) { oos.writeObject(obj); return bos.toByteArray(); } catch (IOException e) { throw new SerializationException("JDK序列化失败", e); } } @Override public Object deserialize(byte[] data) throws SerializationException { if (data == null || data.length == 0) { return null; } try (ByteArrayInputStream bis = new ByteArrayInputStream(data); ObjectInputStream ois = new ObjectInputStream(bis)) { return ois.readObject(); } catch (IOException | ClassNotFoundException e) { throw new SerializationException("JDK反序列化失败", e); } } } /** * JSON序列化实现(使用Jackson) */ public static class JsonSerializer implements RedisSerializer { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); static { OBJECT_MAPPER.registerModule(new JavaTimeModule()); OBJECT_MAPPER.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); } @Override public byte[] serialize(Object obj) throws SerializationException { if (obj == null) { return new byte[0]; } try { return OBJECT_MAPPER.writeValueAsBytes(obj); } catch (JsonProcessingException e) { throw new SerializationException("JSON序列化失败", e); } } @Override public Object deserialize(byte[] data) throws SerializationException { if (data == null || data.length == 0) { return null; } try { return OBJECT_MAPPER.readValue(data, Object.class); } catch (IOException e) { throw new SerializationException("JSON反序列化失败", e); } } } /** * Protostuff序列化实现(高性能) */ public static class ProtostuffSerializer implements RedisSerializer { private static final Map, Schema> SCHEMA_CACHE = new ConcurrentHashMap<>(); @Override @SuppressWarnings("unchecked") public byte[] serialize(Object obj) throws SerializationException { if (obj == null) { return new byte[0]; } Class clazz = obj.getClass(); Schema schema = getSchema(clazz); return ProtostuffIOUtil.toByteArray(obj, schema, buffer()); } @Override public Object deserialize(byte[] data) throws SerializationException { if (data == null || data.length == 0) { return null; } // Protostuff反序列化需要已知类型 // 实际应用中需要配合类型信息存储 throw new SerializationException("Protostuff反序列化需要指定类型"); } @SuppressWarnings("unchecked") private Schema getSchema(Class clazz) { return (Schema) SCHEMA_CACHE.computeIfAbsent(clazz, k -> RuntimeSchema.createFrom(clazz)); } private static ThreadLocal buffer = ThreadLocal.withInitial(() -> LinkedBuffer.allocate(512)); } /** * Kryo序列化实现(最高性能) */ public static class KryoSerializer implements RedisSerializer { private static final ThreadLocal kryoThreadLocal = ThreadLocal.withInitial(() -> { Kryo kryo = new Kryo(); kryo.setRegistrationRequired(false); kryo.setReferences(true); return kryo; }); @Override public byte[] serialize(Object obj) throws SerializationException { if (obj == null) { return new byte[0]; } try (ByteArrayOutputStream bos = new ByteArrayOutputStream(); Output output = new Output(bos)) { Kryo kryo = kryoThreadLocal.get(); kryo.writeClassAndObject(output, obj); output.flush(); return bos.toByteArray(); } catch (IOException e) { throw new SerializationException("Kryo序列化失败", e); } } @Override public Object deserialize(byte[] data) throws SerializationException { if (data == null || data.length == 0) { return null; } try (Input input = new Input(new ByteArrayInputStream(data))) { Kryo kryo = kryoThreadLocal.get(); return kryo.readClassAndObject(input); } catch (IOException e) { throw new SerializationException("Kryo反序列化失败", e); } } } } ``` ### 2.3 策略模式 - 缓存过期策略 ```java /** * 缓存过期策略 - 策略模式 * 支持不同的缓存过期策略 */ public class CacheExpireStrategy { /** * 过期策略接口 */ public interface ExpireStrategy { /** * 计算过期时间(秒) */ long calculateExpireTime(String key, Object value); /** * 判断是否需要刷新缓存 */ boolean shouldRefresh(String key, Object value); } /** * 固定过期时间策略 */ public static class FixedExpireStrategy implements ExpireStrategy { private final long expireSeconds; public FixedExpireStrategy(long expireSeconds) { this.expireSeconds = expireSeconds; } @Override public long calculateExpireTime(String key, Object value) { return expireSeconds; } @Override public boolean shouldRefresh(String key, Object value) { return false; } } /** * 滑动过期时间策略 */ public static class SlidingExpireStrategy implements ExpireStrategy { private final long idleSeconds; public SlidingExpireStrategy(long idleSeconds) { this.idleSeconds = idleSeconds; } @Override public long calculateExpireTime(String key, Object value) { return idleSeconds; } @Override public boolean shouldRefresh(String key, Object value) { return true; // 每次访问都刷新 } } /** * 基于LRU的过期策略 */ public static class LRUExpireStrategy implements ExpireStrategy { private final int maxSize; private final Map accessTimeMap = new ConcurrentHashMap<>(); public LRUExpireStrategy(int maxSize) { this.maxSize = maxSize; } @Override public synchronized long calculateExpireTime(String key, Object value) { accessTimeMap.put(key, System.currentTimeMillis()); return 0; // 不设置过期时间,由LRU淘汰 } @Override public synchronized boolean shouldRefresh(String key, Object value) { accessTimeMap.put(key, System.currentTimeMillis()); return false; } /** * 获取需要淘汰的key */ public synchronized String getEvictKey() { if (accessTimeMap.size() <= maxSize) { return null; } return accessTimeMap.entrySet().stream() .min(Map.Entry.comparingByValue()) .map(Map.Entry::getKey) .orElse(null); } } /** * 基于访问频率的过期策略(LFU变种) */ public static class FrequencyExpireStrategy implements ExpireStrategy { private final long maxAccessCount; private final Map accessCountMap = new ConcurrentHashMap<>(); public FrequencyExpireStrategy(long maxAccessCount) { this.maxAccessCount = maxAccessCount; } @Override public synchronized long calculateExpireTime(String key, Object value) { long count = accessCountMap.getOrDefault(key, 0L); accessCountMap.put(key, count + 1); // 访问次数越多,过期时间越长 return Math.min(3600, count * 10); } @Override public boolean shouldRefresh(String key, Object value) { return false; } } } ``` --- ## 3. Java客户端反射机制深度应用 ### 3.1 基于反射的Redis键生成器 ```java /** * Redis键生成器 - 使用反射机制自动生成缓存键 */ public class RedisKeyGenerator { /** * 键生成策略枚举 */ public enum KeyGenerationStrategy { SIMPLE, // 简单策略:类名+方法名+参数值 WITH_PACKAGE, // 包含包名 WITH_HASH, // 包含哈希值 CUSTOM_PREFIX // 自定义前缀 } private KeyGenerationStrategy strategy; private String customPrefix; public RedisKeyGenerator(KeyGenerationStrategy strategy) { this.strategy = strategy; } public RedisKeyGenerator(String customPrefix) { this.strategy = KeyGenerationStrategy.CUSTOM_PREFIX; this.customPrefix = customPrefix; } /** * 生成Redis键 */ public String generateKey(Method method, Object[] args) throws Exception { StringBuilder keyBuilder = new StringBuilder(); // 添加前缀 if (strategy == KeyGenerationStrategy.CUSTOM_PREFIX) { keyBuilder.append(customPrefix).append(":"); } else if (strategy == KeyGenerationStrategy.WITH_PACKAGE) { keyBuilder.append(method.getDeclaringClass().getName()).append("."); } else { keyBuilder.append(method.getDeclaringClass().getSimpleName()).append("."); } // 添加方法名 keyBuilder.append(method.getName()); // 添加参数 if (args != null && args.length > 0) { keyBuilder.append(":"); String params = generateParamsString(args); if (strategy == KeyGenerationStrategy.WITH_HASH) { // 使用哈希值作为参数部分 String hash = DigestUtils.md5Hex(params); keyBuilder.append(hash); } else { keyBuilder.append(params); } } return keyBuilder.toString(); } /** * 生成参数字符串(使用反射处理复杂对象) */ private String generateParamsString(Object[] args) throws Exception { List paramList = new ArrayList<>(); for (Object arg : args) { if (arg == null) { paramList.add("null"); continue; } Class argClass = arg.getClass(); if (isPrimitiveOrWrapper(argClass) || arg instanceof String) { paramList.add(String.valueOf(arg)); } else { // 复杂对象,使用反射获取字段值 paramList.append(generateObjectString(arg)); } } return String.join(",", paramList); } /** * 生成复杂对象的字符串表示 */ private String generateObjectString(Object obj) throws Exception { StringBuilder sb = new StringBuilder(obj.getClass().getSimpleName()).append("["); Field[] fields = obj.getClass().getDeclaredFields(); List fieldValues = new ArrayList<>(); for (Field field : fields) { field.setAccessible(true); Object value = field.get(obj); if (value == null) { fieldValues.add(field.getName() + "=null"); } else if (isPrimitiveOrWrapper(value.getClass())) { fieldValues.add(field.getName() + "=" + value); } else { // 递归处理嵌套对象 fieldValues.add(field.getName() + "=" + generateObjectString(value)); } } sb.append(String.join(",", fieldValues)).append("]"); return sb.toString(); } /** * 判断是否为基本类型或包装类型 */ private boolean isPrimitiveOrWrapper(Class clazz) { return clazz.isPrimitive() || clazz == Byte.class || clazz == Short.class || clazz == Integer.class || clazz == Long.class || clazz == Float.class || clazz == Double.class || clazz == Boolean.class || clazz == Character.class; } } ``` ### 3.2 动态Redis操作代理 ```java /** * 动态Redis操作代理 * 使用反射和动态代理实现Redis操作的通用化 */ public class RedisOperationProxy implements InvocationHandler { private final Jedis jedis; private final String keyPrefix; private final RedisKeyGenerator keyGenerator; private final Map> serializerMap = new ConcurrentHashMap<>(); public RedisOperationProxy(Jedis jedis, String keyPrefix, RedisKeyGenerator.KeyGenerationStrategy strategy) { this.jedis = jedis; this.keyPrefix = keyPrefix; this.keyGenerator = new RedisKeyGenerator(strategy); } /** * 创建代理实例 */ @SuppressWarnings("unchecked") public static T createProxy(Jedis jedis, String keyPrefix, RedisKeyGenerator.KeyGenerationStrategy strategy, Class interfaceClass) { return (T) Proxy.newProxyInstance( interfaceClass.getClassLoader(), new Class[]{interfaceClass}, new RedisOperationProxy(jedis, keyPrefix, strategy) ); } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { String methodName = method.getName(); // 生成Redis键 String redisKey = keyPrefix + ":" + methodName; // 根据方法名路由到不同的Redis操作 if (methodName.startsWith("get")) { return handleGet(method, args, redisKey); } else if (methodName.startsWith("set")) { return handleSet(method, args, redisKey); } else if (methodName.startsWith("del")) { return handleDelete(method, args, redisKey); } else if (methodName.startsWith("hget")) { return handleHGet(method, args, redisKey); } else if (methodName.startsWith("hset")) { return handleHSet(method, args, redisKey); } else { throw new UnsupportedOperationException("不支持的操作: " + methodName); } } /** * 处理GET操作 */ private Object handleGet(Method method, Object[] args, String redisKey) throws Exception { String key = generateKeyWithParams(method, args, redisKey); byte[] data = jedis.get(key.getBytes()); if (data == null) { return null; } @SuppressWarnings("unchecked") RedisSerializer serializer = (RedisSerializer) getSerializer(method.getReturnType()); return serializer.deserialize(data); } /** * 处理SET操作 */ private Object handleSet(Method method, Object[] args, String redisKey) throws Exception { if (args == null || args.length == 0) { throw new IllegalArgumentException("SET操作需要参数"); } String key = generateKeyWithParams(method, Arrays.copyOf(args, args.length - 1), redisKey); Object value = args[args.length - 1]; RedisSerializer serializer = getSerializer(value.getClass()); byte[] data = serializer.serialize(value); jedis.set(key.getBytes(), data); return true; } /** * 处理DELETE操作 */ private Long handleDelete(Method method, Object[] args, String redisKey) throws Exception { String key = generateKeyWithParams(method, args, redisKey); return jedis.del(key); } /** * 处理HGET操作 */ private Object handleHGet(Method method, Object[] args, String redisKey) throws Exception { if (args == null || args.length != 1) { throw new IllegalArgumentException("HGET操作需要1个参数"); } String hashKey = String.valueOf(args[0]); byte[] data = jedis.hget(redisKey.getBytes(), hashKey.getBytes()); if (data == null) { return null; } @SuppressWarnings("unchecked") RedisSerializer serializer = (RedisSerializer) getSerializer(method.getReturnType()); return serializer.deserialize(data); } /** * 处理HSET操作 */ private Object handleHSet(Method method, Object[] args, String redisKey) throws Exception { if (args == null || args.length != 2) { throw new IllegalArgumentException("HSET操作需要2个参数"); } String hashKey = String.valueOf(args[0]); Object value = args[1]; RedisSerializer serializer = getSerializer(value.getClass()); byte[] data = serializer.serialize(value); jedis.hset(redisKey.getBytes(), hashKey.getBytes(), data); return true; } /** * 生成包含参数的完整键 */ private String generateKeyWithParams(Method method, Object[] args, String baseKey) throws Exception { if (args == null || args.length == 0) { return baseKey; } return baseKey + ":" + keyGenerator.generateKey(method, args); } /** * 获取序列化器 */ @SuppressWarnings("unchecked") private RedisSerializer getSerializer(Class clazz) { return (RedisSerializer) serializerMap.computeIfAbsent(clazz.getName(), k -> SerializerFactory.create(SerializerFactory.SerializerType.KRYO)); } } ``` --- ## 4. 自定义注解实现缓存策略 ### 4.1 缓存注解定义 ```java /** * Redis缓存相关注解定义 */ public class CacheAnnotations { /** * 缓存注解 - 标记方法使用缓存 */ @Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface Cacheable { /** * 缓存键前缀 */ String prefix() default ""; /** * 缓存键的SpEL表达式 */ String key() default ""; /** * 缓存过期时间(秒) */ long expire() default 300; /** * 序列化器类型 */ SerializerFactory.SerializerType serializer() default SerializerFactory.SerializerType.KRYO; /** * 是否使用本地缓存 */ boolean localCache() default false; /** * 本地缓存最大数量 */ int localCacheSize() default 1000; } /** * 缓存更新注解 - 执行方法后更新缓存 */ @Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface CachePut { /** * 缓存键前缀 */ String prefix() default ""; /** * 缓存键的SpEL表达式 */ String key() default ""; /** * 缓存过期时间(秒) */ long expire() default 300; /** * 序列化器类型 */ SerializerFactory.SerializerType serializer() default SerializerFactory.SerializerType.KRYO; } /** * 缓存删除注解 - 执行方法后删除缓存 */ @Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface CacheEvict { /** * 缓存键前缀 */ String prefix() default ""; /** * 缓存键的SpEL表达式 */ String key() default ""; /** * 是否删除所有匹配前缀的缓存 */ boolean allEntries() default false; /** * 是否在方法执行前删除 */ boolean beforeInvocation() default false; } /** * 缓存配置注解 - 标记类使用缓存 */ @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface CacheConfig { /** * 默认缓存键前缀 */ String prefix() default ""; /** * 默认缓存过期时间(秒) */ long expire() default 300; /** * 默认序列化器类型 */ SerializerFactory.SerializerType serializer() default SerializerFactory.SerializerType.KRYO; /** * 是否启用缓存 */ boolean enabled() default true; } /** * 缓存预热注解 - 应用启动时预热缓存 */ @Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface CacheWarmUp { /** * 预热参数 */ String[] params() default {}; /** * 是否异步执行 */ boolean async() default true; } } ``` ### 4.2 缓存切面实现 ```java /** * Redis缓存切面 - 使用AOP实现缓存功能 */ @Aspect @Component public class RedisCacheAspect { private static final Logger logger = LoggerFactory.getLogger(RedisCacheAspect.class); @Autowired private RedisTemplate redisTemplate; @Autowired private RedisKeyGenerator keyGenerator; /** * 缓存本地存储(Caffeine) */ private static final Map> LOCAL_CACHES = new ConcurrentHashMap<>(); /** * 表达式解析器 */ private final SpelExpressionParser parser = new SpelExpressionParser(); /** * 拦截@Cacheable注解 */ @Around("@annotation(cacheable)") public Object handleCacheable(ProceedingJoinPoint joinPoint, Cacheable cacheable) throws Throwable { Method method = ((MethodSignature) joinPoint.getSignature()).getMethod(); // 生成缓存键 String cacheKey = generateCacheKey(joinPoint, cacheable.prefix(), cacheable.key()); // 尝试从本地缓存获取 if (cacheable.localCache()) { Object localValue = getFromLocalCache(cacheKey, cacheable.localCacheSize()); if (localValue != null) { logger.debug("从本地缓存获取: {}", cacheKey); return localValue; } } // 尝试从Redis获取 Object value = redisTemplate.opsForValue().get(cacheKey); if (value != null) { logger.debug("从Redis缓存获取: {}", cacheKey); // 存入本地缓存 if (cacheable.localCache()) { putToLocalCache(cacheKey, value, cacheable.localCacheSize()); } return value; } // 执行目标方法 value = joinPoint.proceed(); // 存入缓存 if (value != null) { redisTemplate.opsForValue().set(cacheKey, value, cacheable.expire(), TimeUnit.SECONDS); if (cacheable.localCache()) { putToLocalCache(cacheKey, value, cacheable.localCacheSize()); } logger.debug("存入缓存: {}", cacheKey); } return value; } /** * 拦截@CachePut注解 */ @Around("@annotation(cachePut)") public Object handleCachePut(ProceedingJoinPoint joinPoint, CachePut cachePut) throws Throwable { // 执行目标方法 Object value = joinPoint.proceed(); // 生成缓存键 String cacheKey = generateCacheKey(joinPoint, cachePut.prefix(), cachePut.key()); // 更新缓存 if (value != null) { redisTemplate.opsForValue().set(cacheKey, value, cachePut.expire(), TimeUnit.SECONDS); // 更新本地缓存 Cache localCache = LOCAL_CACHES.get("default"); if (localCache != null) { localCache.put(cacheKey, value); } logger.debug("更新缓存: {}", cacheKey); } return value; } /** * 拦截@CacheEvict注解 */ @Around("@annotation(cacheEvict)") public Object handleCacheEvict(ProceedingJoinPoint joinPoint, CacheEvict cacheEvict) throws Throwable { // 如果在方法执行前删除 if (cacheEvict.beforeInvocation()) { evictCache(joinPoint, cacheEvict); } // 执行目标方法 Object result = joinPoint.proceed(); // 如果在方法执行后删除 if (!cacheEvict.beforeInvocation()) { evictCache(joinPoint, cacheEvict); } return result; } /** * 删除缓存 */ private void evictCache(ProceedingJoinPoint joinPoint, CacheEvict cacheEvict) { try { if (cacheEvict.allEntries()) { // 删除所有匹配前缀的缓存 String prefix = cacheEvict.prefix(); Set keys = redisTemplate.keys(prefix + ":*"); if (keys != null && !keys.isEmpty()) { redisTemplate.delete(keys); logger.debug("删除所有缓存: {}, 数量: {}", prefix, keys.size()); } } else { // 删除单个缓存 String cacheKey = generateCacheKey(joinPoint, cacheEvict.prefix(), cacheEvict.key()); redisTemplate.delete(cacheKey); logger.debug("删除缓存: {}", cacheKey); } } catch (Exception e) { logger.error("删除缓存失败", e); } } /** * 生成缓存键 */ private String generateCacheKey(ProceedingJoinPoint joinPoint, String prefix, String key) { Method method = ((MethodSignature) joinPoint.getSignature()).getMethod(); // 如果指定了SpEL表达式 if (!key.isEmpty()) { StandardEvaluationContext context = new StandardEvaluationContext(); context.setVariable("methodName", method.getName()); context.setVariable("args", joinPoint.getArgs()); context.setVariable("target", joinPoint.getTarget()); Expression expression = parser.parseExpression(key); String keyPart = expression.getValue(context, String.class); return prefix + ":" + keyPart; } // 使用反射生成键 try { String keyPart = keyGenerator.generateKey(method, joinPoint.getArgs()); return prefix + ":" + keyPart; } catch (Exception e) { logger.error("生成缓存键失败", e); return prefix + ":" + System.currentTimeMillis(); } } /** * 从本地缓存获取 */ private Object getFromLocalCache(String key, int maxSize) { Cache cache = LOCAL_CACHES.computeIfAbsent("default", k -> Caffeine.newBuilder() .maximumSize(maxSize) .expireAfterWrite(5, TimeUnit.MINUTES) .recordStats() .build()); return cache.getIfPresent(key); } /** * 存入本地缓存 */ private void putToLocalCache(String key, Object value, int maxSize) { Cache cache = LOCAL_CACHES.computeIfAbsent("default", k -> Caffeine.newBuilder() .maximumSize(maxSize) .expireAfterWrite(5, TimeUnit.MINUTES) .recordStats() .build()); cache.put(key, value); } } ``` --- ## 5. Redis性能优化与JVM调优 ### 5.1 Redis服务器端优化配置 ```bash # redis.conf 优化配置 # 内存配置 maxmemory 4gb maxmemory-policy allkeys-lru maxmemory-samples 5 # 持久化配置 save 900 1 save 300 10 save 60 10000 appendonly yes appendfsync everysec no-appendfsync-on-rewrite no auto-aof-rewrite-percentage 100 auto-aof-rewrite-min-size 64mb # 网络配置 tcp-backlog 511 timeout 0 tcp-keepalive 300 tcp-backlog 511 # 客户端连接配置 maxclients 10000 # 慢查询配置 slowlog-log-slower-than 10000 slowlog-max-len 128 # 哈希表配置 hash-max-ziplist-entries 512 hash-max-ziplist-value 64 # 列表配置 list-max-ziplist-size -2 list-compress-depth 0 # 集合配置 set-max-intset-entries 512 # 有序集合配置 zset-max-ziplist-entries 128 zset-max-ziplist-value 64 # HyperLogLog配置 hll-sparse-max-bytes 3000 # 流配置 stream-node-max-bytes 4096 stream-node-max-entries 100 # 激进重置配置 activerehashing yes # 客户端输出缓冲区配置 client-output-buffer-limit normal 0 0 0 client-output-buffer-limit replica 256mb 64mb 60 client-output-buffer-limit pubsub 32mb 8mb 60 # 频率限制 hz 10 dynamic-hz yes # AOF重写配置 aof-rewrite-incremental-fsync yes rdb-save-incremental-fsync yes ``` ### 5.2 Java客户端JVM调优 ```java /** * Redis客户端JVM优化配置 */ public class RedisJVMOptimizer { /** * JVM参数配置 * * 启动参数: * -Xms4G -Xmx4G * -XX:MetaspaceSize=256m -XX:MaxMetaspaceSize=512m * -XX:+UseG1GC * -XX:MaxGCPauseMillis=50 * -XX:G1NewSizePercent=30 * -XX:G1MaxNewSizePercent=50 * -XX:ParallelGCThreads=8 * -XX:ConcGCThreads=2 * -XX:+UseLargePages * -XX:+AlwaysPreTouch * -XX:MaxDirectMemorySize=2G * -XX:+HeapDumpOnOutOfMemoryError * -XX:HeapDumpPath=/var/log/redis/heapdump.hprof * -Djava.awt.headless=true * -Dfile.encoding=UTF-8 */ /** * 网络I/O优化配置 */ public static void optimizeNetworkIO(JedisPoolConfig poolConfig) { // 设置TCP参数 System.setProperty("sun.net.useExclusiveBind", "false"); // 优化连接池配置 poolConfig.setMaxTotal(500); // 最大连接数 poolConfig.setMaxIdle(100); // 最大空闲连接 poolConfig.setMinIdle(20); // 最小空闲连接 poolConfig.setMaxWaitMillis(3000); // 获取连接最大等待时间 poolConfig.setTestOnBorrow(true); // 借出连接时测试 poolConfig.setTestOnReturn(false); // 归还连接时不测试 poolConfig.setTestWhileIdle(true); // 空闲时测试 poolConfig.setTimeBetweenEvictionRunsMillis(60000); // 检测周期 poolConfig.setMinEvictableIdleTimeMillis(300000); // 最小空闲时间 poolConfig.setNumTestsPerEvictionRun(10); // 每次检测的连接数 poolConfig.setBlockWhenExhausted(true); // 连接耗尽时阻塞 } /** * 序列化优化配置 */ public static void optimizeSerialization() { // 使用Kryo序列化器,性能最优 System.setProperty("redis.serializer.type", "KRYO"); // 配置Kryo线程池 System.setProperty("kryo.thread.pool.size", String.valueOf(Runtime.getRuntime().availableProcessors())); } /** * 监控配置 */ public static void configureMonitoring(JedisPool jedisPool) { // 定期输出连接池状态 ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); scheduler.scheduleAtFixedRate(() -> { JedisPool pool = (JedisPool) jedisPool; logger.info("Redis连接池状态 - Active: {}, Idle: {}, Waiters: {}", pool.getNumActive(), pool.getNumIdle(), pool.getNumWaiters()); }, 1, 1, TimeUnit.MINUTES); } /** * 优化直接内存使用 */ public static void optimizeDirectMemory() { // 配置Netty的堆外内存(如果使用Netty) System.setProperty("io.netty.allocator.type", "pooled"); System.setProperty("io.netty.leakDetection.level", "simple"); } } ``` ### 5.3 性能监控与诊断 ```java /** * Redis性能监控工具 */ public class RedisPerformanceMonitor { private static final Logger logger = LoggerFactory.getLogger(RedisPerformanceMonitor.class); private final JedisPool jedisPool; // 性能指标收集 private final AtomicLong totalRequests = new AtomicLong(0); private final AtomicLong totalErrors = new AtomicLong(0); private final AtomicLong totalLatency = new AtomicLong(0); // 慢查询阈值(毫秒) private static final long SLOW_QUERY_THRESHOLD = 100; public RedisPerformanceMonitor(JedisPool jedisPool) { this.jedisPool = jedisPool; } /** * 执行Redis操作并收集性能指标 */ public T execute(String command, RedisCallback callback) { long startTime = System.currentTimeMillis(); totalRequests.incrementAndGet(); try (Jedis jedis = jedisPool.getResource()) { T result = callback.doInRedis(jedis); long latency = System.currentTimeMillis() - startTime; totalLatency.addAndGet(latency); // 慢查询记录 if (latency > SLOW_QUERY_THRESHOLD) { logger.warn("Redis慢查询 - 命令: {}, 耗时: {}ms", command, latency); recordSlowQuery(command, latency); } return result; } catch (Exception e) { totalErrors.incrementAndGet(); logger.error("Redis操作失败 - 命令: {}, 错误: {}", command, e.getMessage()); throw new RedisException("Redis操作失败", e); } } /** * 记录慢查询 */ private void recordSlowQuery(String command, long latency) { try (Jedis jedis = jedisPool.getResource()) { jedis.sadd("redis:slow:queries", String.format("%s|%d|%d", command, latency, System.currentTimeMillis())); // 保留最近1000条慢查询 jedis.spop("redis:slow:queries", Math.max(0, jedis.scard("redis:slow:queries") - 1000)); } catch (Exception e) { logger.error("记录慢查询失败", e); } } /** * 获取性能指标 */ public Map getMetrics() { Map metrics = new HashMap<>(); long requests = totalRequests.get(); long errors = totalErrors.get(); long latency = totalLatency.get(); metrics.put("totalRequests", requests); metrics.put("totalErrors", errors); metrics.put("errorRate", requests > 0 ? (double) errors / requests * 100 : 0); metrics.put("avgLatency", requests > 0 ? (double) latency / requests : 0); metrics.put("poolActive", jedisPool.getNumActive()); metrics.put("poolIdle", jedisPool.getNumIdle()); metrics.put("poolWaiters", jedisPool.getNumWaiters()); return metrics; } /** * 获取Redis服务器信息 */ public Map getRedisInfo() { try (Jedis jedis = jedisPool.getResource()) { String info = jedis.info(); return parseRedisInfo(info); } catch (Exception e) { logger.error("获取Redis服务器信息失败", e); return Collections.emptyMap(); } } /** * 解析Redis INFO命令输出 */ private Map parseRedisInfo(String info) { Map result = new HashMap<>(); String[] lines = info.split("\r\n"); String currentSection = ""; for (String line : lines) { line = line.trim(); if (line.isEmpty()) { continue; } // 识别section if (line.startsWith("# ")) { currentSection = line.substring(2); continue; } // 解析key-value String[] parts = line.split(":"); if (parts.length == 2) { String key = parts[0]; String value = parts[1]; // 数值转换 try { if (value.contains(".")) { result.put(key, Double.parseDouble(value)); } else { result.put(key, Long.parseLong(value)); } } catch (NumberFormatException e) { result.put(key, value); } } } return result; } /** * Redis操作回调接口 */ @FunctionalInterface public interface RedisCallback { T doInRedis(Jedis jedis); } } ``` --- ## 6. 生产级Redis工具类完整实现 ```java /** * 生产级Redis工具类 * 整合设计模式、反射、注解和性能优化 */ public class RedisUtils { private static final Logger logger = LoggerFactory.getLogger(RedisUtils.class); // 单例连接池 private static volatile RedisConnectionPool connectionPool; // 性能监控器 private static RedisPerformanceMonitor performanceMonitor; // 序列化器缓存 private static final Map, RedisSerializer> SERIALIZER_MAP = new ConcurrentHashMap<>(); // 默认序列化器 private static final RedisSerializer DEFAULT_SERIALIZER = SerializerFactory.create(SerializerFactory.SerializerType.KRYO); /** * 初始化Redis连接池(双重检查锁定) */ public static void init(String host, int port, String password) { if (connectionPool == null) { synchronized (RedisUtils.class) { if (connectionPool == null) { connectionPool = RedisConnectionPool.getInstance(); connectionPool.init(host, port, password); JedisPool jedisPool = ((JedisPool) connectionPool.getClass() .getDeclaredField("jedisPool").get(connectionPool)); performanceMonitor = new RedisPerformanceMonitor(jedisPool); logger.info("Redis工具类初始化成功"); } } } } /** * 获取Jedis连接 */ private static Jedis getJedis() { if (connectionPool == null) { throw new IllegalStateException("Redis未初始化"); } return connectionPool.getResource(); } /** * 关闭连接池 */ public static void close() { if (connectionPool != null) { connectionPool.close(); } } // ==================== String操作 ==================== /** * 设置字符串 */ public static boolean set(String key, String value) { return set(key, value, 0); } /** * 设置字符串(带过期时间) */ public static boolean set(String key, String value, int seconds) { return performanceMonitor.execute("SET", jedis -> { if (seconds > 0) { jedis.setex(key, seconds, value); } else { jedis.set(key, value); } return true; }); } /** * 获取字符串 */ public static String get(String key) { return performanceMonitor.execute("GET", jedis -> jedis.get(key)); } /** * 设置对象(使用序列化) */ public static boolean setObject(String key, T value) { return setObject(key, value, 0); } /** * 设置对象(带过期时间) */ @SuppressWarnings("unchecked") public static boolean setObject(String key, T value, int seconds) { return performanceMonitor.execute("SETOBJ", jedis -> { RedisSerializer serializer = (RedisSerializer) SERIALIZER_MAP.computeIfAbsent(value.getClass(), k -> SerializerFactory.create(SerializerFactory.SerializerType.KRYO)); byte[] data = serializer.serialize(value); if (seconds > 0) { jedis.setex(key.getBytes(), seconds, data); } else { jedis.set(key.getBytes(), data); } return true; }); } /** * 获取对象 */ @SuppressWarnings("unchecked") public static T getObject(String key, Class clazz) { return performanceMonitor.execute("GETOBJ", jedis -> { byte[] data = jedis.get(key.getBytes()); if (data == null) { return null; } RedisSerializer serializer = (RedisSerializer) SERIALIZER_MAP.computeIfAbsent(clazz, k -> SerializerFactory.create(SerializerFactory.SerializerType.KRYO)); return serializer.deserialize(data); }); } // ==================== Hash操作 ==================== /** * 设置Hash字段 */ public static boolean hset(String key, String field, String value) { return performanceMonitor.execute("HSET", jedis -> { jedis.hset(key, field, value); return true; }); } /** * 获取Hash字段 */ public static String hget(String key, String field) { return performanceMonitor.execute("HGET", jedis -> jedis.hget(key, field)); } /** * 设置Hash对象字段 */ public static boolean hsetObject(String key, String field, Object value) { return performanceMonitor.execute("HSETOBJ", jedis -> { @SuppressWarnings("unchecked") RedisSerializer serializer = (RedisSerializer) SERIALIZER_MAP.computeIfAbsent(value.getClass(), k -> SerializerFactory.create(SerializerFactory.SerializerType.KRYO)); byte[] data = serializer.serialize(value); jedis.hset(key.getBytes(), field.getBytes(), data); return true; }); } /** * 获取Hash对象字段 */ public static T hgetObject(String key, String field, Class clazz) { return performanceMonitor.execute("HGETOBJ", jedis -> { byte[] data = jedis.hget(key.getBytes(), field.getBytes()); if (data == null) { return null; } @SuppressWarnings("unchecked") RedisSerializer serializer = (RedisSerializer) SERIALIZER_MAP.computeIfAbsent(clazz, k -> SerializerFactory.create(SerializerFactory.SerializerType.KRYO)); return serializer.deserialize(data); }); } /** * 获取所有Hash字段 */ public static Map hgetAll(String key) { return performanceMonitor.execute("HGETALL", jedis -> jedis.hgetAll(key)); } /** * 删除Hash字段 */ public static long hdel(String key, String... fields) { return performanceMonitor.execute("HDEL", jedis -> jedis.hdel(key, fields)); } // ==================== List操作 ==================== /** * 左侧推入列表 */ public static long lpush(String key, String... values) { return performanceMonitor.execute("LPUSH", jedis -> jedis.lpush(key, values)); } /** * 右侧弹出列表 */ public static String rpop(String key) { return performanceMonitor.execute("RPOP", jedis -> jedis.rpop(key)); } /** * 获取列表长度 */ public static long llen(String key) { return performanceMonitor.execute("LLEN", jedis -> jedis.llen(key)); } /** * 获取列表范围内的元素 */ public static List lrange(String key, long start, long stop) { return performanceMonitor.execute("LRANGE", jedis -> jedis.lrange(key, start, stop)); } // ==================== Set操作 ==================== /** * 添加集合成员 */ public static long sadd(String key, String... members) { return performanceMonitor.execute("SADD", jedis -> jedis.sadd(key, members)); } /** * 判断成员是否存在 */ public static boolean sismember(String key, String member) { return performanceMonitor.execute("SISMEMBER", jedis -> jedis.sismember(key, member)); } /** * 获取所有成员 */ public static Set smembers(String key) { return performanceMonitor.execute("SMEMBERS", jedis -> jedis.smembers(key)); } /** * 移除成员 */ public static long srem(String key, String... members) { return performanceMonitor.execute("SREM", jedis -> jedis.srem(key, members)); } // ==================== Sorted Set操作 ==================== /** * 添加有序集合成员 */ public static boolean zadd(String key, double score, String member) { return performanceMonitor.execute("ZADD", jedis -> { jedis.zadd(key, score, member); return true; }); } /** * 获取成员分数 */ public static Double zscore(String key, String member) { return performanceMonitor.execute("ZSCORE", jedis -> jedis.zscore(key, member)); } /** * 获取排名范围内的成员 */ public static Set zrange(String key, long start, long stop) { return performanceMonitor.execute("ZRANGE", jedis -> jedis.zrange(key, start, stop)); } /** * 获取分数范围内的成员 */ public static Set zrangeByScore(String key, double min, double max) { return performanceMonitor.execute("ZRANGEBYSCORE", jedis -> jedis.zrangeByScore(key, min, max)); } // ==================== 通用操作 ==================== /** * 删除键 */ public static long del(String... keys) { return performanceMonitor.execute("DEL", jedis -> jedis.del(keys)); } /** * 判断键是否存在 */ public static boolean exists(String key) { return performanceMonitor.execute("EXISTS", jedis -> jedis.exists(key)); } /** * 设置过期时间 */ public static boolean expire(String key, int seconds) { return performanceMonitor.execute("EXPIRE", jedis -> jedis.expire(key, seconds) == 1); } /** * 获取过期时间 */ public static long ttl(String key) { return performanceMonitor.execute("TTL", jedis -> jedis.ttl(key)); } /** * 获取键的类型 */ public static String type(String key) { return performanceMonitor.execute("TYPE", jedis -> jedis.type(key)); } /** * 模糊查询键 */ public static Set keys(String pattern) { return performanceMonitor.execute("KEYS", jedis -> jedis.keys(pattern)); } /** * 批量获取 */ public static List mget(String... keys) { return performanceMonitor.execute("MGET", jedis -> jedis.mget(keys)); } /** * 批量设置 */ public static boolean mset(Map keyValueMap) { return performanceMonitor.execute("MSET", jedis -> { String[] keyValueArray = keyValueMap.entrySet().stream() .flatMap(e -> Stream.of(e.getKey(), e.getValue())) .toArray(String[]::new); jedis.mset(keyValueArray); return true; }); } /** * 自增 */ public static long incr(String key) { return performanceMonitor.execute("INCR", jedis -> jedis.incr(key)); } /** * 指定步长自增 */ public static long incrBy(String key, long increment) { return performanceMonitor.execute("INCRBY", jedis -> jedis.incrBy(key, increment)); } /** * 自减 */ public static long decr(String key) { return performanceMonitor.execute("DECR", jedis -> jedis.decr(key)); } /** * 指定步长自减 */ public static long decrBy(String key, long decrement) { return performanceMonitor.execute("DECRBY", jedis -> jedis.decrBy(key, decrement)); } // ==================== Pipeline操作 ==================== /** * 批量Pipeline操作 */ public static List pipeline(List> callbacks) { return performanceMonitor.execute("PIPELINE", jedis -> { Pipeline pipeline = jedis.pipelined(); List> responses = new ArrayList<>(); for (RedisCallback callback : callbacks) { responses.add((Response) callback.doInRedis(pipeline)); } pipeline.sync(); return responses.stream() .map(Response::get) .collect(Collectors.toList()); }); } // ==================== 事务操作 ==================== /** * 执行事务 */ public static List transaction(List> callbacks) { return performanceMonitor.execute("TRANSACTION", jedis -> { Transaction transaction = jedis.multi(); for (RedisCallback callback : callbacks) { callback.doInRedis(transaction); } return transaction.exec(); }); } // ==================== 监控与统计 ==================== /** * 获取性能指标 */ public static Map getPerformanceMetrics() { return performanceMonitor.getMetrics(); } /** * 获取Redis服务器信息 */ public static Map getRedisInfo() { return performanceMonitor.getRedisInfo(); } /** * 获取连接池状态 */ public static Map getPoolStatus() { return connectionPool.getPoolStats(); } /** * 获取慢查询列表 */ public static List getSlowQueries(int count) { return performanceMonitor.execute("SLOWLOG", jedis -> { List slowlogs = jedis.slowlogGet(count); return slowlogs.stream() .map(slowlog -> String.format("命令: %s, 耗时: %dus, 时间: %s", slowlog.getCommand(), slowlog.getExecutionTime(), slowlog.getTime())) .collect(Collectors.toList()); }); } } ``` --- ## 7. 分布式锁实战 ```java /** * Redis分布式锁实现 * 基于RedLock算法,支持锁重入、锁续期、锁自动过期 */ public class RedisDistributedLock { private static final Logger logger = LoggerFactory.getLogger(RedisDistributedLock.class); private final JedisPool jedisPool; private final String lockKey; private final String lockValue; private final long lockTimeout; // 锁超时时间(毫秒) private final long waitTimeout; // 获取锁等待时间(毫秒) private boolean locked = false; // 锁续期线程 private ScheduledExecutorService watchdogExecutor; private ScheduledFuture watchdogFuture; /** * 创建分布式锁 */ public RedisDistributedLock(JedisPool jedisPool, String lockKey, long lockTimeout, long waitTimeout) { this.jedisPool = jedisPool; this.lockKey = lockKey; this.lockValue = UUID.randomUUID().toString() + ":" + Thread.currentThread().getId(); this.lockTimeout = lockTimeout; this.waitTimeout = waitTimeout; } /** * 尝试获取锁(使用SET NX EX命令) */ public boolean tryLock() { long startTime = System.currentTimeMillis(); long remaining = waitTimeout; while (remaining >= 0) { try (Jedis jedis = jedisPool.getResource()) { // SET key value NX EX timeout String result = jedis.set(lockKey, lockValue, "NX", "PX", lockTimeout); if ("OK".equals(result)) { locked = true; startWatchdog(); // 启动锁续期看门狗 logger.info("获取分布式锁成功: key={}, value={}", lockKey, lockValue); return true; } // 检查是否是自己的锁(支持锁重入) String currentValue = jedis.get(lockKey); if (lockValue.equals(currentValue)) { jedis.pexpire(lockKey, lockTimeout); locked = true; startWatchdog(); logger.info("重入分布式锁成功: key={}, value={}", lockKey, lockValue); return true; } // 等待后重试 Thread.sleep(100); remaining = waitTimeout - (System.currentTimeMillis() - startTime); } catch (InterruptedException e) { Thread.currentThread().interrupt(); return false; } catch (Exception e) { logger.error("获取分布式锁异常", e); return false; } } logger.warn("获取分布式锁超时: key={}, waitTimeout={}ms", lockKey, waitTimeout); return false; } /** * 释放锁(使用Lua脚本保证原子性) */ public boolean unlock() { if (!locked) { return false; } try (Jedis jedis = jedisPool.getResource()) { // Lua脚本:只有当lockValue匹配时才删除 String luaScript = "if redis.call('get', KEYS[1]) == ARGV[1] then " + " return redis.call('del', KEYS[1]) " + "else " + " return 0 " + "end"; Object result = jedis.eval(luaScript, Collections.singletonList(lockKey), Collections.singletonList(lockValue)); if (Long.valueOf(1).equals(result)) { locked = false; stopWatchdog(); // 停止锁续期看门狗 logger.info("释放分布式锁成功: key={}, value={}", lockKey, lockValue); return true; } else { logger.warn("释放分布式锁失败,锁已过期或被其他线程获取: key={}", lockKey); return false; } } catch (Exception e) { logger.error("释放分布式锁异常", e); return false; } } /** * 启动锁续期看门狗 */ private void startWatchdog() { if (watchdogExecutor == null) { watchdogExecutor = Executors.newSingleThreadScheduledExecutor(r -> { Thread thread = new Thread(r, "RedisLock-Watchdog"); thread.setDaemon(true); return thread; }); } // 每 lockTimeout/3 时间续期一次 long renewInterval = lockTimeout / 3; watchdogFuture = watchdogExecutor.scheduleAtFixedRate(() -> { try (Jedis jedis = jedisPool.getResource()) { String currentValue = jedis.get(lockKey); if (lockValue.equals(currentValue)) { jedis.pexpire(lockKey, lockTimeout); logger.debug("锁续期成功: key={}", lockKey); } else { // 锁已不属于自己,停止续期 stopWatchdog(); } } catch (Exception e) { logger.error("锁续期异常", e); } }, renewInterval, renewInterval, TimeUnit.MILLISECONDS); } /** * 停止锁续期看门狗 */ private void stopWatchdog() { if (watchdogFuture != null) { watchdogFuture.cancel(false); watchdogFuture = null; } if (watchdogExecutor != null) { watchdogExecutor.shutdown(); watchdogExecutor = null; } } /** * 检查是否持有锁 */ public boolean isLocked() { return locked; } /** * 强制释放锁(谨慎使用) */ public boolean forceUnlock() { try (Jedis jedis = jedisPool.getResource()) { jedis.del(lockKey); locked = false; stopWatchdog(); logger.warn("强制释放分布式锁: key={}", lockKey); return true; } catch (Exception e) { logger.error("强制释放分布式锁异常", e); return false; } } /** * 分布式锁工具类 */ public static class LockHelper { /** * 执行带锁的操作 */ public static T executeWithLock(JedisPool jedisPool, String lockKey, long lockTimeout, long waitTimeout, LockCallback callback) throws Exception { RedisDistributedLock lock = new RedisDistributedLock( jedisPool, lockKey, lockTimeout, waitTimeout); try { if (!lock.tryLock()) { throw new LockAcquisitionException("获取锁失败: " + lockKey); } return callback.doWithLock(); } finally { lock.unlock(); } } /** * 锁操作回调接口 */ @FunctionalInterface public interface LockCallback { T doWithLock() throws Exception; } } /** * 获取锁异常 */ public static class LockAcquisitionException extends RuntimeException { public LockAcquisitionException(String message) { super(message); } } /** * 使用示例 */ public static class LockUsageExample { public static void main(String[] args) { JedisPool jedisPool = RedisConnectionPool.getInstance() .getClass() .getDeclaredField("jedisPool"); // 方式1:手动获取和释放锁 RedisDistributedLock lock = new RedisDistributedLock( jedisPool, "my_lock", 30000, 5000); if (lock.tryLock()) { try { // 执行业务逻辑 System.out.println("执行业务逻辑..."); } finally { lock.unlock(); } } // 方式2:使用工具类自动管理锁 try { String result = LockHelper.executeWithLock( jedisPool, "my_lock", 30000, 5000, () -> { // 执行业务逻辑 return "业务结果"; }); System.out.println("结果: " + result); } catch (Exception e) { logger.error("执行带锁操作失败", e); } } } } ``` --- ## 8. 缓存穿透/击穿/雪崩解决方案 ### 8.1 缓存穿透解决方案 ```java /** * 缓存穿透防护 */ public class CachePenetrationProtection { private static final Logger logger = LoggerFactory.getLogger(CachePenetrationProtection.class); private final JedisPool jedisPool; private final RedisSerializer serializer; // 空值缓存过期时间(秒) private static final int NULL_CACHE_EXPIRE = 300; // 布隆过滤器 private final BloomFilter bloomFilter; public CachePenetrationProtection(JedisPool jedisPool) { this.jedisPool = jedisPool; this.serializer = SerializerFactory.create(SerializerFactory.SerializerType.KRYO); // 初始化布隆过滤器(预期100万数据,误判率0.01%) this.bloomFilter = BloomFilter.create(Funnels.stringFunnel(Charset.defaultCharset()), 1_000_000, 0.0001); // 预加载热点数据到布隆过滤器 preloadBloomFilter(); } /** * 预加载布隆过滤器 */ private void preloadBloomFilter() { try (Jedis jedis = jedisPool.getResource()) { // 获取所有键 Set keys = jedis.keys("*"); for (String key : keys) { bloomFilter.put(key); } logger.info("布隆过滤器预加载完成,数据量: {}", keys.size()); } catch (Exception e) { logger.error("预加载布隆过滤器失败", e); } } /** * 获取缓存数据(防护穿透) */ public T getWithProtection(String key, Class clazz, CacheLoader loader) throws Exception { try (Jedis jedis = jedisPool.getResource()) { // 1. 布隆过滤器快速判断 if (!bloomFilter.mightContain(key)) { logger.debug("布隆过滤器判断键不存在: {}", key); return null; } // 2. 尝试从缓存获取 byte[] data = jedis.get(key.getBytes()); // 3. 空值缓存处理 if (data != null && data.length == 1 && data[0] == 0) { // 空值标记,直接返回null return null; } // 4. 正常缓存数据 if (data != null) { @SuppressWarnings("unchecked") T result = (T) serializer.deserialize(data); return result; } // 5. 缓存未命中,从数据源加载 T result = loader.load(); // 6. 存入缓存(包括空值) if (result == null) { // 存储空值标记 jedis.setex(key.getBytes(), NULL_CACHE_EXPIRE, new byte[]{0}); bloomFilter.put(key); // 更新布隆过滤器 } else { // 存储正常数据 byte[] serializedData = serializer.serialize(result); jedis.setex(key.getBytes(), 300, serializedData); bloomFilter.put(key); // 更新布隆过滤器 } return result; } } /** * 缓存加载器接口 */ @FunctionalInterface public interface CacheLoader { T load() throws Exception; } } ``` ### 8.2 缓存击穿解决方案 ```java /** * 缓存击穿防护 */ public class CacheBreakdownProtection { private static final Logger logger = LoggerFactory.getLogger(CacheBreakdownProtection.class); private final JedisPool jedisPool; private final RedisSerializer serializer; // 互斥锁缓存 private final Map lockMap = new ConcurrentHashMap<>(); // 本地缓存(二级缓存) private static final Cache LOCAL_CACHE = Caffeine.newBuilder() .maximumSize(1000) .expireAfterWrite(5, TimeUnit.MINUTES) .build(); public CacheBreakdownProtection(JedisPool jedisPool) { this.jedisPool = jedisPool; this.serializer = SerializerFactory.create(SerializerFactory.SerializerType.KRYO); } /** * 获取缓存数据(防护击穿 - 使用互斥锁) */ public T getWithMutex(String key, Class clazz, CacheLoader loader) throws Exception { // 1. 尝试从本地缓存获取 Object localValue = LOCAL_CACHE.getIfPresent(key); if (localValue != null) { @SuppressWarnings("unchecked") T result = (T) localValue; return result; } try (Jedis jedis = jedisPool.getResource()) { // 2. 尝试从Redis缓存获取 byte[] data = jedis.get(key.getBytes()); if (data != null) { @SuppressWarnings("unchecked") T result = (T) serializer.deserialize(data); LOCAL_CACHE.put(key, result); return result; } // 3. 缓存未命中,使用分布式锁防止击穿 RedisDistributedLock lock = lockMap.computeIfAbsent(key, k -> new RedisDistributedLock(jedisPool, "lock:" + key, 10000, 3000)); if (lock.tryLock()) { try { // 双重检查:其他线程可能已经加载了缓存 data = jedis.get(key.getBytes()); if (data != null) { @SuppressWarnings("unchecked") T result = (T) serializer.deserialize(data); LOCAL_CACHE.put(key, result); return result; } // 从数据源加载数据 T result = loader.load(); // 存入缓存 if (result != null) { byte[] serializedData = serializer.serialize(result); jedis.setex(key.getBytes(), 300, serializedData); LOCAL_CACHE.put(key, result); } return result; } finally { lock.unlock(); lockMap.remove(key); } } else { // 获取锁失败,等待后重试 Thread.sleep(100); return getWithMutex(key, clazz, loader); } } } /** * 获取缓存数据(防护击穿 - 使用逻辑过期) */ public T getWithLogicalExpire(String key, Class clazz, CacheLoader loader) throws Exception { // 1. 尝试从本地缓存获取 Object localValue = LOCAL_CACHE.getIfPresent(key); if (localValue != null) { @SuppressWarnings("unchecked") T result = (T) localValue; return result; } try (Jedis jedis = jedisPool.getResource()) { // 2. 从Redis获取缓存数据(包含逻辑过期时间) String cacheData = jedis.get(key); if (cacheData != null) { // 解析数据 Map cacheMap = parseCacheData(cacheData); long expireTime = (Long) cacheMap.get("expireTime"); // 3. 检查是否逻辑过期 if (System.currentTimeMillis() < expireTime) { // 未过期,返回缓存数据 @SuppressWarnings("unchecked") T result = (T) cacheMap.get("data"); LOCAL_CACHE.put(key, result); // 异步重建缓存 rebuildCacheAsync(key, loader); return result; } } // 4. 缓存未命中或已过期,直接从数据源加载 T result = loader.load(); // 5. 存入缓存(设置逻辑过期时间) if (result != null) { long logicalExpireTime = System.currentTimeMillis() + 300000; // 5分钟后逻辑过期 Map cacheMap = new HashMap<>(); cacheMap.put("data", result); cacheMap.put("expireTime", logicalExpireTime); String cacheDataStr = buildCacheData(cacheMap); jedis.setex(key, 3600, cacheDataStr); // 实际过期时间1小时 LOCAL_CACHE.put(key, result); } return result; } } /** * 异步重建缓存 */ private void rebuildCacheAsync(String key, CacheLoader loader) { CompletableFuture.runAsync(() -> { try { T result = loader.load(); if (result != null) { try (Jedis jedis = jedisPool.getResource()) { long logicalExpireTime = System.currentTimeMillis() + 300000; Map cacheMap = new HashMap<>(); cacheMap.put("data", result); cacheMap.put("expireTime", logicalExpireTime); String cacheDataStr = buildCacheData(cacheMap); jedis.setex(key, 3600, cacheDataStr); logger.info("异步重建缓存成功: {}", key); } } } catch (Exception e) { logger.error("异步重建缓存失败: {}", key, e); } }); } /** * 解析缓存数据 */ private Map parseCacheData(String data) { // 实际实现可以使用JSON解析 return new HashMap<>(); } /** * 构建缓存数据 */ private String buildCacheData(Map data) { // 实际实现可以使用JSON序列化 return data.toString(); } } ``` ### 8.3 缓存雪崩解决方案 ```java /** * 缓存雪崩防护 */ public class CacheAvalancheProtection { private static final Logger logger = LoggerFactory.getLogger(CacheAvalancheProtection.class); private final JedisPool jedisPool; private final RedisSerializer serializer; // 随机过期时间范围(秒) private static final int RANDOM_EXPIRE_MIN = 280; private static final int RANDOM_EXPIRE_MAX = 320; // 多级缓存 private static final Cache L1_CACHE = Caffeine.newBuilder() .maximumSize(10000) .expireAfterWrite(10, TimeUnit.MINUTES) .build(); private static final Cache L2_CACHE = Caffeine.newBuilder() .maximumSize(50000) .expireAfterWrite(30, TimeUnit.MINUTES) .build(); // 熔断器 private final Map circuitBreakerMap = new ConcurrentHashMap<>(); public CacheAvalancheProtection(JedisPool jedisPool) { this.jedisPool = jedisPool; this.serializer = SerializerFactory.create(SerializerFactory.SerializerType.KRYO); } /** * 获取缓存数据(防护雪崩 - 多级缓存+随机过期) */ public T getWithProtection(String key, Class clazz, CacheLoader loader) throws Exception { // 1. 尝试从L1缓存获取 Object l1Value = L1_CACHE.getIfPresent(key); if (l1Value != null) { @SuppressWarnings("unchecked") T result = (T) l1Value; return result; } // 2. 尝试从L2缓存获取 Object l2Value = L2_CACHE.getIfPresent(key); if (l2Value != null) { @SuppressWarnings("unchecked") T result = (T) l2Value; L1_CACHE.put(key, result); // 回填L1缓存 return result; } // 3. 检查熔断器状态 CircuitBreaker breaker = circuitBreakerMap.computeIfAbsent(key, k -> new CircuitBreaker(5, 10000)); if (!breaker.allowRequest()) { // 熔断开启,直接返回降级数据 logger.warn("熔断器开启,返回降级数据: {}", key); return getFallbackData(key, clazz); } try (Jedis jedis = jedisPool.getResource()) { // 4. 尝试从Redis获取 byte[] data = jedis.get(key.getBytes()); if (data != null) { @SuppressWarnings("unchecked") T result = (T) serializer.deserialize(data); L2_CACHE.put(key, result); L1_CACHE.put(key, result); breaker.recordSuccess(); return result; } // 5. Redis缓存未命中,从数据源加载 T result = loader.load(); if (result != null) { // 6. 存入缓存(使用随机过期时间) int randomExpire = ThreadLocalRandom.current() .nextInt(RANDOM_EXPIRE_MIN, RANDOM_EXPIRE_MAX); byte[] serializedData = serializer.serialize(result); jedis.setex(key.getBytes(), randomExpire, serializedData); L2_CACHE.put(key, result); L1_CACHE.put(key, result); breaker.recordSuccess(); } else { breaker.recordFailure(); } return result; } catch (Exception e) { breaker.recordFailure(); logger.error("获取缓存数据异常: {}", key, e); // 降级处理 return getFallbackData(key, clazz); } } /** * 获取降级数据 */ @SuppressWarnings("unchecked") private T getFallbackData(String key, Class clazz) { // 尝试从L2缓存获取旧数据 Object l2Value = L2_CACHE.getIfPresent(key); if (l2Value != null) { logger.info("返回L2缓存的旧数据: {}", key); return (T) l2Value; } // 返回null或默认值 return null; } /** * 熔断器实现 */ public static class CircuitBreaker { private final int failureThreshold; // 失败阈值 private final long timeout; // 超时时间(毫秒) private int failureCount = 0; // 失败计数 private long lastFailureTime = 0; // 最后失败时间 private boolean open = false; // 熔断器状态 public CircuitBreaker(int failureThreshold, long timeout) { this.failureThreshold = failureThreshold; this.timeout = timeout; } /** * 是否允许请求 */ public synchronized boolean allowRequest() { if (!open) { return true; } // 检查是否可以尝试恢复 if (System.currentTimeMillis() - lastFailureTime > timeout) { open = false; failureCount = 0; return true; } return false; } /** * 记录成功 */ public synchronized void recordSuccess() { failureCount = 0; open = false; } /** * 记录失败 */ public synchronized void recordFailure() { failureCount++; lastFailureTime = System.currentTimeMillis(); if (failureCount >= failureThreshold) { open = true; } } public boolean isOpen() { return open; } } /** * 预热缓存 */ public void warmUpCache(String key, T value) { try (Jedis jedis = jedisPool.getResource()) { int randomExpire = ThreadLocalRandom.current() .nextInt(RANDOM_EXPIRE_MIN, RANDOM_EXPIRE_MAX); byte[] serializedData = serializer.serialize(value); jedis.setex(key.getBytes(), randomExpire, serializedData); L2_CACHE.put(key, value); L1_CACHE.put(key, value); logger.info("缓存预热成功: {}", key); } catch (Exception e) { logger.error("缓存预热失败: {}", key, e); } } } ``` --- ## 9. 完整使用示例 ```java /** * Redis完整使用示例 */ public class RedisCompleteExample { private static final Logger logger = LoggerFactory.getLogger(RedisCompleteExample.class); public static void main(String[] args) { // 1. 初始化Redis连接 RedisUtils.init("localhost", 6379, "password"); // 2. 基础操作示例 basicOperations(); // 3. 对象操作示例 objectOperations(); // 4. 缓存注解示例 cacheAnnotationExample(); // 5. 分布式锁示例 distributedLockExample(); // 6. 缓存防护示例 cacheProtectionExample(); // 7. 关闭连接 RedisUtils.close(); } /** * 基础操作示例 */ private static void basicOperations() { System.out.println("=== 基础操作示例 ==="); // String操作 RedisUtils.set("user:1001:name", "张三"); String name = RedisUtils.get("user:1001:name"); System.out.println("用户名: " + name); // Hash操作 RedisUtils.hset("user:1001", "name", "张三"); RedisUtils.hset("user:1001", "age", "25"); RedisUtils.hset("user:1001", "email", "zhangsan@example.com"); Map user = RedisUtils.hgetAll("user:1001"); System.out.println("用户信息: " + user); // List操作 RedisUtils.lpush("logs", "log3", "log2", "log1"); List logs = RedisUtils.lrange("logs", 0, -1); System.out.println("日志列表: " + logs); // Set操作 RedisUtils.sadd("tags", "java", "redis", "mysql"); Set tags = RedisUtils.smembers("tags"); System.out.println("标签集合: " + tags); // ZSet操作 RedisUtils.zadd("leaderboard", 100.0, "user1"); RedisUtils.zadd("leaderboard", 90.0, "user2"); RedisUtils.zadd("leaderboard", 95.0, "user3"); Set topUsers = RedisUtils.zrange("leaderboard", 0, 2); System.out.println("排行榜前三: " + topUsers); } /** * 对象操作示例 */ private static void objectOperations() { System.out.println("\n=== 对象操作示例 ==="); User user = new User(); user.setId(1001L); user.setUsername("张三"); user.setEmail("zhangsan@example.com"); user.setAge(25); // 存储对象 RedisUtils.setObject("user:1001", user, 300); System.out.println("对象已存储"); // 获取对象 User cachedUser = RedisUtils.getObject("user:1001", User.class); System.out.println("从缓存获取对象: " + cachedUser); // Hash存储对象 RedisUtils.hsetObject("user:hash", "1001", user); User hashUser = RedisUtils.hgetObject("user:hash", "1001", User.class); System.out.println("从Hash获取对象: " + hashUser); } /** * 缓存注解示例 */ @CacheConfig(prefix = "user", expire = 600) private static void cacheAnnotationExample() { System.out.println("\n=== 缓存注解示例 ==="); UserService userService = new UserServiceImpl(); // 第一次调用,会从数据库查询 User user1 = userService.getUserById(1001L); System.out.println("第一次调用: " + user1); // 第二次调用,会从缓存获取 User user2 = userService.getUserById(1001L); System.out.println("第二次调用: " + user2); // 更新用户 User updateUser = new User(); updateUser.setId(1001L); updateUser.setUsername("李四"); userService.updateUser(updateUser); // 再次查询,会获取更新后的数据 User user3 = userService.getUserById(1001L); System.out.println("更新后查询: " + user3); } /** * 分布式锁示例 */ private static void distributedLockExample() { System.out.println("\n=== 分布式锁示例 ==="); try { JedisPool jedisPool = RedisConnectionPool.getInstance() .getClass() .getDeclaredField("jedisPool"); // 使用分布式锁执行任务 String result = RedisDistributedLock.LockHelper.executeWithLock( jedisPool, "task:lock", 30000, 5000, () -> { System.out.println("获取锁成功,执行任务..."); Thread.sleep(2000); return "任务完成"; }); System.out.println(result); } catch (Exception e) { logger.error("分布式锁示例执行失败", e); } } /** * 缓存防护示例 */ private static void cacheProtectionExample() { System.out.println("\n=== 缓存防护示例 ==="); try { JedisPool jedisPool = RedisConnectionPool.getInstance() .getClass() .getDeclaredField("jedisPool"); // 缓存穿透防护 CachePenetrationProtection penetrationProtection = new CachePenetrationProtection(jedisPool); User user1 = penetrationProtection.getWithProtection( "user:9999", User.class, () -> null); // 模拟数据库查询返回null System.out.println("穿透防护测试: " + user1); // 缓存击穿防护 CacheBreakdownProtection breakdownProtection = new CacheBreakdownProtection(jedisPool); User user2 = breakdownProtection.getWithMutex( "user:hot:1001", User.class, () -> { System.out.println("从数据库加载热点数据..."); return new User(1001L, "热点用户"); }); System.out.println("击穿防护测试: " + user2); // 缓存雪崩防护 CacheAvalancheProtection avalancheProtection = new CacheAvalancheProtection(jedisPool); User user3 = avalancheProtection.getWithProtection( "user:1001", User.class, () -> { System.out.println("从数据库加载数据..."); return new User(1001L, "普通用户"); }); System.out.println("雪崩防护测试: " + user3); } catch (Exception e) { logger.error("缓存防护示例执行失败", e); } } /** * 用户实体类 */ @Data public static class User { private Long id; private String username; private String email; private Integer age; public User() {} public User(Long id, String username) { this.id = id; this.username = username; } } /** * 用户服务接口 */ public interface UserService { User getUserById(Long id); void updateUser(User user); } /** * 用户服务实现 */ @Component public static class UserServiceImpl implements UserService { @Override @Cacheable(key = "#id") public User getUserById(Long id) { System.out.println("从数据库查询用户: " + id); // 模拟数据库查询 User user = new User(); user.setId(id); user.setUsername("用户" + id); return user; } @Override @CachePut(key = "#user.id") public void updateUser(User user) { System.out.println("更新用户: " + user); // 模拟数据库更新 } } } ``` --- ## 10. 总结 本文详细介绍了Redis的深度实战应用,涵盖了以下核心内容: 1. **设计模式应用**:单例模式(连接池)、工厂模式(序列化器)、策略模式(缓存过期策略) 2. **反射机制**:动态键生成、操作代理、参数解析 3. **注解应用**:@Cacheable、@CachePut、@CacheEvict实现缓存切面 4. **JVM调优**:连接池优化、网络I/O优化、序列化优化、内存管理 5. **生产级实现**:分布式锁、缓存穿透/击穿/雪崩防护、性能监控 通过这些技术点的整合应用,可以构建一个高性能、高可用的Redis客户端解决方案,满足生产环境的各种复杂场景需求。
评论 0

发表评论 取消回复

Shift+Enter 换行  ·  Enter 发送
还没有评论,来发表第一条吧