Python案例如何操作Redis缓存?

wen python案例 9

本文目录导读:

Python案例如何操作Redis缓存?

  1. 安装与连接
  2. 基础操作案例
  3. 实际应用场景:缓存API响应
  4. 高级特性:管道和事务
  5. 实战:限流器实现
  6. 最佳实践建议

下面我来介绍Python操作Redis缓存的完整案例,包括基础操作、连接管理和实际应用场景。

安装与连接

安装redis库

pip install redis

连接Redis

import redis
# 方式一:直接连接
r = redis.Redis(host='localhost', port=6379, db=0, password='your_password')
# 方式二:连接池(推荐)
pool = redis.ConnectionPool(host='localhost', port=6379, db=0, decode_responses=True)
r = redis.Redis(connection_pool=pool)
# 方式三:带超时和重试的配置
r = redis.Redis(
    host='localhost',
    port=6379,
    db=0,
    socket_connect_timeout=5,
    socket_timeout=5,
    retry_on_timeout=True,
    decode_responses=True  # 自动解码为字符串
)

基础操作案例

字符串操作

# 设置值
r.set('name', '张三')
r.set('counter', 1)
r.set('expire_key', '过期数据', ex=60)  # 60秒后过期
# 批量设置
r.mset({'key1': 'value1', 'key2': 'value2'})
# 获取值
name = r.get('name')  # b'张三'
print(f"Name: {name}")
# 计数器操作
r.incr('counter')     # +1
r.incrby('counter', 5)  # +5
r.decr('counter')     # -1
print(f"Counter: {r.get('counter')}")
# 检查键是否存在
if r.exists('name'):
    print("name键存在")
# 删除键
r.delete('temp_key')
# 设置过期时间
r.expire('counter', 120)  # 120秒后过期

哈希操作

# 存储用户信息
user_data = {
    'id': 1001,
    'name': '张三',
    'age': 25,
    'email': 'zhangsan@example.com'
}
# 设置哈希表
r.hset('user:1001', mapping=user_data)
# 获取单个字段
print(r.hget('user:1001', 'name'))     # 张三
# 获取所有字段
user = r.hgetall('user:1001')
print(user)  # {'id': '1001', 'name': '张三', 'age': '25', 'email': 'zhangsan@example.com'}
# 获取所有键
keys = r.hkeys('user:1001')
print(keys)  # ['id', 'name', 'age', 'email']
# 获取所有值
values = r.hvals('user:1001')
print(values)  # ['1001', '张三', '25', 'zhangsan@example.com']
# 更新单个字段
r.hset('user:1001', 'age', 26)

列表操作

# 添加数据到列表
r.rpush('logs', 'log1', 'log2', 'log3')  # 右侧添加
r.lpush('logs', 'log0')                   # 左侧添加
# 获取列表范围
all_logs = r.lrange('logs', 0, -1)  # 获取所有
print(all_logs)
# 列表长度
print(r.llen('logs'))
# 弹出元素
first = r.lpop('logs')   # 左侧弹出
last = r.rpop('logs')    # 右侧弹出
# 只保留指定范围的元素
r.ltrim('logs', 0, 10)  # 只保留前11个

集合操作

# 添加元素
r.sadd('tags', 'python', 'redis', 'database')
r.sadd('tags', 'python')  # 重复添加不会生效
# 获取所有成员
members = r.smembers('tags')
print(members)  # {'python', 'redis', 'database'}
# 从集合中移除
r.srem('tags', 'database')
# 检查元素是否存在
print(r.sismember('tags', 'redis'))  # True
# 集合运算
r.sadd('set1', 'a', 'b', 'c')
r.sadd('set2', 'b', 'c', 'd')
# 交集
inter = r.sinter('set1', 'set2')
print(f"交集: {inter}")  # {'b', 'c'}
# 并集
union = r.sunion('set1', 'set2')
print(f"并集: {union}")  # {'a', 'b', 'c', 'd'}
# 差集
diff = r.sdiff('set1', 'set2')
print(f"差集: {diff}")  # {'a'}

有序集合操作

# 添加数据(分数从低到高)
r.zadd('leaderboard', {'player1': 100, 'player2': 200, 'player3': 150})
# 获取排名(从0开始)
rank = r.zrank('leaderboard', 'player1')  # 最低分排名第0
print(f"Player1排名: {rank}")
# 获取逆序排名
rev_rank = r.zrevrank('leaderboard', 'player2')
print(f"Player2逆序排名: {rev_rank}")
# 获取前两名
top2 = r.zrevrange('leaderboard', 0, 1, withscores=True)
print(top2)  # [('player2', 200.0), ('player3', 150.0)]
# 获取分数区间内的成员
between = r.zrangebyscore('leaderboard', 100, 150)
print(between)  # ['player1', 'player3']

实际应用场景:缓存API响应

