本文目录导读:

下面详细介绍 Python 批量处理文件的几种常见方法,并给出完整案例。
基础方法:os模块 + glob
示例1:批量重命名文件
import os
import glob
def batch_rename(directory, prefix="new_"):
    """Rename every regular file in ``directory`` to ``<prefix><n><ext>``.

    Files are numbered from 1 in sorted path order and keep their original
    extension. Sub-directories are left untouched and do not consume a
    sequence number. Files whose name already equals their target name are
    skipped silently.

    The rename runs in two passes (source -> temporary name -> final name)
    so a file whose current name equals another file's target name is never
    overwritten. If the second pass is interrupted, the remaining files are
    left under their temporary ``.<hex>.tmp`` names.

    Args:
        directory: Directory whose files are renamed (not recursive).
        prefix: Text placed in front of the sequence number.

    Raises:
        FileExistsError: If a target name is already taken by something that
            is not itself being renamed (for example a sub-directory).
            Nothing has been renamed when this is raised.
    """
    import uuid

    # Escape the directory so glob metacharacters in its name are literal.
    pattern = os.path.join(glob.escape(directory), "*")
    files = sorted(path for path in glob.glob(pattern) if os.path.isfile(path))

    plan = []
    for index, file_path in enumerate(files, start=1):
        ext = os.path.splitext(file_path)[1]
        new_path = os.path.join(
            os.path.dirname(file_path), f"{prefix}{index}{ext}"
        )
        if new_path != file_path:
            plan.append((file_path, new_path))

    # Validate before touching anything: a target may only be occupied by a
    # file that is itself about to be moved out of the way.
    sources = set(files)
    for _, new_path in plan:
        if new_path not in sources and os.path.lexists(new_path):
            raise FileExistsError(f"target already exists: {new_path}")

    # Pass 1: park every source under a unique temporary name.
    staged = []
    for file_path, new_path in plan:
        temp_path = os.path.join(
            os.path.dirname(file_path), f".{uuid.uuid4().hex}.tmp"
        )
        os.rename(file_path, temp_path)
        staged.append((temp_path, file_path, new_path))

    # Pass 2: move the parked files to their final names.
    for temp_path, file_path, new_path in staged:
        os.rename(temp_path, new_path)
        file_name = os.path.basename(file_path)
        new_name = os.path.basename(new_path)
        print(f"重命名: {file_name} -> {new_name}")
# Usage example: rename the files in ./my_files using the "document_" prefix.
batch_rename("./my_files", "document_")
示例2:批量处理文本文件
import os
def batch_process_text(input_dir, output_dir, suffix="_processed"):
    """Write a line-numbered copy of every ``.txt`` file in ``input_dir``.

    Each line of the source file is prefixed with ``"<n>: "`` (1-based) and
    the result is written to ``<output_dir>/<stem><suffix>.txt``. Files are
    read and written as UTF-8 and processed in sorted name order. Entries
    that end in ``.txt`` but are not regular files (e.g. directories) are
    skipped instead of crashing the whole batch.

    Args:
        input_dir: Directory scanned (non-recursively) for ``.txt`` files.
        output_dir: Directory receiving the processed files; created
            (including parents) when missing.
        suffix: Text inserted between the original stem and ``.txt``.
    """
    os.makedirs(output_dir, exist_ok=True)

    for filename in sorted(os.listdir(input_dir)):
        input_path = os.path.join(input_dir, filename)
        if not filename.endswith(".txt") or not os.path.isfile(input_path):
            continue

        stem = os.path.splitext(filename)[0]
        output_path = os.path.join(output_dir, f"{stem}{suffix}.txt")

        # Read everything before opening the output so that processing a
        # file in place (same directory, empty suffix) keeps working.
        with open(input_path, "r", encoding="utf-8") as infile:
            lines = infile.readlines()

        with open(output_path, "w", encoding="utf-8") as outfile:
            outfile.writelines(
                f"{number}: {line}" for number, line in enumerate(lines, start=1)
            )

        print(f"处理完成: {filename}")
