本文目录导读:

在Python中删除Redis缓存有多种方法,以下是常用且详细的案例:
删除单个键
使用 delete() 方法
import redis
# 连接Redis
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
# 设置一些缓存数据
r.set('user:1:name', '张三')
r.set('product:1001:price', 99.9)
# 删除单个键
result = r.delete('user:1:name')
print(f"删除结果: {result}") # 返回1表示成功删除,0表示键不存在
批量删除多个键
同时删除多个键
import redis
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
# 设置多个缓存
r.set('session:token:abc', 'user_data')
r.set('session:token:def', 'user_data')
r.set('session:token:ghi', 'user_data')
# 批量删除多个键
deleted_count = r.delete('session:token:abc', 'session:token:def', 'session:token:ghi')
print(f"成功删除 {deleted_count} 个键")
使用列表或元组
import redis
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
# 定义需要删除的键列表
keys_to_delete = ['cache:key1', 'cache:key2', 'cache:key3']
# 批量删除
deleted_count = r.delete(*keys_to_delete)
print(f"成功删除 {deleted_count} 个键")
使用通配符模式删除(需要扫描)
Redis 没有直接支持通配符删除的命令,需要先使用 keys 或 scan 获取匹配的键,再删除。
使用 KEYS 命令(不推荐生产环境)
import redis
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
# 查找所有以 'user:' 开头的键
keys = r.keys('user:*')
print(f"找到 {len(keys)} 个匹配的键")
if keys:
deleted_count = r.delete(*keys)
print(f"成功删除 {deleted_count} 个键")
使用 SCAN 命令(推荐生产环境)
import redis
def delete_keys_by_pattern(pattern, redis_client, batch_size=100):
"""
使用SCAN命令批量删除匹配模式的键
"""
cursor = 0
total_deleted = 0
while True:
# 使用SCAN逐步扫描
cursor, keys = redis_client.scan(cursor=cursor, match=pattern, count=batch_size)
if keys:
# 删除当前批次找到的键
deleted = redis_client.delete(*keys)
total_deleted += deleted
print(f"删除批次 {len(keys)} 个键")
# 如果cursor返回0,表示扫描完成
if cursor == 0:
break
return total_deleted
# 使用示例
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
# 删除所有以 'temp:' 开头的键
deleted_count = delete_keys_by_pattern('temp:*', r)
print(f"总共删除 {deleted_count} 个临时缓存键")
删除整个数据库或特定db
清空当前数据库
import redis
r = redis.Redis(host='localhost', port=6379, db=0)
# 清空当前数据库(谨慎使用)
r.flushdb()
print("当前数据库已清空")
清空所有数据库
import redis
r = redis.Redis(host='localhost', port=6379, db=0)
# 清空所有数据库(非常谨慎使用)
r.flushall()
print("所有数据库已清空")
带过期时间的自动删除
import redis
import time
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
# 设置缓存并指定过期时间(自动删除)
r.setex('temp:data', 60, '临时数据') # 60秒后自动删除
print("缓存已设置,将在60秒后自动删除")
# 或者使用单独的过期设置
r.set('session:user:123', 'session_data')
r.expire('session:user:123', 300) # 300秒后自动过期
print("会话缓存将在5分钟后自动删除")
# 查看剩余生存时间
ttl = r.ttl('session:user:123')
print(f"剩余生存时间: {ttl}秒")
完整的缓存管理类示例
import redis
from typing import List, Optional
class RedisCacheManager:
def __init__(self, host='localhost', port=6379, db=0, password=None):
"""初始化Redis连接"""
self.client = redis.Redis(
host=host,
port=port,
db=db,
password=password,
decode_responses=True,
socket_connect_timeout=5,
socket_timeout=5
)
def delete_key(self, key: str) -> bool:
"""删除单个键"""
try:
result = self.client.delete(key)
return result > 0
except redis.RedisError as e:
print(f"删除键 {key} 失败: {e}")
return False
def delete_keys(self, keys: List[str]) -> int:
"""批量删除多个键"""
try:
if not keys:
return 0
deleted_count = self.client.delete(*keys)
return deleted_count
except redis.RedisError as e:
print(f"批量删除失败: {e}")
return 0
def delete_by_pattern(self, pattern: str, batch_size: int = 100) -> int:
"""
根据模式删除键
模式示例: 'user:*', 'cache:*', 'session:*'
"""
cursor = 0
total_deleted = 0
try:
while True:
cursor, keys = self.client.scan(
cursor=cursor,
match=pattern,
count=batch_size
)
if keys:
deleted = self.client.delete(*keys)
total_deleted += deleted
if cursor == 0:
break
return total_deleted
except redis.RedisError as e:
print(f"模式删除失败: {e}")
return 0
def clear_database(self, db: Optional[int] = None) -> bool:
"""清空数据库"""
try:
if db is not None:
# 清空指定数据库
self.client.execute_command('SELECT', db)
self.client.flushdb()
else:
# 清空当前数据库
self.client.flushdb()
return True
except redis.RedisError as e:
print(f"清空数据库失败: {e}")
return False
def set_with_expiry(self, key: str, value: str, expiry_seconds: int) -> bool:
"""设置带过期时间的缓存"""
try:
return self.client.setex(key, expiry_seconds, value)
except redis.RedisError as e:
print(f"设置缓存失败: {e}")
return False
# 使用示例
def main():
# 创建缓存管理器
cache = RedisCacheManager()
# 设置一些缓存数据
cache.set_with_expiry('user:1:profile', '用户数据', 3600)
cache.set_with_expiry('temp:log:app', '日志数据', 60)
# 删除单个键
if cache.delete_key('temp:log:app'):
print("成功删除临时日志缓存")
# 按模式删除(删除所有用户缓存)
deleted = cache.delete_by_pattern('user:*')
print(f"删除了 {deleted} 个用户缓存")
# 批量删除
keys = ['test:1', 'test:2', 'test:3']
deleted = cache.delete_keys(keys)
print(f"批量删除 {deleted} 个键")
if __name__ == "__main__":
main()
注意事项
- 生产环境避免使用
KEYS命令:在大量数据时会导致Redis阻塞,建议始终使用SCAN命令 - 使用连接池:高并发场景建议使用连接池
- 错误处理:始终添加try-except处理连接异常
- 批量操作注意:一次删除太多键会影响性能,建议分批处理
- 数据安全:
flushdb和flushall操作不可逆,谨慎使用