Python案例怎么设置缓存过期时间?

wen python案例 11

本文目录导读:

Python案例怎么设置缓存过期时间?

  1. 使用 functools.lru_cache + 手动过期控制
  2. 使用 cachetools 库(推荐)
  3. 使用自定义缓存类
  4. 使用第三方库 cacheout
  5. 使用 Redis 实现分布式缓存过期
  6. 装饰器方式(最实用)
  7. 推荐

在Python中设置缓存过期时间有多种方式,我来介绍几种常用的方法:

使用 functools.lru_cache + 手动过期控制

import time
from functools import lru_cache
def timed_lru_cache(seconds: int, maxsize: int = 128):
    """自定义带过期时间的lru_cache"""
    def wrapper_cache(func):
        func = lru_cache(maxsize=maxsize)(func)
        func.lifetime = seconds
        func.expiration = time.time() + seconds
        @functools.wraps(func)
        def wrapped_func(*args, **kwargs):
            if time.time() >= func.expiration:
                func.cache_clear()
                func.expiration = time.time() + func.lifetime
            return func(*args, **kwargs)
        return wrapped_func
    return wrapper_cache
# 使用示例
@timed_lru_cache(seconds=60)  # 缓存60秒
def get_user(user_id):
    print(f"从数据库查询用户: {user_id}")
    return {"id": user_id, "name": "用户"}
# 测试
print(get_user(1))  # 查询数据库
print(get_user(1))  # 使用缓存
time.sleep(61)
print(get_user(1))  # 缓存过期,重新查询

使用 cachetools 库(推荐)

from cachetools import TTLCache
import time
# 创建TTL缓存:最多存100个,每个缓存60秒过期
cache = TTLCache(maxsize=100, ttl=60)
def get_user(user_id):
    if user_id in cache:
        print("使用缓存")
        return cache[user_id]
    print("从数据库查询")
    user = {"id": user_id, "name": "用户"}
    cache[user_id] = user
    return user
# 测试
print(get_user(1))  # 查询数据库
print(get_user(1))  # 使用缓存
time.sleep(61)
print(get_user(1))  # 缓存过期,重新查询

使用自定义缓存类

import time
from typing import Any, Dict, Optional
class SimpleCache:
    def __init__(self):
        self._cache: Dict[str, Dict] = {}
    def set(self, key: str, value: Any, expire_seconds: int = 300):
        """设置缓存,默认5分钟过期"""
        self._cache[key] = {
            'value': value,
            'expire_time': time.time() + expire_seconds
        }
    def get(self, key: str) -> Optional[Any]:
        """获取缓存,如果过期返回None"""
        if key not in self._cache:
            return None
        cache_data = self._cache[key]
        if time.time() > cache_data['expire_time']:
            # 缓存过期
            del self._cache[key]
            return None
        return cache_data['value']
    def delete(self, key: str):
        """删除缓存"""
        self._cache.pop(key, None)
    def clear(self):
        """清空所有缓存"""
        self._cache.clear()
# 使用示例
cache = SimpleCache()
cache.set('user_1', {'name': '张三'}, expire_seconds=60)  # 60秒过期
print(cache.get('user_1'))  # 输出: {'name': '张三'}
time.sleep(61)
print(cache.get('user_1'))  # 输出: None

使用第三方库 cacheout

from cacheout import Cache
import time
# 创建缓存实例,默认60秒过期
cache = Cache(maxsize=100, ttl=60)
# 设置带过期时间的缓存
cache.set('user_1', {'name': '张三'}, ttl=30)  # 30秒过期
cache.set('user_2', {'name': '李四'})          # 使用默认60秒过期
# 获取缓存
print(cache.get('user_1'))  # 输出: {'name': '张三'}
# 检查缓存是否过期
if cache.has('user_1'):
    print("缓存有效")
else:
    print("缓存已过期")
# 设置过期回调函数
def on_expire(key, value):
    print(f"缓存 {key} 已过期,值: {value}")
cache.set('text', '测试', ttl=5, expire_callback=on_expire)
time.sleep(6)
print(cache.get('text'))  # 输出: None,并触发回调

使用 Redis 实现分布式缓存过期

import redis
import json
class RedisCache:
    def __init__(self, host='localhost', port=6379, db=0):
        self.client = redis.Redis(host=host, port=port, db=db)
    def set(self, key: str, value: Any, expire_seconds: int = 300):
        """设置缓存,默认5分钟过期"""
        self.client.setex(
            key,
            expire_seconds,
            json.dumps(value, ensure_ascii=False)
        )
    def get(self, key: str) -> Optional[Any]:
        """获取缓存"""
        value = self.client.get(key)
        if value:
            return json.loads(value)
        return None
    def delete(self, key: str):
        """删除缓存"""
        self.client.delete(key)
# 使用示例
cache = RedisCache()
cache.set('user_1', {'name': '张三'}, expire_seconds=60)
print(cache.get('user_1'))  # 输出: {'name': '张三'}

装饰器方式(最实用)

import time
import functools
from typing import Dict, Any, Optional
class TTLCacheDecorator:
    def __init__(self, ttl: int = 300):
        self.ttl = ttl
        self.cache: Dict[str, Dict] = {}
    def __call__(self, func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # 生成缓存key
            key = str(args) + str(sorted(kwargs.items()))
            # 检查缓存是否有效
            if key in self.cache:
                cached_value = self.cache[key]
                if time.time() < cached_value['expire_time']:
                    print("使用缓存")
                    return cached_value['value']
                else:
                    # 删除过期缓存
                    del self.cache[key]
            # 执行函数并缓存结果
            result = func(*args, **kwargs)
            self.cache[key] = {
                'value': result,
                'expire_time': time.time() + self.ttl
            }
            return result
        return wrapper
# 使用示例
@TTLCacheDecorator(ttl=60)  # 缓存60秒
def get_database_data(query_id):
    print(f"从数据库查询: {query_id}")
    return f"数据_{query_id}"
# 测试
print(get_database_data(1))  # 查询数据库
print(get_database_data(1))  # 使用缓存
time.sleep(61)
print(get_database_data(1))  # 缓存过期,重新查询

推荐

  • 简单场景:使用 functools.lru_cache 配合自建过期机制
  • 中等复杂度:使用 cachetoolscacheout
  • 分布式/生产环境:使用 Redis 等专业的缓存中间件

选择哪种方式取决于你的具体需求、项目规模和性能要求。

抱歉,评论功能暂时关闭!