如何用Python案例实现文件分片上传?

wen python案例 3

Python实战案例与核心原理详解

目录导读

  1. 为什么需要文件分片上传?
  2. 文件分片上传的核心流程
  3. Python实现文件分片上传的完整案例
  4. 代码逐段详解:切片、上传、合并
  5. 常见问题与优化技巧(含问答)
  6. 安全性与并发控制建议

为什么需要文件分片上传?

在网络传输中,大文件(如视频、高清图片、日志压缩包)直接上传会面临三大问题:

如何用Python案例实现文件分片上传?

  • 网络波动导致失败:一旦连接中断,整个文件需重传
  • 超时限制:服务器或浏览器可能对单次请求有大小限制(如Nginx默认1MB)
  • 内存溢出:Python读取整个大文件会耗尽内存

分片上传的核心思想是:将大文件切分为多个小块(通常1-10MB),分别上传,最后在服务端合并,这能显著提升上传的稳定性和用户体验。


文件分片上传的核心流程

  1. 客户端切片:按固定大小(如5MB)读取文件片段,生成唯一标识(如文件MD5+分片序号)
  2. 分片上传:将每个分片通过HTTP POST请求发送到服务器
  3. 服务端暂存:服务器接收分片,写入临时目录,记录已上传的分片列表
  4. 合并确认:所有分片上传完成后,客户端发起合并请求,服务端将分片按序合并为原始文件
  5. 清理临时文件:合并成功后删除分片文件

Python实现文件分片上传的完整案例

以下案例使用requests库实现客户端上传,Flask实现服务端接收。代码可直接运行测试

客户端代码(client.py)

import os  
import hashlib  
import requests  
def split_and_upload(file_path, chunk_size=5*1024*1024, server_url="http://127.0.0.1:5000/upload"):  
    """  
    将文件分片并逐片上传到服务器  
    :param file_path: 本地文件路径  
    :param chunk_size: 每片大小(默认5MB)  
    :param server_url: 服务器上传接口  
    """  
    # 计算文件MD5作为全局标识  
    file_md5 = hashlib.md5(open(file_path, 'rb').read()).hexdigest()  
    total_size = os.path.getsize(file_path)  
    chunk_count = (total_size + chunk_size - 1) // chunk_size  
    with open(file_path, 'rb') as f:  
        for chunk_index in range(chunk_count):  
            start = chunk_index * chunk_size  
            f.seek(start)  
            chunk_data = f.read(chunk_size)  
            # 构造分片元数据  
            params = {  
                'file_md5': file_md5,  
                'chunk_index': chunk_index,  
                'total_chunks': chunk_count,  
                'filename': os.path.basename(file_path)  
            }  
            # 上传分片(注意:files参数以'chunk'字段发送二进制数据)  
            response = requests.post(  
                server_url,  
                files={'chunk': chunk_data},  
                data=params  
            )  
            if response.status_code == 200:  
                print(f"分片 {chunk_index+1}/{chunk_count} 上传成功")  
            else:  
                print(f"分片 {chunk_index+1} 上传失败: {response.text}")  
                return False  
    # 所有分片上传完毕,通知服务器合并  
    merge_url = server_url + "/merge"  
    merge_resp = requests.post(merge_url, data={'file_md5': file_md5})  
    if merge_resp.status_code == 200:  
        print("文件合并成功!")  
        return True  
    else:  
        print(f"合并失败: {merge_resp.text}")  
        return False  

服务端代码(server.py)

from flask import Flask, request, jsonify  
import os  
app = Flask(__name__)  
UPLOAD_DIR = "./temp_chunks"  
os.makedirs(UPLOAD_DIR, exist_ok=True)  
@app.route('/upload', methods=['POST'])  
def upload_chunk():  
    file_md5 = request.form.get('file_md5')  
    chunk_index = int(request.form.get('chunk_index'))  
    total_chunks = int(request.form.get('total_chunks'))  
    filename = request.form.get('filename')  
    chunk_data = request.files.get('chunk')  
    if not all([file_md5, chunk_index, total_chunks, chunk_data]):  
        return jsonify({"error": "缺少参数"}), 400  
    # 按分片序号保存为临时文件(md5_0, md5_1,...)  
    chunk_dir = os.path.join(UPLOAD_DIR, file_md5)  
    os.makedirs(chunk_dir, exist_ok=True)  
    chunk_path = os.path.join(chunk_dir, str(chunk_index))  
    chunk_data.save(chunk_path)  
    # 检查是否所有分片已上传  
    uploaded_count = len(os.listdir(chunk_dir))  
    if uploaded_count == total_chunks:  
        return jsonify({"status": "chunk_received", "all_uploaded": True})  
    return jsonify({"status": "chunk_received", "all_uploaded": False})  