# Usage example: number the lines of the .txt files in ./input and write the results to ./output.
batch_process_text("./input", "./output")
使用pathlib模块(Python 3.4+)
示例3:批量复制和重命名图片
from pathlib import Path
import shutil
def batch_copy_images(source_dir, dest_dir, new_format="jpg"):
    """Copy the image files in ``source_dir`` to ``dest_dir`` under new names.

    Every regular file with a ``.png``/``.jpg``/``.jpeg``/``.gif``/``.bmp``
    suffix (case-insensitive) is copied to
    ``<dest_dir>/image_<stem>.<new_format>``.

    NOTE: only the file extension changes. The bytes are copied verbatim
    with ``shutil.copy2``, so no real image format conversion happens.

    When two sources share a stem (``a.png`` and ``a.jpg``), later copies
    get a ``_<k>`` counter (``image_a_1.<new_format>``) instead of
    overwriting the first one.

    Args:
        source_dir: Directory scanned (non-recursively) for images.
        dest_dir: Destination directory; created with parents when missing.
        new_format: Extension (without the dot) given to each copy.
    """
    source_path = Path(source_dir)
    dest_path = Path(dest_dir)
    dest_path.mkdir(parents=True, exist_ok=True)

    valid_extensions = {".png", ".jpg", ".jpeg", ".gif", ".bmp"}
    used_names = set()

    # sorted() snapshots the listing, so copies written into the source
    # directory itself are never picked up again within the same run.
    for file in sorted(source_path.glob("*")):
        if not file.is_file() or file.suffix.lower() not in valid_extensions:
            continue

        new_name = f"image_{file.stem}.{new_format}"
        duplicate = 0
        while new_name in used_names:
            duplicate += 1
            new_name = f"image_{file.stem}_{duplicate}.{new_format}"
        used_names.add(new_name)

        shutil.copy2(file, dest_path / new_name)
        print(f"复制: {file.name} -> {new_name}")
# Usage example: copy the images in ./images into ./converted_images, naming each copy with a .png extension.
batch_copy_images("./images", "./converted_images", "png")
使用os.walk递归处理
示例4:批量统计文件信息
import os
import json
def batch_analyze_files(root_dir):
    """Recursively collect per-file metadata and per-extension totals.

    Directories and files are visited in sorted order so the result is
    deterministic. Entries that cannot be stat'ed (dangling symlinks, files
    removed during the walk, permission errors) are skipped rather than
    aborting the whole scan.

    Args:
        root_dir: Directory tree to walk.

    Returns:
        A ``(file_info, type_stats)`` tuple. ``file_info`` is a list of
        dicts with the keys ``path``, ``name``, ``size``, ``modified`` and
        ``extension``. ``type_stats`` maps each extension (or
        ``"no_extension"``) to ``{"count": int, "total_size": int}``.
    """
    file_info = []
    for root, dirs, files in os.walk(root_dir):
        dirs.sort()  # in-place sort steers os.walk's traversal order
        for file in sorted(files):
            file_path = os.path.join(root, file)
            try:
                stats = os.stat(file_path)
            except OSError:
                # Dangling symlink or file that vanished mid-walk.
                continue
            file_info.append({
                "path": file_path,
                "name": file,
                "size": stats.st_size,
                "modified": stats.st_mtime,
                "extension": os.path.splitext(file)[1],
            })

    # Aggregate count and cumulative size per extension.
    type_stats = {}
    for info in file_info:
        ext = info["extension"] or "no_extension"
        bucket = type_stats.setdefault(ext, {"count": 0, "total_size": 0})
        bucket["count"] += 1
        bucket["total_size"] += info["size"]

    return file_info, type_stats
# Usage example: analyze ./project, then print the file count and the per-extension statistics as JSON.
files, stats = batch_analyze_files("./project")
print(f"文件总数: {len(files)}")
print(f"文件类型统计: {json.dumps(stats, indent=2, ensure_ascii=False)}")
使用concurrent.futures并行处理
示例5:批量压缩图片(并行处理)
import os
from pathlib import Path
from PIL import Image
from concurrent.futures import ThreadPoolExecutor, as_completed
def compress_image(file_path, output_dir, quality=85):
    """Compress one image into ``output_dir`` and report the size change.

    ``.jpg``/``.jpeg`` files are re-saved as JPEG with the given quality,
    ``.png`` files are re-saved as optimized PNG, and any other file is
    copied unchanged without being opened as an image.

    Args:
        file_path: Source file (``str`` or ``Path``).
        output_dir: Existing directory that receives the output file, which
            keeps the source file's name.
        quality: JPEG quality passed to the encoder; ignored otherwise.

    Returns:
        On success a 4-tuple ``(name, original_size, new_size, ratio)``
        where ``ratio`` is the size reduction in percent (``0.0`` for an
        empty source file). On failure a 5-tuple
        ``(name, 0, 0, 0, error_message)``; callers tell the two apart by
        length.
    """
    import shutil  # local import: this snippet's own import list lacks it

    file_path = Path(file_path)
    try:
        output_path = Path(output_dir) / file_path.name
        suffix = file_path.suffix.lower()

        if suffix in (".jpg", ".jpeg"):
            # Context manager closes the underlying file handle.
            with Image.open(file_path) as img:
                img.save(output_path, "JPEG", quality=quality, optimize=True)
        elif suffix == ".png":
            with Image.open(file_path) as img:
                img.save(output_path, "PNG", optimize=True)
        else:
            shutil.copy2(file_path, output_path)

        original_size = os.path.getsize(file_path)
        new_size = os.path.getsize(output_path)
        # Guard against division by zero for empty source files.
        ratio = (1 - new_size / original_size) * 100 if original_size else 0.0
        return file_path.name, original_size, new_size, ratio
    except Exception as e:
        # Worker boundary: report the failure instead of killing the batch.
        return file_path.name, 0, 0, 0, str(e)
