本文目录导读:

- 使用
functools.lru_cache+ 手动过期控制 - 使用
cachetools库(推荐) - 使用自定义缓存类
- 使用第三方库
cacheout - 使用 Redis 实现分布式缓存过期
- 装饰器方式(最实用)
- 推荐
在Python中设置缓存过期时间有多种方式,我来介绍几种常用的方法:
使用 functools.lru_cache + 手动过期控制
import time
from functools import lru_cache
def timed_lru_cache(seconds: int, maxsize: int = 128):
"""自定义带过期时间的lru_cache"""
def wrapper_cache(func):
func = lru_cache(maxsize=maxsize)(func)
func.lifetime = seconds
func.expiration = time.time() + seconds
@functools.wraps(func)
def wrapped_func(*args, **kwargs):
if time.time() >= func.expiration:
func.cache_clear()
func.expiration = time.time() + func.lifetime
return func(*args, **kwargs)
return wrapped_func
return wrapper_cache
# 使用示例
@timed_lru_cache(seconds=60) # 缓存60秒
def get_user(user_id):
print(f"从数据库查询用户: {user_id}")
return {"id": user_id, "name": "用户"}
# 测试
print(get_user(1)) # 查询数据库
print(get_user(1)) # 使用缓存
time.sleep(61)
print(get_user(1)) # 缓存过期,重新查询
使用 cachetools 库(推荐)
from cachetools import TTLCache
import time
# 创建TTL缓存:最多存100个,每个缓存60秒过期
cache = TTLCache(maxsize=100, ttl=60)
def get_user(user_id):
if user_id in cache:
print("使用缓存")
return cache[user_id]
print("从数据库查询")
user = {"id": user_id, "name": "用户"}
cache[user_id] = user
return user
# 测试
print(get_user(1)) # 查询数据库
print(get_user(1)) # 使用缓存
time.sleep(61)
print(get_user(1)) # 缓存过期,重新查询
使用自定义缓存类
import time
from typing import Any, Dict, Optional
class SimpleCache:
def __init__(self):
self._cache: Dict[str, Dict] = {}
def set(self, key: str, value: Any, expire_seconds: int = 300):
"""设置缓存,默认5分钟过期"""
self._cache[key] = {
'value': value,
'expire_time': time.time() + expire_seconds
}
def get(self, key: str) -> Optional[Any]:
"""获取缓存,如果过期返回None"""
if key not in self._cache:
return None
cache_data = self._cache[key]
if time.time() > cache_data['expire_time']:
# 缓存过期
del self._cache[key]
return None
return cache_data['value']
def delete(self, key: str):
"""删除缓存"""
self._cache.pop(key, None)
def clear(self):
"""清空所有缓存"""
self._cache.clear()
# 使用示例
cache = SimpleCache()
cache.set('user_1', {'name': '张三'}, expire_seconds=60) # 60秒过期
print(cache.get('user_1')) # 输出: {'name': '张三'}
time.sleep(61)
print(cache.get('user_1')) # 输出: None
使用第三方库 cacheout
from cacheout import Cache
import time
# 创建缓存实例,默认60秒过期
cache = Cache(maxsize=100, ttl=60)
# 设置带过期时间的缓存
cache.set('user_1', {'name': '张三'}, ttl=30) # 30秒过期
cache.set('user_2', {'name': '李四'}) # 使用默认60秒过期
# 获取缓存
print(cache.get('user_1')) # 输出: {'name': '张三'}
# 检查缓存是否过期
if cache.has('user_1'):
print("缓存有效")
else:
print("缓存已过期")
# 设置过期回调函数
def on_expire(key, value):
print(f"缓存 {key} 已过期,值: {value}")
cache.set('text', '测试', ttl=5, expire_callback=on_expire)
time.sleep(6)
print(cache.get('text')) # 输出: None,并触发回调
使用 Redis 实现分布式缓存过期
import redis
import json
class RedisCache:
def __init__(self, host='localhost', port=6379, db=0):
self.client = redis.Redis(host=host, port=port, db=db)
def set(self, key: str, value: Any, expire_seconds: int = 300):
"""设置缓存,默认5分钟过期"""
self.client.setex(
key,
expire_seconds,
json.dumps(value, ensure_ascii=False)
)
def get(self, key: str) -> Optional[Any]:
"""获取缓存"""
value = self.client.get(key)
if value:
return json.loads(value)
return None
def delete(self, key: str):
"""删除缓存"""
self.client.delete(key)
# 使用示例
cache = RedisCache()
cache.set('user_1', {'name': '张三'}, expire_seconds=60)
print(cache.get('user_1')) # 输出: {'name': '张三'}
装饰器方式(最实用)
import time
import functools
from typing import Dict, Any, Optional
class TTLCacheDecorator:
def __init__(self, ttl: int = 300):
self.ttl = ttl
self.cache: Dict[str, Dict] = {}
def __call__(self, func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
# 生成缓存key
key = str(args) + str(sorted(kwargs.items()))
# 检查缓存是否有效
if key in self.cache:
cached_value = self.cache[key]
if time.time() < cached_value['expire_time']:
print("使用缓存")
return cached_value['value']
else:
# 删除过期缓存
del self.cache[key]
# 执行函数并缓存结果
result = func(*args, **kwargs)
self.cache[key] = {
'value': result,
'expire_time': time.time() + self.ttl
}
return result
return wrapper
# 使用示例
@TTLCacheDecorator(ttl=60) # 缓存60秒
def get_database_data(query_id):
print(f"从数据库查询: {query_id}")
return f"数据_{query_id}"
# 测试
print(get_database_data(1)) # 查询数据库
print(get_database_data(1)) # 使用缓存
time.sleep(61)
print(get_database_data(1)) # 缓存过期,重新查询
推荐
- 简单场景:使用
functools.lru_cache配合自建过期机制 - 中等复杂度:使用
cachetools或cacheout库 - 分布式/生产环境:使用 Redis 等专业的缓存中间件
选择哪种方式取决于你的具体需求、项目规模和性能要求。