Python案例:如何批量处理文件?

wen python案例 62

本文目录导读:

Python案例:如何批量处理文件?

  1. 基础方法:os模块 + glob
  2. 使用pathlib模块(Python 3.4+)
  3. 使用os.walk递归处理
  4. 使用concurrent.futures并行处理
  5. 完整的批量处理框架
  6. 实用技巧和注意事项

我来详细介绍Python批量处理文件的几种常见方法和完整案例。

基础方法:os模块 + glob

示例1:批量重命名文件

import os
import glob
def batch_rename(directory, prefix="new_"):
    """
    Rename every regular file directly inside ``directory`` to
    ``<prefix><n><ext>``, numbering from 1.

    Files are handled in sorted path order so the numbering is reproducible,
    and sub-directories do not consume a number. The rename runs in two
    passes through temporary names, so a file whose current name equals
    another file's target name (e.g. an existing ``new_1.txt``) is never
    overwritten.

    Args:
        directory: Directory whose files are renamed (not recursive).
        prefix: Text placed in front of the running number.

    Raises:
        FileExistsError: If a target or temporary name is already taken by
            something that is not part of the batch (for example a
            sub-directory or a hidden file). Nothing is renamed in that case.
    """
    # glob("*") skips dot-files, so hidden files are left untouched.
    files = sorted(
        path
        for path in glob.glob(os.path.join(directory, "*"))
        if os.path.isfile(path)
    )

    # Plan every rename up front so conflicts are detected before any change.
    plan = []
    for index, file_path in enumerate(files, start=1):
        dir_name = os.path.dirname(file_path)
        file_name = os.path.basename(file_path)
        ext = os.path.splitext(file_name)[1]
        new_name = f"{prefix}{index}{ext}"
        temp_path = os.path.join(
            dir_name, f".batch_rename_{os.getpid()}_{index}.tmp"
        )
        plan.append(
            (file_path, temp_path, os.path.join(dir_name, new_name),
             file_name, new_name)
        )

    batch_members = set(files)
    for _, temp_path, new_path, _, new_name in plan:
        if os.path.exists(new_path) and new_path not in batch_members:
            raise FileExistsError(f"目标已存在: {new_name}")
        if os.path.exists(temp_path):
            raise FileExistsError(f"临时文件已存在: {temp_path}")

    # Pass 1: move everything out of the way so no target name is occupied.
    for file_path, temp_path, _, _, _ in plan:
        os.rename(file_path, temp_path)

    # Pass 2: move each file to its final name.
    for _, temp_path, new_path, file_name, new_name in plan:
        os.rename(temp_path, new_path)
        print(f"重命名: {file_name} -> {new_name}")
# Usage example: rename everything in ./my_files to document_<n>.<ext>
batch_rename(directory="./my_files", prefix="document_")

示例2:批量处理文本文件

import os
def batch_process_text(input_dir, output_dir, suffix="_processed"):
    """
    Prefix every line of each ``.txt`` file in ``input_dir`` with its 1-based
    line number and write the result into ``output_dir``.

    The output file is named ``<stem><suffix>.txt``. Only regular files are
    processed; a sub-directory whose name happens to end in ``.txt`` is
    skipped instead of aborting the run. Files are handled in sorted name
    order.

    Args:
        input_dir: Directory that is scanned (not recursive).
        output_dir: Directory for the numbered copies; created if missing.
        suffix: Text appended to the file stem of each output file.
    """
    os.makedirs(output_dir, exist_ok=True)

    for filename in sorted(os.listdir(input_dir)):
        input_path = os.path.join(input_dir, filename)
        if not filename.endswith(".txt") or not os.path.isfile(input_path):
            continue

        stem = os.path.splitext(filename)[0]
        output_path = os.path.join(output_dir, f"{stem}{suffix}.txt")

        # Read the whole file before opening the output: if both paths are
        # the same file, opening for writing first would truncate the input.
        with open(input_path, 'r', encoding='utf-8') as infile:
            lines = infile.readlines()

        processed_lines = [
            f"{number}: {line}" for number, line in enumerate(lines, start=1)
        ]

        with open(output_path, 'w', encoding='utf-8') as outfile:
            outfile.writelines(processed_lines)
        print(f"处理完成: {filename}")
# Usage example: number the lines of every .txt file in ./input
batch_process_text(input_dir="./input", output_dir="./output")

使用pathlib模块(Python 3.4+)

示例3:批量复制和重命名图片

