本文目录导读:

我来详细介绍Python连接和操作数据库的常见方法,主要针对MySQL和SQLite两种常用数据库。
连接MySQL数据库
安装依赖库
pip install pymysql # 或者 pip install mysql-connector-python
使用PyMySQL连接MySQL
import pymysql
# 连接数据库
conn = pymysql.connect(
host='localhost', # 数据库地址
user='root', # 用户名
password='password', # 密码
database='test_db', # 数据库名
charset='utf8mb4', # 字符集
port=3306 # 端口号
)
# 创建游标对象
cursor = conn.cursor()
# 创建表
create_table_sql = """
CREATE TABLE IF NOT EXISTS users (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(100) NOT NULL,
age INT,
email VARCHAR(100),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
"""
cursor.execute(create_table_sql)
# 插入数据
insert_sql = "INSERT INTO users (name, age, email) VALUES (%s, %s, %s)"
user_data = [
('张三', 25, 'zhangsan@example.com'),
('李四', 30, 'lisi@example.com'),
('王五', 28, 'wangwu@example.com')
]
cursor.executemany(insert_sql, user_data)
conn.commit() # 提交事务
# 查询数据
select_sql = "SELECT * FROM users WHERE age > %s"
cursor.execute(select_sql, (25,))
results = cursor.fetchall()
print("查询结果:")
for row in results:
print(f"ID: {row[0]}, 姓名: {row[1]}, 年龄: {row[2]}, 邮箱: {row[3]}")
# 更新数据
update_sql = "UPDATE users SET age = %s WHERE name = %s"
cursor.execute(update_sql, (26, '张三'))
conn.commit()
# 删除数据
delete_sql = "DELETE FROM users WHERE name = %s"
cursor.execute(delete_sql, ('王五',))
conn.commit()
# 关闭连接
cursor.close()
conn.close()
使用SQLite(Python内置数据库)
SQLite是Python自带的轻量级数据库,无需额外安装。
import sqlite3
# 连接数据库(如果不存在会自动创建)
conn = sqlite3.connect('example.db')
# 创建游标
cursor = conn.cursor()
# 创建表
cursor.execute('''
CREATE TABLE IF NOT EXISTS students (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
score REAL,
class_name TEXT
)
''')
# 插入数据
students_data = [
('小明', 95.5, '一班'),
('小红', 88.0, '二班'),
('小刚', 92.5, '一班'),
('小丽', 78.5, '三班')
]
cursor.executemany(
'INSERT INTO students (name, score, class_name) VALUES (?, ?, ?)',
students_data
)
conn.commit()
# 查询数据 - 带条件
cursor.execute(
'SELECT * FROM students WHERE score >= ? ORDER BY score DESC',
(90.0,)
)
high_score_students = cursor.fetchall()
print("高分段学生(>=90分):")
for student in high_score_students:
print(f"ID: {student[0]}, 姓名: {student[1]}, 成绩: {student[2]}, 班级: {student[3]}")
# 聚合查询
cursor.execute('''
SELECT class_name, AVG(score) as avg_score, COUNT(*) as student_count
FROM students
GROUP BY class_name
''')
class_stats = cursor.fetchall()
print("\n班级统计:")
for stat in class_stats:
print(f"班级: {stat[0]}, 平均分: {stat[1]:.2f}, 学生数: {stat[2]}")
# 关闭连接
conn.close()
使用上下文管理器(推荐方式)
使用with语句可以自动管理数据库连接,避免忘记关闭连接。
import pymysql
# 使用上下文管理器
try:
with pymysql.connect(
host='localhost',
user='root',
password='password',
database='test_db',
charset='utf8mb4'
) as conn:
with conn.cursor() as cursor:
# 执行SQL操作
cursor.execute("SELECT * FROM users")
results = cursor.fetchall()
# 注意:读取操作不需要commit
# 但写入操作需要commit
# conn.commit() # 如果需要写入操作
for row in results:
print(row)
except pymysql.Error as e:
print(f"数据库错误: {e}")
使用ORM框架(SQLAlchemy)
对于复杂项目,推荐使用ORM框架。
安装
pip install sqlalchemy
示例代码
from sqlalchemy import create_engine, Column, Integer, String, Float
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
# 创建基类
Base = declarative_base()
# 定义模型
class Product(Base):
__tablename__ = 'products'
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(100), nullable=False)
price = Column(Float)
stock = Column(Integer)
# 创建数据库引擎
engine = create_engine('mysql+pymysql://root:password@localhost:3306/test_db')
# 创建所有表
Base.metadata.create_all(engine)
# 创建会话
Session = sessionmaker(bind=engine)
session = Session()
# 插入数据
new_product = Product(name='笔记本电脑', price=5999.99, stock=100)
session.add(new_product)
session.commit()
# 查询数据
products = session.query(Product).filter(Product.price > 5000).all()
for product in products:
print(f"产品: {product.name}, 价格: {product.price}, 库存: {product.stock}")
# 更新数据
product_to_update = session.query(Product).filter_by(name='笔记本电脑').first()
if product_to_update:
product_to_update.price = 5499.99
session.commit()
# 关闭会话
session.close()
数据库连接池
对于高并发应用,使用连接池提高性能:
from dbutils.pooled_db import PooledDB
import pymysql
# 创建连接池
pool = PooledDB(
creator=pymysql,
maxconnections=10, # 最大连接数
mincached=2, # 最小空闲连接数
maxcached=5, # 最大空闲连接数
host='localhost',
user='root',
password='password',
database='test_db',
charset='utf8mb4'
)
# 从连接池获取连接
conn = pool.connection()
cursor = conn.cursor()
# 执行操作
cursor.execute("SELECT * FROM users")
results = cursor.fetchall()
# 归还连接到连接池
cursor.close()
conn.close()
| 数据库类型 | 适用场景 | 推荐库 |
|---|---|---|
| SQLite | 本地小应用、嵌入式 | sqlite3(内置) |
| MySQL | Web应用、企业级 | pymysql |
| PostgreSQL | 复杂查询、金融系统 | psycopg2 |
| ORM框架 | 大型项目、快速开发 | SQLAlchemy |
最佳实践建议:
- 使用参数化查询防止SQL注入
- 使用上下文管理器管理连接
- 适当使用事务处理
- 根据场景选择合适的数据库和库
- 生产环境考虑使用连接池