本文目录导读:

Python中优化批量操作的核心思路是:减少I/O次数、利用向量化、并行计算、以及避免内存爆炸,下面我会从几个常见的场景出发,给出具体的优化案例和代码。
批量数据库操作
问题:逐条插入数据,每次都要建立和断开数据库连接。
优化方案:使用 executemany() 批量插入。
import sqlite3
# 假设有大量用户数据
users = [('Alice', 25), ('Bob', 30), ('Charlie', 22), ...] # 上万条
conn = sqlite3.connect('test.db')
cursor = conn.cursor()
# 优化前:逐条插入(非常慢)
for user in users:
cursor.execute("INSERT INTO users (name, age) VALUES (?, ?)", user)
# 优化后:批量插入
cursor.executemany("INSERT INTO users (name, age) VALUES (?, ?)", users)
conn.commit()
conn.close()
原理:executemany 将多条数据打包成一个请求发送给数据库,大幅减少网络和磁盘I/O。
批量文件读写
问题:逐行写入大量数据,频繁打开/关闭文件。
优化方案:缓冲写入(buffering)或使用 writelines()。
# 优化前:逐行写入(频繁刷新磁盘)
with open('output.txt', 'w') as f:
for line in large_list:
f.write(line + '\n')
# 优化后:批量写入(减少系统调用)
with open('output.txt', 'w') as f:
f.write('\n'.join(large_list))
# 或者使用缓冲区大小控制
with open('output.txt', 'w', buffering=65536) as f: # 64KB缓冲区
for line in large_list:
f.write(line + '\n')
对于读取,使用 readlines() 或迭代器配合缓冲区。
批量API请求
问题:串行调用外部API,等待响应时间长。
优化方案:使用 asyncio + aiohttp 并发请求。
import asyncio
import aiohttp
async def fetch(session, url):
async with session.get(url) as response:
return await response.text()
async def main():
urls = [f'http://api.example.com/item/{i}' for i in range(1000)]
async with aiohttp.ClientSession() as session:
# 并发请求(控制并发数)
tasks = [asyncio.create_task(fetch(session, url)) for url in urls]
results = await asyncio.gather(*tasks)
asyncio.run(main())
原理:I/O密集型任务(网络请求、文件读写)使用异步或线程池,避免CPU空转等待。
批量数据处理(Pandas向量化)
问题:用显式循环处理数据(逐行遍历DataFrame)。
优化方案:使用向量化操作或 apply。
import pandas as pd
import numpy as np
df = pd.DataFrame({'values': np.random.rand(100000)})
# 优化前:逐行循环(极慢)
result = []
for i in range(len(df)):
result.append(np.sin(df.iloc[i, 0]) * 2)
# 优化后:向量化操作(快100倍以上)
result = np.sin(df['values']) * 2
如果必须使用复杂逻辑,优先用 apply(内置C优化)而非显式循环。
批量数据组合(itertools)
问题:生成大量组合或笛卡尔积,内存溢出。
优化方案:使用生成器(itertools.product)逐批处理。
import itertools
list_a = range(1000)
list_b = range(1000)
# 错误做法:一次性生成所有组合(100万个元组,内存爆炸)
# all_combinations = [(x, y) for x in list_a for y in list_b]
# 优化后:使用生成器,逐批处理
for x, y in itertools.product(list_a, list_b):
process(x, y) # 每次只处理一个,内存占用恒定
批量文件处理(多进程/多线程)
问题:处理大量文件,单进程读取CPU利用率低。
优化方案:使用 concurrent.futures 并行处理。
from concurrent.futures import ProcessPoolExecutor
import os
def process_file(filename):
with open(filename, 'r') as f:
data = f.read()
# 处理逻辑
return len(data)
file_list = [f'dir/{f}' for f in os.listdir('dir')]
# 优化后:多进程并行
with ProcessPoolExecutor(max_workers=8) as executor:
results = list(executor.map(process_file, file_list))
注意:CPU密集型用多进程,I/O密集型用多线程或异步。
批量数据写入(内存映射)
问题:处理超大文件(GB级别),普通读写效率低。
优化方案:使用 mmap 内存映射。
import mmap
with open('large_file.bin', 'r+b') as f:
# 将文件映射到内存
with mmap.mmap(f.fileno(), 0) as mmapped_file:
# 直接像操作字节数组一样读写
for i in range(0, len(mmapped_file), 1024):
chunk = mmapped_file[i:i+1024]
process(chunk)
原理:操作系统自动管理磁盘与内存之间的数据交换,避免Python层面频繁读取。
总结优化策略
| 场景 | 核心优化手段 | 建议工具/库 |
|---|---|---|
| 数据库 | 批量插入 | executemany、SQLAlchemy bulk |
| 文件I/O | 缓冲读写、批量写入 | buffering、writelines、mmap |
| 网络请求 | 并发请求 | asyncio、aiohttp、ThreadPoolExecutor |
| 数据计算 | 向量化 | NumPy、Pandas vectorize |
| 迭代组合 | 延迟计算 | itertools、生成器 |
| 文件处理 | 并行处理 | ProcessPoolExecutor、ThreadPoolExecutor |
| 内存限制 | 分批处理、流式处理 | 生成器、chunksize参数 |
最后一条黄金法则:能用内置函数/库的批量操作,就不要自己写循环。 Python的循环本身速度并不慢,但频繁的I/O和函数调用开销才是性能杀手。