from pathlib import Path
import shutil
def batch_copy_images(source_dir, dest_dir, new_format="jpg"):
    """
    Copy the image files found directly in ``source_dir`` into ``dest_dir``,
    naming each copy ``image_<stem>.<new_format>``.

    NOTE: only the file extension changes. The bytes are copied verbatim
    with ``shutil.copy2``; the image data is NOT re-encoded, so a ``.png``
    copied to ``.jpg`` still contains PNG data. Use an imaging library if a
    real format conversion is required.

    Two sources with the same stem (``a.png`` and ``a.jpg``) map to the same
    target name; files are processed in sorted order, so the last one wins.

    Args:
        source_dir: Directory that is scanned (not recursive).
        dest_dir: Destination directory; created with any missing parents.
        new_format: Extension (without the dot) given to every copy.
    """
    source_path = Path(source_dir)
    dest_path = Path(dest_dir)
    dest_path.mkdir(parents=True, exist_ok=True)

    # Extensions treated as images (compared case-insensitively).
    valid_extensions = {'.png', '.jpg', '.jpeg', '.gif', '.bmp'}

    for file in sorted(source_path.glob("*")):
        # Directories can also carry an image-like suffix; copy2 would fail.
        if not file.is_file() or file.suffix.lower() not in valid_extensions:
            continue
        new_name = f"image_{file.stem}.{new_format}"
        shutil.copy2(file, dest_path / new_name)
        print(f"复制: {file.name} -> {new_name}")
# Usage example: copy ./images into ./converted_images with a .png extension
batch_copy_images(
    source_dir="./images",
    dest_dir="./converted_images",
    new_format="png",
)

使用os.walk递归处理

示例4:批量统计文件信息

import os
import json
def batch_analyze_files(root_dir):
    """
    Walk ``root_dir`` recursively and collect per-file and per-extension
    statistics.

    Entries that cannot be stat'ed (dangling symlinks, files removed during
    the walk, permission errors) are reported and skipped so that one bad
    entry does not abort the whole scan.

    Args:
        root_dir: Directory at which the walk starts.

    Returns:
        A tuple ``(file_info, type_stats)``. ``file_info`` is a list of dicts
        with the keys ``path``, ``name``, ``size``, ``modified`` and
        ``extension``. ``type_stats`` maps each extension (or
        ``"no_extension"``) to ``{"count": int, "total_size": int}``.
    """
    file_info = []
    for root, _dirs, files in os.walk(root_dir):
        for file in files:
            file_path = os.path.join(root, file)
            try:
                stats = os.stat(file_path)
            except OSError as exc:
                print(f"跳过无法读取的文件 {file_path}: {exc}")
                continue
            file_info.append({
                "path": file_path,
                "name": file,
                "size": stats.st_size,
                "modified": stats.st_mtime,
                "extension": os.path.splitext(file)[1]
            })

    # Aggregate count and total size per extension.
    type_stats = {}
    for info in file_info:
        ext = info["extension"] or "no_extension"
        bucket = type_stats.setdefault(ext, {"count": 0, "total_size": 0})
        bucket["count"] += 1
        bucket["total_size"] += info["size"]
    return file_info, type_stats
# Usage example: scan ./project and print the totals
files, stats = batch_analyze_files(root_dir="./project")
print(f"文件总数: {len(files)}")
print(f"文件类型统计: {json.dumps(stats, indent=2, ensure_ascii=False)}")

使用concurrent.futures并行处理

示例5:批量压缩图片(并行处理)

import os
from pathlib import Path
from PIL import Image
from concurrent.futures import ThreadPoolExecutor, as_completed
def compress_image(file_path, output_dir, quality=85):
    """
    Write a size-reduced copy of one image into ``output_dir``.

    ``.jpg``/``.jpeg`` files are re-saved as JPEG with the given quality,
    ``.png`` files are re-saved as optimized PNG, and any other file is
    copied unchanged (it is not decoded, since nothing is re-encoded).

    Args:
        file_path: ``pathlib.Path`` of the source file (``.name`` and
            ``.suffix`` are used).
        output_dir: Directory that receives the result; must already exist.
        quality: JPEG quality passed to ``Image.save``.

    Returns:
        On success a 4-tuple ``(name, original_size, new_size, ratio)`` where
        ``ratio`` is the size reduction in percent. On failure a 5-tuple
        ``(name, 0, 0, 0, error_message)``. Callers tell the two apart by
        length; errors are returned rather than raised so that one bad file
        does not break a batch running in a thread pool.
    """
    # Imported here because this snippet's import block does not provide it.
    import shutil

    try:
        output_path = Path(output_dir) / file_path.name
        suffix = file_path.suffix.lower()
        if suffix in ('.jpg', '.jpeg', '.png'):
            # The context manager releases the file handle even if save()
            # raises.
            with Image.open(file_path) as img:
                if suffix == '.png':
                    img.save(output_path, "PNG", optimize=True)
                else:
                    img.save(output_path, "JPEG", quality=quality, optimize=True)
        else:
            shutil.copy2(file_path, output_path)
        original_size = os.path.getsize(file_path)
        new_size = os.path.getsize(output_path)
        # A zero-byte source would otherwise divide by zero.
        ratio = (1 - new_size / original_size) * 100 if original_size else 0.0
        return file_path.name, original_size, new_size, ratio
    except Exception as e:
        return file_path.name, 0, 0, 0, str(e)
