本文目录导读:

我来详细介绍Python连接数据库的几种常见方法。
连接MySQL数据库
安装依赖
pip install pymysql # 或者 pip install mysql-connector-python
使用PyMySQL连接
import pymysql
# 连接数据库
connection = pymysql.connect(
host='localhost', # 数据库主机地址
port=3306, # MySQL端口号
user='root', # 用户名
password='123456', # 密码
database='test_db', # 数据库名称
charset='utf8mb4' # 字符集
)
try:
# 创建游标对象
with connection.cursor() as cursor:
# 执行SQL查询
sql = "SELECT * FROM users WHERE age > %s"
cursor.execute(sql, (20,))
# 获取结果
results = cursor.fetchall()
for row in results:
print(f"ID: {row[0]}, Name: {row[1]}, Age: {row[2]}")
# 提交事务
connection.commit()
finally:
# 关闭连接
connection.close()
使用上下文管理器(推荐方式)
from contextlib import closing
import pymysql
def query_database():
"""使用上下文管理器自动管理连接"""
with closing(pymysql.connect(
host='localhost',
user='root',
password='123456',
database='test_db'
)) as connection:
with connection.cursor() as cursor:
cursor.execute("SELECT * FROM users")
return cursor.fetchall()
# 使用
users = query_database()
print(users)
连接PostgreSQL数据库
安装依赖
pip install psycopg2
示例代码
import psycopg2
# 连接数据库
conn = psycopg2.connect(
host="localhost",
port="5432",
database="test_db",
user="postgres",
password="123456"
)
# 创建游标
cur = conn.cursor()
# 执行查询
cur.execute("SELECT * FROM employees WHERE salary > %s", (50000,))
# 获取数据
rows = cur.fetchall()
for row in rows:
print(row)
# 关闭连接
cur.close()
conn.close()
连接SQLite数据库(内置模块)
import sqlite3
# 连接数据库(如果不存在会自动创建)
conn = sqlite3.connect('example.db')
# 创建游标
cursor = conn.cursor()
# 创建表
cursor.execute('''
CREATE TABLE IF NOT EXISTS students (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
age INTEGER
)
''')
# 插入数据
cursor.execute("INSERT INTO students (name, age) VALUES (?, ?)",
("张三", 20))
# 查询数据
cursor.execute("SELECT * FROM students")
rows = cursor.fetchall()
for row in rows:
print(row)
# 提交事务
conn.commit()
# 关闭连接
conn.close()
使用ORM框架(SQLAlchemy)
安装依赖
pip install sqlalchemy
示例代码
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
# 创建基类
Base = declarative_base()
# 定义模型
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
name = Column(String(50))
age = Column(Integer)
def __repr__(self):
return f"<User(name={self.name}, age={self.age})>"
# 创建数据库引擎
# MySQL: engine = create_engine('mysql+pymysql://user:password@localhost/dbname')
# PostgreSQL: engine = create_engine('postgresql://user:password@localhost/dbname')
# SQLite: engine = create_engine('sqlite:///example.db')
engine = create_engine('sqlite:///example.db')
# 创建表
Base.metadata.create_all(engine)
# 创建会话
Session = sessionmaker(bind=engine)
session = Session()
# 添加数据
new_user = User(name='李四', age=25)
session.add(new_user)
session.commit()
# 查询数据
users = session.query(User).filter(User.age > 20).all()
for user in users:
print(user)
# 关闭会话
session.close()
数据库连接池(提高性能)
from dbutils.pooled_db import PooledDB
import pymysql
# 创建连接池
pool = PooledDB(
creator=pymysql,
maxconnections=6, # 最大连接数
mincached=2, # 初始化时创建的最小连接数
maxcached=5, # 最大空闲连接数
maxusage=None, # 单个连接的最大复用次数
blocking=True, # 无可用连接时是否等待
host='localhost',
user='root',
password='123456',
database='test_db',
charset='utf8mb4'
)
# 从连接池获取连接
def get_user_by_id(user_id):
conn = pool.connection()
try:
with conn.cursor() as cursor:
cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
return cursor.fetchone()
finally:
conn.close() # 不是真正关闭,而是返回到连接池
# 使用
user = get_user_by_id(1)
print(user)
完整的数据库操作封装
import pymysql
from typing import List, Tuple, Optional
class DatabaseManager:
"""数据库操作管理器"""
def __init__(self, config: dict):
self.config = config
self.connection = None
def connect(self) -> None:
"""建立数据库连接"""
self.connection = pymysql.connect(**self.config)
def disconnect(self) -> None:
"""关闭数据库连接"""
if self.connection:
self.connection.close()
def execute_query(self, sql: str, params: tuple = None) -> List[Tuple]:
"""执行查询操作"""
with self.connection.cursor() as cursor:
cursor.execute(sql, params)
return cursor.fetchall()
def execute_update(self, sql: str, params: tuple = None) -> int:
"""执行更新操作"""
with self.connection.cursor() as cursor:
affected_rows = cursor.execute(sql, params)
self.connection.commit()
return affected_rows
def insert_data(self, table: str, data: dict) -> int:
"""插入数据"""
columns = ', '.join(data.keys())
values = ', '.join(['%s'] * len(data))
sql = f"INSERT INTO {table} ({columns}) VALUES ({values})"
with self.connection.cursor() as cursor:
cursor.execute(sql, tuple(data.values()))
self.connection.commit()
return cursor.lastrowid
# 使用示例
config = {
'host': 'localhost',
'user': 'root',
'password': '123456',
'database': 'test_db',
'charset': 'utf8mb4'
}
db = DatabaseManager(config)
db.connect()
# 查询数据
users = db.execute_query("SELECT * FROM users WHERE active = %s", (1,))
# 插入数据
new_id = db.insert_data('users', {
'name': '王五',
'age': 28,
'email': 'wangwu@example.com'
})
db.disconnect()
注意事项
- 敏感信息管理:不要硬编码数据库密码,使用环境变量或配置文件
- 错误处理:始终使用 try-except 处理数据库异常
- 资源释放:确保连接和游标被正确关闭
- SQL注入防护:使用参数化查询而不是字符串拼接
- 连接池:生产环境建议使用连接池提高性能
最佳实践
# 使用环境变量
import os
from dotenv import load_dotenv
load_dotenv()
DB_CONFIG = {
'host': os.getenv('DB_HOST', 'localhost'),
'user': os.getenv('DB_USER'),
'password': os.getenv('DB_PASSWORD'),
'database': os.getenv('DB_NAME'),
'charset': 'utf8mb4'
}
就是Python连接数据库的完整案例,包括了常见数据库类型和最佳实践。