如何用Python案例实现批量图片压缩?

wen python案例 4

本文目录导读:

如何用Python案例实现批量图片压缩?

  1. 使用PIL/Pillow库压缩图片
  2. 进阶版本:支持更多选项
  3. 带命令行界面的版本
  4. 使用说明

我来为您介绍一个使用Python实现批量图片压缩的完整案例。

使用PIL/Pillow库压缩图片

安装依赖

pip install Pillow

基础压缩实现

import os
from PIL import Image
from pathlib import Path
def compress_image(input_path, output_path, quality=85):
    """
    压缩单张图片
    Args:
        input_path: 输入图片路径
        output_path: 输出图片路径
        quality: 压缩质量 (1-100),越小压缩率越高
    """
    try:
        # 打开图片
        img = Image.open(input_path)
        # 如果图片有透明度,转换为RGB
        if img.mode in ('RGBA', 'LA', 'P'):
            rgb_img = Image.new('RGB', img.size, (255, 255, 255))
            rgb_img.paste(img, mask=img.split()[-1] if img.mode == 'RGBA' else None)
            img = rgb_img
        # 保存压缩后的图片
        img.save(output_path, 'JPEG', optimize=True, quality=quality)
        return True
    except Exception as e:
        print(f"压缩失败 {input_path}: {str(e)}")
        return False
def batch_compress_images(input_dir, output_dir, quality=85, file_types=['.jpg', '.jpeg', '.png']):
    """
    批量压缩图片
    Args:
        input_dir: 输入目录
        output_dir: 输出目录
        quality: 压缩质量
        file_types: 要处理的文件类型列表
    """
    # 创建输出目录
    os.makedirs(output_dir, exist_ok=True)
    # 统计信息
    total_files = 0
    compressed_files = 0
    total_original_size = 0
    total_compressed_size = 0
    # 遍历输入目录
    for file_path in Path(input_dir).rglob('*'):
        if file_path.suffix.lower() in file_types:
            total_files += 1
            original_size = file_path.stat().st_size
            total_original_size += original_size
            # 构建输出路径(保持目录结构)
            relative_path = file_path.relative_to(input_dir)
            output_path = Path(output_dir) / relative_path
            output_path.parent.mkdir(parents=True, exist_ok=True)
            # 压缩图片
            if compress_image(str(file_path), str(output_path), quality):
                compressed_size = output_path.stat().st_size
                total_compressed_size += compressed_size
                compressed_files += 1
                # 计算压缩率
                compression_ratio = (1 - compressed_size / original_size) * 100
                print(f"✓ {relative_path}: {original_size/1024:.1f}KB -> {compressed_size/1024:.1f}KB ({compression_ratio:.1f}%)")
            else:
                print(f"✗ {relative_path}: 压缩失败")
    # 输出统计信息
    print("\n" + "="*50)
    print(f"压缩完成统计:")
    print(f"处理文件数: {compressed_files}/{total_files}")
    print(f"原始总大小: {total_original_size/1024/1024:.2f}MB")
    print(f"压缩后大小: {total_compressed_size/1024/1024:.2f}MB")
    print(f"总压缩率: {(1-total_compressed_size/total_original_size)*100:.1f}%")
# 使用示例
if __name__ == "__main__":
    # 压缩当前目录下的images文件夹中的图片到compressed_images文件夹
    batch_compress_images(
        input_dir="./images",
        output_dir="./compressed_images",
        quality=85,
        file_types=['.jpg', '.jpeg', '.png']
    )

进阶版本:支持更多选项