def batch_compress_images(input_dir, output_dir, max_workers=4):
    """
    Compress every JPEG/PNG file directly inside ``input_dir`` using a
    thread pool and report the outcome per file.

    Extensions are matched case-insensitively (``.JPG`` is picked up as
    well) and only regular files are submitted.

    Args:
        input_dir: Directory that is scanned (not recursive).
        output_dir: Destination directory; created with any missing parents.
        max_workers: Number of worker threads.

    Returns:
        The list of tuples returned by ``compress_image``, in completion
        order: 4-tuples for successes, 5-tuples (with an error message) for
        failures.
    """
    input_path = Path(input_dir)
    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)

    image_suffixes = {".jpg", ".jpeg", ".png"}
    image_files = sorted(
        path
        for path in input_path.iterdir()
        if path.is_file() and path.suffix.lower() in image_suffixes
    )

    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {
            executor.submit(compress_image, file, output_dir): file
            for file in image_files
        }
        for future in as_completed(futures):
            result = future.result()
            results.append(result)
            if len(result) == 4:  # success
                name, original, new, ratio = result
                print(f"{name}: {original/1024:.1f}KB -> {new/1024:.1f}KB (减少{ratio:.1f}%)")
            else:  # failure: the last element is the error message
                name, _, _, _, error = result
                print(f"{name}: 处理失败 - {error}")
    return results
# Usage example: compress ./images into ./compressed with four threads
results = batch_compress_images(
    input_dir="./images",
    output_dir="./compressed",
    max_workers=4,
)

完整的批量处理框架

示例6:通用的批量处理类

import os
from pathlib import Path
from typing import Callable, Optional
from concurrent.futures import ThreadPoolExecutor, as_completed
class BatchProcessor:
    """
    General-purpose batch file processor.

    Finds files under ``input_dir`` that match a glob pattern and applies a
    caller-supplied handler to each one, either serially or in a thread pool.
    """

    def __init__(self, input_dir: str, output_dir: Optional[str] = None):
        """
        Args:
            input_dir: Directory that is searched for files.
            output_dir: Directory for results; defaults to ``input_dir``.
                It is created, including missing parents, if absent.
        """
        self.input_dir = Path(input_dir)
        self.output_dir = Path(output_dir) if output_dir else self.input_dir
        self.output_dir.mkdir(parents=True, exist_ok=True)

    def process_files(self,
                      pattern: str = "*",
                      handler: Optional[Callable] = None,
                      recursive: bool = False,
                      max_workers: int = 1) -> list:
        """
        Apply ``handler`` to every regular file matching ``pattern``.

        Directories that match the pattern are ignored, and files are
        collected in sorted order so serial runs are reproducible. A handler
        that raises is reported and skipped; it does not stop the batch.

        Args:
            pattern: Glob pattern such as "*.txt" or "*.jpg".
            handler: Callable that receives one ``Path`` argument.
            recursive: Search sub-directories as well.
            max_workers: Thread count; values above 1 enable parallel
                processing.

        Returns:
            The handlers' return values for the files that succeeded, in
            file order for serial runs and completion order for parallel
            runs.

        Raises:
            ValueError: If no handler is given.
        """
        if handler is None:
            raise ValueError("必须提供处理函数")

        find = self.input_dir.rglob if recursive else self.input_dir.glob
        files = sorted(path for path in find(pattern) if path.is_file())
        print(f"找到 {len(files)} 个匹配文件")

        results = []
        if max_workers > 1:
            # Parallel: each future is mapped back to its file for reporting.
            with ThreadPoolExecutor(max_workers=max_workers) as executor:
                futures = {executor.submit(handler, f): f for f in files}
                for future in as_completed(futures):
                    try:
                        result = future.result()
                        results.append(result)
                    except Exception as e:
                        print(f"处理出错 {futures[future]}: {e}")
        else:
            # Serial
            for file in files:
                try:
                    result = handler(file)
                    results.append(result)
                    print(f"处理完成: {file.name}")
                except Exception as e:
                    print(f"处理出错 {file.name}: {e}")
        return results

    @staticmethod
    def copy_handler(file_path: Path, dest_dir: Path):
        """Example handler: copy ``file_path`` into ``dest_dir`` and return a
        summary string."""
        import shutil
        dest_file = dest_dir / file_path.name
        shutil.copy2(file_path, dest_file)
        return f"复制: {file_path.name} -> {dest_file}"

    @staticmethod
    def rename_handler(file_path: Path, prefix: str = ""):
        """Example handler: rename ``file_path`` in place by prepending
        ``prefix`` to its name and return a summary string."""
        new_name = f"{prefix}{file_path.stem}{file_path.suffix}"
        new_path = file_path.parent / new_name
        file_path.rename(new_path)
        return f"重命名: {file_path.name} -> {new_name}"
