当前位置:首页 > 学习笔记 > 正文内容

Redis 缓存实战教程 - 提升应用性能 10 倍

廖万里14小时前学习笔记1

# Redis 缓存实战教程 - 提升应用性能 10 倍

Redis 是最流行的内存数据库,合理使用缓存可以将应用性能提升 10 倍以上。本文详解 Redis 的安装、使用和最佳实践。

---

一、Redis 简介

Redis(Remote Dictionary Server)是一个开源的内存数据存储,支持多种数据结构,广泛用于缓存、会话存储、消息队列等场景。

核心特性

  • 高性能:基于内存,读写速度极快(10万+ QPS)
  • 数据结构丰富:String、Hash、List、Set、Sorted Set
  • 持久化:RDB 和 AOF 两种持久化方式
  • 集群支持:主从复制、哨兵模式、集群模式
---

二、安装与配置

macOS 安装

brew install redis
brew services start redis

Ubuntu 安装

sudo apt update
sudo apt install redis-server
sudo systemctl start redis

Docker 安装

docker run -d -p 6379:6379 --name redis redis:latest

验证安装

redis-cli ping
# 输出: PONG

---

三、基础数据类型

1. String 字符串

# 设置值
SET user:1:name "张三"

# 获取值 GET user:1:name

# 设置过期时间(秒) SETEX session:abc 3600 "user_data"

# 自增 INCR page_views INCRBY page_views 10

2. Hash 哈希

# 设置字段
HSET user:1 name "张三" age 25 email "test@example.com"

# 获取单个字段 HGET user:1 name

# 获取所有字段 HGETALL user:1

# 删除字段 HDEL user:1 email

3. List 列表

# 左侧插入
LPUSH notifications "消息1" "消息2"

# 右侧弹出 RPOP notifications

# 获取列表长度 LLEN notifications

# 获取范围 LRANGE notifications 0 -1

4. Set 集合

# 添加成员
SADD tags "python" "redis" "fastapi"

# 检查成员 SISMEMBER tags "python"

# 获取所有成员 SMEMBERS tags

# 集合运算 SINTER set1 set2 # 交集 SUNION set1 set2 # 并集 SDIFF set1 set2 # 差集

5. Sorted Set 有序集合

# 添加成员(带分数)
ZADD leaderboard 100 "player1" 200 "player2" 150 "player3"

# 获取排名(从高到低) ZREVRANGE leaderboard 0 9 WITHSCORES

# 获取成员排名 ZRANK leaderboard "player1"

---

四、Python 操作 Redis

安装客户端

pip install redis

基础操作

import redis

# 创建连接 r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

# String 操作 r.set('name', '张三') print(r.get('name')) # 输出: 张三

# Hash 操作 r.hset('user:1', mapping={'name': '李四', 'age': 30}) print(r.hgetall('user:1'))

# List 操作 r.lpush('messages', 'hello', 'world') print(r.lrange('messages', 0, -1))

# 设置过期时间 r.setex('token:abc', 3600, 'user_session')

连接池

from redis import ConnectionPool

# 创建连接池 pool = ConnectionPool(host='localhost', port=6379, max_connections=10)

# 使用连接池 r = redis.Redis(connection_pool=pool)

---

五、缓存实战

缓存策略

1. 缓存穿透:查询不存在的数据,穿透到数据库

def get_user(user_id):
    # 先查缓存
    cache_key = f"user:{user_id}"
    cached = r.get(cache_key)
    
    if cached is not None:
        return json.loads(cached)
    
    # 查数据库
    user = db.query(User).get(user_id)
    
    if user is None:
        # 缓存空值,防止穿透
        r.setex(cache_key, 60, "null")
        return None
    
    # 写入缓存
    r.setex(cache_key, 3600, json.dumps(user.to_dict()))
    return user

2. 缓存击穿:热点数据过期瞬间大量请求

import threading

