Python案例如何合并分片文件?

wen python案例 9

本文目录导读:

Python案例如何合并分片文件?

  1. 基础文件合并
  2. 带进度显示和校验的合并
  3. 智能分片合并器
  4. 处理特殊格式文件
  5. 命令行工具版本
  6. 关键要点

我来为你详细介绍Python合并分片文件的几种常见方法。

基础文件合并

按顺序合并二进制文件

def merge_files_chunked(input_files, output_file, chunk_size=8192):
    """
    Concatenate split files into one output file.

    Every part is copied block by block instead of being loaded in one go.

    :param input_files: paths of the parts, already in the order in which
        they must appear in the output
    :param output_file: path of the file to create (overwritten if present)
    :param chunk_size: number of bytes requested from a part per read
    """
    with open(output_file, 'wb') as merged:
        for part_path in input_files:
            with open(part_path, 'rb') as part:
                # read() yields b'' at end of file, which ends the iteration
                for block in iter(lambda: part.read(chunk_size), b''):
                    merged.write(block)
    print(f"文件合并完成: {output_file}")
# Usage example: the parts are listed explicitly, in the order they are merged
file_parts = ['file.part1', 'file.part2', 'file.part3']
merge_files_chunked(file_parts, 'merged_file.mp4')

处理带有编号的文件

import os
import re
def merge_numbered_files(directory, output_file, pattern=r'part_(\d+)\.*'):
    """
    Merge the numbered part files found in a directory.

    Each entry name in ``directory`` is searched with ``pattern``; the first
    capture group is converted to ``int`` and used as the sort key, so
    ``part_2`` is merged before ``part_10``. Entries that are not regular
    files, and the output file itself, are ignored.

    :param directory: directory that holds the parts
    :param output_file: path of the merged file (overwritten if present)
    :param pattern: regular expression whose first group captures the
        part number
    """
    matcher = re.compile(pattern)
    output_abs = os.path.abspath(output_file)

    # Collect (part number, path) pairs for every matching regular file
    files = []
    for filename in os.listdir(directory):
        match = matcher.search(filename)
        if not match:
            continue
        file_path = os.path.join(directory, filename)
        # A matching sub-directory cannot be opened as a part
        if not os.path.isfile(file_path):
            continue
        # Never read the file we are writing to (e.g. output of an earlier run)
        if os.path.abspath(file_path) == output_abs:
            continue
        files.append((int(match.group(1)), file_path))

    # Numeric order, not lexicographic order
    files.sort(key=lambda item: item[0])

    with open(output_file, 'wb') as outfile:
        for _, file_path in files:
            with open(file_path, 'rb') as infile:
                # Stream in 1 MiB blocks so a large part never sits in memory whole
                for chunk in iter(lambda: infile.read(1024 * 1024), b''):
                    outfile.write(chunk)
            print(f"已合并: {os.path.basename(file_path)}")
    print(f"合并完成!输出文件: {output_file}")
# Usage example: merge every file in ./chunks whose name matches the default pattern
merge_numbered_files('./chunks', 'merged_result.zip')

带进度显示和校验的合并

import hashlib
def merge_with_progress(input_files, output_file, chunk_size=8192):
    """
    Merge part files while printing a running percentage.

    The sizes of all parts are read up front; after every block written the
    cumulative byte count is printed as a percentage of that total.

    :param input_files: list of part paths, in output order
    :param output_file: path of the merged file (overwritten if present)
    :param chunk_size: number of bytes requested per read (default 8192)
    """
    # Fixed denominator for the percentage
    total_size = sum(os.path.getsize(path) for path in input_files)
    part_count = len(input_files)

    merged_size = 0
    with open(output_file, 'wb') as outfile:
        for index, file_path in enumerate(input_files, start=1):
            print(f"合并分片 {index}/{part_count}: {os.path.basename(file_path)}")
            with open(file_path, 'rb') as infile:
                for chunk in iter(lambda: infile.read(chunk_size), b''):
                    outfile.write(chunk)
                    merged_size += len(chunk)
                    # Guard the division: the reported sizes can sum to 0
                    # even though data is still read
                    if total_size:
                        progress = (merged_size / total_size) * 100
                    else:
                        progress = 100.0
                    print(f"\r进度: {progress:.1f}%", end='')
            print()  # finish the carriage-return progress line
    print(f"\n合并完成!文件大小: {merged_size / 1024 / 1024:.2f} MB")
