Redis 缓存实战教程 - 提升应用性能 10 倍
# Redis 缓存实战教程 - 提升应用性能 10 倍
Redis 是最流行的内存数据库,合理使用缓存可以将应用性能提升 10 倍以上。本文详解 Redis 的安装、使用和最佳实践。
---
一、Redis 简介
Redis(Remote Dictionary Server)是一个开源的内存数据存储,支持多种数据结构,广泛用于缓存、会话存储、消息队列等场景。
核心特性
- 高性能:基于内存,读写速度极快(10万+ QPS)
- 数据结构丰富:String、Hash、List、Set、Sorted Set
- 持久化:RDB 和 AOF 两种持久化方式
- 集群支持:主从复制、哨兵模式、集群模式
二、安装与配置
macOS 安装
brew install redis brew services start redis
Ubuntu 安装
sudo apt update sudo apt install redis-server sudo systemctl start redis
Docker 安装
docker run -d -p 6379:6379 --name redis redis:latest
验证安装
redis-cli ping # 输出: PONG
---
三、基础数据类型
1. String 字符串
# 设置值 SET user:1:name "张三"# 获取值 GET user:1:name
# 设置过期时间(秒) SETEX session:abc 3600 "user_data"
# 自增 INCR page_views INCRBY page_views 10
2. Hash 哈希
# 设置字段 HSET user:1 name "张三" age 25 email "test@example.com"# 获取单个字段 HGET user:1 name
# 获取所有字段 HGETALL user:1
# 删除字段 HDEL user:1 email
3. List 列表
# 左侧插入 LPUSH notifications "消息1" "消息2"# 右侧弹出 RPOP notifications
# 获取列表长度 LLEN notifications
# 获取范围 LRANGE notifications 0 -1
4. Set 集合
# 添加成员 SADD tags "python" "redis" "fastapi"# 检查成员 SISMEMBER tags "python"
# 获取所有成员 SMEMBERS tags
# 集合运算 SINTER set1 set2 # 交集 SUNION set1 set2 # 并集 SDIFF set1 set2 # 差集
5. Sorted Set 有序集合
# 添加成员(带分数) ZADD leaderboard 100 "player1" 200 "player2" 150 "player3"# 获取排名(从高到低) ZREVRANGE leaderboard 0 9 WITHSCORES
# 获取成员排名 ZRANK leaderboard "player1"
---
四、Python 操作 Redis
安装客户端
pip install redis
基础操作
import redis# 创建连接 r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
# String 操作 r.set('name', '张三') print(r.get('name')) # 输出: 张三
# Hash 操作 r.hset('user:1', mapping={'name': '李四', 'age': 30}) print(r.hgetall('user:1'))
# List 操作 r.lpush('messages', 'hello', 'world') print(r.lrange('messages', 0, -1))
# 设置过期时间 r.setex('token:abc', 3600, 'user_session')
连接池
from redis import ConnectionPool# 创建连接池 pool = ConnectionPool(host='localhost', port=6379, max_connections=10)
# 使用连接池 r = redis.Redis(connection_pool=pool)
---
五、缓存实战
缓存策略
1. 缓存穿透:查询不存在的数据,穿透到数据库
def get_user(user_id):
# 先查缓存
cache_key = f"user:{user_id}"
cached = r.get(cache_key)
if cached is not None:
return json.loads(cached)
# 查数据库
user = db.query(User).get(user_id)
if user is None:
# 缓存空值,防止穿透
r.setex(cache_key, 60, "null")
return None
# 写入缓存
r.setex(cache_key, 3600, json.dumps(user.to_dict()))
return user
2. 缓存击穿:热点数据过期瞬间大量请求
import threadingdef get_hot_data(key): cached = r.get(key) if cached: return cached # 使用分布式锁 lock_key = f"lock:{key}" if r.set(lock_key, "1", nx=True, ex=10): try: # 查数据库 data = db.query_hot_data() r.setex(key, 3600, data) return data finally: r.delete(lock_key) else: # 等待并重试 time.sleep(0.1) return get_hot_data(key)
3. 缓存雪崩:大量缓存同时过期
import randomdef set_cache_with_jitter(key, value, base_ttl=3600): # 添加随机过期时间,避免同时过期 jitter = random.randint(0, 300) r.setex(key, base_ttl + jitter, value)
---
六、实际应用场景
1. 会话存储
import uuiddef create_session(user_id): session_id = str(uuid.uuid4()) session_data = { "user_id": user_id, "created_at": time.time() } r.setex(f"session:{session_id}", 86400, json.dumps(session_data)) return session_id
def get_session(session_id): data = r.get(f"session:{session_id}") return json.loads(data) if data else None
def delete_session(session_id): r.delete(f"session:{session_id}")
2. 接口限流
def rate_limit(user_id, limit=100, window=60):
key = f"rate_limit:{user_id}"
current = r.get(key)
if current is None:
r.setex(key, window, 1)
return True
if int(current) >= limit:
return False
r.incr(key)
return True
3. 排行榜
def update_score(user_id, score):
r.zadd('leaderboard', {user_id: score})def get_top_users(n=10):
return r.zrevrange('leaderboard', 0, n-1, withscores=True)
def get_user_rank(user_id):
rank = r.zrevrank('leaderboard', user_id)
return rank + 1 if rank is not None else None
4. 消息队列
def push_message(queue_name, message):
r.lpush(queue_name, json.dumps(message))def pop_message(queue_name, timeout=10):
result = r.brpop(queue_name, timeout)
if result:
return json.loads(result[1])
return None
---
七、高级特性
1. 发布订阅
# 发布者
def publish(channel, message):
r.publish(channel, message)# 订阅者
def subscribe(channel):
pubsub = r.pubsub()
pubsub.subscribe(channel)
for message in pubsub.listen():
if message['type'] == 'message':
print(message['data'])
2. 事务
pipe = r.pipeline()
pipe.set('key1', 'value1')
pipe.set('key2', 'value2')
pipe.execute()
3. Lua 脚本
-- 限流脚本 local key = KEYS[1] local limit = tonumber(ARGV[1]) local window = tonumber(ARGV[2])local current = redis.call('GET', key) if current and tonumber(current) >= limit then return 0 end
redis.call('INCR', key) if tonumber(current) == 0 then redis.call('EXPIRE', key, window) end
return 1
---
八、性能优化
1. 批量操作
# 使用 pipeline 减少网络往返
pipe = r.pipeline()
for i in range(1000):
pipe.set(f"key:{i}", f"value:{i}")
pipe.execute()
2. 选择合适的数据结构
| 场景 | 推荐数据结构 | |------|--------------| | 简单缓存 | String | | 对象存储 | Hash | | 消息队列 | List | | 去重集合 | Set | | 排行榜 | Sorted Set |
3. 内存优化
# 配置最大内存 maxmemory 2gb# 配置淘汰策略 maxmemory-policy allkeys-lru
---
九、监控与运维
查看状态
redis-cli info redis-cli info memory redis-cli info stats
慢查询日志
redis-cli slowlog get 10
内存分析
redis-cli --scan --pattern "user:*" | head -100 redis-cli memory usage "user:1"
---
Redis 是提升应用性能的利器,合理使用缓存能带来 10 倍以上的性能提升。>
下一篇预告:Git 版本控制完全指南 - 团队协作必备技能
---
作者:廖万里 发布时间:2026年3月16日
本文链接:https://www.kkkliao.cn/?id=651 转载需授权!
版权声明:本文由廖万里的博客发布,如需转载请注明出处。



手机流量卡
免费领卡
号卡合伙人
产品服务
关于本站
