本文目录导读:

优化Python中的查询语句通常指数据库查询(如SQL)或数据框查询(如Pandas),我会分别给出针对不同场景的优化策略和具体案例。
SQL查询优化(以SQLite/MySQL为例)
使用索引(最有效)
# 优化前:全表扫描
cursor.execute("SELECT * FROM users WHERE age > 30")
# 优化后:创建索引
cursor.execute("CREATE INDEX idx_age ON users(age)")
cursor.execute("SELECT * FROM users WHERE age > 30")
避免SELECT *,只取所需列
# 优化前
cursor.execute("SELECT * FROM orders WHERE user_id = 100")
# 优化后
cursor.execute("SELECT id, amount, status FROM orders WHERE user_id = 100")
使用批量操作代替循环
# 优化前:N次查询
user_ids = [101, 102, 103, 104, 105]
for uid in user_ids:
cursor.execute("SELECT * FROM users WHERE id = ?", (uid,))
rows = cursor.fetchall()
# 处理数据...
# 优化后:1次查询
placeholders = ','.join('?' * len(user_ids))
cursor.execute(f"SELECT * FROM users WHERE id IN ({placeholders})", user_ids)
rows = cursor.fetchall()
使用连接(JOIN)代替子查询
# 优化前:子查询
cursor.execute("""
SELECT * FROM orders
WHERE user_id IN (SELECT id FROM users WHERE age > 25)
""")
# 优化后:JOIN
cursor.execute("""
SELECT o.* FROM orders o
JOIN users u ON o.user_id = u.id
WHERE u.age > 25
""")
使用LIMIT分页(大数据量时)
# 优化前:一次查询全部
cursor.execute("SELECT * FROM logs")
# 优化后:分页查询
page_size = 1000
for offset in range(0, total_count, page_size):
cursor.execute("SELECT * FROM logs LIMIT ? OFFSET ?", (page_size, offset))
batch = cursor.fetchall()
process_batch(batch)
Pandas查询优化
使用向量化操作代替循环
import pandas as pd
import numpy as np
df = pd.DataFrame({'A': range(100000), 'B': range(100000, 200000)})
# 优化前:for循环
result = []
for i in range(len(df)):
result.append(df.iloc[i]['A'] + df.iloc[i]['B'])
df['C'] = result
# 优化后:向量化操作
df['C'] = df['A'] + df['B']
使用query()方法
# 优化前
filtered = df[(df['age'] > 30) & (df['salary'] > 50000)]
# 优化后:更清晰,有时更快
filtered = df.query('age > 30 and salary > 50000')
使用布尔索引
# 优化前 result = df[df['status'].isin(['active', 'pending'])] # 优化后:使用loc+布尔索引 mask = df['status'].isin(['active', 'pending']) result = df.loc[mask]
使用分类数据类型
# 优化前
df['category'] = df['category'].astype(str)
# 优化后:用category类型,查询更快,内存更少
df['category'] = df['category'].astype('category')
result = df[df['category'] == 'A']
使用eval()进行复杂计算
# 优化前
df['result'] = df['A'] * df['B'] + df['C'] / df['D']
# 优化后:使用eval(),大数据量时显著提升
df['result'] = df.eval('A * B + C / D')
ORM查询优化(以SQLAlchemy为例)
使用懒加载与急加载
from sqlalchemy.orm import joinedload # 优化前:懒加载(N+1问题) user = session.query(User).first() orders = user.orders # 触发新查询 # 优化后:急加载(1次查询) user = session.query(User).options(joinedload(User.orders)).first() orders = user.orders # 不会触发新查询
使用批量查询
# 优化前
for user_id in [1, 2, 3, 4, 5]:
user = session.query(User).get(user_id)
process(user)
# 优化后
users = session.query(User).filter(User.id.in_([1, 2, 3, 4, 5])).all()
for user in users:
process(user)
使用子查询过滤
# 优化前:先全部加载再过滤 orders = session.query(Order).all() active_orders = [o for o in orders if o.status == 'active'] # 优化后:数据库层面过滤 active_orders = session.query(Order).filter(Order.status == 'active').all()
实际综合案例
假设我们有一个电商系统,需要查询“过去7天购买金额超过1000元的活跃用户”:
优化前(性能差):
def get_high_value_users():
cursor.execute("SELECT * FROM users WHERE status = 'active'")
all_users = cursor.fetchall()
result = []
for user in all_users:
cursor.execute("""
SELECT SUM(amount) FROM orders
WHERE user_id = ? AND order_date >= date('now', '-7 days')
""", (user['id'],))
total = cursor.fetchone()[0]
if total and total > 1000:
result.append(user)
return result
优化后(高效):
def get_high_value_users():
# 一次性查询,使用JOIN、聚合、索引
cursor.execute("""
SELECT u.*, COALESCE(SUM(o.amount), 0) as total_amount
FROM users u
LEFT JOIN orders o ON u.id = o.user_id
AND o.order_date >= date('now', '-7 days')
WHERE u.status = 'active'
GROUP BY u.id
HAVING total_amount > 1000
""")
return cursor.fetchall()
优化通用原则
| 原则 | 说明 |
|---|---|
| 减少数据量 | 只取需要的列和行,使用LIMIT |
| 减少查询次数 | 批量操作、JOIN代替子查询 |
| 使用索引 | 对WHERE、JOIN、ORDER BY列建索引 |
| 利用数据库能力 | 让数据库做聚合、过滤,而非Python |
| 使用缓存 | 对不常变的数据使用Redis等缓存 |
| 分析执行计划 | 使用EXPLAIN分析查询瓶颈 |