Python案例怎么新增Redis数据?

wen python案例 9

本文目录导读:

Python案例怎么新增Redis数据?

  1. 基础安装和连接
  2. 新增不同类型的数据
  3. 完整示例程序
  4. 高级用法
  5. 注意事项

在Python中操作Redis数据库有多种方式,我来介绍几种常见的新增数据方法:

基础安装和连接

# 首先安装redis库
# pip install redis
import redis
# 连接Redis
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
# 或者使用连接池
pool = redis.ConnectionPool(host='localhost', port=6379, db=0)
r = redis.Redis(connection_pool=pool)

新增不同类型的数据

字符串(String)

# 新增字符串
r.set('name', '张三')
r.set('age', 25)
# 批量新增
r.mset({'city': '北京', 'job': '工程师'})
# 设置过期时间(单位:秒)
r.setex('temp', 3600, '临时数据')  # 1小时后过期
r.setnx('unique_key', '值')  # 仅当key不存在时设置

列表(List)

# 从左侧添加
r.lpush('mylist', '元素1', '元素2', '元素3')
# 从右侧添加
r.rpush('mylist2', 'a', 'b', 'c')
# 插入到指定位置
r.linsert('mylist', 'BEFORE', '元素2', '新元素')

哈希(Hash)

# 新增单个字段
r.hset('user:1', 'username', '李四')
r.hset('user:1', 'email', 'lisi@example.com')
# 新增多个字段
user_data = {
    'name': '王五',
    'age': 30,
    'city': '上海'
}
r.hset('user:2', mapping=user_data)

集合(Set)

# 新增集合元素
r.sadd('myset', '元素A', '元素B', '元素C')
r.sadd('myset', '元素D')

有序集合(Sorted Set)

# 新增有序集合元素(带分数)
r.zadd('scores', {'张三': 95, '李四': 88, '王五': 92})
# 添加单个元素
r.zadd('scores', {'赵六': 78})

完整示例程序

import redis
import json
from datetime import datetime
class RedisDemo:
    def __init__(self):
        self.r = redis.Redis(
            host='localhost',
            port=6379,
            db=0,
            decode_responses=True,
            password='your_password'  # 如果有密码
        )
    def add_string_data(self):
        """新增字符串类型数据"""
        # 基本字符串
        self.r.set('welcome_msg', 'Hello, Redis!')
        # 存储JSON对象
        user_info = {
            'name': '小明',
            'age': 28,
            'hobbies': ['读书', '编程', '运动']
        }
        self.r.set('user:info', json.dumps(user_info))
        # 计数器
        self.r.set('page_views', 0)
        self.r.incr('page_views')  # 自增1
    def add_list_data(self):
        """新增列表数据"""
        # 存储用户操作日志
        actions = ['login', 'search', 'view_product', 'add_to_cart']
        for action in actions:
            self.r.rpush('user:actions', action)
        # 获取列表长度
        length = self.r.llen('user:actions')
        print(f"列表长度: {length}")
    def add_hash_data(self):
        """新增哈希数据"""
        # 存储用户详细信息
        user = {
            'id': '1001',
            'username': 'admin',
            'email': 'admin@example.com',
            'created_at': datetime.now().isoformat(),
            'status': 'active'
        }
        # 方式1:一次性设置所有字段
        self.r.hset('user:1001', mapping=user)
        # 方式2:逐个设置字段
        self.r.hset('user:1001', 'last_login', datetime.now().isoformat())
    def add_set_data(self):
        """新增集合数据"""
        # 存储用户标签
        tags = ['python', 'redis', 'database', 'cache']
        self.r.sadd('user:1001:tags', *tags)
        # 添加额外标签
        self.r.sadd('user:1001:tags', 'programming')
    def add_sorted_set_data(self):
        """新增有序集合数据"""
        # 存储排行榜数据
        players = {
            'player1': 1500,
            'player2': 1200,
            'player3': 1800,
            'player4': 1100
        }
        self.r.zadd('game:leaderboard', players)
    def add_with_expiration(self):
        """设置过期时间的数据"""
        # 验证码,5分钟过期
        self.r.setex('sms:code:13800000000', 300, '123456')
        # 会话token,1小时过期
        self.r.setex('session:token:abc123', 3600, 'user_data')
        # 使用pipeline批量操作
        pipe = self.r.pipeline()
        pipe.set('key1', 'value1')
        pipe.set('key2', 'value2')
        pipe.expire('key1', 60)  # 60秒过期
        pipe.expire('key2', 120)  # 120秒过期
        pipe.execute()
# 使用示例
if __name__ == '__main__':
    demo = RedisDemo()
    print("=== 新增字符串数据 ===")
    demo.add_string_data()
    print("=== 新增列表数据 ===")
    demo.add_list_data()
    print("=== 新增哈希数据 ===")
    demo.add_hash_data()
    print("=== 新增集合数据 ===")
    demo.add_set_data()
    print("=== 新增有序集合数据 ===")
    demo.add_sorted_set_data()
    print("=== 设置过期数据 ===")
    demo.add_with_expiration()
    print("数据添加完成!")
    # 验证数据
    r = redis.Redis(decode_responses=True)
    print(f"welcome_msg: {r.get('welcome_msg')}")
    print(f"user:info: {json.loads(r.get('user:info'))}")
    print(f"user:1001: {r.hgetall('user:1001')}")

高级用法

# 使用装饰器处理连接
def redis_connection(func):
    def wrapper(*args, **kwargs):
        r = redis.Redis(decode_responses=True)
        return func(r, *args, **kwargs)
    return wrapper
@redis_connection
def add_redis_data(r):
    r.set('decorator_test', '使用装饰器添加的数据')
    return r.get('decorator_test')
# 使用上下文管理器
with redis.Redis(decode_responses=True) as r:
    r.set('context_test', '上下文管理器添加的数据')

注意事项

  1. 编码问题:设置 decode_responses=True 避免字节串问题
  2. 批量操作:使用 pipeline 提高性能
  3. 错误处理:添加异常处理
  4. 密码验证:如果Redis有密码,需要提供密码参数

这样你就掌握了在Python中新增Redis数据的基础操作!

抱歉,评论功能暂时关闭!