def merge_with_checksum(input_files, output_file, checksum_file=None):
    """
    Merge part files and compute the SHA-256 digest of the merged data.

    :param input_files: part paths, in output order
    :param output_file: path of the merged file (overwritten if present)
    :param checksum_file: optional text file holding the expected digest.
        The first whitespace-separated token is taken as the digest, so both
        a bare hex digest and a ``<digest>  <filename>`` line are accepted;
        the comparison ignores letter case.
    :return: hex digest of the merged data
    """
    sha256_hash = hashlib.sha256()
    with open(output_file, 'wb') as outfile:
        for file_path in input_files:
            with open(file_path, 'rb') as infile:
                # Hash exactly the bytes that are written, block by block
                for chunk in iter(lambda: infile.read(4096), b''):
                    outfile.write(chunk)
                    sha256_hash.update(chunk)
    final_hash = sha256_hash.hexdigest()
    print(f"文件合并完成,SHA256: {final_hash}")

    if checksum_file:
        # Only the ASCII digest matters, so undecodable bytes elsewhere in
        # the file (e.g. in a file name) must not abort the check
        with open(checksum_file, 'r', encoding='utf-8', errors='replace') as f:
            tokens = f.read().split()
        expected_hash = tokens[0].lower() if tokens else ''
        if final_hash == expected_hash:
            print("✓ 校验通过,文件完整")
        else:
            print("✗ 校验失败,文件可能损坏")
    return final_hash
# Usage example: the same parts are merged twice, once with progress output
# and once with a SHA-256 comparison against the contents of checksum.txt
files = ['video.part1', 'video.part2', 'video.part3']
merge_with_progress(files, 'complete_video.mp4')
merge_with_checksum(files, 'video.mp4', 'checksum.txt')

智能分片合并器

