如何用Python案例实现批量加水印?

wen python案例 2

本文目录导读:

如何用Python案例实现批量加水印?

  1. 使用 PIL/Pillow 库(图片水印)
  2. 使用 OpenCV(更高级的水印)
  3. 批量处理视频水印
  4. 完整的命令行工具
  5. 使用方法
  6. 特点

下面是一个完整的 Python 批量加水印案例,包含详细的代码、说明和可配置项。

使用 PIL/Pillow 库(图片水印)

安装依赖

pip install Pillow

完整代码

import os
from PIL import Image, ImageDraw, ImageFont
from pathlib import Path
class ImageWatermark:
    def __init__(self, input_dir="input", output_dir="output", 
                 watermark_text="Watermark", font_size=50,
                 opacity=128, position="center"):
        """
        图片水印处理类
        Args:
            input_dir: 输入文件夹
            output_dir: 输出文件夹
            watermark_text: 水印文字
            font_size: 字体大小
            opacity: 透明度 (0-255)
            position: 位置 (center, top-left, top-right, bottom-left, bottom-right, tile)
        """
        self.input_dir = input_dir
        self.output_dir = output_dir
        self.watermark_text = watermark_text
        self.font_size = font_size
        self.opacity = opacity
        self.position = position
        # 确保输出目录存在
        Path(output_dir).mkdir(parents=True, exist_ok=True)
    def get_font(self):
        """获取字体,尝试系统字体"""
        try:
            # Windows 系统字体
            font_path = "C:/Windows/Fonts/msyh.ttc"  # 微软雅黑
            return ImageFont.truetype(font_path, self.font_size)
        except:
            try:
                # Linux/Mac 系统字体
                return ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", self.font_size)
            except:
                # 默认字体
                return ImageFont.load_default()
    def get_watermark_position(self, img, text_size):
        """计算水印位置"""
        width, height = img.size
        text_width, text_height = text_size
        positions = {
            "center": ((width - text_width) // 2, (height - text_height) // 2),
            "top-left": (20, 20),
            "top-right": (width - text_width - 20, 20),
            "bottom-left": (20, height - text_height - 20),
            "bottom-right": (width - text_width - 20, height - text_height - 20),
            "tile": None  # 平铺模式需要特殊处理
        }
        return positions.get(self.position, positions["center"])
    def add_watermark_single(self, img_path):
        """为单张图片添加水印"""
        try:
            # 打开图片并转换为RGBA模式以支持透明度
            img = Image.open(img_path).convert("RGBA")
            # 创建水印层
            watermark_layer = Image.new("RGBA", img.size, (0, 0, 0, 0))
            draw = ImageDraw.Draw(watermark_layer)
            # 获取字体
            font = self.get_font()
            # 计算文字大小
            text_bbox = draw.textbbox((0, 0), self.watermark_text, font=font)
            text_width = text_bbox[2] - text_bbox[0]
            text_height = text_bbox[3] - text_bbox[1]
            text_size = (text_width, text_height)
            if self.position == "tile":
                # 平铺水印
                x, y = 0, 0
                while y < img.height:
                    while x < img.width:
                        draw.text((x, y), self.watermark_text, 
                                 font=font, fill=(255, 255, 255, self.opacity))
                        x += text_width + 50  # 间距
                    x = 0
                    y += text_height + 50
            else:
                # 单次水印
                position = self.get_watermark_position(img, text_size)
                draw.text(position, self.watermark_text, 
                         font=font, fill=(255, 255, 255, self.opacity))
            # 合并图片和水印层
            watermarked = Image.alpha_composite(img, watermark_layer)
            # 转换回RGB模式保存
            output_path = os.path.join(self.output_dir, os.path.basename(img_path))
            watermarked.convert("RGB").save(output_path, "JPEG", quality=95)
            print(f"✅ 已处理: {os.path.basename(img_path)}")
            return True
        except Exception as e:
            print(f"❌ 处理失败 {img_path}: {e}")
            return False
    def process_batch(self):
        """批量处理所有图片"""
        # 支持的图片格式
        image_extensions = {'.jpg', '.jpeg', '.png', '.bmp', '.gif', '.tiff'}
        # 获取输入目录中所有图片
        images = []
        for ext in image_extensions:
            images.extend(Path(self.input_dir).glob(f"*{ext}"))
            images.extend(Path(self.input_dir).glob(f"*{ext.upper()}"))
        if not images:
            print(f"⚠️ 在 {self.input_dir} 目录未找到图片文件")
            return
        print(f"📷 找到 {len(images)} 张图片")
        print("🚀 开始批量处理...\n")
        success_count = 0
        for img_path in images:
            if self.add_watermark_single(str(img_path)):
                success_count += 1
        print(f"\n✅ 处理完成! 成功: {success_count}/{len(images)}")
# 使用示例
if __name__ == "__main__":
    # 创建输入输出目录
    Path("input_images").mkdir(exist_ok=True)
    Path("output_images").mkdir(exist_ok=True)
    # 配置水印参数
    watermarker = ImageWatermark(
        input_dir="input_images",
        output_dir="output_images",
        watermark_text="© 2024 Your Company",  # 水印文字
        font_size=40,                           # 字体大小
        opacity=128,                            # 透明度 (0=透明, 255=不透明)
        position="bottom-right"                 # 位置
    )
    # 批量处理
    watermarker.process_batch()

使用 OpenCV(更高级的水印)

安装依赖

pip install opencv-python numpy

图片水印(Logo方式)

import cv2
import numpy as np
from pathlib import Path
def add_logo_watermark(input_dir, output_dir, logo_path, alpha=0.3, position="bottom-right"):
    """
    使用图片作为水印
    Args:
        input_dir: 输入目录
        output_dir: 输出目录
        logo_path: 水印logo路径
        alpha: 透明度
        position: 位置
    """
    # 读取logo并确保有透明通道
    logo = cv2.imread(logo_path, cv2.IMREAD_UNCHANGED)
    if logo is None:
        print("❌ 无法读取logo图片")
        return
    # 如果logo没有alpha通道,添加
    if logo.shape[2] == 3:
        logo = cv2.cvtColor(logo, cv2.COLOR_BGR2BGRA)
    Path(output_dir).mkdir(parents=True, exist_ok=True)
    # 支持的图片格式
    image_extensions = {'.jpg', '.jpeg', '.png', '.bmp'}
    images = []
    for ext in image_extensions:
        images.extend(Path(input_dir).glob(f"*{ext}"))
        images.extend(Path(input_dir).glob(f"*{ext.upper()}"))
    for img_path in images:
        # 读取图片
        img = cv2.imread(str(img_path), cv2.IMREAD_UNCHANGED)
        if img.shape[2] == 3:
            img = cv2.cvtColor(img, cv2.COLOR_BGR2BGRA)
        # 调整logo大小(按比例)
        logo_height = int(img.shape[0] * 0.1)  # 图片高度的10%
        aspect_ratio = logo.shape[1] / logo.shape[0]
        logo_width = int(logo_height * aspect_ratio)
        logo_resized = cv2.resize(logo, (logo_width, logo_height))
        # 计算位置
        positions = {
            "top-left": (20, 20),
            "top-right": (img.shape[1] - logo_width - 20, 20),
            "bottom-left": (20, img.shape[0] - logo_height - 20),
            "bottom-right": (img.shape[1] - logo_width - 20, img.shape[0] - logo_height - 20)
        }
        pos = positions.get(position, positions["bottom-right"])
        # ROI区域
        roi = img[pos[1]:pos[1]+logo_height, pos[0]:pos[0]+logo_width]
        # 创建mask
        logo_rgba = logo_resized[:, :, :]
        logo_rgb = logo_rgba[:, :, :3]
        mask = logo_rgba[:, :, 3] / 255.0
        # 叠加水印
        for c in range(3):
            roi[:, :, c] = (1 - alpha * mask) * roi[:, :, c] + alpha * mask * logo_rgb[:, :, c]
        # 保存结果
        output_path = os.path.join(output_dir, os.path.basename(str(img_path)))
        cv2.imwrite(output_path, img)
        print(f"✅ 已处理: {os.path.basename(str(img_path))}")
# 使用示例
# add_logo_watermark("input_images", "output_images", "logo.png")

批量处理视频水印

import cv2
from pathlib import Path
def add_video_watermark(input_video, output_video, watermark_text, position="bottom-right"):
    """
    为视频添加文字水印
    Args:
        input_video: 输入视频路径
        output_video: 输出视频路径
        watermark_text: 水印文字
        position: 水印位置
    """
    cap = cv2.VideoCapture(input_video)
    # 获取视频属性
    fps = int(cap.get(cv2.CAP_PROP_FPS))
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    # 创建视频写入器
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_video, fourcc, fps, (width, height))
    # 字体设置
    font = cv2.FONT_HERSHEY_SIMPLEX
    font_scale = 1
    font_thickness = 2
    text_size = cv2.getTextSize(watermark_text, font, font_scale, font_thickness)[0]
    # 计算位置
    positions = {
        "top-left": (20, 30),
        "top-right": (width - text_size[0] - 20, 30),
        "bottom-left": (20, height - 20),
        "bottom-right": (width - text_size[0] - 20, height - 20)
    }
    pos = positions.get(position, positions["bottom-right"])
    frame_count = 0
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        # 添加水印
        cv2.putText(frame, watermark_text, pos, font, font_scale, 
                   (255, 255, 255), font_thickness, cv2.LINE_AA)
        out.write(frame)
        frame_count += 1
        if frame_count % 30 == 0:
            print(f"⏳ 进度: {frame_count}/{total_frames}")
    cap.release()
    out.release()
    print(f"✅ 视频处理完成: {output_video}")