@app.route('/upload/merge', methods=['POST'])  
def merge_chunks():  
    file_md5 = request.form.get('file_md5')  
    chunk_dir = os.path.join(UPLOAD_DIR, file_md5)  
    if not os.path.exists(chunk_dir):  
        return jsonify({"error": "分片目录不存在"}), 404  
    # 按序号排序并合并  
    chunk_files = sorted(os.listdir(chunk_dir), key=lambda x: int(x))  
    original_filename = request.form.get('filename', 'merged_file')  
    output_path = os.path.join(UPLOAD_DIR, original_filename)  
    with open(output_path, 'wb') as out_f:  
        for chunk_name in chunk_files:  
            chunk_path = os.path.join(chunk_dir, chunk_name)  
            with open(chunk_path, 'rb') as in_f:  
                out_f.write(in_f.read())  
    # 删除临时分片目录  
    import shutil  
    shutil.rmtree(chunk_dir)  
    return jsonify({"status": "merged", "path": output_path})  
if __name__ == '__main__':  
    app.run(host='0.0.0.0', port=5000, debug=True)  

代码逐段详解:切片、上传、合并

客户端切片逻辑

  • chunk_size:控制每片大小,建议5-10MB,平衡网络请求次数与失败重试成本
  • file_md5:确保不同文件即使同名也能区分(避免合并错乱)
  • f.seek(start):直接从指定位置读取,避免一次性加载整个文件

上传与容错

  • 每个分片通过requests.postfiles参数发送,Flask端用request.files['chunk']接收
  • 若单个分片上传失败,可加入重试机制(最多3次,间隔2秒)

服务端合并关键点

  • 排序合并:分片名使用序号(0,1,2...),sorted按整数排序保证顺序
  • 原子性检查:合并前检查分片数量是否等于total_chunks,避免缺片文件

前端调用示例

# 调用客户端  
split_and_upload("large_video.mp4", chunk_size=10*1024*1024)  

常见问题与优化技巧(问答)

问1:如何保证分片上传的完整性?
答:客户端可为每个分片计算SHA256值,服务端在接收后验证,也可在合并完成后,对比整体文件的MD5。

问2:网络中断后如何断点续传?
答:服务端增加/check接口,返回已上传的分片序号列表,客户端在开始上传前先查询缺失分片,只上传未完成的部分。

问3:分片上传速度太慢怎么办?
答:启用并发上传,使用ThreadPoolExecutor同时上传3-5个分片,注意控制并发数,避免触发服务器限流。

问4:分片存储临时目录会不会占满磁盘?
答:设置定时任务(如cron)删除超过24小时的临时分片目录,合并成功后立即清理。

问5:如何防止恶意上传?
答:增加身份验证Token(如JWT),限制单文件最大50GB,并对分片数量做上限检查(如最大1000片)。


安全性与并发控制建议

  1. 接口鉴权:所有上传/合并接口需携带有效的Token,防止非法调用
  2. 并发处理:服务端使用Redis记录每个文件的上传进度,避免多用户在合并阶段产生冲突
  3. 文件类型校验:合并完成后检查MIME类型或魔数,防止上传可执行文件
  4. 日志记录:详细记录每个分片的上传时间、IP、文件名称,便于排查异常

通过上述案例和原理,你已掌握Python实现文件分片上传的核心技术,实际生产环境中,可将客户端集成到Web应用中(如Vue + Axios),服务端改用Gunicorn部署以支持高并发,这种方案在视频平台、网盘系统中被广泛使用,是处理大文件传输的“标准答案”。

抱歉,评论功能暂时关闭!