Python案例如何读取Redis数据?

wen python案例 9

本文目录导读:

Python案例如何读取Redis数据?

  1. 基础安装和连接
  2. 读取不同类型的数据
  3. 实战案例:用户登录系统
  4. 批量读取和高效操作
  5. 错误处理
  6. 完整示例:缓存系统
  7. 注意事项

我来介绍几种在Python中读取Redis数据的常用方法:

基础安装和连接

# 首先安装redis库
# pip install redis
import redis
# 方法1:基本连接
r = redis.Redis(host='localhost', port=6379, db=0, password='your_password')
# 方法2:使用连接池(推荐用于生产环境)
pool = redis.ConnectionPool(
    host='localhost',
    port=6379,
    db=0,
    password='your_password',
    max_connections=10
)
r = redis.Redis(connection_pool=pool)

读取不同类型的数据

import redis
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
# 1. 读取字符串类型
r.set('username', '张三')
username = r.get('username')  # 返回: '张三'
print(f"用户名: {username}")
# 2. 读取Hash类型
r.hset('user:1', mapping={
    'name': '李四',
    'age': '25',
    'city': '北京'
})
user_info = r.hgetall('user:1')  # 返回字典
name = r.hget('user:1', 'name')  # 获取单个字段
print(f"用户信息: {user_info}")
print(f"用户名: {name}")
# 3. 读取List类型
r.rpush('mylist', 'a', 'b', 'c')
list_data = r.lrange('mylist', 0, -1)  # 获取所有元素
print(f"列表数据: {list_data}")
# 4. 读取Set类型
r.sadd('myset', 'apple', 'banana', 'orange')
set_data = r.smembers('myset')  # 获取所有成员
print(f"集合数据: {set_data}")
# 5. 读取Sorted Set类型
r.zadd('leaderboard', {'player1': 100, 'player2': 200, 'player3': 150})
sorted_data = r.zrange('leaderboard', 0, -1, withscores=True)
print(f"排行榜: {sorted_data}")

实战案例:用户登录系统

import redis
import json
import time
class RedisSessionManager:
    def __init__(self, redis_client):
        self.redis = redis_client
    def save_user_session(self, user_id, session_data, expire_time=3600):
        """保存用户会话"""
        key = f"session:{user_id}"
        self.redis.setex(
            key, 
            expire_time,
            json.dumps(session_data)
        )
    def get_user_session(self, user_id):
        """获取用户会话"""
        key = f"session:{user_id}"
        session_data = self.redis.get(key)
        if session_data:
            return json.loads(session_data)
        return None
    def delete_user_session(self, user_id):
        """删除用户会话"""
        key = f"session:{user_id}"
        self.redis.delete(key)
# 使用示例
if __name__ == "__main__":
    r = redis.Redis(host='localhost', port=6379, decode_responses=True)
    session_manager = RedisSessionManager(r)
    # 保存用户会话
    user_session = {
        'user_id': 1001,
        'username': '张三',
        'login_time': time.time(),
        'role': 'admin'
    }
    session_manager.save_user_session(1001, user_session, 7200)
    # 读取用户会话
    session = session_manager.get_user_session(1001)
    print(f"用户会话: {session}")
    # 检查会话是否过期
    if session and time.time() - session['login_time'] < 3600:
        print(f"欢迎回来, {session['username']}")

批量读取和高效操作

import redis
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
# 1. 批量读取多个key
keys = ['user:1', 'user:2', 'user:3']
users = r.mget(keys)
print(f"批量读取: {users}")
# 2. 使用管道读取(提高性能)
pipe = r.pipeline()
pipe.get('key1')
pipe.get('key2')
pipe.hgetall('user:1')
results = pipe.execute()
print(f"管道结果: {results}")
# 3. 扫描匹配的key
cursor = 0
pattern = "user:*"
while True:
    cursor, keys = r.scan(cursor, match=pattern, count=10)
    for key in keys:
        value = r.get(key)
        print(f"Key: {key}, Value: {value}")
    if cursor == 0:
        break

错误处理

import redis
from redis.exceptions import RedisError, ConnectionError
class RedisClient:
    def __init__(self, host='localhost', port=6379, db=0):
        self.redis = None
        self.config = {
            'host': host,
            'port': port,
            'db': db,
            'socket_connect_timeout': 3,
            'socket_timeout': 3
        }
        self.connect()
    def connect(self):
        try:
            self.redis = redis.Redis(**self.config)
            self.redis.ping()  # 测试连接
            print("Redis连接成功")
        except ConnectionError as e:
            print(f"连接失败: {e}")
    def safe_get(self, key):
        """安全读取数据"""
        try:
            value = self.redis.get(key)
            return value
        except RedisError as e:
            print(f"读取失败: {e}")
            return None
# 使用示例
client = RedisClient()
data = client.safe_get('test_key')
print(f"读取的数据: {data}")

完整示例:缓存系统

import redis
import json
from functools import wraps
class RedisCache:
    def __init__(self):
        self.redis = redis.Redis(
            host='localhost',
            port=6379,
            decode_responses=True,
            db=1  # 使用专门的数据库
        )
    def get(self, key):
        """获取缓存"""
        data = self.redis.get(key)
        if data:
            return json.loads(data)
        return None
    def set(self, key, value, expire=300):
        """设置缓存"""
        self.redis.setex(key, expire, json.dumps(value))
    def delete(self, key):
        """删除缓存"""
        self.redis.delete(key)
# 装饰器:自动缓存函数结果
def cache(expire=300):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            cache_key = f"{func.__name__}:{str(args)}:{str(kwargs)}"
            cache_instance = RedisCache()
            # 尝试从缓存获取
            cached_result = cache_instance.get(cache_key)
            if cached_result is not None:
                print(f"从缓存读取: {cache_key}")
                return cached_result
            # 执行原函数
            result = func(*args, **kwargs)
            # 存入缓存
            cache_instance.set(cache_key, result, expire)
            print(f"缓存数据: {cache_key}")
            return result
        return wrapper
    return decorator
# 使用示例
@cache(expire=60)
def get_user_info(user_id):
    """模拟从数据库获取用户信息"""
    # 实际应用中这里会是数据库查询
    return {
        'id': user_id,
        'name': f'User_{user_id}',
        'time': '模拟数据库查询'
    }
# 测试
if __name__ == "__main__":
    # 第一次调用,会查询并缓存
    user1 = get_user_info(1)
    print(f"第一次调用: {user1}")
    # 第二次调用,从缓存读取
    user2 = get_user_info(1)
    print(f"第二次调用: {user2}")

注意事项

  1. 连接池管理:生产环境建议使用连接池
  2. 超时设置:设置合理的连接超时时间
  3. 异常处理:处理Redis连接失败、命令错误等异常
  4. 编码问题:使用decode_responses=True自动解码
  5. 性能优化:使用管道批量操作,避免频繁网络请求

这些示例涵盖了Python读取Redis的常见场景,你可以根据实际需求选择合适的方案。

抱歉,评论功能暂时关闭!