def batch_compress_images(input_dir, output_dir, max_workers=4):
    """Compress all JPEG/PNG files of ``input_dir`` in parallel.

    Regular files whose suffix is ``.jpg``, ``.jpeg`` or ``.png`` (matched
    case-insensitively, so ``.JPG`` is included) are handed to
    ``compress_image`` on a thread pool, and a one-line report is printed
    for each as it finishes.

    Args:
        input_dir: Directory scanned (non-recursively) for images.
        output_dir: Destination directory; created with parents if missing.
        max_workers: Size of the thread pool.

    Returns:
        The list of ``compress_image`` result tuples in completion order:
        4-tuples for successes, 5-tuples (with the error text) for failures.
    """
    input_path = Path(input_dir)
    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)

    image_suffixes = {".jpg", ".jpeg", ".png"}
    image_files = sorted(
        entry
        for entry in input_path.glob("*")
        if entry.is_file() and entry.suffix.lower() in image_suffixes
    )

    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {
            executor.submit(compress_image, file, output_dir): file
            for file in image_files
        }
        for future in as_completed(futures):
            result = future.result()
            results.append(result)
            if len(result) == 4:  # success
                name, original, new, ratio = result
                print(f"{name}: {original/1024:.1f}KB -> {new/1024:.1f}KB (减少{ratio:.1f}%)")
            else:  # failure
                name, _, _, _, error = result
                print(f"{name}: 处理失败 - {error}")
    return results
# Usage example: compress the images in ./images into ./compressed using 4 worker threads.
results = batch_compress_images("./images", "./compressed", max_workers=4)
完整的批量处理框架
示例6:通用的批量处理类
import os
from pathlib import Path
from typing import Callable, Optional
from concurrent.futures import ThreadPoolExecutor, as_completed
class BatchProcessor:
    """General-purpose batch file processor.

    Finds files below ``input_dir`` with a glob pattern and applies a
    user-supplied handler to each, either serially or on a thread pool.
    """

    def __init__(self, input_dir: str, output_dir: Optional[str] = None):
        """Store the directories and make sure the output directory exists.

        Args:
            input_dir: Directory that is searched for files.
            output_dir: Directory for handler output; falls back to
                ``input_dir`` when omitted. Created with parents if missing.
        """
        self.input_dir = Path(input_dir)
        self.output_dir = Path(output_dir) if output_dir else self.input_dir
        self.output_dir.mkdir(parents=True, exist_ok=True)

    def process_files(self,
                      pattern: str = "*",
                      handler: Optional[Callable] = None,
                      recursive: bool = False,
                      max_workers: int = 1) -> list:
        """Apply ``handler`` to every regular file matching ``pattern``.

        Only regular files are passed to the handler; matching directories
        are ignored. Files are processed in sorted path order and the
        returned results follow that order in both serial and parallel
        mode. A handler that raises is reported on stdout and contributes
        no result.

        Args:
            pattern: Glob pattern such as ``"*.txt"`` or ``"*.jpg"``.
            handler: Callable receiving one ``Path``; its return value is
                collected.
            recursive: Search sub-directories as well.
            max_workers: Thread count; values of 1 or less run serially.

        Returns:
            The handler return values for all files that succeeded.

        Raises:
            ValueError: If ``handler`` is missing or not callable.
        """
        if not callable(handler):
            raise ValueError("必须提供处理函数")

        if recursive:
            matches = self.input_dir.rglob(pattern)
        else:
            matches = self.input_dir.glob(pattern)
        files = sorted(path for path in matches if path.is_file())
        print(f"找到 {len(files)} 个匹配文件")

        results = []
        if max_workers > 1:
            with ThreadPoolExecutor(max_workers=max_workers) as executor:
                submitted = [(file, executor.submit(handler, file)) for file in files]
                # Collect in submission order: the work still overlaps, but
                # results stay aligned with the sorted file list.
                for file, future in submitted:
                    try:
                        results.append(future.result())
                    except Exception as e:
                        print(f"处理出错 {file}: {e}")
        else:
            for file in files:
                try:
                    results.append(handler(file))
                    print(f"处理完成: {file.name}")
                except Exception as e:
                    print(f"处理出错 {file.name}: {e}")
        return results

    @staticmethod
    def copy_handler(file_path: Path, dest_dir: Path):
        """Example handler: copy ``file_path`` into ``dest_dir``.

        Returns:
            A message naming the source file and the destination path.
        """
        import shutil
        dest_file = dest_dir / file_path.name
        shutil.copy2(file_path, dest_file)
        return f"复制: {file_path.name} -> {dest_file}"

    @staticmethod
    def rename_handler(file_path: Path, prefix: str = ""):
        """Example handler: rename ``file_path`` in place, adding ``prefix``.

        Returns:
            A message naming the old and the new file name.
        """
        new_name = f"{prefix}{file_path.stem}{file_path.suffix}"
        new_path = file_path.parent / new_name
        file_path.rename(new_path)
        return f"重命名: {file_path.name} -> {new_name}"
