本文目录导读:

在Python中防止缓存过期通常有几种常用策略,下面我详细介绍几种方法:
使用TTL(Time To Live)缓存
基础实现
import time
from functools import wraps
def ttl_cache(seconds=300):
def decorator(func):
cache = {}
@wraps(func)
def wrapper(*args, **kwargs):
key = str(args) + str(kwargs)
# 检查缓存是否存在且未过期
if key in cache:
value, timestamp = cache[key]
if time.time() - timestamp < seconds:
print(f"使用缓存数据: {key}")
return value
# 重新获取数据
value = func(*args, **kwargs)
cache[key] = (value, time.time())
print(f"更新缓存: {key}")
return value
return wrapper
return decorator
# 使用示例
@ttl_cache(seconds=60)
def get_user_data(user_id):
# 模拟耗时的API调用
time.sleep(2)
return {"id": user_id, "name": "张三", "role": "admin"}
使用第三方库(推荐)
使用cachetools
from cachetools import cached, TTLCache
from datetime import timedelta
# 创建缓存,设置最大大小和过期时间
cache = TTLCache(maxsize=100, ttl=300) # 5分钟过期
@cached(cache)
def get_expensive_data(query):
# 模拟耗时操作
return f"结果: {query}"
使用redis(分布式缓存)
import redis
import json
from datetime import timedelta
class RedisCache:
def __init__(self, host='localhost', port=6379, db=0):
self.client = redis.Redis(host=host, port=port, db=db)
def get_or_cache(self, key, func, ttl=300):
# 尝试从缓存获取
cached_data = self.client.get(key)
if cached_data:
print(f"从Redis获取缓存: {key}")
return json.loads(cached_data)
# 执行函数获取数据
data = func()
# 存入缓存并设置过期时间
self.client.setex(key, ttl, json.dumps(data))
print(f"缓存到Redis: {key}")
return data
# 使用示例
redis_cache = RedisCache()
def get_user_profile(user_id):
return redis_cache.get_or_cache(
f"user:{user_id}",
lambda: fetch_from_database(user_id),
ttl=3600 # 1小时过期
)
智能刷新策略
提前刷新(Prefetch)
import threading
import time
class SmartCache:
def __init__(self, ttl=300, prefetch_time=60):
self.ttl = ttl
self.prefetch_time = prefetch_time # 提前多少秒刷新
self.cache = {}
self.lock = threading.Lock()
def get(self, key, fetch_func):
with self.lock:
now = time.time()
if key in self.cache:
value, timestamp = self.cache[key]
age = now - timestamp
# 如果缓存未过期
if age < self.ttl:
# 如果接近过期时间,异步刷新
if age > (self.ttl - self.prefetch_time):
threading.Thread(
target=self._async_refresh,
args=(key, fetch_func)
).start()
return value
# 缓存过期或不存在,同步刷新
return self._sync_refresh(key, fetch_func)
def _sync_refresh(self, key, fetch_func):
value = fetch_func()
self.cache[key] = (value, time.time())
return value
def _async_refresh(self, key, fetch_func):
try:
value = fetch_func()
with self.lock:
self.cache[key] = (value, time.time())
except Exception as e:
print(f"异步刷新失败: {e}")
# 使用示例
smart_cache = SmartCache(ttl=300, prefetch_time=60)
def get_data():
return smart_cache.get("my_data", lambda: fetch_expensive_data())
基于状态变化的失效策略
class EventDrivenCache:
def __init__(self):
self.cache = {}
self.dependencies = {}
def set(self, key, value, dependencies=None):
self.cache[key] = {
'value': value,
'dependencies': dependencies or [],
'timestamp': time.time()
}
def get(self, key):
if key not in self.cache:
return None
entry = self.cache[key]
# 检查依赖是否发生变化
if self._check_dependencies_changed(entry['dependencies']):
del self.cache[key]
return None
return entry['value']
def invalidate(self, dependency_key):
"""当某个依赖发生变化时,清除所有相关缓存"""
keys_to_delete = []
for key, entry in self.cache.items():
if dependency_key in entry['dependencies']:
keys_to_delete.append(key)
for key in keys_to_delete:
del self.cache[key]
def _check_dependencies_changed(self, dependencies):
# 实现依赖检查逻辑
pass
实用建议
最佳实践
# 1. 使用多层缓存
class MultiLayerCache:
def __init__(self):
self.l1_cache = {} # 内存缓存,短TTL
self.l2_cache = {} # Redis缓存,长TTL
def get(self, key):
# 先查L1缓存
if key in self.l1_cache:
return self.l1_cache[key]
# 再查L2缓存
if key in self.l2_cache:
self.l1_cache[key] = self.l2_cache[key]
return self.l2_cache[key]
return None
# 2. 设置合理的过期时间
def get_user_data(user_id):
cache_key = f"user:{user_id}"
# 不同类型数据设置不同TTL
basic_info_ttl = 3600 # 基本信息1小时
permissions_ttl = 300 # 权限信息5分钟
activity_ttl = 60 # 活动状态1分钟
# 根据数据类型使用不同TTL
pass
防止缓存过期的关键策略:
- 合理设置TTL:根据数据更新频率调整
- 异步刷新:在过期前提前更新
- 事件驱动:数据更新时主动清除相关缓存
- 多层缓存:平衡性能和一致性
- 监控和调整:根据实际使用情况动态调整策略
选择哪种方案取决于你的具体需求:数据一致性要求、性能要求、系统规模等。