本文目录导读:

我来介绍几种常用的Python查询数据库数据的方法:
MySQL数据库查询
使用mysql-connector-python
import mysql.connector
# 连接数据库
conn = mysql.connector.connect(
host="localhost",
user="your_username",
password="your_password",
database="your_database"
)
cursor = conn.cursor()
# 查询数据
cursor.execute("SELECT * FROM users WHERE age > %s", (18,))
results = cursor.fetchall()
for row in results:
print(f"ID: {row[0]}, Name: {row[1]}, Age: {row[2]}")
# 关闭连接
cursor.close()
conn.close()
使用PyMySQL
import pymysql
# 连接数据库
connection = pymysql.connect(
host='localhost',
user='root',
password='password',
database='testdb',
charset='utf8mb4'
)
try:
with connection.cursor() as cursor:
# 查询所有数据
sql = "SELECT id, name, email FROM users"
cursor.execute(sql)
results = cursor.fetchall()
for row in results:
print(f"ID: {row[0]}, Name: {row[1]}, Email: {row[2]}")
# 带条件的查询
sql = "SELECT * FROM users WHERE name LIKE %s"
cursor.execute(sql, ('%张%',))
search_results = cursor.fetchall()
finally:
connection.close()
SQLite数据库查询
import sqlite3
# 连接数据库(如果不存在会自动创建)
conn = sqlite3.connect('example.db')
cursor = conn.cursor()
# 创建表(如果不存在)
cursor.execute('''
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
age INTEGER,
email TEXT
)
''')
# 插入示例数据
cursor.execute("INSERT INTO users (name, age, email) VALUES (?, ?, ?)",
('张三', 25, 'zhangsan@example.com'))
conn.commit()
# 查询数据
# 1. 查询所有数据
cursor.execute("SELECT * FROM users")
all_users = cursor.fetchall()
print("所有用户:", all_users)
# 2. 带条件查询
cursor.execute("SELECT * FROM users WHERE age > ?", (20,))
older_users = cursor.fetchall()
print("年龄大于20的用户:", older_users)
# 3. 模糊查询
cursor.execute("SELECT * FROM users WHERE name LIKE ?", ('%张%',))
zhang_users = cursor.fetchall()
# 4. 使用fetchone()
cursor.execute("SELECT * FROM users LIMIT 1")
first_user = cursor.fetchone()
print("第一个用户:", first_user)
conn.close()
PostgreSQL查询
import psycopg2
# 连接数据库
conn = psycopg2.connect(
host="localhost",
database="testdb",
user="postgres",
password="password"
)
cursor = conn.cursor()
# 查询数据
cursor.execute("SELECT id, name, age FROM users WHERE age BETWEEN %s AND %s", (20, 30))
results = cursor.fetchall()
# 使用列名访问
for row in results:
print(f"ID: {row[0]}, Name: {row[1]}, Age: {row[2]}")
# 使用DictCursor获取字典格式结果
from psycopg2.extras import DictCursor
cursor = conn.cursor(cursor_factory=DictCursor)
cursor.execute("SELECT * FROM users")
for row in cursor:
print(f"Name: {row['name']}, Age: {row['age']}")
cursor.close()
conn.close()
高级查询示例
使用上下文管理器(推荐)
import sqlite3
from contextlib import closing
def query_database():
with closing(sqlite3.connect('example.db')) as conn:
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
# 多表连接查询
cursor.execute('''
SELECT u.name, o.order_date, o.total
FROM users u
LEFT JOIN orders o ON u.id = o.user_id
WHERE o.total > ?
''', (100,))
for row in cursor:
print(f"用户: {row['name']}, 订单日期: {row['order_date']}, 金额: {row['total']}")
使用参数化查询(防止SQL注入)
# 不推荐(容易SQL注入)
cursor.execute("SELECT * FROM users WHERE name = '" + user_input + "'")
# 推荐(参数化查询)
cursor.execute("SELECT * FROM users WHERE name = %s", (user_input,))
# SQLite的占位符是 ?
cursor.execute("SELECT * FROM users WHERE name = ?", (user_input,))
查询结果转换为字典
import sqlite3
def query_to_dict(conn, query, params=None):
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
if params:
cursor.execute(query, params)
else:
cursor.execute(query)
rows = cursor.fetchall()
return [dict(row) for row in rows]
# 使用示例
conn = sqlite3.connect('example.db')
results = query_to_dict(conn, "SELECT * FROM users WHERE age > ?", (20,))
print(results[0]['name']) # 使用列名访问
使用ORM(以SQLAlchemy为例)
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
Base = declarative_base()
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
name = Column(String)
age = Column(Integer)
email = Column(String)
# 创建引擎和会话
engine = create_engine('sqlite:///example.db', echo=True)
Session = sessionmaker(bind=engine)
session = Session()
# 查询数据
# 1. 查询所有用户
users = session.query(User).all()
for user in users:
print(f"Name: {user.name}, Age: {user.age}")
# 2. 带条件的查询
young_users = session.query(User).filter(User.age < 30).all()
# 3. 单个结果
user = session.query(User).filter_by(name='张三').first()
# 4. 排序
ordered_users = session.query(User).order_by(User.age.desc()).all()
session.close()
实用技巧
连接池管理
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool
engine = create_engine(
'mysql+mysqlconnector://user:password@localhost/dbname',
poolclass=QueuePool,
pool_size=5,
max_overflow=10
)
错误处理
try:
cursor.execute(query, params)
conn.commit()
except Exception as e:
conn.rollback()
print(f"查询失败: {e}")
finally:
cursor.close()
conn.close()
选择哪个方案取决于你的具体需求:
- 小型项目:使用SQLite + 原生SQL
- Web应用:使用SQLAlchemy ORM
- 数据量大:使用连接池 + 参数化查询
- 需要高性能:考虑使用异步数据库驱动
记得在生产环境中总是使用参数化查询来防止SQL注入!