Python案例如何实现数据缓存?

wen python案例 6

Python数据缓存实现案例

使用字典实现简单缓存

import time
import functools
class SimpleCache:
    def __init__(self, expire_time=60):
        self.cache = {}
        self.expire_time = expire_time
    def get(self, key):
        if key in self.cache:
            value, timestamp = self.cache[key]
            if time.time() - timestamp < self.expire_time:
                return value
            else:
                del self.cache[key]
        return None
    def set(self, key, value):
        self.cache[key] = (value, time.time())
# 使用示例
cache = SimpleCache(expire_time=30)
cache.set('user_1', {'name': 'Alice', 'age': 30})
data = cache.get('user_1')
print(data)  # {'name': 'Alice', 'age': 30}

使用 functools.lru_cache 装饰器

from functools import lru_cache
import time
@lru_cache(maxsize=128)
def expensive_function(n):
    """计算斐波那契数列(演示用)"""
    print(f"Computing {n}...")
    if n <= 1:
        return n
    return expensive_function(n-1) + expensive_function(n-2)
# 使用示例
print(expensive_function(10))  # 计算结果并缓存
print(expensive_function(10))  # 直接返回缓存结果
print(expensive_function(20))  # 计算结果并缓存

使用缓存工具库 - cachetools

首先安装:pip install cachetools

Python案例如何实现数据缓存?

from cachetools import cached, TTLCache
import time
# 创建TTL缓存(时间失效)
cache = TTLCache(maxsize=100, ttl=60)  # 60秒后过期
@cached(cache)
def get_user_info(user_id):
    """模拟从数据库获取用户信息"""
    print(f"从数据库获取用户 {user_id} 的信息")
    time.sleep(2)  # 模拟耗时操作
    return {
        'id': user_id,
        'name': f'User_{user_id}',
        'timestamp': time.time()
    }
# 使用示例
print(get_user_info(1))  # 第一次会查询数据库
print(get_user_info(1))  # 第二次直接从缓存返回
time.sleep(3)  # 等待3秒(假设TTL为60秒)
print(get_user_info(1))  # 缓存还在,直接返回

使用 Redis 实现分布式缓存

import redis
import json
import time
# 连接Redis
r = redis.Redis(host='localhost', port=6379, db=0)
class RedisCache:
    def __init__(self, expire_time=300):
        self.expire_time = expire_time
    def get(self, key):
        data = r.get(key)
        if data:
            return json.loads(data)
        return None
    def set(self, key, value):
        r.setex(key, self.expire_time, json.dumps(value))
    def delete(self, key):
        r.delete(key)
# 使用示例
cache = RedisCache(expire_time=60)
cache.set('user:123', {'name': 'Alice', 'role': 'admin'})
user_data = cache.get('user:123')
print(user_data)

带缓存的装饰器实现

import time
import hashlib
import json
def cache_decorator(expire_time=60):
    def decorator(func):
        cache = {}
        def wrapper(*args, **kwargs):
            # 生成缓存键
            key = hashlib.md5(
                json.dumps({
                    'args': args,
                    'kwargs': kwargs
                }).encode()
            ).hexdigest()
            # 检查缓存
            if key in cache:
                value, timestamp = cache[key]
                if time.time() - timestamp < expire_time:
                    print("返回缓存数据...")
                    return value
            # 执行函数并缓存结果
            result = func(*args, **kwargs)
            cache[key] = (result, time.time())
            return result
        # 添加清理方法
        def clear_cache():
            cache.clear()
        wrapper.clear_cache = clear_cache
        return wrapper
    return decorator
@cache_decorator(expire_time=30)
def get_product_info(product_id):
    """模拟获取产品信息"""
    print(f"从数据库获取产品 {product_id} 的信息")
    time.sleep(1)  # 模拟耗时
    return {
        'id': product_id,
        'name': f'Product_{product_id}',
        'price': 99.99
    }
# 使用示例
print(get_product_info(1))
print(get_product_info(1))  # 会返回缓存
get_product_info.clear_cache()  # 清除缓存
print(get_product_info(1))  # 重新查询

带淘汰策略的缓存实现

from collections import OrderedDict
import time
class LRUCache:
    """LRU(最近最少使用)缓存"""
    def __init__(self, capacity=100):
        self.cache = OrderedDict()
        self.capacity = capacity
    def get(self, key):
        if key not in self.cache:
            return None
        # 移动到末尾(表示最近使用)
        self.cache.move_to_end(key)
        return self.cache[key]
    def put(self, key, value):
        if key in self.cache:
            # 更新已有项
            self.cache[key] = value
            self.cache.move_to_end(key)
        else:
            # 添加新项
            if len(self.cache) >= self.capacity:
                # 删除最久未使用的项
                self.cache.popitem(last=False)
            self.cache[key] = value
# 使用示例
lru = LRUCache(capacity=3)
lru.put('A', 1)
lru.put('B', 2)
lru.put('C', 3)
print(lru.cache)  # OrderedDict([('A', 1), ('B', 2), ('C', 3)])
lru.get('A')  # 访问A,A变为最近使用
lru.put('D', 4)  # 删除最久未使用的B
print(lru.cache)  # OrderedDict([('C', 3), ('A', 1), ('D', 4)])

选择建议

  1. 简单应用:使用 functools.lru_cache 或自定义字典缓存
  2. 需要过期时间:使用 cachetools.TTLCache
  3. 分布式系统:使用 Redis
  4. 内存控制:实现 LRU 淘汰策略

根据你的具体需求和数据规模选择合适的缓存方案。

抱歉,评论功能暂时关闭!