# Usage example
def main():
    """Demonstrate BatchProcessor with a recursive copy job and a CSV job."""
    processor = BatchProcessor("./data", "./output")

    # Example 1: copy every .txt file (searched recursively) to the output
    # directory, using four worker threads.
    def copy_txt(file_path):
        destination_dir = processor.output_dir
        return BatchProcessor.copy_handler(file_path, destination_dir)

    results = processor.process_files("*.txt", copy_txt, True, 4)

    # Example 2: a custom handler that round-trips each CSV through pandas.
    def process_csv(file_path):
        # Imported lazily so pandas is only needed when a CSV is processed.
        import pandas as pd
        frame = pd.read_csv(file_path)
        # Any data processing would go here.
        target = processor.output_dir / f"processed_{file_path.name}"
        frame.to_csv(target, index=False)
        return f"处理CSV: {file_path.name}"

    csv_results = processor.process_files("*.csv", process_csv, max_workers=2)


if __name__ == "__main__":
    main()

实用技巧和注意事项

文件过滤和小批量测试

# Trial-run a small batch before processing everything
def safe_batch_process(input_dir, batch_size=10):
    """
    Process the first ``batch_size`` ``.txt`` files in ``input_dir`` as a
    trial, then ask for confirmation before processing the rest.

    The confirmation prompt is skipped when the trial batch already covered
    every file, so small directories do not block on input for nothing.

    Args:
        input_dir: Directory that is scanned (not recursive).
        batch_size: Number of files in the trial batch.
    """
    files = list(Path(input_dir).glob("*.txt"))
    test_files = files[:batch_size]
    remaining_files = files[batch_size:]

    for file in test_files:
        # Processing logic goes here.
        pass

    if not remaining_files:
        return

    # Continue with the rest only after explicit confirmation.
    confirm = input(f"测试处理{batch_size}个文件成功,是否继续处理全部?(y/n): ")
    if confirm.strip().lower() == 'y':
        for file in remaining_files:
            # Process the remaining files.
            pass

错误处理和日志记录

import logging
from datetime import datetime
def setup_logging():
    """
    Configure root logging to write to a timestamped log file and to the
    console, and return this module's logger.

    The log file is opened as UTF-8 so that the Chinese messages used in
    these examples are written correctly regardless of the platform's
    default encoding.

    NOTE: ``logging.basicConfig`` does nothing if the root logger already
    has handlers, so call this once, early in the program.

    Returns:
        The ``logging.Logger`` named after this module.
    """
    log_file = f"batch_process_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler(log_file, encoding='utf-8'),
            logging.StreamHandler()
        ]
    )
    return logging.getLogger(__name__)
def process_with_logging(file_path, logger):
    """
    Run the (placeholder) processing step for ``file_path`` and log the
    outcome.

    Returns:
        True when the step completed, False when it raised an exception
        (which is logged as an error instead of propagating).
    """
    succeeded = False
    try:
        # Processing logic goes here.
        logger.info(f"成功处理: {file_path}")
        succeeded = True
    except Exception as exc:
        logger.error(f"处理失败 {file_path}: {exc!s}")
    return succeeded

这些案例涵盖了从基础到高级的批量文件处理方法。你可以根据具体需求选择合适的方案。建议在实际使用前,先在测试数据上试运行,确保逻辑正确。

抱歉,评论功能暂时关闭!