Python案例如何解决缓存失效?

wen python案例 65

本文目录导读:

Python案例如何解决缓存失效?

  1. 基于时间的缓存失效(TTL)
  2. 使用redis实现分布式缓存
  3. 缓存失效策略:主动失效
  4. 基于事件触发的缓存失效
  5. 使用Python内置的lru_cache
  6. 最佳实践建议

在Python中解决缓存失效问题,常见的方法和案例包括:

基于时间的缓存失效(TTL)

import time
from functools import wraps
def ttl_cache(seconds=60):
    def decorator(func):
        cache = {}
        @wraps(func)
        def wrapper(*args, **kwargs):
            key = (args, tuple(kwargs.items()))
            current_time = time.time()
            # 检查缓存是否存在且未过期
            if key in cache:
                result, timestamp = cache[key]
                if current_time - timestamp < seconds:
                    return result
            # 重新计算结果并缓存
            result = func(*args, **kwargs)
            cache[key] = (result, current_time)
            return result
        return wrapper
    return decorator
@ttl_cache(seconds=30)
def get_user_info(user_id):
    # 模拟耗时操作
    time.sleep(1)
    return {"id": user_id, "name": "User"}
# 使用示例
print(get_user_info(1))  # 第一次调用,执行原函数
print(get_user_info(1))  # 第二次调用,命中缓存
time.sleep(31)
print(get_user_info(1))  # 缓存过期,重新计算

使用redis实现分布式缓存

import redis
import json
from functools import wraps
class RedisCache:
    def __init__(self, host='localhost', port=6379, db=0):
        self.redis_client = redis.Redis(
            host=host, 
            port=port, 
            db=db,
            decode_responses=True
        )
    def cache(self, key_prefix, ttl=300):
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                # 生成唯一的缓存键
                cache_key = f"{key_prefix}:{hash(str(args) + str(kwargs))}"
                # 尝试从缓存获取
                cached_result = self.redis_client.get(cache_key)
                if cached_result:
                    print(f"命中缓存: {cache_key}")
                    return json.loads(cached_result)
                # 执行原函数
                result = func(*args, **kwargs)
                # 存入缓存
                self.redis_client.setex(
                    cache_key,
                    ttl,
                    json.dumps(result, default=str)
                )
                print(f"设置缓存: {cache_key}")
                return result
            return wrapper
        return decorator
# 使用示例
cache_manager = RedisCache()
@cache_manager.cache("user_info", ttl=60)
def get_user_data(user_id):
    # 模拟数据库查询
    return {"id": user_id, "data": "user_data"}
print(get_user_data(1))

缓存失效策略:主动失效

class CacheManager:
    def __init__(self):
        self.cache = {}
        self.expire_times = {}
    def set(self, key, value, ttl=None):
        self.cache[key] = value
        if ttl:
            self.expire_times[key] = time.time() + ttl
    def get(self, key):
        if key in self.cache:
            # 检查是否过期
            if key in self.expire_times:
                if time.time() > self.expire_times[key]:
                    self.delete(key)
                    return None
            return self.cache[key]
        return None
    def delete(self, key):
        self.cache.pop(key, None)
        self.expire_times.pop(key, None)
    def clear_expired(self):
        current_time = time.time()
        expired_keys = [
            key for key, expire_time in self.expire_times.items()
            if current_time > expire_time
        ]
        for key in expired_keys:
            self.delete(key)
# 使用示例
cache = CacheManager()
cache.set("important_data", "value", ttl=10)
print(cache.get("important_data"))  # 有效
time.sleep(11)
print(cache.get("important_data"))  # 已过期

基于事件触发的缓存失效

class EventBasedCache:
    def __init__(self):
        self.cache = {}
        self.invalidation_events = set()
    def register_invalidation(self, event_name):
        """注册缓存失效事件"""
        self.invalidation_events.add(event_name)
    def set_with_events(self, key, value, events=None):
        """设置缓存并关联失效事件"""
        self.cache[key] = {
            'value': value,
            'events': events or set()
        }
    def get(self, key):
        if key in self.cache:
            return self.cache[key]['value']
        return None
    def invalidate(self, event_name):
        """触发事件,使相关缓存失效"""
        keys_to_remove = [
            key for key, data in self.cache.items()
            if event_name in data['events']
        ]
        for key in keys_to_remove:
            del self.cache[key]
        print(f"事件 '{event_name}' 触发了 {len(keys_to_remove)} 个缓存失效")
# 使用示例
cache = EventBasedCache()
# 设置缓存并关联失效事件
cache.set_with_events("user_123_data", {"name": "Alice"}, events={'user_update', 'user_delete'})
cache.set_with_events("user_123_friends", ["Bob"], events={'user_friends_update'})
# 当用户信息更新时,使相关缓存失效
cache.invalidate('user_update')

使用Python内置的lru_cache

from functools import lru_cache
import time
@lru_cache(maxsize=128)
def expensive_function(n):
    """模拟耗时计算"""
    time.sleep(2)
    return n * n
# 使用示例
print(expensive_function(10))  # 计算并缓存
print(expensive_function(10))  # 命中缓存
print(expensive_function.cache_info())  # 查看缓存统计
# 手动清除缓存
expensive_function.cache_clear()
print(expensive_function(10))  # 重新计算

最佳实践建议

  1. 选择合适的TTL:根据数据更新频率设置合理的过期时间
  2. 使用cache-aside模式:先查缓存,没命中再查数据库,然后更新缓存
  3. 缓存预热:系统启动时预先加载热点数据
  4. 缓存降级:缓存不可用时,使用备用方案
  5. 监控缓存命中率:使用 cache_info() 等方法监控性能

选择哪种方案取决于你的具体需求:单机应用可用内存缓存,分布式系统推荐Redis,需要精确控制失效时机可使用事件驱动方案。

抱歉,评论功能暂时关闭!