Python案例如何优化批量操作?

wen python案例 57

本文目录导读:

Python案例如何优化批量操作?

  1. 批量数据库操作
  2. 批量文件读写
  3. 批量API请求
  4. 批量数据处理(Pandas向量化)
  5. 批量数据组合(itertools)
  6. 批量文件处理(多进程/多线程)
  7. 批量数据写入(内存映射)
  8. 总结优化策略

Python中优化批量操作的核心思路是:减少I/O次数、利用向量化、并行计算、以及避免内存爆炸,下面我会从几个常见的场景出发,给出具体的优化案例和代码。


批量数据库操作

问题:逐条插入数据,每次都要建立和断开数据库连接。

优化方案:使用 executemany() 批量插入。

import sqlite3
# 假设有大量用户数据
users = [('Alice', 25), ('Bob', 30), ('Charlie', 22), ...]  # 上万条
conn = sqlite3.connect('test.db')
cursor = conn.cursor()
# 优化前:逐条插入(非常慢)
for user in users:
    cursor.execute("INSERT INTO users (name, age) VALUES (?, ?)", user)
# 优化后:批量插入
cursor.executemany("INSERT INTO users (name, age) VALUES (?, ?)", users)
conn.commit()
conn.close()

原理executemany 将多条数据打包成一个请求发送给数据库,大幅减少网络和磁盘I/O。


批量文件读写

问题:逐行写入大量数据,频繁打开/关闭文件。

优化方案:缓冲写入(buffering)或使用 writelines()

# 优化前:逐行写入(频繁刷新磁盘)
with open('output.txt', 'w') as f:
    for line in large_list:
        f.write(line + '\n')
# 优化后:批量写入(减少系统调用)
with open('output.txt', 'w') as f:
    f.write('\n'.join(large_list))
# 或者使用缓冲区大小控制
with open('output.txt', 'w', buffering=65536) as f:  # 64KB缓冲区
    for line in large_list:
        f.write(line + '\n')

对于读取,使用 readlines() 或迭代器配合缓冲区。


批量API请求

问题:串行调用外部API,等待响应时间长。

优化方案:使用 asyncio + aiohttp 并发请求。

import asyncio
import aiohttp
async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()
async def main():
    urls = [f'http://api.example.com/item/{i}' for i in range(1000)]
    async with aiohttp.ClientSession() as session:
        # 并发请求(控制并发数)
        tasks = [asyncio.create_task(fetch(session, url)) for url in urls]
        results = await asyncio.gather(*tasks)
asyncio.run(main())

原理:I/O密集型任务(网络请求、文件读写)使用异步或线程池,避免CPU空转等待。


批量数据处理(Pandas向量化)

问题:用显式循环处理数据(逐行遍历DataFrame)。

优化方案:使用向量化操作或 apply

import pandas as pd
import numpy as np
df = pd.DataFrame({'values': np.random.rand(100000)})
# 优化前:逐行循环(极慢)
result = []
for i in range(len(df)):
    result.append(np.sin(df.iloc[i, 0]) * 2)
# 优化后:向量化操作(快100倍以上)
result = np.sin(df['values']) * 2

如果必须使用复杂逻辑,优先用 apply(内置C优化)而非显式循环。


批量数据组合(itertools)

问题:生成大量组合或笛卡尔积,内存溢出。

优化方案:使用生成器(itertools.product)逐批处理。

import itertools
list_a = range(1000)
list_b = range(1000)
# 错误做法:一次性生成所有组合(100万个元组,内存爆炸)
# all_combinations = [(x, y) for x in list_a for y in list_b]
# 优化后:使用生成器,逐批处理
for x, y in itertools.product(list_a, list_b):
    process(x, y)  # 每次只处理一个,内存占用恒定

批量文件处理(多进程/多线程)

问题:处理大量文件,单进程读取CPU利用率低。

优化方案:使用 concurrent.futures 并行处理。

from concurrent.futures import ProcessPoolExecutor
import os
def process_file(filename):
    with open(filename, 'r') as f:
        data = f.read()
    # 处理逻辑
    return len(data)
file_list = [f'dir/{f}' for f in os.listdir('dir')]
# 优化后:多进程并行
with ProcessPoolExecutor(max_workers=8) as executor:
    results = list(executor.map(process_file, file_list))

注意:CPU密集型用多进程,I/O密集型用多线程或异步。


批量数据写入(内存映射)

问题:处理超大文件(GB级别),普通读写效率低。

优化方案:使用 mmap 内存映射。

import mmap
with open('large_file.bin', 'r+b') as f:
    # 将文件映射到内存
    with mmap.mmap(f.fileno(), 0) as mmapped_file:
        # 直接像操作字节数组一样读写
        for i in range(0, len(mmapped_file), 1024):
            chunk = mmapped_file[i:i+1024]
            process(chunk)

原理:操作系统自动管理磁盘与内存之间的数据交换,避免Python层面频繁读取。


总结优化策略

场景 核心优化手段 建议工具/库
数据库 批量插入 executemany、SQLAlchemy bulk
文件I/O 缓冲读写、批量写入 bufferingwritelinesmmap
网络请求 并发请求 asyncioaiohttpThreadPoolExecutor
数据计算 向量化 NumPy、Pandas vectorize
迭代组合 延迟计算 itertools、生成器
文件处理 并行处理 ProcessPoolExecutorThreadPoolExecutor
内存限制 分批处理、流式处理 生成器、chunksize参数

最后一条黄金法则能用内置函数/库的批量操作,就不要自己写循环。 Python的循环本身速度并不慢,但频繁的I/O和函数调用开销才是性能杀手。

抱歉,评论功能暂时关闭!