本文目录导读:

在Python中解决缓存失效问题,常见的方法和案例包括:
基于时间的缓存失效(TTL)
import time
from functools import wraps
def ttl_cache(seconds=60):
def decorator(func):
cache = {}
@wraps(func)
def wrapper(*args, **kwargs):
key = (args, tuple(kwargs.items()))
current_time = time.time()
# 检查缓存是否存在且未过期
if key in cache:
result, timestamp = cache[key]
if current_time - timestamp < seconds:
return result
# 重新计算结果并缓存
result = func(*args, **kwargs)
cache[key] = (result, current_time)
return result
return wrapper
return decorator
@ttl_cache(seconds=30)
def get_user_info(user_id):
# 模拟耗时操作
time.sleep(1)
return {"id": user_id, "name": "User"}
# 使用示例
print(get_user_info(1)) # 第一次调用,执行原函数
print(get_user_info(1)) # 第二次调用,命中缓存
time.sleep(31)
print(get_user_info(1)) # 缓存过期,重新计算
使用redis实现分布式缓存
import redis
import json
from functools import wraps
class RedisCache:
def __init__(self, host='localhost', port=6379, db=0):
self.redis_client = redis.Redis(
host=host,
port=port,
db=db,
decode_responses=True
)
def cache(self, key_prefix, ttl=300):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
# 生成唯一的缓存键
cache_key = f"{key_prefix}:{hash(str(args) + str(kwargs))}"
# 尝试从缓存获取
cached_result = self.redis_client.get(cache_key)
if cached_result:
print(f"命中缓存: {cache_key}")
return json.loads(cached_result)
# 执行原函数
result = func(*args, **kwargs)
# 存入缓存
self.redis_client.setex(
cache_key,
ttl,
json.dumps(result, default=str)
)
print(f"设置缓存: {cache_key}")
return result
return wrapper
return decorator
# 使用示例
cache_manager = RedisCache()
@cache_manager.cache("user_info", ttl=60)
def get_user_data(user_id):
# 模拟数据库查询
return {"id": user_id, "data": "user_data"}
print(get_user_data(1))
缓存失效策略:主动失效
class CacheManager:
def __init__(self):
self.cache = {}
self.expire_times = {}
def set(self, key, value, ttl=None):
self.cache[key] = value
if ttl:
self.expire_times[key] = time.time() + ttl
def get(self, key):
if key in self.cache:
# 检查是否过期
if key in self.expire_times:
if time.time() > self.expire_times[key]:
self.delete(key)
return None
return self.cache[key]
return None
def delete(self, key):
self.cache.pop(key, None)
self.expire_times.pop(key, None)
def clear_expired(self):
current_time = time.time()
expired_keys = [
key for key, expire_time in self.expire_times.items()
if current_time > expire_time
]
for key in expired_keys:
self.delete(key)
# 使用示例
cache = CacheManager()
cache.set("important_data", "value", ttl=10)
print(cache.get("important_data")) # 有效
time.sleep(11)
print(cache.get("important_data")) # 已过期
基于事件触发的缓存失效
class EventBasedCache:
def __init__(self):
self.cache = {}
self.invalidation_events = set()
def register_invalidation(self, event_name):
"""注册缓存失效事件"""
self.invalidation_events.add(event_name)
def set_with_events(self, key, value, events=None):
"""设置缓存并关联失效事件"""
self.cache[key] = {
'value': value,
'events': events or set()
}
def get(self, key):
if key in self.cache:
return self.cache[key]['value']
return None
def invalidate(self, event_name):
"""触发事件,使相关缓存失效"""
keys_to_remove = [
key for key, data in self.cache.items()
if event_name in data['events']
]
for key in keys_to_remove:
del self.cache[key]
print(f"事件 '{event_name}' 触发了 {len(keys_to_remove)} 个缓存失效")
# 使用示例
cache = EventBasedCache()
# 设置缓存并关联失效事件
cache.set_with_events("user_123_data", {"name": "Alice"}, events={'user_update', 'user_delete'})
cache.set_with_events("user_123_friends", ["Bob"], events={'user_friends_update'})
# 当用户信息更新时,使相关缓存失效
cache.invalidate('user_update')
使用Python内置的lru_cache
from functools import lru_cache
import time
@lru_cache(maxsize=128)
def expensive_function(n):
"""模拟耗时计算"""
time.sleep(2)
return n * n
# 使用示例
print(expensive_function(10)) # 计算并缓存
print(expensive_function(10)) # 命中缓存
print(expensive_function.cache_info()) # 查看缓存统计
# 手动清除缓存
expensive_function.cache_clear()
print(expensive_function(10)) # 重新计算
最佳实践建议
- 选择合适的TTL:根据数据更新频率设置合理的过期时间
- 使用cache-aside模式:先查缓存,没命中再查数据库,然后更新缓存
- 缓存预热:系统启动时预先加载热点数据
- 缓存降级:缓存不可用时,使用备用方案
- 监控缓存命中率:使用
cache_info()等方法监控性能
选择哪种方案取决于你的具体需求:单机应用可用内存缓存,分布式系统推荐Redis,需要精确控制失效时机可使用事件驱动方案。