本文目录导读:

我来介绍几种在Python中读取Redis数据的常用方法:
基础安装和连接
# 首先安装redis库
# pip install redis
import redis
# 方法1:基本连接
r = redis.Redis(host='localhost', port=6379, db=0, password='your_password')
# 方法2:使用连接池(推荐用于生产环境)
pool = redis.ConnectionPool(
host='localhost',
port=6379,
db=0,
password='your_password',
max_connections=10
)
r = redis.Redis(connection_pool=pool)
读取不同类型的数据
import redis
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
# 1. 读取字符串类型
r.set('username', '张三')
username = r.get('username') # 返回: '张三'
print(f"用户名: {username}")
# 2. 读取Hash类型
r.hset('user:1', mapping={
'name': '李四',
'age': '25',
'city': '北京'
})
user_info = r.hgetall('user:1') # 返回字典
name = r.hget('user:1', 'name') # 获取单个字段
print(f"用户信息: {user_info}")
print(f"用户名: {name}")
# 3. 读取List类型
r.rpush('mylist', 'a', 'b', 'c')
list_data = r.lrange('mylist', 0, -1) # 获取所有元素
print(f"列表数据: {list_data}")
# 4. 读取Set类型
r.sadd('myset', 'apple', 'banana', 'orange')
set_data = r.smembers('myset') # 获取所有成员
print(f"集合数据: {set_data}")
# 5. 读取Sorted Set类型
r.zadd('leaderboard', {'player1': 100, 'player2': 200, 'player3': 150})
sorted_data = r.zrange('leaderboard', 0, -1, withscores=True)
print(f"排行榜: {sorted_data}")
实战案例:用户登录系统
import redis
import json
import time
class RedisSessionManager:
def __init__(self, redis_client):
self.redis = redis_client
def save_user_session(self, user_id, session_data, expire_time=3600):
"""保存用户会话"""
key = f"session:{user_id}"
self.redis.setex(
key,
expire_time,
json.dumps(session_data)
)
def get_user_session(self, user_id):
"""获取用户会话"""
key = f"session:{user_id}"
session_data = self.redis.get(key)
if session_data:
return json.loads(session_data)
return None
def delete_user_session(self, user_id):
"""删除用户会话"""
key = f"session:{user_id}"
self.redis.delete(key)
# 使用示例
if __name__ == "__main__":
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
session_manager = RedisSessionManager(r)
# 保存用户会话
user_session = {
'user_id': 1001,
'username': '张三',
'login_time': time.time(),
'role': 'admin'
}
session_manager.save_user_session(1001, user_session, 7200)
# 读取用户会话
session = session_manager.get_user_session(1001)
print(f"用户会话: {session}")
# 检查会话是否过期
if session and time.time() - session['login_time'] < 3600:
print(f"欢迎回来, {session['username']}")
批量读取和高效操作
import redis
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
# 1. 批量读取多个key
keys = ['user:1', 'user:2', 'user:3']
users = r.mget(keys)
print(f"批量读取: {users}")
# 2. 使用管道读取(提高性能)
pipe = r.pipeline()
pipe.get('key1')
pipe.get('key2')
pipe.hgetall('user:1')
results = pipe.execute()
print(f"管道结果: {results}")
# 3. 扫描匹配的key
cursor = 0
pattern = "user:*"
while True:
cursor, keys = r.scan(cursor, match=pattern, count=10)
for key in keys:
value = r.get(key)
print(f"Key: {key}, Value: {value}")
if cursor == 0:
break
错误处理
import redis
from redis.exceptions import RedisError, ConnectionError
class RedisClient:
def __init__(self, host='localhost', port=6379, db=0):
self.redis = None
self.config = {
'host': host,
'port': port,
'db': db,
'socket_connect_timeout': 3,
'socket_timeout': 3
}
self.connect()
def connect(self):
try:
self.redis = redis.Redis(**self.config)
self.redis.ping() # 测试连接
print("Redis连接成功")
except ConnectionError as e:
print(f"连接失败: {e}")
def safe_get(self, key):
"""安全读取数据"""
try:
value = self.redis.get(key)
return value
except RedisError as e:
print(f"读取失败: {e}")
return None
# 使用示例
client = RedisClient()
data = client.safe_get('test_key')
print(f"读取的数据: {data}")
完整示例:缓存系统
import redis
import json
from functools import wraps
class RedisCache:
def __init__(self):
self.redis = redis.Redis(
host='localhost',
port=6379,
decode_responses=True,
db=1 # 使用专门的数据库
)
def get(self, key):
"""获取缓存"""
data = self.redis.get(key)
if data:
return json.loads(data)
return None
def set(self, key, value, expire=300):
"""设置缓存"""
self.redis.setex(key, expire, json.dumps(value))
def delete(self, key):
"""删除缓存"""
self.redis.delete(key)
# 装饰器:自动缓存函数结果
def cache(expire=300):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
cache_key = f"{func.__name__}:{str(args)}:{str(kwargs)}"
cache_instance = RedisCache()
# 尝试从缓存获取
cached_result = cache_instance.get(cache_key)
if cached_result is not None:
print(f"从缓存读取: {cache_key}")
return cached_result
# 执行原函数
result = func(*args, **kwargs)
# 存入缓存
cache_instance.set(cache_key, result, expire)
print(f"缓存数据: {cache_key}")
return result
return wrapper
return decorator
# 使用示例
@cache(expire=60)
def get_user_info(user_id):
"""模拟从数据库获取用户信息"""
# 实际应用中这里会是数据库查询
return {
'id': user_id,
'name': f'User_{user_id}',
'time': '模拟数据库查询'
}
# 测试
if __name__ == "__main__":
# 第一次调用,会查询并缓存
user1 = get_user_info(1)
print(f"第一次调用: {user1}")
# 第二次调用,从缓存读取
user2 = get_user_info(1)
print(f"第二次调用: {user2}")
注意事项
- 连接池管理:生产环境建议使用连接池
- 超时设置:设置合理的连接超时时间
- 异常处理:处理Redis连接失败、命令错误等异常
- 编码问题:使用
decode_responses=True自动解码 - 性能优化:使用管道批量操作,避免频繁网络请求
这些示例涵盖了Python读取Redis的常见场景,你可以根据实际需求选择合适的方案。