完整的命令行工具

创建一个命令行可执行的批量水印工具 batch_watermarker.py

import argparse
import sys
from pathlib import Path
from PIL import Image, ImageDraw, ImageFont
def main():
    parser = argparse.ArgumentParser(description="批量图片水印工具")
    parser.add_argument("input", help="输入文件夹路径")
    parser.add_argument("-o", "--output", default="output", help="输出文件夹路径")
    parser.add_argument("-t", "--text", default="Watermark", help="水印文字")
    parser.add_argument("-s", "--size", type=int, default=40, help="字体大小")
    parser.add_argument("-a", "--alpha", type=int, default=128, help="透明度 (0-255)")
    parser.add_argument("-p", "--position", default="center", 
                       choices=["center", "top-left", "top-right", "bottom-left", "bottom-right", "tile"],
                       help="水印位置")
    parser.add_argument("-f", "--font", help="自定义字体路径")
    args = parser.parse_args()
    # 检查输入目录
    if not Path(args.input).exists():
        print(f"❌ 输入目录不存在: {args.input}")
        sys.exit(1)
    # 创建水印处理器
    watermarker = ImageWatermark(
        input_dir=args.input,
        output_dir=args.output,
        watermark_text=args.text,
        font_size=args.size,
        opacity=args.alpha,
        position=args.position
    )
    # 如果提供了自定义字体
    if args.font and Path(args.font).exists():
        from PIL import ImageFont
        watermarker.font_path = args.font
    # 开始处理
    watermarker.process_batch()
if __name__ == "__main__":
    main()

使用方法

基本使用

# 处理 input 目录中的所有图片
python batch_watermarker.py input_images
# 自定义水印文字和位置
python batch_watermarker.py input_images -t "© 2024" -p "bottom-right" -s 50
# 设置透明度和字体大小
python batch_watermarker.py input_images -t "Confidential" -a 100 -s 60

目录结构

project/
├── input_images/        # 放入需要加水印的图片
│   ├── photo1.jpg
│   ├── photo2.jpg
│   └── photo3.png
├── output_images/       # 处理后的图片将保存到这里
├── batch_watermarker.py # 主程序
└── logo.png            # (可选)Logo水印图片

特点

  • 批量处理:自动处理整个文件夹的图片
  • 多种位置:支持6种水印位置
  • 透明度控制:可调整水印透明度
  • 多格式支持:支持JPG、PNG、BMP等格式
  • 错误处理:单个文件失败不影响其他文件
  • 进度显示:实时显示处理进度

需要根据你的具体需求调整参数即可。

抱歉,评论功能暂时关闭!