本文目录导读:

在Python中操作Redis数据库有多种方式,我来介绍几种常见的新增数据方法:
基础安装和连接
# 首先安装redis库 # pip install redis import redis # 连接Redis r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True) # 或者使用连接池 pool = redis.ConnectionPool(host='localhost', port=6379, db=0) r = redis.Redis(connection_pool=pool)
新增不同类型的数据
字符串(String)
# 新增字符串
r.set('name', '张三')
r.set('age', 25)
# 批量新增
r.mset({'city': '北京', 'job': '工程师'})
# 设置过期时间(单位:秒)
r.setex('temp', 3600, '临时数据') # 1小时后过期
r.setnx('unique_key', '值') # 仅当key不存在时设置
列表(List)
# 从左侧添加
r.lpush('mylist', '元素1', '元素2', '元素3')
# 从右侧添加
r.rpush('mylist2', 'a', 'b', 'c')
# 插入到指定位置
r.linsert('mylist', 'BEFORE', '元素2', '新元素')
哈希(Hash)
# 新增单个字段
r.hset('user:1', 'username', '李四')
r.hset('user:1', 'email', 'lisi@example.com')
# 新增多个字段
user_data = {
'name': '王五',
'age': 30,
'city': '上海'
}
r.hset('user:2', mapping=user_data)
集合(Set)
# 新增集合元素
r.sadd('myset', '元素A', '元素B', '元素C')
r.sadd('myset', '元素D')
有序集合(Sorted Set)
# 新增有序集合元素(带分数)
r.zadd('scores', {'张三': 95, '李四': 88, '王五': 92})
# 添加单个元素
r.zadd('scores', {'赵六': 78})
完整示例程序
import redis
import json
from datetime import datetime
class RedisDemo:
def __init__(self):
self.r = redis.Redis(
host='localhost',
port=6379,
db=0,
decode_responses=True,
password='your_password' # 如果有密码
)
def add_string_data(self):
"""新增字符串类型数据"""
# 基本字符串
self.r.set('welcome_msg', 'Hello, Redis!')
# 存储JSON对象
user_info = {
'name': '小明',
'age': 28,
'hobbies': ['读书', '编程', '运动']
}
self.r.set('user:info', json.dumps(user_info))
# 计数器
self.r.set('page_views', 0)
self.r.incr('page_views') # 自增1
def add_list_data(self):
"""新增列表数据"""
# 存储用户操作日志
actions = ['login', 'search', 'view_product', 'add_to_cart']
for action in actions:
self.r.rpush('user:actions', action)
# 获取列表长度
length = self.r.llen('user:actions')
print(f"列表长度: {length}")
def add_hash_data(self):
"""新增哈希数据"""
# 存储用户详细信息
user = {
'id': '1001',
'username': 'admin',
'email': 'admin@example.com',
'created_at': datetime.now().isoformat(),
'status': 'active'
}
# 方式1:一次性设置所有字段
self.r.hset('user:1001', mapping=user)
# 方式2:逐个设置字段
self.r.hset('user:1001', 'last_login', datetime.now().isoformat())
def add_set_data(self):
"""新增集合数据"""
# 存储用户标签
tags = ['python', 'redis', 'database', 'cache']
self.r.sadd('user:1001:tags', *tags)
# 添加额外标签
self.r.sadd('user:1001:tags', 'programming')
def add_sorted_set_data(self):
"""新增有序集合数据"""
# 存储排行榜数据
players = {
'player1': 1500,
'player2': 1200,
'player3': 1800,
'player4': 1100
}
self.r.zadd('game:leaderboard', players)
def add_with_expiration(self):
"""设置过期时间的数据"""
# 验证码,5分钟过期
self.r.setex('sms:code:13800000000', 300, '123456')
# 会话token,1小时过期
self.r.setex('session:token:abc123', 3600, 'user_data')
# 使用pipeline批量操作
pipe = self.r.pipeline()
pipe.set('key1', 'value1')
pipe.set('key2', 'value2')
pipe.expire('key1', 60) # 60秒过期
pipe.expire('key2', 120) # 120秒过期
pipe.execute()
# 使用示例
if __name__ == '__main__':
demo = RedisDemo()
print("=== 新增字符串数据 ===")
demo.add_string_data()
print("=== 新增列表数据 ===")
demo.add_list_data()
print("=== 新增哈希数据 ===")
demo.add_hash_data()
print("=== 新增集合数据 ===")
demo.add_set_data()
print("=== 新增有序集合数据 ===")
demo.add_sorted_set_data()
print("=== 设置过期数据 ===")
demo.add_with_expiration()
print("数据添加完成!")
# 验证数据
r = redis.Redis(decode_responses=True)
print(f"welcome_msg: {r.get('welcome_msg')}")
print(f"user:info: {json.loads(r.get('user:info'))}")
print(f"user:1001: {r.hgetall('user:1001')}")
高级用法
# 使用装饰器处理连接
def redis_connection(func):
def wrapper(*args, **kwargs):
r = redis.Redis(decode_responses=True)
return func(r, *args, **kwargs)
return wrapper
@redis_connection
def add_redis_data(r):
r.set('decorator_test', '使用装饰器添加的数据')
return r.get('decorator_test')
# 使用上下文管理器
with redis.Redis(decode_responses=True) as r:
r.set('context_test', '上下文管理器添加的数据')
注意事项
- 编码问题:设置
decode_responses=True避免字节串问题 - 批量操作:使用 pipeline 提高性能
- 错误处理:添加异常处理
- 密码验证:如果Redis有密码,需要提供密码参数
这样你就掌握了在Python中新增Redis数据的基础操作!