import json
import time
from datetime import datetime
class RedisCache:
    def __init__(self, host='localhost', port=6379, db=0, default_expire=300):
        self.pool = redis.ConnectionPool(
            host=host, 
            port=port, 
            db=db,
            decode_responses=True
        )
        self.redis = redis.Redis(connection_pool=self.pool)
        self.default_expire = default_expire
    def get_or_set(self, key, func, expire=None):
        """
        获取缓存,如果不存在则调用函数获取并缓存
        """
        # 尝试从缓存获取
        cached_data = self.redis.get(key)
        if cached_data is not None:
            print(f"缓存命中: {key}")
            return json.loads(cached_data)
        # 缓存未命中,执行函数
        print(f"缓存未命中,执行函数: {key}")
        result = func()
        # 存储到缓存
        self.redis.setex(
            key, 
            expire or self.default_expire, 
            json.dumps(result, ensure_ascii=False)
        )
        return result
    def clear(self, pattern=None):
        """清除缓存"""
        if pattern:
            keys = self.redis.keys(pattern)
            if keys:
                self.redis.delete(*keys)
                print(f"清除了 {len(keys)} 个缓存")
        else:
            self.redis.flushdb()
            print("清除了所有缓存")
# 使用示例
def get_user_info(user_id):
    """模拟从数据库获取用户信息"""
    time.sleep(2)  # 模拟数据库查询延迟
    return {
        'id': user_id,
        'name': f'用户{user_id}',
        'time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    }
cache = RedisCache()
# 第一次调用(会执行函数)
user1 = cache.get_or_set('user:1001', lambda: get_user_info(1001))
print(f"第一次: {user1}")
# 第二次调用(从缓存获取,立即返回)
user2 = cache.get_or_set('user:1001', lambda: get_user_info(1001))
print(f"第二次: {user2}")
# 清除特定缓存
cache.clear('user:*')

高级特性:管道和事务

# 管道(批量操作,提高性能)
def batch_operation_example():
    pipe = r.pipeline()
    # 批量添加命令
    pipe.set('key1', 'value1')
    pipe.set('key2', 'value2')
    pipe.incr('counter')
    pipe.get('key1')
    # 执行所有命令
    results = pipe.execute()
    print(results)  # [True, True, 1, 'value1']
# 事务(保证原子性)
def transaction_example():
    try:
        # 开启事务
        pipe = r.pipeline(transaction=True)
        # 监控键
        pipe.watch('counter')
        # 开始事务
        pipe.multi()
        pipe.incr('counter', 10)
        pipe.decr('counter', 5)
        # 执行事务
        results = pipe.execute()
        print("事务执行成功")
    except redis.WatchError:
        print("监视的键被修改,事务回滚")
# 使用 Lua 脚本(保证原子性)
def lua_script_example():
    # 定义 Lua 脚本
    script = """
    local current = redis.call('GET', KEYS[1])
    if current then
        redis.call('SET', KEYS[1], ARGV[1])
        return current
    end
    return nil
    """
    # 注册脚本
    update_script = r.register_script(script)
    # 执行脚本
    old_value = update_script(keys=['key1'], args=['new_value'])
    print(f"旧值: {old_value}")

实战:限流器实现

class RateLimiter:
    """滑动窗口限流器"""
    def __init__(self, redis_client, max_requests=100, window_seconds=60):
        self.redis = redis_client
        self.max_requests = max_requests
        self.window_seconds = window_seconds
    def allow_request(self, key):
        """
        检查是否允许请求
        返回: (是否允许, 当前请求数)
        """
        current_time = int(time.time())
        window_start = current_time - self.window_seconds
        pipe = self.redis.pipeline()
        # 移除窗口外的请求
        pipe.zremrangebyscore(key, 0, window_start)
        # 获取当前窗口内的请求数
        pipe.zcard(key)
        # 添加当前请求
        pipe.zadd(key, {str(current_time): current_time})
        # 设置过期时间
        pipe.expire(key, self.window_seconds + 1)
        _, current_count, _, _ = pipe.execute()
        if current_count > self.max_requests:
            return False, current_count
        return True, current_count
# 使用示例
limiter = RateLimiter(r, max_requests=10, window_seconds=1)
for i in range(15):
    allowed, count = limiter.allow_request('api:user:1001')
    print(f"请求 {i+1}: {'允许' if allowed else '拒绝'}, 当前请求数: {count}")
    time.sleep(0.1)

最佳实践建议

配置优化

# 使用连接池
pool = redis.ConnectionPool(
    max_connections=20,
    socket_connect_timeout=5,
    socket_timeout=5,
    retry_on_timeout=True,
    health_check_interval=30
)
# 设置键的命名规范
# user:{id}:{field}
# cache:{module}:{key}
# rate:{action}:{user_id}

异常处理

def safe_redis_operation():
    try:
        result = r.get('key')
        return result
    except redis.ConnectionError:
        print("Redis连接失败")
        # 降级处理:从数据库获取数据
        return database_fallback()
    except redis.TimeoutError:
        print("Redis操作超时")
        return fallback_value()
    except Exception as e:
        print(f"Redis操作异常: {e}")
        return None

这些案例涵盖了Python操作Redis的主要场景,根据实际需求,你可以选择合适的操作方式和数据结构来优化应用性能,Redis在缓存、限流、排行榜、分布式锁等场景中都有广泛应用。

抱歉,评论功能暂时关闭!