import json
from typing import List, Optional
class SmartFileMerger:
    """Merge split files described by a manifest.

    The manifest is a plain dict that can be loaded from and saved to JSON.
    Keys used by this class: ``'directory'``, ``'files'`` (a list of dicts
    with ``'name'``, ``'path'`` and ``'size'``), ``'total_size'`` and,
    optionally, ``'checksum'`` (expected SHA-256 hex digest of the output).
    """

    def __init__(self, manifest_file: Optional[str] = None) -> None:
        """Start with an empty manifest, or load one from ``manifest_file``."""
        self.manifest = {}
        if manifest_file:
            self.load_manifest(manifest_file)

    def load_manifest(self, manifest_file: str) -> None:
        """Replace the current manifest with the JSON content of a file."""
        with open(manifest_file, 'r', encoding='utf-8') as f:
            self.manifest = json.load(f)

    @staticmethod
    def _natural_key(path: str) -> list:
        """Return a sort key that orders digit runs by numeric value.

        ``re.split`` with a capturing group alternates text and digit runs,
        so odd positions are always digit runs and keys stay comparable.
        """
        tokens = re.split(r'(\d+)', path)
        return [int(tok) if idx % 2 else tok for idx, tok in enumerate(tokens)]

    def analyze_files(self, directory: str, pattern: str = "*.part*") -> dict:
        """Scan ``directory`` for parts and rebuild the manifest from them.

        Parts are ordered naturally (``part2`` before ``part10``). The
        resulting manifest is also written to ``manifest.json`` inside
        ``directory``.

        :param directory: directory that holds the parts
        :param pattern: glob pattern, relative to ``directory``
        :return: the manifest dict
        """
        import glob
        files = glob.glob(os.path.join(directory, pattern))
        manifest_path = os.path.join(directory, 'manifest.json')
        manifest_abs = os.path.abspath(manifest_path)

        self.manifest['directory'] = directory
        self.manifest['files'] = []
        self.manifest['total_size'] = 0
        for file_path in sorted(files, key=self._natural_key):
            # Skip directories and a manifest left over from an earlier run
            if not os.path.isfile(file_path):
                continue
            if os.path.abspath(file_path) == manifest_abs:
                continue
            size = os.path.getsize(file_path)
            self.manifest['files'].append({
                'name': os.path.basename(file_path),
                'path': file_path,
                'size': size
            })
            self.manifest['total_size'] += size

        self.save_manifest(manifest_path)
        return self.manifest

    def save_manifest(self, file_path: str) -> None:
        """Write the manifest to ``file_path`` as indented JSON."""
        with open(file_path, 'w', encoding='utf-8') as f:
            json.dump(self.manifest, f, indent=2)

    def merge(self, output_file: str, verify: bool = True) -> None:
        """Concatenate all parts listed in the manifest into ``output_file``.

        :param output_file: path of the merged file (overwritten if present)
        :param verify: when True, print the SHA-256 digest and compare it
            with the manifest's ``'checksum'`` entry if there is one
        :raises ValueError: if the manifest lists no files
        """
        files = self.manifest.get('files')
        if not files:
            raise ValueError("没有要合并的文件")

        # A hand-written manifest may omit the total; derive it from the parts
        total_size = self.manifest.get('total_size')
        if not total_size:
            total_size = sum(info['size'] for info in files)

        merged_size = 0
        print(f"开始合并 {len(files)} 个分片文件")
        print(f"总大小: {total_size / 1024 / 1024:.2f} MB")

        # Text spinner shown in front of the percentage
        from itertools import cycle
        spinner = cycle(['-', '/', '|', '\\'])
        sha256_hash = hashlib.sha256()

        with open(output_file, 'wb') as outfile:
            for i, file_info in enumerate(files):
                file_path = file_info['path']
                file_size = file_info['size']
                print(f"\n合并分片 {i+1}/{len(files)}: "
                      f"{file_info['name']} ({file_size/1024/1024:.2f} MB)")
                with open(file_path, 'rb') as infile:
                    for chunk in iter(lambda: infile.read(8192), b''):
                        outfile.write(chunk)
                        merged_size += len(chunk)
                        sha256_hash.update(chunk)
                        # Recorded sizes may all be 0 while data is still read
                        if total_size:
                            progress = (merged_size / total_size) * 100
                        else:
                            progress = 100.0
                        print(f"\r{next(spinner)} 进度: {progress:.2f}% "
                              f"({merged_size/1024/1024:.2f} MB)", end='')

        print(f"\n\n合并完成!输出文件: {output_file}")
        print(f"文件大小: {os.path.getsize(output_file) / 1024 / 1024:.2f} MB")
        if verify:
            self._verify_checksum(output_file, sha256_hash.hexdigest())

    def _verify_checksum(self, file_path: str, hash_value: str) -> None:
        """Print the digest and compare it with the manifest's checksum.

        ``file_path`` is accepted for interface compatibility but not read;
        the digest was already computed while merging.
        """
        print(f"SHA256: {hash_value}")
        if 'checksum' in self.manifest:
            # hexdigest() is lower-case; accept an upper-case manifest entry
            expected = str(self.manifest['checksum']).strip().lower()
            if hash_value == expected:
                print("✓ 文件完整性验证通过")
            else:
                print("✗ 文件完整性验证失败!")
# Usage example
merger = SmartFileMerger()
# Scan ./downloads for parts matching the glob and build the manifest
merger.analyze_files('./downloads', 'video_*.part*')
# Merge the parts listed in the manifest
merger.merge('final_video.mp4', verify=True)