def get_hot_data(key): cached = r.get(key) if cached: return cached # 使用分布式锁 lock_key = f"lock:{key}" if r.set(lock_key, "1", nx=True, ex=10): try: # 查数据库 data = db.query_hot_data() r.setex(key, 3600, data) return data finally: r.delete(lock_key) else: # 等待并重试 time.sleep(0.1) return get_hot_data(key)

3. 缓存雪崩:大量缓存同时过期

import random

def set_cache_with_jitter(key, value, base_ttl=3600): # 添加随机过期时间,避免同时过期 jitter = random.randint(0, 300) r.setex(key, base_ttl + jitter, value)

---

六、实际应用场景

1. 会话存储

import uuid

def create_session(user_id): session_id = str(uuid.uuid4()) session_data = { "user_id": user_id, "created_at": time.time() } r.setex(f"session:{session_id}", 86400, json.dumps(session_data)) return session_id

def get_session(session_id): data = r.get(f"session:{session_id}") return json.loads(data) if data else None

def delete_session(session_id): r.delete(f"session:{session_id}")

2. 接口限流

def rate_limit(user_id, limit=100, window=60):
    key = f"rate_limit:{user_id}"
    current = r.get(key)
    
    if current is None:
        r.setex(key, window, 1)
        return True
    
    if int(current) >= limit:
        return False
    
    r.incr(key)
    return True

3. 排行榜

def update_score(user_id, score):
    r.zadd('leaderboard', {user_id: score})

def get_top_users(n=10): return r.zrevrange('leaderboard', 0, n-1, withscores=True)

def get_user_rank(user_id): rank = r.zrevrank('leaderboard', user_id) return rank + 1 if rank is not None else None

4. 消息队列

def push_message(queue_name, message):
    r.lpush(queue_name, json.dumps(message))

def pop_message(queue_name, timeout=10): result = r.brpop(queue_name, timeout) if result: return json.loads(result[1]) return None

---

七、高级特性

1. 发布订阅

# 发布者
def publish(channel, message):
    r.publish(channel, message)

# 订阅者 def subscribe(channel): pubsub = r.pubsub() pubsub.subscribe(channel) for message in pubsub.listen(): if message['type'] == 'message': print(message['data'])

2. 事务

pipe = r.pipeline()
pipe.set('key1', 'value1')
pipe.set('key2', 'value2')
pipe.execute()

3. Lua 脚本

-- 限流脚本
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2])

local current = redis.call('GET', key) if current and tonumber(current) >= limit then return 0 end

redis.call('INCR', key) if tonumber(current) == 0 then redis.call('EXPIRE', key, window) end

return 1

---

八、性能优化

1. 批量操作

# 使用 pipeline 减少网络往返
pipe = r.pipeline()
for i in range(1000):
    pipe.set(f"key:{i}", f"value:{i}")
pipe.execute()

2. 选择合适的数据结构

| 场景 | 推荐数据结构 | |------|--------------| | 简单缓存 | String | | 对象存储 | Hash | | 消息队列 | List | | 去重集合 | Set | | 排行榜 | Sorted Set |

3. 内存优化

# 配置最大内存
maxmemory 2gb

# 配置淘汰策略 maxmemory-policy allkeys-lru

---

九、监控与运维

查看状态

redis-cli info
redis-cli info memory
redis-cli info stats

慢查询日志

redis-cli slowlog get 10

内存分析

redis-cli --scan --pattern "user:*" | head -100
redis-cli memory usage "user:1"

---

Redis 是提升应用性能的利器,合理使用缓存能带来 10 倍以上的性能提升。
>
下一篇预告:Git 版本控制完全指南 - 团队协作必备技能

---

作者:廖万里 发布时间:2026年3月16日

本文链接:https://www.kkkliao.cn/?id=655 转载需授权!

分享到:

版权声明:本文由廖万里的博客发布,如需转载请注明出处。


发表评论

访客

看不清,换一张

◎欢迎参与讨论,请在这里发表您的看法和观点。