import os
from PIL import Image
from pathlib import Path
from typing import List, Optional
import concurrent.futures
class ImageCompressor:
    """图片压缩器类"""
    def __init__(self, quality=85, max_size=None, scale_factor=1.0):
        """
        初始化压缩器
        Args:
            quality: JPEG压缩质量
            max_size: 最大尺寸(像素),如果图片长边超过此值则进行缩放
            scale_factor: 缩放因子
        """
        self.quality = quality
        self.max_size = max_size
        self.scale_factor = scale_factor
    def compress_single(self, input_path: str, output_path: str) -> Optional[dict]:
        """
        压缩单张图片,返回压缩信息
        Returns:
            压缩信息字典,失败返回None
        """
        try:
            img = Image.open(input_path)
            # 处理透明度
            if img.mode in ('RGBA', 'LA', 'P'):
                if img.mode == 'P':
                    img = img.convert('RGBA')
                background = Image.new('RGB', img.size, (255, 255, 255))
                background.paste(img, mask=img.split()[-1])
                img = background
            elif img.mode != 'RGB':
                img = img.convert('RGB')
            # 缩放处理
            if self.max_size or self.scale_factor != 1.0:
                width, height = img.size
                if self.scale_factor != 1.0:
                    new_width = int(width * self.scale_factor)
                    new_height = int(height * self.scale_factor)
                elif self.max_size:
                    ratio = min(self.max_size / width, self.max_size / height)
                    new_width = int(width * ratio)
                    new_height = int(height * ratio)
                img = img.resize((new_width, new_height), Image.Resampling.LANCZOS)
            # 根据原图格式选择保存方式
            suffix = Path(input_path).suffix.lower()
            if suffix in ['.png']:
                # PNG格式使用不同的保存参数
                img.save(output_path, 'PNG', optimize=True)
            else:
                img.save(output_path, 'JPEG', optimize=True, quality=self.quality)
            # 返回压缩信息
            original_size = Path(input_path).stat().st_size
            compressed_size = Path(output_path).stat().st_size
            return {
                'path': input_path,
                'original_size': original_size,
                'compressed_size': compressed_size,
                'compression_ratio': (1 - compressed_size / original_size) * 100
            }
        except Exception as e:
            print(f"压缩失败 {input_path}: {str(e)}")
            return None
    def batch_compress(self, input_dir: str, output_dir: str, 
                      file_types: List[str] = None, 
                      max_workers: int = 4,
                      preserve_structure: bool = True):
        """
        批量压缩图片
        Args:
            input_dir: 输入目录
            output_dir: 输出目录
            file_types: 文件类型列表
            max_workers: 并行工作线程数
            preserve_structure: 是否保持目录结构
        """
        if file_types is None:
            file_types = ['.jpg', '.jpeg', '.png', '.bmp', '.webp']
        os.makedirs(output_dir, exist_ok=True)
        # 收集所有图片文件
        image_files = []
        for file_path in Path(input_dir).rglob('*'):
            if file_path.suffix.lower() in [ft.lower() for ft in file_types]:
                image_files.append(file_path)
        print(f"找到 {len(image_files)} 个图片文件")
        # 使用线程池并行处理
        results = []
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = []
            for file_path in image_files:
                if preserve_structure:
                    relative_path = file_path.relative_to(input_dir)
                    output_path = Path(output_dir) / relative_path
                else:
                    output_path = Path(output_dir) / file_path.name
                output_path.parent.mkdir(parents=True, exist_ok=True)
                # 提交任务
                future = executor.submit(
                    self.compress_single,
                    str(file_path),
                    str(output_path)
                )
                futures.append(future)
            # 收集结果
            for future in concurrent.futures.as_completed(futures):
                result = future.result()
                if result:
                    results.append(result)
                    print(f"✓ {Path(result['path']).name}: "
                          f"{result['original_size']/1024:.1f}KB → "
                          f"{result['compressed_size']/1024:.1f}KB "
                          f"({result['compression_ratio']:.1f}%)")
        # 输出汇总
        self._print_summary(results)
    def _print_summary(self, results: List[dict]):
        """打印汇总信息"""
        if not results:
            print("没有成功处理的文件")
            return
        total_original = sum(r['original_size'] for r in results)
        total_compressed = sum(r['compressed_size'] for r in results)
        avg_ratio = sum(r['compression_ratio'] for r in results) / len(results)
        print("\n" + "="*50)
        print("压缩完成统计:")
        print(f"成功处理: {len(results)} 个文件")
        print(f"原始总大小: {total_original/1024/1024:.2f} MB")
        print(f"压缩后大小: {total_compressed/1024/1024:.2f} MB")
        print(f"总压缩率: {(1-total_compressed/total_original)*100:.1f}%")
        print(f"平均压缩率: {avg_ratio:.1f}%")
