Python数据缓存实现案例
使用字典实现简单缓存
import time
import functools
class SimpleCache:
def __init__(self, expire_time=60):
self.cache = {}
self.expire_time = expire_time
def get(self, key):
if key in self.cache:
value, timestamp = self.cache[key]
if time.time() - timestamp < self.expire_time:
return value
else:
del self.cache[key]
return None
def set(self, key, value):
self.cache[key] = (value, time.time())
# 使用示例
cache = SimpleCache(expire_time=30)
cache.set('user_1', {'name': 'Alice', 'age': 30})
data = cache.get('user_1')
print(data) # {'name': 'Alice', 'age': 30}
使用 functools.lru_cache 装饰器
from functools import lru_cache
import time
@lru_cache(maxsize=128)
def expensive_function(n):
"""计算斐波那契数列(演示用)"""
print(f"Computing {n}...")
if n <= 1:
return n
return expensive_function(n-1) + expensive_function(n-2)
# 使用示例
print(expensive_function(10)) # 计算结果并缓存
print(expensive_function(10)) # 直接返回缓存结果
print(expensive_function(20)) # 计算结果并缓存
使用缓存工具库 - cachetools
首先安装:pip install cachetools

from cachetools import cached, TTLCache
import time
# 创建TTL缓存(时间失效)
cache = TTLCache(maxsize=100, ttl=60) # 60秒后过期
@cached(cache)
def get_user_info(user_id):
"""模拟从数据库获取用户信息"""
print(f"从数据库获取用户 {user_id} 的信息")
time.sleep(2) # 模拟耗时操作
return {
'id': user_id,
'name': f'User_{user_id}',
'timestamp': time.time()
}
# 使用示例
print(get_user_info(1)) # 第一次会查询数据库
print(get_user_info(1)) # 第二次直接从缓存返回
time.sleep(3) # 等待3秒(假设TTL为60秒)
print(get_user_info(1)) # 缓存还在,直接返回
使用 Redis 实现分布式缓存
import redis
import json
import time
# 连接Redis
r = redis.Redis(host='localhost', port=6379, db=0)
class RedisCache:
def __init__(self, expire_time=300):
self.expire_time = expire_time
def get(self, key):
data = r.get(key)
if data:
return json.loads(data)
return None
def set(self, key, value):
r.setex(key, self.expire_time, json.dumps(value))
def delete(self, key):
r.delete(key)
# 使用示例
cache = RedisCache(expire_time=60)
cache.set('user:123', {'name': 'Alice', 'role': 'admin'})
user_data = cache.get('user:123')
print(user_data)
带缓存的装饰器实现
import time
import hashlib
import json
def cache_decorator(expire_time=60):
def decorator(func):
cache = {}
def wrapper(*args, **kwargs):
# 生成缓存键
key = hashlib.md5(
json.dumps({
'args': args,
'kwargs': kwargs
}).encode()
).hexdigest()
# 检查缓存
if key in cache:
value, timestamp = cache[key]
if time.time() - timestamp < expire_time:
print("返回缓存数据...")
return value
# 执行函数并缓存结果
result = func(*args, **kwargs)
cache[key] = (result, time.time())
return result
# 添加清理方法
def clear_cache():
cache.clear()
wrapper.clear_cache = clear_cache
return wrapper
return decorator
@cache_decorator(expire_time=30)
def get_product_info(product_id):
"""模拟获取产品信息"""
print(f"从数据库获取产品 {product_id} 的信息")
time.sleep(1) # 模拟耗时
return {
'id': product_id,
'name': f'Product_{product_id}',
'price': 99.99
}
# 使用示例
print(get_product_info(1))
print(get_product_info(1)) # 会返回缓存
get_product_info.clear_cache() # 清除缓存
print(get_product_info(1)) # 重新查询
带淘汰策略的缓存实现
from collections import OrderedDict
import time
class LRUCache:
"""LRU(最近最少使用)缓存"""
def __init__(self, capacity=100):
self.cache = OrderedDict()
self.capacity = capacity
def get(self, key):
if key not in self.cache:
return None
# 移动到末尾(表示最近使用)
self.cache.move_to_end(key)
return self.cache[key]
def put(self, key, value):
if key in self.cache:
# 更新已有项
self.cache[key] = value
self.cache.move_to_end(key)
else:
# 添加新项
if len(self.cache) >= self.capacity:
# 删除最久未使用的项
self.cache.popitem(last=False)
self.cache[key] = value
# 使用示例
lru = LRUCache(capacity=3)
lru.put('A', 1)
lru.put('B', 2)
lru.put('C', 3)
print(lru.cache) # OrderedDict([('A', 1), ('B', 2), ('C', 3)])
lru.get('A') # 访问A,A变为最近使用
lru.put('D', 4) # 删除最久未使用的B
print(lru.cache) # OrderedDict([('C', 3), ('A', 1), ('D', 4)])
选择建议
- 简单应用:使用
functools.lru_cache或自定义字典缓存 - 需要过期时间:使用
cachetools.TTLCache - 分布式系统:使用 Redis
- 内存控制:实现 LRU 淘汰策略
根据你的具体需求和数据规模选择合适的缓存方案。