本文目录导读:

下面我来介绍Python操作Redis缓存的完整案例,包括基础操作、连接管理和实际应用场景。
安装与连接
安装redis库
pip install redis
连接Redis
import redis
# 方式一:直接连接
r = redis.Redis(host='localhost', port=6379, db=0, password='your_password')
# 方式二:连接池(推荐)
pool = redis.ConnectionPool(host='localhost', port=6379, db=0, decode_responses=True)
r = redis.Redis(connection_pool=pool)
# 方式三:带超时和重试的配置
r = redis.Redis(
host='localhost',
port=6379,
db=0,
socket_connect_timeout=5,
socket_timeout=5,
retry_on_timeout=True,
decode_responses=True # 自动解码为字符串
)
基础操作案例
字符串操作
# 设置值
r.set('name', '张三')
r.set('counter', 1)
r.set('expire_key', '过期数据', ex=60) # 60秒后过期
# 批量设置
r.mset({'key1': 'value1', 'key2': 'value2'})
# 获取值
name = r.get('name') # b'张三'
print(f"Name: {name}")
# 计数器操作
r.incr('counter') # +1
r.incrby('counter', 5) # +5
r.decr('counter') # -1
print(f"Counter: {r.get('counter')}")
# 检查键是否存在
if r.exists('name'):
print("name键存在")
# 删除键
r.delete('temp_key')
# 设置过期时间
r.expire('counter', 120) # 120秒后过期
哈希操作
# 存储用户信息
user_data = {
'id': 1001,
'name': '张三',
'age': 25,
'email': 'zhangsan@example.com'
}
# 设置哈希表
r.hset('user:1001', mapping=user_data)
# 获取单个字段
print(r.hget('user:1001', 'name')) # 张三
# 获取所有字段
user = r.hgetall('user:1001')
print(user) # {'id': '1001', 'name': '张三', 'age': '25', 'email': 'zhangsan@example.com'}
# 获取所有键
keys = r.hkeys('user:1001')
print(keys) # ['id', 'name', 'age', 'email']
# 获取所有值
values = r.hvals('user:1001')
print(values) # ['1001', '张三', '25', 'zhangsan@example.com']
# 更新单个字段
r.hset('user:1001', 'age', 26)
列表操作
# 添加数据到列表
r.rpush('logs', 'log1', 'log2', 'log3') # 右侧添加
r.lpush('logs', 'log0') # 左侧添加
# 获取列表范围
all_logs = r.lrange('logs', 0, -1) # 获取所有
print(all_logs)
# 列表长度
print(r.llen('logs'))
# 弹出元素
first = r.lpop('logs') # 左侧弹出
last = r.rpop('logs') # 右侧弹出
# 只保留指定范围的元素
r.ltrim('logs', 0, 10) # 只保留前11个
集合操作
# 添加元素
r.sadd('tags', 'python', 'redis', 'database')
r.sadd('tags', 'python') # 重复添加不会生效
# 获取所有成员
members = r.smembers('tags')
print(members) # {'python', 'redis', 'database'}
# 从集合中移除
r.srem('tags', 'database')
# 检查元素是否存在
print(r.sismember('tags', 'redis')) # True
# 集合运算
r.sadd('set1', 'a', 'b', 'c')
r.sadd('set2', 'b', 'c', 'd')
# 交集
inter = r.sinter('set1', 'set2')
print(f"交集: {inter}") # {'b', 'c'}
# 并集
union = r.sunion('set1', 'set2')
print(f"并集: {union}") # {'a', 'b', 'c', 'd'}
# 差集
diff = r.sdiff('set1', 'set2')
print(f"差集: {diff}") # {'a'}
有序集合操作
# 添加数据(分数从低到高)
r.zadd('leaderboard', {'player1': 100, 'player2': 200, 'player3': 150})
# 获取排名(从0开始)
rank = r.zrank('leaderboard', 'player1') # 最低分排名第0
print(f"Player1排名: {rank}")
# 获取逆序排名
rev_rank = r.zrevrank('leaderboard', 'player2')
print(f"Player2逆序排名: {rev_rank}")
# 获取前两名
top2 = r.zrevrange('leaderboard', 0, 1, withscores=True)
print(top2) # [('player2', 200.0), ('player3', 150.0)]
# 获取分数区间内的成员
between = r.zrangebyscore('leaderboard', 100, 150)
print(between) # ['player1', 'player3']
实际应用场景:缓存API响应
import json
import time
from datetime import datetime
class RedisCache:
def __init__(self, host='localhost', port=6379, db=0, default_expire=300):
self.pool = redis.ConnectionPool(
host=host,
port=port,
db=db,
decode_responses=True
)
self.redis = redis.Redis(connection_pool=self.pool)
self.default_expire = default_expire
def get_or_set(self, key, func, expire=None):
"""
获取缓存,如果不存在则调用函数获取并缓存
"""
# 尝试从缓存获取
cached_data = self.redis.get(key)
if cached_data is not None:
print(f"缓存命中: {key}")
return json.loads(cached_data)
# 缓存未命中,执行函数
print(f"缓存未命中,执行函数: {key}")
result = func()
# 存储到缓存
self.redis.setex(
key,
expire or self.default_expire,
json.dumps(result, ensure_ascii=False)
)
return result
def clear(self, pattern=None):
"""清除缓存"""
if pattern:
keys = self.redis.keys(pattern)
if keys:
self.redis.delete(*keys)
print(f"清除了 {len(keys)} 个缓存")
else:
self.redis.flushdb()
print("清除了所有缓存")
# 使用示例
def get_user_info(user_id):
"""模拟从数据库获取用户信息"""
time.sleep(2) # 模拟数据库查询延迟
return {
'id': user_id,
'name': f'用户{user_id}',
'time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
}
cache = RedisCache()
# 第一次调用(会执行函数)
user1 = cache.get_or_set('user:1001', lambda: get_user_info(1001))
print(f"第一次: {user1}")
# 第二次调用(从缓存获取,立即返回)
user2 = cache.get_or_set('user:1001', lambda: get_user_info(1001))
print(f"第二次: {user2}")
# 清除特定缓存
cache.clear('user:*')
高级特性:管道和事务
# 管道(批量操作,提高性能)
def batch_operation_example():
pipe = r.pipeline()
# 批量添加命令
pipe.set('key1', 'value1')
pipe.set('key2', 'value2')
pipe.incr('counter')
pipe.get('key1')
# 执行所有命令
results = pipe.execute()
print(results) # [True, True, 1, 'value1']
# 事务(保证原子性)
def transaction_example():
try:
# 开启事务
pipe = r.pipeline(transaction=True)
# 监控键
pipe.watch('counter')
# 开始事务
pipe.multi()
pipe.incr('counter', 10)
pipe.decr('counter', 5)
# 执行事务
results = pipe.execute()
print("事务执行成功")
except redis.WatchError:
print("监视的键被修改,事务回滚")
# 使用 Lua 脚本(保证原子性)
def lua_script_example():
# 定义 Lua 脚本
script = """
local current = redis.call('GET', KEYS[1])
if current then
redis.call('SET', KEYS[1], ARGV[1])
return current
end
return nil
"""
# 注册脚本
update_script = r.register_script(script)
# 执行脚本
old_value = update_script(keys=['key1'], args=['new_value'])
print(f"旧值: {old_value}")
实战:限流器实现
class RateLimiter:
"""滑动窗口限流器"""
def __init__(self, redis_client, max_requests=100, window_seconds=60):
self.redis = redis_client
self.max_requests = max_requests
self.window_seconds = window_seconds
def allow_request(self, key):
"""
检查是否允许请求
返回: (是否允许, 当前请求数)
"""
current_time = int(time.time())
window_start = current_time - self.window_seconds
pipe = self.redis.pipeline()
# 移除窗口外的请求
pipe.zremrangebyscore(key, 0, window_start)
# 获取当前窗口内的请求数
pipe.zcard(key)
# 添加当前请求
pipe.zadd(key, {str(current_time): current_time})
# 设置过期时间
pipe.expire(key, self.window_seconds + 1)
_, current_count, _, _ = pipe.execute()
if current_count > self.max_requests:
return False, current_count
return True, current_count
# 使用示例
limiter = RateLimiter(r, max_requests=10, window_seconds=1)
for i in range(15):
allowed, count = limiter.allow_request('api:user:1001')
print(f"请求 {i+1}: {'允许' if allowed else '拒绝'}, 当前请求数: {count}")
time.sleep(0.1)
最佳实践建议
配置优化
# 使用连接池
pool = redis.ConnectionPool(
max_connections=20,
socket_connect_timeout=5,
socket_timeout=5,
retry_on_timeout=True,
health_check_interval=30
)
# 设置键的命名规范
# user:{id}:{field}
# cache:{module}:{key}
# rate:{action}:{user_id}
异常处理
def safe_redis_operation():
try:
result = r.get('key')
return result
except redis.ConnectionError:
print("Redis连接失败")
# 降级处理:从数据库获取数据
return database_fallback()
except redis.TimeoutError:
print("Redis操作超时")
return fallback_value()
except Exception as e:
print(f"Redis操作异常: {e}")
return None
这些案例涵盖了Python操作Redis的主要场景,根据实际需求,你可以选择合适的操作方式和数据结构来优化应用性能,Redis在缓存、限流、排行榜、分布式锁等场景中都有广泛应用。