# 使用示例
def main():
    # 创建压缩器实例
    compressor = ImageCompressor(
        quality=80,        # 压缩质量
        max_size=1920,      # 最大边长限制
        scale_factor=1.0    # 缩放因子(1.0保持不变)
    )
    # 批量压缩(保持目录结构)
    compressor.batch_compress(
        input_dir="./images",
        output_dir="./compressed",
        file_types=['.jpg', '.jpeg', '.png'],
        max_workers=4
    )
if __name__ == "__main__":
    main()

带命令行界面的版本

import argparse
from pathlib import Path
def main_cli():
    """命令行界面"""
    parser = argparse.ArgumentParser(description='批量图片压缩工具')
    parser.add_argument('input_dir', help='输入目录')
    parser.add_argument('output_dir', help='输出目录')
    parser.add_argument('-q', '--quality', type=int, default=85, 
                       help='压缩质量 (1-100),默认85')
    parser.add_argument('-m', '--max-size', type=int, default=None,
                       help='最大尺寸,超过则缩放')
    parser.add_argument('-s', '--scale', type=float, default=1.0,
                       help='缩放因子,默认1.0')
    parser.add_argument('-w', '--workers', type=int, default=4,
                       help='并行处理数量,默认4')
    parser.add_argument('-t', '--types', nargs='+', 
                       default=['.jpg', '.jpeg', '.png'],
                       help='处理的文件类型')
    args = parser.parse_args()
    # 验证参数
    if not Path(args.input_dir).exists():
        print(f"输入目录不存在: {args.input_dir}")
        return
    if args.quality < 1 or args.quality > 100:
        print("质量参数必须在1-100之间")
        return
    # 创建压缩器并执行
    compressor = ImageCompressor(
        quality=args.quality,
        max_size=args.max_size,
        scale_factor=args.scale
    )
    compressor.batch_compress(
        input_dir=args.input_dir,
        output_dir=args.output_dir,
        file_types=args.types,
        max_workers=args.workers
    )
if __name__ == "__main__":
    main_cli()

使用说明

命令行使用

# 基本使用
python image_compressor.py ./images ./compressed
# 指定质量参数
python image_compressor.py ./images ./compressed -q 70
# 限制图片大小
python image_compressor.py ./images ./compressed -m 1920
# 缩放图片
python image_compressor.py ./images ./compressed -s 0.5
# 自定义文件类型
python image_compressor.py ./images ./compressed -t .jpg .png
# 使用更多线程
python image_compressor.py ./images ./compressed -w 8

作为模块使用

from image_compressor import ImageCompressor
# 创建压缩器
compressor = ImageCompressor(quality=80, max_size=1920)
# 批量压缩
compressor.batch_compress(
    input_dir="./photos",
    output_dir="./compressed_photos"
)

这个方案具有以下特点:

  1. 性能优化:使用线程池并行处理,提高处理速度
  2. 质量可控:可调整压缩质量和缩放比例
  3. 格式支持:支持常见图片格式(JPG、PNG、BMP等)
  4. 目录结构保持:可选择保持原始目录结构
  5. 错误处理:单个文件失败不影响其他文件处理
  6. 统计信息:输出压缩前后的详细信息

您可以根据实际需求选择合适的版本或进行定制。

抱歉,评论功能暂时关闭!