处理特殊格式文件

def merge_text_files_with_encoding(input_files, output_file, encoding='utf-8'):
    """
    Concatenate text files, decoding and encoding with one codec.

    A newline is written after the content of every input file,
    including the last one.

    :param input_files: paths of the text files, in output order
    :param output_file: path of the merged file (overwritten if present)
    :param encoding: codec used for both reading and writing
    """
    with open(output_file, 'w', encoding=encoding) as merged:
        for source_path in input_files:
            with open(source_path, 'r', encoding=encoding) as source:
                text = source.read()
            # Separator line break after each file's content
            merged.write(text)
            merged.write('\n')
def merge_csv_files(input_files, output_file, exclude_header=False):
    """
    Merge CSV files row by row into one UTF-8 CSV file.

    :param input_files: paths of the CSV files, in output order
    :param output_file: path of the merged CSV (overwritten if present)
    :param exclude_header: when True, the first row of every file except
        the first is dropped, so the header appears once; when False, all
        rows of all files are copied
    """
    import csv
    with open(output_file, 'w', newline='', encoding='utf-8') as outfile:
        writer = csv.writer(outfile)
        for file_index, file_path in enumerate(input_files):
            # newline='' leaves line-ending handling to the csv module, so
            # line breaks embedded in quoted fields are not rewritten
            with open(file_path, 'r', newline='', encoding='utf-8') as infile:
                reader = csv.reader(infile)
                if exclude_header and file_index > 0:
                    next(reader, None)  # drop the repeated header row
                writer.writerows(reader)
# Usage example: keep the header of the first file only
merge_csv_files(['data_part1.csv', 'data_part2.csv'], 
                'merged_data.csv', 
                exclude_header=True)

命令行工具版本

#!/usr/bin/env python3
import argparse
import sys
def main():
    """Command-line entry point: merge the given parts into one file.

    Parts are merged in exactly the order they appear on the command line.
    If any input path is not an existing file, the program exits through
    ``parser.error`` (usage message, exit status 2) before anything is
    written.
    """
    parser = argparse.ArgumentParser(description='文件分片合并工具')
    parser.add_argument('output', help='输出文件路径')
    parser.add_argument('input', nargs='+', help='输入分片文件列表')
    parser.add_argument('--chunk-size', type=int, default=8192,
                       help='读取缓冲区大小(字节)')
    parser.add_argument('--verify', action='store_true',
                       help='计算并显示SHA256校验和')
    args = parser.parse_args()

    # Fail with a usage error instead of a traceback when a part is missing
    missing = [path for path in args.input if not os.path.isfile(path)]
    if missing:
        parser.error(f"找不到分片文件: {', '.join(missing)}")

    # NOTE(review): args.chunk_size is parsed but never passed on; the
    # merge() call below only receives the output path and the verify flag.
    merger = SmartFileMerger()
    # Keep the caller's order: re-sorting lexicographically would put
    # part10 before part2 and override an explicitly chosen order
    merger.manifest['files'] = [
        {'name': f, 'path': f, 'size': os.path.getsize(f)}
        for f in args.input
    ]
    merger.manifest['total_size'] = sum(
        f['size'] for f in merger.manifest['files']
    )
    merger.merge(args.output, args.verify)


if __name__ == '__main__':
    main()
# Command-line usage
# python file_merger.py output.mp4 part1 part2 part3 --verify

关键要点

  1. 文件顺序:确保按正确顺序合并
  2. 缓冲区大小:根据文件大小选择合适的缓冲区
  3. 内存管理:分批读取,避免内存溢出
  4. 完整性验证:使用校验和确保文件完整
  5. 错误处理:添加适当的异常处理

选择哪种方法取决于你的具体需求:文件大小、分片方式、是否需要续传等。

抱歉,评论功能暂时关闭!