# 使用示例
def main():
    """Demonstrate BatchProcessor with a copy job and a CSV job.

    First copies every ``.txt`` file found recursively under ``./data``
    into ``./output`` using four threads, then round-trips every top-level
    ``.csv`` file through pandas into ``./output/processed_<name>`` using
    two threads.
    """
    processor = BatchProcessor("./data", "./output")

    def process_csv(file_path):
        # pandas is imported lazily so it is only required for the CSV demo.
        import pandas as pd
        frame = pd.read_csv(file_path)
        # Any data transformation would go here.
        target = processor.output_dir / f"processed_{file_path.name}"
        frame.to_csv(target, index=False)
        return f"处理CSV: {file_path.name}"

    # Demo 1: copy the .txt files into the output directory.
    results = processor.process_files(
        pattern="*.txt",
        handler=lambda file_path: BatchProcessor.copy_handler(
            file_path, processor.output_dir
        ),
        recursive=True,
        max_workers=4,
    )

    # Demo 2: run the custom CSV handler.
    csv_results = processor.process_files(
        pattern="*.csv",
        handler=process_csv,
        max_workers=2,
    )


if __name__ == "__main__":
    main()
实用技巧和注意事项
文件过滤和小批量测试
# 先处理少量文件测试
def safe_batch_process(input_dir, batch_size=10, handler=None):
    """Process a small trial batch of ``.txt`` files before the rest.

    The first ``batch_size`` files (in sorted order) are processed
    immediately. If more files remain, the user is asked on stdin whether
    to continue, and the rest is processed only when the answer is ``y``.
    No prompt is shown when the trial batch already covered every file.

    Args:
        input_dir: Directory scanned (non-recursively) for ``.txt`` files.
        batch_size: Number of files in the trial batch; negative values
            are treated as 0.
        handler: Optional callable invoked with each file ``Path``. When
            omitted, files are only enumerated.
    """
    from pathlib import Path  # local import: this snippet has no import list

    files = sorted(Path(input_dir).glob("*.txt"))
    batch_size = max(batch_size, 0)
    process = handler if handler is not None else (lambda file: None)

    # Trial run on the first few files.
    test_files = files[:batch_size]
    for file in test_files:
        process(file)

    remaining = files[batch_size:]
    if not remaining:
        return

    # Report how many files were actually processed, not the requested size.
    confirm = input(f"测试处理{len(test_files)}个文件成功,是否继续处理全部?(y/n): ")
    if confirm.strip().lower() == 'y':
        for file in remaining:
            process(file)
错误处理和日志记录
import logging
from datetime import datetime
def setup_logging():
    """Configure root logging to a timestamped file plus the console.

    When the root logger has no handlers yet, it is set to INFO level with
    a UTF-8 ``FileHandler`` writing ``batch_process_<YYYYmmdd_HHMMSS>.log``
    in the current working directory and a ``StreamHandler``. When the root
    logger is already configured nothing is changed, and no log file or
    handler is created.

    Returns:
        The logger named after this module.
    """
    # basicConfig() ignores its arguments once the root logger has handlers,
    # so build the handlers only when they will actually be installed.
    # Otherwise every call would open (and leak) another log file.
    if not logging.getLogger().handlers:
        log_file = f"batch_process_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                # Explicit UTF-8 so non-ASCII messages survive on any platform.
                logging.FileHandler(log_file, encoding="utf-8"),
                logging.StreamHandler(),
            ],
        )
    return logging.getLogger(__name__)
def process_with_logging(file_path, logger, handler=None):
    """Process one file and log the outcome.

    Args:
        file_path: File to process; used verbatim in the log messages.
        logger: Logger receiving the success or failure record.
        handler: Optional callable invoked with ``file_path`` to do the
            actual work. When omitted, only the success message is logged.

    Returns:
        ``True`` when processing succeeded, ``False`` when ``handler``
        raised. The failure is logged at ERROR level with the traceback.
    """
    try:
        if handler is not None:
            handler(file_path)
    except Exception as e:
        # logger.exception keeps the traceback next to the message.
        logger.exception("处理失败 %s: %s", file_path, e)
        return False
    logger.info("成功处理: %s", file_path)
    return True
这些案例涵盖了从基础到高级的批量文件处理方法,你可以根据具体需求选择合适的方案。建议在实际使用前,先在测试数据上试运行,确保逻辑正确。